Skip to content

Commit

Permalink
rework of Akka.IO namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
fergusn authored and Horusiath committed May 17, 2017
1 parent 755381d commit 46dc134
Show file tree
Hide file tree
Showing 73 changed files with 3,793 additions and 6,529 deletions.
2 changes: 1 addition & 1 deletion build.fsx
Expand Up @@ -23,7 +23,7 @@ let copyright = "Copyright © 2013-2017 Akka.NET Team"
let company = "Akka.NET Team"
let description = "Akka.NET is a port of the popular Java/Scala framework Akka to .NET"
let tags = ["akka";"actors";"actor";"model";"Akka";"concurrency"]
let configuration = "Release"
let configuration = if isMono then "Release Mono" else "Release"
let toolDir = "tools"
let CloudCopyDir = toolDir @@ "CloudCopy"
let AzCopyDir = toolDir @@ "AzCopy"
Expand Down
1 change: 1 addition & 0 deletions io-benchmark.bat
@@ -0,0 +1 @@
.\src\packages\NBench.Runner\lib\net45\NBench.Runner.exe .\src\core\Akka.Tests.Performance\bin\Release\Akka.Tests.Performance.dll output-directory=PerfResults include=Akka.Tests.Performance.IO*
Binary file added src/.nuget/NuGet.exe
Binary file not shown.
342 changes: 133 additions & 209 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt

Large diffs are not rendered by default.

Expand Up @@ -809,14 +809,14 @@ namespace Akka.Persistence.Fsm
public void WhenUnhandled(Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.StateFunction stateFunction) { }
public class State<TState, TData, TEvent> : Akka.Actor.FSMBase.State<TState, TData>
{
public State(TState stateName, TData stateData, System.Nullable<System.TimeSpan> timeout = null, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.IReadOnlyList<object> replies = null, Akka.Util.ILinearSeq<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public State(TState stateName, TData stateData, System.Nullable<System.TimeSpan> timeout = null, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.IReadOnlyList<object> replies = null, System.Collections.Generic.IEnumerable<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public System.Action<TData> AfterTransitionHandler { get; }
public Akka.Util.ILinearSeq<TEvent> DomainEvents { get; }
public System.Collections.Generic.IEnumerable<TEvent> DomainEvents { get; }
public new bool Notifies { get; set; }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State AndThen(System.Action<TData> handler) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Applying(Akka.Util.ILinearSeq<TEvent> events) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Applying(System.Collections.Generic.IEnumerable<TEvent> events) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Applying(TEvent e) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Copy(System.Nullable<System.TimeSpan> timeout, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.IReadOnlyList<object> replies = null, Akka.Util.ILinearSeq<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Copy(System.Nullable<System.TimeSpan> timeout, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.IReadOnlyList<object> replies = null, System.Collections.Generic.IEnumerable<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State ForMax(System.TimeSpan timeout) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Replying(object replyValue) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Using(TData nextStateData) { }
Expand All @@ -829,19 +829,19 @@ namespace Akka.Persistence.Fsm
where TE : TEvent
{
public State(Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State state) { }
public State(TState stateName, TData stateData, System.Nullable<System.TimeSpan> timeout = null, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.List<object> replies = null, Akka.Util.ILinearSeq<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public State(TState stateName, TData stateData, System.Nullable<System.TimeSpan> timeout = null, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.List<object> replies = null, System.Collections.Generic.IEnumerable<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public System.Action<TData> AfterTransitionHandler { get; }
public Akka.Util.ILinearSeq<TEvent> DomainEvents { get; }
public System.Collections.Generic.IEnumerable<TEvent> DomainEvents { get; }
public bool Notifies { get; set; }
public System.Collections.Generic.IReadOnlyList<object> Replies { get; }
public TData StateData { get; }
public TState StateName { get; }
public Akka.Actor.FSMBase.Reason StopReason { get; }
public System.Nullable<System.TimeSpan> Timeout { get; }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State AndThen(System.Action<TData> handler) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Applying(Akka.Util.ILinearSeq<TEvent> events) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Applying(System.Collections.Generic.IEnumerable<TEvent> events) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Applying(TEvent e) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Copy(System.Nullable<System.TimeSpan> timeout, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.List<object> replies = null, Akka.Util.ILinearSeq<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Copy(System.Nullable<System.TimeSpan> timeout, Akka.Actor.FSMBase.Reason stopReason = null, System.Collections.Generic.List<object> replies = null, System.Collections.Generic.IEnumerable<TEvent> domainEvents = null, System.Action<TData> afterTransitionDo = null) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State ForMax(System.TimeSpan timeout) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Replying(object replyValue) { }
public Akka.Persistence.Fsm.PersistentFSMBase<TState, TData, TEvent>.State Using(TData nextStateData) { }
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.MultiNodeTestRunner/Program.cs
Expand Up @@ -302,7 +302,7 @@ public TcpLoggingServer(IActorRef sinkCoordinator)

Receive<Tcp.Received>(received =>
{
var message = received.Data.DecodeString();
var message = received.Data.ToString();
_sinkCoordinator.Tell(message);
});
}
Expand Down
22 changes: 11 additions & 11 deletions src/core/Akka.Persistence/Fsm/PersistentFSMBase.cs
Expand Up @@ -906,7 +906,7 @@ public State(State state)
TimeSpan? timeout = null,
FSMBase.Reason stopReason = null,
List<object> replies = null,
ILinearSeq<TEvent> domainEvents = null,
IEnumerable<TEvent> domainEvents = null,
Action<TData> afterTransitionDo = null)
{
_state = new State(stateName, stateData, timeout, stopReason, replies, domainEvents, afterTransitionDo);
Expand Down Expand Up @@ -948,7 +948,7 @@ public Action<TData> AfterTransitionHandler
/// <summary>
/// TBD
/// </summary>
public ILinearSeq<TEvent> DomainEvents
public IEnumerable<TEvent> DomainEvents
{
get
{
Expand Down Expand Up @@ -977,7 +977,7 @@ public bool Notifies
/// </summary>
/// <param name="events">TBD</param>
/// <returns>TBD</returns>
public State Applying(ILinearSeq<TEvent> events)
public State Applying(IEnumerable<TEvent> events)
{
return _state.Applying(events);
}
Expand Down Expand Up @@ -1015,7 +1015,7 @@ public State AndThen(Action<TData> handler)
TimeSpan? timeout,
FSMBase.Reason stopReason = null,
List<object> replies = null,
ILinearSeq<TEvent> domainEvents = null,
IEnumerable<TEvent> domainEvents = null,
Action<TData> afterTransitionDo = null)
{
return _state.Copy(timeout, stopReason, replies, domainEvents, afterTransitionDo);
Expand Down Expand Up @@ -1133,7 +1133,7 @@ public class State : FSMBase.State<TState, TData>
/// <param name="domainEvents">TBD</param>
/// <param name="afterTransitionDo">TBD</param>
public State(TState stateName, TData stateData, TimeSpan? timeout = null, FSMBase.Reason stopReason = null,
IReadOnlyList<object> replies = null, ILinearSeq<TEvent> domainEvents = null, Action<TData> afterTransitionDo = null)
IReadOnlyList<object> replies = null, IEnumerable<TEvent> domainEvents = null, Action<TData> afterTransitionDo = null)
: base(stateName, stateData, timeout, stopReason, replies)
{
AfterTransitionHandler = afterTransitionDo;
Expand All @@ -1144,7 +1144,7 @@ public class State : FSMBase.State<TState, TData>
/// <summary>
/// TBD
/// </summary>
public ILinearSeq<TEvent> DomainEvents { get; private set; }
public IEnumerable<TEvent> DomainEvents { get; private set; }

/// <summary>
/// TBD
Expand All @@ -1156,13 +1156,13 @@ public class State : FSMBase.State<TState, TData>
/// </summary>
/// <param name="events">TBD</param>
/// <returns>TBD</returns>
public State Applying(ILinearSeq<TEvent> events)
public State Applying(IEnumerable<TEvent> events)
{
if (DomainEvents == null)
{
return Copy(null, null, null, events);
}
return Copy(null, null, null, new ArrayLinearSeq<TEvent>(DomainEvents.Concat(events).ToArray()));
return Copy(null, null, null, DomainEvents.Concat(events).ToArray());
}


Expand All @@ -1175,12 +1175,12 @@ public State Applying(TEvent e)
{
if (DomainEvents == null)
{
return Copy(null, null, null, new ArrayLinearSeq<TEvent>(new[] {e}));
return Copy(null, null, null, new[] {e});
}
var events = new List<TEvent>();
events.AddRange(DomainEvents);
events.Add(e);
return Copy(null, null, null, new ArrayLinearSeq<TEvent>(events.ToArray()));
return Copy(null, null, null, events.ToArray());
}


Expand All @@ -1204,7 +1204,7 @@ public State AndThen(Action<TData> handler)
/// <param name="afterTransitionDo">TBD</param>
/// <returns>TBD</returns>
public State Copy(TimeSpan? timeout, FSMBase.Reason stopReason = null,
IReadOnlyList<object> replies = null, ILinearSeq<TEvent> domainEvents = null, Action<TData> afterTransitionDo = null)
IReadOnlyList<object> replies = null, IEnumerable<TEvent> domainEvents = null, Action<TData> afterTransitionDo = null)
{
return new State(StateName, StateData, timeout ?? Timeout, stopReason ?? StopReason,
replies ?? Replies,
Expand Down
1 change: 0 additions & 1 deletion src/core/Akka.Remote/Akka.Remote.csproj
Expand Up @@ -145,7 +145,6 @@
<Compile Include="Serialization\ProtobufSerializer.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="RemoteActorRefProvider.cs" />
<Compile Include="IPExtensions.cs" />
<Compile Include="Transport\AkkaPduCodec.cs" />
<Compile Include="Transport\DotNetty\AkkaLoggingHandler.cs" />
<Compile Include="Transport\DotNetty\DotNettyTransportSettings.cs" />
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka.Streams.Tests/Dsl/BidiFlowSpec.cs
Expand Up @@ -35,7 +35,7 @@ public BidiFlowSpec()
BidiFlow.FromFlows(
Flow.Create<int>().Select(x => ((long) x) + 2).WithAttributes(Attributes.CreateName("top")),
Flow.Create<ByteString>()
.Select(x => x.DecodeString(Encoding.UTF8))
.Select(x => x.ToString(Encoding.UTF8))
.WithAttributes(Attributes.CreateName("bottom")));
}

Expand All @@ -56,7 +56,7 @@ public BidiFlowSpec()
b.From(Source.Single(42).MapMaterializedValue(_=>Task.FromResult(0))).To(s);
var top = b.Add(Flow.Create<int>().Select(x => ((long) x) + 2));
var bottom = b.Add(Flow.Create<ByteString>().Select(x => x.DecodeString(Encoding.UTF8)));
var bottom = b.Add(Flow.Create<ByteString>().Select(x => x.ToString(Encoding.UTF8)));
return new BidiShape<int,long,ByteString, string>(top.Inlet, top.Outlet, bottom.Inlet, bottom.Outlet);
}));
}
Expand Down
10 changes: 5 additions & 5 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs
Expand Up @@ -92,7 +92,7 @@ private ByteString RandomByteString(int size)
{
var a = new byte[size];
ThreadLocalRandom.Current.NextBytes(a);
return ByteString.Create(a);
return ByteString.FromBytes(a);
}

[Fact]
Expand Down Expand Up @@ -457,7 +457,7 @@ public void GroupBy_must_work_under_fuzzing_stress_test()
var subscriber = this.CreateManualSubscriberProbe<IEnumerable<byte>>();
var firstGroup = (Source<IEnumerable<byte>, NotUsed>)Source.FromPublisher(publisherProbe)
.GroupBy(256, element => element.Head)
.GroupBy(256, element => element[0])
.Select(b => b.Reverse())
.MergeSubstreams();
var secondGroup = (Source<IEnumerable<byte>, NotUsed>)firstGroup.GroupBy(256, bytes => bytes.First())
Expand Down Expand Up @@ -539,7 +539,7 @@ public void GroupBy_must_work_with_random_demand()
var probeShape = new SinkShape<ByteString>(new Inlet<ByteString>("ProbeSink.in"));
var probeSink = new ProbeSink(probeShape, props, Attributes.None);
Source.FromPublisher(publisherProbe)
.GroupBy(100, element => Math.Abs(element.Head % 100))
.GroupBy(100, element => Math.Abs(element[0] % 100))
.To(new Sink<ByteString, TestSubscriber.Probe<ByteString>>(probeSink))
.Run(materializer);
Expand All @@ -548,7 +548,7 @@ public void GroupBy_must_work_with_random_demand()
for (var i = 1; i <= 400; i++)
{
var byteString = RandomByteString(10);
var index = Math.Abs(byteString.Head % 100);
var index = Math.Abs(byteString[0] % 100);
upstreamSubscription.ExpectRequest();
upstreamSubscription.SendNext(byteString);
Expand Down Expand Up @@ -675,7 +675,7 @@ private void RandomDemand(Dictionary<int, SubFlowState> map, RandomDemandPropert
state.Probe.ExpectNext().ShouldBeEquivalentTo(state.FirstElement);
map[key] = new SubFlowState(state.Probe, false, null);
}
else if (props.BlockingNextElement != null && Math.Abs(props.BlockingNextElement.Head % 100) == key)
else if (props.BlockingNextElement != null && Math.Abs(props.BlockingNextElement[0] % 100) == key)
{
state.Probe.ExpectNext().ShouldBeEquivalentTo(props.BlockingNextElement);
props.BlockingNextElement = null;
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams.Tests/Dsl/FlowThrottleSpec.cs
Expand Up @@ -41,7 +41,7 @@ private static ByteString GenerateByteString(int length)
.Take(length)
.Select(Convert.ToByte)
.ToArray();
return ByteString.Create(bytes);
return ByteString.FromBytes(bytes);
}

[Fact]
Expand Down

0 comments on commit 46dc134

Please sign in to comment.