Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use value tuples in Streams #3282

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Akka.sln
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
.editorconfig = .editorconfig
Akka.sln.DotSettings = Akka.sln.DotSettings
NuGet.Config = NuGet.Config
global.json = global.json
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Akka.Remote.Tests.MultiNode", "core\Akka.Remote.Tests.MultiNode\Akka.Remote.Tests.MultiNode.csproj", "{C9105C76-B084-4DA1-9348-1C74A8F22F6B}"
Expand Down
11 changes: 2 additions & 9 deletions src/core/Akka.Persistence.TCK/PluginSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace Akka.Persistence.TCK
{
public abstract class PluginSpec : Akka.TestKit.Xunit2.TestKit, IDisposable
public abstract class PluginSpec : Akka.TestKit.Xunit2.TestKit
{
private static readonly AtomicCounter Counter = new AtomicCounter(0);

Expand Down Expand Up @@ -43,19 +43,12 @@ public void Subscribe<T>(IActorRef subscriber)
Sys.EventStream.Subscribe(subscriber, typeof (T));
}

/// <inheritdoc/>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure that we can remove this part ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The same code is in the base class's Dispose method.

/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
/// <param name="disposing">if set to <c>true</c> the method has been called directly or indirectly by a
/// user's code. Managed and unmanaged resources will be disposed.<br />
/// if set to <c>false</c> the method has been called by the runtime from inside the finalizer and only
/// unmanaged resources can be disposed.</param>
protected virtual void Dispose(bool disposing)
protected override void Dispose(bool disposing)
{
//if (disposing) FSMBase.Shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ public void Setup(BenchmarkContext context)
b.From(bcast).To(zip.In0);
b.From(bcast).To(zip.In1);
var outlet =
b.From(zip.Out).Via(Flow.Create<Tuple<MutableElement, MutableElement>>().Select(t => t.Item1));
b.From(zip.Out).Via(Flow.Create<(MutableElement, MutableElement)>().Select(t => t.Item1));
return new FlowShape<MutableElement, MutableElement>(bcast.In, outlet.Out);
}));

Expand Down
66 changes: 37 additions & 29 deletions src/core/Akka.Streams.Tests/Actor/ActorPublisherSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void ActorPublisher_should_terminate_after_signalling_onCompleteThenStop(
actorRef.Tell(CompleteThenStop.Instance);
s.ExpectNext("elem-1");
s.ExpectComplete();
probe.ExpectTerminated(actorRef,TimeSpan.FromSeconds(3));
probe.ExpectTerminated(actorRef, TimeSpan.FromSeconds(3));
}

[Fact]
Expand Down Expand Up @@ -290,23 +290,21 @@ public void ActorPublisher_should_work_together_with_Flow_and_ActorSubscriber()
var probe = CreateTestProbe();
var source = Source.ActorPublisher<int>(Sender.Props);
var sink = Sink.ActorSubscriber<string>(Receiver.Props(probe.Ref));
var t = source.Collect(n =>

var (snd, rcv) = source.Collect(n =>
{
if (n%2 == 0)
if (n % 2 == 0)
return "elem-" + n;
return null;
}).ToMaterialized(sink, Keep.Both).Run(materializer);
var snd = t.Item1;
var rcv = t.Item2;

for (var i = 1; i <= 3; i++)
snd.Tell(i);
probe.ExpectMsg("elem-2");

for (var n = 4; n <= 500; n++)
{
if (n%19 == 0)
if (n % 19 == 0)
Thread.Sleep(50); // simulate bursts
snd.Tell(n);
}
Expand Down Expand Up @@ -345,9 +343,9 @@ public void ActorPublisher_should_work_in_a_GraphDsl()

builder.From(source1).To(merge.In(0));
builder.From(source2.Outlet).To(merge.In(1));

builder.From(merge.Out).Via(Flow.Create<int>().Select(i => i.ToString())).To(bcast.In);

builder.From(bcast.Out(0)).Via(Flow.Create<string>().Select(s => s + "mark")).To(sink1);
builder.From(bcast.Out(1)).To(sink2);

Expand All @@ -359,16 +357,17 @@ public void ActorPublisher_should_work_in_a_GraphDsl()
for (var i = 0; i < noOfMessages; i++)
{
senderRef1.Tell(i);
senderRef2.Tell(i+noOfMessages);
senderRef2.Tell(i + noOfMessages);
}

var probe1Messages = new List<string>(noOfMessages*2);
var probe2Messages = new List<string>(noOfMessages*2);
var probe1Messages = new List<string>(noOfMessages * 2);
var probe2Messages = new List<string>(noOfMessages * 2);
for (var i = 0; i < noOfMessages * 2; i++)
{
probe1Messages.Add(probe1.ExpectMsg<string>());
probe2Messages.Add(probe2.ExpectMsg<string>());
}

probe1Messages.Should().BeEquivalentTo(Enumerable.Range(0, noOfMessages * 2).Select(i => i + "mark"));
probe2Messages.Should().BeEquivalentTo(Enumerable.Range(0, noOfMessages * 2).Select(i => i.ToString()));
}
Expand Down Expand Up @@ -419,8 +418,8 @@ public void ActorPublisher_should_use_dispatcher_from_materializer_settings()
var materializer = ActorMaterializer.Create(Sys, Sys.Materializer().Settings.WithDispatcher("my-dispatcher1"));
var s = this.CreateManualSubscriberProbe<string>();
var actorRef = Source.ActorPublisher<string>(TestPublisher.Props(TestActor, useTestDispatcher: false))
.To(Sink.FromSubscriber(s))
.Run(materializer);
.To(Sink.FromSubscriber(s))
.Run(materializer);

actorRef.Tell(ThreadName.Instance);
ExpectMsg<string>().Should().Contain("my-dispatcher1");
Expand Down Expand Up @@ -482,7 +481,7 @@ public static Props Props(IActorRef probe, bool useTestDispatcher = true)
}

private readonly IActorRef _probe;

public TestPublisher(IActorRef probe)
{
_probe = probe;
Expand All @@ -498,7 +497,8 @@ protected override bool Receive(object message)
.With<Complete>(OnComplete)
.With<CompleteThenStop>(OnCompleteThenStop)
.With<Boom>(() => { throw new Exception("boom"); })
.With<ThreadName>(()=>_probe.Tell(Context.Props.Dispatcher /*Thread.CurrentThread.Name*/)) // TODO fix me when thread name is set by dispatcher
// TODO fix me when thread name is set by dispatcher
.With<ThreadName>(() => _probe.Tell(Context.Props.Dispatcher /*Thread.CurrentThread.Name*/))
.WasHandled;
}
}
Expand Down Expand Up @@ -527,7 +527,7 @@ protected override bool Receive(object message)

return true;
}

public IStash Stash { get; set; }
}

Expand Down Expand Up @@ -562,8 +562,8 @@ private void DeliverBuffer()

if (TotalDemand <= int.MaxValue)
{
var use = _buffer.Take((int) TotalDemand).ToImmutableList();
_buffer = _buffer.Skip((int) TotalDemand).ToImmutableList();
var use = _buffer.Take((int)TotalDemand).ToImmutableList();
_buffer = _buffer.Skip((int)TotalDemand).ToImmutableList();

use.ForEach(OnNext);
}
Expand All @@ -581,17 +581,17 @@ private void DeliverBuffer()
internal class TimeoutingPublisher : Actors.ActorPublisher<int>
{
public static Props Props(IActorRef probe, TimeSpan timeout) =>
Akka.Actor.Props.Create(() => new TimeoutingPublisher(probe, timeout))
.WithDispatcher("akka.test.stream-dispatcher");
Akka.Actor.Props.Create(() => new TimeoutingPublisher(probe, timeout))
.WithDispatcher("akka.test.stream-dispatcher");

private readonly IActorRef _probe;

public TimeoutingPublisher(IActorRef probe, TimeSpan timeout)
public TimeoutingPublisher(IActorRef probe, TimeSpan timeout)
{
_probe = probe;
SubscriptionTimeout = timeout;
}

protected override bool Receive(object message)
{
return message.Match()
Expand Down Expand Up @@ -641,7 +641,7 @@ public override bool Equals(object obj)
{
if (ReferenceEquals(null, obj)) return false;
if (ReferenceEquals(this, obj)) return true;
return obj.GetType() == GetType() && Equals((TotalDemand) obj);
return obj.GetType() == GetType() && Equals((TotalDemand)obj);
}

protected bool Equals(TotalDemand other) => Elements == other.Elements;
Expand Down Expand Up @@ -683,27 +683,35 @@ internal class Boom
{
public static Boom Instance { get; } = new Boom();

private Boom() { }
private Boom()
{
}
}

internal class Complete
{
public static Complete Instance { get; } = new Complete();

private Complete() { }
private Complete()
{
}
}

internal class CompleteThenStop
{
public static CompleteThenStop Instance { get; } = new CompleteThenStop();

private CompleteThenStop() { }
private CompleteThenStop()
{
}
}

internal class ThreadName
{
public static ThreadName Instance { get; } = new ThreadName();

private ThreadName() { }
private ThreadName()
{
}
}
}
}
12 changes: 2 additions & 10 deletions src/core/Akka.Streams.Tests/Akka.Streams.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,53 +1,45 @@
<Project Sdk="Microsoft.NET.Sdk">
<Import Project="..\..\common.props" />

<PropertyGroup>
<AssemblyName>Akka.Streams.Tests</AssemblyName>
<TargetFrameworks>net452;netcoreapp1.1</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
<Content Include="App.config" />
<ProjectReference Include="..\Akka.Streams\Akka.Streams.csproj" />
<ProjectReference Include="..\Akka.Streams.TestKit\Akka.Streams.TestKit.csproj" />
<ProjectReference Include="..\Akka.Streams.TestKit.Tests\Akka.Streams.TestKit.Tests.csproj" />
<ProjectReference Include="..\Akka.Tests.Shared.Internals\Akka.Tests.Shared.Internals.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="$(TestSdkVersion)" />
<PackageReference Include="System.ValueTuple" Version="4.4.0" />
<PackageReference Include="xunit" Version="$(XunitVersion)" />
<PackageReference Include="xunit.runner.visualstudio" Version="$(XunitVersion)" />
<DotNetCliToolReference Include="dotnet-xunit" Version="$(XunitVersion)" />
<PackageReference Include="FluentAssertions" Version="4.14.0" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'net452' ">
<Reference Include="System.Configuration" />
<Reference Include="System.Xml" />
<Reference Include="System.Xml.Linq" />
<Reference Include="Microsoft.CSharp" />
</ItemGroup>

<ItemGroup Condition=" '$(TargetFramework)' == 'netcoreapp1.1' ">
<PackageReference Include="System.Net.Sockets" Version="4.3.0" />
<PackageReference Include="System.Runtime.Extensions" Version="4.3.0" />
</ItemGroup>

<ItemGroup>
<Service Include="{82a7f48d-3b50-4b1e-b82e-3ada8210c358}" />
</ItemGroup>

<PropertyGroup Condition=" '$(TargetFramework)' == 'net452' ">
<DefineConstants>TRACE;DEBUG;SERIALIZATION;CONFIGURATION;UNSAFE_THREADING;NET452;NET452</DefineConstants>
<DefineConstants>$(DefineConstants);SERIALIZATION;CONFIGURATION;UNSAFE_THREADING;AKKAIO</DefineConstants>
</PropertyGroup>

<PropertyGroup Condition=" '$(TargetFramework)' == 'netcoreapp1.1' ">
<DefineConstants>$(DefineConstants);CORECLR</DefineConstants>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<DefineConstants>$(DefineConstants);RELEASE</DefineConstants>
</PropertyGroup>
</Project>
</Project>
4 changes: 1 addition & 3 deletions src/core/Akka.Streams.Tests/Dsl/ActorRefSourceSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,9 @@ public void A_ActorRefSource_must_buffer_when_needed()
[Fact]
public void A_ActorRefSource_must_drop_new_when_full_and_with_DropNew_strategy()
{
var t = Source.ActorRef<int>(100, OverflowStrategy.DropNew)
var (actorRef, sub) = Source.ActorRef<int>(100, OverflowStrategy.DropNew)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var actorRef = t.Item1;
var sub = t.Item2;

Enumerable.Range(1, 20).ForEach(x => actorRef.Tell(x));
sub.Request(10);
Expand Down
15 changes: 4 additions & 11 deletions src/core/Akka.Streams.Tests/Dsl/BidiFlowSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,27 +68,24 @@ public BidiFlowSpec()
[Fact]
public void A_BidiFlow_must_work_top_and_bottom_in_isolation()
{
var t = RunnableGraph.FromGraph(GraphDsl.Create(Sink.First<long>(), Sink.First<string>(), Keep.Both,
var (top, bottom) = RunnableGraph.FromGraph(GraphDsl.Create(Sink.First<long>(), Sink.First<string>(), Keep.Both,
(b, st, sb) =>
{
var s = b.Add(Bidi());
b.From(
Source.Single(1)
.MapMaterializedValue(_ => Tuple.Create(Task.FromResult(1L), Task.FromResult(""))))
.MapMaterializedValue(_ => (Task.FromResult(1L), Task.FromResult(""))))
.To(s.Inlet1);
b.From(s.Outlet1).To(st);
b.To(sb).From(s.Outlet2);
b.To(s.Inlet2)
.From(
Source.Single(Bytes)
.MapMaterializedValue(_ => Tuple.Create(Task.FromResult(1L), Task.FromResult(""))));
.MapMaterializedValue(_ => (Task.FromResult(1L), Task.FromResult(""))));

return ClosedShape.Instance;
})).Run(Materializer);

var top = t.Item1;
var bottom = t.Item2;

top.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
bottom.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
top.Result.Should().Be(3);
Expand Down Expand Up @@ -186,13 +183,9 @@ public void A_BidiFlow_must_combine_materialization_values()
return new FlowShape<long, ByteString>(flow.Inlet, source.Outlet);
}));

var tt = left.JoinMaterialized(BidiMaterialized(), Keep.Both)
var ((l, m), r) = left.JoinMaterialized(BidiMaterialized(), Keep.Both)
.JoinMaterialized(right, Keep.Both)
.Run(Materializer);
var t = tt.Item1;
var l = t.Item1;
var m = t.Item2;
var r = tt.Item2;

Task.WhenAll(l, m, r).Wait(TimeSpan.FromSeconds(3)).Should().BeTrue();
l.Result.Should().Be(1);
Expand Down
4 changes: 1 addition & 3 deletions src/core/Akka.Streams.Tests/Dsl/FlowBufferSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,11 @@ public void Buffer_must_drop_all_elements_if_buffer_is_full_and_configured_so()
[Fact]
public void Buffer_must_drop_new_elements_if_buffer_is_full_and_configured_so()
{
var t =
var (publisher, subscriber) =
this.SourceProbe<int>()
.Buffer(100, OverflowStrategy.DropNew)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Materializer);
var publisher = t.Item1;
var subscriber = t.Item2;

subscriber.EnsureSubscription();

Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka.Streams.Tests/Dsl/FlowConcatSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -176,9 +176,9 @@ public void A_Concat_for_Flow_must_work_with_Source_DSL()
task.Result.ShouldAllBeEquivalentTo(Enumerable.Range(1,10));

var runnable = testSource.ToMaterialized(Sink.Ignore<IEnumerable<int>>(), Keep.Left);
var t = runnable.Run(Materializer);
t.Item1.Should().BeOfType<NotUsed>();
t.Item2.Should().BeOfType<NotUsed>();
var (m1, m2) = runnable.Run(Materializer);
m1.Should().BeOfType<NotUsed>();
m2.Should().BeOfType<NotUsed>();

runnable.MapMaterializedValue(_ => "boo").Run(Materializer).Should().Be("boo");
}, Materializer);
Expand Down
4 changes: 1 addition & 3 deletions src/core/Akka.Streams.Tests/Dsl/FlowExpandSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,10 @@ public void Expand_must_backpressure_publisher_when_subscriber_is_slower()
[Fact]
public void Expand_must_work_properly_with_finite_extrapolations()
{
var t = TestSource.SourceProbe<int>(this)
var (source, sink) = TestSource.SourceProbe<int>(this)
.Expand(i => Enumerable.Range(0, 4).Select(x => Tuple.Create(i, x)).Take(3).GetEnumerator())
.ToMaterialized(this.SinkProbe<Tuple<int, int>>(), Keep.Both)
.Run(Materializer);
var source = t.Item1;
var sink = t.Item2;

source.SendNext(1);

Expand Down