Skip to content

Commit

Permalink
Ensure response is initialized before messages received
Browse files Browse the repository at this point in the history
  • Loading branch information
DamianEdwards committed May 2, 2013
1 parent 5819b87 commit cf28a8e
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private static Task Initialize(object state)
var initContext = new ForeverFrameTransportContext(context.Transport, context.State);

// Ensure delegate continues to use the C# Compiler static delegate caching optimization.
return context.Transport.EnqueueOperation(s => WriteInit(s), initContext);
return WriteInit(initContext);
}

private static Task WriteInit(object state)
Expand Down
12 changes: 7 additions & 5 deletions src/Microsoft.AspNet.SignalR.Core/Transports/ForeverTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,13 @@ private Task ProcessReceiveRequest(ITransportConnection connection)
initialize = Reconnected;
}

var series = new Func<object, Task>[]
var series = new Func<object, Task>[]
{
state => ((Func<Task>)state).Invoke(),
state => InitializeResponse((ITransportConnection)state),
state => ((Func<Task>)state).Invoke()
};

var states = new object[] { TransportConnected ?? _emptyTaskFunc,
connection,
var states = new object[] { TransportConnected ?? _emptyTaskFunc,
initialize ?? _emptyTaskFunc };

Func<Task> fullInit = () => TaskAsyncHelper.Series(series, states);
Expand Down Expand Up @@ -224,6 +222,10 @@ private Task ProcessMessages(ITransportConnection connection, Func<Task> initial

try
{
// Ensure we enqueue the response initialization before any messages are received
EnqueueOperation(state => InitializeResponse((ITransportConnection)state), connection)
.Catch((ex, state) => OnError(ex, state), messageContext);

// Ensure delegate continues to use the C# Compiler static delegate caching optimization.
IDisposable subscription = connection.Receive(LastMessageId,
(response, state) => OnMessageReceived(response, state),
Expand All @@ -240,7 +242,7 @@ private Task ProcessMessages(ITransportConnection connection, Func<Task> initial

// Ensure delegate continues to use the C# Compiler static delegate caching optimization.
initialize().Then(tcs => tcs.TrySetResult(null), InitializeTcs)
.Catch((ex, state) => OnError(ex, state), messageContext);
.Catch((ex, state) => OnError(ex, state), messageContext);
}
catch (OperationCanceledException ex)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,7 @@ protected internal override Task InitializeResponse(ITransportConnection connect
{
// Ensure delegate continues to use the C# Compiler static delegate caching optimization.
return base.InitializeResponse(connection)
.Then(s => Initialize(s), this);
}

private static Task Initialize(object state)
{
var transport = (ServerSentEventsTransport)state;

// Ensure delegate continues to use the C# Compiler static delegate caching optimization.
return transport.EnqueueOperation(s => WriteInit(s), state);
.Then(s => WriteInit(s), this);
}

private static Task PerformKeepAlive(object state)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -235,19 +236,20 @@ public void ReceiveDisconnectBeforeCancellationSetup()
transportConnection.Setup(m => m.Receive(It.IsAny<string>(),
It.IsAny<Func<PersistentResponse, object, Task<bool>>>(),
It.IsAny<int>(),
It.IsAny<object>())).Callback<string, Func<PersistentResponse, object, Task<bool>>, int, object>(async (id, cb, max, st) =>
{
callback = cb;
state = st;
It.IsAny<object>()))
.Callback<string, Func<PersistentResponse, object, Task<bool>>, int, object>(async (id, cb, max, st) =>
{
callback = cb;
state = st;
bool result = await cb(new PersistentResponse() { Disconnect = true }, st);
bool result = await cb(new PersistentResponse() { Disconnect = true }, st);
if (!result)
{
disposable.Dispose();
}
})
.Returns(disposable);
if (!result)
{
disposable.Dispose();
}
})
.Returns(disposable);

var transport = new Mock<ForeverTransport>(hostContext, json, heartBeat.Object, counters.Object, traceManager.Object)
{
Expand Down Expand Up @@ -339,6 +341,138 @@ public void RequestCompletesAfterFaultedWritesInTaskQueue()
EnqueAsyncWriteAndEndRequest(writeFaulted);
}

[Fact]
public void InitializeResponseIsFirstEnqueuedOperation()
{
// Arrange
var response = new Mock<IResponse>();
response.Setup(m => m.CancellationToken).Returns(CancellationToken.None);
var request = new Mock<IRequest>();
var qs = new NameValueCollection();
qs["connectionId"] = "1";
request.Setup(m => m.QueryString).Returns(qs);
request.Setup(m => m.Url).Returns(new Uri("http://test/echo/connect"));
var counters = new Mock<IPerformanceCounterManager>();
var heartBeat = new Mock<ITransportHeartbeat>();
var json = new JsonNetSerializer();
var hostContext = new HostContext(request.Object, response.Object);
var transportConnection = new Mock<ITransportConnection>();
var traceManager = new Mock<ITraceManager>();
traceManager.Setup(m => m[It.IsAny<string>()]).Returns(new System.Diagnostics.TraceSource("foo"));

transportConnection.Setup(m => m.Receive(It.IsAny<string>(),
It.IsAny<Func<PersistentResponse, object, Task<bool>>>(),
It.IsAny<int>(),
It.IsAny<object>()))
.Returns<string, Func<PersistentResponse, object, Task<bool>>, int, object>(
(messageId, callback, maxMessages, state) =>
{
callback(new PersistentResponse(), state);
return DisposableAction.Empty;
});

var transport = new Mock<ForeverTransport>(hostContext, json, heartBeat.Object, counters.Object, traceManager.Object)
{
CallBase = true
};

var queue = new TaskQueue();
var results = new List<string>();

transport.Setup(t => t.EnqueueOperation(It.IsAny<Func<object, Task>>(), It.IsAny<object>()))
.Returns<Func<object, Task>, object>(
(writeAsync, state) =>
{
return queue.Enqueue(writeAsync, state);
});

transport.Setup(t => t.InitializeResponse(It.IsAny<ITransportConnection>()))
.Returns<ITransportConnection>(
pr =>
{
results.Add("InitializeResponse");
return Task.FromResult(0);
});

transport.Setup(t => t.Send(It.IsAny<PersistentResponse>()))
.Returns<PersistentResponse>(
pr =>
transport.Object.EnqueueOperation(() =>
{
results.Add("Send");
return Task.FromResult(0);
}));

// Act
transport.Object.ProcessRequest(transportConnection.Object);

// Assert
Assert.Equal("InitializeResponse", results[0]);
Assert.Equal("Send", results[1]);
}

[Fact]
public void RequestCompletesAfterFaultedInitializeResponse()
{
// Arrange
var response = new Mock<IResponse>();
response.Setup(m => m.CancellationToken).Returns(CancellationToken.None);
var request = new Mock<IRequest>();
var qs = new NameValueCollection();
qs["connectionId"] = "1";
request.Setup(m => m.QueryString).Returns(qs);
request.Setup(m => m.Url).Returns(new Uri("http://test/echo/connect"));
var counters = new PerformanceCounterManager();
var heartBeat = new Mock<ITransportHeartbeat>();
var json = new JsonNetSerializer();
var hostContext = new HostContext(request.Object, response.Object);
var transportConnection = new Mock<ITransportConnection>();
var traceManager = new Mock<ITraceManager>();
traceManager.Setup(m => m[It.IsAny<string>()]).Returns(new System.Diagnostics.TraceSource("foo"));

transportConnection.Setup(m => m.Receive(It.IsAny<string>(),
It.IsAny<Func<PersistentResponse, object, Task<bool>>>(),
It.IsAny<int>(),
It.IsAny<object>()))
.Returns<string, Func<PersistentResponse, object, Task<bool>>, int, object>(
(messageId, callback, maxMessages, state) =>
new DisposableAction(() => callback(new PersistentResponse(), state))
);

var transport = new Mock<ForeverTransport>(hostContext, json, heartBeat.Object, counters, traceManager.Object)
{
CallBase = true
};

var queue = new TaskQueue();

transport.Setup(t => t.EnqueueOperation(It.IsAny<Func<object, Task>>(), It.IsAny<object>()))
.Returns<Func<object, Task>, object>(
(writeAsync, state) => queue.Enqueue(writeAsync, state));

transport.Setup(t => t.InitializeResponse(It.IsAny<ITransportConnection>()))
.Returns<ITransportConnection>(
pr => TaskAsyncHelper.FromError(new Exception()));

transport.Setup(t => t.Send(It.IsAny<PersistentResponse>()))
.Returns<PersistentResponse>(
pr => transport.Object.EnqueueOperation(() => Task.FromResult(0)));

var tcs = new TaskCompletionSource<bool>();
transport.Object.AfterRequestEnd = (ex) =>
{
// Trip the cancellation token
tcs.TrySetResult(transport.Object.WriteQueue.IsDrained);
};

// Act
transport.Object.ProcessRequest(transportConnection.Object);

// Assert
Assert.True(tcs.Task.Wait(TimeSpan.FromSeconds(2)));
Assert.True(tcs.Task.Result);
}

public void EnqueAsyncWriteAndEndRequest(Func<Task> writeAsync)
{
var response = new Mock<IResponse>();
Expand Down

0 comments on commit cf28a8e

Please sign in to comment.