Skip to content

Commit

Permalink
Akka IO redesign using async IO - SocketAsyncEventArgs
Browse files Browse the repository at this point in the history
fix Streams.IO specs

changed approved public api for akka io

updates for akka io tcp impl

post merge fixes

akka-io stress spec

post-rebase fixes

fix for mono issues with missing IPAddress API

mono conditional build

akka.io perf tests

minor fixes for mono

explicit tracing of tcp spec

more readable specs

removed post-rebase

fixes after rebase with dev

fixed one failing TCP spec for akka.streams

reduced execution time of TcpHorizontalScaleSpec

removed unusued files from akka.io

added more akka.io perf specs

minor code simplifications

minor fixes in perf benchmarks for new akka.io

removed redundant socket event args pool

reimplemented ByteString

fixed FramingSpecs

new approach to socket connections

fixed ByteString related specs

redesigned the way of setting buffers in SocketAsyncEventArgs

fixed missing buffer allocations
  • Loading branch information
fergusn authored and Horusiath committed May 9, 2017
1 parent 755381d commit cf350af
Show file tree
Hide file tree
Showing 72 changed files with 3,215 additions and 6,292 deletions.
2 changes: 1 addition & 1 deletion build.fsx
Expand Up @@ -23,7 +23,7 @@ let copyright = "Copyright © 2013-2017 Akka.NET Team"
let company = "Akka.NET Team"
let description = "Akka.NET is a port of the popular Java/Scala framework Akka to .NET"
let tags = ["akka";"actors";"actor";"model";"Akka";"concurrency"]
let configuration = "Release"
let configuration = if isMono then "Release Mono" else "Release"
let toolDir = "tools"
let CloudCopyDir = toolDir @@ "CloudCopy"
let AzCopyDir = toolDir @@ "AzCopy"
Expand Down
1 change: 1 addition & 0 deletions io-benchmark.bat
@@ -0,0 +1 @@
.\src\packages\NBench.Runner\lib\net45\NBench.Runner.exe .\src\core\Akka.Tests.Performance\bin\Release\Akka.Tests.Performance.dll output-directory=PerfResults include=Akka.Tests.Performance.IO*
Binary file added src/.nuget/NuGet.exe
Binary file not shown.
323 changes: 127 additions & 196 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt

Large diffs are not rendered by default.

Expand Up @@ -809,14 +809,14 @@ namespace Akka.Persistence.Fsm
public void WhenUnhandled(Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.StateFunction stateFunction) { }
public class State<TState, TData, TEvent> : Akka.Actor.FSMBase.State<TState, TData>
{
public State(TState stateName, TData stateData, System.Nullable<System.TimeSpan> timeout = null, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.IReadOnlyList<object> replies = null, Akka.Util.ILinearSeq<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public State(TState stateName, TData stateData, System.Nullable<System.TimeSpan> timeout = null, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.IReadOnlyList<object> replies = null, System.Collections.Generic.IEnumerable<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public System.Action<TData> AfterTransitionHandler { get; }
public Akka.Util.ILinearSeq<TEvent> DomainEvents { get; }
public System.Collections.Generic.IEnumerable<TEvent> DomainEvents { get; }
public new bool Notifies { get; set; }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State AndThen(System.Action<TData> handler) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Applying(Akka.Util.ILinearSeq<TEvent> events) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Applying(System.Collections.Generic.IEnumerable<TEvent> events) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Applying(TEvent e) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Copy(System.Nullable<System.TimeSpan> timeout, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.IReadOnlyList<object> replies = null, Akka.Util.ILinearSeq<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Copy(System.Nullable<System.TimeSpan> timeout, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.IReadOnlyList<object> replies = null, System.Collections.Generic.IEnumerable<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State ForMax(System.TimeSpan timeout) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Replying(object replyValue) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Using(TData nextStateData) { }
Expand All @@ -829,19 +829,19 @@ namespace Akka.Persistence.Fsm
where TE : TEvent
{
public State(Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State state) { }
public State(TState stateName, TData stateData, System.Nullable<System.TimeSpan> timeout = null, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.List<object> replies = null, Akka.Util.ILinearSeq<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public State(TState stateName, TData stateData, System.Nullable<System.TimeSpan> timeout = null, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.List<object> replies = null, System.Collections.Generic.IEnumerable<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public System.Action<TData> AfterTransitionHandler { get; }
public Akka.Util.ILinearSeq<TEvent> DomainEvents { get; }
public System.Collections.Generic.IEnumerable<TEvent> DomainEvents { get; }
public bool Notifies { get; set; }
public System.Collections.Generic.IReadOnlyList<object> Replies { get; }
public TData StateData { get; }
public TState StateName { get; }
public Akka.Actor.FSMBase.Reason StopReason { get; }
public System.Nullable<System.TimeSpan> Timeout { get; }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State AndThen(System.Action<TData> handler) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Applying(Akka.Util.ILinearSeq<TEvent> events) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Applying(System.Collections.Generic.IEnumerable<TEvent> events) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Applying(TEvent e) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Copy(System.Nullable<System.TimeSpan> timeout, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.List<object> replies = null, Akka.Util.ILinearSeq<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Copy(System.Nullable<System.TimeSpan> timeout, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.List<object> replies = null, System.Collections.Generic.IEnumerable<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State ForMax(System.TimeSpan timeout) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Replying(object replyValue) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Using(TData nextStateData) { }
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.MultiNodeTestRunner/Program.cs
Expand Up @@ -302,7 +302,7 @@ public TcpLoggingServer(IActorRef sinkCoordinator)

Receive<Tcp.Received>(received =>
{
var message = received.Data.DecodeString();
var message = received.Data.ToString();
_sinkCoordinator.Tell(message);
});
}
Expand Down
22 changes: 11 additions & 11 deletions src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs
Expand Up @@ -906,7 +906,7 @@ public State(State state)
TimeSpan? timeout = null,
FSMBase.Reason stopReason = null,
List<object> replies = null,
ILinearSeq<TEvent> domainEvents = null,
IEnumerable<TEvent> domainEvents = null,
Action<TData> afterTransitionDo = null)
{
_state = new State(stateName, stateData, timeout, stopReason, replies, domainEvents, afterTransitionDo);
Expand Down Expand Up @@ -948,7 +948,7 @@ public Action<TData> AfterTransitionHandler
/// <summary>
/// TBD
/// </summary>
public ILinearSeq<TEvent> DomainEvents
public IEnumerable<TEvent> DomainEvents
{
get
{
Expand Down Expand Up @@ -977,7 +977,7 @@ public bool Notifies
/// </summary>
/// <param name="events">TBD</param>
/// <returns>TBD</returns>
public State Applying(ILinearSeq<TEvent> events)
public State Applying(IEnumerable<TEvent> events)
{
return _state.Applying(events);
}
Expand Down Expand Up @@ -1015,7 +1015,7 @@ public State AndThen(Action<TData> handler)
TimeSpan? timeout,
FSMBase.Reason stopReason = null,
List<object> replies = null,
ILinearSeq<TEvent> domainEvents = null,
IEnumerable<TEvent> domainEvents = null,
Action<TData> afterTransitionDo = null)
{
return _state.Copy(timeout, stopReason, replies, domainEvents, afterTransitionDo);
Expand Down Expand Up @@ -1133,7 +1133,7 @@ public class State : FSMBase.State<TState, TData>
/// <param name="domainEvents">TBD</param>
/// <param name="afterTransitionDo">TBD</param>
public State(TState stateName, TData stateData, TimeSpan? timeout = null, FSMBase.Reason stopReason = null,
IReadOnlyList<object> replies = null, ILinearSeq<TEvent> domainEvents = null, Action<TData> afterTransitionDo = null)
IReadOnlyList<object> replies = null, IEnumerable<TEvent> domainEvents = null, Action<TData> afterTransitionDo = null)
: base(stateName, stateData, timeout, stopReason, replies)
{
AfterTransitionHandler = afterTransitionDo;
Expand All @@ -1144,7 +1144,7 @@ public class State : FSMBase.State<TState, TData>
/// <summary>
/// TBD
/// </summary>
public ILinearSeq<TEvent> DomainEvents { get; private set; }
public IEnumerable<TEvent> DomainEvents { get; private set; }

/// <summary>
/// TBD
Expand All @@ -1156,13 +1156,13 @@ public class State : FSMBase.State<TState, TData>
/// </summary>
/// <param name="events">TBD</param>
/// <returns>TBD</returns>
public State Applying(ILinearSeq<TEvent> events)
public State Applying(IEnumerable<TEvent> events)
{
if (DomainEvents == null)
{
return Copy(null, null, null, events);
}
return Copy(null, null, null, new ArrayLinearSeq<TEvent>(DomainEvents.Concat(events).ToArray()));
return Copy(null, null, null, DomainEvents.Concat(events).ToArray());
}


Expand All @@ -1175,12 +1175,12 @@ public State Applying(TEvent e)
{
if (DomainEvents == null)
{
return Copy(null, null, null, new ArrayLinearSeq<TEvent>(new[] {e}));
return Copy(null, null, null, new[] {e});
}
var events = new List<TEvent>();
events.AddRange(DomainEvents);
events.Add(e);
return Copy(null, null, null, new ArrayLinearSeq<TEvent>(events.ToArray()));
return Copy(null, null, null, events.ToArray());
}


Expand All @@ -1204,7 +1204,7 @@ public State AndThen(Action<TData> handler)
/// <param name="afterTransitionDo">TBD</param>
/// <returns>TBD</returns>
public State Copy(TimeSpan? timeout, FSMBase.Reason stopReason = null,
IReadOnlyList<object> replies = null, ILinearSeq<TEvent> domainEvents = null, Action<TData> afterTransitionDo = null)
IReadOnlyList<object> replies = null, IEnumerable<TEvent> domainEvents = null, Action<TData> afterTransitionDo = null)
{
return new State(StateName, StateData, timeout ?? Timeout, stopReason ?? StopReason,
replies ?? Replies,
Expand Down
1 change: 0 additions & 1 deletion src/core/Akka.Remote/Akka.Remote.csproj
Expand Up @@ -145,7 +145,6 @@
<Compile Include="Serialization\ProtobufSerializer.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RemoteActorRefProvider.cs" />
<Compile Include="IPExtensions.cs" />
<Compile Include="Transport\AkkaPduCodec.cs" />
<Compile Include="Transport\DotNetty\AkkaLoggingHandler.cs" />
<Compile Include="Transport\DotNetty\DotNettyTransportSettings.cs" />
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/BidiFlowSpec.cs
Expand Up @@ -35,7 +35,7 @@ public BidiFlowSpec()
BidiFlow.FromFlows(
Flow.Create<int>().Select(x => ((long) x) + 2).WithAttributes(Attributes.CreateName("top")),
Flow.Create<ByteString>()
.Select(x => x.DecodeString(Encoding.UTF8))
.Select(x => x.ToString(Encoding.UTF8))
.WithAttributes(Attributes.CreateName("bottom")));
}

Expand All @@ -56,7 +56,7 @@ public BidiFlowSpec()
b.From(Source.Single(42).MapMaterializedValue(_=>Task.FromResult(0))).To(s);
var top = b.Add(Flow.Create<int>().Select(x => ((long) x) + 2));
var bottom = b.Add(Flow.Create<ByteString>().Select(x => x.DecodeString(Encoding.UTF8)));
var bottom = b.Add(Flow.Create<ByteString>().Select(x => x.ToString(Encoding.UTF8)));
return new BidiShape<int,long,ByteString, string>(top.Inlet, top.Outlet, bottom.Inlet, bottom.Outlet);
}));
}
Expand Down
10 changes: 5 additions & 5 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs
Expand Up @@ -92,7 +92,7 @@ private ByteString RandomByteString(int size)
{
var a = new byte[size];
ThreadLocalRandom.Current.NextBytes(a);
return ByteString.Create(a);
return ByteString.FromBytes(a);
}

[Fact]
Expand Down Expand Up @@ -457,7 +457,7 @@ public void GroupBy_must_work_under_fuzzing_stress_test()
var subscriber = this.CreateManualSubscriberProbe<IEnumerable<byte>>();
var firstGroup = (Source<IEnumerable<byte>, NotUsed>)Source.FromPublisher(publisherProbe)
.GroupBy(256, element => element.Head)
.GroupBy(256, element => element[0])
.Select(b => b.Reverse())
.MergeSubstreams();
var secondGroup = (Source<IEnumerable<byte>, NotUsed>)firstGroup.GroupBy(256, bytes => bytes.First())
Expand Down Expand Up @@ -539,7 +539,7 @@ public void GroupBy_must_work_with_random_demand()
var probeShape = new SinkShape<ByteString>(new Inlet<ByteString>("ProbeSink.in"));
var probeSink = new ProbeSink(probeShape, props, Attributes.None);
Source.FromPublisher(publisherProbe)
.GroupBy(100, element => Math.Abs(element.Head % 100))
.GroupBy(100, element => Math.Abs(element[0] % 100))
.To(new Sink<ByteString, TestSubscriber.Probe<ByteString>>(probeSink))
.Run(materializer);
Expand All @@ -548,7 +548,7 @@ public void GroupBy_must_work_with_random_demand()
for (var i = 1; i <= 400; i++)
{
var byteString = RandomByteString(10);
var index = Math.Abs(byteString.Head % 100);
var index = Math.Abs(byteString[0] % 100);
upstreamSubscription.ExpectRequest();
upstreamSubscription.SendNext(byteString);
Expand Down Expand Up @@ -675,7 +675,7 @@ private void RandomDemand(Dictionary<int, SubFlowState> map, RandomDemandPropert
state.Probe.ExpectNext().ShouldBeEquivalentTo(state.FirstElement);
map[key] = new SubFlowState(state.Probe, false, null);
}
else if (props.BlockingNextElement != null && Math.Abs(props.BlockingNextElement.Head % 100) == key)
else if (props.BlockingNextElement != null && Math.Abs(props.BlockingNextElement[0] % 100) == key)
{
state.Probe.ExpectNext().ShouldBeEquivalentTo(props.BlockingNextElement);
props.BlockingNextElement = null;
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs
Expand Up @@ -41,7 +41,7 @@ private static ByteString GenerateByteString(int length)
.Take(length)
.Select(Convert.ToByte)
.ToArray();
return ByteString.Create(bytes);
return ByteString.FromBytes(bytes);
}

[Fact]
Expand Down

0 comments on commit cf350af

Please sign in to comment.