Skip to content

Commit

Permalink
Convert Akka.Persistence.TestKit.Tests to async
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkatufus committed Mar 11, 2022
1 parent 8ed4e7a commit 91bf88c
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,13 @@ public async Task CounterActor_internal_state_will_be_lost_if_underlying_persist
Watch(actor);
actor.Tell("inc", TestActor);
ExpectMsg<Terminated>(TimeSpan.FromSeconds(3));
await ExpectMsgAsync<Terminated>(TimeSpan.FromSeconds(3));
// need to restart actor
actor = ActorOf(counterProps, "counter1");
actor.Tell("read", TestActor);
var value = ExpectMsg<int>(TimeSpan.FromSeconds(3));
var value = await ExpectMsgAsync<int>(TimeSpan.FromSeconds(3));
value.ShouldBe(0);
});
}
Expand Down
16 changes: 8 additions & 8 deletions src/core/Akka.Persistence.TestKit.Tests/Bug4762FixSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ namespace Akka.Persistence.TestKit.Tests
/// </summary>
public class Bug4762FixSpec : PersistenceTestKit
{
class WriteMessage
private class WriteMessage
{ }

class TestEvent
private class TestEvent
{ }

class TestActor2 : UntypedPersistentActor
private class TestActor2 : UntypedPersistentActor
{
private readonly IActorRef _probe;
private readonly ILoggingAdapter _log;
Expand Down Expand Up @@ -68,18 +68,18 @@ protected override void OnRecover(object message)
public async Task TestJournal_PersistAll_should_only_count_each_event_exceptions_once()
{
var probe = CreateTestProbe();
await WithJournalWrite(write => write.Pass(), () =>
await WithJournalWrite(write => write.Pass(), async () =>
{
var actor = ActorOf(() => new TestActor2(probe));
Watch(actor);
var command = new WriteMessage();
actor.Tell(command, actor);
probe.ExpectMsg<RecoveryCompleted>();
probe.ExpectMsg<Done>();
probe.ExpectMsg<Done>();
probe.ExpectNoMsg(3000);
await probe.ExpectMsgAsync<RecoveryCompleted>();
await probe.ExpectMsgAsync<Done>();
await probe.ExpectMsgAsync<Done>();
await probe.ExpectNoMsgAsync(3000);
});
}
}
Expand Down
28 changes: 16 additions & 12 deletions src/core/Akka.Persistence.TestKit.Tests/JournalInterceptorsSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,35 @@ namespace Akka.Persistence.TestKit.Tests
using Akka.Persistence.TestKit;
using FluentAssertions;
using Xunit;
using static FluentAssertions.FluentActions;

public class JournalInterceptorsSpecs
{
[Fact]
public void noop_immediately_returns_without_exception()
public async Task noop_immediately_returns_without_exception()
{
JournalInterceptors.Noop.Instance
.Awaiting(x => x.InterceptAsync(null))
.Should().NotThrow();
await Awaiting(async () =>
{
await JournalInterceptors.Noop.Instance.InterceptAsync(null);
}).Should().NotThrowAsync();
}

[Fact]
public void failure_must_throw_specific_exception()
public async Task failure_must_throw_specific_exception()
{
JournalInterceptors.Failure.Instance
.Awaiting(x => x.InterceptAsync(null))
.Should().ThrowExactly<TestJournalFailureException>();
await Awaiting(async () =>
{
await JournalInterceptors.Failure.Instance.InterceptAsync(null);
}).Should().ThrowExactlyAsync<TestJournalFailureException>();
}

[Fact]
public void rejection_must_throw_specific_exception()
public async Task rejection_must_throw_specific_exception()
{
JournalInterceptors.Rejection.Instance
.Awaiting(x => x.InterceptAsync(null))
.Should().ThrowExactly<TestJournalRejectionException>();
await Awaiting(async () =>
{
await JournalInterceptors.Rejection.Instance.InterceptAsync(null);
}).Should().ThrowExactlyAsync<TestJournalRejectionException>();
}

[Fact]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,41 @@ namespace Akka.Persistence.TestKit.Tests
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;
using static FluentAssertions.FluentActions;

public class SnapshotStoreInterceptorsSpec
{
[Fact]
public void noop_must_do_nothing()
=> SnapshotStoreInterceptors.Noop.Instance
.Awaiting(x => x.InterceptAsync(null, null))
.Should().NotThrow();
public async Task noop_must_do_nothing()
{
await Awaiting(async () =>
{
await SnapshotStoreInterceptors.Noop.Instance.InterceptAsync(null, null);
}).Should().NotThrowAsync();
}

[Fact]
public void failure_must_always_throw_exception()
=> SnapshotStoreInterceptors.Failure.Instance
.Awaiting(x => x.InterceptAsync(null, null))
.Should().ThrowExactly<TestSnapshotStoreFailureException>();
public async Task failure_must_always_throw_exception()
{
await Awaiting(async () =>
{
await SnapshotStoreInterceptors.Failure.Instance.InterceptAsync(null, null);
}).Should().ThrowExactlyAsync<TestSnapshotStoreFailureException>();
}

[Fact]
public async Task delay_must_call_next_interceptor_after_specified_delay()
{
var duration = TimeSpan.FromMilliseconds(100);
var duration = TimeSpan.FromMilliseconds(200);
var epsilon = TimeSpan.FromMilliseconds(50);
var probe = new InterceptorProbe();
var delay = new SnapshotStoreInterceptors.Delay(duration, probe);

var startedAt = DateTime.Now;
await delay.InterceptAsync(null, null);

probe.WasCalled.Should().BeTrue();
probe.CalledAt.Should().BeOnOrAfter(startedAt + duration);
probe.CalledAt.Should().BeOnOrAfter(startedAt + duration - epsilon);
}

[Fact]
Expand Down
60 changes: 30 additions & 30 deletions src/core/Akka.Persistence.TestKit.Tests/TestJournalSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,119 +30,119 @@ public TestJournalSpec() : base(DefaultTimeoutConfig)
private readonly TestProbe _probe;

[Fact]
public void must_return_ack_after_new_write_interceptor_is_set()
public async Task must_return_ack_after_new_write_interceptor_is_set()
{
JournalActorRef.Tell(new TestJournal.UseWriteInterceptor(null), TestActor);

ExpectMsg<TestJournal.Ack>(TimeSpan.FromSeconds(3));
await ExpectMsgAsync<TestJournal.Ack>(TimeSpan.FromSeconds(3));
}

[Fact]
public async Task works_as_memory_journal_by_default()
{
var actor = ActorOf(() => new PersistActor(_probe));
_probe.ExpectMsg<RecoveryCompleted>();
await _probe.ExpectMsgAsync<RecoveryCompleted>();

await Journal.OnWrite.Pass();
actor.Tell(new PersistActor.WriteMessage("write"), TestActor);

_probe.ExpectMsg("ack");
await _probe.ExpectMsgAsync("ack");
}

[Fact]
public async Task must_recover_restarted_actor()
{
var actor = ActorOf(() => new PersistActor(_probe));
Watch(actor);
_probe.ExpectMsg<RecoveryCompleted>();
await _probe.ExpectMsgAsync<RecoveryCompleted>();

await Journal.OnRecovery.Pass();
actor.Tell(new PersistActor.WriteMessage("1"), TestActor);
_probe.ExpectMsg("ack");
await _probe.ExpectMsgAsync("ack");
actor.Tell(new PersistActor.WriteMessage("2"), TestActor);
_probe.ExpectMsg("ack");
await _probe.ExpectMsgAsync("ack");

await actor.GracefulStop(TimeSpan.FromSeconds(1));
ExpectTerminated(actor);
await ExpectTerminatedAsync(actor);

ActorOf(() => new PersistActor(_probe));
_probe.ExpectMsg("1");
_probe.ExpectMsg("2");
_probe.ExpectMsg<RecoveryCompleted>();
await _probe.ExpectMsgAsync("1");
await _probe.ExpectMsgAsync("2");
await _probe.ExpectMsgAsync<RecoveryCompleted>();
}

[Fact]
public async Task when_fail_on_write_is_set_all_writes_to_journal_will_fail()
{
var actor = ActorOf(() => new PersistActor(_probe));
Watch(actor);
_probe.ExpectMsg<RecoveryCompleted>();
await _probe.ExpectMsgAsync<RecoveryCompleted>();

await Journal.OnWrite.Fail();
actor.Tell(new PersistActor.WriteMessage("write"), TestActor);

_probe.ExpectMsg("failure");
ExpectTerminated(actor);
await _probe.ExpectMsgAsync("failure");
await ExpectTerminatedAsync(actor);
}

[Fact]
public async Task must_recover_failed_actor()
{
var actor = ActorOf(() => new PersistActor(_probe));
Watch(actor);
_probe.ExpectMsg<RecoveryCompleted>();
await _probe.ExpectMsgAsync<RecoveryCompleted>();

await Journal.OnRecovery.Pass();
actor.Tell(new PersistActor.WriteMessage("1"), TestActor);
_probe.ExpectMsg("ack");
await _probe.ExpectMsgAsync("ack");
actor.Tell(new PersistActor.WriteMessage("2"), TestActor);
_probe.ExpectMsg("ack");
await _probe.ExpectMsgAsync("ack");

await Journal.OnWrite.Fail();
actor.Tell(new PersistActor.WriteMessage("3"), TestActor);

_probe.ExpectMsg("failure");
ExpectTerminated(actor);
await _probe.ExpectMsgAsync("failure");
await ExpectTerminatedAsync(actor);

ActorOf(() => new PersistActor(_probe));
_probe.ExpectMsg("1");
_probe.ExpectMsg("2");
_probe.ExpectMsg<RecoveryCompleted>();
await _probe.ExpectMsgAsync("1");
await _probe.ExpectMsgAsync("2");
await _probe.ExpectMsgAsync<RecoveryCompleted>();
}

[Fact]
public async Task when_reject_on_write_is_set_all_writes_to_journal_will_be_rejected()
{
var actor = ActorOf(() => new PersistActor(_probe));
Watch(actor);
_probe.ExpectMsg<RecoveryCompleted>();
await _probe.ExpectMsgAsync<RecoveryCompleted>();

await Journal.OnWrite.Reject();
actor.Tell(new PersistActor.WriteMessage("write"), TestActor);

_probe.ExpectMsg("rejected");
await _probe.ExpectMsgAsync("rejected");
}

[Fact]
public async Task journal_must_reset_state_to_pass()
{
await WithJournalWrite(write => write.Fail(), () =>
await WithJournalWrite(write => write.Fail(), async () =>
{
var actor = ActorOf(() => new PersistActor(_probe));
Watch(actor);
_probe.ExpectMsg<RecoveryCompleted>();
await _probe.ExpectMsgAsync<RecoveryCompleted>();
actor.Tell(new PersistActor.WriteMessage("write"), TestActor);
_probe.ExpectMsg("failure");
ExpectTerminated(actor);
await _probe.ExpectMsgAsync("failure");
await ExpectTerminatedAsync(actor);
});

var actor2 = ActorOf(() => new PersistActor(_probe));
Watch(actor2);

_probe.ExpectMsg<RecoveryCompleted>();
await _probe.ExpectMsgAsync<RecoveryCompleted>();
actor2.Tell(new PersistActor.WriteMessage("write"), TestActor);
_probe.ExpectMsg("ack");
await _probe.ExpectMsgAsync("ack");
}
}
}
34 changes: 17 additions & 17 deletions src/core/Akka.Persistence.TestKit.Tests/TestSnapshotStoreSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@ public TestSnapshotStoreSpec()
private readonly TestProbe _probe;

[Fact]
public void send_ack_after_load_interceptor_is_set()
public async Task send_ack_after_load_interceptor_is_set()
{
SnapshotsActorRef.Tell(new TestSnapshotStore.UseLoadInterceptor(null), TestActor);
ExpectMsg<TestSnapshotStore.Ack>();
await ExpectMsgAsync<TestSnapshotStore.Ack>();
}

[Fact]
public void send_ack_after_save_interceptor_is_set()
public async Task send_ack_after_save_interceptor_is_set()
{
SnapshotsActorRef.Tell(new TestSnapshotStore.UseSaveInterceptor(null), TestActor);
ExpectMsg<TestSnapshotStore.Ack>();
await ExpectMsgAsync<TestSnapshotStore.Ack>();
}

[Fact]
public void send_ack_after_delete_interceptor_is_set()
public async Task send_ack_after_delete_interceptor_is_set()
{
SnapshotsActorRef.Tell(new TestSnapshotStore.UseDeleteInterceptor(null), TestActor);
ExpectMsg<TestSnapshotStore.Ack>();
await ExpectMsgAsync<TestSnapshotStore.Ack>();
}

[Fact]
Expand All @@ -49,17 +49,17 @@ public async Task after_load_behavior_was_executed_store_is_back_to_pass_mode()
// create snapshot
var actor = ActorOf(() => new SnapshotActor(_probe));
actor.Tell("save");
_probe.ExpectMsg<SaveSnapshotSuccess>();
await _probe.ExpectMsgAsync<SaveSnapshotSuccess>();
await actor.GracefulStop(TimeSpan.FromSeconds(3));

await WithSnapshotLoad(load => load.Fail(), () =>
await WithSnapshotLoad(load => load.Fail(), async () =>
{
ActorOf(() => new SnapshotActor(_probe));
_probe.ExpectMsg<SnapshotActor.RecoveryFailure>();
await _probe.ExpectMsgAsync<SnapshotActor.RecoveryFailure>();
});

ActorOf(() => new SnapshotActor(_probe));
_probe.ExpectMsg<SnapshotOffer>();
await _probe.ExpectMsgAsync<SnapshotOffer>();
}

[Fact]
Expand All @@ -68,14 +68,14 @@ public async Task after_save_behavior_was_executed_store_is_back_to_pass_mode()
// create snapshot
var actor = ActorOf(() => new SnapshotActor(_probe));

await WithSnapshotSave(save => save.Fail(), () =>
await WithSnapshotSave(save => save.Fail(), async () =>
{
actor.Tell("save");
_probe.ExpectMsg<SaveSnapshotFailure>();
await _probe.ExpectMsgAsync<SaveSnapshotFailure>();
});

actor.Tell("save");
_probe.ExpectMsg<SaveSnapshotSuccess>();
await _probe.ExpectMsgAsync<SaveSnapshotSuccess>();
}

[Fact]
Expand All @@ -85,17 +85,17 @@ public async Task after_delete_behavior_was_executed_store_is_back_to_pass_mode(
var actor = ActorOf(() => new SnapshotActor(_probe));
actor.Tell("save");

var success = _probe.ExpectMsg<SaveSnapshotSuccess>();
var success = await _probe.ExpectMsgAsync<SaveSnapshotSuccess>();
var nr = success.Metadata.SequenceNr;

await WithSnapshotDelete(del => del.Fail(), () =>
await WithSnapshotDelete(del => del.Fail(), async () =>
{
actor.Tell(new SnapshotActor.DeleteOne(nr), TestActor);
_probe.ExpectMsg<DeleteSnapshotFailure>();
await _probe.ExpectMsgAsync<DeleteSnapshotFailure>();
});

actor.Tell(new SnapshotActor.DeleteOne(nr), TestActor);
_probe.ExpectMsg<DeleteSnapshotSuccess>();
await _probe.ExpectMsgAsync<DeleteSnapshotSuccess>();
}
}
}

0 comments on commit 91bf88c

Please sign in to comment.