/
Flow.scala
executable file
·2638 lines (2471 loc) · 115 KB
/
Flow.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/**
* Copyright (C) 2014-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.event.LoggingAdapter
import akka.stream._
import akka.Done
import akka.stream.impl._
import akka.stream.impl.fusing._
import akka.stream.stage._
import akka.util.ConstantFun
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.language.higherKinds
import akka.stream.impl.fusing.FlattenMerge
import akka.NotUsed
import akka.annotation.DoNotInherit
/**
* A `Flow` is a set of stream processing steps that has one open input and one open output.
*/
final class Flow[-In, +Out, +Mat](
override val traversalBuilder: LinearTraversalBuilder,
override val shape: FlowShape[In, Out])
extends FlowOpsMat[Out, Mat] with Graph[FlowShape[In, Out], Mat] {
// TODO: debug string
override def toString: String = s"Flow($shape)"
override type Repr[+O] = Flow[In @uncheckedVariance, O, Mat @uncheckedVariance]
override type ReprMat[+O, +M] = Flow[In @uncheckedVariance, O, M]
override type Closed = Sink[In @uncheckedVariance, Mat @uncheckedVariance]
override type ClosedMat[+M] = Sink[In @uncheckedVariance, M]
private[stream] def isIdentity: Boolean = this.traversalBuilder eq Flow.identityTraversalBuilder
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left)
override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Flow[In, T, Mat3] = {
if (this.isIdentity) {
new Flow(
LinearTraversalBuilder.fromBuilder(flow.traversalBuilder, flow.shape, combine),
flow.shape).asInstanceOf[Flow[In, T, Mat3]]
} else {
new Flow(
traversalBuilder.append(flow.traversalBuilder, flow.shape, combine),
FlowShape[In, T](shape.in, flow.shape.out))
}
}
/**
* Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both.
* {{{
* +------------------------------+
* | Resulting Sink[In, Mat] |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | flow | ~~Out~~> | sink | |
* | | Mat| | M| |
* | +------+ +------+ |
* +------------------------------+
* }}}
* The materialized value of the combined [[Sink]] will be the materialized
* value of the current flow (ignoring the given Sink’s value), use
* [[Flow#toMat[Mat2* toMat]] if a different strategy is needed.
*
* See also [[toMat]] when access to materialized values of the parameter is needed.
*/
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): Sink[In, Mat] = toMat(sink)(Keep.left)
/**
* Connect this [[Flow]] to a [[Sink]], concatenating the processing steps of both.
* {{{
* +----------------------------+
* | Resulting Sink[In, M2] |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | flow | ~Out~> | sink | |
* | | Mat| | M| |
* | +------+ +------+ |
* +----------------------------+
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* Sink into the materialized value of the resulting Sink.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Sink[In, Mat3] = {
if (isIdentity) {
new Sink(
LinearTraversalBuilder.fromBuilder(sink.traversalBuilder, sink.shape, combine),
SinkShape(sink.shape.in)).asInstanceOf[Sink[In, Mat3]]
} else {
new Sink(
traversalBuilder.append(sink.traversalBuilder, sink.shape, combine),
SinkShape(shape.in))
}
}
/**
* Transform the materialized value of this Flow, leaving all other properties as they were.
*/
override def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): ReprMat[Out, Mat2] =
new Flow(
traversalBuilder.transformMat(f),
shape)
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]].
* {{{
* +------+ +-------+
* | | ~Out~> | |
* | this | | other |
* | | <~In~ | |
* +------+ +-------+
* }}}
* The materialized value of the combined [[Flow]] will be the materialized
* value of the current flow (ignoring the other Flow’s value), use
* [[Flow#joinMat[Mat2* joinMat]] if a different strategy is needed.
*/
def join[Mat2](flow: Graph[FlowShape[Out, In], Mat2]): RunnableGraph[Mat] = joinMat(flow)(Keep.left)
/**
* Join this [[Flow]] to another [[Flow]], by cross connecting the inputs and outputs, creating a [[RunnableGraph]]
* {{{
* +------+ +-------+
* | | ~Out~> | |
* | this | | other |
* | | <~In~ | |
* +------+ +-------+
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* Flow into the materialized value of the resulting Flow.
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def joinMat[Mat2, Mat3](flow: Graph[FlowShape[Out, In], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = {
val resultBuilder = traversalBuilder
.append(flow.traversalBuilder, flow.shape, combine)
.wire(flow.shape.out, shape.in)
RunnableGraph(resultBuilder)
}
/**
* Join this [[Flow]] to a [[BidiFlow]] to close off the “top” of the protocol stack:
* {{{
* +---------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | ~Out~> | | ~~> O2
* | | flow | | bidi | |
* | | | <~In~ | | <~~ I2
* | +------+ +------+ |
* +---------------------------+
* }}}
* The materialized value of the combined [[Flow]] will be the materialized
* value of the current flow (ignoring the [[BidiFlow]]’s value), use
* [[Flow#joinMat[I2* joinMat]] if a different strategy is needed.
*/
def join[I2, O2, Mat2](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2]): Flow[I2, O2, Mat] = joinMat(bidi)(Keep.left)
/**
* Join this [[Flow]] to a [[BidiFlow]] to close off the “top” of the protocol stack:
* {{{
* +---------------------------+
* | Resulting Flow |
* | |
* | +------+ +------+ |
* | | | ~Out~> | | ~~> O2
* | | flow | | bidi | |
* | | | <~In~ | | <~~ I2
* | +------+ +------+ |
* +---------------------------+
* }}}
* The `combine` function is used to compose the materialized values of this flow and that
* [[BidiFlow]] into the materialized value of the resulting [[Flow]].
*
* It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners
* where appropriate instead of manually writing functions that pass through one of the values.
*/
def joinMat[I2, O2, Mat2, M](bidi: Graph[BidiShape[Out, O2, I2, In], Mat2])(combine: (Mat, Mat2) ⇒ M): Flow[I2, O2, M] = {
val newBidiShape = bidi.shape.deepCopy()
val newFlowShape = shape.deepCopy()
val resultBuilder =
TraversalBuilder.empty()
.add(traversalBuilder, newFlowShape, Keep.right)
.add(bidi.traversalBuilder, newBidiShape, combine)
.wire(newFlowShape.out, newBidiShape.in1)
.wire(newBidiShape.out2, newFlowShape.in)
val newShape = FlowShape(newBidiShape.in2, newBidiShape.out1)
new Flow(
LinearTraversalBuilder.fromBuilder(resultBuilder, newShape, Keep.right),
newShape)
}
/**
* Replace the attributes of this [[Flow]] with the given ones. If this Flow is a composite
* of multiple graphs, new attributes on the composite will be less specific than attributes
* set directly on the individual graphs of the composite.
*
* Note that this operation has no effect on an empty Flow (because the attributes apply
* only to the contained processing stages).
*/
override def withAttributes(attr: Attributes): Repr[Out] =
new Flow(
traversalBuilder.setAttributes(attr),
shape)
/**
* Add the given attributes to this [[Flow]]. If the specific attribute was already present
* on this graph this means the added attribute will be more specific than the existing one.
* If this Flow is a composite of multiple graphs, new attributes on the composite will be
* less specific than attributes set directly on the individual graphs of the composite.
*/
override def addAttributes(attr: Attributes): Repr[Out] = withAttributes(traversalBuilder.attributes and attr)
/**
* Add a ``name`` attribute to this Flow.
*/
override def named(name: String): Repr[Out] = addAttributes(Attributes.name(name))
/**
* Put an asynchronous boundary around this `Flow`
*/
override def async: Repr[Out] = super.async.asInstanceOf[Repr[Out]]
/**
* Put an asynchronous boundary around this `Flow`
*
* @param dispatcher Run the graph on this dispatcher
*/
override def async(dispatcher: String): Repr[Out] =
super.async(dispatcher).asInstanceOf[Repr[Out]]
/**
* Put an asynchronous boundary around this `Flow`
*
* @param dispatcher Run the graph on this dispatcher
* @param inputBufferSize Set the input buffer to this size for the graph
*/
override def async(dispatcher: String, inputBufferSize: Int): Repr[Out] =
super.async(dispatcher, inputBufferSize).asInstanceOf[Repr[Out]]
/**
* Connect the `Source` to this `Flow` and then connect it to the `Sink` and run it. The returned tuple contains
* the materialized values of the `Source` and `Sink`, e.g. the `Subscriber` of a of a [[Source#subscriber]] and
* and `Publisher` of a [[Sink#publisher]].
*/
def runWith[Mat1, Mat2](source: Graph[SourceShape[In], Mat1], sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): (Mat1, Mat2) =
Source.fromGraph(source).via(this).toMat(sink)(Keep.both).run()
/**
* Converts this Flow to a [[RunnableGraph]] that materializes to a Reactive Streams [[org.reactivestreams.Processor]]
* which implements the operations encapsulated by this Flow. Every materialization results in a new Processor
* instance, i.e. the returned [[RunnableGraph]] is reusable.
*
* @return A [[RunnableGraph]] that materializes to a Processor when run() is called on it.
*/
def toProcessor: RunnableGraph[Processor[In @uncheckedVariance, Out @uncheckedVariance]] =
Source.asSubscriber[In].via(this).toMat(Sink.asPublisher[Out](false))(Keep.both[Subscriber[In], Publisher[Out]])
.mapMaterializedValue {
case (sub, pub) ⇒ new Processor[In, Out] {
override def onError(t: Throwable): Unit = sub.onError(t)
override def onSubscribe(s: Subscription): Unit = sub.onSubscribe(s)
override def onComplete(): Unit = sub.onComplete()
override def onNext(t: In): Unit = sub.onNext(t)
override def subscribe(s: Subscriber[_ >: Out]): Unit = pub.subscribe(s)
}
}
/** Converts this Scala DSL element to it's Java DSL counterpart. */
def asJava: javadsl.Flow[In, Out, Mat] = new javadsl.Flow(this)
}
object Flow {
private[stream] val identityTraversalBuilder =
LinearTraversalBuilder.fromBuilder(GraphStages.identity.traversalBuilder, GraphStages.identity.shape, Keep.right)
private[this] val identity: Flow[Any, Any, NotUsed] = new Flow[Any, Any, NotUsed](
identityTraversalBuilder,
GraphStages.identity.shape)
/**
* Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]]
*/
def fromProcessor[I, O](processorFactory: () ⇒ Processor[I, O]): Flow[I, O, NotUsed] = {
fromProcessorMat(() ⇒ (processorFactory(), NotUsed))
}
/**
* Creates a Flow from a Reactive Streams [[org.reactivestreams.Processor]] and returns a materialized value.
*/
def fromProcessorMat[I, O, M](processorFactory: () ⇒ (Processor[I, O], M)): Flow[I, O, M] =
fromGraph(ProcessorModule(processorFactory))
/**
* Returns a `Flow` which outputs all its inputs.
*/
def apply[T]: Flow[T, T, NotUsed] = identity.asInstanceOf[Flow[T, T, NotUsed]]
/**
* Creates a [Flow] which will use the given function to transform its inputs to outputs. It is equivalent
* to `Flow[T].map(f)`
*/
def fromFunction[A, B](f: A ⇒ B): Flow[A, B, NotUsed] = apply[A].map(f)
/**
* A graph with the shape of a flow logically is a flow, this method makes
* it so also in type.
*/
def fromGraph[I, O, M](g: Graph[FlowShape[I, O], M]): Flow[I, O, M] =
g match {
case f: Flow[I, O, M] ⇒ f
case f: javadsl.Flow[I, O, M] ⇒ f.asScala
case g: GraphStageWithMaterializedValue[FlowShape[I, O], M] ⇒
// move these from the stage itself to make the returned source
// behave as it is the stage with regards to attributes
val attrs = g.traversalBuilder.attributes
val noAttrStage = g.withAttributes(Attributes.none)
new Flow(
LinearTraversalBuilder.fromBuilder(noAttrStage.traversalBuilder, noAttrStage.shape, Keep.right),
noAttrStage.shape
).withAttributes(attrs)
case other ⇒ new Flow(
LinearTraversalBuilder.fromBuilder(g.traversalBuilder, g.shape, Keep.right),
g.shape)
}
/**
* Creates a `Flow` from a `Sink` and a `Source` where the Flow's input
* will be sent to the Sink and the Flow's output will come from the Source.
*
* The resulting flow can be visualized as:
* {{{
* +----------------------------------------------+
* | Resulting Flow[I, O, NotUsed] |
* | |
* | +---------+ +-----------+ |
* | | | | | |
* I ~~> | Sink[I] | [no-connection!] | Source[O] | ~~> O
* | | | | | |
* | +---------+ +-----------+ |
* +----------------------------------------------+
* }}}
*
* The completion of the Sink and Source sides of a Flow constructed using
* this method are independent. So if the Sink receives a completion signal,
* the Source side will remain unaware of that. If you are looking to couple
* the termination signals of the two sides use `Flow.fromSinkAndSourceCoupled` instead.
*
* See also [[fromSinkAndSourceMat]] when access to materialized values of the parameters is needed.
*/
def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =
fromSinkAndSourceMat(sink, source)(Keep.none)
/**
* Creates a `Flow` from a `Sink` and a `Source` where the Flow's input
* will be sent to the Sink and the Flow's output will come from the Source.
*
* The resulting flow can be visualized as:
* {{{
* +-------------------------------------------------------+
* | Resulting Flow[I, O, M] |
* | |
* | +-------------+ +---------------+ |
* | | | | | |
* I ~~> | Sink[I, M1] | [no-connection!] | Source[O, M2] | ~~> O
* | | | | | |
* | +-------------+ +---------------+ |
* +------------------------------------------------------+
* }}}
*
* The completion of the Sink and Source sides of a Flow constructed using
* this method are independent. So if the Sink receives a completion signal,
* the Source side will remain unaware of that. If you are looking to couple
* the termination signals of the two sides use `Flow.fromSinkAndSourceCoupledMat` instead.
*
* The `combine` function is used to compose the materialized values of the `sink` and `source`
* into the materialized value of the resulting [[Flow]].
*/
def fromSinkAndSourceMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(combine: (M1, M2) ⇒ M): Flow[I, O, M] =
fromGraph(GraphDSL.create(sink, source)(combine) { implicit b ⇒ (in, out) ⇒ FlowShape(in.in, out.out) })
/**
* Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
* Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two stages.
*
* The resulting flow can be visualized as:
* {{{
* +---------------------------------------------+
* | Resulting Flow[I, O, NotUsed] |
* | |
* | +---------+ +-----------+ |
* | | | | | |
* I ~~> | Sink[I] | ~~~(coupled)~~~ | Source[O] | ~~> O
* | | | | | |
* | +---------+ +-----------+ |
* +---------------------------------------------+
* }}}
*
* E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled,
* however the Sink will also be completed. The table below illustrates the effects in detail:
*
* <table>
* <tr>
* <th>Returned Flow</th>
* <th>Sink (<code>in</code>)</th>
* <th>Source (<code>out</code>)</th>
* </tr>
* <tr>
* <td><i>cause:</i> upstream (sink-side) receives completion</td>
* <td><i>effect:</i> receives completion</td>
* <td><i>effect:</i> receives cancel</td>
* </tr>
* <tr>
* <td><i>cause:</i> upstream (sink-side) receives error</td>
* <td><i>effect:</i> receives error</td>
* <td><i>effect:</i> receives cancel</td>
* </tr>
* <tr>
* <td><i>cause:</i> downstream (source-side) receives cancel</td>
* <td><i>effect:</i> completes</td>
* <td><i>effect:</i> receives cancel</td>
* </tr>
* <tr>
* <td><i>effect:</i> cancels upstream, completes downstream</td>
* <td><i>effect:</i> completes</td>
* <td><i>cause:</i> signals complete</td>
* </tr>
* <tr>
* <td><i>effect:</i> cancels upstream, errors downstream</td>
* <td><i>effect:</i> receives error</td>
* <td><i>cause:</i> signals error or throws</td>
* </tr>
* <tr>
* <td><i>effect:</i> cancels upstream, completes downstream</td>
* <td><i>cause:</i> cancels</td>
* <td><i>effect:</i> receives cancel</td>
* </tr>
* </table>
*
* See also [[fromSinkAndSourceCoupledMat]] when access to materialized values of the parameters is needed.
*/
def fromSinkAndSourceCoupled[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] =
fromSinkAndSourceCoupledMat(sink, source)(Keep.none)
/**
* Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow from them.
* Similar to [[Flow.fromSinkAndSource]] however couples the termination of these two stages.
*
* The resulting flow can be visualized as:
* {{{
* +-----------------------------------------------------+
* | Resulting Flow[I, O, M] |
* | |
* | +-------------+ +---------------+ |
* | | | | | |
* I ~~> | Sink[I, M1] | ~~~(coupled)~~~ | Source[O, M2] | ~~> O
* | | | | | |
* | +-------------+ +---------------+ |
* +-----------------------------------------------------+
* }}}
*
* E.g. if the emitted [[Flow]] gets a cancellation, the [[Source]] of course is cancelled,
* however the Sink will also be completed. The table on [[Flow.fromSinkAndSourceCoupled]]
* illustrates the effects in detail.
*
* The `combine` function is used to compose the materialized values of the `sink` and `source`
* into the materialized value of the resulting [[Flow]].
*/
def fromSinkAndSourceCoupledMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(combine: (M1, M2) ⇒ M): Flow[I, O, M] =
// format: OFF
Flow.fromGraph(GraphDSL.create(sink, source)(combine) { implicit b => (i, o) =>
import GraphDSL.Implicits._
val bidi = b.add(new CoupledTerminationBidi[I, O])
/* bidi.in1 ~> */ bidi.out1 ~> i; o ~> bidi.in2 /* ~> bidi.out2 */
FlowShape(bidi.in1, bidi.out2)
})
// format: ON
}
object RunnableGraph {
/**
* A graph with a closed shape is logically a runnable graph, this method makes
* it so also in type.
*/
def fromGraph[Mat](g: Graph[ClosedShape, Mat]): RunnableGraph[Mat] =
g match {
case r: RunnableGraph[Mat] ⇒ r
case other ⇒ RunnableGraph(other.traversalBuilder)
}
}
/**
* Flow with attached input and output, can be executed.
*/
final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] {
override def shape = ClosedShape
/**
* Transform only the materialized value of this RunnableGraph, leaving all other properties as they were.
*/
def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): RunnableGraph[Mat2] =
copy(traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]))
/**
* Run this flow and return the materialized instance from the flow.
*/
def run()(implicit materializer: Materializer): Mat = materializer.materialize(this)
override def addAttributes(attr: Attributes): RunnableGraph[Mat] =
withAttributes(traversalBuilder.attributes and attr)
override def withAttributes(attr: Attributes): RunnableGraph[Mat] =
new RunnableGraph(traversalBuilder.setAttributes(attr))
override def named(name: String): RunnableGraph[Mat] =
addAttributes(Attributes.name(name))
/**
* Note that an async boundary around a runnable graph does not make sense
*/
override def async: RunnableGraph[Mat] =
super.async.asInstanceOf[RunnableGraph[Mat]]
/**
* Note that an async boundary around a runnable graph does not make sense
*/
override def async(dispatcher: String): RunnableGraph[Mat] =
super.async(dispatcher).asInstanceOf[RunnableGraph[Mat]]
/**
* Note that an async boundary around a runnable graph does not make sense
*/
override def async(dispatcher: String, inputBufferSize: Int): RunnableGraph[Mat] =
super.async(dispatcher, inputBufferSize).asInstanceOf[RunnableGraph[Mat]]
}
/**
* Scala API: Operations offered by Sources and Flows with a free output side: the DSL flows left-to-right only.
*
* INTERNAL API: this trait will be changed in binary-incompatible ways for classes that are derived from it!
* Do not implement this interface outside the Akka code base!
*
* Binary compatibility is only maintained for callers of this trait’s interface.
*/
@DoNotInherit
trait FlowOps[+Out, +Mat] {
import akka.stream.impl.Stages._
import GraphDSL.Implicits._
type Repr[+O] <: FlowOps[O, Mat] {
type Repr[+OO] = FlowOps.this.Repr[OO]
type Closed = FlowOps.this.Closed
}
// result of closing a Source is RunnableGraph, closing a Flow is Sink
type Closed
/**
* Transform this [[Flow]] by appending the given processing steps.
* {{{
* +---------------------------------+
* | Resulting Flow[In, T, Mat] |
* | |
* | +------+ +------+ |
* | | | | | |
* In ~~> | this | ~~Out~~> | flow | ~~> T
* | | Mat| | M| |
* | +------+ +------+ |
* +---------------------------------+
* }}}
* The materialized value of the combined [[Flow]] will be the materialized
* value of the current flow (ignoring the other Flow’s value), use
* [[Flow#viaMat viaMat]] if a different strategy is needed.
*
* See also [[FlowOpsMat.viaMat]] when access to materialized values of the parameter is needed.
*/
def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T]
/**
* Recover allows to send last element on failure and gracefully complete the stream
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recover` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T] = via(Recover(pf))
/**
* RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new
* Source may be materialized.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWith` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
@deprecated("Use recoverWithRetries instead.", "2.4.4")
def recoverWith[T >: Out](pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] =
via(new RecoverWith(-1, pf))
/**
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
* a failure has been recovered up to `attempts` number of times so that each time there is a failure
* it is fed into the `pf` and a new Source may be materialized. Note that if you pass in 0, this won't
* attempt to recover at all.
*
* A negative `attempts` number is interpreted as "infinite", which results in the exact same behavior as `recoverWith`.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Throwing an exception inside `recoverWithRetries` _will_ be logged on ERROR level automatically.
*
* '''Emits when''' element is available from the upstream or upstream is failed and element is available
* from alternative Source
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
* @param attempts Maximum number of retries or -1 to retry indefinitely
* @param pf Receives the failure cause and returns the new Source to be materialized if any
*
*/
def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T] =
via(new RecoverWith(attempts, pf))
/**
* While similar to [[recover]] this stage can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
* would log the `t2` error.
*
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
* This stage can recover the failure signal, but not the skipped elements, which will be dropped.
*
* Similarily to [[recover]] throwing an exception inside `mapError` _will_ be logged.
*
* '''Emits when''' element is available from the upstream or upstream is failed and pf returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or upstream failed with exception pf can handle
*
* '''Cancels when''' downstream cancels
*
*/
def mapError(pf: PartialFunction[Throwable, Throwable]): Repr[Out] = via(MapError(pf))
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*
*/
def map[T](f: Out ⇒ T): Repr[T] = via(Map(f))
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream.
*
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements have been emitted
*
* '''Cancels when''' downstream cancels
*
*/
def mapConcat[T](f: Out ⇒ immutable.Iterable[T]): Repr[T] = statefulMapConcat(() ⇒ f)
/**
* Transform each input element into an `Iterable` of output elements that is
* then flattened into the output stream. The transformation is meant to be stateful,
* which is enabled by creating the transformation function anew for every materialization —
* the returned function will typically close over mutable objects to store state between
* invocations. For the stateless variant see [[FlowOps.mapConcat]].
*
* The returned `Iterable` MUST NOT contain `null` values,
* as they are illegal as stream elements - according to the Reactive Streams specification.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the mapping function returns an element or there are still remaining elements
* from the previously calculated collection
*
* '''Backpressures when''' downstream backpressures or there are still remaining elements from the
* previously calculated collection
*
* '''Completes when''' upstream completes and all remaining elements has been emitted
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.mapConcat]]
*/
def statefulMapConcat[T](f: () ⇒ Out ⇒ immutable.Iterable[T]): Repr[T] =
via(new StatefulMapConcat(f))
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step. The function returns a `Future` and the
* value of that future will be emitted downstream. The number of Futures
* that shall run in parallel is given as the first argument to ``mapAsync``.
* These Futures may complete in any order, but the elements that
* are emitted downstream are in the same order as received from upstream.
*
* If the function `f` throws an exception or if the `Future` is completed
* with failure and the supervision decision is [[akka.stream.Supervision.Stop]]
* the stream will be completed with failure.
*
* If the function `f` throws an exception or if the `Future` is completed
* with failure and the supervision decision is [[akka.stream.Supervision.Resume]] or
* [[akka.stream.Supervision.Restart]] the element is dropped and the stream continues.
*
* The function `f` is always invoked on the elements in the order they arrive.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the Future returned by the provided function finishes for the next element in sequence
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures or the first future is not completed
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T]): Repr[T] = via(MapAsync(parallelism, f))
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step. The function returns a `Future` and the
* value of that future will be emitted downstream. The number of Futures
* that shall run in parallel is given as the first argument to ``mapAsyncUnordered``.
* Each processed element will be emitted downstream as soon as it is ready, i.e. it is possible
* that the elements are not emitted downstream in the same order as received from upstream.
*
* If the function `f` throws an exception or if the `Future` is completed
* with failure and the supervision decision is [[akka.stream.Supervision.Stop]]
* the stream will be completed with failure.
*
* If the function `f` throws an exception or if the `Future` is completed
* with failure and the supervision decision is [[akka.stream.Supervision.Resume]] or
* [[akka.stream.Supervision.Restart]] the element is dropped and the stream continues.
*
* The function `f` is always invoked on the elements in the order they arrive (even though the result of the futures
* returned by `f` might be emitted in a different order).
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' any of the Futures returned by the provided function complete
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](parallelism: Int)(f: Out ⇒ Future[T]): Repr[T] = via(MapAsyncUnordered(parallelism, f))
/**
* Only pass on those elements that satisfy the given predicate.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the given predicate returns true for the element
*
* '''Backpressures when''' the given predicate returns true for the element and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def filter(p: Out ⇒ Boolean): Repr[Out] = via(Filter(p))
/**
* Only pass on those elements that NOT satisfy the given predicate.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the given predicate returns false for the element
*
* '''Backpressures when''' the given predicate returns false for the element and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def filterNot(p: Out ⇒ Boolean): Repr[Out] =
via(Flow[Out].filter(!p(_)).withAttributes(DefaultAttributes.filterNot))
/**
* Terminate processing (and cancel the upstream publisher) after predicate
* returns false for the first time,
* Due to input buffering some elements may have been requested from upstream publishers
* that will then not be processed downstream of this step.
*
* The stream will be completed without producing any elements if predicate is false for
* the first stream element.
*
* '''Emits when''' the predicate is true
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes
*
* '''Cancels when''' predicate returned false or downstream cancels
*
* See also [[FlowOps.limit]], [[FlowOps.limitWeighted]]
*/
def takeWhile(p: Out ⇒ Boolean): Repr[Out] = takeWhile(p, false)
/**
* Terminate processing (and cancel the upstream publisher) after predicate
* returns false for the first time, including the first failed element iff inclusive is true
* Due to input buffering some elements may have been requested from upstream publishers
* that will then not be processed downstream of this step.
*
* The stream will be completed without producing any elements if predicate is false for
* the first stream element.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the predicate is true
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' predicate returned false (or 1 after predicate returns false if `inclusive` or upstream completes
*
* '''Cancels when''' predicate returned false or downstream cancels
*
* See also [[FlowOps.limit]], [[FlowOps.limitWeighted]]
*/
def takeWhile(p: Out ⇒ Boolean, inclusive: Boolean): Repr[Out] = via(TakeWhile(p, inclusive))
/**
* Discard elements at the beginning of the stream while predicate is true.
* All elements will be taken after predicate returns false first time.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' predicate returned false and for all following stream elements
*
* '''Backpressures when''' predicate returned false and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def dropWhile(p: Out ⇒ Boolean): Repr[Out] = via(DropWhile(p))
/**
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step.
* Non-matching elements are filtered out.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the provided partial function is defined for the element
*
* '''Backpressures when''' the partial function is defined for the element and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def collect[T](pf: PartialFunction[Out, T]): Repr[T] = via(Collect(pf))
/**
* Chunk up this stream into groups of the given size, with the last group
* possibly smaller than requested due to end-of-stream.
*
* `n` must be positive, otherwise IllegalArgumentException is thrown.
*
* '''Emits when''' the specified number of elements have been accumulated or upstream completed
*
* '''Backpressures when''' a group has been assembled and downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def grouped(n: Int): Repr[immutable.Seq[Out]] = via(Grouped(n))
/**
* Ensure stream boundedness by limiting the number of elements from upstream.
* If the number of incoming elements exceeds max, it will signal
* upstream failure `StreamLimitException` downstream.
*
* Due to input buffering some elements may have been
* requested from upstream publishers that will then not be processed downstream
* of this step.
*
* '''Emits when''' upstream emits and the number of emitted elements has not reached max
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and the number of emitted elements has not reached max
*
* '''Errors when''' the total number of incoming element exceeds max
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.take]], [[FlowOps.takeWithin]], [[FlowOps.takeWhile]]
*/
def limit(max: Long): Repr[Out] = limitWeighted(max)(_ ⇒ 1)
/**
* Ensure stream boundedness by evaluating the cost of incoming elements
* using a cost function. Exactly how many elements will be allowed to travel downstream depends on the
* evaluated cost of each element. If the accumulated cost exceeds max, it will signal
* upstream failure `StreamLimitException` downstream.
*
* Due to input buffering some elements may have been
* requested from upstream publishers that will then not be processed downstream
* of this step.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' upstream emits and the accumulated cost has not reached max
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes and the number of emitted elements has not reached max
*
* '''Errors when''' when the accumulated cost exceeds max
*
* '''Cancels when''' downstream cancels
*
* See also [[FlowOps.take]], [[FlowOps.takeWithin]], [[FlowOps.takeWhile]]
*/
def limitWeighted[T](max: Long)(costFn: Out ⇒ Long): Repr[Out] = via(LimitWeighted(max, costFn))
/**
* Apply a sliding window over the stream and return the windows as groups of elements, with the last group
* possibly smaller than requested due to end-of-stream.
*
* `n` must be positive, otherwise IllegalArgumentException is thrown.
* `step` must be positive, otherwise IllegalArgumentException is thrown.
*
* '''Emits when''' enough elements have been collected within the window or upstream completed
*
* '''Backpressures when''' a window has been assembled and downstream backpressures