Skip to content

Commit

Permalink
Added GroupedWeightedWithin operator (#6000)
Browse files Browse the repository at this point in the history
Co-authored-by: Gregorius Soedharmo <arkatufus@yahoo.com>
  • Loading branch information
ismaelhamed and Arkatufus committed Jun 20, 2022
1 parent 6c07c56 commit f46d845
Show file tree
Hide file tree
Showing 11 changed files with 542 additions and 137 deletions.
19 changes: 15 additions & 4 deletions docs/articles/streams/builtinstages.md
Expand Up @@ -843,12 +843,23 @@ Skip elements until a timeout has fired

### GroupedWithin

Chunk up the stream into groups of elements received within a time window, or limited by the given number of elements,
whichever happens first.
Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream. The last group before end-of-stream will contain the buffered elements since the previously emitted group.

**emits** when the configured time elapses since the last group has been emitted
**emits** when the configured time elapses since the last group has been emitted, but not if no elements has been grouped (i.e: no empty groups), or when limit has been reached.

**backpressures** when the group has been assembled (the duration elapsed) and downstream backpressures
**backpressures** when downstream backpressures, and there are n+1 buffered elements

**completes** when upstream completes

### GroupedWeightedWithin

Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, whatever happens first. Empty groups will not be emitted if no elements are received from upstream.
The last group before end-of-stream will contain the buffered elements since the previously emitted group.

**emits** when the configured time elapses since the last group has been emitted,
but not if no elements has been grouped (i.e: no empty groups), or when weight limit has been reached.

**backpressures** downstream backpressures, and buffered group (+ pending element) weighs more than `maxWeight`

**completes** when upstream completes

Expand Down
Expand Up @@ -1371,6 +1371,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n, System.TimeSpan timeout) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> IdleTimeout<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.TimeSpan timeout) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> InitialDelay<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.TimeSpan delay) { }
Expand Down Expand Up @@ -2048,6 +2049,8 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<TOut2, TMat> Expand<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.IRunnableGraph<TMat>> GroupBy<TOut, TMat, TKey>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int n) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, long maxWeight, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int n, System.TimeSpan timeout) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> IdleTimeout<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.TimeSpan timeout) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> InitialDelay<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.TimeSpan delay) { }
Expand Down Expand Up @@ -2234,6 +2237,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.SubFlow<TOut, TMat3, TClosed> DivertToMaterialized<TOut, TMat, TMat2, TMat3, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TOut, bool> when, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
public static Akka.Streams.Dsl.SubFlow<TOut2, TMat, TClosed> Expand<TOut1, TOut2, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut1, TMat, TClosed> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
public static Akka.Streams.Dsl.SubFlow<System.Collections.Generic.IEnumerable<TOut>, TMat, TClosed> Grouped<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, int n) { }
public static Akka.Streams.Dsl.SubFlow<System.Collections.Generic.IEnumerable<TOut>, TMat, TClosed> GroupedWeightedWithin<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
public static Akka.Streams.Dsl.SubFlow<System.Collections.Generic.IEnumerable<TOut>, TMat, TClosed> GroupedWithin<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, int n, System.TimeSpan timeout) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> IdleTimeout<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, System.TimeSpan timeout) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> InitialDelay<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, System.TimeSpan delay) { }
Expand Down Expand Up @@ -4180,9 +4184,9 @@ namespace Akka.Streams.Implementation.Fusing
public static Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T> Identity<T>() { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class GroupedWithin<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<T, System.Collections.Generic.IEnumerable<T>>>
public sealed class GroupedWeightedWithin<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<T, System.Collections.Generic.IEnumerable<T>>>
{
public GroupedWithin(int count, System.TimeSpan timeout) { }
public GroupedWeightedWithin(long maxWeight, int maxNumber, System.Func<T, long> costFn, System.TimeSpan interval) { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public override Akka.Streams.FlowShape<T, System.Collections.Generic.IEnumerable<T>> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
Expand Down Expand Up @@ -4544,6 +4548,8 @@ namespace Akka.Streams.Implementation.Stages
public static readonly Akka.Streams.Attributes Fused;
public static readonly Akka.Streams.Attributes GroupBy;
public static readonly Akka.Streams.Attributes Grouped;
public static readonly Akka.Streams.Attributes GroupedWeightedWithin;
public static readonly Akka.Streams.Attributes GroupedWithin;
public static readonly Akka.Streams.Attributes IODispatcher;
public static readonly Akka.Streams.Attributes IdentityOp;
public static readonly Akka.Streams.Attributes Idle;
Expand Down
Expand Up @@ -1371,6 +1371,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n, System.TimeSpan timeout) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> IdleTimeout<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.TimeSpan timeout) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> InitialDelay<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.TimeSpan delay) { }
Expand Down Expand Up @@ -2048,6 +2049,8 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Source<TOut2, TMat> Expand<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.IRunnableGraph<TMat>> GroupBy<TOut, TMat, TKey>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int n) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, long maxWeight, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWeightedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
public static Akka.Streams.Dsl.Source<System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWithin<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, int n, System.TimeSpan timeout) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> IdleTimeout<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.TimeSpan timeout) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> InitialDelay<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.TimeSpan delay) { }
Expand Down Expand Up @@ -2234,6 +2237,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.SubFlow<TOut, TMat3, TClosed> DivertToMaterialized<TOut, TMat, TMat2, TMat3, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TOut, bool> when, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
public static Akka.Streams.Dsl.SubFlow<TOut2, TMat, TClosed> Expand<TOut1, TOut2, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut1, TMat, TClosed> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
public static Akka.Streams.Dsl.SubFlow<System.Collections.Generic.IEnumerable<TOut>, TMat, TClosed> Grouped<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, int n) { }
public static Akka.Streams.Dsl.SubFlow<System.Collections.Generic.IEnumerable<TOut>, TMat, TClosed> GroupedWeightedWithin<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, long maxWeight, int maxNumber, System.TimeSpan interval, System.Func<TOut, long> costFn) { }
public static Akka.Streams.Dsl.SubFlow<System.Collections.Generic.IEnumerable<TOut>, TMat, TClosed> GroupedWithin<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, int n, System.TimeSpan timeout) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> IdleTimeout<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, System.TimeSpan timeout) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> InitialDelay<TOut, TMat, TClosed>(this Akka.Streams.Dsl.SubFlow<TOut, TMat, TClosed> flow, System.TimeSpan delay) { }
Expand Down Expand Up @@ -4192,9 +4196,9 @@ namespace Akka.Streams.Implementation.Fusing
public static Akka.Streams.Implementation.Fusing.SimpleLinearGraphStage<T> Identity<T>() { }
}
[Akka.Annotations.InternalApiAttribute()]
public sealed class GroupedWithin<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<T, System.Collections.Generic.IEnumerable<T>>>
public sealed class GroupedWeightedWithin<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<T, System.Collections.Generic.IEnumerable<T>>>
{
public GroupedWithin(int count, System.TimeSpan timeout) { }
public GroupedWeightedWithin(long maxWeight, int maxNumber, System.Func<T, long> costFn, System.TimeSpan interval) { }
protected override Akka.Streams.Attributes InitialAttributes { get; }
public override Akka.Streams.FlowShape<T, System.Collections.Generic.IEnumerable<T>> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
Expand Down Expand Up @@ -4556,6 +4560,8 @@ namespace Akka.Streams.Implementation.Stages
public static readonly Akka.Streams.Attributes Fused;
public static readonly Akka.Streams.Attributes GroupBy;
public static readonly Akka.Streams.Attributes Grouped;
public static readonly Akka.Streams.Attributes GroupedWeightedWithin;
public static readonly Akka.Streams.Attributes GroupedWithin;
public static readonly Akka.Streams.Attributes IODispatcher;
public static readonly Akka.Streams.Attributes IdentityOp;
public static readonly Akka.Streams.Attributes Idle;
Expand Down

0 comments on commit f46d845

Please sign in to comment.