From d259edeb572352cb755f3ce6ebe7857a7acbda3f Mon Sep 17 00:00:00 2001 From: "maxim.salamatko" Date: Fri, 30 Oct 2015 09:39:03 +0400 Subject: [PATCH 1/8] added raw implementation with failed tests --- .../Akka.Persistence.Tests.csproj | 1 + .../Fsm/PersistentFSMSpec.cs | 430 +++++++++++ .../Akka.Persistence/Akka.Persistence.csproj | 2 + .../Akka.Persistence/Fsm/PersistentFSM.cs | 682 ++++++++++++++++++ .../Akka.Persistence/Fsm/PersistentFSMBase.cs | 215 ++++++ src/core/Akka/Actor/FSM.cs | 2 +- 6 files changed, 1331 insertions(+), 1 deletion(-) create mode 100644 src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs create mode 100644 src/core/Akka.Persistence/Fsm/PersistentFSM.cs create mode 100644 src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs diff --git a/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj b/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj index 43510797058..4cc81ded709 100644 --- a/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj +++ b/src/core/Akka.Persistence.Tests/Akka.Persistence.Tests.csproj @@ -44,6 +44,7 @@ + diff --git a/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs new file mode 100644 index 00000000000..34d92827634 --- /dev/null +++ b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs @@ -0,0 +1,430 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Akka.Actor; +using Akka.Dispatch.SysMsg; +using Akka.Persistence.Fsm; +using Akka.TestKit; +using Akka.Util; +using Xunit; + +namespace Akka.Persistence.Tests.Fsm +{ + public partial class PersistentFSMSpec : PersistenceSpec + { + private readonly Random _random = new Random(); + public PersistentFSMSpec() + : base(PersistenceSpec.Configuration("inmem", "PersistentFSMSpec")) + { + } + + [Fact] + public void PersistentFSM_should_has_function_as_regular_fsm() + { + var dummyReportActorRef = CreateTestProbe().Ref; + + var fsmRef = Sys.ActorOf(Props.Create(Name, dummyReportActorRef), Name); + + Watch(fsmRef); + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + var shoes = new Item("2", "Shoes", 89.99F); + var coat = new Item("3", "Coat", 119.99F); + + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(shirt)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(shoes)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(coat)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new Buy()); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new Leave()); + ExpectMsg>(state => state.State == UserState.LookingAround); + ExpectMsg(); + ExpectMsg>(state => state.From == UserState.LookingAround); + ExpectMsg(); + ExpectMsg(); + ExpectMsg(); + ExpectMsg>(); + ExpectMsg(); + ExpectTerminated(fsmRef); + } + + [Fact] + public void PersistentFSM_should_has_function_as_regular_fsm_on_state_timeout() + { + var dummyReportActorRef = CreateTestProbe().Ref; + + var fsmRef = Sys.ActorOf(Props.Create(Name, dummyReportActorRef), Name); + + Watch(fsmRef); + + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + + fsmRef.Tell(new AddItem(shirt)); + ExpectMsg>(state => + { + return state.State == UserState.LookingAround; + + }); + + ExpectMsg>(); + + Within(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1.9), () => + { + ExpectMsg>(); + return true; + }); + + ExpectTerminated(fsmRef); + } + + [Fact] + public void PersistentFSM_should_recover_successfully_with_correct_state_data() + { + var dummyReportActorRef = CreateTestProbe().Ref; + + var fsmRef = Sys.ActorOf(Props.Create(()=> new WebStoreCustomerFSM(Name, dummyReportActorRef))); + + Watch(fsmRef); + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + var shoes = new Item("2", "Shoes", 89.99F); + var coat = new Item("3", "Coat", 119.99F); + + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(shirt)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(shoes)); + fsmRef.Tell(new GetCurrentCart()); + + + //fsmRef.Tell(new AddItem(coat)); + //fsmRef.Tell(new GetCurrentCart()); + //fsmRef.Tell(new Buy()); + //fsmRef.Tell(new GetCurrentCart()); + //fsmRef.Tell(new Leave()); + + + ExpectMsg>(); + ExpectMsg(); + ExpectMsg>(state => + { + return state.From == UserState.LookingAround; + + }); + ExpectMsg(); + ExpectMsg(); + + fsmRef.Tell(PoisonPill.Instance); + ExpectTerminated(fsmRef); + + var recoveredFsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, dummyReportActorRef))); + Watch(recoveredFsmRef); + recoveredFsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + recoveredFsmRef.Tell(new GetCurrentCart()); + recoveredFsmRef.Tell(new AddItem(coat)); + recoveredFsmRef.Tell(new GetCurrentCart()); + recoveredFsmRef.Tell(new Buy()); + recoveredFsmRef.Tell(new GetCurrentCart()); + recoveredFsmRef.Tell(new Leave()); + + ExpectMsg>(state => + { + return state.State == UserState.Shopping; + + }); + ExpectMsg(); + ExpectMsg(); + ExpectMsg>(); + ExpectMsg(); + ExpectTerminated(recoveredFsmRef); + } + + internal class WebStoreCustomerFSM : PersistentFSM + { + private readonly string _persistenceId; + private readonly IActorRef _reportActor; + + public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) + { + _persistenceId = persistenceId; + _reportActor = reportActor; + StartWith(UserState.LookingAround, new EmptyShoppingCart()); + + When(UserState.LookingAround, (@event,state) => + { + if (@event.FsmEvent is AddItem) + { + var addItem = (AddItem) @event.FsmEvent; + return + GoTo(UserState.Shopping) + .Applying(new ItemAdded(addItem.Item)).ForMax(TimeSpan.FromSeconds(1)); + } + if (@event.FsmEvent is GetCurrentCart) + { + return Stay().Replying(@event.StateData); + } + return state; + }); + + + When(UserState.Shopping, (@event, state) => + { + if (@event.FsmEvent is AddItem) + { + var addItem = ((AddItem) @event.FsmEvent); + return Stay().Applying(new ItemAdded(addItem.Item)).ForMax(TimeSpan.FromSeconds(1)); + } + if (@event.FsmEvent is Buy) + { + return + GoTo(UserState.Paid) + .Applying(new OrderExecuted()) + .AndThen(cart => + { + if (cart is NonEmptyShoppingCart) + { + _reportActor.Tell(new PurchaseWasMade()); + } + }); + } + if (@event.FsmEvent is Leave) + { + return Stop().Applying(new OrderDiscarded()).AndThen(cart => + { + _reportActor.Tell(new ShoppingCardDiscarded()); + }); + } + if (@event.FsmEvent is GetCurrentCart) + { + return Stay().Replying(@event.StateData); + } + if (@event.FsmEvent is StateTimeout) + { + return GoTo(UserState.Inactive).ForMax(TimeSpan.FromSeconds(2)); + } + return state; + }); + + + When(UserState.Inactive, (@event, state) => + { + if (@event.FsmEvent is AddItem) + { + var addItem = (AddItem) @event.FsmEvent; + return + GoTo(UserState.Shopping) + .Applying(new ItemAdded(addItem.Item)) + .ForMax(TimeSpan.FromSeconds(1)); + } + if (@event.FsmEvent is StateTimeout) + { + //var addItem = ((AddItem)@event) + return Stop().Applying(new OrderDiscarded()).AndThen(cart => + { + _reportActor.Tell(new ShoppingCardDiscarded()); + }); + } + return state; + }); + + When(UserState.Paid, (@event, state) => + { + if (@event.FsmEvent is Leave) + { + return Stop(); + } + if (@event.FsmEvent is GetCurrentCart) + { + return Stay().Replying(@event.StateData); + } + return state; + }); + } + + public override string PersistenceId { get { return _persistenceId; }} + + + + //protected override IShoppingCart ApplyEvent(IDomainEvent state, IShoppingCart data) + //{ + // if (state is ItemAdded) + // { + // return data.AddItem(((ItemAdded)state).Item); + // } + // if (state is OrderExecuted) + // { + // return data; + // } + // if (state is OrderDiscarded) + // { + // return data.Empty(); + // } + // return data; + //} + + + //protected override void OnRecoveryCompleted() + //{ + // // + //} + + protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) + { + if (e is ItemAdded) + { + var itemAdded = (ItemAdded) e; + return data.AddItem(itemAdded.Item); + } + if (e is OrderExecuted) + { + return data; + } + if (e is OrderDiscarded) + { + return data.Empty(); + } + + return data; + } + } + + } + + #region Custome States + internal enum UserState + { + Shopping, + Inactive, + Paid, + LookingAround + } + + #endregion + + #region Customer states data + + internal class Item + { + public string Id { get; } + public string Name { get; } + public float Price { get; } + + public Item(string id, string name, float price) + { + Id = id; + Name = name; + Price = price; + } + } + + internal interface IShoppingCart + { + IShoppingCart AddItem(Item item); + IShoppingCart Empty(); + } + + internal class EmptyShoppingCart : IShoppingCart + { + public IShoppingCart AddItem(Item item) + { + return new NonEmptyShoppingCart(); + } + + public IShoppingCart Empty() + { + return this; + } + } + + internal class NonEmptyShoppingCart : IShoppingCart + { + public IShoppingCart AddItem(Item item) + { + return this; + } + + public IShoppingCart Empty() + { + return new EmptyShoppingCart(); + } + } + + #endregion + + #region Customer commands + + internal interface ICommand + { + + } + + internal class AddItem : ICommand + { + public AddItem(Item item) + { + Item = item; + } + + public Item Item { get; } + } + + internal class Buy + { + } + internal class Leave + { + } + internal class GetCurrentCart : ICommand + { + } + + #endregion + + + #region Customer domain events + internal interface IDomainEvent + { + + } + + internal class ItemAdded : IDomainEvent + { + public ItemAdded(Item item) + { + Item = item; + } + + public Item Item { get; } + } + + internal class OrderExecuted : IDomainEvent + { + } + internal class OrderDiscarded : IDomainEvent + { + } + #endregion + #region Side effects - report events to be sent to some + internal interface IReportEvent + { + + } + internal class PurchaseWasMade : IReportEvent + { + } + internal class ShoppingCardDiscarded : IReportEvent + { + } + #endregion +} diff --git a/src/core/Akka.Persistence/Akka.Persistence.csproj b/src/core/Akka.Persistence/Akka.Persistence.csproj index ce5c00155b4..929231bf356 100644 --- a/src/core/Akka.Persistence/Akka.Persistence.csproj +++ b/src/core/Akka.Persistence/Akka.Persistence.csproj @@ -47,6 +47,8 @@ + + diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs new file mode 100644 index 00000000000..36b2a5f513a --- /dev/null +++ b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs @@ -0,0 +1,682 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using Akka.Actor; +using Akka.Actor.Internal; +using Akka.Event; +using Akka.Persistence.Serialization; +using Akka.Routing; +using Akka.Util; +using Akka.Util.Internal; +using Akka.Util.Internal.Collections; + +namespace Akka.Persistence.Fsm +{ + /// + /// Finite state machine (FSM) persistent actor. + /// + /// The state name type + /// The state data type + /// The event data type + public abstract class PersistentFSM : PersistentFSMBase, IListeners + { + private readonly ILoggingAdapter _log = Context.GetLogger(); + protected PersistentFSM() + { + if (this is ILoggingFSM) + DebugEvent = Context.System.Settings.FsmDebugEvent; + } + + public delegate State StateFunction(FSMBase.Event fsmEvent, State state = null); + + public delegate void TransitionHandler(TState initialState, TState nextState); + + #region Finite State Machine Domain Specific Language (FSM DSL if you like acronyms) + + /// + /// Insert a new at the end of the processing chain for the + /// given state. If the stateTimeout parameter is set, entering this state without a + /// differing explicit timeout setting will trigger a . + /// + /// designator for the state + /// delegate describing this state's response to input + /// default timeout for this state + public void When(TState stateName, StateFunction func, TimeSpan? timeout = null) + { + Register(stateName, func, timeout); + } + + /// + /// Sets the initial state for this FSM. Call this method from the constructor before the method. + /// If different state is needed after a restart this method, followed by , can be used in the actor + /// life cycle hooks and . + /// + /// Initial state designator. + /// Initial state data. + /// State timeout for the initial state, overriding the default timeout for that state. + public void StartWith(TState stateName, TData stateData, TimeSpan? timeout = null) + { + _currentState = new State(stateName, stateData, timeout); + } + + /// + /// Produce transition to other state. Return this from a state function + /// in order to effect the transition. + /// + /// State designator for the next state + /// State transition descriptor + public State GoTo(TState nextStateName) + { + return new State(nextStateName, _currentState.StateData); + } + + /// + /// Produce transition to other state. Return this from a state function + /// in order to effect the transition. + /// + /// State designator for the next state + /// Data for next state + /// State transition descriptor + public State GoTo(TState nextStateName, TData stateData) + { + return new State(nextStateName, stateData); + } + + /// + /// Produce "empty" transition descriptor. Return this from a state function + /// when no state change is to be effected. + /// + /// Descriptor for staying in the current state. + public State Stay() + { + return GoTo(_currentState.StateName); + } + + /// + /// Produce change descriptor to stop this FSM actor with + /// + public State Stop() + { + return Stop(new FSMBase.Normal()); + } + + /// + /// Produce change descriptor to stop this FSM actor with the specified . + /// + public State Stop(FSMBase.Reason reason) + { + return Stop(reason, _currentState.StateData); + } + + public State Stop(FSMBase.Reason reason, TData stateData) + { + return Stay().Using(stateData).WithStopReason(reason); + } + + public sealed class TransformHelper + { + public TransformHelper(StateFunction func) + { + Func = func; + } + + public StateFunction Func { get; private set; } + + public StateFunction Using(Func, State> andThen) + { + StateFunction continuedDelegate = (@event, state) => andThen.Invoke(Func.Invoke(@event, state)); + return continuedDelegate; + } + } + + /// + /// Schedule named timer to deliver message after given delay, possibly repeating. + /// Any existing timer with the same name will automatically be canceled before adding + /// the new timer. + /// + /// identifier to be used with . + /// message to be delivered + /// delay of first message delivery and between subsequent messages. + /// send once if false, scheduleAtFixedRate if true + public void SetTimer(string name, object msg, TimeSpan timeout, bool repeat = false) + { + if (DebugEvent) + _log.Debug("setting " + (repeat ? "repeating" : "") + "timer '{0}' / {1}: {2}", name, timeout, msg); + if (_timers.ContainsKey(name)) + _timers[name].Cancel(); + var timer = new Timer(name, msg, repeat, _timerGen.Next(), Context, DebugEvent ? _log : null); + timer.Schedule(Self, timeout); + + if (!_timers.ContainsKey(name)) + _timers.Add(name, timer); + else + _timers[name] = timer; + } + + /// + /// Cancel a named , ensuring that the message is not subsequently delivered (no race.) + /// + /// The name of the timer to cancel. + public void CancelTimer(string name) + { + if (DebugEvent) + { + _log.Debug("Cancelling timer {0}", name); + } + + if (_timers.ContainsKey(name)) + { + _timers[name].Cancel(); + _timers.Remove(name); + } + } + + /// + /// Determines whether the named timer is still active. Returns true + /// unless the timer does not exist, has previously been cancelled, or + /// if it was a single-shot timer whose message was already received. + /// + public bool IsTimerActive(string name) + { + return _timers.ContainsKey(name); + } + + /// + /// Set the state timeout explicitly. This method can be safely used from + /// within a state handler. + /// + public void SetStateTimeout(TState state, TimeSpan? timeout) + { + if (!_stateTimeouts.ContainsKey(state)) + _stateTimeouts.Add(state, timeout); + else + _stateTimeouts[state] = timeout; + } + + + + /// + /// Set handler which is called upon each state transition, i.e. not when + /// staying in the same state. + /// + public void OnTransition(TransitionHandler transitionHandler) + { + _transitionEvent.Add(transitionHandler); + } + + /// + /// Set the handler which is called upon termination of this FSM actor. Calling this + /// method again will overwrite the previous contents. + /// + public void OnTermination(Action> terminationHandler) + { + _terminateEvent = terminationHandler; + } + + /// + /// Set handler which is called upon reception of unhandled FSM messages. Calling + /// this method again will overwrite the previous contents. + /// + /// + public void WhenUnhandled(StateFunction stateFunction) + { + HandleEvent = OrElse(stateFunction, HandleEventDefault); + } + + /// + /// Verify the existence of initial state and setup timers. This should be the + /// last call within the constructor or and + /// . + /// + public void Initialize() + { + MakeTransition(_currentState); + } + + /// + /// Current state name + /// + public TState StateName + { + get { return _currentState.StateName; } + } + + /// + /// Current state data + /// + public TData StateData + { + get { return _currentState.StateData; } + } + + /// + /// Return next state data (available in handlers) + /// + public TData NextStateData + { + get + { + if (_nextState == null) throw new InvalidOperationException("NextStateData is only available during OnTransition"); + return _nextState.StateData; + } + } + + public TransformHelper Transform(StateFunction func) { return new TransformHelper(func); } + + #endregion + + #region Internal implementation details + + private readonly ListenerSupport _listener = new ListenerSupport(); + public ListenerSupport Listeners { get { return _listener; } } + + /// + /// Can be set to enable debugging on certain actions taken by the FSM + /// + protected bool DebugEvent; + + /// + /// FSM state data and current timeout handling + /// + private State _currentState; + + private ICancelable _timeoutFuture; + private State _nextState; + private long _generation = 0L; + + /// + /// Timer handling + /// + private readonly IDictionary _timers = new Dictionary(); + private readonly AtomicCounter _timerGen = new AtomicCounter(0); + + /// + /// State definitions + /// + private readonly Dictionary _stateFunctions = new Dictionary(); + private readonly Dictionary _stateTimeouts = new Dictionary(); + + + private void Register(TState name, StateFunction function, TimeSpan? timeout) + { + if (_stateFunctions.ContainsKey(name)) + { + _stateFunctions[name] = OrElse(_stateFunctions[name], function); + _stateTimeouts[name] = _stateTimeouts[name] ?? timeout; + } + else + { + _stateFunctions.Add(name, function); + _stateTimeouts.Add(name, timeout); + } + } + + /// + /// Unhandled event handler + /// + private StateFunction HandleEventDefault + { + get + { + return delegate (FSMBase.Event @event, State state) + { + _log.Warning("unhandled event {0} in state {1}", @event.FsmEvent, StateName); + return Stay(); + }; + } + } + + private StateFunction _handleEvent; + + private StateFunction HandleEvent + { + get { return _handleEvent ?? (_handleEvent = HandleEventDefault); } + set { _handleEvent = value; } + } + + + /// + /// Termination handling + /// + private Action> _terminateEvent = @event => { }; + + /// + /// Transition handling + /// + private readonly IList _transitionEvent = new List(); + + private void HandleTransition(TState previous, TState next) + { + foreach (var tran in _transitionEvent) + { + tran.Invoke(previous, next); + } + } + + + + + /// + /// C# port of Scala's orElse method for partial function chaining. + /// + /// See http://scalachina.com/api/scala/PartialFunction.html + /// + /// The original to be called + /// The to be called if returns null + /// A which combines both the results of and + private static StateFunction OrElse(StateFunction original, StateFunction fallback) + { + StateFunction chained = delegate (FSMBase.Event @event, State state) + { + var originalResult = original.Invoke(@event, state); + if (originalResult == null) return fallback.Invoke(@event, state); + return originalResult; + }; + + return chained; + } + + #endregion + + #region Actor methods + + + public class StateChangeEvent : IMessage + { + public StateChangeEvent(string state, TimeSpan? timeOut) + { + State = state; + TimeOut = timeOut; + } + + public string State { get; private set; } + + public TimeSpan? TimeOut { get; private set; } + } + + private void ProcessMsg(object any, object source) + { + var fsmEvent = new FSMBase.Event(any, _currentState.StateData); + ProcessEvent(fsmEvent, source); + } + + private void ProcessEvent(FSMBase.Event fsmEvent, object source) + { + if (DebugEvent) + { + var srcStr = GetSourceString(source); + _log.Debug("processing {0} from {1}", fsmEvent, srcStr); + } + var stateFunc = _stateFunctions[_currentState.StateName]; + var oldState = _currentState; + State upcomingState = null; + + if (stateFunc != null) + { + upcomingState = stateFunc(fsmEvent); + } + + if (upcomingState == null) + { + upcomingState = HandleEvent(fsmEvent); + } + + ApplyState(upcomingState); + if (DebugEvent && !Equals(oldState, upcomingState)) + { + _log.Debug("transition {0} -> {1}", oldState, upcomingState); + } + } + + private string GetSourceString(object source) + { + var s = source as string; + if (s != null) return s; + var timer = source as Timer; + if (timer != null) return "timer '" + timer.Name + "'"; + var actorRef = source as IActorRef; + if (actorRef != null) return actorRef.ToString(); + return "unknown"; + } + + private void ApplyState(State upcomingState) + { + var eventsToPersist = new List(); + if (upcomingState.Notifies || upcomingState.Timeout.HasValue) + { + eventsToPersist.Add(new StateChangeEvent(upcomingState.StateName.ToString(), upcomingState.Timeout)); + } + if (upcomingState.DomainEvents == null || upcomingState.DomainEvents.IsEmpty) + { + BaseApplyState(upcomingState); + } + else + { + eventsToPersist.Add(upcomingState.DomainEvents); + + var nextData = upcomingState.StateData; + var handlersExecutedCounter = 0; + + Persist(eventsToPersist, @event => + { + handlersExecutedCounter++; + if (handlersExecutedCounter == upcomingState.DomainEvents.Count()) + { + BaseApplyState(upcomingState.Using(nextData)); + if (@event is TEvent) + { + nextData = ApplyEvent((TEvent)@event, nextData); + } + + if (upcomingState.AfterTransitionHandler != null) + { + upcomingState.AfterTransitionHandler(upcomingState.StateData); + } + + + } + }); + } + } + + protected abstract TData ApplyEvent(TEvent e, TData data); + + private void BaseApplyState(State upcomingState) + { + if (upcomingState.StopReason == null) + { + MakeTransition(upcomingState); + return; + } + var replies = upcomingState.Replies; + replies.Reverse(); + foreach (var reply in replies) + { + Sender.Tell(reply); + } + Terminate(upcomingState); + Context.Stop(Self); + } + + private void MakeTransition(State upcomingState) + { + if (!_stateFunctions.ContainsKey(upcomingState.StateName)) + { + Terminate( + Stay() + .WithStopReason( + new FSMBase.Failure(String.Format("Next state {0} does not exist", upcomingState.StateName)))); + } + else + { + var replies = upcomingState.Replies; + replies.Reverse(); + foreach (var r in replies) { Sender.Tell(r); } + if (!_currentState.StateName.Equals(upcomingState.StateName) || upcomingState.Notifies) + { + _nextState = upcomingState; + HandleTransition(_currentState.StateName, _nextState.StateName); + Listeners.Gossip(new FSMBase.Transition(Self, _currentState.StateName, _nextState.StateName)); + _nextState = null; + } + _currentState = upcomingState; + var timeout = _currentState.Timeout ?? _stateTimeouts[_currentState.StateName]; + if (timeout.HasValue) + { + var t = timeout.Value; + if (t < TimeSpan.MaxValue) + { + _timeoutFuture = Context.System.Scheduler.ScheduleTellOnceCancelable(t, Context.Self, new PersistentFSMBase.TimeoutMarker(_generation), Context.Self); + } + } + } + } + + private void Terminate(State upcomingState) + { + if (_currentState.StopReason == null) + { + var reason = upcomingState.StopReason; + LogTermination(reason); + foreach (var t in _timers) { t.Value.Cancel(); } + _timers.Clear(); + _currentState = upcomingState; + + var stopEvent = new FSMBase.StopEvent(reason, _currentState.StateName, _currentState.StateData); + _terminateEvent(stopEvent); + } + } + + /// + /// Call the hook if you want to retain this behavior. + /// When overriding make sure to call base.PostStop(); + /// + /// Please note that this method is called by default from so + /// override that one if shall not be called during restart. + /// + protected override void PostStop() + { + /* + * Setting this instance's state to Terminated does no harm during restart, since + * the new instance will initialize fresh using StartWith. + */ + Terminate(Stay().WithStopReason(new FSMBase.Shutdown())); + base.PostStop(); + } + + #endregion + + /// + /// By default, is logged at error level and other + /// reason types are not logged. It is possible to override this behavior. + /// + /// + protected virtual void LogTermination(FSMBase.Reason reason) + { + PatternMatch.Match(reason) + .With(f => + { + if (f.Cause is Exception) + { + _log.Error(f.Cause.AsInstanceOf(), "terminating due to Failure"); + } + else + { + _log.Error(f.Cause.ToString()); + } + }); + } + + + public bool IsStateTimerActive { get; } + protected override bool ReceiveRecover(object message) + { + var match = PatternMatch.Match(message) + .With(t => + { + Initialize(); + }).With(e => + { + StartWith(StateName, ApplyEvent(e, StateData)); + }).With(sce => + { + StartWith(StateName, StateData, sce.TimeOut); + }); + + return match.WasHandled; + } + + protected override bool ReceiveCommand(object message) + { + var match = PatternMatch.Match(message) + .With(marker => + { + if (_generation == marker.Generation) + { + ProcessMsg(new StateTimeout(), "state timeout"); + } + }) + .With(t => + { + if (_timers.ContainsKey(t.Name) && _timers[t.Name].Generation == t.Generation) + { + if (_timeoutFuture != null) + { + _timeoutFuture.Cancel(false); + _timeoutFuture = null; + } + _generation++; + if (!t.Repeat) + { + _timers.Remove(t.Name); + } + ProcessMsg(t.Message, t); + } + }) + .With(cb => + { + Context.Watch(cb.ActorRef); + Listeners.Add(cb.ActorRef); + //send the current state back as a reference point + cb.ActorRef.Tell(new FSMBase.CurrentState(Self, _currentState.StateName)); + }) + .With(l => + { + Context.Watch(l.Listener); + Listeners.Add(l.Listener); + l.Listener.Tell(new FSMBase.CurrentState(Self, _currentState.StateName)); + }) + .With(ucb => + { + Context.Unwatch(ucb.ActorRef); + Listeners.Remove(ucb.ActorRef); + }) + .With(d => + { + Context.Unwatch(d.Listener); + Listeners.Remove(d.Listener); + }) + .With(_ => { DebugEvent = true; }) + .Default(msg => + { + if (_timeoutFuture != null) + { + _timeoutFuture.Cancel(false); + _timeoutFuture = null; + } + _generation++; + ProcessMsg(msg, Sender); + }); + return match.WasHandled; + } + } +} + diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs b/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs new file mode 100644 index 00000000000..fa0834ae169 --- /dev/null +++ b/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs @@ -0,0 +1,215 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using Akka.Actor; +using Akka.Dispatch.SysMsg; +using Akka.Event; +using Akka.Util; + +namespace Akka.Persistence.Fsm +{ + public abstract class PersistentFSMBase : PersistentActor + { + #region States + + + + /// + /// Used in the event of a timeout between transitions + /// + public class StateTimeout + { + } + + /* + * INTERNAL API - used for ensuring that state changes occur on-time + */ + + internal class TimeoutMarker + { + public TimeoutMarker(long generation) + { + Generation = generation; + } + + public long Generation { get; private set; } + } + + [DebuggerDisplay("Timer {Name,nq}, message: {Message")] + internal class Timer : INoSerializationVerificationNeeded + { + private readonly ILoggingAdapter _debugLog; + + public Timer(string name, object message, bool repeat, int generation, IActorContext context, + ILoggingAdapter debugLog) + { + _debugLog = debugLog; + Context = context; + Generation = generation; + Repeat = repeat; + Message = message; + Name = name; + var scheduler = context.System.Scheduler; + _scheduler = scheduler; + _ref = new Cancelable(scheduler); + } + + private readonly IScheduler _scheduler; + private readonly ICancelable _ref; + + public string Name { get; private set; } + + public object Message { get; private set; } + + public bool Repeat { get; private set; } + + public int Generation { get; private set; } + + public IActorContext Context { get; private set; } + + public void Schedule(IActorRef actor, TimeSpan timeout) + { + var name = Name; + var message = Message; + + Action send; + if (_debugLog != null) + send = () => + { + _debugLog.Debug("{0}Timer '{1}' went off. Sending {2} -> {3}", + _ref.IsCancellationRequested ? "Cancelled " : "", name, message, actor); + actor.Tell(this, Context.Self); + }; + else + send = () => actor.Tell(this, Context.Self); + + if (Repeat) _scheduler.Advanced.ScheduleRepeatedly(timeout, timeout, send, _ref); + else _scheduler.Advanced.ScheduleOnce(timeout, send, _ref); + } + + public void Cancel() + { + if (!_ref.IsCancellationRequested) + { + _ref.Cancel(false); + } + } + } + + + + /// + /// This captures all of the managed state of the : the state name, + /// the state data, possibly custom timeout, stop reason, and replies accumulated while + /// processing the last message. + /// + /// The name of the state + /// The data of the state + /// The event of the state + public class State : FSMBase.State + { + public Action AfterTransitionHandler { get; private set; } + + + public State(TS stateName, TD stateData, TimeSpan? timeout = null, FSMBase.Reason stopReason = null, + List replies = null, ILinearSeq domainEvents = null, Action afterTransitionDo = null) + : base(stateName, stateData, timeout, stopReason, replies) + { + AfterTransitionHandler = afterTransitionDo; + DomainEvents = domainEvents; + Notifies = true; + } + + public ILinearSeq DomainEvents { get; } + + public bool Notifies { get; set; } + + /// + /// Specify domain events to be applied when transitioning to the new state. + /// + /// + /// + public State Applying(ILinearSeq events) + { + if (DomainEvents == null) + { + return Copy(null, null, null, events); + } + return Copy(null, null, null, new ArrayLinearSeq(DomainEvents.Concat(events).ToArray())); + } + + + /// + /// Specify domain event to be applied when transitioning to the new state. + /// + /// + /// + public State Applying(TE e) + { + if (DomainEvents == null) + { + return Copy(null, null, null, new ArrayLinearSeq(new TE[] {e})); + } + var events = new List(); + events.AddRange(DomainEvents); + events.Add(e); + return Copy(null, null, null, new ArrayLinearSeq(DomainEvents.Concat(events).ToArray())); + } + + + /// + /// Register a handler to be triggered after the state has been persisted successfully + /// + /// + /// + public State AndThen(Action handler) + { + return Copy(null, null, null, null, handler); + } + + public State Copy(TimeSpan? timeout, FSMBase.Reason stopReason = null, + List replies = null, ILinearSeq domainEvents = null, Action afterTransitionDo = null) + { + return new State(StateName, StateData, timeout ?? Timeout, stopReason ?? StopReason, replies ?? Replies, + domainEvents ?? DomainEvents, afterTransitionDo ?? AfterTransitionHandler); + } + + /// + /// Modify state transition descriptor with new state data. The data will be set + /// when transitioning to the new state. + /// + public new State Using(TD nextStateData) + { + return new State(StateName, nextStateData, Timeout, StopReason, Replies); + } + + + public new State Replying(object replyValue) + { + + if (Replies == null) Replies = new List(); + var newReplies = Replies.ToArray().ToList(); + newReplies.Add(replyValue); + return Copy(Timeout, replies: newReplies); + } + + public new State ForMax(TimeSpan timeout) + { + if (timeout <= TimeSpan.MaxValue) return Copy(timeout); + return Copy(null); + } + + /// + /// INTERNAL API + /// + internal State WithStopReason(FSMBase.Reason reason) + { + return Copy(null, reason); + } + + + #endregion + } + } +} \ No newline at end of file diff --git a/src/core/Akka/Actor/FSM.cs b/src/core/Akka/Actor/FSM.cs index 6bd7dbe37fe..8c7542fe3b3 100644 --- a/src/core/Akka/Actor/FSM.cs +++ b/src/core/Akka/Actor/FSM.cs @@ -250,7 +250,7 @@ public State(TS stateName, TD stateData, TimeSpan? timeout = null, Reason stopRe public Reason StopReason { get; private set; } - public List Replies { get; private set; } + public List Replies { get; protected set; } public State Copy(TimeSpan? timeout, Reason stopReason = null, List replies = null) { From 6f6658c07e6742c0a9e2f82f100281122292d8f5 Mon Sep 17 00:00:00 2001 From: "maxim.salamatko" Date: Sat, 31 Oct 2015 10:58:52 +0400 Subject: [PATCH 2/8] solved problem with events persist --- .../Akka.Persistence/Fsm/PersistentFSM.cs | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs index 36b2a5f513a..c0db9b3836a 100644 --- a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs +++ b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs @@ -391,13 +391,13 @@ private static StateFunction OrElse(StateFunction original, StateFunction fallba public class StateChangeEvent : IMessage { - public StateChangeEvent(string state, TimeSpan? timeOut) + public StateChangeEvent(TState state, TimeSpan? timeOut) { State = state; TimeOut = timeOut; } - public string State { get; private set; } + public TState State { get; private set; } public TimeSpan? TimeOut { get; private set; } } @@ -450,9 +450,9 @@ private string GetSourceString(object source) private void ApplyState(State upcomingState) { var eventsToPersist = new List(); - if (upcomingState.Notifies || upcomingState.Timeout.HasValue) + if ( upcomingState.Timeout.HasValue) { - eventsToPersist.Add(new StateChangeEvent(upcomingState.StateName.ToString(), upcomingState.Timeout)); + eventsToPersist.Add(new StateChangeEvent(upcomingState.StateName, upcomingState.Timeout)); } if (upcomingState.DomainEvents == null || upcomingState.DomainEvents.IsEmpty) { @@ -460,30 +460,36 @@ private void ApplyState(State upcomingState) } else { - eventsToPersist.Add(upcomingState.DomainEvents); + foreach (var domainEvent in upcomingState.DomainEvents) + { + eventsToPersist.Add(domainEvent); + } + - var nextData = upcomingState.StateData; + var nextData = StateData;// upcomingState.StateData; var handlersExecutedCounter = 0; - Persist(eventsToPersist, @event => - { - handlersExecutedCounter++; - if (handlersExecutedCounter == upcomingState.DomainEvents.Count()) + + Persist(eventsToPersist, @event => { - BaseApplyState(upcomingState.Using(nextData)); + handlersExecutedCounter++; if (@event is TEvent) { nextData = ApplyEvent((TEvent)@event, nextData); } - - if (upcomingState.AfterTransitionHandler != null) + if (handlersExecutedCounter == eventsToPersist.Count) { - upcomingState.AfterTransitionHandler(upcomingState.StateData); - } + BaseApplyState(upcomingState.Using(nextData)); + if (upcomingState.AfterTransitionHandler != null) + { + upcomingState.AfterTransitionHandler(upcomingState.StateData); + } + + + } + }); - } - }); } } @@ -520,7 +526,7 @@ private void MakeTransition(State upcomingState) var replies = upcomingState.Replies; replies.Reverse(); foreach (var r in replies) { Sender.Tell(r); } - if (!_currentState.StateName.Equals(upcomingState.StateName) || upcomingState.Notifies) + if (!_currentState.StateName.Equals(upcomingState.StateName)) { _nextState = upcomingState; HandleTransition(_currentState.StateName, _nextState.StateName); @@ -608,7 +614,7 @@ protected override bool ReceiveRecover(object message) StartWith(StateName, ApplyEvent(e, StateData)); }).With(sce => { - StartWith(StateName, StateData, sce.TimeOut); + StartWith(sce.State, StateData, sce.TimeOut); }); return match.WasHandled; From faaf5ba4827978e3e530393d82cca70f9ca1644d Mon Sep 17 00:00:00 2001 From: "maxim.salamatko" Date: Sat, 31 Oct 2015 14:30:12 +0400 Subject: [PATCH 3/8] added tests --- .../Fsm/PersistentFSMSpec.cs | 314 ++++++++++++++++-- .../Akka.Persistence/Fsm/PersistentFSM.cs | 17 +- 2 files changed, 291 insertions(+), 40 deletions(-) diff --git a/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs index 34d92827634..d41f82a591c 100644 --- a/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs +++ b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs @@ -107,13 +107,6 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_data() fsmRef.Tell(new GetCurrentCart()); - //fsmRef.Tell(new AddItem(coat)); - //fsmRef.Tell(new GetCurrentCart()); - //fsmRef.Tell(new Buy()); - //fsmRef.Tell(new GetCurrentCart()); - //fsmRef.Tell(new Leave()); - - ExpectMsg>(); ExpectMsg(); ExpectMsg>(state => @@ -150,6 +143,213 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_data() ExpectTerminated(recoveredFsmRef); } + [Fact] + public void PersistentFSM_should_execute_the_defined_actions_following_successful_persistence_of_state_change() + { + var reportActorProbe = CreateTestProbe(Sys); + + var fsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, reportActorProbe.Ref))); + + Watch(fsmRef); + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + var shoes = new Item("2", "Shoes", 89.99F); + var coat = new Item("3", "Coat", 119.99F); + + fsmRef.Tell(new AddItem(shirt)); + fsmRef.Tell(new AddItem(shoes)); + fsmRef.Tell(new AddItem(coat)); + fsmRef.Tell(new Buy()); + fsmRef.Tell(new Leave()); + + ExpectMsg>(state => + { + return state.State == UserState.LookingAround; + }); + ExpectMsg>(state => + { + return state.From == UserState.LookingAround && state.To == UserState.Shopping; + }); + ExpectMsg>(state => + { + return state.From == UserState.Shopping && state.To == UserState.Paid; + }); + reportActorProbe.ExpectMsg(); + ExpectTerminated(fsmRef); + } + + [Fact] + public void PersistentFSM_should_execute_the_defined_actions_following_successful_persistence_of_FSM_stop() + { + var reportActorProbe = CreateTestProbe(Sys); + + var fsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, reportActorProbe.Ref))); + + Watch(fsmRef); + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + var shoes = new Item("2", "Shoes", 89.99F); + var coat = new Item("3", "Coat", 119.99F); + + fsmRef.Tell(new AddItem(shirt)); + fsmRef.Tell(new AddItem(shoes)); + fsmRef.Tell(new AddItem(coat)); + fsmRef.Tell(new Leave()); + + ExpectMsg>(state => + { + return state.State == UserState.LookingAround; + }); + ExpectMsg>(state => + { + return state.From == UserState.LookingAround && state.To == UserState.Shopping; + }); + reportActorProbe.ExpectMsg(); + ExpectTerminated(fsmRef); + } + + [Fact] + public void PersistentFSM_should_recover_successfully_with_correct_state_timeout() + { + var dummyReportActorRef = CreateTestProbe().Ref; + + var fsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, dummyReportActorRef))); + + Watch(fsmRef); + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + + fsmRef.Tell(new AddItem(shirt)); + + ExpectMsg>(state => + { + return state.State == UserState.LookingAround; + }); + ExpectMsg>(state => + { + return state.From == UserState.LookingAround && state.To == UserState.Shopping; + + }); + + ExpectNoMsg(TimeSpan.FromSeconds(0.6)); + fsmRef.Tell(PoisonPill.Instance); + ExpectTerminated(fsmRef); + + var recoveredFsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, dummyReportActorRef))); + Watch(recoveredFsmRef); + recoveredFsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + ExpectMsg>(state => + { + return state.State == UserState.Shopping; + }); + + + Within(TimeSpan.FromSeconds(0.9), TimeSpan.FromSeconds(1.9), () => + { + ExpectMsg>(state => + { + return state.From == UserState.Shopping && state.To == UserState.Inactive; + }); + return true; + }); + ExpectNoMsg(TimeSpan.FromSeconds(0.6)); + recoveredFsmRef.Tell(PoisonPill.Instance); + ExpectTerminated(recoveredFsmRef); + + recoveredFsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, dummyReportActorRef))); + Watch(recoveredFsmRef); + recoveredFsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + ExpectMsg>(state => + { + return state.State == UserState.Inactive; + }); + ExpectTerminated(recoveredFsmRef); + } + + [Fact] + public void PersistentFSM_should_not_trigger_onTransition_for_stay() + { + var reportActorProbe = CreateTestProbe(Sys); + + var fsmRef = Sys.ActorOf(Props.Create(() => new SimpleTransitionFSM(Name, reportActorProbe.Ref))); + + reportActorProbe.ExpectNoMsg(TimeSpan.FromSeconds(3)); + + fsmRef.Tell("goto(the same state)"); + + reportActorProbe.ExpectNoMsg(TimeSpan.FromSeconds(3)); + + fsmRef.Tell("stay"); + + reportActorProbe.ExpectNoMsg(TimeSpan.FromSeconds(3)); + } + + + [Fact] + public void PersistentFSM_should_not_persist_state_change_event_when_staying_in_the_same_state() + { + var dummyReportActorRef = CreateTestProbe().Ref; + + var fsmRef = Sys.ActorOf(Props.Create(Name, dummyReportActorRef), Name); + + Watch(fsmRef); + fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); + + var shirt = new Item("1", "Shirt", 59.99F); + var shoes = new Item("2", "Shoes", 89.99F); + var coat = new Item("3", "Coat", 119.99F); + + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(shirt)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(shoes)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new AddItem(coat)); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new Buy()); + fsmRef.Tell(new GetCurrentCart()); + fsmRef.Tell(new Leave()); + ExpectMsg>(state => state.State == UserState.LookingAround); + ExpectMsg(); + ExpectMsg>(state => state.From == UserState.LookingAround); + ExpectMsg(); + ExpectMsg(); + ExpectMsg(); + ExpectMsg>(); + ExpectMsg(); + ExpectTerminated(fsmRef); + + var persistentEventsStreamer = Sys.ActorOf(Props.Create(Name, TestActor), Name); + + ExpectMsg(); + ExpectMsg.StateChangeEvent>(); + + + ExpectMsg(); + ExpectMsg.StateChangeEvent>(); + + + ExpectMsg(); + ExpectMsg.StateChangeEvent>(); + + + ExpectMsg(); + ExpectMsg.StateChangeEvent>(); + + Watch(persistentEventsStreamer); + + persistentEventsStreamer.Tell(PoisonPill.Instance); + + ExpectTerminated(persistentEventsStreamer); + + } + + + internal class WebStoreCustomerFSM : PersistentFSM { private readonly string _persistenceId; @@ -255,30 +455,6 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) public override string PersistenceId { get { return _persistenceId; }} - - //protected override IShoppingCart ApplyEvent(IDomainEvent state, IShoppingCart data) - //{ - // if (state is ItemAdded) - // { - // return data.AddItem(((ItemAdded)state).Item); - // } - // if (state is OrderExecuted) - // { - // return data; - // } - // if (state is OrderDiscarded) - // { - // return data.Empty(); - // } - // return data; - //} - - - //protected override void OnRecoveryCompleted() - //{ - // // - //} - protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) { if (e is ItemAdded) @@ -301,8 +477,82 @@ protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) } + internal class SimpleTransitionFSM : PersistentFSM + { + private readonly string _persistenceId; + private readonly IActorRef _reportActor; + + public SimpleTransitionFSM(string persistenceId, IActorRef reportActor) + { + _persistenceId = persistenceId; + _reportActor = reportActor; + StartWith(UserState.LookingAround, new EmptyShoppingCart()); + + When(UserState.LookingAround, (@event, state) => + { + if ((string) @event.FsmEvent == "stay") + { + return Stay(); + } + return GoTo(UserState.LookingAround); + }); + OnTransition((state, nextState) => + { + _reportActor.Tell(string.Format("{0} -> {1}", state, nextState)); + }); + + + } + + public override string PersistenceId + { + get { return _persistenceId; } + } + + + protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) + { + + return data; + } + + + } + + internal class PersistentEventsStreamer : PersistentActor + { + private readonly string _persistenceId; + private readonly IActorRef _client; + + public PersistentEventsStreamer(string persistenceId, IActorRef client) + { + _persistenceId = persistenceId; + _client = client; + } + + public override string PersistenceId + { + get { return _persistenceId; } + } + + protected override bool ReceiveRecover(object message) + { + if (!(message is RecoveryCompleted)) + { + _client.Tell(message); + } + + return true; + } + + protected override bool ReceiveCommand(object message) + { + return true; + } + } + #region Custome States - internal enum UserState +internal enum UserState { Shopping, Inactive, diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs index c0db9b3836a..6cba5106d6e 100644 --- a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs +++ b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs @@ -450,22 +450,23 @@ private string GetSourceString(object source) private void ApplyState(State upcomingState) { var eventsToPersist = new List(); - if ( upcomingState.Timeout.HasValue) + if (upcomingState.DomainEvents != null) + { + foreach (var domainEvent in upcomingState.DomainEvents) + { + eventsToPersist.Add(domainEvent); + } + } + if (!StateName.Equals(upcomingState.StateName) || upcomingState.Timeout.HasValue) { eventsToPersist.Add(new StateChangeEvent(upcomingState.StateName, upcomingState.Timeout)); } - if (upcomingState.DomainEvents == null || upcomingState.DomainEvents.IsEmpty) + if (eventsToPersist.Count == 0) { BaseApplyState(upcomingState); } else { - foreach (var domainEvent in upcomingState.DomainEvents) - { - eventsToPersist.Add(domainEvent); - } - - var nextData = StateData;// upcomingState.StateData; var handlersExecutedCounter = 0; From c634bd43078630a442964ea7d8c5ac5a890db2b6 Mon Sep 17 00:00:00 2001 From: "maxim.salamatko" Date: Sat, 31 Oct 2015 16:15:47 +0400 Subject: [PATCH 4/8] extracted methods to base class --- .../Akka.Persistence/Fsm/PersistentFSM.cs | 664 +----------------- .../Akka.Persistence/Fsm/PersistentFSMBase.cs | 645 ++++++++++++++++- 2 files changed, 649 insertions(+), 660 deletions(-) diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs index 6cba5106d6e..f526ff0e96e 100644 --- a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs +++ b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs @@ -5,451 +5,50 @@ // //----------------------------------------------------------------------- -using System; using System.Collections.Generic; -using System.Linq; -using System.Threading; using Akka.Actor; -using Akka.Actor.Internal; -using Akka.Event; -using Akka.Persistence.Serialization; -using Akka.Routing; -using Akka.Util; -using Akka.Util.Internal; -using Akka.Util.Internal.Collections; namespace Akka.Persistence.Fsm { /// - /// Finite state machine (FSM) persistent actor. + /// Finite state machine (FSM) persistent actor. /// /// The state name type /// The state data type /// The event data type - public abstract class PersistentFSM : PersistentFSMBase, IListeners + public abstract class PersistentFSM : PersistentFSMBase { - private readonly ILoggingAdapter _log = Context.GetLogger(); - protected PersistentFSM() - { - if (this is ILoggingFSM) - DebugEvent = Context.System.Settings.FsmDebugEvent; - } - - public delegate State StateFunction(FSMBase.Event fsmEvent, State state = null); - - public delegate void TransitionHandler(TState initialState, TState nextState); - - #region Finite State Machine Domain Specific Language (FSM DSL if you like acronyms) - - /// - /// Insert a new at the end of the processing chain for the - /// given state. If the stateTimeout parameter is set, entering this state without a - /// differing explicit timeout setting will trigger a . - /// - /// designator for the state - /// delegate describing this state's response to input - /// default timeout for this state - public void When(TState stateName, StateFunction func, TimeSpan? timeout = null) - { - Register(stateName, func, timeout); - } - - /// - /// Sets the initial state for this FSM. Call this method from the constructor before the method. - /// If different state is needed after a restart this method, followed by , can be used in the actor - /// life cycle hooks and . - /// - /// Initial state designator. - /// Initial state data. - /// State timeout for the initial state, overriding the default timeout for that state. - public void StartWith(TState stateName, TData stateData, TimeSpan? timeout = null) - { - _currentState = new State(stateName, stateData, timeout); - } - - /// - /// Produce transition to other state. Return this from a state function - /// in order to effect the transition. - /// - /// State designator for the next state - /// State transition descriptor - public State GoTo(TState nextStateName) - { - return new State(nextStateName, _currentState.StateData); - } - - /// - /// Produce transition to other state. Return this from a state function - /// in order to effect the transition. - /// - /// State designator for the next state - /// Data for next state - /// State transition descriptor - public State GoTo(TState nextStateName, TData stateData) - { - return new State(nextStateName, stateData); - } - - /// - /// Produce "empty" transition descriptor. Return this from a state function - /// when no state change is to be effected. - /// - /// Descriptor for staying in the current state. - public State Stay() - { - return GoTo(_currentState.StateName); - } - - /// - /// Produce change descriptor to stop this FSM actor with - /// - public State Stop() - { - return Stop(new FSMBase.Normal()); - } - - /// - /// Produce change descriptor to stop this FSM actor with the specified . - /// - public State Stop(FSMBase.Reason reason) - { - return Stop(reason, _currentState.StateData); - } - - public State Stop(FSMBase.Reason reason, TData stateData) - { - return Stay().Using(stateData).WithStopReason(reason); - } - - public sealed class TransformHelper - { - public TransformHelper(StateFunction func) - { - Func = func; - } - - public StateFunction Func { get; private set; } - - public StateFunction Using(Func, State> andThen) - { - StateFunction continuedDelegate = (@event, state) => andThen.Invoke(Func.Invoke(@event, state)); - return continuedDelegate; - } - } - - /// - /// Schedule named timer to deliver message after given delay, possibly repeating. - /// Any existing timer with the same name will automatically be canceled before adding - /// the new timer. - /// - /// identifier to be used with . - /// message to be delivered - /// delay of first message delivery and between subsequent messages. - /// send once if false, scheduleAtFixedRate if true - public void SetTimer(string name, object msg, TimeSpan timeout, bool repeat = false) - { - if (DebugEvent) - _log.Debug("setting " + (repeat ? "repeating" : "") + "timer '{0}' / {1}: {2}", name, timeout, msg); - if (_timers.ContainsKey(name)) - _timers[name].Cancel(); - var timer = new Timer(name, msg, repeat, _timerGen.Next(), Context, DebugEvent ? _log : null); - timer.Schedule(Self, timeout); - - if (!_timers.ContainsKey(name)) - _timers.Add(name, timer); - else - _timers[name] = timer; - } - - /// - /// Cancel a named , ensuring that the message is not subsequently delivered (no race.) - /// - /// The name of the timer to cancel. - public void CancelTimer(string name) - { - if (DebugEvent) - { - _log.Debug("Cancelling timer {0}", name); - } - - if (_timers.ContainsKey(name)) - { - _timers[name].Cancel(); - _timers.Remove(name); - } - } - - /// - /// Determines whether the named timer is still active. Returns true - /// unless the timer does not exist, has previously been cancelled, or - /// if it was a single-shot timer whose message was already received. - /// - public bool IsTimerActive(string name) - { - return _timers.ContainsKey(name); - } - - /// - /// Set the state timeout explicitly. This method can be safely used from - /// within a state handler. - /// - public void SetStateTimeout(TState state, TimeSpan? timeout) - { - if (!_stateTimeouts.ContainsKey(state)) - _stateTimeouts.Add(state, timeout); - else - _stateTimeouts[state] = timeout; - } - - - - /// - /// Set handler which is called upon each state transition, i.e. not when - /// staying in the same state. - /// - public void OnTransition(TransitionHandler transitionHandler) - { - _transitionEvent.Add(transitionHandler); - } - - /// - /// Set the handler which is called upon termination of this FSM actor. Calling this - /// method again will overwrite the previous contents. - /// - public void OnTermination(Action> terminationHandler) - { - _terminateEvent = terminationHandler; - } - - /// - /// Set handler which is called upon reception of unhandled FSM messages. Calling - /// this method again will overwrite the previous contents. - /// - /// - public void WhenUnhandled(StateFunction stateFunction) - { - HandleEvent = OrElse(stateFunction, HandleEventDefault); - } - - /// - /// Verify the existence of initial state and setup timers. This should be the - /// last call within the constructor or and - /// . - /// - public void Initialize() - { - MakeTransition(_currentState); - } - - /// - /// Current state name - /// - public TState StateName - { - get { return _currentState.StateName; } - } - - /// - /// Current state data - /// - public TData StateData - { - get { return _currentState.StateData; } - } - - /// - /// Return next state data (available in handlers) - /// - public TData NextStateData - { - get - { - if (_nextState == null) throw new InvalidOperationException("NextStateData is only available during OnTransition"); - return _nextState.StateData; - } - } - - public TransformHelper Transform(StateFunction func) { return new TransformHelper(func); } - - #endregion - - #region Internal implementation details - - private readonly ListenerSupport _listener = new ListenerSupport(); - public ListenerSupport Listeners { get { return _listener; } } - - /// - /// Can be set to enable debugging on certain actions taken by the FSM - /// - protected bool DebugEvent; - - /// - /// FSM state data and current timeout handling - /// - private State _currentState; - - private ICancelable _timeoutFuture; - private State _nextState; - private long _generation = 0L; - - /// - /// Timer handling - /// - private readonly IDictionary _timers = new Dictionary(); - private readonly AtomicCounter _timerGen = new AtomicCounter(0); - - /// - /// State definitions - /// - private readonly Dictionary _stateFunctions = new Dictionary(); - private readonly Dictionary _stateTimeouts = new Dictionary(); - - - private void Register(TState name, StateFunction function, TimeSpan? timeout) - { - if (_stateFunctions.ContainsKey(name)) - { - _stateFunctions[name] = OrElse(_stateFunctions[name], function); - _stateTimeouts[name] = _stateTimeouts[name] ?? timeout; - } - else - { - _stateFunctions.Add(name, function); - _stateTimeouts.Add(name, timeout); - } - } - - /// - /// Unhandled event handler - /// - private StateFunction HandleEventDefault - { - get - { - return delegate (FSMBase.Event @event, State state) - { - _log.Warning("unhandled event {0} in state {1}", @event.FsmEvent, StateName); - return Stay(); - }; - } - } - - private StateFunction _handleEvent; - - private StateFunction HandleEvent - { - get { return _handleEvent ?? (_handleEvent = HandleEventDefault); } - set { _handleEvent = value; } - } - - - /// - /// Termination handling - /// - private Action> _terminateEvent = @event => { }; - - /// - /// Transition handling - /// - private readonly IList _transitionEvent = new List(); - - private void HandleTransition(TState previous, TState next) - { - foreach (var tran in _transitionEvent) - { - tran.Invoke(previous, next); - } - } - - - - /// - /// C# port of Scala's orElse method for partial function chaining. - /// - /// See http://scalachina.com/api/scala/PartialFunction.html + /// Call the hook if you want to retain this behavior. + /// When overriding make sure to call base.PostStop(); + /// Please note that this method is called by default from so + /// override that one if shall not be called during restart. /// - /// The original to be called - /// The to be called if returns null - /// A which combines both the results of and - private static StateFunction OrElse(StateFunction original, StateFunction fallback) - { - StateFunction chained = delegate (FSMBase.Event @event, State state) - { - var originalResult = original.Invoke(@event, state); - if (originalResult == null) return fallback.Invoke(@event, state); - return originalResult; - }; - - return chained; - } - - #endregion - - #region Actor methods - - - public class StateChangeEvent : IMessage - { - public StateChangeEvent(TState state, TimeSpan? timeOut) - { - State = state; - TimeOut = timeOut; - } - - public TState State { get; private set; } - - public TimeSpan? TimeOut { get; private set; } - } - - private void ProcessMsg(object any, object source) + protected override void PostStop() { - var fsmEvent = new FSMBase.Event(any, _currentState.StateData); - ProcessEvent(fsmEvent, source); + /* + * Setting this instance's state to Terminated does no harm during restart, since + * the new instance will initialize fresh using StartWith. + */ + Terminate(Stay().WithStopReason(new FSMBase.Shutdown())); + base.PostStop(); } - private void ProcessEvent(FSMBase.Event fsmEvent, object source) + protected override bool ReceiveRecover(object message) { - if (DebugEvent) - { - var srcStr = GetSourceString(source); - _log.Debug("processing {0} from {1}", fsmEvent, srcStr); - } - var stateFunc = _stateFunctions[_currentState.StateName]; - var oldState = _currentState; - State upcomingState = null; + var match = message.Match() + .With(t => { Initialize(); }) + .With(e => { StartWith(StateName, ApplyEvent(e, StateData)); }) + .With(sce => { StartWith(sce.State, StateData, sce.TimeOut); }); - if (stateFunc != null) - { - upcomingState = stateFunc(fsmEvent); - } - - if (upcomingState == null) - { - upcomingState = HandleEvent(fsmEvent); - } - - ApplyState(upcomingState); - if (DebugEvent && !Equals(oldState, upcomingState)) - { - _log.Debug("transition {0} -> {1}", oldState, upcomingState); - } + return match.WasHandled; } - private string GetSourceString(object source) - { - var s = source as string; - if (s != null) return s; - var timer = source as Timer; - if (timer != null) return "timer '" + timer.Name + "'"; - var actorRef = source as IActorRef; - if (actorRef != null) return actorRef.ToString(); - return "unknown"; - } + protected abstract TData ApplyEvent(TEvent e, TData data); - private void ApplyState(State upcomingState) + protected override void ApplyState(State upcomingState) { - var eventsToPersist = new List(); + var eventsToPersist = new List(); if (upcomingState.DomainEvents != null) { foreach (var domainEvent in upcomingState.DomainEvents) @@ -463,227 +62,32 @@ private void ApplyState(State upcomingState) } if (eventsToPersist.Count == 0) { - BaseApplyState(upcomingState); + base.ApplyState(upcomingState); } else { - var nextData = StateData;// upcomingState.StateData; + var nextData = StateData; // upcomingState.StateData; var handlersExecutedCounter = 0; - - Persist(eventsToPersist, @event => - { - handlersExecutedCounter++; - if (@event is TEvent) - { - nextData = ApplyEvent((TEvent)@event, nextData); - } - if (handlersExecutedCounter == eventsToPersist.Count) - { - BaseApplyState(upcomingState.Using(nextData)); - - if (upcomingState.AfterTransitionHandler != null) - { - upcomingState.AfterTransitionHandler(upcomingState.StateData); - } - - } - }); - - } - } - - protected abstract TData ApplyEvent(TEvent e, TData data); - - private void BaseApplyState(State upcomingState) - { - if (upcomingState.StopReason == null) - { - MakeTransition(upcomingState); - return; - } - var replies = upcomingState.Replies; - replies.Reverse(); - foreach (var reply in replies) - { - Sender.Tell(reply); - } - Terminate(upcomingState); - Context.Stop(Self); - } - - private void MakeTransition(State upcomingState) - { - if (!_stateFunctions.ContainsKey(upcomingState.StateName)) - { - Terminate( - Stay() - .WithStopReason( - new FSMBase.Failure(String.Format("Next state {0} does not exist", upcomingState.StateName)))); - } - else - { - var replies = upcomingState.Replies; - replies.Reverse(); - foreach (var r in replies) { Sender.Tell(r); } - if (!_currentState.StateName.Equals(upcomingState.StateName)) - { - _nextState = upcomingState; - HandleTransition(_currentState.StateName, _nextState.StateName); - Listeners.Gossip(new FSMBase.Transition(Self, _currentState.StateName, _nextState.StateName)); - _nextState = null; - } - _currentState = upcomingState; - var timeout = _currentState.Timeout ?? _stateTimeouts[_currentState.StateName]; - if (timeout.HasValue) - { - var t = timeout.Value; - if (t < TimeSpan.MaxValue) - { - _timeoutFuture = Context.System.Scheduler.ScheduleTellOnceCancelable(t, Context.Self, new PersistentFSMBase.TimeoutMarker(_generation), Context.Self); - } - } - } - } - - private void Terminate(State upcomingState) - { - if (_currentState.StopReason == null) - { - var reason = upcomingState.StopReason; - LogTermination(reason); - foreach (var t in _timers) { t.Value.Cancel(); } - _timers.Clear(); - _currentState = upcomingState; - - var stopEvent = new FSMBase.StopEvent(reason, _currentState.StateName, _currentState.StateData); - _terminateEvent(stopEvent); - } - } - - /// - /// Call the hook if you want to retain this behavior. - /// When overriding make sure to call base.PostStop(); - /// - /// Please note that this method is called by default from so - /// override that one if shall not be called during restart. - /// - protected override void PostStop() - { - /* - * Setting this instance's state to Terminated does no harm during restart, since - * the new instance will initialize fresh using StartWith. - */ - Terminate(Stay().WithStopReason(new FSMBase.Shutdown())); - base.PostStop(); - } - - #endregion - - /// - /// By default, is logged at error level and other - /// reason types are not logged. It is possible to override this behavior. - /// - /// - protected virtual void LogTermination(FSMBase.Reason reason) - { - PatternMatch.Match(reason) - .With(f => + Persist(eventsToPersist, @event => { - if (f.Cause is Exception) + handlersExecutedCounter++; + if (@event is TEvent) { - _log.Error(f.Cause.AsInstanceOf(), "terminating due to Failure"); + nextData = ApplyEvent((TEvent) @event, nextData); } - else + if (handlersExecutedCounter == eventsToPersist.Count) { - _log.Error(f.Cause.ToString()); - } - }); - } - - - public bool IsStateTimerActive { get; } - protected override bool ReceiveRecover(object message) - { - var match = PatternMatch.Match(message) - .With(t => - { - Initialize(); - }).With(e => - { - StartWith(StateName, ApplyEvent(e, StateData)); - }).With(sce => - { - StartWith(sce.State, StateData, sce.TimeOut); - }); - - return match.WasHandled; - } + base.ApplyState(upcomingState.Using(nextData)); - protected override bool ReceiveCommand(object message) - { - var match = PatternMatch.Match(message) - .With(marker => - { - if (_generation == marker.Generation) - { - ProcessMsg(new StateTimeout(), "state timeout"); - } - }) - .With(t => - { - if (_timers.ContainsKey(t.Name) && _timers[t.Name].Generation == t.Generation) - { - if (_timeoutFuture != null) - { - _timeoutFuture.Cancel(false); - _timeoutFuture = null; - } - _generation++; - if (!t.Repeat) + if (upcomingState.AfterTransitionHandler != null) { - _timers.Remove(t.Name); + upcomingState.AfterTransitionHandler(upcomingState.StateData); } - ProcessMsg(t.Message, t); - } - }) - .With(cb => - { - Context.Watch(cb.ActorRef); - Listeners.Add(cb.ActorRef); - //send the current state back as a reference point - cb.ActorRef.Tell(new FSMBase.CurrentState(Self, _currentState.StateName)); - }) - .With(l => - { - Context.Watch(l.Listener); - Listeners.Add(l.Listener); - l.Listener.Tell(new FSMBase.CurrentState(Self, _currentState.StateName)); - }) - .With(ucb => - { - Context.Unwatch(ucb.ActorRef); - Listeners.Remove(ucb.ActorRef); - }) - .With(d => - { - Context.Unwatch(d.Listener); - Listeners.Remove(d.Listener); - }) - .With(_ => { DebugEvent = true; }) - .Default(msg => - { - if (_timeoutFuture != null) - { - _timeoutFuture.Cancel(false); - _timeoutFuture = null; } - _generation++; - ProcessMsg(msg, Sender); }); - return match.WasHandled; + } } } -} - +} \ No newline at end of file diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs b/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs index fa0834ae169..a35a216100b 100644 --- a/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs +++ b/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs @@ -1,22 +1,609 @@ +//----------------------------------------------------------------------- +// +// Copyright (C) 2009-2015 Typesafe Inc. +// Copyright (C) 2013-2015 Akka.NET project +// +//----------------------------------------------------------------------- + using System; using System.Collections.Generic; using System.Diagnostics; using System.Linq; using Akka.Actor; -using Akka.Dispatch.SysMsg; +using Akka.Actor.Internal; using Akka.Event; +using Akka.Persistence.Serialization; +using Akka.Routing; using Akka.Util; +using Akka.Util.Internal; namespace Akka.Persistence.Fsm { - public abstract class PersistentFSMBase : PersistentActor + public abstract class PersistentFSMBase : PersistentActor, IListeners { - #region States + public delegate State StateFunction( + FSMBase.Event fsmEvent, State state = null); + + public delegate void TransitionHandler(TState initialState, TState nextState); + + protected readonly ListenerSupport _listener = new ListenerSupport(); + private readonly ILoggingAdapter _log = Context.GetLogger(); + + /// + /// State definitions + /// + private readonly Dictionary _stateFunctions = new Dictionary(); + + private readonly Dictionary _stateTimeouts = new Dictionary(); + + private readonly AtomicCounter _timerGen = new AtomicCounter(0); + + /// + /// Timer handling + /// + protected readonly IDictionary _timers = new Dictionary(); + + /// + /// Transition handling + /// + private readonly IList _transitionEvent = new List(); + + /// + /// FSM state data and current timeout handling + /// + protected State _currentState; + + protected long _generation; + private StateFunction _handleEvent; + private State _nextState; + + /// + /// Termination handling + /// + private Action> _terminateEvent = @event => { }; + + protected ICancelable _timeoutFuture; + + /// + /// Can be set to enable debugging on certain actions taken by the FSM + /// + protected bool DebugEvent; + + protected PersistentFSMBase() + { + if (this is ILoggingFSM) + DebugEvent = Context.System.Settings.FsmDebugEvent; + } + + /// + /// Current state name + /// + public TState StateName + { + get { return _currentState.StateName; } + } + + /// + /// Current state data + /// + public TData StateData + { + get { return _currentState.StateData; } + } + + /// + /// Return next state data (available in handlers) + /// + public TData NextStateData + { + get + { + if (_nextState == null) + throw new InvalidOperationException("NextStateData is only available during OnTransition"); + return _nextState.StateData; + } + } + + /// + /// Unhandled event handler + /// + private StateFunction HandleEventDefault + { + get + { + return delegate(FSMBase.Event @event, State state) + { + _log.Warning("unhandled event {0} in state {1}", @event.FsmEvent, StateName); + return Stay(); + }; + } + } + + private StateFunction HandleEvent + { + get { return _handleEvent ?? (_handleEvent = HandleEventDefault); } + set { _handleEvent = value; } + } + + public bool IsStateTimerActive { get; } + + public ListenerSupport Listeners + { + get { return _listener; } + } + + /// + /// Insert a new at the end of the processing chain for the + /// given state. If the stateTimeout parameter is set, entering this state without a + /// differing explicit timeout setting will trigger a . + /// + /// designator for the state + /// delegate describing this state's response to input + /// default timeout for this state + public void When(TState stateName, StateFunction func, TimeSpan? timeout = null) + { + Register(stateName, func, timeout); + } + + /// + /// Sets the initial state for this FSM. Call this method from the constructor before the + /// method. + /// If different state is needed after a restart this method, followed by , can be used in the + /// actor + /// life cycle hooks and . + /// + /// Initial state designator. + /// Initial state data. + /// State timeout for the initial state, overriding the default timeout for that state. + public void StartWith(TState stateName, TData stateData, TimeSpan? timeout = null) + { + _currentState = new State(stateName, stateData, timeout); + } + + /// + /// Produce transition to other state. Return this from a state function + /// in order to effect the transition. + /// + /// State designator for the next state + /// State transition descriptor + public State GoTo(TState nextStateName) + { + return new State(nextStateName, _currentState.StateData); + } + + /// + /// Produce transition to other state. Return this from a state function + /// in order to effect the transition. + /// + /// State designator for the next state + /// Data for next state + /// State transition descriptor + public State GoTo(TState nextStateName, TData stateData) + { + return new State(nextStateName, stateData); + } + + /// + /// Produce "empty" transition descriptor. Return this from a state function + /// when no state change is to be effected. + /// + /// Descriptor for staying in the current state. + public State Stay() + { + return GoTo(_currentState.StateName); + } + + /// + /// Produce change descriptor to stop this FSM actor with + /// + public State Stop() + { + return Stop(new FSMBase.Normal()); + } + + /// + /// Produce change descriptor to stop this FSM actor with the specified . + /// + public State Stop(FSMBase.Reason reason) + { + return Stop(reason, _currentState.StateData); + } + + public State Stop(FSMBase.Reason reason, TData stateData) + { + return Stay().Using(stateData).WithStopReason(reason); + } + + /// + /// Schedule named timer to deliver message after given delay, possibly repeating. + /// Any existing timer with the same name will automatically be canceled before adding + /// the new timer. + /// + /// identifier to be used with . + /// message to be delivered + /// delay of first message delivery and between subsequent messages. + /// send once if false, scheduleAtFixedRate if true + public void SetTimer(string name, object msg, TimeSpan timeout, bool repeat = false) + { + if (DebugEvent) + _log.Debug("setting " + (repeat ? "repeating" : "") + "timer '{0}' / {1}: {2}", name, timeout, msg); + if (_timers.ContainsKey(name)) + _timers[name].Cancel(); + var timer = new Timer(name, msg, repeat, _timerGen.Next(), Context, DebugEvent ? _log : null); + timer.Schedule(Self, timeout); + + if (!_timers.ContainsKey(name)) + _timers.Add(name, timer); + else + _timers[name] = timer; + } + + /// + /// Cancel a named , ensuring that the message is not subsequently delivered (no + /// race.) + /// + /// The name of the timer to cancel. + public void CancelTimer(string name) + { + if (DebugEvent) + { + _log.Debug("Cancelling timer {0}", name); + } + + if (_timers.ContainsKey(name)) + { + _timers[name].Cancel(); + _timers.Remove(name); + } + } + + /// + /// Determines whether the named timer is still active. Returns true + /// unless the timer does not exist, has previously been cancelled, or + /// if it was a single-shot timer whose message was already received. + /// + public bool IsTimerActive(string name) + { + return _timers.ContainsKey(name); + } + + /// + /// Set the state timeout explicitly. This method can be safely used from + /// within a state handler. + /// + public void SetStateTimeout(TState state, TimeSpan? timeout) + { + if (!_stateTimeouts.ContainsKey(state)) + _stateTimeouts.Add(state, timeout); + else + _stateTimeouts[state] = timeout; + } + + /// + /// Set handler which is called upon each state transition, i.e. not when + /// staying in the same state. + /// + public void OnTransition(TransitionHandler transitionHandler) + { + _transitionEvent.Add(transitionHandler); + } + + /// + /// Set the handler which is called upon termination of this FSM actor. Calling this + /// method again will overwrite the previous contents. + /// + public void OnTermination(Action> terminationHandler) + { + _terminateEvent = terminationHandler; + } + /// + /// Set handler which is called upon reception of unhandled FSM messages. Calling + /// this method again will overwrite the previous contents. + /// + /// + public void WhenUnhandled(StateFunction stateFunction) + { + HandleEvent = OrElse(stateFunction, HandleEventDefault); + } + /// + /// Verify the existence of initial state and setup timers. This should be the + /// last call within the constructor or and + /// . + /// + public void Initialize() + { + MakeTransition(_currentState); + } + + public TransformHelper Transform(StateFunction func) + { + return new TransformHelper(func); + } + + private void Register(TState name, StateFunction function, TimeSpan? timeout) + { + if (_stateFunctions.ContainsKey(name)) + { + _stateFunctions[name] = OrElse(_stateFunctions[name], function); + _stateTimeouts[name] = _stateTimeouts[name] ?? timeout; + } + else + { + _stateFunctions.Add(name, function); + _stateTimeouts.Add(name, timeout); + } + } + + private void HandleTransition(TState previous, TState next) + { + foreach (var tran in _transitionEvent) + { + tran.Invoke(previous, next); + } + } /// - /// Used in the event of a timeout between transitions + /// C# port of Scala's orElse method for partial function chaining. + /// See http://scalachina.com/api/scala/PartialFunction.html + /// + /// The original to be called + /// The to be called if returns null + /// + /// A which combines both the results of and + /// + /// + private static StateFunction OrElse(StateFunction original, StateFunction fallback) + { + StateFunction chained = delegate(FSMBase.Event @event, State state) + { + var originalResult = original.Invoke(@event, state); + if (originalResult == null) return fallback.Invoke(@event, state); + return originalResult; + }; + + return chained; + } + + protected void ProcessMsg(object any, object source) + { + var fsmEvent = new FSMBase.Event(any, _currentState.StateData); + ProcessEvent(fsmEvent, source); + } + + private void ProcessEvent(FSMBase.Event fsmEvent, object source) + { + if (DebugEvent) + { + var srcStr = GetSourceString(source); + _log.Debug("processing {0} from {1}", fsmEvent, srcStr); + } + var stateFunc = _stateFunctions[_currentState.StateName]; + var oldState = _currentState; + State upcomingState = null; + + if (stateFunc != null) + { + upcomingState = stateFunc(fsmEvent); + } + + if (upcomingState == null) + { + upcomingState = HandleEvent(fsmEvent); + } + + ApplyState(upcomingState); + if (DebugEvent && !Equals(oldState, upcomingState)) + { + _log.Debug("transition {0} -> {1}", oldState, upcomingState); + } + } + + private string GetSourceString(object source) + { + var s = source as string; + if (s != null) return s; + var timer = source as Timer; + if (timer != null) return "timer '" + timer.Name + "'"; + var actorRef = source as IActorRef; + if (actorRef != null) return actorRef.ToString(); + return "unknown"; + } + + + protected virtual void ApplyState(State upcomingState) + { + if (upcomingState.StopReason == null) + { + MakeTransition(upcomingState); + return; + } + var replies = upcomingState.Replies; + replies.Reverse(); + foreach (var reply in replies) + { + Sender.Tell(reply); + } + Terminate(upcomingState); + Context.Stop(Self); + } + + private void MakeTransition(State upcomingState) + { + if (!_stateFunctions.ContainsKey(upcomingState.StateName)) + { + Terminate( + Stay() + .WithStopReason( + new FSMBase.Failure(string.Format("Next state {0} does not exist", upcomingState.StateName)))); + } + else + { + var replies = upcomingState.Replies; + replies.Reverse(); + foreach (var r in replies) + { + Sender.Tell(r); + } + if (!_currentState.StateName.Equals(upcomingState.StateName)) + { + _nextState = upcomingState; + HandleTransition(_currentState.StateName, _nextState.StateName); + Listeners.Gossip(new FSMBase.Transition(Self, _currentState.StateName, _nextState.StateName)); + _nextState = null; + } + _currentState = upcomingState; + var timeout = _currentState.Timeout ?? _stateTimeouts[_currentState.StateName]; + if (timeout.HasValue) + { + var t = timeout.Value; + if (t < TimeSpan.MaxValue) + { + _timeoutFuture = Context.System.Scheduler.ScheduleTellOnceCancelable(t, Context.Self, + new TimeoutMarker(_generation), Context.Self); + } + } + } + } + + protected override bool ReceiveCommand(object message) + { + var match = message.Match() + .With(marker => + { + if (_generation == marker.Generation) + { + ProcessMsg(new StateTimeout(), "state timeout"); + } + }) + .With(t => + { + if (_timers.ContainsKey(t.Name) && _timers[t.Name].Generation == t.Generation) + { + if (_timeoutFuture != null) + { + _timeoutFuture.Cancel(false); + _timeoutFuture = null; + } + _generation++; + if (!t.Repeat) + { + _timers.Remove(t.Name); + } + ProcessMsg(t.Message, t); + } + }) + .With(cb => + { + Context.Watch(cb.ActorRef); + Listeners.Add(cb.ActorRef); + //send the current state back as a reference point + cb.ActorRef.Tell(new FSMBase.CurrentState(Self, _currentState.StateName)); + }) + .With(l => + { + Context.Watch(l.Listener); + Listeners.Add(l.Listener); + l.Listener.Tell(new FSMBase.CurrentState(Self, _currentState.StateName)); + }) + .With(ucb => + { + Context.Unwatch(ucb.ActorRef); + Listeners.Remove(ucb.ActorRef); + }) + .With(d => + { + Context.Unwatch(d.Listener); + Listeners.Remove(d.Listener); + }) + .With(_ => { DebugEvent = true; }) + .Default(msg => + { + if (_timeoutFuture != null) + { + _timeoutFuture.Cancel(false); + _timeoutFuture = null; + } + _generation++; + ProcessMsg(msg, Sender); + }); + return match.WasHandled; + } + + protected void Terminate(State upcomingState) + { + if (_currentState.StopReason == null) + { + var reason = upcomingState.StopReason; + LogTermination(reason); + foreach (var t in _timers) + { + t.Value.Cancel(); + } + _timers.Clear(); + _currentState = upcomingState; + + var stopEvent = new FSMBase.StopEvent(reason, _currentState.StateName, + _currentState.StateData); + _terminateEvent(stopEvent); + } + } + + /// + /// By default, is logged at error level and other + /// reason types are not logged. It is possible to override this behavior. + /// + /// + protected virtual void LogTermination(FSMBase.Reason reason) + { + reason.Match() + .With(f => + { + if (f.Cause is Exception) + { + _log.Error(f.Cause.AsInstanceOf(), "terminating due to Failure"); + } + else + { + _log.Error(f.Cause.ToString()); + } + }); + } + + public sealed class TransformHelper + { + public TransformHelper(StateFunction func) + { + Func = func; + } + + public StateFunction Func { get; } + + public StateFunction Using(Func, State> andThen) + { + StateFunction continuedDelegate = (@event, state) => andThen.Invoke(Func.Invoke(@event, state)); + return continuedDelegate; + } + } + + public class StateChangeEvent : IMessage + { + public StateChangeEvent(TState state, TimeSpan? timeOut) + { + State = state; + TimeOut = timeOut; + } + + public TState State { get; private set; } + + public TimeSpan? TimeOut { get; private set; } + } + + #region States + + /// + /// Used in the event of a timeout between transitions /// public class StateTimeout { @@ -33,13 +620,16 @@ public TimeoutMarker(long generation) Generation = generation; } - public long Generation { get; private set; } + public long Generation { get; } } [DebuggerDisplay("Timer {Name,nq}, message: {Message")] - internal class Timer : INoSerializationVerificationNeeded + public class Timer : INoSerializationVerificationNeeded { private readonly ILoggingAdapter _debugLog; + private readonly ICancelable _ref; + + private readonly IScheduler _scheduler; public Timer(string name, object message, bool repeat, int generation, IActorContext context, ILoggingAdapter debugLog) @@ -55,18 +645,15 @@ public Timer(string name, object message, bool repeat, int generation, IActorCon _ref = new Cancelable(scheduler); } - private readonly IScheduler _scheduler; - private readonly ICancelable _ref; - - public string Name { get; private set; } + public string Name { get; } - public object Message { get; private set; } + public object Message { get; } - public bool Repeat { get; private set; } + public bool Repeat { get; } - public int Generation { get; private set; } + public int Generation { get; } - public IActorContext Context { get; private set; } + public IActorContext Context { get; } public void Schedule(IActorRef actor, TimeSpan timeout) { @@ -98,18 +685,17 @@ public void Cancel() } - /// - /// This captures all of the managed state of the : the state name, - /// the state data, possibly custom timeout, stop reason, and replies accumulated while - /// processing the last message. + /// This captures all of the managed state of the : the state name, + /// the state data, possibly custom timeout, stop reason, and replies accumulated while + /// processing the last message. /// /// The name of the state /// The data of the state /// The event of the state public class State : FSMBase.State { - public Action AfterTransitionHandler { get; private set; } + public Action AfterTransitionHandler { get; } public State(TS stateName, TD stateData, TimeSpan? timeout = null, FSMBase.Reason stopReason = null, @@ -126,7 +712,7 @@ public State(TS stateName, TD stateData, TimeSpan? timeout = null, FSMBase.Reaso public bool Notifies { get; set; } /// - /// Specify domain events to be applied when transitioning to the new state. + /// Specify domain events to be applied when transitioning to the new state. /// /// /// @@ -141,7 +727,7 @@ public State Applying(ILinearSeq events) /// - /// Specify domain event to be applied when transitioning to the new state. + /// Specify domain event to be applied when transitioning to the new state. /// /// /// @@ -149,7 +735,7 @@ public State Applying(TE e) { if (DomainEvents == null) { - return Copy(null, null, null, new ArrayLinearSeq(new TE[] {e})); + return Copy(null, null, null, new ArrayLinearSeq(new[] {e})); } var events = new List(); events.AddRange(DomainEvents); @@ -159,7 +745,7 @@ public State Applying(TE e) /// - /// Register a handler to be triggered after the state has been persisted successfully + /// Register a handler to be triggered after the state has been persisted successfully /// /// /// @@ -171,13 +757,14 @@ public State AndThen(Action handler) public State Copy(TimeSpan? timeout, FSMBase.Reason stopReason = null, List replies = null, ILinearSeq domainEvents = null, Action afterTransitionDo = null) { - return new State(StateName, StateData, timeout ?? Timeout, stopReason ?? StopReason, replies ?? Replies, + return new State(StateName, StateData, timeout ?? Timeout, stopReason ?? StopReason, + replies ?? Replies, domainEvents ?? DomainEvents, afterTransitionDo ?? AfterTransitionHandler); } /// - /// Modify state transition descriptor with new state data. The data will be set - /// when transitioning to the new state. + /// Modify state transition descriptor with new state data. The data will be set + /// when transitioning to the new state. /// public new State Using(TD nextStateData) { @@ -187,28 +774,26 @@ public State Copy(TimeSpan? timeout, FSMBase.Reason stopReason = nul public new State Replying(object replyValue) { - if (Replies == null) Replies = new List(); var newReplies = Replies.ToArray().ToList(); newReplies.Add(replyValue); return Copy(Timeout, replies: newReplies); } - public new State ForMax(TimeSpan timeout) + public new State ForMax(TimeSpan timeout) { if (timeout <= TimeSpan.MaxValue) return Copy(timeout); return Copy(null); } /// - /// INTERNAL API + /// INTERNAL API /// internal State WithStopReason(FSMBase.Reason reason) { return Copy(null, reason); } - #endregion } } From 4b6c6499318f721052d8f7dd05b9e10ce7f29cee Mon Sep 17 00:00:00 2001 From: "maxim.salamatko" Date: Sun, 1 Nov 2015 09:22:04 +0400 Subject: [PATCH 5/8] added OnRecoveryCompleted --- .../Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs | 10 ++++++++++ src/core/Akka.Persistence/Fsm/PersistentFSM.cs | 8 +++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs index d41f82a591c..2f9671edc59 100644 --- a/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs +++ b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs @@ -455,6 +455,11 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) public override string PersistenceId { get { return _persistenceId; }} + protected override void OnRecoveryCompleted() + { + + } + protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) { if (e is ItemAdded) @@ -510,6 +515,11 @@ public override string PersistenceId } + protected override void OnRecoveryCompleted() + { + + } + protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) { diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs index f526ff0e96e..46b161a8df1 100644 --- a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs +++ b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs @@ -34,10 +34,16 @@ protected override void PostStop() base.PostStop(); } + protected abstract void OnRecoveryCompleted(); + protected override bool ReceiveRecover(object message) { var match = message.Match() - .With(t => { Initialize(); }) + .With(t => + { + Initialize(); + OnRecoveryCompleted(); + }) .With(e => { StartWith(StateName, ApplyEvent(e, StateData)); }) .With(sce => { StartWith(sce.State, StateData, sce.TimeOut); }); From 5d6b95400621b93a69b9ed46a9e0e82f1b227e33 Mon Sep 17 00:00:00 2001 From: "maxim.salamatko" Date: Sun, 1 Nov 2015 10:01:43 +0400 Subject: [PATCH 6/8] added data tests --- .../Fsm/PersistentFSMSpec.cs | 92 ++++++++++++++----- .../Akka.Persistence/Fsm/PersistentFSM.cs | 16 +--- .../Akka.Persistence/Fsm/PersistentFSMBase.cs | 18 +++- 3 files changed, 86 insertions(+), 40 deletions(-) diff --git a/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs index 2f9671edc59..e15ab45543f 100644 --- a/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs +++ b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs @@ -8,11 +8,12 @@ using Akka.Persistence.Fsm; using Akka.TestKit; using Akka.Util; +using Akka.Util.Internal; using Xunit; namespace Akka.Persistence.Tests.Fsm { - public partial class PersistentFSMSpec : PersistenceSpec + public partial class PersistentFSMSpec : PersistenceSpec { private readonly Random _random = new Random(); public PersistentFSMSpec() @@ -47,9 +48,21 @@ public void PersistentFSM_should_has_function_as_regular_fsm() ExpectMsg>(state => state.State == UserState.LookingAround); ExpectMsg(); ExpectMsg>(state => state.From == UserState.LookingAround); - ExpectMsg(); - ExpectMsg(); - ExpectMsg(); + ExpectMsg(cart => + { + return cart.Items.Any(i => i.Name == "Shirt") && cart.Items.Count == 1; + + }); + ExpectMsg(cart => + { + return cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2; + + }); + ExpectMsg(cart => + { + return cart.Items.Any(i => i.Name == "Coat") && cart.Items.Count == 3; + + }); ExpectMsg>(); ExpectMsg(); ExpectTerminated(fsmRef); @@ -72,9 +85,9 @@ public void PersistentFSM_should_has_function_as_regular_fsm_on_state_timeout() ExpectMsg>(state => { return state.State == UserState.LookingAround; - + }); - + ExpectMsg>(); Within(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1.9), () => @@ -91,7 +104,7 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_data() { var dummyReportActorRef = CreateTestProbe().Ref; - var fsmRef = Sys.ActorOf(Props.Create(()=> new WebStoreCustomerFSM(Name, dummyReportActorRef))); + var fsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, dummyReportActorRef))); Watch(fsmRef); fsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); @@ -112,10 +125,18 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_data() ExpectMsg>(state => { return state.From == UserState.LookingAround; - + + }); + ExpectMsg(cart => + { + return cart.Items.Any(i => i.Name == "Shirt") && cart.Items.Count == 1; + + }); + ExpectMsg(cart => + { + return cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2; + }); - ExpectMsg(); - ExpectMsg(); fsmRef.Tell(PoisonPill.Instance); ExpectTerminated(fsmRef); @@ -134,10 +155,18 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_data() ExpectMsg>(state => { return state.State == UserState.Shopping; - + + }); + ExpectMsg(cart => + { + return cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2; + + }); + ExpectMsg(cart => + { + return cart.Items.Any(i => i.Name == "Coat") && cart.Items.Count == 3; + }); - ExpectMsg(); - ExpectMsg(); ExpectMsg>(); ExpectMsg(); ExpectTerminated(recoveredFsmRef); @@ -361,11 +390,11 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) _reportActor = reportActor; StartWith(UserState.LookingAround, new EmptyShoppingCart()); - When(UserState.LookingAround, (@event,state) => + When(UserState.LookingAround, (@event, state) => { if (@event.FsmEvent is AddItem) { - var addItem = (AddItem) @event.FsmEvent; + var addItem = (AddItem)@event.FsmEvent; return GoTo(UserState.Shopping) .Applying(new ItemAdded(addItem.Item)).ForMax(TimeSpan.FromSeconds(1)); @@ -382,7 +411,7 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) { if (@event.FsmEvent is AddItem) { - var addItem = ((AddItem) @event.FsmEvent); + var addItem = ((AddItem)@event.FsmEvent); return Stay().Applying(new ItemAdded(addItem.Item)).ForMax(TimeSpan.FromSeconds(1)); } if (@event.FsmEvent is Buy) @@ -421,7 +450,7 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) { if (@event.FsmEvent is AddItem) { - var addItem = (AddItem) @event.FsmEvent; + var addItem = (AddItem)@event.FsmEvent; return GoTo(UserState.Shopping) .Applying(new ItemAdded(addItem.Item)) @@ -452,19 +481,19 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) }); } - public override string PersistenceId { get { return _persistenceId; }} + public override string PersistenceId { get { return _persistenceId; } } protected override void OnRecoveryCompleted() { - + } protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) { if (e is ItemAdded) { - var itemAdded = (ItemAdded) e; + var itemAdded = (ItemAdded)e; return data.AddItem(itemAdded.Item); } if (e is OrderExecuted) @@ -495,7 +524,7 @@ public SimpleTransitionFSM(string persistenceId, IActorRef reportActor) When(UserState.LookingAround, (@event, state) => { - if ((string) @event.FsmEvent == "stay") + if ((string)@event.FsmEvent == "stay") { return Stay(); } @@ -562,7 +591,7 @@ protected override bool ReceiveCommand(object message) } #region Custome States -internal enum UserState + internal enum UserState { Shopping, Inactive, @@ -592,25 +621,37 @@ internal interface IShoppingCart { IShoppingCart AddItem(Item item); IShoppingCart Empty(); + ICollection Items { get; set; } + } internal class EmptyShoppingCart : IShoppingCart { public IShoppingCart AddItem(Item item) { - return new NonEmptyShoppingCart(); + return new NonEmptyShoppingCart(item); } public IShoppingCart Empty() { return this; } + + public ICollection Items { get; set; } } internal class NonEmptyShoppingCart : IShoppingCart { + + public NonEmptyShoppingCart(Item item) + { + Items = new List(); + Items.Add(item); + } + public IShoppingCart AddItem(Item item) { + Items.Add(item); return this; } @@ -618,6 +659,9 @@ public IShoppingCart Empty() { return new EmptyShoppingCart(); } + + public ICollection Items { get; set; } + } #endregion @@ -665,7 +709,7 @@ public ItemAdded(Item item) Item = item; } - public Item Item { get; } + public Item Item { get; private set; } } internal class OrderExecuted : IDomainEvent diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs index 46b161a8df1..f370b3cfcc1 100644 --- a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs +++ b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs @@ -18,21 +18,7 @@ namespace Akka.Persistence.Fsm /// The event data type public abstract class PersistentFSM : PersistentFSMBase { - /// - /// Call the hook if you want to retain this behavior. - /// When overriding make sure to call base.PostStop(); - /// Please note that this method is called by default from so - /// override that one if shall not be called during restart. - /// - protected override void PostStop() - { - /* - * Setting this instance's state to Terminated does no harm during restart, since - * the new instance will initialize fresh using StartWith. - */ - Terminate(Stay().WithStopReason(new FSMBase.Shutdown())); - base.PostStop(); - } + protected abstract void OnRecoveryCompleted(); diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs b/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs index a35a216100b..30ac0cdd154 100644 --- a/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs +++ b/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs @@ -50,7 +50,7 @@ public delegate State StateFunction( /// /// FSM state data and current timeout handling - /// + /// a protected State _currentState; protected long _generation; @@ -550,6 +550,22 @@ protected void Terminate(State upcomingState) } } + /// + /// Call the hook if you want to retain this behavior. + /// When overriding make sure to call base.PostStop(); + /// Please note that this method is called by default from so + /// override that one if shall not be called during restart. + /// + protected override void PostStop() + { + /* + * Setting this instance's state to Terminated does no harm during restart, since + * the new instance will initialize fresh using StartWith. + */ + Terminate(Stay().WithStopReason(new FSMBase.Shutdown())); + base.PostStop(); + } + /// /// By default, is logged at error level and other /// reason types are not logged. It is possible to override this behavior. From d2668c67c3c3acb0a374b3dda239d4e4d4fe839c Mon Sep 17 00:00:00 2001 From: "maxim.salamatko" Date: Sun, 1 Nov 2015 10:03:11 +0400 Subject: [PATCH 7/8] cleanup --- .../Fsm/PersistentFSMSpec.cs | 236 ++++++------------ .../Akka.Persistence/Fsm/PersistentFSM.cs | 3 - .../Akka.Persistence/Fsm/PersistentFSMBase.cs | 3 +- 3 files changed, 79 insertions(+), 163 deletions(-) diff --git a/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs index e15ab45543f..7e88792b9b0 100644 --- a/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs +++ b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs @@ -1,23 +1,18 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Text; -using System.Threading.Tasks; using Akka.Actor; -using Akka.Dispatch.SysMsg; using Akka.Persistence.Fsm; -using Akka.TestKit; -using Akka.Util; -using Akka.Util.Internal; using Xunit; namespace Akka.Persistence.Tests.Fsm { - public partial class PersistentFSMSpec : PersistenceSpec + public class PersistentFSMSpec : PersistenceSpec { private readonly Random _random = new Random(); + public PersistentFSMSpec() - : base(PersistenceSpec.Configuration("inmem", "PersistentFSMSpec")) + : base(Configuration("inmem", "PersistentFSMSpec")) { } @@ -48,21 +43,12 @@ public void PersistentFSM_should_has_function_as_regular_fsm() ExpectMsg>(state => state.State == UserState.LookingAround); ExpectMsg(); ExpectMsg>(state => state.From == UserState.LookingAround); - ExpectMsg(cart => - { - return cart.Items.Any(i => i.Name == "Shirt") && cart.Items.Count == 1; - - }); - ExpectMsg(cart => - { - return cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2; - - }); - ExpectMsg(cart => - { - return cart.Items.Any(i => i.Name == "Coat") && cart.Items.Count == 3; - - }); + ExpectMsg( + cart => { return cart.Items.Any(i => i.Name == "Shirt") && cart.Items.Count == 1; }); + ExpectMsg( + cart => { return cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2; }); + ExpectMsg( + cart => { return cart.Items.Any(i => i.Name == "Coat") && cart.Items.Count == 3; }); ExpectMsg>(); ExpectMsg(); ExpectTerminated(fsmRef); @@ -82,11 +68,7 @@ public void PersistentFSM_should_has_function_as_regular_fsm_on_state_timeout() var shirt = new Item("1", "Shirt", 59.99F); fsmRef.Tell(new AddItem(shirt)); - ExpectMsg>(state => - { - return state.State == UserState.LookingAround; - - }); + ExpectMsg>(state => { return state.State == UserState.LookingAround; }); ExpectMsg>(); @@ -122,21 +104,11 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_data() ExpectMsg>(); ExpectMsg(); - ExpectMsg>(state => - { - return state.From == UserState.LookingAround; - - }); - ExpectMsg(cart => - { - return cart.Items.Any(i => i.Name == "Shirt") && cart.Items.Count == 1; - - }); - ExpectMsg(cart => - { - return cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2; - - }); + ExpectMsg>(state => { return state.From == UserState.LookingAround; }); + ExpectMsg( + cart => { return cart.Items.Any(i => i.Name == "Shirt") && cart.Items.Count == 1; }); + ExpectMsg( + cart => { return cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2; }); fsmRef.Tell(PoisonPill.Instance); ExpectTerminated(fsmRef); @@ -152,21 +124,11 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_data() recoveredFsmRef.Tell(new GetCurrentCart()); recoveredFsmRef.Tell(new Leave()); - ExpectMsg>(state => - { - return state.State == UserState.Shopping; - - }); - ExpectMsg(cart => - { - return cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2; - - }); - ExpectMsg(cart => - { - return cart.Items.Any(i => i.Name == "Coat") && cart.Items.Count == 3; - - }); + ExpectMsg>(state => { return state.State == UserState.Shopping; }); + ExpectMsg( + cart => { return cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2; }); + ExpectMsg( + cart => { return cart.Items.Any(i => i.Name == "Coat") && cart.Items.Count == 3; }); ExpectMsg>(); ExpectMsg(); ExpectTerminated(recoveredFsmRef); @@ -192,18 +154,11 @@ public void PersistentFSM_should_execute_the_defined_actions_following_successfu fsmRef.Tell(new Buy()); fsmRef.Tell(new Leave()); - ExpectMsg>(state => - { - return state.State == UserState.LookingAround; - }); - ExpectMsg>(state => - { - return state.From == UserState.LookingAround && state.To == UserState.Shopping; - }); - ExpectMsg>(state => - { - return state.From == UserState.Shopping && state.To == UserState.Paid; - }); + ExpectMsg>(state => { return state.State == UserState.LookingAround; }); + ExpectMsg>( + state => { return state.From == UserState.LookingAround && state.To == UserState.Shopping; }); + ExpectMsg>( + state => { return state.From == UserState.Shopping && state.To == UserState.Paid; }); reportActorProbe.ExpectMsg(); ExpectTerminated(fsmRef); } @@ -227,14 +182,9 @@ public void PersistentFSM_should_execute_the_defined_actions_following_successfu fsmRef.Tell(new AddItem(coat)); fsmRef.Tell(new Leave()); - ExpectMsg>(state => - { - return state.State == UserState.LookingAround; - }); - ExpectMsg>(state => - { - return state.From == UserState.LookingAround && state.To == UserState.Shopping; - }); + ExpectMsg>(state => { return state.State == UserState.LookingAround; }); + ExpectMsg>( + state => { return state.From == UserState.LookingAround && state.To == UserState.Shopping; }); reportActorProbe.ExpectMsg(); ExpectTerminated(fsmRef); } @@ -253,15 +203,9 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_timeout fsmRef.Tell(new AddItem(shirt)); - ExpectMsg>(state => - { - return state.State == UserState.LookingAround; - }); - ExpectMsg>(state => - { - return state.From == UserState.LookingAround && state.To == UserState.Shopping; - - }); + ExpectMsg>(state => { return state.State == UserState.LookingAround; }); + ExpectMsg>( + state => { return state.From == UserState.LookingAround && state.To == UserState.Shopping; }); ExpectNoMsg(TimeSpan.FromSeconds(0.6)); fsmRef.Tell(PoisonPill.Instance); @@ -271,18 +215,13 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_timeout Watch(recoveredFsmRef); recoveredFsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); - ExpectMsg>(state => - { - return state.State == UserState.Shopping; - }); + ExpectMsg>(state => { return state.State == UserState.Shopping; }); Within(TimeSpan.FromSeconds(0.9), TimeSpan.FromSeconds(1.9), () => { - ExpectMsg>(state => - { - return state.From == UserState.Shopping && state.To == UserState.Inactive; - }); + ExpectMsg>( + state => { return state.From == UserState.Shopping && state.To == UserState.Inactive; }); return true; }); ExpectNoMsg(TimeSpan.FromSeconds(0.6)); @@ -292,10 +231,7 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_timeout recoveredFsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, dummyReportActorRef))); Watch(recoveredFsmRef); recoveredFsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); - ExpectMsg>(state => - { - return state.State == UserState.Inactive; - }); + ExpectMsg>(state => { return state.State == UserState.Inactive; }); ExpectTerminated(recoveredFsmRef); } @@ -355,38 +291,35 @@ public void PersistentFSM_should_not_persist_state_change_event_when_staying_in_ var persistentEventsStreamer = Sys.ActorOf(Props.Create(Name, TestActor), Name); ExpectMsg(); - ExpectMsg.StateChangeEvent>(); + ExpectMsg.StateChangeEvent>(); ExpectMsg(); - ExpectMsg.StateChangeEvent>(); + ExpectMsg.StateChangeEvent>(); ExpectMsg(); - ExpectMsg.StateChangeEvent>(); + ExpectMsg.StateChangeEvent>(); ExpectMsg(); - ExpectMsg.StateChangeEvent>(); + ExpectMsg.StateChangeEvent>(); Watch(persistentEventsStreamer); persistentEventsStreamer.Tell(PoisonPill.Instance); ExpectTerminated(persistentEventsStreamer); - } - internal class WebStoreCustomerFSM : PersistentFSM { - private readonly string _persistenceId; private readonly IActorRef _reportActor; public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) { - _persistenceId = persistenceId; + PersistenceId = persistenceId; _reportActor = reportActor; StartWith(UserState.LookingAround, new EmptyShoppingCart()); @@ -394,7 +327,7 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) { if (@event.FsmEvent is AddItem) { - var addItem = (AddItem)@event.FsmEvent; + var addItem = (AddItem) @event.FsmEvent; return GoTo(UserState.Shopping) .Applying(new ItemAdded(addItem.Item)).ForMax(TimeSpan.FromSeconds(1)); @@ -411,7 +344,7 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) { if (@event.FsmEvent is AddItem) { - var addItem = ((AddItem)@event.FsmEvent); + var addItem = ((AddItem) @event.FsmEvent); return Stay().Applying(new ItemAdded(addItem.Item)).ForMax(TimeSpan.FromSeconds(1)); } if (@event.FsmEvent is Buy) @@ -429,10 +362,10 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) } if (@event.FsmEvent is Leave) { - return Stop().Applying(new OrderDiscarded()).AndThen(cart => - { - _reportActor.Tell(new ShoppingCardDiscarded()); - }); + return + Stop() + .Applying(new OrderDiscarded()) + .AndThen(cart => { _reportActor.Tell(new ShoppingCardDiscarded()); }); } if (@event.FsmEvent is GetCurrentCart) { @@ -450,7 +383,7 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) { if (@event.FsmEvent is AddItem) { - var addItem = (AddItem)@event.FsmEvent; + var addItem = (AddItem) @event.FsmEvent; return GoTo(UserState.Shopping) .Applying(new ItemAdded(addItem.Item)) @@ -459,10 +392,10 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) if (@event.FsmEvent is StateTimeout) { //var addItem = ((AddItem)@event) - return Stop().Applying(new OrderDiscarded()).AndThen(cart => - { - _reportActor.Tell(new ShoppingCardDiscarded()); - }); + return + Stop() + .Applying(new OrderDiscarded()) + .AndThen(cart => { _reportActor.Tell(new ShoppingCardDiscarded()); }); } return state; }); @@ -481,19 +414,18 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) }); } - public override string PersistenceId { get { return _persistenceId; } } + public override string PersistenceId { get; } protected override void OnRecoveryCompleted() { - } protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) { if (e is ItemAdded) { - var itemAdded = (ItemAdded)e; + var itemAdded = (ItemAdded) e; return data.AddItem(itemAdded.Item); } if (e is OrderExecuted) @@ -508,71 +440,53 @@ protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) return data; } } - } internal class SimpleTransitionFSM : PersistentFSM { - private readonly string _persistenceId; private readonly IActorRef _reportActor; public SimpleTransitionFSM(string persistenceId, IActorRef reportActor) { - _persistenceId = persistenceId; + PersistenceId = persistenceId; _reportActor = reportActor; StartWith(UserState.LookingAround, new EmptyShoppingCart()); When(UserState.LookingAround, (@event, state) => { - if ((string)@event.FsmEvent == "stay") + if ((string) @event.FsmEvent == "stay") { return Stay(); } return GoTo(UserState.LookingAround); }); - OnTransition((state, nextState) => - { - _reportActor.Tell(string.Format("{0} -> {1}", state, nextState)); - }); - - + OnTransition((state, nextState) => { _reportActor.Tell(string.Format("{0} -> {1}", state, nextState)); }); } - public override string PersistenceId - { - get { return _persistenceId; } - } + public override string PersistenceId { get; } protected override void OnRecoveryCompleted() { - } protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) { - return data; } - - } internal class PersistentEventsStreamer : PersistentActor { - private readonly string _persistenceId; private readonly IActorRef _client; public PersistentEventsStreamer(string persistenceId, IActorRef client) { - _persistenceId = persistenceId; + PersistenceId = persistenceId; _client = client; } - public override string PersistenceId - { - get { return _persistenceId; } - } + public override string PersistenceId { get; } protected override bool ReceiveRecover(object message) { @@ -591,6 +505,7 @@ protected override bool ReceiveCommand(object message) } #region Custome States + internal enum UserState { Shopping, @@ -605,24 +520,23 @@ internal enum UserState internal class Item { - public string Id { get; } - public string Name { get; } - public float Price { get; } - public Item(string id, string name, float price) { Id = id; Name = name; Price = price; } + + public string Id { get; } + public string Name { get; } + public float Price { get; } } internal interface IShoppingCart { + ICollection Items { get; set; } IShoppingCart AddItem(Item item); IShoppingCart Empty(); - ICollection Items { get; set; } - } internal class EmptyShoppingCart : IShoppingCart @@ -642,7 +556,6 @@ public IShoppingCart Empty() internal class NonEmptyShoppingCart : IShoppingCart { - public NonEmptyShoppingCart(Item item) { Items = new List(); @@ -661,7 +574,6 @@ public IShoppingCart Empty() } public ICollection Items { get; set; } - } #endregion @@ -670,7 +582,6 @@ public IShoppingCart Empty() internal interface ICommand { - } internal class AddItem : ICommand @@ -686,20 +597,21 @@ public AddItem(Item item) internal class Buy { } + internal class Leave { } + internal class GetCurrentCart : ICommand { } #endregion - #region Customer domain events + internal interface IDomainEvent { - } internal class ItemAdded : IDomainEvent @@ -709,26 +621,32 @@ public ItemAdded(Item item) Item = item; } - public Item Item { get; private set; } + public Item Item { get; } } internal class OrderExecuted : IDomainEvent { } + internal class OrderDiscarded : IDomainEvent { } + #endregion + #region Side effects - report events to be sent to some + internal interface IReportEvent { - } + internal class PurchaseWasMade : IReportEvent { } + internal class ShoppingCardDiscarded : IReportEvent { } + #endregion -} +} \ No newline at end of file diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs index f370b3cfcc1..7295b1c28bb 100644 --- a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs +++ b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs @@ -6,7 +6,6 @@ //----------------------------------------------------------------------- using System.Collections.Generic; -using Akka.Actor; namespace Akka.Persistence.Fsm { @@ -18,8 +17,6 @@ namespace Akka.Persistence.Fsm /// The event data type public abstract class PersistentFSM : PersistentFSMBase { - - protected abstract void OnRecoveryCompleted(); protected override bool ReceiveRecover(object message) diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs b/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs index 30ac0cdd154..2c3116f4a10 100644 --- a/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs +++ b/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs @@ -50,7 +50,8 @@ public delegate State StateFunction( /// /// FSM state data and current timeout handling - /// a + /// + /// a protected State _currentState; protected long _generation; From 30e4142547e0cc492a828fc15c9bf86182dc00bb Mon Sep 17 00:00:00 2001 From: "maxim.salamatko" Date: Mon, 2 Nov 2015 12:58:23 +0400 Subject: [PATCH 8/8] fix c# version --- .../Fsm/PersistentFSMSpec.cs | 78 +++++++++++-------- .../Akka.Persistence/Fsm/PersistentFSM.cs | 4 +- .../Akka.Persistence/Fsm/PersistentFSMBase.cs | 20 ++--- 3 files changed, 57 insertions(+), 45 deletions(-) diff --git a/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs index 7e88792b9b0..1c021fdafac 100644 --- a/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs +++ b/src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs @@ -44,11 +44,11 @@ public void PersistentFSM_should_has_function_as_regular_fsm() ExpectMsg(); ExpectMsg>(state => state.From == UserState.LookingAround); ExpectMsg( - cart => { return cart.Items.Any(i => i.Name == "Shirt") && cart.Items.Count == 1; }); + cart => cart.Items.Any(i => i.Name == "Shirt") && cart.Items.Count == 1); ExpectMsg( - cart => { return cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2; }); + cart => cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2); ExpectMsg( - cart => { return cart.Items.Any(i => i.Name == "Coat") && cart.Items.Count == 3; }); + cart => cart.Items.Any(i => i.Name == "Coat") && cart.Items.Count == 3); ExpectMsg>(); ExpectMsg(); ExpectTerminated(fsmRef); @@ -68,7 +68,7 @@ public void PersistentFSM_should_has_function_as_regular_fsm_on_state_timeout() var shirt = new Item("1", "Shirt", 59.99F); fsmRef.Tell(new AddItem(shirt)); - ExpectMsg>(state => { return state.State == UserState.LookingAround; }); + ExpectMsg>(state => state.State == UserState.LookingAround); ExpectMsg>(); @@ -104,11 +104,11 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_data() ExpectMsg>(); ExpectMsg(); - ExpectMsg>(state => { return state.From == UserState.LookingAround; }); + ExpectMsg>(state => state.From == UserState.LookingAround); ExpectMsg( - cart => { return cart.Items.Any(i => i.Name == "Shirt") && cart.Items.Count == 1; }); + cart => cart.Items.Any(i => i.Name == "Shirt") && cart.Items.Count == 1); ExpectMsg( - cart => { return cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2; }); + cart => cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2); fsmRef.Tell(PoisonPill.Instance); ExpectTerminated(fsmRef); @@ -124,11 +124,11 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_data() recoveredFsmRef.Tell(new GetCurrentCart()); recoveredFsmRef.Tell(new Leave()); - ExpectMsg>(state => { return state.State == UserState.Shopping; }); + ExpectMsg>(state => state.State == UserState.Shopping); ExpectMsg( - cart => { return cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2; }); + cart => cart.Items.Any(i => i.Name == "Shoes") && cart.Items.Count == 2); ExpectMsg( - cart => { return cart.Items.Any(i => i.Name == "Coat") && cart.Items.Count == 3; }); + cart => cart.Items.Any(i => i.Name == "Coat") && cart.Items.Count == 3); ExpectMsg>(); ExpectMsg(); ExpectTerminated(recoveredFsmRef); @@ -154,11 +154,11 @@ public void PersistentFSM_should_execute_the_defined_actions_following_successfu fsmRef.Tell(new Buy()); fsmRef.Tell(new Leave()); - ExpectMsg>(state => { return state.State == UserState.LookingAround; }); + ExpectMsg>(state => state.State == UserState.LookingAround); ExpectMsg>( - state => { return state.From == UserState.LookingAround && state.To == UserState.Shopping; }); + state => state.From == UserState.LookingAround && state.To == UserState.Shopping); ExpectMsg>( - state => { return state.From == UserState.Shopping && state.To == UserState.Paid; }); + state => state.From == UserState.Shopping && state.To == UserState.Paid); reportActorProbe.ExpectMsg(); ExpectTerminated(fsmRef); } @@ -182,9 +182,9 @@ public void PersistentFSM_should_execute_the_defined_actions_following_successfu fsmRef.Tell(new AddItem(coat)); fsmRef.Tell(new Leave()); - ExpectMsg>(state => { return state.State == UserState.LookingAround; }); + ExpectMsg>(state => state.State == UserState.LookingAround); ExpectMsg>( - state => { return state.From == UserState.LookingAround && state.To == UserState.Shopping; }); + state => state.From == UserState.LookingAround && state.To == UserState.Shopping); reportActorProbe.ExpectMsg(); ExpectTerminated(fsmRef); } @@ -203,9 +203,9 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_timeout fsmRef.Tell(new AddItem(shirt)); - ExpectMsg>(state => { return state.State == UserState.LookingAround; }); + ExpectMsg>(state => state.State == UserState.LookingAround); ExpectMsg>( - state => { return state.From == UserState.LookingAround && state.To == UserState.Shopping; }); + state => state.From == UserState.LookingAround && state.To == UserState.Shopping); ExpectNoMsg(TimeSpan.FromSeconds(0.6)); fsmRef.Tell(PoisonPill.Instance); @@ -215,7 +215,7 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_timeout Watch(recoveredFsmRef); recoveredFsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); - ExpectMsg>(state => { return state.State == UserState.Shopping; }); + ExpectMsg>(state => state.State == UserState.Shopping); Within(TimeSpan.FromSeconds(0.9), TimeSpan.FromSeconds(1.9), () => @@ -231,7 +231,7 @@ public void PersistentFSM_should_recover_successfully_with_correct_state_timeout recoveredFsmRef = Sys.ActorOf(Props.Create(() => new WebStoreCustomerFSM(Name, dummyReportActorRef))); Watch(recoveredFsmRef); recoveredFsmRef.Tell(new FSMBase.SubscribeTransitionCallBack(TestActor)); - ExpectMsg>(state => { return state.State == UserState.Inactive; }); + ExpectMsg>(state => state.State == UserState.Inactive); ExpectTerminated(recoveredFsmRef); } @@ -316,10 +316,11 @@ public void PersistentFSM_should_not_persist_state_change_event_when_staying_in_ internal class WebStoreCustomerFSM : PersistentFSM { private readonly IActorRef _reportActor; + private readonly string _persistenceId; public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) { - PersistenceId = persistenceId; + _persistenceId = persistenceId; _reportActor = reportActor; StartWith(UserState.LookingAround, new EmptyShoppingCart()); @@ -365,7 +366,7 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) return Stop() .Applying(new OrderDiscarded()) - .AndThen(cart => { _reportActor.Tell(new ShoppingCardDiscarded()); }); + .AndThen(cart => _reportActor.Tell(new ShoppingCardDiscarded())); } if (@event.FsmEvent is GetCurrentCart) { @@ -395,7 +396,7 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) return Stop() .Applying(new OrderDiscarded()) - .AndThen(cart => { _reportActor.Tell(new ShoppingCardDiscarded()); }); + .AndThen(cart => _reportActor.Tell(new ShoppingCardDiscarded())); } return state; }); @@ -414,7 +415,10 @@ public WebStoreCustomerFSM(string persistenceId, IActorRef reportActor) }); } - public override string PersistenceId { get; } + public override string PersistenceId + { + get { return _persistenceId; } + } protected override void OnRecoveryCompleted() @@ -445,10 +449,11 @@ protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) internal class SimpleTransitionFSM : PersistentFSM { private readonly IActorRef _reportActor; + private readonly string _persistenceId; public SimpleTransitionFSM(string persistenceId, IActorRef reportActor) { - PersistenceId = persistenceId; + _persistenceId = persistenceId; _reportActor = reportActor; StartWith(UserState.LookingAround, new EmptyShoppingCart()); @@ -460,10 +465,13 @@ public SimpleTransitionFSM(string persistenceId, IActorRef reportActor) } return GoTo(UserState.LookingAround); }); - OnTransition((state, nextState) => { _reportActor.Tell(string.Format("{0} -> {1}", state, nextState)); }); + OnTransition((state, nextState) => _reportActor.Tell(string.Format("{0} -> {1}", state, nextState))); } - public override string PersistenceId { get; } + public override string PersistenceId + { + get { return _persistenceId; } + } protected override void OnRecoveryCompleted() @@ -479,14 +487,18 @@ protected override IShoppingCart ApplyEvent(IDomainEvent e, IShoppingCart data) internal class PersistentEventsStreamer : PersistentActor { private readonly IActorRef _client; + private readonly string _persistenceId; public PersistentEventsStreamer(string persistenceId, IActorRef client) { - PersistenceId = persistenceId; + _persistenceId = persistenceId; _client = client; } - public override string PersistenceId { get; } + public override string PersistenceId + { + get { return _persistenceId; } + } protected override bool ReceiveRecover(object message) { @@ -527,9 +539,9 @@ public Item(string id, string name, float price) Price = price; } - public string Id { get; } - public string Name { get; } - public float Price { get; } + public string Id { get; private set; } + public string Name { get; private set; } + public float Price { get; private set; } } internal interface IShoppingCart @@ -591,7 +603,7 @@ public AddItem(Item item) Item = item; } - public Item Item { get; } + public Item Item { get; private set; } } internal class Buy @@ -621,7 +633,7 @@ public ItemAdded(Item item) Item = item; } - public Item Item { get; } + public Item Item { get; private set; } } internal class OrderExecuted : IDomainEvent diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs index 7295b1c28bb..4e32617a303 100644 --- a/src/core/Akka.Persistence/Fsm/PersistentFSM.cs +++ b/src/core/Akka.Persistence/Fsm/PersistentFSM.cs @@ -27,8 +27,8 @@ protected override bool ReceiveRecover(object message) Initialize(); OnRecoveryCompleted(); }) - .With(e => { StartWith(StateName, ApplyEvent(e, StateData)); }) - .With(sce => { StartWith(sce.State, StateData, sce.TimeOut); }); + .With(e => StartWith(StateName, ApplyEvent(e, StateData))) + .With(sce => StartWith(sce.State, StateData, sce.TimeOut)); return match.WasHandled; } diff --git a/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs b/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs index 2c3116f4a10..fa8a2a1ac58 100644 --- a/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs +++ b/src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs @@ -126,7 +126,7 @@ private StateFunction HandleEvent set { _handleEvent = value; } } - public bool IsStateTimerActive { get; } + public bool IsStateTimerActive { get; private set; } public ListenerSupport Listeners { @@ -595,7 +595,7 @@ public TransformHelper(StateFunction func) Func = func; } - public StateFunction Func { get; } + public StateFunction Func { get; private set; } public StateFunction Using(Func, State> andThen) { @@ -637,7 +637,7 @@ public TimeoutMarker(long generation) Generation = generation; } - public long Generation { get; } + public long Generation { get; private set; } } [DebuggerDisplay("Timer {Name,nq}, message: {Message")] @@ -662,15 +662,15 @@ public Timer(string name, object message, bool repeat, int generation, IActorCon _ref = new Cancelable(scheduler); } - public string Name { get; } + public string Name { get; private set; } - public object Message { get; } + public object Message { get; private set; } - public bool Repeat { get; } + public bool Repeat { get; private set; } - public int Generation { get; } + public int Generation { get; private set; } - public IActorContext Context { get; } + public IActorContext Context { get; private set; } public void Schedule(IActorRef actor, TimeSpan timeout) { @@ -712,7 +712,7 @@ public void Cancel() /// The event of the state public class State : FSMBase.State { - public Action AfterTransitionHandler { get; } + public Action AfterTransitionHandler { get; private set; } public State(TS stateName, TD stateData, TimeSpan? timeout = null, FSMBase.Reason stopReason = null, @@ -724,7 +724,7 @@ public State(TS stateName, TD stateData, TimeSpan? timeout = null, FSMBase.Reaso Notifies = true; } - public ILinearSeq DomainEvents { get; } + public ILinearSeq DomainEvents { get; private set; } public bool Notifies { get; set; }