-
Notifications
You must be signed in to change notification settings - Fork 1k
/
LastElementSpec.cs
72 lines (60 loc) · 2.69 KB
/
LastElementSpec.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
//-----------------------------------------------------------------------
// <copyright file="LastElementSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2022 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
//-----------------------------------------------------------------------
using System;
using System.Linq;
using Akka.Streams.Dsl;
using Akka.Streams.TestKit;
using Akka.Streams.Util;
using Akka.Util;
using FluentAssertions;
using Xunit;
namespace Akka.Streams.Tests.Dsl
{
/// <summary>
/// Specs for the value materialized by a <c>LastElement&lt;int&gt;</c> stage when it is
/// placed between a source and a sink.
/// </summary>
public class LastElementSpec : Akka.TestKit.Xunit2.TestKit
{
    // Upper bound used by every test when waiting for a materialized value.
    private static readonly TimeSpan OneSecond = TimeSpan.FromSeconds(1);

    /// <summary>
    /// Streams 1, 2, 3 through the stage into a sink probe, checks the probe sees all
    /// three elements followed by completion, then expects the stage's materialized
    /// value to hold 3.
    /// </summary>
    [Fact]
    public void A_stream_via_LastElement_should_materialize_to_the_last_element_emitted_by_a_finite_nonempty_successful_source()
    {
        var (lastSeen, downstream) = Source.From(Enumerable.Range(1, 3))
            .ViaMaterialized(new LastElement<int>(), Keep.Right)
            .ToMaterialized(this.SinkProbe<int>(), Keep.Both)
            .Run(Sys.Materializer());

        downstream.Request(3).ExpectNext(1, 2, 3).ExpectComplete();

        var expected = Option<int>.Create(3);
        var actual = lastSeen.AwaitResult(OneSecond);
        actual.Should().Be(expected);
    }

    /// <summary>
    /// Runs an empty source through the stage; the probe must only observe completion
    /// and the stage's materialized value must be <c>Option&lt;int&gt;.None</c>.
    /// </summary>
    [Fact]
    public void A_stream_via_LastElement_should_materialize_to_materialize_to_None_for_an_empty_successful_source()
    {
        var (lastSeen, downstream) = Source.From(Array.Empty<int>())
            .ViaMaterialized(new LastElement<int>(), Keep.Right)
            .ToMaterialized(this.SinkProbe<int>(), Keep.Both)
            .Run(Sys.Materializer());

        downstream.Request(3).ExpectComplete();

        var actual = lastSeen.AwaitResult(OneSecond);
        actual.Should().Be(Option<int>.None);
    }

    /// <summary>
    /// Uses an unfold whose function throws once its state reaches 3, and an aggregating
    /// sink that remembers the most recent element it received. Both materialized values
    /// must become available within one second and must agree with each other.
    /// </summary>
    [Fact]
    public void A_stream_via_LastElement_should_materialize_to_the_last_element_emitted_by_a_source_before_it_failed()
    {
        // The sink keeps only the latest element, wrapped in an Option.
        var rememberLatest = Sink.Aggregate<int, Option<int>>(
            Option<int>.None,
            (_, element) => Option<int>.Create(element));

        var (fromStage, fromSink) = Source
            .UnfoldInfinite(1, state =>
            {
                if (state >= 3)
                {
                    throw new Exception();
                }

                var next = state + 1;
                return (next, next);
            })
            .ViaMaterialized(new LastElement<int>(), Keep.Right)
            .ToMaterialized(rememberLatest, Keep.Both)
            .Run(Sys.Materializer());

        // Wait on the stage's value first, then the sink's, before comparing results.
        fromStage.Wait(OneSecond).Should().BeTrue();
        fromSink.Wait(OneSecond).Should().BeTrue();
        fromStage.Result.Should().Be(fromSink.Result);
    }
}
}