Skip to content

Commit

Permalink
Fixed ping/pong actor
Browse files Browse the repository at this point in the history
  • Loading branch information
andresgutierrez committed Oct 16, 2023
1 parent 5c0300c commit 32f488a
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 32 deletions.
16 changes: 10 additions & 6 deletions Nixie.Tests/Actors/PingPongActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@ public sealed class PongActor : IActor<string, string>
{
public PongActor(IActorContext<PongActor, string, string> _)
{

}

public Task<string> Receive(string message)
public async Task<string> Receive(string message)
{
return Task.FromResult(message);
await Task.Yield();

return message;

//return Task.FromResult(message);
}
}

public sealed class PingActor : IActor<string, string>
{
{
private readonly IActorRef<PongActor, string, string> pongRef;

private int receivedMessages;
Expand All @@ -31,8 +35,8 @@ public int GetMessages()
}

public void IncrMessage()
{
receivedMessages++;
{
receivedMessages++;
}

public async Task<string> Receive(string message)
Expand Down
6 changes: 4 additions & 2 deletions Nixie/ActorMessageReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ namespace Nixie;
/// <typeparam name="TResponse"></typeparam>
public sealed record ActorMessageReply<TRequest, TResponse>
{
private int completed = 1;

/// <summary>
/// Returns the request of the message.
/// </summary>
Expand All @@ -21,7 +23,7 @@ public sealed record ActorMessageReply<TRequest, TResponse>
/// <summary>
/// Returns true if the response has been set.
/// </summary>
public bool IsCompleted { get; private set; }
public bool IsCompleted => completed == 0;

/// <summary>
/// Constructor
Expand All @@ -39,7 +41,7 @@ public ActorMessageReply(TRequest request)
public void SetCompleted(TResponse? response)
{
Response = response;
IsCompleted = true;
Interlocked.Exchange(ref completed, 0);
}
}

6 changes: 3 additions & 3 deletions Nixie/ActorRunnerReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public sealed class ActorRunner<TActor, TRequest, TResponse> where TActor : IAct
/// <summary>
/// The name/id of the actor.
/// </summary>
public string Name { get; }
public string Name { get; }

/// <summary>
/// The inbox of the actor.
Expand Down Expand Up @@ -56,7 +56,7 @@ public ActorRunner(string name)
/// <param name="message"></param>
/// <returns></returns>
public ActorMessageReply<TRequest, TResponse> SendAndTryDeliver(TRequest message)
{
{
ActorMessageReply<TRequest, TResponse> messageReply = new(message);

if (shutdown == 0)
Expand Down Expand Up @@ -106,6 +106,6 @@ private async Task DeliverMessages()
Console.WriteLine("{0}\n{1}", ex.Message, ex.StackTrace);
}
}
} while (shutdown == 1 && Interlocked.CompareExchange(ref processing, 1, 0) != 0);
} while (shutdown == 1 && (Interlocked.CompareExchange(ref processing, 1, 0) != 0));
}
}
55 changes: 34 additions & 21 deletions Nixie/ActorSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public sealed class ActorSystem : IDisposable
{
private readonly ActorScheduler scheduler = new();

private readonly ConcurrentDictionary<Type, IActorRepositoryRunnable> repositories = new();
private readonly ConcurrentDictionary<Type, Lazy<IActorRepositoryRunnable>> repositories = new();

/// <summary>
/// Returns the actor scheduler
Expand Down Expand Up @@ -147,15 +147,20 @@ public sealed class ActorSystem : IDisposable
/// <returns></returns>
public ActorRepository<TActor, TRequest, TResponse> GetRepository<TActor, TRequest, TResponse>()
where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class
{
if (!repositories.TryGetValue(typeof(TActor), out IActorRepositoryRunnable? unitOfWorker))
{
ActorRepository<TActor, TRequest, TResponse> newUnitOfWork = new(this);
repositories.TryAdd(typeof(TActor), newUnitOfWork);
return newUnitOfWork;
}
{
Lazy<IActorRepositoryRunnable> repository = repositories.GetOrAdd(
typeof(TActor),
(type) => new Lazy<IActorRepositoryRunnable>(() => CreateRepository<TActor, TRequest, TResponse>())
);

return (ActorRepository<TActor, TRequest, TResponse>)repository.Value;
}

return (ActorRepository<TActor, TRequest, TResponse>)unitOfWorker;
private ActorRepository<TActor, TRequest, TResponse> CreateRepository<TActor, TRequest, TResponse>()
where TActor : IActor<TRequest, TResponse> where TRequest : class where TResponse : class
{
ActorRepository<TActor, TRequest, TResponse> repository = new(this);
return repository;
}

/// <summary>
Expand All @@ -166,15 +171,20 @@ public sealed class ActorSystem : IDisposable
/// <returns></returns>
public ActorRepository<TActor, TRequest> GetRepository<TActor, TRequest>()
where TActor : IActor<TRequest> where TRequest : class
{
if (!repositories.TryGetValue(typeof(TActor), out IActorRepositoryRunnable? unitOfWorker))
{
ActorRepository<TActor, TRequest> newUnitOfWork = new(this);
repositories.TryAdd(typeof(TActor), newUnitOfWork);
return newUnitOfWork;
}
{
Lazy<IActorRepositoryRunnable> repository = repositories.GetOrAdd(
typeof(TActor),
(type) => new Lazy<IActorRepositoryRunnable>(() => CreateRepository<TActor, TRequest>())
);

return (ActorRepository<TActor, TRequest>)unitOfWorker;
return (ActorRepository<TActor, TRequest>)repository.Value;
}

private ActorRepository<TActor, TRequest> CreateRepository<TActor, TRequest>()
where TActor : IActor<TRequest> where TRequest : class
{
ActorRepository<TActor, TRequest> repository = new(this);
return repository;
}

/// <summary>
Expand Down Expand Up @@ -273,11 +283,14 @@ public async Task Wait()
{
bool completed = true;

foreach (KeyValuePair<Type, IActorRepositoryRunnable> repository in repositories)
{
//Console.WriteLine("{0} HP={1} IsP={2}", x.Key, x.Value.HasPendingMessages(), x.Value.IsProcessing());
foreach (KeyValuePair<Type, Lazy<IActorRepositoryRunnable>> repository in repositories)
{
Lazy<IActorRepositoryRunnable> lazyRepository = repository.Value;

if (!lazyRepository.IsValueCreated)
continue;

if (repository.Value.HasPendingMessages() || repository.Value.IsProcessing())
if (lazyRepository.Value.HasPendingMessages() || lazyRepository.Value.IsProcessing())
{
await Task.Yield();
completed = false;
Expand Down

0 comments on commit 32f488a

Please sign in to comment.