Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added docs for Valve Akka.Streams stage #7219

Merged
merged 2 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/articles/streams/builtinstages.md
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,17 @@ a function has to be provided to calculate the individual cost of each element.

**completes** when upstream completes

### Valve

Materializes into a Task with an [`IValveSwitch`](xref:Akka.Streams.Dsl.IValveSwitch) which provides a method that will pause or resume elements being emitted from the stream.

As long as the valve is closed it will backpressure.

> [!NOTE]
> Closing the valve could result in one element being buffered inside the stage, and if the stream completes or fails while being closed, that element may be lost.

[!code-csharp[Valve](../../../src/core/Akka.Streams.Tests/Dsl/ValveSpec.cs?name=OpenValve)]

### DivertTo

Each upstream element will either be diverted to the given sink, or the downstream consumer according to the predicate function applied to the element.
Expand Down
14 changes: 7 additions & 7 deletions src/core/Akka.Streams.Tests/Dsl/ValveSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,21 @@ public async Task Closed_Valve_should_emit_only_3_elements_into_a_sequence_when_
[Fact]
public async Task Closed_Valve_should_emit_only_5_elements_when_the_valve_is_switched_to_open()
{
var t = Source.From(Enumerable.Range(1, 5))
// <OpenValve>
var (switchTask, probe) = Source.From(Enumerable.Range(1, 5))
.ViaMaterialized(new Valve<int>(SwitchMode.Close), Keep.Right)
.ToMaterialized(this.SinkProbe<int>(), Keep.Both)
.Run(Sys.Materializer());

var switchTask = t.Item1;
var probe = t.Item2;

var valveSwitch = await switchTask.ShouldCompleteWithin(3.Seconds());

IValveSwitch valveSwitch = await switchTask.ShouldCompleteWithin(3.Seconds());
probe.Request(2);
probe.ExpectNoMsg(TimeSpan.FromMilliseconds(100));

var flip = valveSwitch.Flip(SwitchMode.Open);
Task<bool> flip = valveSwitch.Flip(SwitchMode.Open);
var complete = await flip.ShouldCompleteWithin(3.Seconds());
// valve is now open
complete.Should().BeTrue();
// </OpenValve>

probe.ExpectNext(1, 2);

Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka.Streams/Dsl/Valve.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public Task<SwitchMode> GetMode()
}

/// <summary>
/// Materializes into a task of <see cref="IValveSwitch"/> which provides a the method flip that stops or restarts the flow of elements passing through the stage.
/// Materializes into a task of <see cref="IValveSwitch"/> which provides a method that will stop or restart the flow of elements passing through the stage.
/// As long as the valve is closed it will backpressure.
/// Note that closing the valve could result in one element being buffered inside the stage, and if the stream completes or fails while being closed, that element may be lost.
/// </summary>
Expand Down