Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Akka.Streams: ReuseLatest stage to repeatedly emit the most recent value until a newer one is pushed #6262

Merged
29 changes: 29 additions & 0 deletions docs/articles/streams/buffersandworkingwithrate.md
Expand Up @@ -189,3 +189,32 @@ var driftFlow = Flow.Create<int>()
```

Note that all of the elements coming from upstream will go through expand at least once. This means that the output of this flow is going to report a drift of zero if producer is fast enough, or a larger drift otherwise.

### Reusing Values Downstream

When working with fan-in stages such as `Zip` where one `Source<T>` might produce messages infrequently, it can be helpful to cache the previous value and re-use it in combination with a faster stream.

For instance, consider the following scenario:

* A `Source<HttpClient, NotUsed>` that emits an updated `HttpClient` with new bearer-token credentials every 30 minutes and
* A `Source<HttpRequestMessage, NotUsed>` that emits outbound `HttpRequestMessage`s as they come - at any given moment it can produce zero requests per second or thousands of requests per second.

In this scenario we're going to want to combine the `Source<HttpClient, NotUsed>` and `Source<HttpRequestMessage, NotUsed>` together so the `HttpClient` can execute all of the `HttpRequestMessage`s - however, given that `HttpClient`s are only emitted once every 30 minutes - how can we use a stage like `Zip` to make sure that every `HttpRequestMessage` gets serviced in a timely, low-latency fashion?

Enter the `ReuseLatest` stage - which will allow us to reuse the most recent `HttpClient` each time a new `HttpRequestMessage` arrives:

```csharp
public static Source<HttpClient, ICancelable> CreateSourceInternal(string clientId,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This meant to be pseudo code.

Func<Task<string>> tokenProvider, TimeSpan tokenRefreshTimeout)
{
var source = Source.Tick(TimeSpan.Zero, TimeSpan.FromSeconds(30), clientId)
.SelectAsync(1, async c =>
// refresh bearer token, create new HttpClient
CreateClient(c, (await tokenProvider().WaitAsync(tokenRefreshTimeout))))
// reuse the previous value whenever there's downstream demand
.ReuseLatest();
return source;
}
```

This type of design allows us to decouple the rate at which `HttpClient`s are produced from the rate at which `HttpRequestMessage`s are.
18 changes: 18 additions & 0 deletions docs/articles/streams/builtinstages.md
Expand Up @@ -667,6 +667,24 @@ Skip elements as long as a predicate function return true for the element

**completes** when upstream completes

### ReuseLatest

Re-use the most recently emitted element downstream.

> [!NOTE]
> `ReuseLatest` is typically used in combination with fan-in stages such as `Zip` - please see "[Reusing Values Downstream](xref:streams-buffers#reusing-values-downstream)"

**emits** as long as one element has been emitted from upstream, that element will be emitted downstream
whenever the `ReuseLatest` stage is pulled. If a new value is emitted from upstream, that value will be pushed and will replace the previous value.

**backpressures** when downstream backpressures.

**completes** when upstream completes

`ReuseLatest` Sample:

[!code-csharp[ReuseLatest](../../../src/core/Akka.Streams.Tests/Dsl/ReuseLatestSpec.cs?name=RepeatPrevious)]

### Recover

Allow sending of one last element downstream when a failure has happened upstream.
Expand Down
3 changes: 1 addition & 2 deletions docs/articles/streams/pipeliningandparallelism.md
Expand Up @@ -173,5 +173,4 @@ compared to the parallel pipelines. This pattern re-balances after each step, wh
at the entry point of the pipeline. This only matters however if the processing time distribution has a large
deviation.

[^foot-note-1]: Bartosz's reason for this seemingly suboptimal procedure is that he prefers the temperature of the second pan
to be slightly lower than the first in order to achieve a more homogeneous result.
[^foot-note-1]: Bartosz's reason for this seemingly suboptimal procedure is that he prefers the temperature of the second pan to be slightly lower than the first in order to achieve a more homogeneous result.
@@ -0,0 +1,15 @@
``` ini

BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19044.2251 (21H2)
AMD Ryzen 7 1700, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100
[Host] : .NET Core 3.1.23 (CoreCLR 4.700.22.11601, CoreFX 4.700.22.12208), X64 RyuJIT
DefaultJob : .NET Core 3.1.23 (CoreCLR 4.700.22.11601, CoreFX 4.700.22.12208), X64 RyuJIT


```
| Method | Formatted | Mean | Error | StdDev | Allocated |
|------------------------ |-------------------- |----------:|----------:|----------:|----------:|
| **Int64CharCountBenchmark** | **1** | **4.471 ns** | **0.0402 ns** | **0.0357 ns** | **-** |
| **Int64CharCountBenchmark** | **1000** | **4.818 ns** | **0.0517 ns** | **0.0484 ns** | **-** |
| **Int64CharCountBenchmark** | **9223372036854775807** | **25.199 ns** | **0.3360 ns** | **0.2978 ns** | **-** |
@@ -0,0 +1,4 @@
Method,Job,AnalyzeLaunchVariance,EvaluateOverhead,MaxAbsoluteError,MaxRelativeError,MinInvokeCount,MinIterationTime,OutlierMode,Affinity,EnvironmentVariables,Jit,Platform,PowerPlanMode,Runtime,AllowVeryLargeObjects,Concurrent,CpuGroups,Force,HeapAffinitizeMask,HeapCount,NoAffinitize,RetainVm,Server,Arguments,BuildConfiguration,Clock,EngineFactory,NuGetReferences,Toolchain,IsMutator,InvocationCount,IterationCount,IterationTime,LaunchCount,MaxIterationCount,MaxWarmupIterationCount,MemoryRandomization,MinIterationCount,MinWarmupIterationCount,RunStrategy,UnrollFactor,WarmupCount,Formatted,Mean,Error,StdDev,Allocated
Int64CharCountBenchmark,DefaultJob,False,Default,Default,Default,Default,Default,Default,1111111111111111,Empty,RyuJit,X64,8c5e7fda-e8bf-4a96-9a85-a6e23a8c635c,.NET Core 3.1,False,True,False,True,Default,Default,False,False,False,Default,Default,Default,Default,Default,Default,Default,1,Default,Default,Default,Default,Default,Default,Default,Default,Default,16,Default,1,4.471 ns,0.0402 ns,0.0357 ns,0 B
Int64CharCountBenchmark,DefaultJob,False,Default,Default,Default,Default,Default,Default,1111111111111111,Empty,RyuJit,X64,8c5e7fda-e8bf-4a96-9a85-a6e23a8c635c,.NET Core 3.1,False,True,False,True,Default,Default,False,False,False,Default,Default,Default,Default,Default,Default,Default,1,Default,Default,Default,Default,Default,Default,Default,Default,Default,16,Default,1000,4.818 ns,0.0517 ns,0.0484 ns,0 B
Int64CharCountBenchmark,DefaultJob,False,Default,Default,Default,Default,Default,Default,1111111111111111,Empty,RyuJit,X64,8c5e7fda-e8bf-4a96-9a85-a6e23a8c635c,.NET Core 3.1,False,True,False,True,Default,Default,False,False,False,Default,Default,Default,Default,Default,Default,Default,1,Default,Default,Default,Default,Default,Default,Default,Default,Default,16,Default,9223372036854775807,25.199 ns,0.3360 ns,0.2978 ns,0 B
@@ -0,0 +1,32 @@
<!DOCTYPE html>
<html lang='en'>
<head>
<meta charset='utf-8' />
<title>Akka.Benchmarks.Utils.SpanHackBenchmarks-20221128-162943</title>

<style type="text/css">
table { border-collapse: collapse; display: block; width: 100%; overflow: auto; }
td, th { padding: 6px 13px; border: 1px solid #ddd; text-align: right; }
tr { background-color: #fff; border-top: 1px solid #ccc; }
tr:nth-child(even) { background: #f8f8f8; }
</style>
</head>
<body>
<pre><code>
BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19044.2251 (21H2)
AMD Ryzen 7 1700, 1 CPU, 16 logical and 8 physical cores
.NET SDK=7.0.100
[Host] : .NET Core 3.1.23 (CoreCLR 4.700.22.11601, CoreFX 4.700.22.12208), X64 RyuJIT
DefaultJob : .NET Core 3.1.23 (CoreCLR 4.700.22.11601, CoreFX 4.700.22.12208), X64 RyuJIT
</code></pre>
<pre><code></code></pre>

<table>
<thead><tr><th> Method</th><th> Formatted</th><th>Mean</th><th>Error</th><th>StdDev</th><th>Allocated</th>
</tr>
</thead><tbody><tr><td>Int64CharCountBenchmark</td><td>1</td><td>4.471 ns</td><td>0.0402 ns</td><td>0.0357 ns</td><td>-</td>
</tr><tr><td>Int64CharCountBenchmark</td><td>1000</td><td>4.818 ns</td><td>0.0517 ns</td><td>0.0484 ns</td><td>-</td>
</tr><tr><td>Int64CharCountBenchmark</td><td>9223372036854775807</td><td>25.199 ns</td><td>0.3360 ns</td><td>0.2978 ns</td><td>-</td>
</tr></tbody></table>
</body>
</html>
12 changes: 12 additions & 0 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveStreams.verified.txt
Expand Up @@ -1383,6 +1383,8 @@ namespace Akka.Streams.Dsl
[System.ObsoleteAttribute("Use RecoverWithRetries instead. [1.1.2]")]
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> RecoverWith<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<System.Exception, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat>> partialFunc) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> RecoverWithRetries<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<System.Exception, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat>> partialFunc, int attempts) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> RepeatPrevious<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow) { }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flow DSL changes

public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> RepeatPrevious<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Action<TOut, TOut> onItemChanged) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> Scan<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, TOut2 zero, System.Func<TOut2, TOut1, TOut2> scan) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut2, TMat> ScanAsync<TIn, TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut1, TMat> flow, TOut2 zero, System.Func<TOut2, TOut1, System.Threading.Tasks.Task<TOut2>> scan) { }
public static Akka.Streams.Dsl.Flow<T, TOut, TMat> Select<T, TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<T, TIn, TMat> flow, System.Func<TIn, TOut> mapper) { }
Expand Down Expand Up @@ -1845,6 +1847,14 @@ namespace Akka.Streams.Dsl
[Akka.Annotations.ApiMayChangeAttribute()]
public static Akka.Streams.IGraph<Akka.Streams.FlowShape<System.ValueTuple<TIn, TState>, System.ValueTuple<Akka.Util.Result<TOut>, TState>>, TMat> Create<TIn, TState, TOut, TMat>(Akka.Streams.IGraph<Akka.Streams.FlowShape<System.ValueTuple<TIn, TState>, System.ValueTuple<Akka.Util.Result<TOut>, TState>>, TMat> flow, System.Func<TState, Akka.Util.Option<System.ValueTuple<TIn, TState>>> retryWith) { }
}
public sealed class ReuseLatest<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<T, T>>
{
public ReuseLatest() { }
public ReuseLatest(System.Action<T, T> onItemChanged) { }
public override Akka.Streams.FlowShape<T, T> Shape { get; }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
public override string ToString() { }
}
public class static ReverseOps
{
public static Akka.Streams.Dsl.GraphDsl.Builder<TMat> From<TIn, TOut, TMat>(this Akka.Streams.Dsl.GraphDsl.ReverseOps<TIn, TMat> ops, Akka.Streams.Outlet<TOut> outlet)
Expand Down Expand Up @@ -2059,6 +2069,8 @@ namespace Akka.Streams.Dsl
[System.ObsoleteAttribute("Use RecoverWithRetries instead. [1.1.2]")]
public static Akka.Streams.Dsl.Source<TOut, TMat> RecoverWith<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.Func<System.Exception, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat>> partialFunc) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> RecoverWithRetries<TOut, TMat>(this Akka.Streams.Dsl.Source<TOut, TMat> flow, System.Func<System.Exception, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat>> partialFunc, int attempts) { }
public static Akka.Streams.Dsl.Source<T, TMat> RepeatPrevious<T, TMat>(this Akka.Streams.Dsl.Source<T, TMat> source) { }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Source DSL changes

public static Akka.Streams.Dsl.Source<T, TMat> RepeatPrevious<T, TMat>(this Akka.Streams.Dsl.Source<T, TMat> source, System.Action<T, T> onItemUpdated) { }
public static Akka.Streams.Dsl.Source<TOut2, TMat> Scan<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, TOut2 zero, System.Func<TOut2, TOut1, TOut2> scan) { }
public static Akka.Streams.Dsl.Source<TOut2, TMat> ScanAsync<TOut1, TOut2, TMat>(this Akka.Streams.Dsl.Source<TOut1, TMat> flow, TOut2 zero, System.Func<TOut2, TOut1, System.Threading.Tasks.Task<TOut2>> scan) { }
public static Akka.Streams.Dsl.Source<TOut, TMat> Select<TIn, TOut, TMat>(this Akka.Streams.Dsl.Source<TIn, TMat> flow, System.Func<TIn, TOut> mapper) { }
Expand Down
84 changes: 84 additions & 0 deletions src/core/Akka.Streams.Tests/Dsl/ReuseLatestSpec.cs
@@ -0,0 +1,84 @@
//-----------------------------------------------------------------------
// <copyright file="RepeatPreviousSpec.cs" company="Akka.NET Project">
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------

using System;
using System.Linq;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.Streams.Dsl;
using Akka.TestKit;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests.Dsl
{
public class ReuseLatestSpec : AkkaSpec
{
private ActorMaterializer Materializer { get; }

public ReuseLatestSpec(ITestOutputHelper testOutputHelper) : base(Config.Empty, output: testOutputHelper)
{
var settings = ActorMaterializerSettings.Create(Sys);
Materializer = ActorMaterializer.Create(Sys, settings);
}

[Fact]
public async Task RepeatPrevious_should_immediately_terminate_with_Empty_source()
{
var source = Source.Empty<int>();
var result = await source.RepeatPrevious().RunWith(Sink.Seq<int>(), Materializer);
result.Should().BeEmpty();
}

[Fact]
public async Task RepeatPrevious_should_complete_when_upstream_completes()
{
var source = Source.Single(1).RepeatPrevious();
var result = await source.RunWith(Sink.Seq<int>(), Materializer);

// as a side-effect of RepeatPrevious' buffering process, there's going to be an extra element in the result
result.Should().BeEquivalentTo(1, 1);
}

[Fact]
public async Task RepeatPrevious_should_fail_when_upstream_fails()
{
Func<Task> Exec() => async () =>
{
var source = Source.From(Enumerable.Range(0,9)).Where(i =>
{
if (i % 5 == 0)
{
throw new ApplicationException("failed");
}

return true;
}).RepeatPrevious();
var result = await source.RunWith(Sink.Seq<int>(), Materializer);
};

await Exec().Should().ThrowAsync<ApplicationException>();
}

[Fact]
public async Task RepeatPrevious_should_repeat_when_no_newValues_available()
{
// <RepeatPrevious>
var (queue, source) = Source.Queue<int>(10, OverflowStrategy.Backpressure).PreMaterialize(Materializer);

// populate 1 into queue
await queue.OfferAsync(1);

// take 4 items from the queue
var result = await source.RepeatPrevious().Take(4).RunWith(Sink.Seq<int>(), Materializer);

// the most recent queue item will be repeated 3 times, plus the original element
result.Should().BeEquivalentTo(1,1,1,1);
// </RepeatPrevious>
}
}
}
8 changes: 3 additions & 5 deletions src/core/Akka.Streams/Attributes.cs
Expand Up @@ -301,7 +301,7 @@ public Attributes(params IAttribute[] attributes)
/// INTERNAL API
/// </summary>
internal bool IsAsync
=> _attributes.Count() > 0 &&
=> _attributes.Length > 0 &&
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Performance optimization / syntax clean up

_attributes.Any(
attr => attr is AsyncBoundary ||
attr is ActorAttributes.Dispatcher);
Expand Down Expand Up @@ -489,9 +489,7 @@ public static Attributes CreateName(string name)
/// <returns>TBD</returns>
public static string ExtractName(IModule module, string defaultIfNotFound)
{
var copy = module as CopiedModule;

return copy != null
return module is CopiedModule copy
? copy.Attributes.And(copy.CopyOf.Attributes).GetNameOrDefault(defaultIfNotFound)
: module.Attributes.GetNameOrDefault(defaultIfNotFound);
}
Expand Down Expand Up @@ -531,7 +529,7 @@ public bool Equals(Dispatcher other)
return true;
return Equals(Name, other.Name);
}
public override bool Equals(object obj) => obj is Dispatcher && Equals((Dispatcher)obj);
public override bool Equals(object obj) => obj is Dispatcher dispatcher && Equals(dispatcher);
public override int GetHashCode() => Name?.GetHashCode() ?? 0;
public override string ToString() => $"Dispatcher({Name})";
}
Expand Down
39 changes: 38 additions & 1 deletion src/core/Akka.Streams/Dsl/FlowOperations.cs
Expand Up @@ -22,7 +22,7 @@
namespace Akka.Streams.Dsl
{
/// <summary>
/// TBD
/// The set of DSL methods for composing <see cref="Flow{TIn,TOut,TMat}"/> stages together.
/// </summary>
public static class FlowOperations
{
Expand Down Expand Up @@ -2438,5 +2438,42 @@ public static class FlowOperations

return FlowWithContext.From(flowWithTuples);
}

/// <summary>
/// Repeats the previous element from upstream until it's replaced by a new value.
/// </summary>
/// <param name="flow">The previous Flow stage in this stream.</param>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <typeparam name="TMat">The materialization type.</typeparam>
/// <remarks>
/// This is designed to allow fan-in stages where output from one of the sources is intermittent / infrequent
/// and users just want the previous value to be reused.
/// </remarks>
public static Flow<TIn, TOut, TMat> RepeatPrevious<TIn, TOut, TMat>(this Flow<TIn, TOut, TMat> flow)
{
return flow.Via(new ReuseLatest<TOut>());
}

/// <summary>
/// Repeats the previous element from upstream until it's replaced by a new value.
/// </summary>
/// <param name="flow">The previous Flow stage in this stream.</param>
/// <param name="onItemChanged">A <see cref="Action{TOut, TOut}"/> function that allows the stage to perform clean-up operations when the previously repeated
/// value is being replaced.
///
/// This is used for things like calling <see cref="IDisposable.Dispose"/> on the previous value.
/// </param>
/// <typeparam name="TIn">The input type.</typeparam>
/// <typeparam name="TOut">The output type.</typeparam>
/// <typeparam name="TMat">The materialization type.</typeparam>
/// <remarks>
/// This is designed to allow fan-in stages where output from one of the sources is intermittent / infrequent
/// and users just want the previous value to be reused.
/// </remarks>
public static Flow<TIn, TOut, TMat> RepeatPrevious<TIn, TOut, TMat>(this Flow<TIn, TOut, TMat> flow, Action<TOut, TOut> onItemChanged)
{
return flow.Via(new ReuseLatest<TOut>(onItemChanged));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not named as RepeatLatest just as the stage?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got feedback on name changes and didn't propagate it everywhere I guess, although the stage implementation is really meant to be internal.

}
}
}