Permalink
Browse files

Implemented hub pipeline

- Added IHubPipelineModule as an extensibility point
- Added ability to add Modules to the IPipeline on the Host
  • Loading branch information...
1 parent 3fa839a commit 83fdbfd9baa1f1cc3399d7f210cb062597c8084c @davidfowl davidfowl committed with halter73 Sep 6, 2012
@@ -1,4 +1,5 @@
using System;
+using SignalR.Hubs;
namespace SignalR
{
@@ -46,5 +47,16 @@ public static IConnectionManager ConnectionManager
return DependencyResolver.Resolve<IConnectionManager>();
}
}
+
+ /// <summary>
+ ///
+ /// </summary>
+ public static IHubPipeline HubPipeline
+ {
+ get
+ {
+ return DependencyResolver.Resolve<IHubPipeline>();
+ }
+ }
}
}
@@ -1,4 +1,6 @@
-namespace SignalR.Hosting.Common
+using SignalR.Hubs;
+
+namespace SignalR.Hosting.Common
{
public class Host
{
@@ -41,5 +43,16 @@ public IConfigurationManager Configuration
return DependencyResolver.Resolve<IConfigurationManager>();
}
}
+
+ /// <summary>
+ ///
+ /// </summary>
+ public IHubPipeline HubPipeline
+ {
+ get
+ {
+ return DependencyResolver.Resolve<IHubPipeline>();
+ }
+ }
}
}
@@ -1,6 +1,7 @@
using System;
using System.Diagnostics;
using System.Linq;
+using System.Threading.Tasks;
using SignalR.Hubs;
using SignalR.Infrastructure;
@@ -73,13 +74,16 @@ public IHubContext GetHubContext(string hubName)
{
var connection = GetConnection(connectionName: null);
var hubManager = _resolver.Resolve<IHubManager>();
+ var pipelineInvoker = _resolver.Resolve<IHubPipelineInvoker>();
HubDescriptor hubDescriptor = hubManager.EnsureHub(hubName,
_hubResolutionErrorsTotalCounter,
_hubResolutionErrorsPerSecCounter,
_allErrorsTotalCounter,
_allErrorsPerSecCounter);
- return new HubContext(new ClientProxy(connection, hubDescriptor.Name),
+ Func<string, ClientHubInvocation, Task> send = (signal, value) => pipelineInvoker.Send(new HubOutgoingInvokerContext(connection, signal, value));
+
+ return new HubContext(new ClientProxy(send, hubDescriptor.Name),
new GroupManager(connection, hubName));
}
@@ -0,0 +1,35 @@
+using System.Collections.Generic;
+
+namespace SignalR.Hubs
+{
+ /// <summary>
+ ///
+ /// </summary>
+ public class ClientHubInvocation
+ {
+ /// <summary>
+ ///
+ /// </summary>
+ public string GroupOrConnectionId { get; set; }
+
+ /// <summary>
+ ///
+ /// </summary>
+ public string Hub { get; set; }
+
+ /// <summary>
+ ///
+ /// </summary>
+ public string Method { get; set; }
+
+ /// <summary>
+ ///
+ /// </summary>
+ public object[] Args { get; set; }
+
+ /// <summary>
+ ///
+ /// </summary>
+ public IDictionary<string, object> State { get; set; }
+ }
+}
@@ -1,24 +1,25 @@
-using System.Dynamic;
+using System;
+using System.Dynamic;
using System.Threading.Tasks;
namespace SignalR.Hubs
{
public class ClientProxy : DynamicObject, IClientProxy
{
- private readonly IConnection _connection;
+ private readonly Func<string, ClientHubInvocation, Task> _send;
private readonly string _hubName;
- public ClientProxy(IConnection connection, string hubName)
+ public ClientProxy(Func<string, ClientHubInvocation, Task> send, string hubName)
{
- _connection = connection;
+ _send = send;
_hubName = hubName;
}
public dynamic this[string key]
{
get
{
- return new SignalProxy(_connection, key, _hubName);
+ return new SignalProxy(_send, key, _hubName);
}
}
@@ -36,14 +37,14 @@ public override bool TryInvokeMember(InvokeMemberBinder binder, object[] args, o
public Task Invoke(string method, params object[] args)
{
- var invocation = new
+ var invocation = new ClientHubInvocation
{
Hub = _hubName,
Method = method,
Args = args
};
- return _connection.Send(_hubName, invocation);
+ return _send(_hubName, invocation);
}
}
}
@@ -19,6 +19,7 @@ public class HubDispatcher : PersistentConnection
private IHubManager _manager;
private IHubRequestParser _requestParser;
private IParameterResolver _binder;
+ private IHubPipelineInvoker _pipelineInvoker;
private readonly List<HubDescriptor> _hubs = new List<HubDescriptor>();
private bool _isDebuggingEnabled;
private PerformanceCounter _allErrorsTotalCounter;
@@ -52,6 +53,7 @@ public override void Initialize(IDependencyResolver resolver)
_manager = resolver.Resolve<IHubManager>();
_binder = resolver.Resolve<IParameterResolver>();
_requestParser = resolver.Resolve<IHubRequestParser>();
+ _pipelineInvoker = resolver.Resolve<IHubPipelineInvoker>();
var counters = resolver.Resolve<IPerformanceCounterWriter>();
_allErrorsTotalCounter = counters.GetCounter(PerformanceCounters.ErrorsAllTotal);
@@ -93,82 +95,144 @@ protected override Task OnReceivedAsync(IRequest request, string connectionId, s
var state = new TrackingDictionary(hubRequest.State);
var hub = CreateHub(request, descriptor, connectionId, state, throwIfFailedToCreate: true);
- Task resultTask;
+ return InvokeHubPipeline(request, connectionId, data, hubRequest, parameterValues, methodDescriptor, state, hub);
+ }
+
+ private Task InvokeHubPipeline(IRequest request, string connectionId, string data, HubRequest hubRequest, IJsonValue[] parameterValues, MethodDescriptor methodDescriptor, TrackingDictionary state, IHub hub)
+ {
+
+ var args = _binder.ResolveMethodParameters(methodDescriptor, parameterValues);
+ var context = new HubInvokerContext(hub, state, methodDescriptor, args);
+
+ // Invoke the pipeline
+ return _pipelineInvoker.Invoke(context)
+ .ContinueWith(task =>
+ {
+ if (task.IsFaulted)
+ {
+ return ProcessResponse(state, null, hubRequest, task.Exception);
+ }
+ else
+ {
+ return ProcessResponse(state, task.Result, hubRequest, null);
+ }
+ })
+ .FastUnwrap();
+
+ }
+
+ public override Task ProcessRequestAsync(HostContext context)
+ {
+ // Generate the proxy
+ if (context.Request.Url.LocalPath.EndsWith("/hubs", StringComparison.OrdinalIgnoreCase))
+ {
+ context.Response.ContentType = "application/x-javascript";
+ return context.Response.EndAsync(_proxyGenerator.GenerateProxy(_url));
+ }
+
+ _isDebuggingEnabled = context.IsDebuggingEnabled();
+
+ return base.ProcessRequestAsync(context);
+ }
+
+ internal static Task Connect(IHub hub)
+ {
+ return ((IConnected)hub).Connect();
+ }
+
+ internal static Task Reconnect(IHub hub, IEnumerable<string> groups)
+ {
+ return ((IConnected)hub).Reconnect(groups);
+ }
+
+ internal static Task Disconnect(IHub hub)
+ {
+ return ((IDisconnect)hub).Disconnect();
+ }
+
+ internal static Task<object> Incoming(IHubIncomingInvokerContext context)
+ {
+ var tcs = new TaskCompletionSource<object>();
try
{
- // Invoke the method
- object result = methodDescriptor.Invoker.Invoke(hub, _binder.ResolveMethodParameters(methodDescriptor, parameterValues));
- Type returnType = result != null ? result.GetType() : methodDescriptor.ReturnType;
+ var result = context.MethodDescriptor.Invoker.Invoke(context.Hub, context.Args);
+ Type returnType = context.MethodDescriptor.ReturnType;
if (typeof(Task).IsAssignableFrom(returnType))
{
var task = (Task)result;
if (!returnType.IsGenericType)
{
- return task.ContinueWith(t => ProcessResponse(state, null, hubRequest, t.Exception))
- .FastUnwrap();
+ task.ContinueWith(tcs);
}
else
{
// Get the <T> in Task<T>
Type resultType = returnType.GetGenericArguments().Single();
- // Get the correct ContinueWith overload
- var continueWith = TaskAsyncHelper.GetContinueWith(task.GetType());
+ Type genericTaskType = typeof(Task<>).MakeGenericType(resultType);
- var taskParameter = Expression.Parameter(continueWith.Type);
- var processResultMethod = typeof(HubDispatcher).GetMethod("ProcessTaskResult", BindingFlags.NonPublic | BindingFlags.Instance).MakeGenericMethod(resultType);
+ // Get the correct ContinueWith overload
+ var parameter = Expression.Parameter(typeof(object));
- var body = Expression.Call(Expression.Constant(this),
- processResultMethod,
- Expression.Constant(state),
- Expression.Constant(hubRequest),
- taskParameter);
+ // TODO: Cache this whole thing
+ var continueWithMethod = typeof(HubDispatcher).GetMethod("ContinueWith", BindingFlags.NonPublic | BindingFlags.Static)
+ .MakeGenericMethod(resultType);
- var lambda = Expression.Lambda(body, taskParameter);
+ Expression body = Expression.Call(continueWithMethod,
+ Expression.Convert(parameter, genericTaskType),
+ Expression.Constant(tcs));
- var call = Expression.Call(Expression.Constant(task, continueWith.Type), continueWith.Method, lambda);
- Func<Task<Task>> continueWithMethod = Expression.Lambda<Func<Task<Task>>>(call).Compile();
- return continueWithMethod.Invoke().FastUnwrap();
+ var continueWithInvoker = Expression.Lambda<Action<object>>(body, parameter).Compile();
+ continueWithInvoker.Invoke(result);
}
}
else
{
- resultTask = ProcessResponse(state, result, hubRequest, null);
+ tcs.TrySetResult(result);
}
}
- catch (TargetInvocationException e)
+ catch (Exception ex)
{
- resultTask = ProcessResponse(state, null, hubRequest, e);
+ tcs.TrySetException(ex);
}
- return resultTask.Then(() => base.OnReceivedAsync(request, connectionId, data))
- .Catch();
+ return tcs.Task;
}
- public override Task ProcessRequestAsync(HostContext context)
+ private static void ContinueWith<T>(Task<T> task, TaskCompletionSource<object> tcs)
{
- // Generate the proxy
- if (context.Request.Url.LocalPath.EndsWith("/hubs", StringComparison.OrdinalIgnoreCase))
+ task.ContinueWith(t =>
{
- context.Response.ContentType = "application/x-javascript";
- return context.Response.EndAsync(_proxyGenerator.GenerateProxy(_url));
- }
-
- _isDebuggingEnabled = context.IsDebuggingEnabled();
+ if (t.IsFaulted)
+ {
+ tcs.TrySetException(t.Exception);
+ }
+ else if (t.IsCanceled)
+ {
+ tcs.TrySetCanceled();
+ }
+ else
+ {
+ tcs.TrySetResult(t.Result);
+ }
+ });
+ }
- return base.ProcessRequestAsync(context);
+ internal static Task Outgoing(IHubOutgoingInvokerContext context)
+ {
+ return context.Connection.Send(context.Signal, context.Invocation);
}
protected override Task OnConnectedAsync(IRequest request, string connectionId)
{
- return ExecuteHubEventAsync<IConnected>(request, connectionId, hub => hub.Connect());
+ return ExecuteHubEventAsync<IConnected>(request, connectionId, hub => _pipelineInvoker.Connect(hub));
}
protected override Task OnReconnectedAsync(IRequest request, IEnumerable<string> groups, string connectionId)
{
- return ExecuteHubEventAsync<IConnected>(request, connectionId, hub => hub.Reconnect(groups));
+ return ExecuteHubEventAsync<IConnected>(request, connectionId, hub => _pipelineInvoker.Reconnect(hub, groups));
}
protected override IEnumerable<string> OnRejoiningGroups(IRequest request, IEnumerable<string> groups, string connectionId)
@@ -188,14 +252,14 @@ protected override IEnumerable<string> OnRejoiningGroups(IRequest request, IEnum
protected override Task OnDisconnectAsync(string connectionId)
{
- return ExecuteHubEventAsync<IDisconnect>(request: null, connectionId: connectionId, action: hub => hub.Disconnect());
+ return ExecuteHubEventAsync<IDisconnect>(request: null, connectionId: connectionId, action: hub => _pipelineInvoker.Disconnect(hub));
}
- private Task ExecuteHubEventAsync<T>(IRequest request, string connectionId, Func<T, Task> action) where T : class
+ private Task ExecuteHubEventAsync<T>(IRequest request, string connectionId, Func<IHub, Task> action) where T : class
{
var operations = GetHubsImplementingInterface(typeof(T))
.Select(hub => CreateHub(request, hub, connectionId))
- .OfType<T>()
+ .Where(hub => hub != null)
.Select(instance => action(instance).Catch().OrEmpty())
.ToList();
@@ -234,9 +298,12 @@ private IHub CreateHub(IRequest request, HubDescriptor descriptor, string connec
if (hub != null)
{
state = state ?? new TrackingDictionary();
+
+ Func<string, ClientHubInvocation, Task> send = (signal, value) => _pipelineInvoker.Send(new HubOutgoingInvokerContext(Connection, signal, value));
+
hub.Context = new HubCallerContext(request, connectionId);
- hub.Caller = new StatefulSignalProxy(Connection, connectionId, descriptor.Name, state);
- hub.Clients = new ClientProxy(Connection, descriptor.Name);
+ hub.Caller = new StatefulSignalProxy(send, connectionId, descriptor.Name, state);
+ hub.Clients = new ClientProxy(send, descriptor.Name);
hub.Groups = new GroupManager(Connection, descriptor.Name);
}
Oops, something went wrong.

0 comments on commit 83fdbfd

Please sign in to comment.