Skip to content

Commit

Permalink
Almost all exception handling. #21
Browse files Browse the repository at this point in the history
  • Loading branch information
veblush committed May 20, 2016
1 parent 505617f commit d19d123
Show file tree
Hide file tree
Showing 22 changed files with 971 additions and 241 deletions.
133 changes: 133 additions & 0 deletions core/Akka.Interfaced.Tests/ActorTaskCancellation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using Akka.Actor;
using Xunit;

namespace Akka.Interfaced.Tests
{
public class TaskCancellationActor : InterfacedActor, IWorker, IExtendedInterface<ISubjectObserver>
{
private LogBoard _log;
private int _delay;

public TaskCancellationActor(LogBoard log, int delay)
{
_log = log;
_delay = delay;
}

[MessageHandler]
private void Handle(string id)
{
throw new Exception("");
}

[MessageHandler, Reentrant]
private async Task Handle(int id)
{
_log.Log($"Handle({id})");

if (_delay == 0)
await Task.Yield();
else
await Task.Delay(_delay, CancellationToken);

_log.Log($"Handle({id}) Done");
}

Task IWorker.Atomic(int id)
{
return Task.FromResult(id);
}

[Reentrant]
async Task IWorker.Reentrant(int id)
{
_log.Log($"Reentrant({id})");

if (_delay == 0)
await Task.Yield();
else
await Task.Delay(_delay, CancellationToken);

_log.Log($"Reentrant({id}) Done");
}

[ExtendedHandler, Reentrant]
private async Task Event(string eventName)
{
_log.Log($"Event({eventName})");

if (_delay == 0)
await Task.Yield();
else
await Task.Delay(_delay, CancellationToken);

_log.Log($"Event({eventName}) Done");
}
}

public class ActorTaskCancellation : Akka.TestKit.Xunit2.TestKit
{
public ActorTaskCancellation()
: base("akka.actor.guardian-supervisor-strategy = \"Akka.Actor.StoppingSupervisorStrategy\"")
{
}

[Fact]
public async Task TaskInRequest_WhenActorStop_Cancelled()
{
var log = new LogBoard();
var worker = new WorkerRef(ActorOf(() => new TaskCancellationActor(log, 100)));

var exceptionTask = Record.ExceptionAsync(() => worker.Reentrant(1));
worker.Actor.Tell("E");
Assert.IsType<InterfacedRequestException>(await exceptionTask);

Watch(worker.Actor);
ExpectTerminated(worker.Actor);
await Task.Delay(100);

Assert.Equal(new[] { "Reentrant(1)" },
log.GetAndClearLogs());
}

[Fact]
public async Task TaskInNotification_WhenActorStop_Cancelled()
{
var log = new LogBoard();

var subjectActor = ActorOfAsTestActorRef<SubjectActor>("Subject");
var subject = new SubjectRef(subjectActor);
var observingActor = ActorOf(() => new TaskCancellationActor(log, 100));
await subject.Subscribe(new SubjectObserver(observingActor));

await subject.MakeEvent("E");
observingActor.Tell("E");

Watch(observingActor);
ExpectTerminated(observingActor);
await Task.Delay(100);
Assert.Equal(new[] { "Event(E)" },
log.GetAndClearLogs());
}

[Fact]
public async Task TaskInMessage_WhenActorStop_Cancelled()
{
var log = new LogBoard();
var worker = new WorkerRef(ActorOf(() => new TaskCancellationActor(log, 100)));

worker.Actor.Tell(1);
worker.Actor.Tell("E");

Watch(worker.Actor);
ExpectTerminated(worker.Actor);
await Task.Delay(100);
Assert.Equal(new[] { "Handle(1)" },
log.GetAndClearLogs());
}
}
}
6 changes: 5 additions & 1 deletion core/Akka.Interfaced.Tests/Akka.Interfaced.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@
</Reference>
</ItemGroup>
<ItemGroup>
<Compile Include="ExceptionHandle.cs" />
<Compile Include="ActorTaskCancellation.cs" />
<Compile Include="ExceptionHandle.Message.cs" />
<Compile Include="ExceptionHandle.Notification.cs" />
<Compile Include="ExceptionHandle.Request.cs" />
<Compile Include="ExceptionHandle.StartStop.cs" />
<Compile Include="LogBoard.cs" />
<Compile Include="MessageFilterAsync.cs" />
<Compile Include="MessageFilterFactory.cs" />
Expand Down
137 changes: 137 additions & 0 deletions core/Akka.Interfaced.Tests/ExceptionHandle.Message.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading;
using Akka.Actor;
using Xunit;

namespace Akka.Interfaced.Tests
{
public class ExceptionActor_Message : InterfacedActor
{
private LogBoard _log;

public ExceptionActor_Message(LogBoard log)
{
_log = log;
}

[MessageHandler]
private void Handle(string message)
{
_log.Log($"Handle({message})");
if (message == "E")
throw new Exception();
}

[MessageHandler]
private async Task HandleAsync(int message)
{
_log.Log($"HandleAsync({message})");

if (message == 1)
throw new Exception();

await Task.Yield();

_log.Log($"HandleAsync({message}) Done");

if (message == 2)
throw new Exception();
}

[MessageHandler, Reentrant]
private async Task HandleReentrantAsync(long message)
{
_log.Log($"HandleReentrantAsync({message})");

if (message == 1)
throw new Exception();

await Task.Yield();

_log.Log($"HandleReentrantAsync({message}) Done");

if (message == 2)
throw new Exception();
}
}

public class ExceptionHandle_Message : Akka.TestKit.Xunit2.TestKit
{
public ExceptionHandle_Message()
: base("akka.actor.guardian-supervisor-strategy = \"Akka.Actor.StoppingSupervisorStrategy\"")
{
}

[Fact]
public void ExceptionThrown_At_Handle()
{
var log = new LogBoard();
var actor = ActorOf(Props.Create(() => new ExceptionActor_Message(log)));

actor.Tell("E");

Watch(actor);
ExpectTerminated(actor);
Assert.Equal(new[] { "Handle(E)" },
log.GetAndClearLogs());
}

[Fact]
public void ExceptionThrown_At_HandleAsync()
{
var log = new LogBoard();
var actor = ActorOf(Props.Create(() => new ExceptionActor_Message(log)));

actor.Tell(1);

Watch(actor);
ExpectTerminated(actor);
Assert.Equal(new[] { "HandleAsync(1)" },
log.GetAndClearLogs());
}

[Fact]
public void ExceptionThrown_At_HandleAsyncDone()
{
var log = new LogBoard();
var actor = ActorOf(Props.Create(() => new ExceptionActor_Message(log)));

actor.Tell(2);

Watch(actor);
ExpectTerminated(actor);
Assert.Equal(new[] { "HandleAsync(2)", "HandleAsync(2) Done" },
log.GetAndClearLogs());
}

[Fact]
public void ExceptionThrown_At_HandleReentrantAsync()
{
var log = new LogBoard();
var actor = ActorOf(Props.Create(() => new ExceptionActor_Message(log)));

actor.Tell(1L);

Watch(actor);
ExpectTerminated(actor);
Assert.Equal(new[] { "HandleReentrantAsync(1)" },
log.GetAndClearLogs());
}

[Fact]
public void ExceptionThrown_At_HandleReentrantAsyncDone()
{
var log = new LogBoard();
var actor = ActorOf(Props.Create(() => new ExceptionActor_Message(log)));

actor.Tell(2L);

Watch(actor);
ExpectTerminated(actor);
Assert.Equal(new[] { "HandleReentrantAsync(2)", "HandleReentrantAsync(2) Done" },
log.GetAndClearLogs());
}
}
}
Loading

0 comments on commit d19d123

Please sign in to comment.