Skip to content

Commit

Permalink
Merge branch 'dev' into akka-io
Browse files Browse the repository at this point in the history
  • Loading branch information
Aaronontheweb committed Dec 8, 2016
2 parents 4081426 + 74615f2 commit 7054ba1
Show file tree
Hide file tree
Showing 24 changed files with 1,609 additions and 167 deletions.
159 changes: 53 additions & 106 deletions src/benchmark/PersistenceBenchmark/PerformanceActors.cs
Expand Up @@ -6,144 +6,91 @@
//-----------------------------------------------------------------------

using System;
using Akka;
using Akka.Persistence;
using Akka.Actor;

namespace PersistenceBenchmark
{
public abstract class PerformanceTestActorBase : PersistentActor
public sealed class Init
{
private readonly string _persistenceId;

protected long FailAt { get; set; }

protected PerformanceTestActorBase(string persistenceId)
{
_persistenceId = persistenceId;
}

public sealed override string PersistenceId { get { return _persistenceId; } }

protected sealed override bool ReceiveRecover(object message)
{
if (LastSequenceNr % 1000 == 0) ;

return true;
}

protected bool ControlBehavior(object message)
{
var sender = Sender;
if (message is StopMeasure) DeferAsync(StopMeasure.Instance, _ => sender.Tell(StopMeasure.Instance));
else if (message is FailAt) FailAt = ((FailAt)message).SequenceNr;
else return false;
return true;
}
public static readonly Init Instance = new Init();
private Init() { }
}

public sealed class CommandsourcedPersistentActor : PerformanceTestActorBase
public sealed class Finish
{
public CommandsourcedPersistentActor(string persistenceId) : base(persistenceId)
{
}
public static readonly Finish Instance = new Finish();
private Finish() { }
}
public sealed class Done
{
public static readonly Done Instance = new Done();
private Done() { }
}
public sealed class Finished
{
public readonly long State;

protected override bool ReceiveCommand(object message)
public Finished(long state)
{
if (!ControlBehavior(message))
{
PersistAsync(message, e =>
{
if (LastSequenceNr % 1000 == 0) ;
if (LastSequenceNr == FailAt) throw new PerformanceTestException("boom");
});
}
return true;
State = state;
}
}

public sealed class EventsourcedPersistentActor : PerformanceTestActorBase
public sealed class Store
{
public EventsourcedPersistentActor(string persistenceId) : base(persistenceId)
{
}
public readonly int Value;

protected override bool ReceiveCommand(object message)
public Store(int value)
{
if (!ControlBehavior(message))
{
Persist(message, e =>
{
if (LastSequenceNr % 1000 == 0) ;
if (LastSequenceNr == FailAt) throw new PerformanceTestException("boom");
});
}
return true;
Value = value;
}
}

public sealed class MixedPersistentActor : PerformanceTestActorBase
public sealed class Stored
{
private int counter = 0;
public MixedPersistentActor(string persistenceId) : base(persistenceId)
{
}
public readonly int Value;

private void Handler(object message)
public Stored(int value)
{
if (LastSequenceNr % 1000 == 0) ;
if (LastSequenceNr == FailAt) throw new PerformanceTestException("boom");
}

protected override bool ReceiveCommand(object message)
{
if (!ControlBehavior(message))
{
counter++;
if (counter % 10 == 0) Persist(message, Handler);
else PersistAsync(message, Handler);
}
return true;
Value = value;
}
}

public sealed class StashingEventsourcedPersistentActor : PerformanceTestActorBase
public class PerformanceTestActor : PersistentActor
{
public StashingEventsourcedPersistentActor(string persistenceId) : base(persistenceId)
private long state = 0L;
public PerformanceTestActor(string persistenceId)
{
PersistenceId = persistenceId;
}

private object PrintProgress(object message)
{
if (LastSequenceNr % 1000 == 0) Console.Write(".");
return message;
}
public sealed override string PersistenceId { get; }

private bool ProcessC(object message)
{
PrintProgress(message);
if (object.Equals("c", message))
{
var context = Context;
Persist("c", _ => context.UnbecomeStacked());
Stash.UnstashAll();
}
else Stash.Stash();
return true;
}
protected override bool ReceiveRecover(object message) => message.Match()
.With<Stored>(s => state += s.Value)
.WasHandled;

protected override bool ReceiveCommand(object message)
{
PrintProgress(message);
if (!ControlBehavior(message))
protected override bool ReceiveCommand(object message) => message.Match()
.With<Store>(store =>
{
var context = Context;
if (object.Equals("a", message))
Persist("a", _ => context.BecomeStacked(ProcessC));
else if (object.Equals("b", message))
Persist("b", s => { });
else return false;
}
return true;
}
Persist(new Stored(store.Value), s =>
{
state += s.Value;
});
})
.With<Init>(_ =>
{
var sender = Sender;
Persist(new Stored(0), s =>
{
state += s.Value;
sender.Tell(Done.Instance);
});
})
.With<Finish>(_ => Sender.Tell(new Finished(state)))
.WasHandled;
}

}
16 changes: 16 additions & 0 deletions src/benchmark/PersistenceBenchmark/PersistenceBenchmark.csproj
Expand Up @@ -13,6 +13,8 @@
<FileAlignment>512</FileAlignment>
<SolutionDir Condition="$(SolutionDir) == '' Or $(SolutionDir) == '*Undefined*'">..\..\</SolutionDir>
<RestorePackages>true</RestorePackages>
<NuGetPackageImportStamp>
</NuGetPackageImportStamp>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
Expand Down Expand Up @@ -48,6 +50,10 @@
</Reference>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Data.SQLite, Version=1.0.103.0, Culture=neutral, PublicKeyToken=db937bc2d44ff139, processorArchitecture=MSIL">
<HintPath>..\..\packages\System.Data.SQLite.Core.1.0.103\lib\net45\System.Data.SQLite.dll</HintPath>
<Private>True</Private>
</Reference>
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
Expand All @@ -66,6 +72,14 @@
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\contrib\persistence\Akka.Persistence.Sql.Common\Akka.Persistence.Sql.Common.csproj">
<Project>{3b9e6211-9488-4db5-b714-24248693b38f}</Project>
<Name>Akka.Persistence.Sql.Common</Name>
</ProjectReference>
<ProjectReference Include="..\..\contrib\persistence\Akka.Persistence.Sqlite\Akka.Persistence.Sqlite.csproj">
<Project>{453efd22-7c53-4887-9dbf-fcfc9172e909}</Project>
<Name>Akka.Persistence.Sqlite</Name>
</ProjectReference>
<ProjectReference Include="..\..\core\Akka.Persistence\Akka.Persistence.csproj">
<Project>{fca84dea-c118-424b-9eb8-34375dfef18a}</Project>
<Name>Akka.Persistence</Name>
Expand All @@ -82,7 +96,9 @@
<ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
</PropertyGroup>
<Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" />
<Error Condition="!Exists('..\..\packages\System.Data.SQLite.Core.1.0.103\build\net45\System.Data.SQLite.Core.targets')" Text="$([System.String]::Format('$(ErrorText)', '..\..\packages\System.Data.SQLite.Core.1.0.103\build\net45\System.Data.SQLite.Core.targets'))" />
</Target>
<Import Project="..\..\packages\System.Data.SQLite.Core.1.0.103\build\net45\System.Data.SQLite.Core.targets" Condition="Exists('..\..\packages\System.Data.SQLite.Core.1.0.103\build\net45\System.Data.SQLite.Core.targets')" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
Expand Down
98 changes: 53 additions & 45 deletions src/benchmark/PersistenceBenchmark/Program.cs
Expand Up @@ -6,9 +6,12 @@
//-----------------------------------------------------------------------

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
using Akka.Pattern;

namespace PersistenceBenchmark
{
Expand All @@ -17,66 +20,71 @@ class Program
// if you want to benchmark your persistent storage provides, paste the configuration in string below
// by default we're checking against in-memory journal
private static Config config = ConfigurationFactory.ParseString(@"
");

public const int LoadCycles = 1000;
akka {
suppress-json-serializer-warning = true
persistence.journal {
plugin = ""akka.persistence.journal.sqlite""
sqlite {
class = ""Akka.Persistence.Sqlite.Journal.BatchingSqliteJournal, Akka.Persistence.Sqlite""
plugin-dispatcher = ""akka.actor.default-dispatcher""
table-name = event_journal
metadata-table-name = journal_metadata
auto-initialize = on
connection-string = ""FullUri=file:memdb-journal.db?mode=memory&cache=shared;Version=3;""
}
}
}");

public const int ActorCount = 1000;
public const int MessagesPerActor = 100;

static void Main(string[] args)
{
using (var system = ActorSystem.Create("persistent-benchmark", config))
using (var system = ActorSystem.Create("persistent-benchmark", config.WithFallback(ConfigurationFactory.Default())))
{
StressCommandsourcedActor(system, null);
StressEventsourcedActor(system, null);
}

Console.ReadLine();
}
Console.WriteLine("Performance benchmark starting...");

private static void StressCommandsourcedActor(ActorSystem system, long? failAt)
{
var pref = system.ActorOf(Props.Create(() => new CommandsourcedPersistentActor("commandsourced-1")));
StressPersistentActor(pref, failAt, "persistent commands");
}
var actors = new IActorRef[ActorCount];
for (int i = 0; i < ActorCount; i++)
{
var pid = "a-" + i;
actors[i] = system.ActorOf(Props.Create(() => new PerformanceTestActor(pid)));
}

private static void StressEventsourcedActor(ActorSystem system, long? failAt)
{
var pref = system.ActorOf(Props.Create(() => new EventsourcedPersistentActor("eventsourced-1")));
StressPersistentActor(pref, failAt, "persistent events");
}
Task.WaitAll(actors.Select(a => a.Ask<Done>(Init.Instance)).Cast<Task>().ToArray());

private static void StressMixedActor(ActorSystem system, long? failAt)
{
var pref = system.ActorOf(Props.Create(() => new MixedPersistentActor("mixed-1")));
StressPersistentActor(pref, failAt, "persistent events and commands");
}
Console.WriteLine("All actors have been initialized...");

private static void StressStashingPersistentActor(ActorSystem system)
{
var pref = system.ActorOf(Props.Create(() => new StashingEventsourcedPersistentActor("stashing-1")));
var m = new Measure(LoadCycles);
var stopwatch = new Stopwatch();
stopwatch.Start();

var commands = Enumerable.Range(1, LoadCycles/3).SelectMany(_ => new[] {"a", "b", "c"}).ToArray();
m.StartMeasure();
for (int i = 0; i < MessagesPerActor; i++)
for (int j = 0; j < ActorCount; j++)
{
actors[j].Tell(new Store(1));
}

foreach (var command in commands) pref.Tell(command);
var finished = new Task[ActorCount];
for (int i = 0; i < ActorCount; i++)
{
finished[i] = actors[i].Ask<Finished>(Finish.Instance);
}

pref.Ask(StopMeasure.Instance, TimeSpan.FromSeconds(100)).Wait();
var ratio = m.StopMeasure();
Console.WriteLine("Throughtput: {0} persisted events per second", ratio);
}
Task.WaitAll(finished);

private static void StressPersistentActor(IActorRef pref, long? failAt, string description)
{
if (failAt.HasValue) pref.Tell(new FailAt(failAt.Value));
var elapsed = stopwatch.ElapsedMilliseconds;

var m = new Measure(LoadCycles);
m.StartMeasure();
Console.WriteLine($"{ActorCount} actors stored {MessagesPerActor} events each in {elapsed/1000.0} sec. Average: {ActorCount*MessagesPerActor*1000.0/elapsed} events/sec");

for (int i = 1; i <= LoadCycles; i++) pref.Tell("msg" + i);
foreach (Task<Finished> task in finished)
{
if (!task.IsCompleted || task.Result.State != MessagesPerActor)
throw new IllegalStateException("Actor's state was invalid");
}
}

pref.Ask(StopMeasure.Instance, TimeSpan.FromSeconds(100)).Wait();
var ratio = m.StopMeasure();
Console.WriteLine("Throughtput: {0} {1} per second", ratio, description);
Console.ReadLine();
}

}
}
1 change: 1 addition & 0 deletions src/benchmark/PersistenceBenchmark/packages.config
Expand Up @@ -2,4 +2,5 @@
<packages>
<package id="Google.ProtocolBuffersLite" version="2.4.1.555" targetFramework="net45" userInstalled="true" />
<package id="Newtonsoft.Json" version="9.0.1" targetFramework="net45" userInstalled="true" />
<package id="System.Data.SQLite.Core" version="1.0.103" targetFramework="net45" />
</packages>
Expand Up @@ -46,8 +46,10 @@
</ItemGroup>
<ItemGroup>
<Compile Include="InternalExtensions.cs" />
<Compile Include="Journal\BatchingSqlJournal.cs" />
<Compile Include="Journal\ITimestampProvider.cs" />
<Compile Include="Journal\Messages.cs" />
<Compile Include="Journal\MultiValueDictionaryExtensions.cs" />
<Compile Include="Journal\QueryApi.cs" />
<Compile Include="Journal\QueryExecutor.cs" />
<Compile Include="Journal\SqlJournal.cs" />
Expand Down

0 comments on commit 7054ba1

Please sign in to comment.