Skip to content

Commit

Permalink
added docs for Valve Akka.Streams stage (#7219)
Browse files Browse the repository at this point in the history
close #7217
  • Loading branch information
Aaronontheweb authored May 31, 2024
1 parent f9666a1 commit 1af99d3
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 8 deletions.
11 changes: 11 additions & 0 deletions docs/articles/streams/builtinstages.md
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,17 @@ a function has to be provided to calculate the individual cost of each element.

**completes** when upstream completes

### Valve

Materializes into a Task with an [`IValveSwitch`](xref:Akka.Streams.Dsl.IValveSwitch) which provides a method that will pause or resume elements being emitted from the stream.

As long as the valve is closed it will backpressure.

> [!NOTE]
> Closing the valve could result in one element being buffered inside the stage, and if the stream completes or fails while being closed, that element may be lost.
[!code-csharp[Valve](../../../src/core/Akka.Streams.Tests/Dsl/ValveSpec.cs?name=OpenValve)]

### DivertTo

Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element.
Expand Down
14 changes: 7 additions & 7 deletions src/core/Akka.Streams.Tests/Dsl/ValveSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ public async Task Closed_Valve_should_emit_only_3_elements_into_a_sequence_when_
[Fact]
public async Task Closed_Valve_should_emit_only_5_elements_when_the_valve_is_switched_to_open()
{
var t = Source.From(Enumerable.Range(1, 5))
// <OpenValve>
var (switchTask, probe) = Source.From(Enumerable.Range(1, 5))
.ViaMaterialized(new Valve<int>(SwitchMode.Close), Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Sys.Materializer());

var switchTask = t.Item1;
var probe = t.Item2;

var valveSwitch = await switchTask.ShouldCompleteWithin(3.Seconds());

IValveSwitch valveSwitch = await switchTask.ShouldCompleteWithin(3.Seconds());
probe.Request(2);
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));

var flip = valveSwitch.Flip(SwitchMode.Open);
Task<bool> flip = valveSwitch.Flip(SwitchMode.Open);
var complete = await flip.ShouldCompleteWithin(3.Seconds());
// valve is now open
complete.Should().BeTrue();
// </OpenValve>

probe.ExpectNext(1, 2);

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams/Dsl/Valve.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public Task<SwitchMode> GetMode()
}

/// <summary>
/// Materializes into a task of <see cref="IValveSwitch"/> which provides a the method flip that stops or restarts the flow of elements passing through the stage.
/// Materializes into a task of <see cref="IValveSwitch"/> which provides a method that will stop or restart the flow of elements passing through the stage.
/// As long as the valve is closed it will backpressure.
/// Note that closing the valve could result in one element being buffered inside the stage, and if the stream completes or fails while being closed, that element may be lost.
/// </summary>
Expand Down

0 comments on commit 1af99d3

Please sign in to comment.