From eddb4c56504a0546b19b65e48e682da09fefab77 Mon Sep 17 00:00:00 2001 From: Yevhen Bobrov Date: Tue, 14 Feb 2017 01:07:57 +0200 Subject: [PATCH] Stateful actors --- Source/Orleankka.Runtime/Actor.cs | 6 +- Source/Orleankka.Runtime/ActorRuntime.cs | 8 +- .../Core/ActorEndpoint.Common.T.cs | 8 + .../Core/ActorEndpoint.Common.cs | 106 ++++++++++++ .../Orleankka.Runtime/Core/ActorEndpoint.cs | 20 +-- .../Orleankka.Runtime/Core/ActorEndpoint.tt | 28 +++ Source/Orleankka.Runtime/Core/ActorType.cs | 8 +- .../Core/ActorTypeDeclaration.cs | 75 ++++++-- Source/Orleankka.Runtime/Core/IActorHost.cs | 20 +++ .../Core/StatefulActorEndpoint.cs | 160 ++++++++++++++++++ .../Core/StatefulActorEndpoint.tt | 49 ++++++ .../Orleankka.Runtime.csproj | 24 ++- .../Services/ActivationService.cs | 10 +- .../Services/ReminderService.cs | 16 +- .../Services/StorageService.cs | 35 ++++ .../Services/TimerService.cs | 10 +- Source/Orleankka.Runtime/StatefulActor.cs | 37 ++++ .../Orleankka.Runtime/StreamRefExtensions.cs | 4 +- .../Features/Stateful_actors.cs | 69 ++++++++ Source/Orleankka.Tests/Orleankka.Tests.csproj | 1 + Source/Orleankka.Tests/Testing/TestActions.cs | 8 +- 21 files changed, 645 insertions(+), 57 deletions(-) create mode 100644 Source/Orleankka.Runtime/Core/ActorEndpoint.Common.T.cs create mode 100644 Source/Orleankka.Runtime/Core/ActorEndpoint.Common.cs create mode 100644 Source/Orleankka.Runtime/Core/ActorEndpoint.tt create mode 100644 Source/Orleankka.Runtime/Core/IActorHost.cs create mode 100644 Source/Orleankka.Runtime/Core/StatefulActorEndpoint.cs create mode 100644 Source/Orleankka.Runtime/Core/StatefulActorEndpoint.tt create mode 100644 Source/Orleankka.Runtime/Services/StorageService.cs create mode 100644 Source/Orleankka.Runtime/StatefulActor.cs create mode 100644 Source/Orleankka.Tests/Features/Stateful_actors.cs diff --git a/Source/Orleankka.Runtime/Actor.cs b/Source/Orleankka.Runtime/Actor.cs index 982bbbd7..221bee14 100644 --- a/Source/Orleankka.Runtime/Actor.cs +++ b/Source/Orleankka.Runtime/Actor.cs @@ -27,16 +27,16 @@ protected Actor(string id, IActorRuntime runtime, Dispatcher dispatcher = null) Path = GetType().ToActorPath(id); } - internal void Initialize(ActorEndpoint endpoint, ActorPath path, IActorRuntime runtime, Dispatcher dispatcher) + internal virtual void Initialize(IActorHost host, ActorPath path, IActorRuntime runtime, Dispatcher dispatcher) { Path = path; Runtime = runtime; Dispatcher = Dispatcher ?? dispatcher; - Endpoint = endpoint; + Host = host; } public string Id => Path.Id; - internal ActorEndpoint Endpoint {get; private set;} + internal IActorHost Host {get; private set;} public ActorPath Path {get; private set;} public IActorRuntime Runtime {get; private set;} diff --git a/Source/Orleankka.Runtime/ActorRuntime.cs b/Source/Orleankka.Runtime/ActorRuntime.cs index 608bcadf..ce674dfb 100644 --- a/Source/Orleankka.Runtime/ActorRuntime.cs +++ b/Source/Orleankka.Runtime/ActorRuntime.cs @@ -13,12 +13,12 @@ public interface IActorRuntime public sealed class ActorRuntime : IActorRuntime { - internal ActorRuntime(IActorSystem system, ActorEndpoint endpoint) + internal ActorRuntime(IActorSystem system, IActorHost host) { System = system; - Timers = new TimerService(endpoint); - Reminders = new ReminderService(endpoint); - Activation = new ActivationService(endpoint); + Timers = new TimerService(host); + Reminders = new ReminderService(host); + Activation = new ActivationService(host); } public IActorSystem System { get; } diff --git a/Source/Orleankka.Runtime/Core/ActorEndpoint.Common.T.cs b/Source/Orleankka.Runtime/Core/ActorEndpoint.Common.T.cs new file mode 100644 index 00000000..469d6eac --- /dev/null +++ b/Source/Orleankka.Runtime/Core/ActorEndpoint.Common.T.cs @@ -0,0 +1,8 @@ + #pragma warning disable 649 + // ReSharper disable once StaticMemberInGenericType + // ReSharper disable once UnassignedField.Global + // ReSharper disable once MemberCanBePrivate.Global + protected static ActorType type; + #pragma warning restore 649 + + protected override ActorType Actor => type; \ No newline at end of file diff --git a/Source/Orleankka.Runtime/Core/ActorEndpoint.Common.cs b/Source/Orleankka.Runtime/Core/ActorEndpoint.Common.cs new file mode 100644 index 00000000..4eec93fd --- /dev/null +++ b/Source/Orleankka.Runtime/Core/ActorEndpoint.Common.cs @@ -0,0 +1,106 @@ + const string StickyReminderName = "##sticky##"; + + Actor instance; + + public Task Autorun() + { + KeepAlive(); + + return TaskDone.Done; + } + + public Task Receive(object message) + { + KeepAlive(); + + return Actor.Invoker.OnReceive(instance, message); + } + + public Task ReceiveVoid(object message) => Receive(message); + + async Task IRemindable.ReceiveReminder(string name, TickStatus status) + { + KeepAlive(); + + if (name == StickyReminderName) + return; + + await Actor.Invoker.OnReminder(instance, name); + } + + public override Task OnDeactivateAsync() + { + return instance != null + ? Actor.Invoker.OnDeactivate(instance) + : base.OnDeactivateAsync(); + } + + async Task HandleStickyness() + { + var period = TimeSpan.FromMinutes(1); + await RegisterOrUpdateReminder(StickyReminderName, period, period); + } + + void KeepAlive() => Actor.KeepAlive(this); + + public override async Task OnActivateAsync() + { + if (Actor.Sticky) + await HandleStickyness(); + + await Activate(); + } + + Task Activate() + { + var path = ActorPath.From(Actor.Name, IdentityOf(this)); + var runtime = new ActorRuntime(ClusterActorSystem.Current, this); + instance = Actor.Activate(this, path, runtime); + return Actor.Invoker.OnActivate(instance); + } + + static string IdentityOf(IGrain grain) + { + return (grain as IGrainWithStringKey).GetPrimaryKeyString(); + } + + protected abstract ActorType Actor { get; } + + #region Expose protected methods to actor services layer + + public new void DeactivateOnIdle() + { + base.DeactivateOnIdle(); + } + + public new void DelayDeactivation(TimeSpan timeSpan) + { + base.DelayDeactivation(timeSpan); + } + + public new Task GetReminder(string reminderName) + { + return base.GetReminder(reminderName); + } + + public new Task> GetReminders() + { + return base.GetReminders(); + } + + public new Task RegisterOrUpdateReminder(string reminderName, TimeSpan dueTime, TimeSpan period) + { + return base.RegisterOrUpdateReminder(reminderName, dueTime, period); + } + + public new Task UnregisterReminder(IGrainReminder reminder) + { + return base.UnregisterReminder(reminder); + } + + public new IDisposable RegisterTimer(Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period) + { + return base.RegisterTimer(asyncCallback, state, dueTime, period); + } + + #endregion \ No newline at end of file diff --git a/Source/Orleankka.Runtime/Core/ActorEndpoint.cs b/Source/Orleankka.Runtime/Core/ActorEndpoint.cs index f6647133..b01772a0 100644 --- a/Source/Orleankka.Runtime/Core/ActorEndpoint.cs +++ b/Source/Orleankka.Runtime/Core/ActorEndpoint.cs @@ -12,7 +12,7 @@ namespace Orleankka.Core /// /// FOR INTERNAL USE ONLY! /// - public abstract class ActorEndpoint : Grain, IRemindable + public abstract class ActorEndpoint : Grain, IRemindable, IActorHost { const string StickyReminderName = "##sticky##"; @@ -84,42 +84,42 @@ static string IdentityOf(IGrain grain) #region Expose protected methods to actor services layer - internal new void DeactivateOnIdle() + public new void DeactivateOnIdle() { base.DeactivateOnIdle(); } - internal new void DelayDeactivation(TimeSpan timeSpan) + public new void DelayDeactivation(TimeSpan timeSpan) { base.DelayDeactivation(timeSpan); } - internal new Task GetReminder(string reminderName) + public new Task GetReminder(string reminderName) { return base.GetReminder(reminderName); } - internal new Task> GetReminders() + public new Task> GetReminders() { return base.GetReminders(); } - internal new Task RegisterOrUpdateReminder(string reminderName, TimeSpan dueTime, TimeSpan period) + public new Task RegisterOrUpdateReminder(string reminderName, TimeSpan dueTime, TimeSpan period) { return base.RegisterOrUpdateReminder(reminderName, dueTime, period); } - internal new Task UnregisterReminder(IGrainReminder reminder) + public new Task UnregisterReminder(IGrainReminder reminder) { return base.UnregisterReminder(reminder); } - internal new IDisposable RegisterTimer(Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period) + public new IDisposable RegisterTimer(Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period) { return base.RegisterTimer(asyncCallback, state, dueTime, period); } - #endregion + #endregion } /// @@ -134,6 +134,6 @@ public abstract class ActorEndpoint : ActorEndpoint protected static ActorType type; #pragma warning restore 649 - protected override ActorType Actor => type; + protected override ActorType Actor => type; } } \ No newline at end of file diff --git a/Source/Orleankka.Runtime/Core/ActorEndpoint.tt b/Source/Orleankka.Runtime/Core/ActorEndpoint.tt new file mode 100644 index 00000000..7feb2a56 --- /dev/null +++ b/Source/Orleankka.Runtime/Core/ActorEndpoint.tt @@ -0,0 +1,28 @@ +<#@ template language="C#" hostspecific="true" #> +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +using Orleans; +using Orleans.Runtime; + +namespace Orleankka.Core +{ + using Cluster; + + /// + /// FOR INTERNAL USE ONLY! + /// + public abstract class ActorEndpoint : Grain, IRemindable, IActorHost + { +<#@ include file="ActorEndpoint.Common.cs" #> + } + + /// + /// FOR INTERNAL USE ONLY! + /// + public abstract class ActorEndpoint : ActorEndpoint + { +<#@ include file="ActorEndpoint.Common.T.cs" #> + } +} \ No newline at end of file diff --git a/Source/Orleankka.Runtime/Core/ActorType.cs b/Source/Orleankka.Runtime/Core/ActorType.cs index 2bfe84fe..38a162db 100644 --- a/Source/Orleankka.Runtime/Core/ActorType.cs +++ b/Source/Orleankka.Runtime/Core/ActorType.cs @@ -94,10 +94,10 @@ void Init(Type grain) field.SetValue(null, this); } - internal Actor Activate(ActorEndpoint endpoint, ActorPath path, IActorRuntime runtime) + internal Actor Activate(IActorHost host, ActorPath path, IActorRuntime runtime) { var instance = Activator.Activate(actor, path.Id, runtime, dispatcher); - instance.Initialize(endpoint, path, runtime, dispatcher); + instance.Initialize(host, path, runtime, dispatcher); return instance; } @@ -118,12 +118,12 @@ internal bool MayInterleave(InvokeMethodRequest request) static object UnwrapImmutable(object item) => item is Immutable ? ((Immutable)item).Value : item; - internal void KeepAlive(ActorEndpoint endpoint) + internal void KeepAlive(IActorHost host) { if (keepAliveTimeout == TimeSpan.Zero) return; - endpoint.DelayDeactivation(keepAliveTimeout); + host.DelayDeactivation(keepAliveTimeout); } internal IEnumerable Subscriptions() => diff --git a/Source/Orleankka.Runtime/Core/ActorTypeDeclaration.cs b/Source/Orleankka.Runtime/Core/ActorTypeDeclaration.cs index 137296b2..9b09ea4a 100644 --- a/Source/Orleankka.Runtime/Core/ActorTypeDeclaration.cs +++ b/Source/Orleankka.Runtime/Core/ActorTypeDeclaration.cs @@ -4,11 +4,13 @@ using System.Linq; using System.Reflection; using System.Collections.Generic; +using System.Diagnostics; using Microsoft.CodeAnalysis; using Microsoft.CodeAnalysis.CSharp; using Orleans.Placement; +using Orleans.Providers; namespace Orleankka.Core { @@ -25,13 +27,15 @@ public static IEnumerable Generate(Assembly[] assemblies) Directory.CreateDirectory(dir); var binary = Path.Combine(dir, Guid.NewGuid().ToString("N") + ".dll"); - var source = Generate(assemblies, declarations); + var generated = Generate(assemblies, declarations); - var syntaxTree = CSharpSyntaxTree.ParseText(source); + var syntaxTree = CSharpSyntaxTree.ParseText(generated.Source); var references = AppDomain.CurrentDomain.GetAssemblies() + .Concat(generated.References) + .Concat(ActorInterface.Registered().Select(x => x.GrainAssembly())) + .Distinct() .Select(ToMetadataReference) .Where(x => x != null) - .Concat(ActorInterface.Registered().Select(x => x.GrainAssembly()).Distinct().Select(ToMetadataReference)) .ToArray(); var compilation = CSharpCompilation.Create("Orleankka.Auto.Implementations", @@ -57,7 +61,7 @@ public static IEnumerable Generate(Assembly[] assemblies) static PortableExecutableReference ToMetadataReference(Assembly x) => x.IsDynamic || x.Location == "" ? null : MetadataReference.CreateFromFile(x.Location); - static string Generate(IEnumerable assemblies, IEnumerable declarations) + static GenerateResult Generate(IEnumerable assemblies, IEnumerable declarations) { var sb = new StringBuilder(@" using Orleankka; @@ -65,15 +69,14 @@ static string Generate(IEnumerable assemblies, IEnumerable x.Generate()).ToArray(); + return new GenerateResult(sb, results); } static readonly string[] separator = {".", "+"}; @@ -93,15 +96,16 @@ static string Generate(IEnumerable assemblies, IEnumerable(); StartNamespace(src); - GenerateImplementation(src); + GenerateImplementation(src, references); EndNamespace(src); - return src.ToString(); + return new GenerateResult(src.ToString(), references); } void StartNamespace(StringBuilder src) => @@ -110,7 +114,7 @@ string Generate() static void EndNamespace(StringBuilder src) => src.AppendLine("}"); - void GenerateImplementation(StringBuilder src) + void GenerateImplementation(StringBuilder src, List references) { GenerateAttributes(src); @@ -122,7 +126,16 @@ void GenerateImplementation(StringBuilder src) if (mayInterleave) src.AppendLine("[MayInterleave(\"MayInterleave\")]"); - src.AppendLine($"public class {clazz} : global::Orleankka.Core.ActorEndpoint, I{clazz} {{"); + string impl = $"Orleankka.Core.ActorEndpoint"; + if (IsStateful()) + { + var stateType = GetStateArgument(); + var stateTypeFullName = stateType.FullName.Replace("+", "."); + impl = $"Orleankka.Core.StatefulActorEndpoint"; + references.Add(stateType.Assembly); + } + + src.AppendLine($"public class {clazz} : global::{impl}, I{clazz} {{"); src.AppendLine($"public static bool MayInterleave(InvokeMethodRequest req) => type.MayInterleave(req);"); src.AppendLine("}"); } @@ -151,6 +164,10 @@ void GenerateAttributes(StringBuilder src) } src.AppendLine($"[{GetActorPlacement()}]"); + + var storageProvider = actor.GetCustomAttribute(); + if (storageProvider != null) + src.AppendLine($"[StorageProvider(ProviderName=\"{storageProvider.ProviderName}\")]"); } string GetActorPlacement() @@ -169,5 +186,37 @@ string GetActorPlacement() throw new ArgumentOutOfRangeException(); } } + + bool IsStateful() => typeof(IStatefulActor).IsAssignableFrom(actor); + + Type GetStateArgument() + { + var current = actor; + while (current.BaseType != null && + current.BaseType.GetGenericTypeDefinition() != typeof(StatefulActor<>)) + current = current.BaseType; + + Debug.Assert(current.BaseType != null); + return current.BaseType.GetGenericArguments()[0]; + } + + class GenerateResult + { + public readonly string Source; + public readonly IEnumerable References; + + public GenerateResult(string source, IEnumerable references) + { + Source = source; + References = references; + } + + public GenerateResult(StringBuilder sb, GenerateResult[] results) + { + Array.ForEach(results, x => sb.AppendLine(x.Source)); + References = results.SelectMany(x => x.References).ToList(); + Source = sb.ToString(); + } + } } } \ No newline at end of file diff --git a/Source/Orleankka.Runtime/Core/IActorHost.cs b/Source/Orleankka.Runtime/Core/IActorHost.cs new file mode 100644 index 00000000..45ec72e0 --- /dev/null +++ b/Source/Orleankka.Runtime/Core/IActorHost.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +using Orleans.Runtime; + +namespace Orleankka.Core +{ + interface IActorHost + { + Task Receive(object message); + void DeactivateOnIdle(); + void DelayDeactivation(TimeSpan timeSpan); + Task GetReminder(string reminderName); + Task> GetReminders(); + Task RegisterOrUpdateReminder(string reminderName, TimeSpan dueTime, TimeSpan period); + Task UnregisterReminder(IGrainReminder reminder); + IDisposable RegisterTimer(Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period); + } +} \ No newline at end of file diff --git a/Source/Orleankka.Runtime/Core/StatefulActorEndpoint.cs b/Source/Orleankka.Runtime/Core/StatefulActorEndpoint.cs new file mode 100644 index 00000000..f948bd80 --- /dev/null +++ b/Source/Orleankka.Runtime/Core/StatefulActorEndpoint.cs @@ -0,0 +1,160 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +using Orleans; +using Orleans.Runtime; + +namespace Orleankka.Core +{ + using Cluster; + + /// + /// FOR INTERNAL USE ONLY! + /// + public abstract class StatefulActorEndpoint : Grain, IRemindable, IActorHost where TState : new() + { + const string StickyReminderName = "##sticky##"; + + Actor instance; + + public Task Autorun() + { + KeepAlive(); + + return TaskDone.Done; + } + + public Task Receive(object message) + { + KeepAlive(); + + return Actor.Invoker.OnReceive(instance, message); + } + + public Task ReceiveVoid(object message) => Receive(message); + + async Task IRemindable.ReceiveReminder(string name, TickStatus status) + { + KeepAlive(); + + if (name == StickyReminderName) + return; + + await Actor.Invoker.OnReminder(instance, name); + } + + public override Task OnDeactivateAsync() + { + return instance != null + ? Actor.Invoker.OnDeactivate(instance) + : base.OnDeactivateAsync(); + } + + async Task HandleStickyness() + { + var period = TimeSpan.FromMinutes(1); + await RegisterOrUpdateReminder(StickyReminderName, period, period); + } + + void KeepAlive() => Actor.KeepAlive(this); + + public override async Task OnActivateAsync() + { + if (Actor.Sticky) + await HandleStickyness(); + + await Activate(); + } + + Task Activate() + { + var path = ActorPath.From(Actor.Name, IdentityOf(this)); + var runtime = new ActorRuntime(ClusterActorSystem.Current, this); + instance = Actor.Activate(this, path, runtime); + return Actor.Invoker.OnActivate(instance); + } + + static string IdentityOf(IGrain grain) + { + return (grain as IGrainWithStringKey).GetPrimaryKeyString(); + } + + protected abstract ActorType Actor { get; } + + #region Expose protected methods to actor services layer + + public new void DeactivateOnIdle() + { + base.DeactivateOnIdle(); + } + + public new void DelayDeactivation(TimeSpan timeSpan) + { + base.DelayDeactivation(timeSpan); + } + + public new Task GetReminder(string reminderName) + { + return base.GetReminder(reminderName); + } + + public new Task> GetReminders() + { + return base.GetReminders(); + } + + public new Task RegisterOrUpdateReminder(string reminderName, TimeSpan dueTime, TimeSpan period) + { + return base.RegisterOrUpdateReminder(reminderName, dueTime, period); + } + + public new Task UnregisterReminder(IGrainReminder reminder) + { + return base.UnregisterReminder(reminder); + } + + public new IDisposable RegisterTimer(Func asyncCallback, object state, TimeSpan dueTime, TimeSpan period) + { + return base.RegisterTimer(asyncCallback, state, dueTime, period); + } + + #endregion + + public new TState State + { + get { return base.State; } + set { base.State = value; } + } + + public new Task ClearStateAsync() + { + return base.ClearStateAsync(); + } + + public new Task WriteStateAsync() + { + return base.WriteStateAsync(); + } + + public new Task ReadStateAsync() + { + return base.ReadStateAsync(); + } + } + + /// + /// FOR INTERNAL USE ONLY! + /// + public abstract class StatefulActorEndpoint : StatefulActorEndpoint where TState : new() + { + #pragma warning disable 649 + // ReSharper disable once StaticMemberInGenericType + // ReSharper disable once UnassignedField.Global + // ReSharper disable once MemberCanBePrivate.Global + protected static ActorType type; + #pragma warning restore 649 + + protected override ActorType Actor => type; + } +} \ No newline at end of file diff --git a/Source/Orleankka.Runtime/Core/StatefulActorEndpoint.tt b/Source/Orleankka.Runtime/Core/StatefulActorEndpoint.tt new file mode 100644 index 00000000..1aec7a56 --- /dev/null +++ b/Source/Orleankka.Runtime/Core/StatefulActorEndpoint.tt @@ -0,0 +1,49 @@ +<#@ template language="C#" hostspecific="true" #> +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +using Orleans; +using Orleans.Runtime; + +namespace Orleankka.Core +{ + using Cluster; + + /// + /// FOR INTERNAL USE ONLY! + /// + public abstract class StatefulActorEndpoint : Grain, IRemindable, IActorHost where TState : new() + { +<#@ include file="ActorEndpoint.Common.cs" #> + + public new TState State + { + get { return base.State; } + set { base.State = value; } + } + + public new Task ClearStateAsync() + { + return base.ClearStateAsync(); + } + + public new Task WriteStateAsync() + { + return base.WriteStateAsync(); + } + + public new Task ReadStateAsync() + { + return base.ReadStateAsync(); + } + } + + /// + /// FOR INTERNAL USE ONLY! + /// + public abstract class StatefulActorEndpoint : StatefulActorEndpoint where TState : new() + { +<#@ include file="ActorEndpoint.Common.T.cs" #> + } +} \ No newline at end of file diff --git a/Source/Orleankka.Runtime/Orleankka.Runtime.csproj b/Source/Orleankka.Runtime/Orleankka.Runtime.csproj index ec8b2929..cfcc94e2 100644 --- a/Source/Orleankka.Runtime/Orleankka.Runtime.csproj +++ b/Source/Orleankka.Runtime/Orleankka.Runtime.csproj @@ -59,18 +59,32 @@ + + + True + True + ActorEndpoint.tt + + + + True + True + StatefulActorEndpoint.tt + + + @@ -81,13 +95,21 @@ - + + TextTemplatingFileGenerator + ActorEndpoint.cs + + + + TextTemplatingFileGenerator + StatefulActorEndpoint.cs + diff --git a/Source/Orleankka.Runtime/Services/ActivationService.cs b/Source/Orleankka.Runtime/Services/ActivationService.cs index 90167ec6..d329e44d 100644 --- a/Source/Orleankka.Runtime/Services/ActivationService.cs +++ b/Source/Orleankka.Runtime/Services/ActivationService.cs @@ -34,21 +34,21 @@ public interface IActivationService /// public class ActivationService : IActivationService { - readonly ActorEndpoint endpoint; + readonly IActorHost host; - internal ActivationService(ActorEndpoint endpoint) + internal ActivationService(IActorHost host) { - this.endpoint = endpoint; + this.host = host; } void IActivationService.DeactivateOnIdle() { - endpoint.DeactivateOnIdle(); + host.DeactivateOnIdle(); } void IActivationService.DelayDeactivation(TimeSpan period) { - endpoint.DelayDeactivation(period); + host.DelayDeactivation(period); } } } \ No newline at end of file diff --git a/Source/Orleankka.Runtime/Services/ReminderService.cs b/Source/Orleankka.Runtime/Services/ReminderService.cs index eab95862..29884335 100644 --- a/Source/Orleankka.Runtime/Services/ReminderService.cs +++ b/Source/Orleankka.Runtime/Services/ReminderService.cs @@ -56,36 +56,36 @@ public interface IReminderService class ReminderService : IReminderService { readonly IDictionary reminders = new Dictionary(); - readonly ActorEndpoint endpoint; + readonly IActorHost host; - internal ReminderService(ActorEndpoint endpoint) + internal ReminderService(IActorHost host) { - this.endpoint = endpoint; + this.host = host; } async Task IReminderService.Register(string id, TimeSpan due, TimeSpan period) { - reminders[id] = await endpoint.RegisterOrUpdateReminder(id, due, period); + reminders[id] = await host.RegisterOrUpdateReminder(id, due, period); } async Task IReminderService.Unregister(string id) { - var reminder = reminders.Find(id) ?? await endpoint.GetReminder(id); + var reminder = reminders.Find(id) ?? await host.GetReminder(id); if (reminder != null) - await endpoint.UnregisterReminder(reminder); + await host.UnregisterReminder(reminder); reminders.Remove(id); } async Task IReminderService.IsRegistered(string id) { - return reminders.ContainsKey(id) || (await endpoint.GetReminder(id)) != null; + return reminders.ContainsKey(id) || (await host.GetReminder(id)) != null; } async Task> IReminderService.Registered() { - return (await endpoint.GetReminders()).Select(x => x.ReminderName); + return (await host.GetReminders()).Select(x => x.ReminderName); } } } \ No newline at end of file diff --git a/Source/Orleankka.Runtime/Services/StorageService.cs b/Source/Orleankka.Runtime/Services/StorageService.cs new file mode 100644 index 00000000..3509ec66 --- /dev/null +++ b/Source/Orleankka.Runtime/Services/StorageService.cs @@ -0,0 +1,35 @@ +using System.Threading.Tasks; + +namespace Orleankka.Services +{ + using Core; + + /// + /// Manages state of a stateful actor + /// + public interface IStorageService where TState : new() + { + TState State { get; } + Task ClearState(); + Task WriteState(); + Task ReadState(); + } + + /// + /// Default runtime-bound implementation of + /// + class StorageService : IStorageService where TState: new() + { + readonly StatefulActorEndpoint endpoint; + + public StorageService(StatefulActorEndpoint endpoint) + { + this.endpoint = endpoint; + } + + TState IStorageService.State => endpoint.State; + Task IStorageService.ClearState() => endpoint.ClearStateAsync(); + Task IStorageService.WriteState() => endpoint.WriteStateAsync(); + Task IStorageService.ReadState() => endpoint.ReadStateAsync(); + } +} diff --git a/Source/Orleankka.Runtime/Services/TimerService.cs b/Source/Orleankka.Runtime/Services/TimerService.cs index bee5b3bd..96d2d060 100644 --- a/Source/Orleankka.Runtime/Services/TimerService.cs +++ b/Source/Orleankka.Runtime/Services/TimerService.cs @@ -138,11 +138,11 @@ class TimerService : ITimerService internal static bool IsExecuting() => CallContext.LogicalGetData("#ORLKKA_TMR") != null; readonly IDictionary timers = new Dictionary(); - readonly ActorEndpoint endpoint; + readonly IActorHost host; - internal TimerService(ActorEndpoint endpoint) + internal TimerService(IActorHost host) { - this.endpoint = endpoint; + this.host = host; } void ITimerService.Register(string id, TimeSpan due, Func callback) @@ -156,7 +156,7 @@ void ITimerService.Register(string id, TimeSpan due, Func callback) void ITimerService.Register(string id, TimeSpan due, TimeSpan period, Func callback) { - timers.Add(id, endpoint.RegisterTimer(async s => + timers.Add(id, host.RegisterTimer(async s => { SetExecuting(); await callback(); @@ -175,7 +175,7 @@ void ITimerService.Register(string id, TimeSpan due, TState state, Func< void ITimerService.Register(string id, TimeSpan due, TimeSpan period, TState state, Func callback) { - timers.Add(id, endpoint.RegisterTimer(async s => + timers.Add(id, host.RegisterTimer(async s => { SetExecuting(); await callback((TState) s); diff --git a/Source/Orleankka.Runtime/StatefulActor.cs b/Source/Orleankka.Runtime/StatefulActor.cs new file mode 100644 index 00000000..94ad4400 --- /dev/null +++ b/Source/Orleankka.Runtime/StatefulActor.cs @@ -0,0 +1,37 @@ +using System.Threading.Tasks; + +using Orleankka.Core; +using Orleankka.Services; + +namespace Orleankka +{ + interface IStatefulActor + {} + + public abstract class StatefulActor : Actor, IStatefulActor where TState : new() + { + IStorageService storage; + + protected StatefulActor() + {} + + protected StatefulActor(string id, IActorRuntime runtime, Dispatcher dispatcher = null, IStorageService storage = null) + : base(id, runtime, dispatcher) + { + this.storage = storage; + } + + internal override void Initialize(IActorHost host, ActorPath path, IActorRuntime runtime, Dispatcher dispatcher) + { + base.Initialize(host, path, runtime, dispatcher); + var endpoint = (StatefulActorEndpoint) host; + storage = new StorageService(endpoint); + } + + public TState State => storage.State; + + public Task ClearState() => storage.ClearState(); + public Task WriteState() => storage.WriteState(); + public Task ReadState() => storage.ReadState(); + } +} diff --git a/Source/Orleankka.Runtime/StreamRefExtensions.cs b/Source/Orleankka.Runtime/StreamRefExtensions.cs index f2c7795c..6dd5bddb 100644 --- a/Source/Orleankka.Runtime/StreamRefExtensions.cs +++ b/Source/Orleankka.Runtime/StreamRefExtensions.cs @@ -19,7 +19,7 @@ public static async Task Subscribe(this StreamRef stream, Actor actor, StreamFil Debug.Assert(subscriptions.Count == 0, "We should keep only one active subscription per-stream per-actor"); - await stream.Subscribe(x => actor.Endpoint.Receive(x), filter ?? DeclaredHandlerOnlyFilter(actor)); + await stream.Subscribe(x => actor.Host.Receive(x), filter ?? DeclaredHandlerOnlyFilter(actor)); } public static async Task Unsubscribe(this StreamRef stream, Actor actor) @@ -47,7 +47,7 @@ public static async Task Resume(this StreamRef stream, Actor actor) Debug.Assert(subscriptions.Count == 1, "We should keep only one active subscription per-stream per-actor"); - await subscriptions[0].Resume(x => actor.Endpoint.Receive(x)); + await subscriptions[0].Resume(x => actor.Host.Receive(x)); } static StreamFilter DeclaredHandlerOnlyFilter(Actor actor) => diff --git a/Source/Orleankka.Tests/Features/Stateful_actors.cs b/Source/Orleankka.Tests/Features/Stateful_actors.cs new file mode 100644 index 00000000..1901449e --- /dev/null +++ b/Source/Orleankka.Tests/Features/Stateful_actors.cs @@ -0,0 +1,69 @@ +using System; +using System.Threading.Tasks; + +using NUnit.Framework; + +using Orleans.Providers; + +namespace Orleankka.Features +{ + namespace Stateful_actors + { + using Meta; + using Testing; + + [Serializable] + public class SetState : Command + { + public string Data; + } + + [Serializable] + public class GetState : Query + {} + + [StorageProvider(ProviderName = "MemoryStore")] + public class TestActor : StatefulActor + { + public async Task Handle(SetState cmd) + { + State.Data = cmd.Data; + await WriteState(); + } + + public async Task Handle(GetState query) + { + await ReadState(); + return State.Data; + } + + [Serializable] + public class TestState + { + public string Data; + } + } + + [TestFixture] + [RequiresSilo] + public class Tests + { + IActorSystem system; + + [SetUp] + public void SetUp() + { + system = TestActorSystem.Instance; + } + + [Test] + public async void When_using_default_storage_service() + { + var actor = system.FreshActorOf(); + + await actor.Tell(new SetState {Data = "foo"}); + Assert.AreEqual("foo", await actor.Ask(new GetState())); + } + } + } +} \ No newline at end of file diff --git a/Source/Orleankka.Tests/Orleankka.Tests.csproj b/Source/Orleankka.Tests/Orleankka.Tests.csproj index dd87774e..1502f846 100644 --- a/Source/Orleankka.Tests/Orleankka.Tests.csproj +++ b/Source/Orleankka.Tests/Orleankka.Tests.csproj @@ -49,6 +49,7 @@ + diff --git a/Source/Orleankka.Tests/Testing/TestActions.cs b/Source/Orleankka.Tests/Testing/TestActions.cs index bd5c1247..336b6448 100644 --- a/Source/Orleankka.Tests/Testing/TestActions.cs +++ b/Source/Orleankka.Tests/Testing/TestActions.cs @@ -8,6 +8,7 @@ using Orleankka.Playground; using Orleankka.Testing; using Orleans.Providers.Streams.AzureQueue; +using Orleans.Storage; [assembly: TeardownSilo] @@ -48,8 +49,11 @@ void StartNew() var system = ActorSystem.Configure() .Playground() .UseInMemoryPubSubStore() - .TweakCluster(cfg => cfg - .DefaultKeepAliveTimeout(TimeSpan.FromMinutes(DefaultKeepAliveTimeoutInMinutes))) + .TweakCluster(cfg => + { + cfg.DefaultKeepAliveTimeout(TimeSpan.FromMinutes(DefaultKeepAliveTimeoutInMinutes)); + cfg.Globals.RegisterStorageProvider("MemoryStore"); + }) .Assemblies(GetType().Assembly); if (EnableAzureQueueStreamProvider)