Skip to content

Commit

Permalink
Simplifying ServerSentEventsTransport
Browse files Browse the repository at this point in the history
 - NotSafeForWork - Removing unnecessary SafeInvoker from ServerSentEventsTransport
 - DisposingOfDisposer - unifying request cancellation
  • Loading branch information
moozzyk committed Sep 5, 2014
1 parent 82cba06 commit 47a76d3
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@
<Compile Include="..\Microsoft.AspNet.SignalR.Core\Infrastructure\DisposableAction.cs">
<Link>Infrastructure\DisposableAction.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Core\Infrastructure\Disposer.cs">
<Link>Infrastructure\Disposer.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Core\Infrastructure\ExceptionsExtensions.cs">
<Link>Infrastructure\ExceptionsExtensions.cs</Link>
</Compile>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected override void OnStart(IConnection connection,
_request.Abort();
};

OpenConnection(connection, connectionData, disconnectToken, initializeHandler.InitReceived, initializeHandler.Fail);
OpenConnection(connection, connectionData, disconnectToken, initializeHandler);
}

private void Reconnect(IConnection connection, string data, CancellationToken disconnectToken)
Expand All @@ -85,7 +85,7 @@ private void Reconnect(IConnection connection, string data, CancellationToken di
if (!disconnectToken.IsCancellationRequested && connection.EnsureReconnecting())
{
// Now attempt a reconnect
OpenConnection(connection, data, disconnectToken, initializeCallback: null, errorCallback: null);
OpenConnection(connection, data, disconnectToken, initializationHandler: null);
}
});
}
Expand All @@ -95,58 +95,60 @@ private void Reconnect(IConnection connection, string data, CancellationToken di
internal void OpenConnection(IConnection connection,
string data,
CancellationToken disconnectToken,
Action initializeCallback,
Action<Exception> errorCallback)
TransportInitializationHandler initializationHandler)
{
// If we're reconnecting add /connect to the url
bool reconnecting = initializeCallback == null;
var callbackInvoker = new ThreadSafeInvoker();
var requestDisposer = new Disposer();
Action initializeInvoke = () =>
{
callbackInvoker.Invoke(initializeCallback);
};
var reconnecting = initializationHandler == null;

var url = reconnecting
? UrlBuilder.BuildReconnect(connection, Name, data)
: UrlBuilder.BuildConnect(connection, Name, data);

connection.Trace(TraceLevels.Events, "SSE: GET {0}", url);

HttpClient.Get(url, req =>
var getTask = HttpClient.Get(url, req =>
{
_request = req;
_request.Accept = "text/event-stream";
connection.PrepareRequest(_request);
}, isLongRunning: true).ContinueWith(task =>
}, isLongRunning: true);

var requestCancellationRegistration = disconnectToken.SafeRegister(state =>
{
_stop = true;
// This will no-op if the request is already finished.
((IRequest)state).Abort();
if (!reconnecting)
{
initializationHandler.Fail(new OperationCanceledException(Resources.Error_ConnectionCancelled, disconnectToken));
}
}, _request);

getTask.ContinueWith(task =>
{
if (task.IsFaulted || task.IsCanceled)
{
Exception exception;
var exception = task.IsCanceled
? new OperationCanceledException(Resources.Error_TaskCancelledException)
: task.Exception.Unwrap();
if (task.IsCanceled)
if (!reconnecting)
{
exception = new OperationCanceledException(Resources.Error_TaskCancelledException);
initializationHandler.Fail(exception);
}
else
{
exception = task.Exception.Unwrap();
}
if (errorCallback != null)
{
callbackInvoker.Invoke((cb, ex) => cb(ex), errorCallback, exception);
}
else if (!_stop && reconnecting)
else if (!_stop)
{
// Only raise the error event if we failed to reconnect
connection.OnError(exception);
Reconnect(connection, data, disconnectToken);
}
requestDisposer.Dispose();
requestCancellationRegistration.Dispose();
}
else
{
Expand All @@ -161,14 +163,6 @@ internal void OpenConnection(IConnection connection,
var eventSource = new EventSourceStreamReader(connection, stream);
var esCancellationRegistration = disconnectToken.SafeRegister(state =>
{
_stop = true;
((IRequest)state).Abort();
},
_request);
eventSource.Opened = () =>
{
// This will noop if we're not in the reconnecting state
Expand All @@ -181,14 +175,11 @@ internal void OpenConnection(IConnection connection,
eventSource.Message = sseEvent =>
{
if (sseEvent.EventType == EventType.Data)
if (sseEvent.EventType == EventType.Data &&
!sseEvent.Data.Equals("initialized", StringComparison.OrdinalIgnoreCase))
{
if (sseEvent.Data.Equals("initialized", StringComparison.OrdinalIgnoreCase))
{
return;
}
TransportHelper.ProcessResponse(connection, sseEvent.Data, initializeInvoke);
TransportHelper.ProcessResponse(connection, sseEvent.Data,
reconnecting ? (Action) null : initializationHandler.InitReceived);
}
};
Expand All @@ -197,17 +188,14 @@ internal void OpenConnection(IConnection connection,
if (exception != null)
{
// Check if the request is aborted
bool isRequestAborted = ExceptionHelper.IsRequestAborted(exception);
if (!isRequestAborted)
if (!ExceptionHelper.IsRequestAborted(exception))
{
// Don't raise exceptions if the request was aborted (connection was stopped).
connection.OnError(exception);
}
}
requestDisposer.Dispose();
esCancellationRegistration.Dispose();
requestCancellationRegistration.Dispose();
response.Dispose();
if (_stop)
Expand All @@ -227,25 +215,6 @@ internal void OpenConnection(IConnection connection,
eventSource.Start();
}
});

var requestCancellationRegistration = disconnectToken.SafeRegister(state =>
{
if (state != null)
{
// This will no-op if the request is already finished.
((IRequest)state).Abort();
}
if (errorCallback != null)
{
callbackInvoker.Invoke((cb, token) =>
{
cb(new OperationCanceledException(Resources.Error_ConnectionCancelled, token));
}, errorCallback, disconnectToken);
}
}, _request);

requestDisposer.Set(requestCancellationRegistration);
}

public override void LostConnection(IConnection connection)
Expand All @@ -256,4 +225,4 @@ public override void LostConnection(IConnection connection)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ public void ProcessResponseCapturesOnReceivedExceptions()
public void CancelledTaskHandledinServerSentEvents()
{
var tcs = new TaskCompletionSource<IResponse>();
var wh = new TaskCompletionSource<Exception>();

tcs.TrySetCanceled();

var httpClient = new Mock<Microsoft.AspNet.SignalR.Client.Http.IHttpClient>();
Expand All @@ -65,12 +63,19 @@ public void CancelledTaskHandledinServerSentEvents()
.Returns(tcs.Task);

connection.SetupGet(c => c.ConnectionToken).Returns("foo");
connection.SetupGet(c => c.TotalTransportConnectTimeout).Returns(TimeSpan.FromSeconds(15));

var sse = new ServerSentEventsTransport(httpClient.Object);
sse.OpenConnection(connection.Object, null, CancellationToken.None, () => { }, ex => wh.TrySetResult(ex));

Assert.True(wh.Task.Wait(TimeSpan.FromSeconds(5)));
Assert.IsType(typeof(OperationCanceledException), wh.Task.Result);
var initializationHandler = new TransportInitializationHandler(httpClient.Object, connection.Object, null,
"serverSentEvents", CancellationToken.None, new TransportHelper());

sse.OpenConnection(connection.Object, null, CancellationToken.None, initializationHandler);

var exception = Assert.Throws<AggregateException>(
() => initializationHandler.Task.Wait(TimeSpan.FromSeconds(5)));

Assert.IsType(typeof(OperationCanceledException), exception.InnerException);
}

[Fact]
Expand Down

0 comments on commit 47a76d3

Please sign in to comment.