Akka.Streams: `ReuseLatest` stage to repeatedly emit the most recent value until a newer one is pushed #6262
Described all my changes
@@ -14,7 +14,7 @@
 [assembly: System.Runtime.CompilerServices.InternalsVisibleToAttribute("Akka.Management.Cluster.Http")]
 [assembly: System.Runtime.InteropServices.ComVisibleAttribute(false)]
 [assembly: System.Runtime.InteropServices.GuidAttribute("0e3e691b-0c31-4718-9b1a-d749b93208c9")]
-[assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETStandard,Version=v2.0", FrameworkDisplayName="")]
+[assembly: System.Runtime.Versioning.TargetFrameworkAttribute(".NETStandard,Version=v2.0", FrameworkDisplayName=".NET Standard 2.0")]
I believe this is added by the latest .NET SDK build tools from .NET 7.0 - and this is the first PR we've done on a machine with it installed.
@@ -189,3 +189,32 @@ var driftFlow = Flow.Create<int>()
``` | |||
Note that all of the elements coming from upstream will go through expand at least once. This means that the output of this flow is going to report a drift of zero if producer is fast enough, or a larger drift otherwise.

### Repeating Previous Values Downstream
Wanted to add a lengthier article about `RepeatPrevious` in the context of different sources working at decoupled rates, which is why I added this stage in the first place.
Enter the `RepeatPrevious` stage - which will allow us to reuse the most recent `HttpClient` each time a new `HttpRequestMessage` arrives:

```csharp
public static Source<HttpClient, ICancelable> CreateSourceInternal(string clientId,
This is meant to be pseudocode.
@@ -1383,6 +1383,8 @@ namespace Akka.Streams.Dsl
[System.ObsoleteAttribute("Use RecoverWithRetries instead. [1.1.2]")]
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> RecoverWith<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<System.Exception, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat>> partialFunc) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> RecoverWithRetries<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow, System.Func<System.Exception, Akka.Streams.IGraph<Akka.Streams.SourceShape<TOut>, TMat>> partialFunc, int attempts) { }
public static Akka.Streams.Dsl.Flow<TIn, TOut, TMat> RepeatPrevious<TIn, TOut, TMat>(this Akka.Streams.Dsl.Flow<TIn, TOut, TMat> flow) { }
Flow DSL changes
@@ -1791,6 +1793,14 @@ namespace Akka.Streams.Dsl
public Pulse(System.TimeSpan interval, bool initiallyOpen = False) { }
protected override Akka.Streams.Stage.GraphStageLogic CreateLogic(Akka.Streams.Attributes inheritedAttributes) { }
}
public sealed class RepeatPrevious<T> : Akka.Streams.Stage.GraphStage<Akka.Streams.FlowShape<T, T>>
`RepeatPrevious<T>` API implementation
}

[Fact]
public async Task RepeatPrevious_should_immediately_terminate_with_Empty_source()
Instant completion test case.
var result = await source.RunWith(Sink.Seq<int>(), Materializer);

// as a side-effect of RepeatPrevious' buffering process, there's going to be an extra element in the result
result.Should().BeEquivalentTo(1, 1);
Unfortunately, and I spent a couple of hours trying to fix this - we're going to get an extra event due to the work involved in detecting that the upstream has completed. Comes with the territory if you're separating upstream from downstream in the manner that this stage does.
This comment may help: #6262 (comment)
}

[Fact]
public async Task RepeatPrevious_should_fail_when_upstream_fails()
Tests to validate that failure was propagated.
@@ -301,7 +301,7 @@ public Attributes(params IAttribute[] attributes)
 /// INTERNAL API
 /// </summary>
 internal bool IsAsync
-    => _attributes.Count() > 0 &&
+    => _attributes.Length > 0 &&
Performance optimization / syntax clean up
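For context: `_attributes` is an array here (the constructor takes `params IAttribute[]`), so `Length` is a direct property read, whereas `Count()` goes through the LINQ `Enumerable.Count` extension method. A minimal sketch of the equivalence, using a `string[]` as a stand-in for the real `IAttribute[]` (the class and method names are hypothetical):

```csharp
using System;
using System.Linq;

public static class EmptinessChecks
{
    // LINQ extension method: works on any IEnumerable<T>, at the cost of an extra call.
    public static bool NonEmptyViaCount(string[] items) => items.Count() > 0;

    // Array property: reads the length of the array directly.
    public static bool NonEmptyViaLength(string[] items) => items.Length > 0;
}
```

Both checks return the same answer for any array, so the change only affects how the answer is obtained.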
{
    var next = Grab(_stage._in);
    if (_last.HasValue)
        _swapPrevious(_last.Value, next);
The swap function is only needed for scenarios where removing the previous value requires some cleanup - i.e. in my original use case I needed to call `Dispose` on an `HttpClient`.
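A rough sketch of that cleanup pattern in plain C#, without Akka.Streams. `FakeClient` and `LatestHolder<T>` are hypothetical stand-ins invented for this illustration; they mirror only the "keep the latest value, run a callback with the previous one" part of the stage and none of its stream handling:

```csharp
using System;

// Hypothetical stand-in for HttpClient so the sketch has no external dependencies.
public sealed class FakeClient : IDisposable
{
    public FakeClient(string name) => Name = name;
    public string Name { get; }
    public bool IsDisposed { get; private set; }
    public void Dispose() => IsDisposed = true;
}

// Hypothetical helper: remembers the latest value and notifies on replacement.
public sealed class LatestHolder<T>
{
    private readonly Action<T, T> _onChange;
    private T _last;
    private bool _hasValue;

    public LatestHolder(Action<T, T> onChange) => _onChange = onChange;

    public T Latest => _hasValue ? _last : throw new InvalidOperationException("No value yet.");

    public void Accept(T next)
    {
        // The callback only fires when there is a previous value to clean up.
        if (_hasValue)
            _onChange(_last, next);
        _last = next;
        _hasValue = true;
    }
}
```

Constructed as `new LatestHolder<FakeClient>((previous, next) => previous.Dispose())`, the first `Accept` call invokes nothing and the second disposes the first client. In the stage itself, the equivalent callback is the one invoked as `_swapPrevious(_last.Value, next)` in the snippet above.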
My concern is that there may be some implicit buffering issues here.
Is there a risk that either:
- There are 'extra' instances of the item getting disposed in a buffer in-between stages
- This stage is processing input while the downstream stage is trying to use the last value?
My knowledge of akka streams stage runs is not the best, but I know there are cases where internal buffers can lead to surprising results.
In order to get this to pass I'm going to need to upgrade the build system to use .NET 7 - otherwise the extra .DLL metadata is going to cause conflicts with the build system.
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) =>
    new Logic(this, _swapPrevious);

private sealed class Logic : InAndOutGraphStageLogic
I will say that overall this reminds me of a version of `Expand` with a somewhat saner API. I'm not sure if that means we can de-dupe any logic between the two, however (`Expand` is... weird to me.)
I had no idea about `Expand` TBH - part of why I did what I did in the documentation with this stage is that I think our overview of built-in stages is pretty weak and not easy to grok.
/// when a new value is emitted.
/// </summary>
/// <typeparam name="T">The type of element handled by the <see cref="RepeatPrevious{T}"/></typeparam>
public delegate void SwapPrevious<T>(T previousValue, T newValue);
Not the biggest fan of custom delegates, as they can make function reuse more painful.
Also wonder whether `OnItemChange` would be a better name; `SwapPrevious` has mutation connotations.
That's a good idea - I'm not a fan of the custom delegate either but it was confusing as hell trying to make sense of which is which without naming the parameters.
{
    if (_last.HasValue)
    {
        if (!HasBeenPulled(_stage._in))
To your comments about repeat elements in the test, I wonder if guard code in pull would help. Expand does something like this:
    if (_iterator.HasNext())
    {
        if (!_expanded)
        {
            _expanded = true;
            if (IsClosed(_stage.In))
            {
                Push(_stage.Out, _iterator.Next());
                CompleteStage();
            }
            else
            {
                // expand needs to pull first to be "fair" when upstream is not actually slow
                Pull(_stage.In);
                Push(_stage.Out, _iterator.Next());
            }
        }
        else
            Push(_stage.Out, _iterator.Next());
    }
And I wonder if that `IsClosed` guard would help?
I'll take a look at this! I tried coming up with something similar to this but I just kept moving the problem somewhere else in the test suite. Let me give this a look though.
Yeah I tried this again and it's not going to work - the issue is that in order to detect that the upstream is closed it has to be pulled a second time and during that timeframe we're going to emit an extra item.
The `Expand` specs don't really check for the case where the upstream terminates - they're mostly just testing that expansion doesn't happen so long as downstream can keep up with demand.
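One way to picture the ordering described above is a simplified, single-threaded model. This is not the actual `GraphStage` logic, and `ReuseLatestModel` and `Run` are hypothetical names. It assumes each downstream pull first re-emits the cached value and only then finds out whether upstream has a newer element or has finished:

```csharp
using System;
using System.Collections.Generic;

// Simplified model of the pull/push ordering; NOT the Akka.Streams implementation.
public static class ReuseLatestModel
{
    // Simulates up to `demand` downstream pulls against a finite upstream
    // and returns what downstream would receive in this model.
    public static List<T> Run<T>(IEnumerable<T> upstream, int demand)
    {
        var output = new List<T>();
        using var source = upstream.GetEnumerator();
        var hasLast = false;
        T last = default;
        var completed = false;

        for (var pull = 0; pull < demand && !completed; pull++)
        {
            // A value is already cached: emit it right away.
            if (hasLast)
                output.Add(last);

            // Ask upstream for more. Only this request reveals whether
            // upstream has a newer element or has already finished.
            if (source.MoveNext())
            {
                var next = source.Current;
                if (!hasLast)
                    output.Add(next); // nothing cached yet, so the first element goes straight out
                last = next;
                hasLast = true;
            }
            else
            {
                completed = true; // completion is observed only after the extra emit
            }
        }

        return output;
    }
}
```

In this model `ReuseLatestModel.Run(new[] { 1 }, 10)` yields `1, 1`, the same shape as the spec's expectation, and an empty upstream yields nothing.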
_onItemChanged(_last.Value, next);
_last = next;

if (IsAvailable(_stage._out))
Since we're not buffering, if the `RepeatPrevious` stage is actually faster than its companion(s), would that mean that we'll lose events/data?
We might want to document this, just in case.
We won't lose any events / data - we always emit the current element as it's being discovered and we'll repeatedly emit it downstream when a newer value hasn't been pushed.
I think the stage name is a bit misleading since we're not buffering anything. Maybe `RepeatLatest` or `RetainLatest` would be better?
The name is
It's just a bit ambiguous; it's not clear what "previous" is.
"Latest" makes it a bit clearer that the last received event got zipped
The user would receive (
LGTM, just need to remove the benchmark artifact files.
Done, sorry about that :(
This needs a port to v1.5 also, along with several other PRs that have been made to the v1.4 branch.
…value until a newer one is pushed (akkadotnet#6262)
* code cleanup in Akka.Streams `Attributes`
* added `RepeatPrevious{T}` stage
* WIP - debugging `RepeatPreviousSpecs`
* fixed tests and added documentation
* fixed documentation
* API approvals
* fixed markdown linting
* removed `SwapPrevious<T>` delegate.
* renamed stage from `RepeatPrevious` to `ReuseLatest`
* remove BDN results
* cleaned up duplicate System.Collections.Immutable package reference (#6264)
  also standardized all System.* packages on a common version
* converted build system to .NET 7.0 (#6263)
* converted build system to .NET 7.0
* upgrade to Incrementalist.Cmd v0.8.0
* upgraded MNTR to support .NET 7.0
* fixed build system to target .NET 7.0
* upgrade to latest version of DocFx
* add .NET 6 SDK back to build system
* fixed HyperionConfigTests
* Akka.Streams: `ReuseLatest` stage to repeatedly emit the most recent value until a newer one is pushed (#6262)
* code cleanup in Akka.Streams `Attributes`
* added `RepeatPrevious{T}` stage
* WIP - debugging `RepeatPreviousSpecs`
* fixed tests and added documentation
* fixed documentation
* API approvals
* fixed markdown linting
* removed `SwapPrevious<T>` delegate.
* renamed stage from `RepeatPrevious` to `ReuseLatest`
* remove BDN results
* added real UID to `ActorPathBenchmarks` (#6276)
  While working on #6195 I realized that none of those `ActorPath`s actually have a set UID, thus we're missing that entire facet from both the parsing and serialization benchmarks.
* Enable dynamic PGO for RemotePingPong and PingPong (#6277)
* eliminate `ActorPath.ToSerializationFormat` UID allocations (#6195)
* eliminate `ActorPath.ToSerializationFormat` UID allocations
  Used some more `Span<char>` magic to avoid additional allocations when string-ifying `ActorPath` components.
* adding `SpanHacks` benchmarks
* sped up `Int64SizeInCharacters`
* added `TryFormat` benchmarks
* fixed n+1 error in jump table
* cleaned up `TryFormat` inside `SpanHacks`
* fixed `SpanHacks` index calculation
* removed BDN results
* Update SpanHacks.cs
* compilation fixes and V1.5 api approval
/// </remarks>
public static Flow<TIn, TOut, TMat> RepeatPrevious<TIn, TOut, TMat>(this Flow<TIn, TOut, TMat> flow, Action<TOut, TOut> onItemChanged)
{
    return flow.Via(new ReuseLatest<TOut>(onItemChanged));
Why is this not named `RepeatLatest`, just like the stage?
I got feedback on name changes and didn't propagate it everywhere I guess, although the stage implementation is really meant to be internal.
public override string ToString()
{
    return "RepeatPrevious";
`ReuseLatest`?
Yes, also needs to be renamed - this got missed during the review
@Aaronontheweb Hi, as the reuseLatest can be implemented with
`ReuseLatest` is meant to be its own thing - `Expand` is funky and allocates lists each time
Changes
Adds a new `RepeatPrevious` `Flow<TOut>` stage to Akka.Streams, designed for use with fan-in stages where one producer might only emit output intermittently - such as https://github.com/Aaronontheweb/AkkaStreams.Demo.HttpClient (with the `HttpClient` production stream.)
Checklist
For significant changes, please ensure that the following have been completed (delete if not relevant):