From c7e6102fc85fdbfe62b20dec6b486772fe96d107 Mon Sep 17 00:00:00 2001 From: Aaron Stannard Date: Fri, 31 May 2024 14:11:00 -0500 Subject: [PATCH] added docs for `Valve` Akka.Streams stage close #7217 --- docs/articles/streams/builtinstages.md | 11 +++++++++++ src/core/Akka.Streams.Tests/Dsl/ValveSpec.cs | 14 +++++++------- src/core/Akka.Streams/Dsl/Valve.cs | 2 +- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/docs/articles/streams/builtinstages.md b/docs/articles/streams/builtinstages.md index d8fbf4e5e41..19463c431ed 100644 --- a/docs/articles/streams/builtinstages.md +++ b/docs/articles/streams/builtinstages.md @@ -787,6 +787,17 @@ a function has to be provided to calculate the individual cost of each element. **completes** when upstream completes +### Valve + +Materializes into a Task with an [`IValveSwitch`](xref:Akka.Streams.Dsl.IValveSwitch) which provides a method that will pause or resume elements being emitted from the stream. + +As long as the valve is closed it will backpressure. + +> [!NOTE] +> Closing the valve could result in one element being buffered inside the stage, and if the stream completes or fails while being closed, that element may be lost. + +[!code-csharp[Valve](../../../src/core/Akka.Streams.Tests/Dsl/ValveSpec.cs?name=OpenValve)] + ### DivertTo Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element. diff --git a/src/core/Akka.Streams.Tests/Dsl/ValveSpec.cs b/src/core/Akka.Streams.Tests/Dsl/ValveSpec.cs index f77eefb9438..66f1b7b3e1c 100644 --- a/src/core/Akka.Streams.Tests/Dsl/ValveSpec.cs +++ b/src/core/Akka.Streams.Tests/Dsl/ValveSpec.cs @@ -42,21 +42,21 @@ public async Task Closed_Valve_should_emit_only_3_elements_into_a_sequence_when_ [Fact] public async Task Closed_Valve_should_emit_only_5_elements_when_the_valve_is_switched_to_open() { - var t = Source.From(Enumerable.Range(1, 5)) + // + var (switchTask, probe) = Source.From(Enumerable.Range(1, 5)) .ViaMaterialized(new Valve(SwitchMode.Close), Keep.Right) .ToMaterialized(this.SinkProbe(), Keep.Both) .Run(Sys.Materializer()); - - var switchTask = t.Item1; - var probe = t.Item2; - - var valveSwitch = await switchTask.ShouldCompleteWithin(3.Seconds()); + + IValveSwitch valveSwitch = await switchTask.ShouldCompleteWithin(3.Seconds()); probe.Request(2); probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100)); - var flip = valveSwitch.Flip(SwitchMode.Open); + Task flip = valveSwitch.Flip(SwitchMode.Open); var complete = await flip.ShouldCompleteWithin(3.Seconds()); + // valve is now open complete.Should().BeTrue(); + // probe.ExpectNext(1, 2); diff --git a/src/core/Akka.Streams/Dsl/Valve.cs b/src/core/Akka.Streams/Dsl/Valve.cs index c98dccdb41d..eeac3c185b2 100644 --- a/src/core/Akka.Streams/Dsl/Valve.cs +++ b/src/core/Akka.Streams/Dsl/Valve.cs @@ -66,7 +66,7 @@ public Task GetMode() } /// - /// Materializes into a task of which provides a the method flip that stops or restarts the flow of elements passing through the stage. + /// Materializes into a task of which provides a method that will stop or restart the flow of elements passing through the stage. /// As long as the valve is closed it will backpressure. /// Note that closing the valve could result in one element being buffered inside the stage, and if the stream completes or fails while being closed, that element may be lost. ///