Skip to content

Commit

Permalink
Refactor Props (#5416)
Browse files Browse the repository at this point in the history
* clean and seal props

* make producer singleton

* add legacy support

* update api

* add producer type compare

* fix props equals

* fix test for invalid props expression

* fix racy test and testkit

* fix thread sleep

* lower condition interval

* fix await done async

* improve logging startup check

* enable inline execution

* fix racy sync-over-async-over-sync  call

* add actor path address validation

* fix racy starup in logging

* fix DefaultAddress NRE

* wait on provider ready

* remove useless memory barrier

* update api

* add testkit log test on startup

* remove await log message hack

Co-authored-by: Aaron Stannard <aaron@petabridge.com>
  • Loading branch information
Zetanova and Aaronontheweb committed Dec 14, 2021
1 parent bea6634 commit 12c26c2
Show file tree
Hide file tree
Showing 24 changed files with 289 additions and 383 deletions.
33 changes: 14 additions & 19 deletions src/contrib/dependencyinjection/Akka.DI.Core/DIActorProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ namespace Akka.DI.Core
/// <summary>
/// This class represents an actor creation strategy that uses dependency injection (DI) to resolve and instantiate actors based on their type.
/// </summary>
public class DIActorProducer : IIndirectActorProducer
public sealed class DIActorProducer : IIndirectActorProducerWithActorType
{
private IDependencyResolver dependencyResolver;
private Type actorType;
private readonly IDependencyResolver _dependencyResolver;
private readonly Type _actorType;
private readonly Func<ActorBase> _actorFactory;

readonly Func<ActorBase> actorFactory;
public Type ActorType => _actorType;

/// <summary>
/// Initializes a new instance of the <see cref="DIActorProducer"/> class.
Expand All @@ -33,26 +34,20 @@ public DIActorProducer(IDependencyResolver dependencyResolver, Type actorType)
if (dependencyResolver == null) throw new ArgumentNullException(nameof(dependencyResolver), $"DIActorProducer requires {nameof(dependencyResolver)} to be provided");
if (actorType == null) throw new ArgumentNullException(nameof(actorType), $"DIActorProducer requires {nameof(actorType)} to be provided");

this.dependencyResolver = dependencyResolver;
this.actorType = actorType;
this.actorFactory = dependencyResolver.CreateActorFactory(actorType);
_dependencyResolver = dependencyResolver;
_actorType = actorType;
_actorFactory = dependencyResolver.CreateActorFactory(actorType);
}

/// <summary>
/// Retrieves the type of the actor to produce.
/// </summary>
public Type ActorType
{
get { return this.actorType; }
}


/// <summary>
/// Creates an actor based on the container's implementation specific actor factory.
/// </summary>
/// <returns>An actor created by the container.</returns>
public ActorBase Produce()
public ActorBase Produce(Props props)
{
return actorFactory();
if (props.Type != _actorType)
throw new InvalidOperationException($"invalid actor type {props.Type}");
return _actorFactory();
}

/// <summary>
Expand All @@ -61,7 +56,7 @@ public ActorBase Produce()
/// <param name="actor">The actor to remove from the container.</param>
public void Release(ActorBase actor)
{
dependencyResolver.Release(actor);
_dependencyResolver.Release(actor);
}
}
}
2 changes: 1 addition & 1 deletion src/contrib/dependencyinjection/Akka.DI.Core/DIExt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void Initialize(IDependencyResolver dependencyResolver)
/// <returns>A <see cref="Akka.Actor.Props"/> configuration object for the given actor type.</returns>
public Props Props(Type actorType)
{
return new Props(typeof(DIActorProducer), new object[] { dependencyResolver, actorType });
return Akka.Actor.Props.CreateBy(new DIActorProducer(dependencyResolver, actorType), actorType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public static ServiceProvider For(ActorSystem actorSystem)
/// <returns>A new <see cref="Akka.Actor.Props"/> instance which uses DI internally.</returns>
public Props Props<T>(params object[] args) where T : ActorBase
{
return Akka.Actor.Props.CreateBy(new ServiceProviderActorProducer<T>(Provider, args));
return Akka.Actor.Props.CreateBy(new ServiceProviderActorProducer(Provider), typeof(T), args);
}
}

Expand Down Expand Up @@ -93,43 +93,23 @@ public override ServiceProvider CreateExtension(ExtendedActorSystem system)
///
/// Used to create actors via the <see cref="ActivatorUtilities"/>.
/// </summary>
internal class ServiceProviderActorProducer : IIndirectActorProducer
internal sealed class ServiceProviderActorProducer : IIndirectActorProducer
{
private readonly IServiceProvider _provider;
private readonly object[] _args;

public ServiceProviderActorProducer(IServiceProvider provider, Type actorType, object[] args)

public ServiceProviderActorProducer(IServiceProvider provider)
{
_provider = provider;
_args = args;
ActorType = actorType;
}

public ActorBase Produce()
public ActorBase Produce(Props props)
{
return (ActorBase)ActivatorUtilities.CreateInstance(_provider, ActorType, _args);
return (ActorBase)ActivatorUtilities.CreateInstance(_provider, props.Type, props.Arguments);
}

public Type ActorType { get; }

public void Release(ActorBase actor)
{
// no-op
}
}

/// <summary>
/// INTERNAL API
///
/// Used to create actors via the <see cref="ActivatorUtilities"/>.
/// </summary>
/// <typeparam name="TActor">the actor type</typeparam>
internal class ServiceProviderActorProducer<TActor> : ServiceProviderActorProducer where TActor:ActorBase
{

public ServiceProviderActorProducer(IServiceProvider provider, object[] args)
: base(provider, typeof(TActor), args)
{
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ namespace Akka.DependencyInjection
/// </summary>
public class ServiceProviderDependencyResolver : IDependencyResolver
{
private readonly ServiceProviderActorProducer _producer;

public IServiceProvider ServiceProvider { get; }

public ServiceProviderDependencyResolver(IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
_producer = new ServiceProviderActorProducer(ServiceProvider);
}

public IResolverScope CreateScope()
Expand All @@ -43,7 +46,7 @@ public object GetService(Type type)
public Props Props(Type type, params object[] args)
{
if(typeof(ActorBase).IsAssignableFrom(type))
return Akka.Actor.Props.CreateBy(new ServiceProviderActorProducer(ServiceProvider, type, args));
return Akka.Actor.Props.CreateBy(_producer, type, args);
throw new ArgumentException(nameof(type), $"[{type}] does not implement Akka.Actor.ActorBase.");
}

Expand All @@ -54,7 +57,7 @@ public Props Props(Type type)

public Props Props<T>(params object[] args) where T : ActorBase
{
return Akka.Actor.Props.CreateBy(new ServiceProviderActorProducer<T>(ServiceProvider, args));
return Akka.Actor.Props.CreateBy(_producer, typeof(T), args);
}
}

Expand Down
37 changes: 17 additions & 20 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,7 @@ namespace Akka.Actor
protected override void PreStart() { }
protected override bool Receive(object message) { }
}
public class HashedWheelTimerScheduler : Akka.Actor.SchedulerBase, Akka.Actor.IDateTimeOffsetNowTimeProvider, Akka.Actor.ITimeProvider, System.IDisposable
public sealed class HashedWheelTimerScheduler : Akka.Actor.SchedulerBase, Akka.Actor.IDateTimeOffsetNowTimeProvider, Akka.Actor.ITimeProvider, System.IDisposable
{
public HashedWheelTimerScheduler(Akka.Configuration.Config scheduler, Akka.Event.ILoggingAdapter log) { }
public override System.TimeSpan HighResMonotonicClock { get; }
Expand Down Expand Up @@ -1072,10 +1072,14 @@ namespace Akka.Actor
}
public interface IIndirectActorProducer
{
System.Type ActorType { get; }
Akka.Actor.ActorBase Produce();
Akka.Actor.ActorBase Produce(Akka.Actor.Props props);
void Release(Akka.Actor.ActorBase actor);
}
[System.ObsoleteAttribute("Do not use this interface")]
public interface IIndirectActorProducerWithActorType : Akka.Actor.IIndirectActorProducer
{
System.Type ActorType { get; }
}
public interface IInternalActor
{
Akka.Actor.IActorContext ActorContext { get; }
Expand Down Expand Up @@ -1409,31 +1413,29 @@ namespace Akka.Actor
public System.Exception RestartException { get; }
public override void GetObjectData(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context) { }
}
public class Props : Akka.Util.ISurrogated, System.IEquatable<Akka.Actor.Props>
public sealed class Props : Akka.Util.ISurrogated, System.IEquatable<Akka.Actor.Props>
{
public static readonly Akka.Actor.Props None;
protected Props() { }
protected Props(Akka.Actor.Props copy) { }
public Props(System.Type type, object[] args) { }
public Props(System.Type type) { }
public Props(System.Type type, Akka.Actor.SupervisorStrategy supervisorStrategy, System.Collections.Generic.IEnumerable<object> args) { }
public Props(System.Type type, Akka.Actor.SupervisorStrategy supervisorStrategy, params object[] args) { }
public Props(Akka.Actor.Deploy deploy, System.Type type, System.Collections.Generic.IEnumerable<object> args) { }
public Props(Akka.Actor.Deploy deploy, System.Type type, params object[] args) { }
public object[] Arguments { get; }
public Akka.Actor.Deploy Deploy { get; set; }
public Akka.Actor.Deploy Deploy { get; }
[Newtonsoft.Json.JsonIgnoreAttribute()]
public string Dispatcher { get; }
public static Akka.Actor.Props Empty { get; }
[Newtonsoft.Json.JsonIgnoreAttribute()]
public string Mailbox { get; }
[Newtonsoft.Json.JsonIgnoreAttribute()]
public Akka.Routing.RouterConfig RouterConfig { get; }
public Akka.Actor.SupervisorStrategy SupervisorStrategy { get; set; }
public Akka.Actor.SupervisorStrategy SupervisorStrategy { get; }
public static Akka.Actor.Props Terminated { get; }
[Newtonsoft.Json.JsonIgnoreAttribute()]
public System.Type Type { get; }
public string TypeName { get; }
protected virtual Akka.Actor.Props Copy() { }
public static Akka.Actor.Props Create<TActor>(System.Linq.Expressions.Expression<System.Func<TActor>> factory, Akka.Actor.SupervisorStrategy supervisorStrategy = null)
where TActor : Akka.Actor.ActorBase { }
public static Akka.Actor.Props Create<TActor>(params object[] args)
Expand All @@ -1443,20 +1445,20 @@ namespace Akka.Actor
public static Akka.Actor.Props Create(System.Type type, params object[] args) { }
[System.ObsoleteAttribute("Do not use this method. Call CreateBy(IIndirectActorProducer, params object[] arg" +
"s) instead")]
public static Akka.Actor.Props CreateBy<TProducer>(params object[] args)
public static Akka.Actor.Props CreateBy<TProducer>(System.Type type, params object[] args)
where TProducer : class, Akka.Actor.IIndirectActorProducer { }
public static Akka.Actor.Props CreateBy(Akka.Actor.IIndirectActorProducer producer, params object[] args) { }
public static Akka.Actor.Props CreateBy(Akka.Actor.IIndirectActorProducer producer, System.Type type, params object[] args) { }
public bool Equals(Akka.Actor.Props other) { }
public override bool Equals(object obj) { }
public override int GetHashCode() { }
public virtual Akka.Actor.ActorBase NewActor() { }
public Akka.Actor.ActorBase NewActor() { }
public Akka.Util.ISurrogate ToSurrogate(Akka.Actor.ActorSystem system) { }
public Akka.Actor.Props WithDeploy(Akka.Actor.Deploy deploy) { }
public Akka.Actor.Props WithDispatcher(string dispatcher) { }
public Akka.Actor.Props WithMailbox(string mailbox) { }
public Akka.Actor.Props WithRouter(Akka.Routing.RouterConfig routerConfig) { }
public Akka.Actor.Props WithSupervisorStrategy(Akka.Actor.SupervisorStrategy supervisorStrategy) { }
public class PropsSurrogate : Akka.Util.ISurrogate
public sealed class PropsSurrogate : Akka.Util.ISurrogate
{
public PropsSurrogate() { }
public object[] Arguments { get; set; }
Expand Down Expand Up @@ -1783,11 +1785,6 @@ namespace Akka.Actor
public override int GetHashCode() { }
public override string ToString() { }
}
public class TerminatedProps : Akka.Actor.Props
{
public TerminatedProps() { }
public override Akka.Actor.ActorBase NewActor() { }
}
[System.ObsoleteAttribute("TypedActor in its current shape will be removed in v1.5")]
public abstract class TypedActor : Akka.Actor.ActorBase
{
Expand Down Expand Up @@ -5033,7 +5030,7 @@ namespace Akka.Util
protected Resolve() { }
public abstract System.Type ActorType { get; }
protected static Akka.Util.IResolver Resolver { get; }
public abstract Akka.Actor.ActorBase Produce();
public abstract Akka.Actor.ActorBase Produce(Akka.Actor.Props props);
public void Release(Akka.Actor.ActorBase actor) { }
public static void SetResolver(Akka.Util.IResolver resolver) { }
}
Expand All @@ -5043,7 +5040,7 @@ namespace Akka.Util
public Resolve(params object[] args) { }
public override System.Type ActorType { get; }
public object[] Arguments { get; }
public override Akka.Actor.ActorBase Produce() { }
public override Akka.Actor.ActorBase Produce(Akka.Actor.Props props) { }
}
public class static Result
{
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.FSharp.Tests/ApiTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ let ``cannot spawn actor with simple expr args from expression`` () =
let system = Configuration.load() |> System.create "test"
// this formulation is supported in FsApi's expression evaluator, however the checks in Props.Create
// do not support this, so we test that we can evaluate this but not actually run it, as a proof of concept
Assert.Throws<InvalidCastException>(fun () ->
Assert.Throws<ArgumentException>(fun () ->
let actor = spawnObj system "test-actor" <@ fun () ->
let arg1 = 1
let arg2 = true
Expand Down
9 changes: 6 additions & 3 deletions src/core/Akka.Remote.Tests/RemotingSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ public void Remoting_must_create_by_IndirectActorProducer()
{
try
{
var r = Sys.ActorOf(Props.CreateBy(new TestResolver<Echo2>()), "echo");
var r = Sys.ActorOf(Props.CreateBy(new TestResolver<Echo2>(), typeof(Echo2)), "echo");
Assert.Equal("akka.test://remote-sys@localhost:12346/remote/akka.test/RemotingSpec@localhost:12345/user/echo", r.Path.ToString());
}
finally
Expand All @@ -426,7 +426,7 @@ public void Remoting_must_create_by_IndirectActorProducer_and_ping()
{
try
{
var r = Sys.ActorOf(Props.CreateBy(new TestResolver<Echo2>()), "echo");
var r = Sys.ActorOf(Props.CreateBy(new TestResolver<Echo2>(), typeof(Echo2)), "echo");
Assert.Equal("akka.test://remote-sys@localhost:12346/remote/akka.test/RemotingSpec@localhost:12345/user/echo", r.Path.ToString());
r.Tell("ping", TestActor);
ExpectMsg(("pong", TestActor), TimeSpan.FromSeconds(1.5));
Expand Down Expand Up @@ -911,8 +911,11 @@ public TestResolver(params object[] args)
_args = args;
}

public ActorBase Produce()
public ActorBase Produce(Props props)
{
if (props.Type != ActorType)
throw new InvalidOperationException("invalid actor type");

return (ActorBase)Activator.CreateInstance(ActorType, _args);
}

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public ActorPath RootPath
public Deployer Deployer { get; protected set; }

/// <inheritdoc/>
public Address DefaultAddress { get { return Transport.DefaultAddress; } }
public Address DefaultAddress { get { return Transport?.DefaultAddress; } }

private Information _serializationInformationCache;

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/FutureFlattenSourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ public void TaskSource_must_fail_when_the_task_source_materialization_fails()
var (innerSourceMat, outerSinkMat) = Source.FromTaskSource(inner).ToMaterialized(Sink.Seq<int>(), Keep.Both).Run(_materializer);
// wait until the underlying tasks are completed
Thread.Sleep(100);
AwaitCondition(() => outerSinkMat.IsFaulted && innerSourceMat.IsFaulted, TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(100));
outerSinkMat.Exception.Flatten().InnerException.Should().Be(new TestException("INNER_FAILED"));
innerSourceMat.Exception.Flatten().InnerException.Should().Be(new TestException("INNER_FAILED"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,17 @@ public void SpecifiedNumbersOfExceptionsCanBeIntercepted()
[Fact]
public void ShouldFailIfMoreExceptionsThenSpecifiedAreLogged()
{
//todo fix logging race
var exception = XAssert.Throws<TrueException>(() =>
{
EventFilter.Exception<SomeException>().Expect(2, () =>
{
Log.Error(new SomeException(), "whatever");
Log.Error(new SomeException(), "whatever");
Log.Error(new SomeException(), "whatever");
}));
});
});

Assert.Contains("1 message too many", exception.Message, StringComparison.OrdinalIgnoreCase);
}

Expand Down

0 comments on commit 12c26c2

Please sign in to comment.