Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Queuing messages when scaleout stream is in initial state
  • Loading branch information
abnanda1 committed Mar 17, 2014
1 parent 3e66b43 commit fd7d1f1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 26 deletions.
42 changes: 33 additions & 9 deletions src/Microsoft.AspNet.SignalR.Core/Messaging/ScaleoutStream.cs
Expand Up @@ -11,11 +11,13 @@ namespace Microsoft.AspNet.SignalR.Messaging
internal class ScaleoutStream
{
private TaskCompletionSource<object> _taskCompletionSource;
private static Task _drainTask;
private TaskQueue _queue;
private StreamState _state;
private Exception _error;

private readonly int _size;
private readonly int _initializeQueueSize;
private readonly TraceSource _trace;
private readonly string _tracePrefix;
private readonly IPerformanceCounterManager _perfCounters;
Expand All @@ -34,13 +36,20 @@ public ScaleoutStream(TraceSource trace, string tracePrefix, int size, IPerforma
_size = size;
_perfCounters = performanceCounters;

InitializeCore();
_initializeQueueSize = 1000;

InitializeCore(_initializeQueueSize);
}

private bool UsingTaskQueue
{
get
{
if (_state == StreamState.Initial)
{
return true;
}

return _size > 0;
}
}
Expand Down Expand Up @@ -79,13 +88,13 @@ public Task Send(Func<object, Task> send, object state)
throw new InvalidOperationException(Resources.Error_StreamClosed);
}

if (_state == StreamState.Initial)
var context = new SendContext(this, send, state);

if (_drainTask != null)
{
throw new InvalidOperationException(Resources.Error_StreamNotOpen);
_drainTask.Wait();
}

var context = new SendContext(this, send, state);

if (UsingTaskQueue)
{
Task task = _queue.Enqueue(Send, context);
Expand All @@ -104,7 +113,7 @@ public Task Send(Func<object, Task> send, object state)
return Send(context).Finally(counter =>
{
((IPerformanceCounter)counter).Decrement();
},
},
_perfCounters.ScaleoutSendQueueLength);
}
}
Expand Down Expand Up @@ -192,17 +201,17 @@ private void Buffer()
_perfCounters.ScaleoutStreamCountOpen.Decrement();
_perfCounters.ScaleoutStreamCountBuffering.Increment();

InitializeCore();
InitializeCore(_size);
}
}
}

private void InitializeCore()
private void InitializeCore(int queueSize)
{
if (UsingTaskQueue)
{
Task task = DrainQueue();
_queue = new TaskQueue(task, _size);
_queue = new TaskQueue(task, queueSize);
_queue.QueueSizeCounter = _perfCounters.ScaleoutSendQueueLength;
}
}
Expand Down Expand Up @@ -246,7 +255,22 @@ private bool ChangeState(StreamState newState)
{
Trace(TraceEventType.Information, "Changed state from {0} to {1}", _state, newState);

var oldState = _state;

_state = newState;

// Draining the initialize queue before switching to open state
if (oldState == StreamState.Initial && newState == StreamState.Open)
{
// Ensure the queue is started
EnsureQueueStarted();

_drainTask = Drain(_queue);

// The drain here is not blocking so we can't just use this
InitializeCore(_size);
}

return true;
}

Expand Down
17 changes: 0 additions & 17 deletions tests/Microsoft.AspNet.SignalR.Tests/Server/ScaleoutStreamFacts.cs
Expand Up @@ -11,23 +11,6 @@ namespace Microsoft.AspNet.SignalR.Tests.Server
{
public class ScaleoutStreamFacts
{
[Fact]
public void EnqueueWithoutOpenThrows()
{
var perfCounters = new Microsoft.AspNet.SignalR.Infrastructure.PerformanceCounterManager();
var stream = new ScaleoutStream(new TraceSource("Queue"), "0", 1000, perfCounters);
Assert.Throws<InvalidOperationException>(() => stream.Send(_ => { throw new InvalidOperationException(); }, null));
}

[Fact]
public void EnqueueWithoutOpenRaisesOnError()
{
var perfCounters = new Microsoft.AspNet.SignalR.Infrastructure.PerformanceCounterManager();
var stream = new ScaleoutStream(new TraceSource("Queue"), "0", 1000, perfCounters);

Assert.Throws<InvalidOperationException>(() => stream.Send(_ => { throw new InvalidOperationException(); }, null));
}

[Fact]
public void ErrorOnSendThrowsNextTime()
{
Expand Down

0 comments on commit fd7d1f1

Please sign in to comment.