Skip to content

Commit

Permalink
Merge branch 'release' into dev
Browse files Browse the repository at this point in the history
Conflicts:
	src/Microsoft.AspNet.SignalR.Core/Infrastructure/TaskQueue.cs
	tests/Microsoft.AspNet.SignalR.FunctionalTests/Client/HubProxyFacts.cs
	tests/Microsoft.AspNet.SignalR.FunctionalTests/Server/Hubs/HubFacts.cs
  • Loading branch information
DamianEdwards committed Nov 4, 2013
2 parents b74bdf7 + 7b81cb3 commit 2f4e6f1
Show file tree
Hide file tree
Showing 9 changed files with 136 additions and 106 deletions.
Expand Up @@ -175,9 +175,6 @@
<Compile Include="..\Microsoft.AspNet.SignalR.Client\Transports\AutoTransport.cs">
<Link>Transports\AutoTransport.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Client\Transports\ConnectingMessageBuffer.cs">
<Link>Transports\ConnectingMessageBuffer.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Client\Transports\HttpBasedTransport.cs">
<Link>Transports\HttpBasedTransport.cs</Link>
</Compile>
Expand Down Expand Up @@ -223,6 +220,9 @@
<Compile Include="..\Microsoft.AspNet.SignalR.Core\Infrastructure\ExceptionsExtensions.cs">
<Link>Infrastructure\ExceptionsExtensions.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Core\Infrastructure\TaskQueue.cs">
<Link>Infrastructure\TaskQueue.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Core\Owin\Infrastructure\ByteBuffer.cs">
<Link>Infrastructure\ByteBuffer.cs</Link>
</Compile>
Expand Down
Expand Up @@ -192,9 +192,6 @@
<Compile Include="..\Microsoft.AspNet.SignalR.Client\Transports\AsyncStreamReader.cs">
<Link>Transports\AsyncStreamReader.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Client\Transports\ConnectingMessageBuffer.cs">
<Link>Transports\ConnectingMessageBuffer.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Client\Transports\LongPolling\NegotiateInitializer.cs">
<Link>Transports\LongPolling\NegotiateInitializer.cs</Link>
</Compile>
Expand Down Expand Up @@ -294,6 +291,9 @@
<Compile Include="..\Microsoft.AspNet.SignalR.Core\Infrastructure\ExceptionsExtensions.cs">
<Link>Infrastructure\ExceptionsExtensions.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Core\Infrastructure\TaskQueue.cs">
<Link>Infrastructure\TaskQueue.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs">
<Link>TaskAsyncHelper.cs</Link>
</Compile>
Expand Down
46 changes: 27 additions & 19 deletions src/Microsoft.AspNet.SignalR.Client/Connection.cs
Expand Up @@ -48,8 +48,6 @@ public class Connection : IConnection, IDisposable
// The default connection state is disconnected
private ConnectionState _state;

private ConnectingMessageBuffer _connectingMessageBuffer;

private KeepAliveData _keepAliveData;

private TimeSpan _reconnectWindow;
Expand All @@ -60,6 +58,10 @@ public class Connection : IConnection, IDisposable

private string _connectionData;

private TaskQueue _receiveQueue;

private TaskCompletionSource<object> _startTcs;

// Used to synchronize state changes
private readonly object _stateLock = new object();

Expand Down Expand Up @@ -163,7 +165,6 @@ public Connection(string url, string queryString)
Url = url;
QueryString = queryString;
_disconnectTimeoutOperation = DisposableAction.Empty;
_connectingMessageBuffer = new ConnectingMessageBuffer(this, OnMessageReceived);
_lastMessageAt = DateTime.UtcNow;
_lastActiveAt = DateTime.UtcNow;
_reconnectWindow = TimeSpan.Zero;
Expand Down Expand Up @@ -409,6 +410,8 @@ public Task Start(IClientTransport transport)
{
_connectTask = TaskAsyncHelper.Empty;
_disconnectCts = new CancellationTokenSource();
_startTcs = new TaskCompletionSource<object>();
_receiveQueue = new TaskQueue(_startTcs.Task);

if (!ChangeState(ConnectionState.Disconnected, ConnectionState.Connecting))
{
Expand Down Expand Up @@ -471,14 +474,12 @@ private Task StartTransport()
return _transport.Start(this, _connectionData, _disconnectCts.Token)
.RunSynchronously(() =>
{
// NOTE: We have tests that rely on this state change occuring *BEFORE* the start task is complete
ChangeState(ConnectionState.Connecting, ConnectionState.Connected);
// Now that we're connected drain any messages within the buffer
// We want to protect against state changes when draining
lock (_stateLock)
{
_connectingMessageBuffer.Drain();
}
// Now that we're connected complete the start task that the
// receive queue is waiting on
_startTcs.SetResult(null);
// Start the monitor to check for server activity
_monitor.Start();
Expand Down Expand Up @@ -553,6 +554,10 @@ public void Stop(TimeSpan timeout)
}
}

// Close the receive queue so currently running receive callback finishes and no more are run.
// We can't wait on the result of the drain because this method may be on the stack of the task returned (aka deadlock).
_receiveQueue.Drain().Catch();

// This is racy since it's outside the _stateLock, but we are trying to avoid 30s deadlocks when calling _transport.Abort()
if (State == ConnectionState.Disconnected)
{
Expand Down Expand Up @@ -591,10 +596,6 @@ private void Disconnect()

Trace(TraceLevels.StateChanges, "Disconnected");

// If we've connected then instantly disconnected we may have data in the incomingMessageBuffer
// Therefore we need to clear it incase we start the connection again.
_connectingMessageBuffer.Clear();

_disconnectTimeoutOperation.Dispose();
_disconnectCts.Cancel();
_disconnectCts.Dispose();
Expand Down Expand Up @@ -705,17 +706,24 @@ public void Trace(TraceLevels level, string format, params object[] args)


[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "0", Justification = "This is called by the transport layer")]
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "The exception is raised via an event.")]
void IConnection.OnReceived(JToken message)
{
// Try to buffer only if we're still trying to connect to the server.
// Need to protect against state changes here
if (!_connectingMessageBuffer.TryBuffer(message, _stateLock))
_receiveQueue.Enqueue(() => Task.Factory.StartNew(() =>
{
OnMessageReceived(message);
}
try
{
OnMessageReceived(message);
}
catch (Exception ex)
{
OnError(ex);
}
}));
}

[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "The exception can be from user code, needs to be a catch all."), SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "0", Justification = "This is called by the transport layer")]
[SuppressMessage("Microsoft.Design", "CA1031:DoNotCatchGeneralExceptionTypes", Justification = "The exception can be from user code, needs to be a catch all.")]
[SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "0", Justification = "This is called by the transport layer")]
protected virtual void OnMessageReceived(JToken message)
{
if (Received != null)
Expand Down
Expand Up @@ -28,7 +28,7 @@
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\Release\</OutputPath>
<DefineConstants>TRACE;STABLE_VERSION;NET4</DefineConstants>
<DefineConstants>TRACE;STABLE_VERSION;NET4;CLIENT_NET4</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
<DocumentationFile>bin\Release\Microsoft.AspNet.SignalR.Client.XML</DocumentationFile>
Expand Down Expand Up @@ -64,6 +64,9 @@
<Compile Include="..\Microsoft.AspNet.SignalR.Core\Infrastructure\MonoUtility.cs">
<Link>Infrastructure\MonoUtility.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Core\Infrastructure\TaskQueue.cs">
<Link>Infrastructure\TaskQueue.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs">
<Link>Infrastructure\TaskAsyncHelper.cs</Link>
</Compile>
Expand Down Expand Up @@ -109,7 +112,6 @@
<DependentUpon>Resources.resx</DependentUpon>
</Compile>
<Compile Include="Transports\AsyncStreamReader.cs" />
<Compile Include="Transports\ConnectingMessageBuffer.cs" />
<Compile Include="Transports\LongPolling\NegotiateInitializer.cs" />
<Compile Include="Transports\LongPolling\PollingRequestHandler.cs" />
<Compile Include="Transports\ServerSentEvents\ChunkBuffer.cs" />
Expand Down

This file was deleted.

Expand Up @@ -141,9 +141,6 @@
<Compile Include="..\Microsoft.AspNet.SignalR.Client\Transports\AsyncStreamReader.cs">
<Link>Transports\AsyncStreamReader.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Client\Transports\ConnectingMessageBuffer.cs">
<Link>Transports\ConnectingMessageBuffer.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Client\Transports\LongPolling\NegotiateInitializer.cs">
<Link>Transports\LongPolling\NegotiateInitializer.cs</Link>
</Compile>
Expand Down
28 changes: 19 additions & 9 deletions src/Microsoft.AspNet.SignalR.Core/Infrastructure/TaskQueue.cs
Expand Up @@ -35,7 +35,7 @@ public TaskQueue(Task initialTask, int maxSize)
_maxSize = maxSize;
}

#if !CLIENT_NET45
#if !CLIENT_NET45 && !CLIENT_NET4 && !PORTABLE && !NETFX_CORE
[SuppressMessage("Microsoft.Performance", "CA1811:AvoidUncalledPrivateCode", Justification = "This is shared code.")]
public IPerformanceCounter QueueSizeCounter { get; set; }
#endif
Expand All @@ -62,19 +62,16 @@ public Task Enqueue(Func<object, Task> taskFunc, object state)

if (_maxSize != null)
{
if (Interlocked.Read(ref _size) == _maxSize)
// Increment the size if the queue
if (Interlocked.Increment(ref _size) > _maxSize)
{
// REVIEW: Do we need to make the contract more clear between the
// queue full case and the queue drained case? Should we throw an exeception instead?

Interlocked.Decrement(ref _size);

// We failed to enqueue because the size limit was reached
return null;
}

// Increment the size if the queue
Interlocked.Increment(ref _size);

#if !CLIENT_NET45
#if !CLIENT_NET45 && !CLIENT_NET4 && !PORTABLE && !NETFX_CORE
var counter = QueueSizeCounter;
if (counter != null)
{
Expand All @@ -85,6 +82,19 @@ public Task Enqueue(Func<object, Task> taskFunc, object state)

Task newTask = _lastQueuedTask.Then((n, ns, q) => InvokeNext(n, ns, q), taskFunc, state, this);

#if !CLIENT_NET45 && !CLIENT_NET4 && !PORTABLE && !NETFX_CORE
var counter = QueueSizeCounter;
if (counter != null)
{
counter.Decrement();
}
#endif
}
},
this);
},
taskFunc, state);

_lastQueuedTask = newTask;
return newTask;
}
Expand Down

0 comments on commit 2f4e6f1

Please sign in to comment.