Skip to content
This repository has been archived by the owner on Dec 18, 2018. It is now read-only.

Commit

Permalink
Various fixes in HttpConnectionDispatcher (#151)
Browse files Browse the repository at this point in the history
- The connection state object is manipulated by multiple parties in a non thread safe way. This change introduces a semaphore that should be used by anyone updating or reading the connection state. 
- Handle cases where there's an active request for a connection id and another incoming request for the same connection id, sse and websockets 409 and long polling kicks out the previous connection (#27 and #4)
- Handle requests being processed for disposed connections. There was a race where the background thread could remove and clean up the connection while it was about to be processed.
- Synchronize between the background scanning thread and the request threads when updating the connection state.
- Added `DisposeAndRemoveAsync` to the connection manager that handles`DisposeAsync` throwing and properly removes connections from connection tracking.
- Added Start to ConnectionManager so that testing is easier (background timer doesn't kick in unless start is called).
- Added RequestId to connection state for easier debugging and correlation (can easily see which request is currently processing the logical connection).
- Added tests
  • Loading branch information
davidfowl committed Jan 25, 2017
1 parent acd1dc5 commit 934f6a7
Show file tree
Hide file tree
Showing 12 changed files with 462 additions and 102 deletions.
88 changes: 69 additions & 19 deletions src/Microsoft.AspNetCore.Sockets/ConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,27 @@
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
using Microsoft.AspNetCore.Sockets.Internal;
using Microsoft.Extensions.Logging;

namespace Microsoft.AspNetCore.Sockets
{
public class ConnectionManager
{
private readonly ConcurrentDictionary<string, ConnectionState> _connections = new ConcurrentDictionary<string, ConnectionState>();
private readonly Timer _timer;
private Timer _timer;
private readonly ILogger<ConnectionManager> _logger;

public ConnectionManager()
public ConnectionManager(ILogger<ConnectionManager> logger)
{
_timer = new Timer(Scan, this, 0, 1000);
_logger = logger;
}

public void Start()
{
if (_timer == null)
{
_timer = new Timer(Scan, this, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
}
}

public bool TryGetConnection(string id, out ConnectionState state)
Expand Down Expand Up @@ -47,9 +57,11 @@ public ConnectionState CreateConnection()
public void RemoveConnection(string id)
{
ConnectionState state;
_connections.TryRemove(id, out state);

// Remove the connection completely
if (_connections.TryRemove(id, out state))
{
// Remove the connection completely
_logger.LogDebug("Removing {connectionId} from the list of connections", id);
}
}

private static string MakeNewConnectionId()
Expand All @@ -65,38 +77,76 @@ private static void Scan(object state)

private void Scan()
{
// Scan the registered connections looking for ones that have timed out
foreach (var c in _connections)
// Pause the timer while we're running
_timer.Change(Timeout.Infinite, Timeout.Infinite);

try
{
if (!c.Value.Active && (DateTimeOffset.UtcNow - c.Value.LastSeenUtc).TotalSeconds > 5)
// Scan the registered connections looking for ones that have timed out
foreach (var c in _connections)
{
ConnectionState s;
if (_connections.TryRemove(c.Key, out s))
var status = ConnectionState.ConnectionStatus.Inactive;
var lastSeenUtc = DateTimeOffset.UtcNow;

try
{
c.Value.Lock.Wait();

// Capture the connection state
status = c.Value.Status;

lastSeenUtc = c.Value.LastSeenUtc;
}
finally
{
// REVIEW: Should we keep firing and forgetting this?
var ignore = s.DisposeAsync();
c.Value.Lock.Release();
}

// Once the decision has been made to to dispose we don't check the status again
if (status == ConnectionState.ConnectionStatus.Inactive && (DateTimeOffset.UtcNow - lastSeenUtc).TotalSeconds > 5)
{
var ignore = DisposeAndRemoveAsync(c.Value);
}
}
}
finally
{
// Resume once we finished processing all connections
_timer.Change(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
}
}

public void CloseConnections()
{
// Stop firing the timer
_timer.Dispose();
_timer?.Dispose();

var tasks = new List<Task>();

foreach (var c in _connections)
{
ConnectionState s;
if (_connections.TryRemove(c.Key, out s))
{
tasks.Add(s.DisposeAsync());
}
tasks.Add(DisposeAndRemoveAsync(c.Value));
}

Task.WaitAll(tasks.ToArray(), TimeSpan.FromSeconds(5));
}

public async Task DisposeAndRemoveAsync(ConnectionState state)
{
try
{
await state.DisposeAsync();
}
catch (Exception ex)
{
_logger.LogError(0, ex, "Failed disposing connection {connectionId}", state.Connection.ConnectionId);
}
finally
{
// Remove it from the list after disposal so that's it's easy to see
// connections that might be in a hung state via the connections list
RemoveConnection(state.Connection.ConnectionId);
}
}
}
}
175 changes: 139 additions & 36 deletions src/Microsoft.AspNetCore.Sockets/HttpConnectionDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Sockets.Internal;
Expand Down Expand Up @@ -70,8 +71,6 @@ private async Task ExecuteEndpointAsync(string path, HttpContext context, EndPoi
var sse = new ServerSentEventsTransport(state.Application.Input, _loggerFactory);

await DoPersistentConnection(endpoint, sse, context, state);

_manager.RemoveConnection(state.Connection.ConnectionId);
}
else if (context.Request.Path.StartsWithSegments(path + "/ws"))
{
Expand All @@ -92,8 +91,6 @@ private async Task ExecuteEndpointAsync(string path, HttpContext context, EndPoi
var ws = new WebSocketsTransport(state.Application, _loggerFactory);

await DoPersistentConnection(endpoint, ws, context, state);

_manager.RemoveConnection(state.Connection.ConnectionId);
}
else if (context.Request.Path.StartsWithSegments(path + "/poll"))
{
Expand All @@ -111,39 +108,112 @@ private async Task ExecuteEndpointAsync(string path, HttpContext context, EndPoi
return;
}

// Mark the connection as active
state.Active = true;

// Raise OnConnected for new connections only since polls happen all the time
if (state.ApplicationTask == null)
try
{
_logger.LogDebug("Establishing new Long Polling connection: {0}", state.Connection.ConnectionId);
await state.Lock.WaitAsync();

if (state.Status == ConnectionState.ConnectionStatus.Disposed)
{
_logger.LogDebug("Connection {connectionId} was disposed,", state.Connection.ConnectionId);

// The connection was disposed
context.Response.StatusCode = StatusCodes.Status404NotFound;
return;
}

if (state.Status == ConnectionState.ConnectionStatus.Active)
{
_logger.LogDebug("Connection {connectionId} is already active via {requestId}. Cancelling previous request.", state.Connection.ConnectionId, state.RequestId);

using (state.Cancellation)
{
// Cancel the previous request
state.Cancellation.Cancel();

try
{
// Wait for the previous request to drain
await state.TransportTask;
}
catch (OperationCanceledException)
{
// Should be a cancelled task
}

_logger.LogDebug("Previous poll cancelled for {connectionId} on {requestId}.", state.Connection.ConnectionId, state.RequestId);
}
}

// Mark the request identifier
state.RequestId = context.TraceIdentifier;

// This will re-initialize formatType metadata, but meh...
state.Connection.Metadata["transport"] = LongPollingTransport.Name;
// Mark the connection as active
state.Status = ConnectionState.ConnectionStatus.Active;

state.ApplicationTask = endpoint.OnConnectedAsync(state.Connection);
// Raise OnConnected for new connections only since polls happen all the time
if (state.ApplicationTask == null)
{
_logger.LogDebug("Establishing new connection: {connectionId} on {requestId}", state.Connection.ConnectionId, state.RequestId);

state.Connection.Metadata["transport"] = LongPollingTransport.Name;

state.ApplicationTask = endpoint.OnConnectedAsync(state.Connection);
}
else
{
_logger.LogDebug("Resuming existing connection: {connectionId} on {requestId}", state.Connection.ConnectionId, state.RequestId);
}

var longPolling = new LongPollingTransport(state.Application.Input, _loggerFactory);

state.Cancellation = new CancellationTokenSource();

// REVIEW: Performance of this isn't great as this does a bunch of per request allocations
var tokenSource = CancellationTokenSource.CreateLinkedTokenSource(state.Cancellation.Token, context.RequestAborted);

// Start the transport
state.TransportTask = longPolling.ProcessRequestAsync(context, tokenSource.Token);
}
else
finally
{
_logger.LogDebug("Resuming existing Long Polling connection: {0}", state.Connection.ConnectionId);
state.Lock.Release();
}

var longPolling = new LongPollingTransport(state.Application.Input, _loggerFactory);

// Start the transport
state.TransportTask = longPolling.ProcessRequestAsync(context);

var resultTask = await Task.WhenAny(state.ApplicationTask, state.TransportTask);

// If the application ended before the transport task then we need to end the connection completely
// so there is no future polling
if (resultTask == state.ApplicationTask)
{
await state.DisposeAsync();
await _manager.DisposeAndRemoveAsync(state);
}
else if (!resultTask.IsCanceled)
{
// Otherwise, we update the state to inactive again and wait for the next poll
try
{
await state.Lock.WaitAsync();

if (state.Status == ConnectionState.ConnectionStatus.Active)
{
// Mark the connection as inactive
state.LastSeenUtc = DateTime.UtcNow;

state.Status = ConnectionState.ConnectionStatus.Inactive;

state.RequestId = null;

// Dispose the cancellation token
state.Cancellation.Dispose();

state.Cancellation = null;
}
}
finally
{
state.Lock.Release();
}
}

// Mark the connection as inactive
state.LastSeenUtc = DateTime.UtcNow;
state.Active = false;
}
}

Expand All @@ -163,22 +233,55 @@ private ConnectionState CreateConnection(HttpContext context)
return state;
}

private static async Task DoPersistentConnection(EndPoint endpoint,
IHttpTransport transport,
HttpContext context,
ConnectionState state)
private async Task DoPersistentConnection(EndPoint endpoint,
IHttpTransport transport,
HttpContext context,
ConnectionState state)
{
// Call into the end point passing the connection
state.ApplicationTask = endpoint.OnConnectedAsync(state.Connection);
try
{
await state.Lock.WaitAsync();

if (state.Status == ConnectionState.ConnectionStatus.Disposed)
{
_logger.LogDebug("Connection {connectionId} was disposed,", state.Connection.ConnectionId);

// Start the transport
state.TransportTask = transport.ProcessRequestAsync(context);
// Connection was disposed
context.Response.StatusCode = StatusCodes.Status404NotFound;
return;
}

// There's already an active request
if (state.Status == ConnectionState.ConnectionStatus.Active)
{
_logger.LogDebug("Connection {connectionId} is already active via {requestId}.", state.Connection.ConnectionId, state.RequestId);

// Reject the request with a 409 conflict
context.Response.StatusCode = StatusCodes.Status409Conflict;
return;
}

// Mark the connection as active
state.Status = ConnectionState.ConnectionStatus.Active;

// Store the request identifier
state.RequestId = context.TraceIdentifier;

// Call into the end point passing the connection
state.ApplicationTask = endpoint.OnConnectedAsync(state.Connection);

// Start the transport
state.TransportTask = transport.ProcessRequestAsync(context, context.RequestAborted);
}
finally
{
state.Lock.Release();
}

// Wait for any of them to end
await Task.WhenAny(state.ApplicationTask, state.TransportTask);

// Kill the channel
await state.DisposeAsync();
await _manager.DisposeAndRemoveAsync(state);
}

private Task ProcessNegotiate(HttpContext context)
Expand Down Expand Up @@ -243,7 +346,7 @@ private async Task<bool> EnsureConnectionStateAsync(ConnectionState connectionSt
}
else if (!string.Equals(transport, transportName, StringComparison.Ordinal))
{
context.Response.StatusCode = 400;
context.Response.StatusCode = StatusCodes.Status400BadRequest;
await context.Response.WriteAsync("Cannot change transports mid-connection");
return false;
}
Expand Down
Loading

0 comments on commit 934f6a7

Please sign in to comment.