Permalink
Browse files

Queue operations at the transport level so we overlap writes and flush.

- Moved TaskQueue to core infrastructure.
- Changed all the transports.
- Fixes issues caused by #874 in ASP.NET specifically.
  • Loading branch information...
1 parent ebd0f07 commit a2d13e8fcdcd5942ba38fbe8100189d995f536e7 @davidfowl davidfowl committed Oct 31, 2012
@@ -106,6 +106,7 @@
<Compile Include="Infrastructure\PerformanceCounterManager.cs" />
<Compile Include="Infrastructure\Disposer.cs" />
<Compile Include="Infrastructure\PerformanceCounterWrapper.cs" />
+ <Compile Include="Infrastructure\TaskQueue.cs" />
<Compile Include="Json\IJsonValue.cs" />
<Compile Include="Json\IJsonWritable.cs" />
<Compile Include="Json\JTokenValue.cs" />
@@ -21,7 +21,7 @@ public class ForeverFrameTransport : ForeverTransport
private const string _initSuffix = "') : null,\r\n" +
" r = ff ? ff.receive : function() {};\r\n" +
" ff ? ff.started(c) : '';" +
- "</script></head>" +
+ "</script></head>" +
"<body>\r\n";
private readonly bool _isDebug;
@@ -34,30 +34,30 @@ public ForeverFrameTransport(HostContext context, IDependencyResolver resolver)
public override Task KeepAlive()
{
- lock (_writeLock)
+ return EnqueueOperation(() =>
{
OutputWriter.Write("<script>r(c, {});</script>");
OutputWriter.WriteLine();
OutputWriter.WriteLine();
OutputWriter.Flush();
return Context.Response.FlushAsync();
- }
+ });
}
public override Task Send(PersistentResponse response)
{
OnSendingResponse(response);
- lock (_writeLock)
+ return EnqueueOperation(() =>
{
OutputWriter.Write("<script>r(c, ");
JsonSerializer.Serialize(response, OutputWriter);
OutputWriter.Write(");</script>\r\n");
OutputWriter.Flush();
return Context.Response.FlushAsync().Catch(IncrementErrorCounters);
- }
+ });
}
protected override Task InitializeResponse(ITransportConnection connection)
@@ -67,13 +67,13 @@ protected override Task InitializeResponse(ITransportConnection connection)
{
Context.Response.ContentType = "text/html";
- lock (_writeLock)
+ return EnqueueOperation(() =>
{
OutputWriter.Write(initScript);
OutputWriter.Flush();
return Context.Response.FlushAsync();
- }
+ });
},
_initPrefix + Context.Request.QueryString["frameId"] + _initSuffix);
}
@@ -156,13 +156,13 @@ public virtual Task Send(object value)
{
Context.Response.ContentType = Json.MimeType;
- lock (_writeLock)
+ return EnqueueOperation(() =>
{
JsonSerializer.Serialize(value, OutputWriter);
OutputWriter.Flush();
return Context.Response.EndAsync();
- }
+ });
}
protected virtual Task InitializeResponse(ITransportConnection connection)
@@ -328,7 +328,7 @@ private void ProcessMessages(ITransportConnection connection, Func<Task> postRec
},
MaxMessages);
}
- catch(Exception ex)
+ catch (Exception ex)
{
endRequest(ex);
@@ -2,7 +2,6 @@
using System;
using System.Collections.Generic;
-using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.AspNet.SignalR.Infrastructure;
@@ -171,21 +170,24 @@ public virtual Task Send(object value)
{
Context.Response.ContentType = IsJsonp ? Json.JsonpMimeType : Json.MimeType;
- if (IsJsonp)
+ return EnqueueOperation(() =>
{
- OutputWriter.Write(JsonpCallback);
- OutputWriter.Write("(");
- }
+ if (IsJsonp)
+ {
+ OutputWriter.Write(JsonpCallback);
+ OutputWriter.Write("(");
+ }
- _jsonSerializer.Serialize(value, OutputWriter);
+ _jsonSerializer.Serialize(value, OutputWriter);
- if (IsJsonp)
- {
- OutputWriter.Write(");");
- }
+ if (IsJsonp)
+ {
+ OutputWriter.Write(");");
+ }
- OutputWriter.Flush();
- return Context.Response.EndAsync();
+ OutputWriter.Flush();
+ return Context.Response.EndAsync();
+ });
}
private Task ProcessSendRequest()
@@ -15,22 +15,22 @@ public ServerSentEventsTransport(HostContext context, IDependencyResolver resolv
public override Task KeepAlive()
{
- lock (_writeLock)
+ return EnqueueOperation(() =>
{
OutputWriter.Write("data: {}");
OutputWriter.WriteLine();
OutputWriter.WriteLine();
OutputWriter.Flush();
return Context.Response.FlushAsync();
- }
+ });
}
public override Task Send(PersistentResponse response)
{
OnSendingResponse(response);
- lock (_writeLock)
+ return EnqueueOperation(() =>
{
OutputWriter.Write("id: ");
OutputWriter.Write(response.MessageId);
@@ -42,7 +42,7 @@ public override Task Send(PersistentResponse response)
OutputWriter.Flush();
return Context.Response.FlushAsync().Catch(IncrementErrorCounters);
- }
+ });
}
protected override Task InitializeResponse(ITransportConnection connection)
@@ -52,7 +52,7 @@ protected override Task InitializeResponse(ITransportConnection connection)
{
Context.Response.ContentType = "text/event-stream";
- lock (_writeLock)
+ return EnqueueOperation(() =>
{
// "data: initialized\n\n"
OutputWriter.Write("data: initialized");
@@ -61,7 +61,7 @@ protected override Task InitializeResponse(ITransportConnection connection)
OutputWriter.Flush();
return Context.Response.FlushAsync();
- }
+ });
});
}
}
@@ -32,8 +32,8 @@ public abstract class TransportDisconnectBase : ITrackingConnection
// Token that represents the host shutting down
private CancellationToken _hostShutdownToken;
- // Lock to protect against overlapping writes to the underlying response stream
- protected readonly object _writeLock = new object();
+ // Queue to protect against overlapping writes to the underlying response stream
+ private readonly TaskQueue _writeQueue = new TaskQueue();
public TransportDisconnectBase(HostContext context, IJsonSerializer jsonSerializer, ITransportHeartBeat heartBeat, IPerformanceCounterManager performanceCounterManager)
{
@@ -218,6 +218,11 @@ public void CompleteRequest()
}
}
+ protected Task EnqueueOperation(Func<Task> writeAsync)
+ {
+ return _writeQueue.Enqueue(writeAsync);
+ }
+
protected void InitializePersistentState()
{
_hostShutdownToken = _context.HostShutdownToken();
@@ -18,7 +18,6 @@ public class AspNetResponse : IResponse
private static readonly Lazy<RemoveHeaderDel> IIS7RemoveHeader = new Lazy<RemoveHeaderDel>(GetRemoveHeaderDelegate);
private readonly HttpContextBase _context;
private bool _bufferingDisabled;
- private readonly TaskQueue _queue = new TaskQueue();
public AspNetResponse(HttpContextBase context)
{
@@ -117,7 +116,7 @@ public void Write(ArraySegment<byte> data)
public Task FlushAsync()
{
- return _queue.Enqueue(() => _context.Response.FlushAsync());
+ return _context.Response.FlushAsync();
}
public Task EndAsync()
@@ -48,9 +48,6 @@
<Compile Include="..\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs">
<Link>Infrastructure\TaskAsyncHelper.cs</Link>
</Compile>
- <Compile Include="..\Microsoft.AspNet.SignalR.Hosting.AspNet45\Infrastructure\TaskQueue.cs">
- <Link>Infrastructure\TaskQueue.cs</Link>
- </Compile>
<Compile Include="AspNetRequest.cs" />
<Compile Include="AspNetResponse.cs" />
<Compile Include="AspNetShutDownDetector.cs" />
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
+<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
@@ -50,6 +50,9 @@
<Compile Include="..\Common\CommonVersionInfo.cs">
<Link>Properties\CommonVersionInfo.cs</Link>
</Compile>
+ <Compile Include="..\Microsoft.AspNet.SignalR.Core\Infrastructure\TaskQueue.cs">
+ <Link>Infrastructure\TaskQueue.cs</Link>
+ </Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Hosting.AspNet\AspNetBootstrapper.cs">
<Link>AspNetBootstrapper.cs</Link>
</Compile>
@@ -94,7 +97,6 @@
</Compile>
<Compile Include="WebSockets\DefaultWebSocketHandler.cs" />
<Compile Include="Infrastructure\ByteBuffer.cs" />
- <Compile Include="Infrastructure\TaskQueue.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="WebSockets\WebSocketHandler.cs" />
<Compile Include="WebSockets\WebSocketMessage.cs" />
@@ -120,4 +122,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
+<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
@@ -45,12 +45,12 @@
<Compile Include="..\Common\CommonVersionInfo.cs">
<Link>Properties\CommonVersionInfo.cs</Link>
</Compile>
+ <Compile Include="..\Microsoft.AspNet.SignalR.Core\Infrastructure\TaskQueue.cs">
+ <Link>Infrastructure\TaskQueue.cs</Link>
+ </Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Hosting.AspNet45\Infrastructure\ByteBuffer.cs">
<Link>Infrastructure\ByteBuffer.cs</Link>
</Compile>
- <Compile Include="..\Microsoft.AspNet.SignalR.Hosting.AspNet45\Infrastructure\TaskQueue.cs">
- <Link>Infrastructure\TaskQueue.cs</Link>
- </Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Hosting.AspNet45\WebSockets\DefaultWebSocketHandler.cs">
<Link>WebSockets\DefaultWebSocketHandler.cs</Link>
</Compile>
@@ -109,4 +109,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>
@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
+<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
@@ -51,7 +51,7 @@
<Compile Include="..\Common\CommonVersionInfo.cs">
<Link>Properties\CommonVersionInfo.cs</Link>
</Compile>
- <Compile Include="..\Microsoft.AspNet.SignalR.Hosting.AspNet45\Infrastructure\TaskQueue.cs">
+ <Compile Include="..\Microsoft.AspNet.SignalR.Core\Infrastructure\TaskQueue.cs">
<Link>Infrastructure\TaskQueue.cs</Link>
</Compile>
<Compile Include="..\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs">
@@ -81,4 +81,4 @@
<Target Name="AfterBuild">
</Target>
-->
-</Project>
+</Project>

0 comments on commit a2d13e8

Please sign in to comment.