Skip to content

Commit

Permalink
Change InterfacedActor<T> to InterfacedActor #20
Browse files Browse the repository at this point in the history
Ditch CRTP.
  • Loading branch information
veblush committed May 19, 2016
1 parent 030cb03 commit 681c5f3
Show file tree
Hide file tree
Showing 16 changed files with 285 additions and 302 deletions.
1 change: 1 addition & 0 deletions core/Akka.Interfaced/Akka.Interfaced.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
<Compile Include="InterfacedActor.ObserverMap.cs" />
<Compile Include="InterfacedActor.PerInstanceFilterList.cs" />
<Compile Include="InterfacedActor.RequestWaiter.cs" />
<Compile Include="InterfacedActorHandler.cs" />
<Compile Include="InterfacedObserver.cs" />
<Compile Include="InterfacedActor.cs" />
<Compile Include="InterfacedActorRef.cs" />
Expand Down
69 changes: 25 additions & 44 deletions core/Akka.Interfaced/InterfacedActor.cs
Original file line number Diff line number Diff line change
@@ -1,50 +1,21 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;

namespace Akka.Interfaced
{
public abstract class InterfacedActor<T> : UntypedActor, IWithUnboundedStash, IRequestWaiter, IFilterPerInstanceProvider
where T : InterfacedActor<T>
public abstract class InterfacedActor : UntypedActor, IWithUnboundedStash, IRequestWaiter, IFilterPerInstanceProvider
{
#region Static Variables

private readonly static RequestDispatcher<T> RequestDispatcher;
private readonly static NotificationDispatcher<T> NotificationDispatcher;
private readonly static MessageDispatcher<T> MessageDispatcher;
private readonly static List<Func<object, IFilter>> PerInstanceFilterCreators;

static InterfacedActor()
{
var filterHandlerBuilder = new FilterHandlerBuilder(typeof(T));

var requestHandlerBuilder = new RequestHandlerBuilder<T>();
RequestDispatcher = new RequestDispatcher<T>(requestHandlerBuilder.Build(filterHandlerBuilder));

var notificationHandlerBuilder = new NotificationHandlerBuilder<T>();
NotificationDispatcher = new NotificationDispatcher<T>(notificationHandlerBuilder.Build(filterHandlerBuilder));

var messageHandlerBuilder = new MessageHandlerBuilder<T>();
MessageDispatcher = new MessageDispatcher<T>(messageHandlerBuilder.Build(filterHandlerBuilder));

PerInstanceFilterCreators = filterHandlerBuilder.PerInstanceFilterCreators;
}

#endregion

#region Member Variables

public IStash Stash { get; set; }

private readonly InterfacedActorHandler _handler;
private int _activeReentrantCount;
private MessageHandleContext _currentAtomicContext;
private InterfacedActorRequestWaiter _requestWaiter;
private InterfacedActorObserverMap _observerMap;
private InterfacedActorPerInstanceFilterList _perInstanceFilterList;

#endregion

protected new IActorRef Sender
{
get
Expand All @@ -54,6 +25,11 @@ protected new IActorRef Sender
}
}

public InterfacedActor()
{
_handler = InterfacedActorHandlerTable.Get(GetType());
}

// Atomic async OnPreStart event (it will be called after PreStart)
protected virtual Task OnPreStart()
{
Expand All @@ -69,8 +45,8 @@ protected virtual Task OnPreStop()

protected override void PreStart()
{
if (PerInstanceFilterCreators.Count > 0)
_perInstanceFilterList = new InterfacedActorPerInstanceFilterList(this, PerInstanceFilterCreators);
if (_handler.PerInstanceFilterCreators.Count > 0)
_perInstanceFilterList = new InterfacedActorPerInstanceFilterList(this, _handler.PerInstanceFilterCreators);

InvokeOnPreStart();
}
Expand Down Expand Up @@ -145,7 +121,7 @@ protected override void OnReceive(object message)
return;
}

var messageHandler = MessageDispatcher.GetHandler(message.GetType());
var messageHandler = _handler.MessageDispatcher.GetHandler(message.GetType());
if (messageHandler != null)
{
HandleMessageByHandler(message, messageHandler);
Expand All @@ -169,7 +145,7 @@ private void OnRequestMessage(RequestMessage request)
return;
}

var handlerItem = RequestDispatcher.GetHandler(request.InvokePayload.GetType());
var handlerItem = _handler.RequestDispatcher.GetHandler(request.InvokePayload.GetType());
if (handlerItem == null)
{
sender.Tell(new ResponseMessage
Expand All @@ -184,7 +160,7 @@ private void OnRequestMessage(RequestMessage request)
{
// sync handle

var response = handlerItem.Handler((T)this, request, null);
var response = handlerItem.Handler(this, request, null);
if (request.RequestId != 0)
sender.Tell(response);
}
Expand All @@ -207,7 +183,7 @@ private void OnRequestMessage(RequestMessage request)
{
var requestId = request.RequestId;
var isReentrant = handlerItem.IsReentrant;
handlerItem.AsyncHandler((T)this, request, response =>
handlerItem.AsyncHandler(this, request, response =>
{
if (requestId != 0)
sender.Tell(response);
Expand All @@ -234,15 +210,15 @@ private void OnNotificationMessage(NotificationMessage notification)
{
if (notification.ObserverId == 0)
{
var handlerItem = NotificationDispatcher.GetHandler(notification.InvokePayload.GetType());
var handlerItem = _handler.NotificationDispatcher.GetHandler(notification.InvokePayload.GetType());
if (handlerItem == null)
return;

if (handlerItem.Handler != null)
{
// sync handle

handlerItem.Handler((T)this, notification);
handlerItem.Handler(this, notification);
}
else
{
Expand All @@ -261,7 +237,7 @@ private void OnNotificationMessage(NotificationMessage notification)

using (new SynchronizationContextSwitcher(new ActorSynchronizationContext(context)))
{
handlerItem.AsyncHandler((T)this, notification)
handlerItem.AsyncHandler(this, notification)
.ContinueWith(t => OnTaskCompleted(handlerItem.IsReentrant),
TaskContinuationOptions.ExecuteSynchronously);
}
Expand Down Expand Up @@ -306,7 +282,7 @@ private void OnInterfacedPoisonPill()
}
}

private void HandleMessageByHandler(object message, MessageHandlerItem<T> handlerItem)
private void HandleMessageByHandler(object message, MessageHandlerItem handlerItem)
{
if (handlerItem.AsyncHandler != null)
{
Expand All @@ -323,14 +299,14 @@ private void HandleMessageByHandler(object message, MessageHandlerItem<T> handle

using (new SynchronizationContextSwitcher(new ActorSynchronizationContext(context)))
{
handlerItem.AsyncHandler((T)this, message)
handlerItem.AsyncHandler(this, message)
.ContinueWith(t => OnTaskCompleted(handlerItem.IsReentrant),
TaskContinuationOptions.ExecuteSynchronously);
}
}
else
{
handlerItem.Handler((T)this, message);
handlerItem.Handler(this, message);
}
}

Expand Down Expand Up @@ -490,4 +466,9 @@ IFilter IFilterPerInstanceProvider.GetFilter(int index)
return _perInstanceFilterList.Get(index);
}
}

[Obsolete("Use non generic version of InterfacedActor.")]
public abstract class InterfacedActor<T> : InterfacedActor
{
}
}
44 changes: 44 additions & 0 deletions core/Akka.Interfaced/InterfacedActorHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
using System;
using System.Collections.Generic;
using System.Collections.Concurrent;

namespace Akka.Interfaced
{
public class InterfacedActorHandler
{
public readonly RequestDispatcher RequestDispatcher;
public readonly NotificationDispatcher NotificationDispatcher;
public readonly MessageDispatcher MessageDispatcher;
public readonly List<Func<object, IFilter>> PerInstanceFilterCreators;

public InterfacedActorHandler(Type type)
{
var filterHandlerBuilder = new FilterHandlerBuilder(type);

var requestHandlerBuilder = new RequestHandlerBuilder();
RequestDispatcher = new RequestDispatcher(
requestHandlerBuilder.Build(type, filterHandlerBuilder));

var notificationHandlerBuilder = new NotificationHandlerBuilder();
NotificationDispatcher = new NotificationDispatcher(
notificationHandlerBuilder.Build(type, filterHandlerBuilder));

var messageHandlerBuilder = new MessageHandlerBuilder();
MessageDispatcher = new MessageDispatcher(
messageHandlerBuilder.Build(type, filterHandlerBuilder));

PerInstanceFilterCreators = filterHandlerBuilder.PerInstanceFilterCreators;
}
}

public static class InterfacedActorHandlerTable
{
private static ConcurrentDictionary<Type, InterfacedActorHandler> s_table =
new ConcurrentDictionary<Type, InterfacedActorHandler>();

public static InterfacedActorHandler Get(Type type)
{
return s_table.GetOrAdd(type, t => new InterfacedActorHandler(t));
}
}
}
11 changes: 5 additions & 6 deletions core/Akka.Interfaced/MessageDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,18 @@

namespace Akka.Interfaced
{
public class MessageDispatcher<T>
where T : class
public class MessageDispatcher
{
private readonly Dictionary<Type, MessageHandlerItem<T>> _handlerTable;
private readonly Dictionary<Type, MessageHandlerItem> _handlerTable;

public MessageDispatcher(Dictionary<Type, MessageHandlerItem<T>> handlerTable)
public MessageDispatcher(Dictionary<Type, MessageHandlerItem> handlerTable)
{
_handlerTable = handlerTable;
}

public MessageHandlerItem<T> GetHandler(Type type)
public MessageHandlerItem GetHandler(Type type)
{
MessageHandlerItem<T> item;
MessageHandlerItem item;
return _handlerTable.TryGetValue(type, out item) ? item : null;
}
}
Expand Down
10 changes: 5 additions & 5 deletions core/Akka.Interfaced/MessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

namespace Akka.Interfaced
{
public delegate void MessageHandler<in T>(T self, object message);
public delegate Task MessageAsyncHandler<in T>(T self, object message);
public delegate void MessageHandler(object self, object message);
public delegate Task MessageAsyncHandler(object self, object message);

public class MessageHandlerItem<T>
public class MessageHandlerItem
{
public bool IsReentrant;
public MessageHandler<T> Handler;
public MessageAsyncHandler<T> AsyncHandler;
public MessageHandler Handler;
public MessageAsyncHandler AsyncHandler;
}
}
39 changes: 19 additions & 20 deletions core/Akka.Interfaced/MessageHandlerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@

namespace Akka.Interfaced
{
internal class MessageHandlerBuilder<T>
where T : class
internal class MessageHandlerBuilder
{
private Dictionary<Type, MessageHandlerItem<T>> _table;
private Type _type;
private FilterHandlerBuilder _filterHandlerBuilder;
private Dictionary<Type, MessageHandlerItem> _table;

internal Dictionary<Type, MessageHandlerItem<T>> Build(FilterHandlerBuilder filterHandlerBuilder)
internal Dictionary<Type, MessageHandlerItem> Build(Type type, FilterHandlerBuilder filterHandlerBuilder)
{
_table = new Dictionary<Type, MessageHandlerItem<T>>();
_type = type;
_filterHandlerBuilder = filterHandlerBuilder;
_table = new Dictionary<Type, MessageHandlerItem>();

BuildAnnotatedMessageHandlers();

Expand All @@ -23,11 +24,9 @@ internal class MessageHandlerBuilder<T>

private void BuildAnnotatedMessageHandlers()
{
var type = typeof(T);

// create a handler for every method which has MessageHandlerAttribute

var methods = type.GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
var methods = _type.GetMethods(BindingFlags.Instance | BindingFlags.Public | BindingFlags.NonPublic);
foreach (var method in methods)
{
var attr = method.GetCustomAttribute<MessageHandlerAttribute>();
Expand All @@ -40,31 +39,31 @@ private void BuildAnnotatedMessageHandlers()

if (isAsyncMethod || filterChain.AsyncFilterExists)
{
var item = new MessageHandlerItem<T>
var item = new MessageHandlerItem
{
IsReentrant = HandlerBuilderHelpers.IsReentrantMethod(method),
AsyncHandler = BuildAsyncHandler(messageType, method, filterChain)
AsyncHandler = BuildAsyncHandler(_type, messageType, method, filterChain)
};
_table.Add(messageType, item);
}
else
{
if (method.GetCustomAttribute<AsyncStateMachineAttribute>() != null)
throw new InvalidOperationException($"Async void handler is not supported. ({type.FullName}.{method.Name})");
throw new InvalidOperationException($"Async void handler is not supported. ({_type.FullName}.{method.Name})");

var item = new MessageHandlerItem<T>
var item = new MessageHandlerItem
{
Handler = BuildHandler(messageType, method, filterChain)
Handler = BuildHandler(_type, messageType, method, filterChain)
};
_table.Add(messageType, item);
}
}
}

private static MessageHandler<T> BuildHandler(
Type messageType, MethodInfo method, FilterChain filterChain)
private static MessageHandler BuildHandler(
Type targetType, Type messageType, MethodInfo method, FilterChain filterChain)
{
var handler = MessageHandlerFuncBuilder.Build<T>(method);
var handler = MessageHandlerFuncBuilder.Build(targetType, method);
if (filterChain.Empty)
return handler;

Expand Down Expand Up @@ -123,13 +122,13 @@ private void BuildAnnotatedMessageHandlers()
};
}

private static MessageAsyncHandler<T> BuildAsyncHandler(
Type messageType, MethodInfo method, FilterChain filterChain)
private static MessageAsyncHandler BuildAsyncHandler(
Type targetType, Type messageType, MethodInfo method, FilterChain filterChain)
{
var isAsyncMethod = method.ReturnType.Name.StartsWith("Task");
var handler = isAsyncMethod
? MessageHandlerAsyncBuilder.Build<T>(method)
: MessageHandlerSyncToAsyncBuilder.Build<T>(method);
? MessageHandlerAsyncBuilder.Build(targetType, method)
: MessageHandlerSyncToAsyncBuilder.Build(targetType, method);
if (filterChain.Empty)
return handler;

Expand Down
Loading

0 comments on commit 681c5f3

Please sign in to comment.