Skip to content

Commit

Permalink
Fixed regression in reconnecting clients after the shutdown token is …
Browse files Browse the repository at this point in the history
…triggered.

- Create a linked cancellation token that represents a timedout connection.
- Added a unit test.
Fixes #449.
  • Loading branch information
davidfowl committed Jun 15, 2012
1 parent f25bf2a commit c7b1d74
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 19 deletions.
14 changes: 11 additions & 3 deletions SignalR.Hosting.Memory/MemoryHost.cs
Expand Up @@ -10,8 +10,10 @@


namespace SignalR.Hosting.Memory namespace SignalR.Hosting.Memory
{ {
public class MemoryHost : RoutingHost, IHttpClient public class MemoryHost : RoutingHost, IHttpClient, IDisposable
{ {
private readonly CancellationTokenSource _shutDownToken = new CancellationTokenSource();

public MemoryHost() public MemoryHost()
: this(new DefaultDependencyResolver()) : this(new DefaultDependencyResolver())
{ {
Expand All @@ -37,9 +39,9 @@ Task<IClientResponse> IHttpClient.PostAsync(string url, Action<IClientRequest> p
private Task<IClientResponse> ProcessRequest(string url, Action<IClientRequest> prepareRequest, Dictionary<string, string> postData) private Task<IClientResponse> ProcessRequest(string url, Action<IClientRequest> prepareRequest, Dictionary<string, string> postData)
{ {
var uri = new Uri(url); var uri = new Uri(url);

PersistentConnection connection; PersistentConnection connection;
if (TryGetConnection(uri.LocalPath, out connection))
if (!_shutDownToken.IsCancellationRequested && TryGetConnection(uri.LocalPath, out connection))
{ {
var tcs = new TaskCompletionSource<IClientResponse>(); var tcs = new TaskCompletionSource<IClientResponse>();
var clientTokenSource = new CancellationTokenSource(); var clientTokenSource = new CancellationTokenSource();
Expand All @@ -49,6 +51,7 @@ private Task<IClientResponse> ProcessRequest(string url, Action<IClientRequest>
Response response = null; Response response = null;
response = new Response(clientTokenSource.Token, () => tcs.TrySetResult(response)); response = new Response(clientTokenSource.Token, () => tcs.TrySetResult(response));
var hostContext = new HostContext(request, response); var hostContext = new HostContext(request, response);
hostContext.Items[HostConstants.ShutdownToken] = _shutDownToken.Token;


connection.Initialize(DependencyResolver); connection.Initialize(DependencyResolver);


Expand All @@ -75,5 +78,10 @@ private Task<IClientResponse> ProcessRequest(string url, Action<IClientRequest>


return TaskAsyncHelper.FromError<IClientResponse>(new InvalidOperationException("Not a valid end point")); return TaskAsyncHelper.FromError<IClientResponse>(new InvalidOperationException("Not a valid end point"));
} }

public void Dispose()
{
_shutDownToken.Cancel(throwOnFirstException: false);

This comment has been minimized.

Copy link
@kenegozi

kenegozi Jun 15, 2012

love the named parameter usage for clarity. more people should adopt that

}
} }
} }
20 changes: 20 additions & 0 deletions SignalR.Tests/PersistentConnectionFacts.cs
Expand Up @@ -85,6 +85,26 @@ public void SendRaisesOnReceivedFromAllEvents()


public class OnReconnectedAsync public class OnReconnectedAsync
{ {
[Fact]
public void ReconnectFiresAfterHostShutDown()
{
var host = new MemoryHost();
var conn = new MyReconnect();
host.DependencyResolver.Register(typeof(MyReconnect), () => conn);
host.MapConnection<MyReconnect>("/endpoint");

var connection = new Client.Connection("http://foo/endpoint");
connection.Start(host).Wait();

host.Dispose();

Thread.Sleep(TimeSpan.FromSeconds(5));

Assert.Equal(Client.ConnectionState.Reconnecting, connection.State);

connection.Stop();
}

[Fact] [Fact]
public void ReconnectFiresAfterTimeOutSSE() public void ReconnectFiresAfterTimeOutSSE()
{ {
Expand Down
7 changes: 3 additions & 4 deletions SignalR/Transports/ForeverTransport.cs
Expand Up @@ -14,7 +14,6 @@ public ForeverTransport(HostContext context, IDependencyResolver resolver)
resolver.Resolve<IJsonSerializer>(), resolver.Resolve<IJsonSerializer>(),
resolver.Resolve<ITransportHeartBeat>()) resolver.Resolve<ITransportHeartBeat>())
{ {

} }


public ForeverTransport(HostContext context, IJsonSerializer jsonSerializer, ITransportHeartBeat heartBeat) public ForeverTransport(HostContext context, IJsonSerializer jsonSerializer, ITransportHeartBeat heartBeat)
Expand Down Expand Up @@ -181,13 +180,13 @@ private Task ProcessMessages(ITransportConnection connection, Action postReceive


private void ProcessMessagesImpl(TaskCompletionSource<object> taskCompletetionSource, ITransportConnection connection, Action postReceive = null) private void ProcessMessagesImpl(TaskCompletionSource<object> taskCompletetionSource, ITransportConnection connection, Action postReceive = null)
{ {
if (!IsTimedOut && !IsDisconnected && IsAlive && !HostShutdownToken.IsCancellationRequested) if (!IsTimedOut && !IsDisconnected && IsAlive && !ConnectionEndToken.IsCancellationRequested)
{ {
// ResponseTask will either subscribe and wait for a signal then return new messages, // ResponseTask will either subscribe and wait for a signal then return new messages,
// or return immediately with messages that were pending // or return immediately with messages that were pending
var receiveAsyncTask = LastMessageId == null var receiveAsyncTask = LastMessageId == null
? connection.ReceiveAsync(TimeoutToken) ? connection.ReceiveAsync(ConnectionEndToken)
: connection.ReceiveAsync(LastMessageId, TimeoutToken); : connection.ReceiveAsync(LastMessageId, ConnectionEndToken);


if (postReceive != null) if (postReceive != null)
{ {
Expand Down
4 changes: 2 additions & 2 deletions SignalR/Transports/LongPollingTransport.cs
Expand Up @@ -202,8 +202,8 @@ private Task ProcessReceiveRequest(ITransportConnection connection, Action postR


// ReceiveAsync() will async wait until a message arrives then return // ReceiveAsync() will async wait until a message arrives then return
var receiveTask = IsConnectRequest ? var receiveTask = IsConnectRequest ?
connection.ReceiveAsync(TimeoutToken) : connection.ReceiveAsync(ConnectionEndToken) :
connection.ReceiveAsync(MessageId, TimeoutToken); connection.ReceiveAsync(MessageId, ConnectionEndToken);


if (postReceive != null) if (postReceive != null)
{ {
Expand Down
16 changes: 6 additions & 10 deletions SignalR/Transports/TransportDisconnectBase.cs
Expand Up @@ -17,6 +17,7 @@ public abstract class TransportDisconnectBase : ITrackingConnection
protected int _isDisconnected; protected int _isDisconnected;
private readonly CancellationTokenSource _timeoutTokenSource; private readonly CancellationTokenSource _timeoutTokenSource;
private readonly CancellationToken _hostShutdownToken; private readonly CancellationToken _hostShutdownToken;
private readonly CancellationTokenSource _connectionEndToken;


public TransportDisconnectBase(HostContext context, IJsonSerializer jsonSerializer, ITransportHeartBeat heartBeat) public TransportDisconnectBase(HostContext context, IJsonSerializer jsonSerializer, ITransportHeartBeat heartBeat)
{ {
Expand All @@ -25,6 +26,9 @@ public TransportDisconnectBase(HostContext context, IJsonSerializer jsonSerializ
_heartBeat = heartBeat; _heartBeat = heartBeat;
_timeoutTokenSource = new CancellationTokenSource(); _timeoutTokenSource = new CancellationTokenSource();
_hostShutdownToken = context.HostShutdownToken(); _hostShutdownToken = context.HostShutdownToken();

// Create a token that represents the end of this connection's life
_connectionEndToken = CancellationTokenSource.CreateLinkedTokenSource(_timeoutTokenSource.Token, _hostShutdownToken);
} }


public string ConnectionId public string ConnectionId
Expand Down Expand Up @@ -62,19 +66,11 @@ public virtual bool IsAlive
get { return _context.Response.IsClientConnected; } get { return _context.Response.IsClientConnected; }
} }


protected CancellationToken TimeoutToken protected CancellationToken ConnectionEndToken
{
get
{
return _timeoutTokenSource.Token;
}
}

protected CancellationToken HostShutdownToken
{ {
get get
{ {
return _hostShutdownToken; return _connectionEndToken.Token;
} }
} }


Expand Down

0 comments on commit c7b1d74

Please sign in to comment.