/
KeepAliveConcatSpec.scala
106 lines (76 loc) · 3.14 KB
/
KeepAliveConcatSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.contrib
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.{TestPublisher, TestSubscriber}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings}
import scala.concurrent.Await
import scala.concurrent.duration._
class KeepAliveConcatSpec extends BaseStreamSpec {
override protected def autoFusing = false
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val materializer = ActorMaterializer(settings)
val sampleSource = Source((1 to 10).grouped(3).toVector)
val expand = (lst: IndexedSeq[Int]) ⇒ lst.toList.map(Vector(_))
"keepAliveConcat" must {
"not emit additional elements if upstream is fast enough" in {
Await
.result(sampleSource
.via(KeepAliveConcat(5, 1.second, expand))
.grouped(1000)
.runWith(Sink.head),
3.seconds)
.flatten should ===(1 to 10)
}
"emit elements periodically after silent periods" in {
val sourceWithIdleGap = Source((1 to 5).grouped(3).toList) ++
Source((6 to 10).grouped(3).toList).initialDelay(2.second)
Await
.result(sourceWithIdleGap
.via(KeepAliveConcat(5, 0.6.seconds, expand))
.grouped(1000)
.runWith(Sink.head),
3.seconds)
.flatten should ===(1 to 10)
}
"immediately pull upstream" in {
val upstream = TestPublisher.probe[Vector[Int]]()
val downstream = TestSubscriber.probe[Vector[Int]]()
Source.fromPublisher(upstream).via(KeepAliveConcat(2, 1.second, expand)).runWith(Sink.fromSubscriber(downstream))
downstream.request(1)
upstream.sendNext(Vector(1))
downstream.expectNext(Vector(1))
upstream.sendComplete()
downstream.expectComplete()
}
"immediately pull upstream after busy period" in {
val upstream = TestPublisher.probe[IndexedSeq[Int]]()
val downstream = TestSubscriber.probe[IndexedSeq[Int]]()
(sampleSource ++ Source.fromPublisher(upstream))
.via(KeepAliveConcat(2, 1.second, expand))
.runWith(Sink.fromSubscriber(downstream))
downstream.request(10)
downstream.expectNextN((1 to 3).grouped(1).toVector ++ (4 to 10).grouped(3).toVector)
downstream.request(1)
upstream.sendNext(Vector(1))
downstream.expectNext(Vector(1))
upstream.sendComplete()
downstream.expectComplete()
}
"work if timer fires before initial request after busy period" in {
val upstream = TestPublisher.probe[IndexedSeq[Int]]()
val downstream = TestSubscriber.probe[IndexedSeq[Int]]()
(sampleSource ++ Source.fromPublisher(upstream))
.via(KeepAliveConcat(2, 1.second, expand))
.runWith(Sink.fromSubscriber(downstream))
downstream.request(10)
downstream.expectNextN((1 to 3).grouped(1).toVector ++ (4 to 10).grouped(3).toVector)
downstream.expectNoMsg(1.5.second)
downstream.request(1)
upstream.sendComplete()
downstream.expectComplete()
}
}
}