Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 28 additions & 10 deletions src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Net.WebSockets;
using System.Reactive;
Expand Down Expand Up @@ -110,11 +112,23 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
Id = startRequest.Id,
Type = GraphQLWebSocketMessageType.GQL_STOP
};

var initRequest = new GraphQLWebSocketRequest
{
Id = startRequest.Id,
Type = GraphQLWebSocketMessageType.GQL_CONNECTION_INIT,
};


// Check if there's an authorization header on the base client.
// NOTE: perhaps there's a better way?
var authHeader = _client.HttpClient.DefaultRequestHeaders.GetValues("Authorization").FirstOrDefault();

// If the auth header actually exist, send it in the connection_init request
if (authHeader != null)
{
initRequest[GraphQLWebSocketRequest.PAYLOAD_KEY] = new Dictionary<string, string>() { { "Authorization", authHeader } };
}

var observable = Observable.Create<GraphQLResponse<TResponse>>(o =>
IncomingMessageStream
Expand Down Expand Up @@ -549,16 +563,20 @@ private async Task<WebsocketMessageWrapper> ReceiveWebsocketMessagesAsync()
_internalCancellationToken.ThrowIfCancellationRequested();
ms.Seek(0, SeekOrigin.Begin);

if (webSocketReceiveResult.MessageType == WebSocketMessageType.Text)
{
var response = await _client.JsonSerializer.DeserializeToWebsocketResponseWrapperAsync(ms);
response.MessageBytes = ms.ToArray();
Debug.WriteLine($"{response.MessageBytes.Length} bytes received for id {response.Id} on websocket {_clientWebSocket.GetHashCode()} (thread {Thread.CurrentThread.ManagedThreadId})...");
return response;
}
else
switch (webSocketReceiveResult.MessageType)
{
throw new NotSupportedException("binary websocket messages are not supported");
case WebSocketMessageType.Text:
var response = await _client.JsonSerializer.DeserializeToWebsocketResponseWrapperAsync(ms);
response.MessageBytes = ms.ToArray();
Debug.WriteLine($"{response.MessageBytes.Length} bytes received for id {response.Id} on websocket {_clientWebSocket.GetHashCode()} (thread {Thread.CurrentThread.ManagedThreadId})...");
return response;

case WebSocketMessageType.Close:
Debug.WriteLine($"Connection closed by the server.");
throw new Exception("Connection closed by the server.");

default:
throw new NotSupportedException($"Websocket message type {webSocketReceiveResult.MessageType} not supported.");
}
}
catch (Exception e)
Expand Down Expand Up @@ -629,7 +647,7 @@ private async Task CompleteAsync()
_exceptionSubject?.OnCompleted();
_exceptionSubject?.Dispose();
_internalCancellationTokenSource.Dispose();

Debug.WriteLine("GraphQLHttpWebSocket disposed");
}

Expand Down