diff --git a/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj b/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj index ac086707e6f..5c1f6a1f2e9 100644 --- a/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj +++ b/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj @@ -62,6 +62,7 @@ + diff --git a/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs new file mode 100644 index 00000000000..1c021fdafac --- /dev/null +++ b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs @@ -0,0 +1,664 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Akka.Actor; +using Akka.Persistence.Fsm; +using Xunit; + +namespace Akka.Persistence.Tests.Fsm +{ + public class PersistentFSMSpec : PersistenceSpec + { + private readonly Random _random = new Random(); + + public PersistentFSMSpec() + : base(Configuration("inmem", "PersistentFSMSpec")) + { + } + + [Fact] + public void PersistentFSM_should_has_function_as_regular_fsm() + { + var dummyReportActorRef = CreateTestProbe().Ref; + + var fsmRef = Sys.ActorOf(Props.Create(Name, dummyReportActorRef), Name); + + Watch(fsmRef); + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + var shoes = new Item("2", "Shoes", 89.99F); + var coat = new Item("3", "Coat", 119.99F); + + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(shirt)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(shoes)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(coat)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new Buy()); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new Leave()); + ExpectMsg>(state => state.State == UserState.LookingAround); + ExpectMsg(); + ExpectMsg>(state => state.From == UserState.LookingAround); + ExpectMsg( + cart => cart.Items.Any(i => i.Name == "Shirt") && cart.Items.Count == 1); + ExpectMsg( + cart => cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2); + ExpectMsg( + cart => cart.Items.Any(i => i.Name == "Coat") && cart.Items.Count == 3); + ExpectMsg>(); + ExpectMsg(); + ExpectTerminated(fsmRef); + } + + [Fact] + public void PersistentFSM_should_has_function_as_regular_fsm_on_state_timeout() + { + var dummyReportActorRef = CreateTestProbe().Ref; + + var fsmRef = Sys.ActorOf(Props.Create(Name, dummyReportActorRef), Name); + + Watch(fsmRef); + + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + + fsmRef.Tell(new AddItem(shirt)); + ExpectMsg>(state => state.State == UserState.LookingAround); + + ExpectMsg>(); + + Within(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1.9), () => + { + ExpectMsg>(); + return true; + }); + + ExpectTerminated(fsmRef); + } + + [Fact] + public void PersistentFSM_should_recover_successfully_with_correct_state_data() + { + var dummyReportActorRef = CreateTestProbe().Ref; + + var fsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, dummyReportActorRef))); + + Watch(fsmRef); + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + var shoes = new Item("2", "Shoes", 89.99F); + var coat = new Item("3", "Coat", 119.99F); + + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(shirt)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(shoes)); + fsmRef.Tell(new GetCurrentCart()); + + + ExpectMsg>(); + ExpectMsg(); + ExpectMsg>(state => state.From == UserState.LookingAround); + ExpectMsg( + cart => cart.Items.Any(i => i.Name == "Shirt") && cart.Items.Count == 1); + ExpectMsg( + cart => cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2); + + fsmRef.Tell(PoisonPill.Instance); + ExpectTerminated(fsmRef); + + var recoveredFsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, dummyReportActorRef))); + Watch(recoveredFsmRef); + recoveredFsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + recoveredFsmRef.Tell(new GetCurrentCart()); + recoveredFsmRef.Tell(new AddItem(coat)); + recoveredFsmRef.Tell(new GetCurrentCart()); + recoveredFsmRef.Tell(new Buy()); + recoveredFsmRef.Tell(new GetCurrentCart()); + recoveredFsmRef.Tell(new Leave()); + + ExpectMsg>(state => state.State == UserState.Shopping); + ExpectMsg( + cart => cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2); + ExpectMsg( + cart => cart.Items.Any(i => i.Name == "Coat") && cart.Items.Count == 3); + ExpectMsg>(); + ExpectMsg(); + ExpectTerminated(recoveredFsmRef); + } + + [Fact] + public void PersistentFSM_should_execute_the_defined_actions_following_successful_persistence_of_state_change() + { + var reportActorProbe = CreateTestProbe(Sys); + + var fsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, reportActorProbe.Ref))); + + Watch(fsmRef); + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + var shoes = new Item("2", "Shoes", 89.99F); + var coat = new Item("3", "Coat", 119.99F); + + fsmRef.Tell(new AddItem(shirt)); + fsmRef.Tell(new AddItem(shoes)); + fsmRef.Tell(new AddItem(coat)); + fsmRef.Tell(new Buy()); + fsmRef.Tell(new Leave()); + + ExpectMsg>(state => state.State == UserState.LookingAround); + ExpectMsg>( + state => state.From == UserState.LookingAround && state.To == UserState.Shopping); + ExpectMsg>( + state => state.From == UserState.Shopping && state.To == UserState.Paid); + reportActorProbe.ExpectMsg(); + ExpectTerminated(fsmRef); + } + + [Fact] + public void PersistentFSM_should_execute_the_defined_actions_following_successful_persistence_of_FSM_stop() + { + var reportActorProbe = CreateTestProbe(Sys); + + var fsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, reportActorProbe.Ref))); + + Watch(fsmRef); + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + var shoes = new Item("2", "Shoes", 89.99F); + var coat = new Item("3", "Coat", 119.99F); + + fsmRef.Tell(new AddItem(shirt)); + fsmRef.Tell(new AddItem(shoes)); + fsmRef.Tell(new AddItem(coat)); + fsmRef.Tell(new Leave()); + + ExpectMsg>(state => state.State == UserState.LookingAround); + ExpectMsg>( + state => state.From == UserState.LookingAround && state.To == UserState.Shopping); + reportActorProbe.ExpectMsg(); + ExpectTerminated(fsmRef); + } + + [Fact] + public void PersistentFSM_should_recover_successfully_with_correct_state_timeout() + { + var dummyReportActorRef = CreateTestProbe().Ref; + + var fsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, dummyReportActorRef))); + + Watch(fsmRef); + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + + fsmRef.Tell(new AddItem(shirt)); + + ExpectMsg>(state => state.State == UserState.LookingAround); + ExpectMsg>( + state => state.From == UserState.LookingAround && state.To == UserState.Shopping); + + ExpectNoMsg(TimeSpan.FromSeconds(0.6)); + fsmRef.Tell(PoisonPill.Instance); + ExpectTerminated(fsmRef); + + var recoveredFsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, dummyReportActorRef))); + Watch(recoveredFsmRef); + recoveredFsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + ExpectMsg>(state => state.State == UserState.Shopping); + + + Within(TimeSpan.FromSeconds(0.9), TimeSpan.FromSeconds(1.9), () => + { + ExpectMsg>( + state => { return state.From == UserState.Shopping && state.To == UserState.Inactive; }); + return true; + }); + ExpectNoMsg(TimeSpan.FromSeconds(0.6)); + recoveredFsmRef.Tell(PoisonPill.Instance); + ExpectTerminated(recoveredFsmRef); + + recoveredFsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, dummyReportActorRef))); + Watch(recoveredFsmRef); + recoveredFsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + ExpectMsg>(state => state.State == UserState.Inactive); + ExpectTerminated(recoveredFsmRef); + } + + [Fact] + public void PersistentFSM_should_not_trigger_onTransition_for_stay() + { + var reportActorProbe = CreateTestProbe(Sys); + + var fsmRef = Sys.ActorOf(Props.Create(() => new SimpleTransitionFSM(Name, reportActorProbe.Ref))); + + reportActorProbe.ExpectNoMsg(TimeSpan.FromSeconds(3)); + + fsmRef.Tell("goto(the same state)"); + + reportActorProbe.ExpectNoMsg(TimeSpan.FromSeconds(3)); + + fsmRef.Tell("stay"); + + reportActorProbe.ExpectNoMsg(TimeSpan.FromSeconds(3)); + } + + + [Fact] + public void PersistentFSM_should_not_persist_state_change_event_when_staying_in_the_same_state() + { + var dummyReportActorRef = CreateTestProbe().Ref; + + var fsmRef = Sys.ActorOf(Props.Create(Name, dummyReportActorRef), Name); + + Watch(fsmRef); + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + var shoes = new Item("2", "Shoes", 89.99F); + var coat = new Item("3", "Coat", 119.99F); + + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(shirt)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(shoes)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(coat)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new Buy()); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new Leave()); + ExpectMsg>(state => state.State == UserState.LookingAround); + ExpectMsg(); + ExpectMsg>(state => state.From == UserState.LookingAround); + ExpectMsg(); + ExpectMsg(); + ExpectMsg(); + ExpectMsg>(); + ExpectMsg(); + ExpectTerminated(fsmRef); + + var persistentEventsStreamer = Sys.ActorOf(Props.Create(Name, TestActor), Name); + + ExpectMsg(); + ExpectMsg.StateChangeEvent>(); + + + ExpectMsg(); + ExpectMsg.StateChangeEvent>(); + + + ExpectMsg(); + ExpectMsg.StateChangeEvent>(); + + + ExpectMsg(); + ExpectMsg.StateChangeEvent>(); + + Watch(persistentEventsStreamer); + + persistentEventsStreamer.Tell(PoisonPill.Instance); + + ExpectTerminated(persistentEventsStreamer); + } + + + internal class WebStoreCustomerFSM : PersistentFSM + { + private readonly IActorRef _reportActor; + private readonly string _persistenceId; + + public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) + { + _persistenceId = persistenceId; + _reportActor = reportActor; + StartWith(UserState.LookingAround, new EmptyShoppingCart()); + + When(UserState.LookingAround, (@event, state) => + { + if (@event.FsmEvent is AddItem) + { + var addItem = (AddItem) @event.FsmEvent; + return + GoTo(UserState.Shopping) + .Applying(new ItemAdded(addItem.Item)).ForMax(TimeSpan.FromSeconds(1)); + } + if (@event.FsmEvent is GetCurrentCart) + { + return Stay().Replying(@event.StateData); + } + return state; + }); + + + When(UserState.Shopping, (@event, state) => + { + if (@event.FsmEvent is AddItem) + { + var addItem = ((AddItem) @event.FsmEvent); + return Stay().Applying(new ItemAdded(addItem.Item)).ForMax(TimeSpan.FromSeconds(1)); + } + if (@event.FsmEvent is Buy) + { + return + GoTo(UserState.Paid) + .Applying(new OrderExecuted()) + .AndThen(cart => + { + if (cart is NonEmptyShoppingCart) + { + _reportActor.Tell(new PurchaseWasMade()); + } + }); + } + if (@event.FsmEvent is Leave) + { + return + Stop() + .Applying(new OrderDiscarded()) + .AndThen(cart => _reportActor.Tell(new ShoppingCardDiscarded())); + } + if (@event.FsmEvent is GetCurrentCart) + { + return Stay().Replying(@event.StateData); + } + if (@event.FsmEvent is StateTimeout) + { + return GoTo(UserState.Inactive).ForMax(TimeSpan.FromSeconds(2)); + } + return state; + }); + + + When(UserState.Inactive, (@event, state) => + { + if (@event.FsmEvent is AddItem) + { + var addItem = (AddItem) @event.FsmEvent; + return + GoTo(UserState.Shopping) + .Applying(new ItemAdded(addItem.Item)) + .ForMax(TimeSpan.FromSeconds(1)); + } + if (@event.FsmEvent is StateTimeout) + { + //var addItem = ((AddItem)@event) + return + Stop() + .Applying(new OrderDiscarded()) + .AndThen(cart => _reportActor.Tell(new ShoppingCardDiscarded())); + } + return state; + }); + + When(UserState.Paid, (@event, state) => + { + if (@event.FsmEvent is Leave) + { + return Stop(); + } + if (@event.FsmEvent is GetCurrentCart) + { + return Stay().Replying(@event.StateData); + } + return state; + }); + } + + public override string PersistenceId + { + get { return _persistenceId; } + } + + + protected override void OnRecoveryCompleted() + { + } + + protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) + { + if (e is ItemAdded) + { + var itemAdded = (ItemAdded) e; + return data.AddItem(itemAdded.Item); + } + if (e is OrderExecuted) + { + return data; + } + if (e is OrderDiscarded) + { + return data.Empty(); + } + + return data; + } + } + } + + internal class SimpleTransitionFSM : PersistentFSM + { + private readonly IActorRef _reportActor; + private readonly string _persistenceId; + + public SimpleTransitionFSM(string persistenceId, IActorRef reportActor) + { + _persistenceId = persistenceId; + _reportActor = reportActor; + StartWith(UserState.LookingAround, new EmptyShoppingCart()); + + When(UserState.LookingAround, (@event, state) => + { + if ((string) @event.FsmEvent == "stay") + { + return Stay(); + } + return GoTo(UserState.LookingAround); + }); + OnTransition((state, nextState) => _reportActor.Tell(string.Format("{0} -> {1}", state, nextState))); + } + + public override string PersistenceId + { + get { return _persistenceId; } + } + + + protected override void OnRecoveryCompleted() + { + } + + protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) + { + return data; + } + } + + internal class PersistentEventsStreamer : PersistentActor + { + private readonly IActorRef _client; + private readonly string _persistenceId; + + public PersistentEventsStreamer(string persistenceId, IActorRef client) + { + _persistenceId = persistenceId; + _client = client; + } + + public override string PersistenceId + { + get { return _persistenceId; } + } + + protected override bool ReceiveRecover(object message) + { + if (!(message is RecoveryCompleted)) + { + _client.Tell(message); + } + + return true; + } + + protected override bool ReceiveCommand(object message) + { + return true; + } + } + + #region Custome States + + internal enum UserState + { + Shopping, + Inactive, + Paid, + LookingAround + } + + #endregion + + #region Customer states data + + internal class Item + { + public Item(string id, string name, float price) + { + Id = id; + Name = name; + Price = price; + } + + public string Id { get; private set; } + public string Name { get; private set; } + public float Price { get; private set; } + } + + internal interface IShoppingCart + { + ICollection Items { get; set; } + IShoppingCart AddItem(Item item); + IShoppingCart Empty(); + } + + internal class EmptyShoppingCart : IShoppingCart + { + public IShoppingCart AddItem(Item item) + { + return new NonEmptyShoppingCart(item); + } + + public IShoppingCart Empty() + { + return this; + } + + public ICollection Items { get; set; } + } + + internal class NonEmptyShoppingCart : IShoppingCart + { + public NonEmptyShoppingCart(Item item) + { + Items = new List(); + Items.Add(item); + } + + public IShoppingCart AddItem(Item item) + { + Items.Add(item); + return this; + } + + public IShoppingCart Empty() + { + return new EmptyShoppingCart(); + } + + public ICollection Items { get; set; } + } + + #endregion + + #region Customer commands + + internal interface ICommand + { + } + + internal class AddItem : ICommand + { + public AddItem(Item item) + { + Item = item; + } + + public Item Item { get; private set; } + } + + internal class Buy + { + } + + internal class Leave + { + } + + internal class GetCurrentCart : ICommand + { + } + + #endregion + + #region Customer domain events + + internal interface IDomainEvent + { + } + + internal class ItemAdded : IDomainEvent + { + public ItemAdded(Item item) + { + Item = item; + } + + public Item Item { get; private set; } + } + + internal class OrderExecuted : IDomainEvent + { + } + + internal class OrderDiscarded : IDomainEvent + { + } + + #endregion + + #region Side effects - report events to be sent to some + + internal interface IReportEvent + { + } + + internal class PurchaseWasMade : IReportEvent + { + } + + internal class ShoppingCardDiscarded : IReportEvent + { + } + + #endregion +} \ No newline at end of file diff --git a/src/core/Akka.Persistence/Akka.Persistence.csproj b/src/core/Akka.Persistence/Akka.Persistence.csproj index 383440396d1..d33b27b744f 100644 --- a/src/core/Akka.Persistence/Akka.Persistence.csproj +++ b/src/core/Akka.Persistence/Akka.Persistence.csproj @@ -65,6 +65,8 @@ + + diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs new file mode 100644 index 00000000000..4e32617a303 --- /dev/null +++ b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs @@ -0,0 +1,82 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System.Collections.Generic; + +namespace Akka.Persistence.Fsm +{ + /// + /// Finite state machine (FSM) persistent actor. + /// + /// The state name type + /// The state data type + /// The event data type + public abstract class PersistentFSM : PersistentFSMBase + { + protected abstract void OnRecoveryCompleted(); + + protected override bool ReceiveRecover(object message) + { + var match = message.Match() + .With(t => + { + Initialize(); + OnRecoveryCompleted(); + }) + .With(e => StartWith(StateName, ApplyEvent(e, StateData))) + .With(sce => StartWith(sce.State, StateData, sce.TimeOut)); + + return match.WasHandled; + } + + protected abstract TData ApplyEvent(TEvent e, TData data); + + protected override void ApplyState(State upcomingState) + { + var eventsToPersist = new List(); + if (upcomingState.DomainEvents != null) + { + foreach (var domainEvent in upcomingState.DomainEvents) + { + eventsToPersist.Add(domainEvent); + } + } + if (!StateName.Equals(upcomingState.StateName) || upcomingState.Timeout.HasValue) + { + eventsToPersist.Add(new StateChangeEvent(upcomingState.StateName, upcomingState.Timeout)); + } + if (eventsToPersist.Count == 0) + { + base.ApplyState(upcomingState); + } + else + { + var nextData = StateData; // upcomingState.StateData; + var handlersExecutedCounter = 0; + + + Persist(eventsToPersist, @event => + { + handlersExecutedCounter++; + if (@event is TEvent) + { + nextData = ApplyEvent((TEvent) @event, nextData); + } + if (handlersExecutedCounter == eventsToPersist.Count) + { + base.ApplyState(upcomingState.Using(nextData)); + + if (upcomingState.AfterTransitionHandler != null) + { + upcomingState.AfterTransitionHandler(upcomingState.StateData); + } + } + }); + } + } + } +} \ No newline at end of file diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs b/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs new file mode 100644 index 00000000000..fa8a2a1ac58 --- /dev/null +++ b/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs @@ -0,0 +1,817 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using Akka.Actor; +using Akka.Actor.Internal; +using Akka.Event; +using Akka.Persistence.Serialization; +using Akka.Routing; +using Akka.Util; +using Akka.Util.Internal; + +namespace Akka.Persistence.Fsm +{ + public abstract class PersistentFSMBase : PersistentActor, IListeners + { + public delegate State StateFunction( + FSMBase.Event fsmEvent, State state = null); + + public delegate void TransitionHandler(TState initialState, TState nextState); + + protected readonly ListenerSupport _listener = new ListenerSupport(); + private readonly ILoggingAdapter _log = Context.GetLogger(); + + /// + /// State definitions + /// + private readonly Dictionary _stateFunctions = new Dictionary(); + + private readonly Dictionary _stateTimeouts = new Dictionary(); + + private readonly AtomicCounter _timerGen = new AtomicCounter(0); + + /// + /// Timer handling + /// + protected readonly IDictionary _timers = new Dictionary(); + + /// + /// Transition handling + /// + private readonly IList _transitionEvent = new List(); + + /// + /// FSM state data and current timeout handling + /// + /// a + protected State _currentState; + + protected long _generation; + private StateFunction _handleEvent; + private State _nextState; + + /// + /// Termination handling + /// + private Action> _terminateEvent = @event => { }; + + protected ICancelable _timeoutFuture; + + /// + /// Can be set to enable debugging on certain actions taken by the FSM + /// + protected bool DebugEvent; + + protected PersistentFSMBase() + { + if (this is ILoggingFSM) + DebugEvent = Context.System.Settings.FsmDebugEvent; + } + + /// + /// Current state name + /// + public TState StateName + { + get { return _currentState.StateName; } + } + + /// + /// Current state data + /// + public TData StateData + { + get { return _currentState.StateData; } + } + + /// + /// Return next state data (available in handlers) + /// + public TData NextStateData + { + get + { + if (_nextState == null) + throw new InvalidOperationException("NextStateData is only available during OnTransition"); + return _nextState.StateData; + } + } + + /// + /// Unhandled event handler + /// + private StateFunction HandleEventDefault + { + get + { + return delegate(FSMBase.Event @event, State state) + { + _log.Warning("unhandled event {0} in state {1}", @event.FsmEvent, StateName); + return Stay(); + }; + } + } + + private StateFunction HandleEvent + { + get { return _handleEvent ?? (_handleEvent = HandleEventDefault); } + set { _handleEvent = value; } + } + + public bool IsStateTimerActive { get; private set; } + + public ListenerSupport Listeners + { + get { return _listener; } + } + + /// + /// Insert a new at the end of the processing chain for the + /// given state. If the stateTimeout parameter is set, entering this state without a + /// differing explicit timeout setting will trigger a . + /// + /// designator for the state + /// delegate describing this state's response to input + /// default timeout for this state + public void When(TState stateName, StateFunction func, TimeSpan? timeout = null) + { + Register(stateName, func, timeout); + } + + /// + /// Sets the initial state for this FSM. Call this method from the constructor before the + /// method. + /// If different state is needed after a restart this method, followed by , can be used in the + /// actor + /// life cycle hooks and . + /// + /// Initial state designator. + /// Initial state data. + /// State timeout for the initial state, overriding the default timeout for that state. + public void StartWith(TState stateName, TData stateData, TimeSpan? timeout = null) + { + _currentState = new State(stateName, stateData, timeout); + } + + /// + /// Produce transition to other state. Return this from a state function + /// in order to effect the transition. + /// + /// State designator for the next state + /// State transition descriptor + public State GoTo(TState nextStateName) + { + return new State(nextStateName, _currentState.StateData); + } + + /// + /// Produce transition to other state. Return this from a state function + /// in order to effect the transition. + /// + /// State designator for the next state + /// Data for next state + /// State transition descriptor + public State GoTo(TState nextStateName, TData stateData) + { + return new State(nextStateName, stateData); + } + + /// + /// Produce "empty" transition descriptor. Return this from a state function + /// when no state change is to be effected. + /// + /// Descriptor for staying in the current state. + public State Stay() + { + return GoTo(_currentState.StateName); + } + + /// + /// Produce change descriptor to stop this FSM actor with + /// + public State Stop() + { + return Stop(new FSMBase.Normal()); + } + + /// + /// Produce change descriptor to stop this FSM actor with the specified . + /// + public State Stop(FSMBase.Reason reason) + { + return Stop(reason, _currentState.StateData); + } + + public State Stop(FSMBase.Reason reason, TData stateData) + { + return Stay().Using(stateData).WithStopReason(reason); + } + + /// + /// Schedule named timer to deliver message after given delay, possibly repeating. + /// Any existing timer with the same name will automatically be canceled before adding + /// the new timer. + /// + /// identifier to be used with . + /// message to be delivered + /// delay of first message delivery and between subsequent messages. + /// send once if false, scheduleAtFixedRate if true + public void SetTimer(string name, object msg, TimeSpan timeout, bool repeat = false) + { + if (DebugEvent) + _log.Debug("setting " + (repeat ? "repeating" : "") + "timer '{0}' / {1}: {2}", name, timeout, msg); + if (_timers.ContainsKey(name)) + _timers[name].Cancel(); + var timer = new Timer(name, msg, repeat, _timerGen.Next(), Context, DebugEvent ? _log : null); + timer.Schedule(Self, timeout); + + if (!_timers.ContainsKey(name)) + _timers.Add(name, timer); + else + _timers[name] = timer; + } + + /// + /// Cancel a named , ensuring that the message is not subsequently delivered (no + /// race.) + /// + /// The name of the timer to cancel. + public void CancelTimer(string name) + { + if (DebugEvent) + { + _log.Debug("Cancelling timer {0}", name); + } + + if (_timers.ContainsKey(name)) + { + _timers[name].Cancel(); + _timers.Remove(name); + } + } + + /// + /// Determines whether the named timer is still active. Returns true + /// unless the timer does not exist, has previously been cancelled, or + /// if it was a single-shot timer whose message was already received. + /// + public bool IsTimerActive(string name) + { + return _timers.ContainsKey(name); + } + + /// + /// Set the state timeout explicitly. This method can be safely used from + /// within a state handler. + /// + public void SetStateTimeout(TState state, TimeSpan? timeout) + { + if (!_stateTimeouts.ContainsKey(state)) + _stateTimeouts.Add(state, timeout); + else + _stateTimeouts[state] = timeout; + } + + /// + /// Set handler which is called upon each state transition, i.e. not when + /// staying in the same state. + /// + public void OnTransition(TransitionHandler transitionHandler) + { + _transitionEvent.Add(transitionHandler); + } + + /// + /// Set the handler which is called upon termination of this FSM actor. Calling this + /// method again will overwrite the previous contents. + /// + public void OnTermination(Action> terminationHandler) + { + _terminateEvent = terminationHandler; + } + + /// + /// Set handler which is called upon reception of unhandled FSM messages. Calling + /// this method again will overwrite the previous contents. + /// + /// + public void WhenUnhandled(StateFunction stateFunction) + { + HandleEvent = OrElse(stateFunction, HandleEventDefault); + } + + /// + /// Verify the existence of initial state and setup timers. This should be the + /// last call within the constructor or and + /// . + /// + public void Initialize() + { + MakeTransition(_currentState); + } + + public TransformHelper Transform(StateFunction func) + { + return new TransformHelper(func); + } + + private void Register(TState name, StateFunction function, TimeSpan? timeout) + { + if (_stateFunctions.ContainsKey(name)) + { + _stateFunctions[name] = OrElse(_stateFunctions[name], function); + _stateTimeouts[name] = _stateTimeouts[name] ?? timeout; + } + else + { + _stateFunctions.Add(name, function); + _stateTimeouts.Add(name, timeout); + } + } + + private void HandleTransition(TState previous, TState next) + { + foreach (var tran in _transitionEvent) + { + tran.Invoke(previous, next); + } + } + + /// + /// C# port of Scala's orElse method for partial function chaining. + /// See http://scalachina.com/api/scala/PartialFunction.html + /// + /// The original to be called + /// The to be called if returns null + /// + /// A which combines both the results of and + /// + /// + private static StateFunction OrElse(StateFunction original, StateFunction fallback) + { + StateFunction chained = delegate(FSMBase.Event @event, State state) + { + var originalResult = original.Invoke(@event, state); + if (originalResult == null) return fallback.Invoke(@event, state); + return originalResult; + }; + + return chained; + } + + protected void ProcessMsg(object any, object source) + { + var fsmEvent = new FSMBase.Event(any, _currentState.StateData); + ProcessEvent(fsmEvent, source); + } + + private void ProcessEvent(FSMBase.Event fsmEvent, object source) + { + if (DebugEvent) + { + var srcStr = GetSourceString(source); + _log.Debug("processing {0} from {1}", fsmEvent, srcStr); + } + var stateFunc = _stateFunctions[_currentState.StateName]; + var oldState = _currentState; + State upcomingState = null; + + if (stateFunc != null) + { + upcomingState = stateFunc(fsmEvent); + } + + if (upcomingState == null) + { + upcomingState = HandleEvent(fsmEvent); + } + + ApplyState(upcomingState); + if (DebugEvent && !Equals(oldState, upcomingState)) + { + _log.Debug("transition {0} -> {1}", oldState, upcomingState); + } + } + + private string GetSourceString(object source) + { + var s = source as string; + if (s != null) return s; + var timer = source as Timer; + if (timer != null) return "timer '" + timer.Name + "'"; + var actorRef = source as IActorRef; + if (actorRef != null) return actorRef.ToString(); + return "unknown"; + } + + + protected virtual void ApplyState(State upcomingState) + { + if (upcomingState.StopReason == null) + { + MakeTransition(upcomingState); + return; + } + var replies = upcomingState.Replies; + replies.Reverse(); + foreach (var reply in replies) + { + Sender.Tell(reply); + } + Terminate(upcomingState); + Context.Stop(Self); + } + + private void MakeTransition(State upcomingState) + { + if (!_stateFunctions.ContainsKey(upcomingState.StateName)) + { + Terminate( + Stay() + .WithStopReason( + new FSMBase.Failure(string.Format("Next state {0} does not exist", upcomingState.StateName)))); + } + else + { + var replies = upcomingState.Replies; + replies.Reverse(); + foreach (var r in replies) + { + Sender.Tell(r); + } + if (!_currentState.StateName.Equals(upcomingState.StateName)) + { + _nextState = upcomingState; + HandleTransition(_currentState.StateName, _nextState.StateName); + Listeners.Gossip(new FSMBase.Transition(Self, _currentState.StateName, _nextState.StateName)); + _nextState = null; + } + _currentState = upcomingState; + var timeout = _currentState.Timeout ?? _stateTimeouts[_currentState.StateName]; + if (timeout.HasValue) + { + var t = timeout.Value; + if (t < TimeSpan.MaxValue) + { + _timeoutFuture = Context.System.Scheduler.ScheduleTellOnceCancelable(t, Context.Self, + new TimeoutMarker(_generation), Context.Self); + } + } + } + } + + protected override bool ReceiveCommand(object message) + { + var match = message.Match() + .With(marker => + { + if (_generation == marker.Generation) + { + ProcessMsg(new StateTimeout(), "state timeout"); + } + }) + .With(t => + { + if (_timers.ContainsKey(t.Name) && _timers[t.Name].Generation == t.Generation) + { + if (_timeoutFuture != null) + { + _timeoutFuture.Cancel(false); + _timeoutFuture = null; + } + _generation++; + if (!t.Repeat) + { + _timers.Remove(t.Name); + } + ProcessMsg(t.Message, t); + } + }) + .With(cb => + { + Context.Watch(cb.ActorRef); + Listeners.Add(cb.ActorRef); + //send the current state back as a reference point + cb.ActorRef.Tell(new FSMBase.CurrentState(Self, _currentState.StateName)); + }) + .With(l => + { + Context.Watch(l.Listener); + Listeners.Add(l.Listener); + l.Listener.Tell(new FSMBase.CurrentState(Self, _currentState.StateName)); + }) + .With(ucb => + { + Context.Unwatch(ucb.ActorRef); + Listeners.Remove(ucb.ActorRef); + }) + .With(d => + { + Context.Unwatch(d.Listener); + Listeners.Remove(d.Listener); + }) + .With(_ => { DebugEvent = true; }) + .Default(msg => + { + if (_timeoutFuture != null) + { + _timeoutFuture.Cancel(false); + _timeoutFuture = null; + } + _generation++; + ProcessMsg(msg, Sender); + }); + return match.WasHandled; + } + + protected void Terminate(State upcomingState) + { + if (_currentState.StopReason == null) + { + var reason = upcomingState.StopReason; + LogTermination(reason); + foreach (var t in _timers) + { + t.Value.Cancel(); + } + _timers.Clear(); + _currentState = upcomingState; + + var stopEvent = new FSMBase.StopEvent(reason, _currentState.StateName, + _currentState.StateData); + _terminateEvent(stopEvent); + } + } + + /// + /// Call the hook if you want to retain this behavior. + /// When overriding make sure to call base.PostStop(); + /// Please note that this method is called by default from so + /// override that one if shall not be called during restart. + /// + protected override void PostStop() + { + /* + * Setting this instance's state to Terminated does no harm during restart, since + * the new instance will initialize fresh using StartWith. + */ + Terminate(Stay().WithStopReason(new FSMBase.Shutdown())); + base.PostStop(); + } + + /// + /// By default, is logged at error level and other + /// reason types are not logged. It is possible to override this behavior. + /// + /// + protected virtual void LogTermination(FSMBase.Reason reason) + { + reason.Match() + .With(f => + { + if (f.Cause is Exception) + { + _log.Error(f.Cause.AsInstanceOf(), "terminating due to Failure"); + } + else + { + _log.Error(f.Cause.ToString()); + } + }); + } + + public sealed class TransformHelper + { + public TransformHelper(StateFunction func) + { + Func = func; + } + + public StateFunction Func { get; private set; } + + public StateFunction Using(Func, State> andThen) + { + StateFunction continuedDelegate = (@event, state) => andThen.Invoke(Func.Invoke(@event, state)); + return continuedDelegate; + } + } + + public class StateChangeEvent : IMessage + { + public StateChangeEvent(TState state, TimeSpan? timeOut) + { + State = state; + TimeOut = timeOut; + } + + public TState State { get; private set; } + + public TimeSpan? TimeOut { get; private set; } + } + + #region States + + /// + /// Used in the event of a timeout between transitions + /// + public class StateTimeout + { + } + + /* + * INTERNAL API - used for ensuring that state changes occur on-time + */ + + internal class TimeoutMarker + { + public TimeoutMarker(long generation) + { + Generation = generation; + } + + public long Generation { get; private set; } + } + + [DebuggerDisplay("Timer {Name,nq}, message: {Message")] + public class Timer : INoSerializationVerificationNeeded + { + private readonly ILoggingAdapter _debugLog; + private readonly ICancelable _ref; + + private readonly IScheduler _scheduler; + + public Timer(string name, object message, bool repeat, int generation, IActorContext context, + ILoggingAdapter debugLog) + { + _debugLog = debugLog; + Context = context; + Generation = generation; + Repeat = repeat; + Message = message; + Name = name; + var scheduler = context.System.Scheduler; + _scheduler = scheduler; + _ref = new Cancelable(scheduler); + } + + public string Name { get; private set; } + + public object Message { get; private set; } + + public bool Repeat { get; private set; } + + public int Generation { get; private set; } + + public IActorContext Context { get; private set; } + + public void Schedule(IActorRef actor, TimeSpan timeout) + { + var name = Name; + var message = Message; + + Action send; + if (_debugLog != null) + send = () => + { + _debugLog.Debug("{0}Timer '{1}' went off. Sending {2} -> {3}", + _ref.IsCancellationRequested ? "Cancelled " : "", name, message, actor); + actor.Tell(this, Context.Self); + }; + else + send = () => actor.Tell(this, Context.Self); + + if (Repeat) _scheduler.Advanced.ScheduleRepeatedly(timeout, timeout, send, _ref); + else _scheduler.Advanced.ScheduleOnce(timeout, send, _ref); + } + + public void Cancel() + { + if (!_ref.IsCancellationRequested) + { + _ref.Cancel(false); + } + } + } + + + /// + /// This captures all of the managed state of the : the state name, + /// the state data, possibly custom timeout, stop reason, and replies accumulated while + /// processing the last message. + /// + /// The name of the state + /// The data of the state + /// The event of the state + public class State : FSMBase.State + { + public Action AfterTransitionHandler { get; private set; } + + + public State(TS stateName, TD stateData, TimeSpan? timeout = null, FSMBase.Reason stopReason = null, + List replies = null, ILinearSeq domainEvents = null, Action afterTransitionDo = null) + : base(stateName, stateData, timeout, stopReason, replies) + { + AfterTransitionHandler = afterTransitionDo; + DomainEvents = domainEvents; + Notifies = true; + } + + public ILinearSeq DomainEvents { get; private set; } + + public bool Notifies { get; set; } + + /// + /// Specify domain events to be applied when transitioning to the new state. + /// + /// + /// + public State Applying(ILinearSeq events) + { + if (DomainEvents == null) + { + return Copy(null, null, null, events); + } + return Copy(null, null, null, new ArrayLinearSeq(DomainEvents.Concat(events).ToArray())); + } + + + /// + /// Specify domain event to be applied when transitioning to the new state. + /// + /// + /// + public State Applying(TE e) + { + if (DomainEvents == null) + { + return Copy(null, null, null, new ArrayLinearSeq(new[] {e})); + } + var events = new List(); + events.AddRange(DomainEvents); + events.Add(e); + return Copy(null, null, null, new ArrayLinearSeq(DomainEvents.Concat(events).ToArray())); + } + + + /// + /// Register a handler to be triggered after the state has been persisted successfully + /// + /// + /// + public State AndThen(Action handler) + { + return Copy(null, null, null, null, handler); + } + + public State Copy(TimeSpan? timeout, FSMBase.Reason stopReason = null, + List replies = null, ILinearSeq domainEvents = null, Action afterTransitionDo = null) + { + return new State(StateName, StateData, timeout ?? Timeout, stopReason ?? StopReason, + replies ?? Replies, + domainEvents ?? DomainEvents, afterTransitionDo ?? AfterTransitionHandler); + } + + /// + /// Modify state transition descriptor with new state data. The data will be set + /// when transitioning to the new state. + /// + public new State Using(TD nextStateData) + { + return new State(StateName, nextStateData, Timeout, StopReason, Replies); + } + + + public new State Replying(object replyValue) + { + if (Replies == null) Replies = new List(); + var newReplies = Replies.ToArray().ToList(); + newReplies.Add(replyValue); + return Copy(Timeout, replies: newReplies); + } + + public new State ForMax(TimeSpan timeout) + { + if (timeout <= TimeSpan.MaxValue) return Copy(timeout); + return Copy(null); + } + + /// + /// INTERNAL API + /// + internal State WithStopReason(FSMBase.Reason reason) + { + return Copy(null, reason); + } + + #endregion + } + } +} \ No newline at end of file diff --git a/src/core/Akka/Actor/FSM.cs b/src/core/Akka/Actor/FSM.cs index 6bd7dbe37fe..8c7542fe3b3 100644 --- a/src/core/Akka/Actor/FSM.cs +++ b/src/core/Akka/Actor/FSM.cs @@ -250,7 +250,7 @@ public State(TS stateName, TD stateData, TimeSpan? timeout = null, Reason stopRe public Reason StopReason { get; private set; } - public List Replies { get; private set; } + public List Replies { get; protected set; } public State Copy(TimeSpan? timeout, Reason stopReason = null, List replies = null) {