Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions shell/agents/Microsoft.Azure.Agent/AzureCopilotReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Concurrent;
using System.Net.WebSockets;
using System.Runtime.ExceptionServices;
using System.Text.Json;

namespace Microsoft.Azure.Agent;
Expand All @@ -26,7 +27,6 @@ private AzureCopilotReceiver(ClientWebSocket webSocket)
}

internal int Watermark { get; private set; }
internal BlockingCollection<CopilotActivity> ActivityQueue => _activityQueue;

internal static async Task<AzureCopilotReceiver> CreateAsync(string streamUrl)
{
Expand All @@ -52,6 +52,7 @@ private async Task ProcessActivities()
if (result.MessageType is WebSocketMessageType.Close)
{
closingMessage = "Close message received";
_activityQueue.Add(new CopilotActivity { Error = new ConnectionDroppedException("The server websocket is closing. Connection dropped.") });
}
}
catch (OperationCanceledException)
Expand All @@ -65,6 +66,7 @@ private async Task ProcessActivities()
{
// TODO: log the closing request.
await _webSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, closingMessage, CancellationToken.None);
_activityQueue.CompleteAdding();
break;
}

Expand Down Expand Up @@ -98,8 +100,20 @@ private async Task ProcessActivities()
}
}

// TODO: log the current state of the web socket
// TODO: handle error state, such as 'aborted'
// TODO: log the current state of the web socket.
_activityQueue.Add(new CopilotActivity { Error = new ConnectionDroppedException($"The websocket got in '{_webSocket.State}' state. Connection dropped.") });
_activityQueue.CompleteAdding();
}

internal CopilotActivity Take(CancellationToken cancellationToken)
{
CopilotActivity activity = _activityQueue.Take(cancellationToken);
if (activity.Error is not null)
{
ExceptionDispatchInfo.Capture(activity.Error).Throw();
}

return activity;
}

public void Dispose()
Expand Down
4 changes: 2 additions & 2 deletions shell/agents/Microsoft.Azure.Agent/ChatSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private async Task StartConversationAsync(IHost host, CancellationToken cancella

while (true)
{
CopilotActivity activity = _copilotReceiver.ActivityQueue.Take(cancellationToken);
CopilotActivity activity = _copilotReceiver.Take(cancellationToken);
if (activity.IsMessage && activity.IsFromCopilot && _copilotReceiver.Watermark is 0)
{
activity.ExtractMetadata(out _, out ConversationState conversationState);
Expand Down Expand Up @@ -259,7 +259,7 @@ internal async Task<CopilotResponse> GetChatResponseAsync(string input, IStatusC

while (true)
{
CopilotActivity activity = _copilotReceiver.ActivityQueue.Take(cancellationToken);
CopilotActivity activity = _copilotReceiver.Take(cancellationToken);

if (activity.ReplyToId != activityId)
{
Expand Down
2 changes: 1 addition & 1 deletion shell/agents/Microsoft.Azure.Agent/Schema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ internal CopilotActivity ReadChunk(CancellationToken cancellationToken)
return null;
}

CopilotActivity activity = _receiver.ActivityQueue.Take(cancellationToken);
CopilotActivity activity = _receiver.Take(cancellationToken);

if (!activity.IsMessageUpdate)
{
Expand Down