From f46d845df46522dda15091b3fa1f7a45cb708f66 Mon Sep 17 00:00:00 2001 From: Ismael Hamed <1279846+ismaelhamed@users.noreply.github.com> Date: Mon, 20 Jun 2022 19:49:33 +0200 Subject: [PATCH] Added GroupedWeightedWithin operator (#6000) Co-authored-by: Gregorius Soedharmo --- docs/articles/streams/builtinstages.md | 19 +- ...reAPISpec.ApproveStreams.Core.verified.txt | 10 +- ...APISpec.ApproveStreams.DotNet.verified.txt | 10 +- ...oreAPISpec.ApproveStreams.Net.verified.txt | 10 +- .../Dsl/FlowGroupedWithinSpec.cs | 185 +++++++++++++- src/core/Akka.Streams/Dsl/FlowOperations.cs | 49 +++- .../Dsl/Internal/InternalFlowOperations.cs | 67 ++++- src/core/Akka.Streams/Dsl/SourceOperations.cs | 52 ++++ .../Akka.Streams/Dsl/SubFlowOperations.cs | 31 ++- .../Akka.Streams/Implementation/Fusing/Ops.cs | 238 +++++++++++------- .../Implementation/Stages/Stages.cs | 8 + 11 files changed, 542 insertions(+), 137 deletions(-) diff --git a/docs/articles/streams/builtinstages.md b/docs/articles/streams/builtinstages.md index e85abb667c6..1ded80edb24 100644 --- a/docs/articles/streams/builtinstages.md +++ b/docs/articles/streams/builtinstages.md @@ -843,12 +843,23 @@ Skip elements until a timeout has fired ### GroupedWithin -Chunk up the stream into groups of elements received within a time window, or limited by the given number of elements, -whichever happens first. +Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group. -**emits** when the configured time elapses since the last group has been emitted +**emits** when the configured time elapses since the last group has been emitted, but not if no elements has been grouped (i.e: no empty groups), or when limit has been reached. -**backpressures** when the group has been assembled (the duration elapsed) and downstream backpressures +**backpressures** when downstream backpressures, and there are n+1 buffered elements + +**completes** when upstream completes + +### GroupedWeightedWithin + +Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream. +The last group before end-of-stream will contain the buffered elements since the previously emitted group. + +**emits** when the configured time elapses since the last group has been emitted, +but not if no elements has been grouped (i.e: no empty groups), or when weight limit has been reached. + +**backpressures** downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight` **completes** when upstream completes diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt index 030ff441605..671e0b73a40 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Core.verified.txt @@ -1371,6 +1371,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc, bool allowClosedSubstreamRecreation) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc) { } public static Akka.Streams.Dsl.Flow, TMat> Grouped(this Akka.Streams.Dsl.Flow flow, int n) { } + public static Akka.Streams.Dsl.Flow, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Flow flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.Flow, TMat> GroupedWithin(this Akka.Streams.Dsl.Flow flow, int n, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Flow IdleTimeout(this Akka.Streams.Dsl.Flow flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Flow InitialDelay(this Akka.Streams.Dsl.Flow flow, System.TimeSpan delay) { } @@ -2048,6 +2049,8 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source Expand(this Akka.Streams.Dsl.Source flow, System.Func> extrapolate) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Source flow, int maxSubstreams, System.Func groupingFunc) { } public static Akka.Streams.Dsl.Source, TMat> Grouped(this Akka.Streams.Dsl.Source flow, int n) { } + public static Akka.Streams.Dsl.Source, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Source flow, long maxWeight, System.TimeSpan interval, System.Func costFn) { } + public static Akka.Streams.Dsl.Source, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Source flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.Source, TMat> GroupedWithin(this Akka.Streams.Dsl.Source flow, int n, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Source IdleTimeout(this Akka.Streams.Dsl.Source flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Source InitialDelay(this Akka.Streams.Dsl.Source flow, System.TimeSpan delay) { } @@ -2234,6 +2237,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.SubFlow DivertToMaterialized(this Akka.Streams.Dsl.SubFlow flow, Akka.Streams.IGraph, TMat2> that, System.Func when, System.Func materializerFunction) { } public static Akka.Streams.Dsl.SubFlow Expand(this Akka.Streams.Dsl.SubFlow flow, System.Func> extrapolate) { } public static Akka.Streams.Dsl.SubFlow, TMat, TClosed> Grouped(this Akka.Streams.Dsl.SubFlow flow, int n) { } + public static Akka.Streams.Dsl.SubFlow, TMat, TClosed> GroupedWeightedWithin(this Akka.Streams.Dsl.SubFlow flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.SubFlow, TMat, TClosed> GroupedWithin(this Akka.Streams.Dsl.SubFlow flow, int n, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.SubFlow IdleTimeout(this Akka.Streams.Dsl.SubFlow flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.SubFlow InitialDelay(this Akka.Streams.Dsl.SubFlow flow, System.TimeSpan delay) { } @@ -4180,9 +4184,9 @@ namespace Akka.Streams.Implementation.Fusing public static Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage Identity() { } } [Akka.Annotations.InternalApiAttribute()] - public sealed class GroupedWithin : Akka.Streams.Stage.GraphStage>> + public sealed class GroupedWeightedWithin : Akka.Streams.Stage.GraphStage>> { - public GroupedWithin(int count, System.TimeSpan timeout) { } + public GroupedWeightedWithin(long maxWeight, int maxNumber, System.Func costFn, System.TimeSpan interval) { } protected override Akka.Streams.Attributes InitialAttributes { get; } public override Akka.Streams.FlowShape> Shape { get; } protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } @@ -4544,6 +4548,8 @@ namespace Akka.Streams.Implementation.Stages public static readonly Akka.Streams.Attributes Fused; public static readonly Akka.Streams.Attributes GroupBy; public static readonly Akka.Streams.Attributes Grouped; + public static readonly Akka.Streams.Attributes GroupedWeightedWithin; + public static readonly Akka.Streams.Attributes GroupedWithin; public static readonly Akka.Streams.Attributes IODispatcher; public static readonly Akka.Streams.Attributes IdentityOp; public static readonly Akka.Streams.Attributes Idle; diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index 254bf635d29..09184c2e66d 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -1371,6 +1371,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc, bool allowClosedSubstreamRecreation) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc) { } public static Akka.Streams.Dsl.Flow, TMat> Grouped(this Akka.Streams.Dsl.Flow flow, int n) { } + public static Akka.Streams.Dsl.Flow, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Flow flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.Flow, TMat> GroupedWithin(this Akka.Streams.Dsl.Flow flow, int n, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Flow IdleTimeout(this Akka.Streams.Dsl.Flow flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Flow InitialDelay(this Akka.Streams.Dsl.Flow flow, System.TimeSpan delay) { } @@ -2048,6 +2049,8 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source Expand(this Akka.Streams.Dsl.Source flow, System.Func> extrapolate) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Source flow, int maxSubstreams, System.Func groupingFunc) { } public static Akka.Streams.Dsl.Source, TMat> Grouped(this Akka.Streams.Dsl.Source flow, int n) { } + public static Akka.Streams.Dsl.Source, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Source flow, long maxWeight, System.TimeSpan interval, System.Func costFn) { } + public static Akka.Streams.Dsl.Source, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Source flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.Source, TMat> GroupedWithin(this Akka.Streams.Dsl.Source flow, int n, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Source IdleTimeout(this Akka.Streams.Dsl.Source flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Source InitialDelay(this Akka.Streams.Dsl.Source flow, System.TimeSpan delay) { } @@ -2234,6 +2237,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.SubFlow DivertToMaterialized(this Akka.Streams.Dsl.SubFlow flow, Akka.Streams.IGraph, TMat2> that, System.Func when, System.Func materializerFunction) { } public static Akka.Streams.Dsl.SubFlow Expand(this Akka.Streams.Dsl.SubFlow flow, System.Func> extrapolate) { } public static Akka.Streams.Dsl.SubFlow, TMat, TClosed> Grouped(this Akka.Streams.Dsl.SubFlow flow, int n) { } + public static Akka.Streams.Dsl.SubFlow, TMat, TClosed> GroupedWeightedWithin(this Akka.Streams.Dsl.SubFlow flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.SubFlow, TMat, TClosed> GroupedWithin(this Akka.Streams.Dsl.SubFlow flow, int n, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.SubFlow IdleTimeout(this Akka.Streams.Dsl.SubFlow flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.SubFlow InitialDelay(this Akka.Streams.Dsl.SubFlow flow, System.TimeSpan delay) { } @@ -4192,9 +4196,9 @@ namespace Akka.Streams.Implementation.Fusing public static Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage Identity() { } } [Akka.Annotations.InternalApiAttribute()] - public sealed class GroupedWithin : Akka.Streams.Stage.GraphStage>> + public sealed class GroupedWeightedWithin : Akka.Streams.Stage.GraphStage>> { - public GroupedWithin(int count, System.TimeSpan timeout) { } + public GroupedWeightedWithin(long maxWeight, int maxNumber, System.Func costFn, System.TimeSpan interval) { } protected override Akka.Streams.Attributes InitialAttributes { get; } public override Akka.Streams.FlowShape> Shape { get; } protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } @@ -4556,6 +4560,8 @@ namespace Akka.Streams.Implementation.Stages public static readonly Akka.Streams.Attributes Fused; public static readonly Akka.Streams.Attributes GroupBy; public static readonly Akka.Streams.Attributes Grouped; + public static readonly Akka.Streams.Attributes GroupedWeightedWithin; + public static readonly Akka.Streams.Attributes GroupedWithin; public static readonly Akka.Streams.Attributes IODispatcher; public static readonly Akka.Streams.Attributes IdentityOp; public static readonly Akka.Streams.Attributes Idle; diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index 030ff441605..671e0b73a40 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -1371,6 +1371,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc, bool allowClosedSubstreamRecreation) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Flow flow, int maxSubstreams, System.Func groupingFunc) { } public static Akka.Streams.Dsl.Flow, TMat> Grouped(this Akka.Streams.Dsl.Flow flow, int n) { } + public static Akka.Streams.Dsl.Flow, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Flow flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.Flow, TMat> GroupedWithin(this Akka.Streams.Dsl.Flow flow, int n, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Flow IdleTimeout(this Akka.Streams.Dsl.Flow flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Flow InitialDelay(this Akka.Streams.Dsl.Flow flow, System.TimeSpan delay) { } @@ -2048,6 +2049,8 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.Source Expand(this Akka.Streams.Dsl.Source flow, System.Func> extrapolate) { } public static Akka.Streams.Dsl.SubFlow> GroupBy(this Akka.Streams.Dsl.Source flow, int maxSubstreams, System.Func groupingFunc) { } public static Akka.Streams.Dsl.Source, TMat> Grouped(this Akka.Streams.Dsl.Source flow, int n) { } + public static Akka.Streams.Dsl.Source, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Source flow, long maxWeight, System.TimeSpan interval, System.Func costFn) { } + public static Akka.Streams.Dsl.Source, TMat> GroupedWeightedWithin(this Akka.Streams.Dsl.Source flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.Source, TMat> GroupedWithin(this Akka.Streams.Dsl.Source flow, int n, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Source IdleTimeout(this Akka.Streams.Dsl.Source flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.Source InitialDelay(this Akka.Streams.Dsl.Source flow, System.TimeSpan delay) { } @@ -2234,6 +2237,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.SubFlow DivertToMaterialized(this Akka.Streams.Dsl.SubFlow flow, Akka.Streams.IGraph, TMat2> that, System.Func when, System.Func materializerFunction) { } public static Akka.Streams.Dsl.SubFlow Expand(this Akka.Streams.Dsl.SubFlow flow, System.Func> extrapolate) { } public static Akka.Streams.Dsl.SubFlow, TMat, TClosed> Grouped(this Akka.Streams.Dsl.SubFlow flow, int n) { } + public static Akka.Streams.Dsl.SubFlow, TMat, TClosed> GroupedWeightedWithin(this Akka.Streams.Dsl.SubFlow flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func costFn) { } public static Akka.Streams.Dsl.SubFlow, TMat, TClosed> GroupedWithin(this Akka.Streams.Dsl.SubFlow flow, int n, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.SubFlow IdleTimeout(this Akka.Streams.Dsl.SubFlow flow, System.TimeSpan timeout) { } public static Akka.Streams.Dsl.SubFlow InitialDelay(this Akka.Streams.Dsl.SubFlow flow, System.TimeSpan delay) { } @@ -4180,9 +4184,9 @@ namespace Akka.Streams.Implementation.Fusing public static Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage Identity() { } } [Akka.Annotations.InternalApiAttribute()] - public sealed class GroupedWithin : Akka.Streams.Stage.GraphStage>> + public sealed class GroupedWeightedWithin : Akka.Streams.Stage.GraphStage>> { - public GroupedWithin(int count, System.TimeSpan timeout) { } + public GroupedWeightedWithin(long maxWeight, int maxNumber, System.Func costFn, System.TimeSpan interval) { } protected override Akka.Streams.Attributes InitialAttributes { get; } public override Akka.Streams.FlowShape> Shape { get; } protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { } @@ -4544,6 +4548,8 @@ namespace Akka.Streams.Implementation.Stages public static readonly Akka.Streams.Attributes Fused; public static readonly Akka.Streams.Attributes GroupBy; public static readonly Akka.Streams.Attributes Grouped; + public static readonly Akka.Streams.Attributes GroupedWeightedWithin; + public static readonly Akka.Streams.Attributes GroupedWithin; public static readonly Akka.Streams.Attributes IODispatcher; public static readonly Akka.Streams.Attributes IdentityOp; public static readonly Akka.Streams.Attributes Idle; diff --git a/src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs b/src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs index b7840f385cc..21bbba35a42 100644 --- a/src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/FlowGroupedWithinSpec.cs @@ -143,7 +143,7 @@ public void A_GroupedWithin_must_drop_empty_groups() .GroupedWithin(1000, TimeSpan.FromMilliseconds(500)) .To(Sink.FromSubscriber(c)) .Run(Materializer); - + var pSub = p.ExpectSubscription(); var cSub = c.ExpectSubscription(); @@ -153,7 +153,7 @@ public void A_GroupedWithin_must_drop_empty_groups() pSub.SendNext(1); pSub.SendNext(2); - c.ExpectNext().Should().BeEquivalentTo(new [] {1,2}); + c.ExpectNext().Should().BeEquivalentTo(new[] { 1, 2 }); // nothing more requested c.ExpectNoMsg(TimeSpan.FromMilliseconds(1100)); cSub.Request(3); @@ -182,7 +182,7 @@ public void A_GroupedWithin_must_not_emit_empty_group_when_finished_while_not_be pSub.SendComplete(); c.ExpectComplete(); } - + [Fact(Skip = "Skipped for async_testkit conversion build")] public void A_GroupedWithin_must_reset_time_window_when_max_elements_reached() { @@ -198,10 +198,10 @@ public void A_GroupedWithin_must_reset_time_window_when_max_elements_reached() downstream.Request(2); downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(1000)); - Enumerable.Range(1,4).ForEach(_=>upstream.SendNext(input.Next())); + Enumerable.Range(1, 4).ForEach(_ => upstream.SendNext(input.Next())); downstream.Within(TimeSpan.FromMilliseconds(1000), () => { - downstream.ExpectNext().Should().BeEquivalentTo(new[] {1, 2, 3}); + downstream.ExpectNext().Should().BeEquivalentTo(new[] { 1, 2, 3 }); return NotUsed.Instance; }); @@ -209,7 +209,7 @@ public void A_GroupedWithin_must_reset_time_window_when_max_elements_reached() downstream.Within(TimeSpan.FromMilliseconds(1000), () => { - downstream.ExpectNext().Should().BeEquivalentTo(new[] {4}); + downstream.ExpectNext().Should().BeEquivalentTo(new[] { 4 }); return NotUsed.Instance; }); @@ -228,7 +228,7 @@ public void A_GroupedWithin_must_group_early() var y = random.Next(); var z = random.Next(); - return ((ICollection)new[] {x, y, z}, (ICollection>)new[] {new[] {x, y, z}}); + return ((ICollection)new[] { x, y, z }, (ICollection>)new[] { new[] { x, y, z } }); }).ToArray()); RandomTestRange(Sys) @@ -242,7 +242,7 @@ public void A_GroupedWithin_must_group_with_rest() Func>> script = () => { var i = random.Next(); - var rest = (new[] {i}, new[] {new[] {i}}); + var rest = (new[] { i }, new[] { new[] { i } }); return Script.Create(RandomTestRange(Sys).Select(_ => { @@ -250,7 +250,7 @@ public void A_GroupedWithin_must_group_with_rest() var y = random.Next(); var z = random.Next(); - return ((ICollection)new[] { x, y, z }, (ICollection>)new[] { new[] { x, y, z }}); + return ((ICollection)new[] { x, y, z }, (ICollection>)new[] { new[] { x, y, z } }); }).Concat(rest).ToArray()); }; @@ -266,7 +266,172 @@ public void A_GroupedWithin_must_group_with_small_groups_with_backpressure() .Throttle(1, TimeSpan.FromMilliseconds(110), 0, ThrottleMode.Shaping) .RunWith(Sink.Seq>(), Materializer); t.Wait(TimeSpan.FromSeconds(3)).Should().BeTrue(); - t.Result.Should().BeEquivalentTo(Enumerable.Range(1, 10).Select(i => new List {i})); + t.Result.Should().BeEquivalentTo(Enumerable.Range(1, 10).Select(i => new List { i })); + } + } + + public class FlowGroupedWeightedWithinSpec : ScriptedTest + { + private ActorMaterializerSettings Settings { get; } + private ActorMaterializer Materializer { get; } + + public FlowGroupedWeightedWithinSpec(ITestOutputHelper helper) : base(helper) + { + Settings = ActorMaterializerSettings.Create(Sys); + Materializer = ActorMaterializer.Create(Sys, Settings); + } + + [Fact] + public void A_GroupedWeightedWithin_must_handle_handle_elements_larger_than_the_limit() + { + var downstream = this.CreateSubscriberProbe>(); + + Source.From(new List { 1, 2, 3, 101, 4, 5, 6 }) + .GroupedWeightedWithin(100, TimeSpan.FromMilliseconds(100), t => t) + .To(Sink.FromSubscriber(downstream)) + .Run(Materializer); + + downstream.Request(1); + downstream.ExpectNext().Should().BeEquivalentTo(new List { 1, 2, 3 }); + downstream.Request(1); + downstream.ExpectNext().Should().BeEquivalentTo(new List { 101 }); + downstream.Request(1); + downstream.ExpectNext().Should().BeEquivalentTo(new List { 4, 5, 6 }); + downstream.ExpectComplete(); + } + + [Fact] + public void A_GroupedWeightedWithin_must_not_drop_a_pending_last_element_on_upstream_finish() + { + var upstream = this.CreatePublisherProbe(); + var downstream = this.CreateSubscriberProbe>(); + + Source.FromPublisher(upstream) + .GroupedWeightedWithin(5, TimeSpan.FromMilliseconds(50), t => t) + .To(Sink.FromSubscriber(downstream)) + .Run(Materializer); + + downstream.EnsureSubscription(); + downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + upstream.SendNext(1); + upstream.SendNext(2); + upstream.SendNext(3); + upstream.SendComplete(); + downstream.Request(1); + downstream.ExpectNext().Should().BeEquivalentTo(new List { 1, 2 }); + downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); + downstream.Request(1); + downstream.ExpectNext().Should().BeEquivalentTo(new List { 3 }); + downstream.ExpectComplete(); + } + + [Fact] + public void A_GroupedWeightedWithin_must_append_zero_weighted_elements_to_a_full_group_before_timeout_received_if_downstream_hasnt_pulled_yet() + { + var upstream = this.CreatePublisherProbe(); + var downstream = this.CreateSubscriberProbe>(); + + Source.FromPublisher(upstream) + .GroupedWeightedWithin(5, TimeSpan.FromMilliseconds(50), t => t.Length) + .To(Sink.FromSubscriber(downstream)) + .Run(Materializer); + + downstream.EnsureSubscription(); + upstream.SendNext("333"); + upstream.SendNext("22"); + upstream.SendNext(""); + upstream.SendNext(""); + upstream.SendNext(""); + downstream.Request(1); + downstream.ExpectNext().Should().BeEquivalentTo(new List { "333", "22", "", "", "" }); + upstream.SendNext(""); + upstream.SendNext(""); + upstream.SendComplete(); + downstream.Request(1); + downstream.ExpectNext().Should().BeEquivalentTo(new List { "", "" }); + downstream.ExpectComplete(); + } + + [Fact] + public void A_GroupedWeightedWithin_must_not_emit_an_empty_group_if_first_element_is_heavier_than_maxWeight() + { + var upstream = this.CreatePublisherProbe(); + var downstream = this.CreateSubscriberProbe>(); + + Source.FromPublisher(upstream) + .GroupedWeightedWithin(10, TimeSpan.FromMilliseconds(50), t => t) + .To(Sink.FromSubscriber(downstream)) + .Run(Materializer); + + downstream.EnsureSubscription(); + downstream.Request(1); + upstream.SendNext(11); + downstream.ExpectNext().Should().BeEquivalentTo(new List { 11 }); + upstream.SendComplete(); + downstream.ExpectComplete(); + } + + [Fact] + public void A_GroupedWeightedWithin_must_handle_zero_cost_function_to_get_only_timed_based_grouping_without_limit() + { + var upstream = this.CreatePublisherProbe(); + var downstream = this.CreateSubscriberProbe>(); + + Source.FromPublisher(upstream) + .GroupedWeightedWithin(1L, TimeSpan.FromMilliseconds(100), _ => 0L) + .To(Sink.FromSubscriber(downstream)) + .Run(Materializer); + + downstream.EnsureSubscription(); + downstream.Request(1); + upstream.SendNext("333"); + upstream.SendNext("22"); + upstream.SendNext("333"); + upstream.SendNext("22"); + downstream.ExpectNoMsg(TimeSpan.FromMilliseconds(50)); + downstream.ExpectNext().Should().BeEquivalentTo(new List { "333", "22", "333", "22" }); + upstream.SendComplete(); + downstream.ExpectComplete(); + } + + [Fact] + public void A_GroupedWeightedWithin_must_group_by_max_weight_and_max_number_of_elements_reached() + { + var upstream = this.CreatePublisherProbe(); + var downstream = this.CreateSubscriberProbe>(); + + Source.FromPublisher(upstream) + .GroupedWeightedWithin(10, 3, TimeSpan.FromSeconds(30), t => t) + .To(Sink.FromSubscriber(downstream)) + .Run(Materializer); + + downstream.EnsureSubscription(); + upstream.SendNext(1); + upstream.SendNext(2); + upstream.SendNext(3); + upstream.SendNext(4); + upstream.SendNext(5); + upstream.SendNext(6); + upstream.SendNext(11); + upstream.SendNext(7); + upstream.SendNext(2); + upstream.SendComplete(); + downstream.Request(1); + // split because of maxNumber: 3 element + downstream.ExpectNext().Should().BeEquivalentTo(new List { 1, 2, 3 }); + downstream.Request(1); + // split because of maxWeight: 9=4+5, one more element did not fit + downstream.ExpectNext().Should().BeEquivalentTo(new List { 4, 5 }); + downstream.Request(1); + // split because of maxWeight: 6, one more element did not fit + downstream.ExpectNext().Should().BeEquivalentTo(new List { 6 }); + downstream.Request(1); + // split because of maxWeight: 11 + downstream.ExpectNext().Should().BeEquivalentTo(new List { 11 }); + downstream.Request(1); + // no split + downstream.ExpectNext().Should().BeEquivalentTo(new List { 7, 2 }); + downstream.ExpectComplete(); } } } diff --git a/src/core/Akka.Streams/Dsl/FlowOperations.cs b/src/core/Akka.Streams/Dsl/FlowOperations.cs index 022e0e85a69..00e51966058 100644 --- a/src/core/Akka.Streams/Dsl/FlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/FlowOperations.cs @@ -922,10 +922,35 @@ public static class FlowOperations /// TBD /// Thrown if is less than or equal zero or is . /// TBD - public static Flow, TMat> GroupedWithin(this Flow flow, int n, TimeSpan timeout) - { - return (Flow, TMat>)InternalFlowOperations.GroupedWithin(flow, n, timeout); - } + public static Flow, TMat> GroupedWithin(this Flow flow, int n, TimeSpan timeout) => + (Flow, TMat>)InternalFlowOperations.GroupedWithin(flow, n, timeout); + + /// + /// Chunk up this stream into groups of elements received within a time window, + /// or limited by the weight and number of the elements, whatever happens first. + /// Empty groups will not be emitted if no elements are received from upstream. + /// The last group before end-of-stream will contain the buffered elements + /// since the previously emitted group. + /// + /// must be positive, must be positive, and must be greater than 0 seconds, + /// otherwise is thrown. + /// + /// Emits when the configured time elapses since the last group has been emitted or weight limit reached + /// Backpressures when downstream backpressures, and buffered group(+ pending element) weighs more than `maxWeight` or has more than `maxNumber` elements + /// Completes when upstream completes(emits last group) + /// Cancels when downstream completes + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + public static Flow, TMat> GroupedWeightedWithin(this Flow flow, long maxWeight, int maxNumber, TimeSpan interval, Func costFn) => + (Flow, TMat>)InternalFlowOperations.GroupedWeightedWithin(flow, maxWeight, maxNumber, interval, costFn); /// /// Shifts elements emission in time by a specified amount. It allows to store elements @@ -1366,7 +1391,7 @@ public static class FlowOperations /// Computes the key for each element /// Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion /// TBD - public static SubFlow> GroupBy(this Flow flow, int maxSubstreams, Func groupingFunc, bool allowClosedSubstreamRecreation) => + public static SubFlow> GroupBy(this Flow flow, int maxSubstreams, Func groupingFunc, bool allowClosedSubstreamRecreation) => flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow, TMat>)f).To(s), allowClosedSubstreamRecreation); /// @@ -1587,7 +1612,7 @@ public static class FlowOperations { return (Flow)InternalFlowOperations.MergeMany(flow, breadth, flatten); } - + /// /// Combine the elements of current flow into a stream of tuples consisting /// of all elements paired with their index. Indices start at 0. @@ -1858,7 +1883,7 @@ public static class FlowOperations { return (Flow)InternalFlowOperations.AlsoTo(flow, that); } - + /// /// /// Attaches the given to this , as a wire tap, meaning that elements that pass @@ -1870,7 +1895,7 @@ public static class FlowOperations /// Completes when upstream completes /// Cancels when downstream cancels /// - public static Flow WireTap(this Flow flow, IGraph, TMat> that) => + public static Flow WireTap(this Flow flow, IGraph, TMat> that) => (Flow)InternalFlowOperations.WireTap(flow, that); /// @@ -1924,7 +1949,7 @@ public static class FlowOperations /// TBD /// TBD /// TBD - public static Flow DivertTo(this Flow flow, IGraph, TMat> that, Func when) => + public static Flow DivertTo(this Flow flow, IGraph, TMat> that, Func when) => (Flow)InternalFlowOperations.DivertTo(flow, that, when); /// @@ -1943,7 +1968,7 @@ public static class FlowOperations /// TBD /// TBD /// TBD - public static Flow WatchTermination(this Flow flow, Func, TMat2> materializerFunction) => + public static Flow WatchTermination(this Flow flow, Func, TMat2> materializerFunction) => (Flow)InternalFlowOperations.WatchTermination(flow, materializerFunction); /// @@ -2399,7 +2424,7 @@ public static class FlowOperations /// TBD public static Flow OrElseMaterialized(this Flow flow, IGraph, TMat2> secondary, Func materializedFunction) => (Flow)InternalFlowOperations.OrElseMaterialized(flow, secondary, materializedFunction); - + /// /// The operator fails with an if the target actor is terminated. /// @@ -2411,7 +2436,7 @@ public static class FlowOperations /// public static Flow Watch(this Flow flow, IActorRef actorRef) => (Flow)InternalFlowOperations.Watch(flow, actorRef); - + /// /// Turns a Flow into a FlowWithContext which manages a context per element along a stream. /// diff --git a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs index 5e0ebe6addc..5fd98cc19d2 100644 --- a/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/Internal/InternalFlowOperations.cs @@ -878,9 +878,9 @@ internal static class InternalFlowOperations /// must be positive, and must be greater than 0 seconds, otherwise /// is thrown. /// - /// Emits when the configured time elapses since the last group has been emitted + /// Emits when the configured time elapses since the last group has been emitted or `n` elements is buffered /// - /// Backpressures when the configured time elapses since the last group has been emitted + /// Backpressures when downstream backpressures, and there are `n+1` buffered elements /// /// Completes when upstream completes (emits last group) /// @@ -893,14 +893,63 @@ internal static class InternalFlowOperations /// TBD /// Thrown if is less than or equal zero or is . /// TBD - public static IFlow, TMat> GroupedWithin(this IFlow flow, int n, - TimeSpan timeout) - { - if (n <= 0) throw new ArgumentException("n must be > 0", nameof(n)); - if (timeout == TimeSpan.Zero) throw new ArgumentException("Timeout must be non-zero", nameof(timeout)); + public static IFlow, TMat> GroupedWithin(this IFlow flow, int n, TimeSpan timeout) => + flow.Via(new Fusing.GroupedWeightedWithin(long.MaxValue, n, _ => 0L, timeout) + .WithAttributes(DefaultAttributes.GroupedWithin)); - return flow.Via(new Fusing.GroupedWithin(n, timeout)); - } + /// + /// Chunk up this stream into groups of elements received within a time window, + /// or limited by the weight of the elements, whatever happens first. + /// Empty groups will not be emitted if no elements are received from upstream. + /// The last group before end-of-stream will contain the buffered elements + /// since the previously emitted group. + /// + /// must be positive, and must be greater than 0 seconds, + /// otherwise ArgumentException is thrown. + /// + /// Emits when the configured time elapses since the last group has been emitted or weight limit reached + /// Backpressures when downstream backpressures, and buffered group(+ pending element) weighs more than `maxWeight` + /// Completes when upstream completes(emits last group) + /// Cancels when downstream completes + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + public static IFlow, TMat> GroupedWeightedWithin(this IFlow flow, long maxWeight, TimeSpan interval, Func costFn) => + flow.Via(new Fusing.GroupedWeightedWithin(maxWeight, int.MaxValue, costFn, interval)); + + /// + /// Chunk up this stream into groups of elements received within a time window, + /// or limited by the weight of the elements, whatever happens first. + /// Empty groups will not be emitted if no elements are received from upstream. + /// The last group before end-of-stream will contain the buffered elements + /// since the previously emitted group. + /// + /// must be positive, must be positive, and must be greater than 0 seconds, + /// otherwise ArgumentException is thrown. + /// + /// Emits when the configured time elapses since the last group has been emitted or weight limit reached + /// + /// Backpressures when downstream backpressures, and buffered group(+ pending element) weighs more than `maxWeight` + /// or has more than `maxNumber` elements + /// + /// Completes when upstream completes(emits last group) + /// Cancels when downstream completes + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + public static IFlow, TMat> GroupedWeightedWithin(this IFlow flow, long maxWeight, int maxNumber, TimeSpan interval, Func costFn) => + flow.Via(new Fusing.GroupedWeightedWithin(maxWeight, maxNumber, costFn, interval)); /// /// Shifts elements emission in time by a specified amount. It allows to store elements diff --git a/src/core/Akka.Streams/Dsl/SourceOperations.cs b/src/core/Akka.Streams/Dsl/SourceOperations.cs index 7c22a513630..8e638db5838 100644 --- a/src/core/Akka.Streams/Dsl/SourceOperations.cs +++ b/src/core/Akka.Streams/Dsl/SourceOperations.cs @@ -839,6 +839,58 @@ public static class SourceOperations return (Source, TMat>)InternalFlowOperations.GroupedWithin(flow, n, timeout); } + /// + /// Chunk up this stream into groups of elements received within a time window, + /// or limited by the weight of the elements, whatever happens first. + /// Empty groups will not be emitted if no elements are received from upstream. + /// The last group before end-of-stream will contain the buffered elements + /// since the previously emitted group. + /// + /// must be positive, and must be greater than 0 seconds, + /// otherwise ArgumentException is thrown. + /// + /// Emits when the configured time elapses since the last group has been emitted or weight limit reached + /// Backpressures when downstream backpressures, and buffered group(+ pending element) weighs more than `maxWeight` + /// Completes when upstream completes(emits last group) + /// Cancels when downstream completes + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + public static Source, TMat> GroupedWeightedWithin(this Source flow, long maxWeight, TimeSpan interval, Func costFn) => + (Source, TMat>)InternalFlowOperations.GroupedWeightedWithin(flow, maxWeight, int.MaxValue, interval, costFn); + + /// + /// Chunk up this stream into groups of elements received within a time window, + /// or limited by the weight and number of the elements, whatever happens first. + /// Empty groups will not be emitted if no elements are received from upstream. + /// The last group before end-of-stream will contain the buffered elements + /// since the previously emitted group. + /// + /// must be positive, must be positive, and must be greater than 0 seconds, + /// otherwise is thrown. + /// + /// Emits when the configured time elapses since the last group has been emitted or weight limit reached + /// Backpressures when downstream backpressures, and buffered group(+ pending element) weighs more than `maxWeight` or has more than `maxNumber` elements + /// Completes when upstream completes(emits last group) + /// Cancels when downstream completes + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + public static Source, TMat> GroupedWeightedWithin(this Source flow, long maxWeight, int maxNumber, TimeSpan interval, Func costFn) => + (Source, TMat>)InternalFlowOperations.GroupedWeightedWithin(flow, maxWeight, maxNumber, interval, costFn); + /// /// Shifts elements emission in time by a specified amount. It allows to store elements /// in internal buffer while waiting for next element to be emitted. Depending on the defined diff --git a/src/core/Akka.Streams/Dsl/SubFlowOperations.cs b/src/core/Akka.Streams/Dsl/SubFlowOperations.cs index fba0d62b5a0..87a0f2976fd 100644 --- a/src/core/Akka.Streams/Dsl/SubFlowOperations.cs +++ b/src/core/Akka.Streams/Dsl/SubFlowOperations.cs @@ -844,9 +844,9 @@ public static class SubFlowOperations /// must be positive, and must be greater than 0 seconds, otherwise /// is thrown. /// - /// Emits when the configured time elapses since the last group has been emitted + /// Emits when the configured time elapses since the last group has been emitted or `n` elements is buffered /// - /// Backpressures when the configured time elapses since the last group has been emitted + /// Backpressures when downstream backpressures, and there are `n+1` buffered elements /// /// Completes when upstream completes (emits last group) /// @@ -865,6 +865,33 @@ public static class SubFlowOperations return (SubFlow, TMat, TClosed>)InternalFlowOperations.GroupedWithin(flow, n, timeout); } + /// + /// Chunk up this stream into groups of elements received within a time window, + /// or limited by the weight of the elements, whatever happens first. + /// Empty groups will not be emitted if no elements are received from upstream. + /// The last group before end-of-stream will contain the buffered elements + /// since the previously emitted group. + /// + /// must be positive, and must be greater than 0 seconds, + /// otherwise ArgumentException is thrown. + /// + /// Emits when the configured time elapses since the last group has been emitted or weight limit reached + /// Backpressures when downstream backpressures, and buffered group(+ pending element) weighs more than `maxWeight` + /// Completes when upstream completes(emits last group) + /// Cancels when downstream completes + /// + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + /// TBD + public static SubFlow, TMat, TClosed> GroupedWeightedWithin(this SubFlow flow, long maxWeight, int maxNumber, TimeSpan interval, Func costFn) => + (SubFlow, TMat, TClosed>)InternalFlowOperations.GroupedWeightedWithin(flow, maxWeight, maxNumber, interval, costFn); + /// /// Shifts elements emission in time by a specified amount. It allows to store elements /// in internal buffer while waiting for next element to be emitted. Depending on the defined diff --git a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs index de61fb36a80..d47b7acfddb 100644 --- a/src/core/Akka.Streams/Implementation/Fusing/Ops.cs +++ b/src/core/Akka.Streams/Implementation/Fusing/Ops.cs @@ -2927,8 +2927,8 @@ public override void OnDownstreamFinish(Exception cause) { if (IsEnabled(_logLevels.OnFinish)) _log.Log( - _logLevels.OnFinish, - "[{0}] Downstream finished. cause: {1}: {2}.", + _logLevels.OnFinish, + "[{0}] Downstream finished. cause: {1}: {2}.", _stage._name, Logging.SimpleName(cause.GetType()), cause.Message); @@ -3021,123 +3021,180 @@ internal enum TimerKeys /// /// TBD [InternalApi] - public sealed class GroupedWithin : GraphStage>> + public sealed class GroupedWeightedWithin : GraphStage>> { - #region internal classes + #region Logic private sealed class Logic : TimerGraphStageLogic, IInHandler, IOutHandler { - private const string GroupedWithinTimer = "GroupedWithinTimer"; + private const string GroupedWeightedWithinTimer = "GroupedWeightedWithinTimer"; - private readonly GroupedWithin _stage; - private List _buffer; + private readonly GroupedWeightedWithin _stage; + private readonly List _buffer = new List(); + private T _pending = default; + private long _pendingWeight = 0L; // True if: // - buf is nonEmpty // AND - // - timer fired OR group is full - private bool _groupClosed; + // - (timer fired + // OR + // totalWeight >= maxWeight + // OR + // pending != null + // OR + // upstream completed) + private bool _pushEagerly; private bool _groupEmitted = true; private bool _finished; - private int _elements; + private long _totalWeight = 0L; + private int _totalNumber = 0; + private bool _hasElements; - public Logic(GroupedWithin stage) : base(stage.Shape) + public Logic(GroupedWeightedWithin stage) + : base(stage.Shape) { _stage = stage; - _buffer = new List(_stage._count); SetHandler(_stage._in, this); SetHandler(_stage._out, this); } - public void OnPush() + public override void PreStart() { - if (!_groupClosed) - NextElement(Grab(_stage._in)); // otherwise keep the element for next round + ScheduleRepeatedly(GroupedWeightedWithinTimer, _stage._interval); + Pull(_stage._in); } - public void OnUpstreamFinish() + private void NextElement(T element) { - _finished = true; - - // Fix for issue #4514 - // Force check if we have a dangling last element because: - // OnTimer may close the group just before OnUpstreamFinish is called - // (race condition), dropping the last element in the stream. - if (IsAvailable(_stage._in)) - NextElement(Grab(_stage._in)); - - if (_groupEmitted) - CompleteStage(); + _groupEmitted = false; + var cost = _stage._costFn(element); + if (cost < 0L) + FailStage(new ArgumentException($"Negative weight [{cost}] for element [{element}] is not allowed")); else - CloseGroup(); - } + { + _hasElements = true; + // if there is place (both weight and number) for `elem` in the current group + if (_totalWeight + cost <= _stage._maxWeight && _totalNumber + 1 <= _stage._maxNumber) + { + _buffer.Add(element); + _totalWeight += cost; + _totalNumber += 1; - public void OnUpstreamFailure(Exception e) => FailStage(e); + // if potentially there is a place (both weight and number) for one more element in the current group + if (_totalWeight < _stage._maxWeight && _totalNumber < _stage._maxNumber) + Pull(_stage._in); + else + { + // `totalWeight >= maxWeight` which means that downstream can get the next group. + if (!IsAvailable(_stage._out)) + { + // we should emit group when downstream becomes available + _pushEagerly = true; + // we want to pull anyway, since we allow for zero weight elements + // but since `emitGroup()` will pull internally (by calling `startNewGroup()`) + // we also have to pull if downstream hasn't yet requested an element. + Pull(_stage._in); + } + else + { + ScheduleRepeatedly(GroupedWeightedWithinTimer, _stage._interval); + EmitGroup(); + } + } + } + else + { + // if there is a single heavy element that weighs more than the limit + if (_totalWeight == 0L && _totalNumber == 0) + { + _buffer.Add(element); + _totalWeight += cost; + _totalNumber += 1; + _pushEagerly = true; + } + else + { + _pending = element; + _pendingWeight = cost; + } + ScheduleRepeatedly(GroupedWeightedWithinTimer, _stage._interval); + TryCloseGroup(); + } + } + } - public void OnPull() + private void TryCloseGroup() { - if (_groupClosed) - EmitGroup(); + if (IsAvailable(_stage._out)) EmitGroup(); + else if (!EqualityComparer.Default.Equals(_pending, default) || _finished) _pushEagerly = true; } - public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); - - public override void PreStart() + private void EmitGroup() { - ScheduleRepeatedly(GroupedWithinTimer, _stage._timeout); - Pull(_stage._in); + _groupEmitted = true; + Push(_stage._out, _buffer.ToList()); + _buffer.Clear(); + if (!_finished) + StartNewGroup(); + else if (!EqualityComparer.Default.Equals(_pending, default)) + Emit(_stage._out, new List { _pending }, () => CompleteStage()); + else + CompleteStage(); } - private void NextElement(T element) + private void StartNewGroup() { - _groupEmitted = false; - _buffer.Add(element); - _elements++; - if (_elements == _stage._count) + if (!EqualityComparer.Default.Equals(_pending, default)) { - ScheduleRepeatedly(GroupedWithinTimer, _stage._timeout); - CloseGroup(); + _totalWeight = _pendingWeight; + _totalNumber = 1; + _pendingWeight = 0L; + _buffer.Add(_pending); + _pending = default; + _groupEmitted = false; } - // Do not pull if we're finished. - else if (!_finished) + else { - Pull(_stage._in); + _totalWeight = 0L; + _totalNumber = 0; + _hasElements = false; } + _pushEagerly = false; + if (IsAvailable(_stage._in)) NextElement(Grab(_stage._in)); + else if (!HasBeenPulled(_stage._in)) Pull(_stage._in); } - private void CloseGroup() + public void OnPush() { - _groupClosed = true; - if (IsAvailable(_stage._out)) - EmitGroup(); + if (EqualityComparer.Default.Equals(_pending, default)) + NextElement(Grab(_stage._in)); // otherwise keep the element for next round } - private void EmitGroup() + public void OnPull() { - _groupEmitted = true; - Push(_stage._out, _buffer); - _buffer = new List(_stage._count); - if (!_finished) - StartNewGroup(); - else - CompleteStage(); + if (_pushEagerly) EmitGroup(); } - private void StartNewGroup() + public void OnUpstreamFinish() { - _elements = 0; - _groupClosed = false; - if (IsAvailable(_stage._in)) - NextElement(Grab(_stage._in)); - else if (!HasBeenPulled(_stage._in)) - Pull(_stage._in); + _finished = true; + if (_groupEmitted) CompleteStage(); + else TryCloseGroup(); } + public void OnUpstreamFailure(Exception e) => FailStage(e); + + public void OnDownstreamFinish(Exception cause) => InternalOnDownstreamFinish(cause); + protected internal override void OnTimer(object timerKey) { - if (_elements > 0) - CloseGroup(); + if (_hasElements) + { + if (IsAvailable(_stage._out)) EmitGroup(); + else _pushEagerly = true; + } } } @@ -3145,36 +3202,29 @@ protected internal override void OnTimer(object timerKey) private readonly Inlet _in = new Inlet("in"); private readonly Outlet> _out = new Outlet>("out"); - private readonly int _count; - private readonly TimeSpan _timeout; + private readonly long _maxWeight; + private readonly int _maxNumber; + private readonly Func _costFn; + private readonly TimeSpan _interval; - /// - /// TBD - /// - /// TBD - /// TBD - public GroupedWithin(int count, TimeSpan timeout) + protected override Attributes InitialAttributes { get; } = DefaultAttributes.GroupedWeightedWithin; + + public override FlowShape> Shape { get; } + + public GroupedWeightedWithin(long maxWeight, int maxNumber, Func costFn, TimeSpan interval) { - _count = count; - _timeout = timeout; - Shape = new FlowShape>(_in, _out); - } + if (maxWeight <= 0) throw new ArgumentException("must be greater than 0", nameof(maxWeight)); + if (maxNumber <= 0) throw new ArgumentException("must be greater than 0", nameof(maxNumber)); + if (interval <= TimeSpan.Zero) throw new ArgumentException("must be greater than zero", nameof(interval)); - /// - /// TBD - /// - protected override Attributes InitialAttributes { get; } = Attributes.CreateName("GroupedWithin"); + _maxWeight = maxWeight; + _maxNumber = maxNumber; + _costFn = costFn; + _interval = interval; - /// - /// TBD - /// - public override FlowShape> Shape { get; } + Shape = new FlowShape>(_in, _out); + } - /// - /// TBD - /// - /// TBD - /// TBD protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this); } diff --git a/src/core/Akka.Streams/Implementation/Stages/Stages.cs b/src/core/Akka.Streams/Implementation/Stages/Stages.cs index dd8b31915e4..4ac6ec49826 100644 --- a/src/core/Akka.Streams/Implementation/Stages/Stages.cs +++ b/src/core/Akka.Streams/Implementation/Stages/Stages.cs @@ -70,6 +70,14 @@ public static class DefaultAttributes /// /// TBD /// + public static readonly Attributes GroupedWithin = Attributes.CreateName("groupedWithin"); + /// + /// TBD + /// + public static readonly Attributes GroupedWeightedWithin = Attributes.CreateName("groupedWeightedWithin"); + /// + /// TBD + /// public static readonly Attributes Limit = Attributes.CreateName("limit"); /// /// TBD