
Commit

Allow GroupBy to recreate already closed substreams
ismaelhamed committed Apr 23, 2022
1 parent 4584f59 commit b77b68e
Showing 6 changed files with 184 additions and 64 deletions.
10 changes: 9 additions & 1 deletion docs/articles/streams/builtinstages.md
@@ -1000,7 +1000,15 @@ and returns a pair containing a strict sequence of the taken element and a stream

### GroupBy

This operation demultiplexes the incoming stream into separate output streams, one for each element key. The
key is computed for each element using the given function. When a new key is encountered for the first time,
a new substream is opened and subsequently fed with all elements belonging to that key.

> [!NOTE]
> If `allowClosedSubstreamRecreation` is set to `true`, substream completion and incoming elements are subject to race conditions. If elements arrive for a stream that is in the process of closing, these elements might get lost.

> [!WARNING]
> If `allowClosedSubstreamRecreation` is set to `false` (default behavior), the stage keeps track of all keys of streams that have already been closed. If you expect an infinite number of keys, this can cause memory issues. Elements belonging to those keys are drained directly and not sent to the substream.

**emits** an element for which the grouping function returns a group that has not yet been created. Emits the new group

**backpressures** when there is an element pending for a group whose substream backpressures
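
A minimal usage sketch of the new overload, modeled on the `FlowGroupBySpec` test in this commit (assumes `Akka.Streams.Dsl` usings and an `IMaterializer` named `materializer` in scope):

```csharp
// Each key gets its own substream; Take(1) closes a substream after its first element.
// With allowClosedSubstreamRecreation: true, a later element for the same key reopens it.
var groupFlow = (Flow<int, int, NotUsed>)Flow.Create<int>()
    .GroupBy(2, x => x % 2, allowClosedSubstreamRecreation: true)
    .Take(1)
    .MergeSubstreams();

Source.From(new[] { 1, 2, 1, 1 })
    .Via(groupFlow)
    .RunForeach(Console.WriteLine, materializer); // prints 1, 2, 1, 1; without the flag the trailing 1s would be dropped
```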
@@ -1342,6 +1342,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> DivertTo<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat> that, System.Func<TOut, bool> when) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat3> DivertToMaterialized<TIn, TOut, TMat, TMat2, TMat3>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, Akka.Streams.IGraph<Akka.Streams.SinkShape<TOut>, TMat2> that, System.Func<TOut, bool> when, System.Func<TMat, TMat2, TMat3> materializerFunction) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> Expand<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, System.Func<TOut1, System.Collections.Generic.IEnumerator<TOut2>> extrapolate) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) { }
public static Akka.Streams.Dsl.SubFlow<TOut, TMat, Akka.Streams.Dsl.Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int maxSubstreams, System.Func<TOut, TKey> groupingFunc) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> Grouped<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n) { }
public static Akka.Streams.Dsl.Flow<TIn, System.Collections.Generic.IEnumerable<TOut>, TMat> GroupedWithin<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, int n, System.TimeSpan timeout) { }
35 changes: 35 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/FlowGroupBySpec.cs
@@ -574,6 +574,41 @@ public void GroupBy_must_work_if_pull_is_exercised_from_multiple_substreams_whil
}, Materializer);
}

[Fact]
public void GroupBy_must_allow_to_recreate_an_already_closed_substream()
{
this.AssertAllStagesStopped(() =>
{
var f = Flow.Create<int>()
.GroupBy(2, x => x, allowClosedSubstreamRecreation: true)
.Take(1) // close the substream after 1 element
.MergeSubstreams();
var (up, down) = ((Flow<int, int, NotUsed>)f)
.RunWith(this.SourceProbe<int>(), this.SinkProbe<int>(), Materializer);
down.Request(4);
// Creates and closes substream "1"
up.SendNext(1);
down.ExpectNext(1);
// Creates and closes substream "2"
up.SendNext(2);
down.ExpectNext(2);
// Recreates and closes substream "1" twice
up.SendNext(1);
down.ExpectNext(1);
up.SendNext(1);
down.ExpectNext(1);
// Cleanup, not part of the actual test
up.SendComplete();
down.ExpectComplete();
}, Materializer);
}
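
For contrast, a hypothetical companion test (not part of this commit) sketching the default behavior with the same probe API: when `allowClosedSubstreamRecreation` is left at `false`, elements for an already closed key are drained and never reach downstream.

```csharp
[Fact]
public void GroupBy_with_default_settings_must_drain_elements_for_already_closed_substreams()
{
    this.AssertAllStagesStopped(() =>
    {
        var f = Flow.Create<int>()
            .GroupBy(2, x => x) // default: closed substreams are not recreated
            .Take(1)            // close the substream after 1 element
            .MergeSubstreams();
        var (up, down) = ((Flow<int, int, NotUsed>)f)
            .RunWith(this.SourceProbe<int>(), this.SinkProbe<int>(), Materializer);
        down.Request(4);
        // Creates and closes substream "1"
        up.SendNext(1);
        down.ExpectNext(1);
        // Key "1" is already closed: the element is drained, nothing is emitted
        up.SendNext(1);
        down.ExpectNoMsg(TimeSpan.FromMilliseconds(100));
        // A new key still opens a new substream
        up.SendNext(2);
        down.ExpectNext(2);
        // Cleanup, not part of the actual sketch
        up.SendComplete();
        down.ExpectComplete();
    }, Materializer);
}
```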

[Fact]
public void GroupBy_must_cancel_if_downstream_has_cancelled_and_all_substreams_cancel()
{
83 changes: 55 additions & 28 deletions src/core/Akka.Streams/Dsl/FlowOperations.cs
@@ -1315,48 +1315,75 @@ public static class FlowOperations
/// This operation demultiplexes the incoming stream into separate output
/// streams, one for each element key. The key is computed for each element
/// using the given function. When a new key is encountered for the first time
/// a new substream is opened and subsequently fed with all elements belonging to
/// that key.
/// <para>
/// WARNING: If <paramref name="allowClosedSubstreamRecreation"/> is set to false (default behavior), the operator
/// keeps track of all keys of streams that have already been closed. If you expect an infinite number of keys,
/// this can cause memory issues. Elements belonging to those keys are drained directly and not sent to the substream.
/// </para>
/// <para>
/// Note: If <paramref name="allowClosedSubstreamRecreation"/> is set to true, substream completion and incoming
/// elements are subject to race conditions. If elements arrive for a stream that is in the process of closing,
/// these elements might get lost.
/// </para>
/// <para>
/// The object returned from this method is not a normal <see cref="Flow"/>; it is a
/// <see cref="SubFlow{TOut, TMat, TClosed}"/>. This means that after this operator,
/// all transformations are applied to all encountered substreams in the same fashion.
/// Substream mode is exited either by closing the substream (i.e. connecting it to a <see cref="Sink"/>)
/// or by merging the substreams back together; see the <c>To</c> and <c>MergeBack</c> methods
/// on <see cref="SubFlow{TOut, TMat, TClosed}"/> for more information.
/// </para>
/// <para>
/// Note that the substreams also propagate back-pressure like any other stream, which means
/// that blocking one substream will block the <c>GroupBy</c> operator itself (and thereby all
/// substreams) once all internal or explicit buffers are filled.
/// </para>
/// <para>
/// If the group by function <paramref name="groupingFunc"/> throws an exception and the supervision decision
/// is <see cref="Supervision.Directive.Stop"/> the stream and substreams will be completed with failure.
/// </para>
/// <para>
/// If the group by <paramref name="groupingFunc"/> throws an exception and the supervision decision
/// is <see cref="Supervision.Directive.Resume"/> or <see cref="Supervision.Directive.Restart"/>
/// the element is dropped and the stream and substreams continue.
/// </para>
/// <para>
/// Function <paramref name="groupingFunc"/> MUST NOT return <c>null</c>. Doing so will throw an exception and trigger the supervision decision mechanism.
/// </para>
/// <para>
/// Adheres to the <see cref="ActorAttributes.SupervisionStrategy"/> attribute.
/// </para>
/// <para>**Emits when** an element for which the grouping function returns a group that has not yet been created. Emits the new group.</para>
/// <para>**Backpressures when** there is an element pending for a group whose substream backpressures</para>
/// <para>**Completes when** upstream completes</para>
/// <para>**Cancels when** downstream cancels and all substreams cancel</para>
/// </summary>
/// <typeparam name="TIn">TBD</typeparam>
/// <typeparam name="TOut">TBD</typeparam>
/// <typeparam name="TMat">TBD</typeparam>
/// <typeparam name="TKey">TBD</typeparam>
/// <param name="flow">TBD</param>
/// <param name="maxSubstreams">Configures the maximum number of substreams (keys) that are supported; if more distinct keys are encountered, the stream fails</param>
/// <param name="groupingFunc">Computes the key for each element</param>
/// <param name="allowClosedSubstreamRecreation">Enables recreation of already closed substreams if elements with their corresponding keys arrive after completion</param>
/// <returns>TBD</returns>
public static SubFlow<TOut, TMat, Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Flow<TIn, TOut, TMat> flow, int maxSubstreams, Func<TOut, TKey> groupingFunc, bool allowClosedSubstreamRecreation) =>
flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow<TIn, Source<TOut, NotUsed>, TMat>)f).To(s), allowClosedSubstreamRecreation);

/// <summary>
/// This operation demultiplexes the incoming stream into separate output
/// streams, one for each element key. The key is computed for each element
/// using the given function. When a new key is encountered for the first time
/// a new substream is opened and subsequently fed with all elements belonging to
/// that key.
/// <para>
/// WARNING: The stage keeps track of all keys of streams that have already been closed.
/// If you expect an infinite number of keys, this can cause memory issues. Elements belonging
/// to those keys are drained directly and not sent to the substream.
/// </para>
/// See <see cref="GroupBy{TIn, TOut, TMat, TKey}(Flow{TIn, TOut, TMat}, int, Func{TOut, TKey}, bool)"/> for details.
/// </summary>
public static SubFlow<TOut, TMat, Sink<TIn, TMat>> GroupBy<TIn, TOut, TMat, TKey>(this Flow<TIn, TOut, TMat> flow, int maxSubstreams, Func<TOut, TKey> groupingFunc) =>
flow.GroupBy(maxSubstreams, groupingFunc, (f, s) => ((Flow<TIn, Source<TOut, NotUsed>, TMat>)f).To(s), false);
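
As a sketch of the two ways to leave substream mode mentioned in the summary above (an illustration, not part of FlowOperations.cs; assumes `Akka.Streams.Dsl` usings, `System.Linq`, and an `IMaterializer` named `materializer`):

```csharp
// 1. Merge the substreams back together (MergeSubstreams / MergeBack).
var mergedFlow = (Flow<int, int, NotUsed>)Flow.Create<int>()
    .GroupBy(2, x => x % 2)
    .Select(x => x * 10)                       // applied to every substream individually
    .MergeSubstreams();
Source.From(Enumerable.Range(1, 6)).Via(mergedFlow).RunForeach(Console.WriteLine, materializer);

// 2. Close each substream by connecting it to a sink (To), which yields a Sink for the whole flow.
var groupedSink = Flow.Create<int>()
    .GroupBy(2, x => x % 2)
    .To(Sink.ForEach<int>(Console.WriteLine));
Source.From(Enumerable.Range(1, 6)).To(groupedSink).Run(materializer);
```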

/// <summary>
/// This operation applies the given predicate to all incoming elements and
