Skip to content

Commit

Permalink
Merge pull request #2876 from Aaronontheweb/actorpath-performance-imp…
Browse files Browse the repository at this point in the history
…rovements

Actorpath performance improvements
  • Loading branch information
Danthar committed Aug 3, 2017
2 parents 590db3c + 9fb3ae3 commit 3759b4a
Show file tree
Hide file tree
Showing 20 changed files with 342 additions and 223 deletions.
4 changes: 3 additions & 1 deletion src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ namespace Akka.Actor
protected ActorPath(Akka.Actor.Address address, string name) { }
protected ActorPath(Akka.Actor.ActorPath parentPath, string name, long uid) { }
public Akka.Actor.Address Address { get; }
public System.Collections.Generic.IReadOnlyList<string> Elements { get; }
public abstract System.Collections.Generic.IReadOnlyList<string> Elements { get; }
public string Name { get; }
public abstract Akka.Actor.ActorPath Parent { get; }
public abstract Akka.Actor.ActorPath Root { get; }
Expand Down Expand Up @@ -471,6 +471,7 @@ namespace Akka.Actor
public class ChildActorPath : Akka.Actor.ActorPath
{
public ChildActorPath(Akka.Actor.ActorPath parentPath, string name, long uid) { }
public override System.Collections.Generic.IReadOnlyList<string> Elements { get; }
public override Akka.Actor.ActorPath Parent { get; }
public override Akka.Actor.ActorPath Root { get; }
public override int CompareTo(Akka.Actor.ActorPath other) { }
Expand Down Expand Up @@ -1476,6 +1477,7 @@ namespace Akka.Actor
public class RootActorPath : Akka.Actor.ActorPath
{
public RootActorPath(Akka.Actor.Address address, string name = "") { }
public override System.Collections.Generic.IReadOnlyList<string> Elements { get; }
public override Akka.Actor.ActorPath Parent { get; }
[Newtonsoft.Json.JsonIgnoreAttribute()]
public override Akka.Actor.ActorPath Root { get; }
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Remote.Tests/Transport/AkkaProtocolSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Remote.Serialization;
using Akka.Remote.Transport;
using Akka.TestKit;
using Akka.Util.Internal;
Expand All @@ -30,7 +31,7 @@ public class AkkaProtocolSpec : AkkaSpec
Address remoteAddress = new Address("test", "testsystem2", "testhost2", 1234);
Address remoteAkkaAddress = new Address("akka.test", "testsystem2", "testhost2", 1234);

AkkaPduCodec codec = new AkkaPduProtobuffCodec();
private AkkaPduCodec codec;

SerializedMessage testMsg =
new SerializedMessage { SerializerId = 0, Message = ByteString.CopyFromUtf8("foo") };
Expand All @@ -47,6 +48,7 @@ public class AkkaProtocolSpec : AkkaSpec
public AkkaProtocolSpec()
: base(@"akka.test.default-timeout = 1.5 s")
{
codec = new AkkaPduProtobuffCodec(Sys);
testEnvelope = codec.ConstructMessage(localAkkaAddress, TestActor, testMsg);
testMsgPdu = codec.ConstructPayload(testEnvelope);

Expand Down
5 changes: 3 additions & 2 deletions src/core/Akka.Remote.Tests/Transport/GenericTransportSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Remote.Serialization;
using Akka.Remote.Transport;
using Akka.TestKit;
using Google.Protobuf;
Expand Down Expand Up @@ -47,7 +48,7 @@ private Akka.Remote.Transport.Transport WrapTransport(Akka.Remote.Transport.Tran
if (withAkkaProtocol) {
var provider = (RemoteActorRefProvider)((ExtendedActorSystem)Sys).Provider;

return new AkkaProtocolTransport(transport, Sys, new AkkaProtocolSettings(provider.RemoteSettings.Config), new AkkaPduProtobuffCodec());
return new AkkaProtocolTransport(transport, Sys, new AkkaProtocolSettings(provider.RemoteSettings.Config), new AkkaPduProtobuffCodec(Sys));
}

return transport;
Expand Down Expand Up @@ -153,7 +154,7 @@ public void Transport_must_successfully_send_PDUs()
handleB.ReadHandlerSource.SetResult(new ActorHandleEventListener(TestActor));

var payload = ByteString.CopyFromUtf8("PDU");
var pdu = withAkkaProtocol ? new AkkaPduProtobuffCodec().ConstructPayload(payload) : payload;
var pdu = withAkkaProtocol ? new AkkaPduProtobuffCodec(Sys).ConstructPayload(payload) : payload;

AwaitCondition(() => registry.ExistsAssociation(addressATest, addressBTest));

Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Remote/Endpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
using Akka.Dispatch.SysMsg;
using Akka.Event;
using Akka.Pattern;
using Akka.Remote.Serialization;
using Akka.Remote.Transport;
using Akka.Serialization;
using Akka.Util;
Expand Down Expand Up @@ -882,7 +883,7 @@ private IActorRef CreateWriter()
Context.ActorOf(RARP.For(Context.System)
.ConfigureDispatcher(
EndpointWriter.EndpointWriterProps(_currentHandle, _localAddress, _remoteAddress, _refuseUid, _transport,
_settings, new AkkaPduProtobuffCodec(), _receiveBuffers, Self)
_settings, new AkkaPduProtobuffCodec(Context.System), _receiveBuffers, Self)
.WithDeploy(Deploy.Local)),
"endpointWriter");
Context.Watch(writer);
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Remote/EndpointManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ private void HandleInboundAssociation(InboundAssociation ia, bool writerIsIdle)
//Apply AkkaProtocolTransport wrapper to the end of the chain
//The chain at this point:
// AkkaProtocolTransport <-- Adapter <-- .. <-- Adapter <-- Driver
transports.Add(new AkkaProtocolTransport(wrappedTransport, Context.System, new AkkaProtocolSettings(_conf), new AkkaPduProtobuffCodec()));
transports.Add(new AkkaProtocolTransport(wrappedTransport, Context.System, new AkkaProtocolSettings(_conf), new AkkaPduProtobuffCodec(Context.System)));
}

// Collect all transports, listen addresses, and listener promises in one Task
Expand Down Expand Up @@ -1212,7 +1212,7 @@ private void CreateAndRegisterEndpoint(AkkaProtocolHandle handle, int? refuseUid
Context.ActorOf(RARP.For(Context.System)
.ConfigureDispatcher(
ReliableDeliverySupervisor.ReliableDeliverySupervisorProps(handleOption, localAddress,
remoteAddress, refuseUid, transport, endpointSettings, new AkkaPduProtobuffCodec(),
remoteAddress, refuseUid, transport, endpointSettings, new AkkaPduProtobuffCodec(Context.System),
_receiveBuffers, endpointSettings.Dispatcher)
.WithDeploy(Deploy.Local)),
string.Format("reliableEndpointWriter-{0}-{1}", AddressUrlEncoder.Encode(remoteAddress),
Expand All @@ -1224,7 +1224,7 @@ private void CreateAndRegisterEndpoint(AkkaProtocolHandle handle, int? refuseUid
Context.ActorOf(RARP.For(Context.System)
.ConfigureDispatcher(
EndpointWriter.EndpointWriterProps(handleOption, localAddress, remoteAddress, refuseUid,
transport, endpointSettings, new AkkaPduProtobuffCodec(), _receiveBuffers,
transport, endpointSettings, new AkkaPduProtobuffCodec(Context.System), _receiveBuffers,
reliableDeliverySupervisor: null)
.WithDeploy(Deploy.Local)),
string.Format("endpointWriter-{0}-{1}", AddressUrlEncoder.Encode(remoteAddress), _endpointId.Next()));
Expand Down
19 changes: 18 additions & 1 deletion src/core/Akka.Remote/RemoteActorRefProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Internal;
Expand Down Expand Up @@ -138,6 +139,7 @@ public void UnregisterTempActor(ActorPath path)
private volatile IActorRef _remoteWatcher;

private volatile ActorRefResolveThreadLocalCache _actorRefResolveThreadLocalCache;
private volatile ActorPathThreadLocalCache _actorPathThreadLocalCache;

/// <summary>
/// The remote death watcher.
Expand All @@ -153,6 +155,7 @@ public virtual void Init(ActorSystemImpl system)
_local.Init(system);

_actorRefResolveThreadLocalCache = ActorRefResolveThreadLocalCache.For(system);
_actorPathThreadLocalCache = ActorPathThreadLocalCache.For(system);

_remotingTerminator =
_system.SystemActorOf(
Expand Down Expand Up @@ -362,6 +365,20 @@ public IActorRef RootGuardianAt(Address address)
return _local.ActorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy, async);
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private bool TryParseCachedPath(string actorPath, out ActorPath path)
{
if (_actorPathThreadLocalCache != null)
{
path = _actorPathThreadLocalCache.Cache.GetOrCompute(actorPath);
return path != null;
}
else // cache not initialized yet
{
return ActorPath.TryParse(actorPath, out path);
}
}


/// <summary>
/// INTERNAL API.
Expand All @@ -374,7 +391,7 @@ public IActorRef RootGuardianAt(Address address)
internal IInternalActorRef ResolveActorRefWithLocalAddress(string path, Address localAddress)
{
ActorPath actorPath;
if (ActorPath.TryParse(path, out actorPath))
if (TryParseCachedPath(path, out actorPath))
{
//the actor's local address was already included in the ActorPath
if (HasAddress(actorPath.Address))
Expand Down
54 changes: 54 additions & 0 deletions src/core/Akka.Remote/Serialization/ActorPathCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using System;
using Akka.Actor;
using System.Threading;

namespace Akka.Remote.Serialization
{
/// <summary>
/// INTERNAL API
/// </summary>
internal sealed class ActorPathThreadLocalCache : ExtensionIdProvider<ActorPathThreadLocalCache>, IExtension
{
private readonly ThreadLocal<ActorPathCache> _current = new ThreadLocal<ActorPathCache>(() => new ActorPathCache());

public ActorPathCache Cache => _current.Value;

public override ActorPathThreadLocalCache CreateExtension(ExtendedActorSystem system)
{
return new ActorPathThreadLocalCache();
}

public static ActorPathThreadLocalCache For(ActorSystem system)
{
return system.WithExtension<ActorPathThreadLocalCache, ActorPathThreadLocalCache>();
}
}

/// <summary>
/// INTERNAL API
/// </summary>
internal sealed class ActorPathCache : LruBoundedCache<string, ActorPath>
{
public ActorPathCache(int capacity = 1024, int evictAgeThreshold = 600) : base(capacity, evictAgeThreshold)
{
}

protected override int Hash(string k)
{
return FastHash.OfStringFast(k);
}

protected override ActorPath Compute(string k)
{
ActorPath actorPath;
if (ActorPath.TryParse(k, out actorPath))
return actorPath;
return null;
}

protected override bool IsCacheable(ActorPath v)
{
return v != null;
}
}
}
61 changes: 61 additions & 0 deletions src/core/Akka.Remote/Serialization/AddressCache.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System;
using System.Threading;
using Akka.Actor;

namespace Akka.Remote.Serialization
{
/// <summary>
/// INTERNAL API
/// </summary>
internal sealed class AddressThreadLocalCache : ExtensionIdProvider<AddressThreadLocalCache>, IExtension
{
public AddressThreadLocalCache()
{
_current = new ThreadLocal<AddressCache>(() => new AddressCache());
}

public override AddressThreadLocalCache CreateExtension(ExtendedActorSystem system)
{
return new AddressThreadLocalCache();
}

private readonly ThreadLocal<AddressCache> _current;

public AddressCache Cache => _current.Value;

public static AddressThreadLocalCache For(ActorSystem system)
{
return system.WithExtension<AddressThreadLocalCache, AddressThreadLocalCache>();
}
}

/// <summary>
/// INTERNAL API
/// </summary>
internal sealed class AddressCache : LruBoundedCache<string, Address>
{
public AddressCache(int capacity = 1024, int evictAgeThreshold = 600) : base(capacity, evictAgeThreshold)
{
}

protected override int Hash(string k)
{
return FastHash.OfStringFast(k);
}

protected override Address Compute(string k)
{
Address addr;
if (ActorPath.TryParseAddress(k, out addr))
{
return addr;
}
return Address.AllSystems;
}

protected override bool IsCacheable(Address v)
{
return v != Address.AllSystems;
}
}
}
24 changes: 23 additions & 1 deletion src/core/Akka.Remote/Transport/AkkaPduCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using Akka.Actor;
using Google.Protobuf;
using System.Runtime.Serialization;
using Akka.Remote.Serialization;
using Akka.Remote.Serialization.Proto.Msg;
using SerializedMessage = Akka.Remote.Serialization.Proto.Msg.Payload;

Expand Down Expand Up @@ -202,6 +203,15 @@ public AckAndMessage(Ack ackOption, Message messageOption)
/// </summary>
internal abstract class AkkaPduCodec
{
protected readonly ActorSystem System;
protected readonly AddressThreadLocalCache AddressCache;

protected AkkaPduCodec(ActorSystem system)
{
System = system;
AddressCache = AddressThreadLocalCache.For(system);
}

/// <summary>
/// Return an <see cref="IAkkaPdu"/> instance that represents a PDU contained in the raw
/// <see cref="ByteString"/>.
Expand Down Expand Up @@ -407,7 +417,15 @@ public override AckAndMessage DecodeMessage(ByteString raw, RemoteActorRefProvid
{
var recipient = provider.ResolveActorRefWithLocalAddress(envelopeContainer.Recipient.Path, localAddress);
Address recipientAddress;
ActorPath.TryParseAddress(envelopeContainer.Recipient.Path, out recipientAddress);
if (AddressCache != null)
{
recipientAddress = AddressCache.Cache.GetOrCompute(envelopeContainer.Recipient.Path);
}
else
{
ActorPath.TryParseAddress(envelopeContainer.Recipient.Path, out recipientAddress);
}

var serializedMessage = envelopeContainer.Message;
IActorRef senderOption = null;
if (envelopeContainer.Sender != null)
Expand Down Expand Up @@ -555,5 +573,9 @@ private AddressData SerializeAddress(Address address)
}

#endregion

public AkkaPduProtobuffCodec(ActorSystem system) : base(system)
{
}
}
}
5 changes: 3 additions & 2 deletions src/core/Akka.Remote/Transport/AkkaProtocolTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Event;
using Akka.Remote.Serialization;
using Akka.Util.Internal;
using Google.Protobuf;

Expand Down Expand Up @@ -241,7 +242,7 @@ protected override void Ready(object message)
handle,
stateActorAssociationListener,
stateActorSettings,
new AkkaPduProtobuffCodec(),
new AkkaPduProtobuffCodec(Context.System),
failureDetector)), ActorNameFor(handle.RemoteAddress));
})
.With<AssociateUnderlying>(au => CreateOutboundStateActor(au.RemoteAddress, au.StatusPromise, null)) //need to create an Outbound ProtocolStateActor
Expand Down Expand Up @@ -271,7 +272,7 @@ private string ActorNameFor(Address remoteAddress)
statusPromise,
stateActorWrappedTransport,
stateActorSettings,
new AkkaPduProtobuffCodec(), failureDetector, refuseUid)),
new AkkaPduProtobuffCodec(Context.System), failureDetector, refuseUid)),
ActorNameFor(remoteAddress));
}

Expand Down

0 comments on commit 3759b4a

Please sign in to comment.