Skip to content

Commit

Permalink
fix: data provided to AndThen (PersistentFSM) is not the latest versi…
Browse files Browse the repository at this point in the history
…on (#3654) (#3948)

Fixes issue #3654 in conformance with the JVM version of PersistentFSM.
  • Loading branch information
aloker authored and Aaronontheweb committed Nov 14, 2019
1 parent 1b29a93 commit caf13f6
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 1 deletion.
100 changes: 100 additions & 0 deletions src/core/Akka.Persistence.Tests/Fsm/PersistentFSMSpec.cs
Expand Up @@ -10,6 +10,7 @@
using FluentAssertions;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Akka.Configuration;
using Xunit;
using static Akka.Actor.FSMBase;
Expand Down Expand Up @@ -445,6 +446,17 @@ public void PersistentFSM_must_save_periodical_snapshots_if_enablesnapshotafter(
Shutdown(sys2);
}
}

[Fact]
public async Task PersistentFSM_must_pass_latest_statedata_to_AndThen()
{
var actor = Sys.ActorOf(Props.Create(() => new AndThenTestActor()));

var response1 = await actor.Ask<AndThenTestActor.Data>(new AndThenTestActor.Command("Message 1")).ConfigureAwait(true);
var response2 = await actor.Ask<AndThenTestActor.Data>(new AndThenTestActor.Command("Message 2")).ConfigureAwait(true);
Assert.Equal("Message 1", response1.Value);
Assert.Equal("Message 2", response2.Value);
}

internal class WebStoreCustomerFSM : PersistentFSM<IUserState, IShoppingCart, IDomainEvent>
{
Expand Down Expand Up @@ -1016,4 +1028,92 @@ public static Props Props(IActorRef probe)
return Actor.Props.Create(() => new SnapshotFSM(probe));
}
}
#region AndThen receiving latest data

public class AndThenTestActor : PersistentFSM<AndThenTestActor.IState, AndThenTestActor.Data, AndThenTestActor.IEvent>
{
public override string PersistenceId => "PersistentFSMSpec.AndThenTestActor";

public AndThenTestActor()
{
StartWith(Init.Instance, new Data());
When(Init.Instance, (evt, state) =>
{
switch (evt.FsmEvent)
{
case Command cmd:
return Stay()
.Applying(new CommandReceived(cmd.Value))
.AndThen(data =>
{
// NOTE At this point, I'd expect data to be the value returned by Apply
Sender.Tell(data, Self);
});
default:
return Stay();
}
});
}

protected override Data ApplyEvent(IEvent domainEvent, Data currentData)
{
switch (domainEvent)
{
case CommandReceived cmd:
return new Data(cmd.Value);
default:
return currentData;
}
}


public interface IState : PersistentFSM.IFsmState
{
}

public class Init : IState
{
public static readonly Init Instance = new Init();
public string Identifier => "Init";
}

public class Data
{
public Data()
{
}

public Data(string value)
{
Value = value;
}

public string Value { get; }
}

public interface IEvent
{
}

public class CommandReceived : IEvent
{
public CommandReceived(string value)
{
Value = value;
}

public string Value { get; }
}

public class Command
{
public Command(string value)
{
Value = value;
}

public string Value { get; }
}
}
#endregion
}
2 changes: 1 addition & 1 deletion src/core/Akka.Persistence/Fsm/PersistentFSM.cs
Expand Up @@ -133,7 +133,7 @@ void ApplyStateOnLastHandler()
{
base.ApplyState(nextState.Using(nextData));
CurrentStateTimeout = nextState.Timeout;
nextState.AfterTransitionDo?.Invoke(nextState.StateData);
nextState.AfterTransitionDo?.Invoke(StateData);
if (doSnapshot)
{
Log.Info($"Saving snapshot, sequence number [{SnapshotSequenceNr}]");
Expand Down

0 comments on commit caf13f6

Please sign in to comment.