Skip to content

Commit

Permalink
Stateful actors
Browse files Browse the repository at this point in the history
  • Loading branch information
yevhen committed Feb 13, 2017
1 parent b7fdf0d commit eddb4c5
Show file tree
Hide file tree
Showing 21 changed files with 645 additions and 57 deletions.
6 changes: 3 additions & 3 deletions Source/Orleankka.Runtime/Actor.cs
Expand Up @@ -27,16 +27,16 @@ protected Actor(string id, IActorRuntime runtime, Dispatcher dispatcher = null)
Path = GetType().ToActorPath(id);
}

internal void Initialize(ActorEndpoint endpoint, ActorPath path, IActorRuntime runtime, Dispatcher dispatcher)
internal virtual void Initialize(IActorHost host, ActorPath path, IActorRuntime runtime, Dispatcher dispatcher)
{
Path = path;
Runtime = runtime;
Dispatcher = Dispatcher ?? dispatcher;
Endpoint = endpoint;
Host = host;
}

public string Id => Path.Id;
internal ActorEndpoint Endpoint {get; private set;}
internal IActorHost Host {get; private set;}

public ActorPath Path {get; private set;}
public IActorRuntime Runtime {get; private set;}
Expand Down
8 changes: 4 additions & 4 deletions Source/Orleankka.Runtime/ActorRuntime.cs
Expand Up @@ -13,12 +13,12 @@ public interface IActorRuntime

public sealed class ActorRuntime : IActorRuntime
{
internal ActorRuntime(IActorSystem system, ActorEndpoint endpoint)
internal ActorRuntime(IActorSystem system, IActorHost host)
{
System = system;
Timers = new TimerService(endpoint);
Reminders = new ReminderService(endpoint);
Activation = new ActivationService(endpoint);
Timers = new TimerService(host);
Reminders = new ReminderService(host);
Activation = new ActivationService(host);
}

public IActorSystem System { get; }
Expand Down
8 changes: 8 additions & 0 deletions Source/Orleankka.Runtime/Core/ActorEndpoint.Common.T.cs
@@ -0,0 +1,8 @@
 #pragma warning disable 649
// ReSharper disable once StaticMemberInGenericType
// ReSharper disable once UnassignedField.Global
// ReSharper disable once MemberCanBePrivate.Global
protected static ActorType type;
#pragma warning restore 649

protected override ActorType Actor => type;
106 changes: 106 additions & 0 deletions Source/Orleankka.Runtime/Core/ActorEndpoint.Common.cs
@@ -0,0 +1,106 @@
 const string StickyReminderName = "##sticky##";

Actor instance;

public Task Autorun()
{
KeepAlive();

return TaskDone.Done;
}

public Task<object> Receive(object message)
{
KeepAlive();

return Actor.Invoker.OnReceive(instance, message);
}

public Task ReceiveVoid(object message) => Receive(message);

async Task IRemindable.ReceiveReminder(string name, TickStatus status)
{
KeepAlive();

if (name == StickyReminderName)
return;

await Actor.Invoker.OnReminder(instance, name);
}

public override Task OnDeactivateAsync()
{
return instance != null
? Actor.Invoker.OnDeactivate(instance)
: base.OnDeactivateAsync();
}

async Task HandleStickyness()
{
var period = TimeSpan.FromMinutes(1);
await RegisterOrUpdateReminder(StickyReminderName, period, period);
}

void KeepAlive() => Actor.KeepAlive(this);

public override async Task OnActivateAsync()
{
if (Actor.Sticky)
await HandleStickyness();

await Activate();
}

Task Activate()
{
var path = ActorPath.From(Actor.Name, IdentityOf(this));
var runtime = new ActorRuntime(ClusterActorSystem.Current, this);
instance = Actor.Activate(this, path, runtime);
return Actor.Invoker.OnActivate(instance);
}

static string IdentityOf(IGrain grain)
{
return (grain as IGrainWithStringKey).GetPrimaryKeyString();
}

protected abstract ActorType Actor { get; }

#region Expose protected methods to actor services layer

public new void DeactivateOnIdle()
{
base.DeactivateOnIdle();
}

public new void DelayDeactivation(TimeSpan timeSpan)
{
base.DelayDeactivation(timeSpan);
}

public new Task<IGrainReminder> GetReminder(string reminderName)
{
return base.GetReminder(reminderName);
}

public new Task<List<IGrainReminder>> GetReminders()
{
return base.GetReminders();
}

public new Task<IGrainReminder> RegisterOrUpdateReminder(string reminderName, TimeSpan dueTime, TimeSpan period)
{
return base.RegisterOrUpdateReminder(reminderName, dueTime, period);
}

public new Task UnregisterReminder(IGrainReminder reminder)
{
return base.UnregisterReminder(reminder);
}

public new IDisposable RegisterTimer(Func<object, Task> asyncCallback, object state, TimeSpan dueTime, TimeSpan period)
{
return base.RegisterTimer(asyncCallback, state, dueTime, period);
}

#endregion
20 changes: 10 additions & 10 deletions Source/Orleankka.Runtime/Core/ActorEndpoint.cs
Expand Up @@ -12,7 +12,7 @@ namespace Orleankka.Core
/// <summary>
/// FOR INTERNAL USE ONLY!
/// </summary>
public abstract class ActorEndpoint : Grain, IRemindable
public abstract class ActorEndpoint : Grain, IRemindable, IActorHost
{
const string StickyReminderName = "##sticky##";

Expand Down Expand Up @@ -84,42 +84,42 @@ static string IdentityOf(IGrain grain)

#region Expose protected methods to actor services layer

internal new void DeactivateOnIdle()
public new void DeactivateOnIdle()
{
base.DeactivateOnIdle();
}

internal new void DelayDeactivation(TimeSpan timeSpan)
public new void DelayDeactivation(TimeSpan timeSpan)
{
base.DelayDeactivation(timeSpan);
}

internal new Task<IGrainReminder> GetReminder(string reminderName)
public new Task<IGrainReminder> GetReminder(string reminderName)
{
return base.GetReminder(reminderName);
}

internal new Task<List<IGrainReminder>> GetReminders()
public new Task<List<IGrainReminder>> GetReminders()
{
return base.GetReminders();
}

internal new Task<IGrainReminder> RegisterOrUpdateReminder(string reminderName, TimeSpan dueTime, TimeSpan period)
public new Task<IGrainReminder> RegisterOrUpdateReminder(string reminderName, TimeSpan dueTime, TimeSpan period)
{
return base.RegisterOrUpdateReminder(reminderName, dueTime, period);
}

internal new Task UnregisterReminder(IGrainReminder reminder)
public new Task UnregisterReminder(IGrainReminder reminder)
{
return base.UnregisterReminder(reminder);
}

internal new IDisposable RegisterTimer(Func<object, Task> asyncCallback, object state, TimeSpan dueTime, TimeSpan period)
public new IDisposable RegisterTimer(Func<object, Task> asyncCallback, object state, TimeSpan dueTime, TimeSpan period)
{
return base.RegisterTimer(asyncCallback, state, dueTime, period);
}

#endregion
#endregion
}

/// <summary>
Expand All @@ -134,6 +134,6 @@ public abstract class ActorEndpoint<TInterface> : ActorEndpoint
protected static ActorType type;
#pragma warning restore 649

protected override ActorType Actor => type;
protected override ActorType Actor => type;
}
}
28 changes: 28 additions & 0 deletions Source/Orleankka.Runtime/Core/ActorEndpoint.tt
@@ -0,0 +1,28 @@
<#@ template language="C#" hostspecific="true" #>
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

using Orleans;
using Orleans.Runtime;

namespace Orleankka.Core
{
using Cluster;

/// <summary>
/// FOR INTERNAL USE ONLY!
/// </summary>
public abstract class ActorEndpoint : Grain, IRemindable, IActorHost
{
<#@ include file="ActorEndpoint.Common.cs" #>
}

/// <summary>
/// FOR INTERNAL USE ONLY!
/// </summary>
public abstract class ActorEndpoint<TInterface> : ActorEndpoint
{
<#@ include file="ActorEndpoint.Common.T.cs" #>
}
}
8 changes: 4 additions & 4 deletions Source/Orleankka.Runtime/Core/ActorType.cs
Expand Up @@ -94,10 +94,10 @@ void Init(Type grain)
field.SetValue(null, this);
}

internal Actor Activate(ActorEndpoint endpoint, ActorPath path, IActorRuntime runtime)
internal Actor Activate(IActorHost host, ActorPath path, IActorRuntime runtime)
{
var instance = Activator.Activate(actor, path.Id, runtime, dispatcher);
instance.Initialize(endpoint, path, runtime, dispatcher);
instance.Initialize(host, path, runtime, dispatcher);
return instance;
}

Expand All @@ -118,12 +118,12 @@ internal bool MayInterleave(InvokeMethodRequest request)
static object UnwrapImmutable(object item) =>
item is Immutable<object> ? ((Immutable<object>)item).Value : item;

internal void KeepAlive(ActorEndpoint endpoint)
internal void KeepAlive(IActorHost host)
{
if (keepAliveTimeout == TimeSpan.Zero)
return;

endpoint.DelayDeactivation(keepAliveTimeout);
host.DelayDeactivation(keepAliveTimeout);
}

internal IEnumerable<StreamSubscriptionSpecification> Subscriptions() =>
Expand Down

0 comments on commit eddb4c5

Please sign in to comment.