Skip to content

Commit

Permalink
Make actor responses optional by default, added ConsistentHash/RoundR…
Browse files Browse the repository at this point in the history
…obin reply routers
  • Loading branch information
andresgutierrez committed Oct 17, 2023
1 parent 727a739 commit e8463dd
Show file tree
Hide file tree
Showing 23 changed files with 394 additions and 82 deletions.
6 changes: 3 additions & 3 deletions Nixie.Tests/Actors/PingPongActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ public PongActor(IActorContext<PongActor, string, string> _)

}

public Task<string> Receive(string message)
public Task<string?> Receive(string message)
{
return Task.FromResult(message);
return Task.FromResult<string?>(message);
}
}

Expand All @@ -35,7 +35,7 @@ public void IncrMessage()
receivedMessages++;
}

public async Task<string> Receive(string message)
public async Task<string?> Receive(string message)
{
IncrMessage();

Expand Down
4 changes: 2 additions & 2 deletions Nixie.Tests/Actors/ReplyActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ public void IncrMessage(string id)
receivedMessages[id]++;
}

public Task<string> Receive(string message)
public Task<string?> Receive(string message)
{
IncrMessage(message);

return Task.FromResult(message);
return Task.FromResult<string?>(message);
}
}
2 changes: 1 addition & 1 deletion Nixie.Tests/Actors/ReplySlowActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void IncrMessage(string id)
receivedMessages[id]++;
}

public async Task<string> Receive(string message)
public async Task<string?> Receive(string message)
{
IncrMessage(message);

Expand Down
21 changes: 8 additions & 13 deletions Nixie.Tests/Actors/RouteeActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,15 @@ public RouterMessage(RouterMessageType type, string data)

public int GetHash()
{
ulong hash = 14695981039346656037;

ReadOnlySpan<char> spanBytes = Data.AsSpan();

for (int i = 0; i < spanBytes.Length; i++)
return Data switch
{
unchecked
{
hash ^= spanBytes[i];
hash *= 0x100000001b3;
}
}

return Math.Abs((int)hash);
"aaa" => 0,
"bbb" => 1,
"ccc" => 2,
"ddd" => 3,
"eee" => 4,
_ => 0
};
}
}

Expand Down
42 changes: 42 additions & 0 deletions Nixie.Tests/Actors/RouteeReplyActor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

namespace Nixie.Tests.Actors;

public sealed class RouterResponse
{
public string Data { get; }

public RouterResponse(string data)
{
Data = data;
}
}

public sealed class RouteeReplyActor : IActor<RouterMessage, RouterResponse>
{
private int receivedMessages;

private readonly IActorContext<RouteeReplyActor, RouterMessage, RouterResponse> context;

public RouteeReplyActor(IActorContext<RouteeReplyActor, RouterMessage, RouterResponse> context)
{
this.context = context;
}

public int GetMessages()
{
return receivedMessages;
}

public void IncrMessage()
{
receivedMessages++;
}

public Task<RouterResponse?> Receive(RouterMessage message)
{
if (message.Type == RouterMessageType.Route)
IncrMessage();

return Task.FromResult<RouterResponse?>(new RouterResponse(message.Data));
}
}
99 changes: 94 additions & 5 deletions Nixie.Tests/TestRouters.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

using Nixie.Routers;
using Nixie.Tests.Actors;
using System.Diagnostics.Metrics;

namespace Nixie.Tests;

Expand Down Expand Up @@ -62,6 +63,33 @@ public async void TestCreateRoundRobinRouterSlowSend()
}
}

[Fact]
public async void TestCreateRoundRobinRouterReply()
{
using ActorSystem asx = new();

IActorRef<RoundRobinActor<RouteeReplyActor, RouterMessage, RouterResponse>, RouterMessage, RouterResponse> router =
asx.Spawn<RoundRobinActor<RouteeReplyActor, RouterMessage, RouterResponse>, RouterMessage, RouterResponse>("my-router", 5);

await router.Ask(new RouterMessage(RouterMessageType.Route, "aaa"));
await router.Ask(new RouterMessage(RouterMessageType.Route, "bbb"));
await router.Ask(new RouterMessage(RouterMessageType.Route, "ccc"));
await router.Ask(new RouterMessage(RouterMessageType.Route, "ddd"));
await router.Ask(new RouterMessage(RouterMessageType.Route, "eee"));

Assert.IsAssignableFrom<RoundRobinActor<RouteeReplyActor, RouterMessage, RouterResponse>>(router.Runner.Actor);

RoundRobinActor<RouteeReplyActor, RouterMessage, RouterResponse> routerActor = (RoundRobinActor<RouteeReplyActor, RouterMessage, RouterResponse>)router.Runner.Actor!;

foreach (IActorRef<RouteeReplyActor, RouterMessage, RouterResponse> routee in routerActor.Instances)
{
Assert.IsAssignableFrom<RouteeReplyActor>(routee.Runner.Actor);

RouteeReplyActor routeeActor = (RouteeReplyActor)routee.Runner.Actor!;
Assert.Equal(1, routeeActor.GetMessages());
}
}

[Fact]
public async void TestCreateConsistentHashRouter()
{
Expand All @@ -70,11 +98,11 @@ public async void TestCreateConsistentHashRouter()
IActorRef<ConsistentHashActor<RouteeActor, RouterMessage>, RouterMessage> router =
asx.Spawn<ConsistentHashActor<RouteeActor, RouterMessage>, RouterMessage>("my-router", 5);

router.Send(new RouterMessage(RouterMessageType.Route, "cf3be82f-7509-4450-9fc6-747490b4696d")); // 2
router.Send(new RouterMessage(RouterMessageType.Route, "f9d4c32c-57ea-4826-a632-2bbaf349679e")); // 4
router.Send(new RouterMessage(RouterMessageType.Route, "11e27c2a-f8a1-4d8c-a350-7c81b5deb29e")); // 1
router.Send(new RouterMessage(RouterMessageType.Route, "35000df2-df97-4a90-96f9-78887965c9e8")); // 0
router.Send(new RouterMessage(RouterMessageType.Route, "eac37733-0ec9-440b-98af-553fb65b189f")); // 3
router.Send(new RouterMessage(RouterMessageType.Route, "aaa"));
router.Send(new RouterMessage(RouterMessageType.Route, "bbb"));
router.Send(new RouterMessage(RouterMessageType.Route, "ccc"));
router.Send(new RouterMessage(RouterMessageType.Route, "ddd"));
router.Send(new RouterMessage(RouterMessageType.Route, "eee"));

await asx.Wait();

Expand All @@ -90,4 +118,65 @@ public async void TestCreateConsistentHashRouter()
Assert.Equal(1, routeeActor.GetMessages());
}
}

[Fact]
public async void TestCreateConsistentHashRouterReply()
{
using ActorSystem asx = new();

IActorRef<ConsistentHashActor<RouteeReplyActor, RouterMessage, RouterResponse>, RouterMessage, RouterResponse> router =
asx.Spawn<ConsistentHashActor<RouteeReplyActor, RouterMessage, RouterResponse>, RouterMessage, RouterResponse>("my-router", 5);

await router.Ask(new RouterMessage(RouterMessageType.Route, "aaa"));
await router.Ask(new RouterMessage(RouterMessageType.Route, "bbb"));
await router.Ask(new RouterMessage(RouterMessageType.Route, "ccc"));
await router.Ask(new RouterMessage(RouterMessageType.Route, "ddd"));
await router.Ask(new RouterMessage(RouterMessageType.Route, "eee"));

await asx.Wait();

Assert.IsAssignableFrom<ConsistentHashActor<RouteeReplyActor, RouterMessage, RouterResponse>>(router.Runner.Actor);

ConsistentHashActor<RouteeReplyActor, RouterMessage, RouterResponse> routerActor = (ConsistentHashActor<RouteeReplyActor, RouterMessage, RouterResponse>)router.Runner.Actor!;

foreach (IActorRef<RouteeReplyActor, RouterMessage, RouterResponse> routee in routerActor.Instances)
{
Assert.IsAssignableFrom<RouteeReplyActor>(routee.Runner.Actor);

RouteeReplyActor routeeActor = (RouteeReplyActor)routee.Runner.Actor!;
Assert.Equal(1, routeeActor.GetMessages());
}
}

[Fact]
public async void TestCreateConsistentHashRouterReplyParallel()
{
using ActorSystem asx = new();

IActorRef<ConsistentHashActor<RouteeReplyActor, RouterMessage, RouterResponse>, RouterMessage, RouterResponse> router =
asx.Spawn<ConsistentHashActor<RouteeReplyActor, RouterMessage, RouterResponse>, RouterMessage, RouterResponse>("my-router", 5);

Task[] tasks = new Task[5]
{
router.Ask(new RouterMessage(RouterMessageType.Route, "aaa")),
router.Ask(new RouterMessage(RouterMessageType.Route, "bbb")),
router.Ask(new RouterMessage(RouterMessageType.Route, "ccc")),
router.Ask(new RouterMessage(RouterMessageType.Route, "ddd")),
router.Ask(new RouterMessage(RouterMessageType.Route, "eee")),
};

await Task.WhenAll(tasks);

Assert.IsAssignableFrom<ConsistentHashActor<RouteeReplyActor, RouterMessage, RouterResponse>>(router.Runner.Actor);

ConsistentHashActor<RouteeReplyActor, RouterMessage, RouterResponse> routerActor = (ConsistentHashActor<RouteeReplyActor, RouterMessage, RouterResponse>)router.Runner.Actor!;

foreach (IActorRef<RouteeReplyActor, RouterMessage, RouterResponse> routee in routerActor.Instances)
{
Assert.IsAssignableFrom<RouteeReplyActor>(routee.Runner.Actor);

RouteeReplyActor routeeActor = (RouteeReplyActor)routee.Runner.Actor!;
Assert.Equal(1, routeeActor.GetMessages());
}
}
}
14 changes: 13 additions & 1 deletion Nixie/ActorContextReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ namespace Nixie;
/// Represents an actor context. This class is passed to the actor when it is created.
/// It can be used to create other actors or get the sender and the actor system.
/// </summary>
public sealed class ActorContext<TActor, TRequest, TResponse> : IActorContext<TActor, TRequest, TResponse> where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class
public sealed class ActorContext<TActor, TRequest, TResponse> : IActorContext<TActor, TRequest, TResponse>
where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class?
{
private readonly ActorSystem actorSystem;

Expand Down Expand Up @@ -35,6 +36,17 @@ public sealed class ActorContext<TActor, TRequest, TResponse> : IActorContext<TA
/// </summary>
public IGenericActorRef? Sender { get; set; }

/// <summary>
/// Returns a reference to the current reply object
/// </summary>
public ActorMessageReply<TRequest, TResponse>? Reply { get; set; }

/// <summary>
/// Indicates if the response of the current invocation must be by passed
/// to allow other consumer to set the response
/// </summary>
public bool ByPassReply { get; set; }

/// <summary>
/// Creates a new actor context.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion Nixie/ActorMessageReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace Nixie;
/// <typeparam name="TResponse"></typeparam>
public sealed record ActorMessageReply<TRequest, TResponse>
{
private int completed = 1;
private volatile int completed = 1;

/// <summary>
/// Returns the request of the message.
Expand Down
24 changes: 17 additions & 7 deletions Nixie/ActorRefReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace Nixie;
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
public sealed class ActorRef<TActor, TRequest, TResponse> : IGenericActorRef, IActorRef<TActor, TRequest, TResponse>
where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class
where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class?
{
private readonly ActorRunner<TActor, TRequest, TResponse> runner;

Expand All @@ -34,7 +34,7 @@ public ActorRef(ActorRunner<TActor, TRequest, TResponse> runner)
/// <param name="message"></param>
public void Send(TRequest message)
{
runner.SendAndTryDeliver(message, null);
runner.SendAndTryDeliver(message, null, null);
}

/// <summary>
Expand All @@ -44,7 +44,17 @@ public void Send(TRequest message)
/// <param name="sender"></param>
public void Send(TRequest message, IGenericActorRef sender)
{
runner.SendAndTryDeliver(message, sender);
runner.SendAndTryDeliver(message, sender, null);
}

/// <summary>
/// Passes a message to the actor without expecting a response and specifying a parent promise
/// </summary>
/// <param name="message"></param>
/// <param name="parentPromise"></param>
public void Send(TRequest message, ActorMessageReply<TRequest, TResponse>? parentPromise)
{
runner.SendAndTryDeliver(message, null, parentPromise);
}

/// <summary>
Expand All @@ -54,7 +64,7 @@ public void Send(TRequest message, IGenericActorRef sender)
/// <returns></returns>
public async Task<TResponse?> Ask(TRequest message)
{
ActorMessageReply<TRequest, TResponse> promise = runner.SendAndTryDeliver(message, null);
ActorMessageReply<TRequest, TResponse> promise = runner.SendAndTryDeliver(message, null, null);

while (!promise.IsCompleted)
await Task.Yield();
Expand All @@ -71,7 +81,7 @@ public void Send(TRequest message, IGenericActorRef sender)
/// <returns></returns>
public async Task<TResponse?> Ask(TRequest message, TimeSpan timeout)
{
ActorMessageReply<TRequest, TResponse> promise = runner.SendAndTryDeliver(message, null);
ActorMessageReply<TRequest, TResponse> promise = runner.SendAndTryDeliver(message, null, null);

Stopwatch stopwatch = Stopwatch.StartNew();

Expand All @@ -94,7 +104,7 @@ public void Send(TRequest message, IGenericActorRef sender)
/// <returns></returns>
public async Task<TResponse?> Ask(TRequest message, IGenericActorRef sender)
{
ActorMessageReply<TRequest, TResponse> promise = runner.SendAndTryDeliver(message, sender);
ActorMessageReply<TRequest, TResponse> promise = runner.SendAndTryDeliver(message, sender, null);

while (!promise.IsCompleted)
await Task.Yield();
Expand All @@ -112,7 +122,7 @@ public void Send(TRequest message, IGenericActorRef sender)
/// <returns></returns>
public async Task<TResponse?> Ask(TRequest message, IGenericActorRef sender, TimeSpan timeout)
{
ActorMessageReply<TRequest, TResponse> promise = runner.SendAndTryDeliver(message, sender);
ActorMessageReply<TRequest, TResponse> promise = runner.SendAndTryDeliver(message, sender, null);

Stopwatch stopwatch = Stopwatch.StartNew();

Expand Down
3 changes: 2 additions & 1 deletion Nixie/ActorRepositoryReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ namespace Nixie;
/// <typeparam name="TActor"></typeparam>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
public sealed class ActorRepository<TActor, TRequest, TResponse> : IActorRepositoryRunnable where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class
public sealed class ActorRepository<TActor, TRequest, TResponse> : IActorRepositoryRunnable
where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class?
{
private readonly ActorSystem actorSystem;

Expand Down

0 comments on commit e8463dd

Please sign in to comment.