-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
Sink.scala
545 lines (484 loc) · 25.3 KB
/
Sink.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
/**
* Copyright (C) 2014-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.scaladsl
import akka.{ Done, NotUsed }
import akka.dispatch.ExecutionContexts
import akka.actor.{ ActorRef, Props, Status }
import akka.annotation.InternalApi
import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl._
import akka.stream.impl.fusing.GraphStages
import akka.stream.stage._
import akka.stream.{ javadsl, _ }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec
import scala.collection.generic.CanBuildFrom
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
/**
 * A `Sink` is a set of stream processing steps that has exactly one open input.
 * It can be used as a `Subscriber`.
 */
final class Sink[-In, +Mat](
  override val traversalBuilder: LinearTraversalBuilder,
  override val shape: SinkShape[In])
  extends Graph[SinkShape[In], Mat] {

  // TODO: Debug string
  override def toString: String = "Sink(" + shape + ")"

  /**
   * Adapts this Sink to another input type by applying `f` to every *incoming*
   * upstream element before handing it to this [[Sink]].
   *
   * '''Backpressures when''' original [[Sink]] backpressures
   *
   * '''Cancels when''' original [[Sink]] cancels
   */
  def contramap[In2](f: In2 ⇒ In): Sink[In2, Mat] = {
    val adapter = Flow.fromFunction(f)
    adapter.toMat(this)(Keep.right)
  }

  /**
   * Connects this `Sink` to the given `Source` and runs the resulting graph.
   * The value returned is the materialized value of the `Source`,
   * e.g. the `Subscriber` of a [[Source#subscriber]].
   */
  def runWith[Mat2](source: Graph[SourceShape[In], Mat2])(implicit materializer: Materializer): Mat2 = {
    val runnable = Source.fromGraph(source).to(this)
    runnable.run()
  }

  /**
   * Transforms only the materialized value of this Sink; every other property is left as it was.
   */
  def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): Sink[In, Mat2] = {
    // the traversal builder works on untyped functions
    val untyped = f.asInstanceOf[Any ⇒ Any]
    new Sink[In, Mat2](traversalBuilder.transformMat(untyped), shape)
  }

  /**
   * Materializes this Sink right away and returns (1) its materialized value together with
   * (2) a new Sink that can be used to consume elements 'into' the pre-materialized one.
   *
   * Useful when the materialized value of a Sink is needed before handing the Sink out to
   * someone else who will materialize it.
   */
  def preMaterialize()(implicit materializer: Materializer): (Mat, Sink[In, NotUsed]) = {
    val runnable = Source.asSubscriber.toMat(this)(Keep.both)
    val (subscriber, materializedValue) = runnable.run()
    (materializedValue, Sink.fromSubscriber(subscriber))
  }

  /**
   * Replaces the attributes of this [[Sink]] with the given ones. If this Sink is a composite
   * of multiple graphs, new attributes on the composite will be less specific than attributes
   * set directly on the individual graphs of the composite.
   */
  override def withAttributes(attr: Attributes): Sink[In, Mat] = {
    val retagged = traversalBuilder.setAttributes(attr)
    new Sink[In, Mat](retagged, shape)
  }

  /**
   * Adds the given attributes to this [[Sink]]. If the specific attribute was already present
   * on this graph, the added attribute will be more specific than the existing one.
   * If this Sink is a composite of multiple graphs, new attributes on the composite will be
   * less specific than attributes set directly on the individual graphs of the composite.
   */
  override def addAttributes(attr: Attributes): Sink[In, Mat] = {
    val combined = traversalBuilder.attributes.and(attr)
    withAttributes(combined)
  }

  /**
   * Adds a ``name`` attribute to this Sink.
   */
  override def named(name: String): Sink[In, Mat] = {
    val nameAttribute = Attributes.name(name)
    addAttributes(nameAttribute)
  }

  /**
   * Puts an asynchronous boundary around this `Sink`.
   */
  override def async: Sink[In, Mat] =
    super.async.asInstanceOf[Sink[In, Mat]]

  /**
   * Puts an asynchronous boundary around this `Graph`.
   *
   * @param dispatcher Run the graph on this dispatcher
   */
  override def async(dispatcher: String): Sink[In, Mat] =
    super.async(dispatcher).asInstanceOf[Sink[In, Mat]]

  /**
   * Puts an asynchronous boundary around this `Graph`.
   *
   * @param dispatcher Run the graph on this dispatcher
   * @param inputBufferSize Set the input buffer to this size for the graph
   */
  override def async(dispatcher: String, inputBufferSize: Int): Sink[In, Mat] =
    super.async(dispatcher, inputBufferSize).asInstanceOf[Sink[In, Mat]]

  /**
   * Converts this Scala DSL element to its Java DSL counterpart.
   */
  def asJava[JIn <: In, JMat >: Mat]: javadsl.Sink[JIn, JMat] =
    new javadsl.Sink[JIn, JMat](this)
}
object Sink {
/** INTERNAL API: builds a [[SinkShape]] whose single inlet is named `name` followed by `.in`. */
def shape[T](name: String): SinkShape[T] = {
  val inlet = Inlet[T](s"$name.in")
  SinkShape(inlet)
}
/**
 * A graph that has the shape of a sink is logically a sink already;
 * this method makes it one in type as well.
 */
def fromGraph[T, M](g: Graph[SinkShape[T], M]): Sink[T, M] = {
  // Wraps a sink-shaped graph into a linear traversal that keeps the graph's materialized value.
  def wrap(graph: Graph[SinkShape[T], M]): Sink[T, M] =
    new Sink(
      LinearTraversalBuilder.fromBuilder(graph.traversalBuilder, graph.shape, Keep.right),
      graph.shape)

  g match {
    case scalaSink: Sink[T, M]        ⇒ scalaSink
    case javaSink: javadsl.Sink[T, M] ⇒ javaSink.asScala
    case stage: GraphStageWithMaterializedValue[SinkShape[T], M] ⇒
      // move the attributes from the stage itself onto the wrapper to make the
      // returned sink behave as if it is the stage with regards to attributes
      val stageAttributes = stage.traversalBuilder.attributes
      val bareStage = stage.withAttributes(Attributes.none)
      wrap(bareStage).withAttributes(stageAttributes)
    case other ⇒ wrap(other)
  }
}
/**
 * Helper that creates a [[Sink]] from the given `Subscriber`.
 */
def fromSubscriber[T](subscriber: Subscriber[T]): Sink[T, NotUsed] = {
  val sinkShape = shape[T]("SubscriberSink")
  val module = new SubscriberSink[T](subscriber, DefaultAttributes.subscriberSink, sinkShape)
  fromGraph(module)
}
/**
 * A `Sink` that cancels its upstream immediately after materialization.
 */
def cancelled[T]: Sink[T, NotUsed] = {
  val sinkShape = shape[Any]("CancelledSink")
  fromGraph[Any, NotUsed](new CancelSink(DefaultAttributes.cancelledSink, sinkShape))
}
/**
 * A `Sink` that materializes into a `Future` of the first value received.
 * If the stream completes before signaling at least a single element, the Future will be failed
 * with a [[NoSuchElementException]].
 * If the stream signals an error before signaling at least a single element, the Future will be
 * failed with the stream's exception.
 *
 * See also [[headOption]].
 */
def head[T]: Sink[T, Future[T]] = {
  val firstOrNone = Sink.fromGraph(new HeadOptionStage[T]).withAttributes(DefaultAttributes.headSink)
  firstOrNone.mapMaterializedValue { optionalFirst ⇒
    optionalFirst.map {
      case Some(element) ⇒ element
      case None          ⇒ throw new NoSuchElementException("head of empty stream")
    }(ExecutionContexts.sameThreadExecutionContext)
  }
}
/**
 * A `Sink` that materializes into a `Future` of the optional first value received.
 * If the stream completes before signaling at least a single element, the value of the Future
 * will be [[None]].
 * If the stream signals an error before signaling at least a single element, the Future will be
 * failed with the stream's exception.
 *
 * See also [[head]].
 */
def headOption[T]: Sink[T, Future[Option[T]]] = {
  val stage = new HeadOptionStage[T]
  Sink.fromGraph(stage).withAttributes(DefaultAttributes.headOptionSink)
}
/**
 * A `Sink` that materializes into a `Future` of the last value received.
 * If the stream completes before signaling at least a single element, the Future will be failed
 * with a [[NoSuchElementException]].
 * If the stream signals an error, the Future will be failed with the stream's exception.
 *
 * See also [[lastOption]], [[takeLast]].
 */
def last[T]: Sink[T, Future[T]] = {
  // retain only the most recent element, then unwrap it once the stream has ended
  val keepOne = Sink.fromGraph(new TakeLastStage[T](1)).withAttributes(DefaultAttributes.lastSink)
  keepOne.mapMaterializedValue { retained ⇒
    retained.map { elements ⇒
      elements.headOption match {
        case Some(element) ⇒ element
        case None          ⇒ throw new NoSuchElementException("last of empty stream")
      }
    }(ExecutionContexts.sameThreadExecutionContext)
  }
}
/**
 * A `Sink` that materializes into a `Future` of the optional last value received.
 * If the stream completes before signaling at least a single element, the value of the Future
 * will be [[None]].
 * If the stream signals an error, the Future will be failed with the stream's exception.
 *
 * See also [[last]], [[takeLast]].
 */
def lastOption[T]: Sink[T, Future[Option[T]]] = {
  // retain only the most recent element and expose it as an Option
  val keepOne = Sink.fromGraph(new TakeLastStage[T](1)).withAttributes(DefaultAttributes.lastOptionSink)
  keepOne.mapMaterializedValue(retained ⇒
    retained.map(elements ⇒ elements.headOption)(ExecutionContexts.sameThreadExecutionContext))
}
/**
 * A `Sink` that materializes into a `Future` of `immutable.Seq[T]` containing the last `n`
 * collected elements.
 *
 * If the stream completes before signaling at least n elements, the `Future` will complete with
 * all elements seen so far.
 * If the stream never completes, the `Future` will never complete.
 * If there is a failure signaled in the stream the `Future` will be completed with failure.
 */
def takeLast[T](n: Int): Sink[T, Future[immutable.Seq[T]]] = {
  val stage = new TakeLastStage[T](n)
  Sink.fromGraph(stage).withAttributes(DefaultAttributes.takeLastSink)
}
/**
 * A `Sink` that keeps collecting incoming elements until upstream terminates.
 * Since upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit`
 * (and their variants) may be used to ensure boundedness.
 * Materializes into a `Future` of `Seq[T]` containing all the collected elements.
 * `Seq` is limited to `Int.MaxValue` elements; this Sink will cancel the stream
 * after having received that many elements.
 *
 * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
 */
def seq[T]: Sink[T, Future[immutable.Seq[T]]] = {
  val vectorCollector = new SeqStage[T, Vector[T]]
  Sink.fromGraph(vectorCollector)
}
/**
 * A `Sink` that keeps collecting incoming elements until upstream terminates.
 * Since upstream may be unbounded, `Flow[T].take` or the stricter `Flow[T].limit`
 * (and their variants) may be used to ensure boundedness.
 * Materializes into a `Future` of `That[T]` containing all the collected elements.
 * `That[T]` is subject to the limitations of the CanBuildFrom associated with it. For example,
 * `Seq` is limited to `Int.MaxValue` elements. See
 * [The Architecture of Scala Collections](https://docs.scala-lang.org/overviews/core/architecture-of-scala-collections.html)
 * for more info.
 * This Sink will cancel the stream after having received that many elements.
 *
 * See also [[Flow.limit]], [[Flow.limitWeighted]], [[Flow.take]], [[Flow.takeWithin]], [[Flow.takeWhile]]
 */
def collection[T, That](implicit cbf: CanBuildFrom[Nothing, T, That with immutable.Traversable[_]]): Sink[T, Future[That]] = {
  // the stage picks up `cbf` implicitly to build the resulting collection
  val collector = new SeqStage[T, That]
  Sink.fromGraph(collector)
}
/**
 * A `Sink` that materializes into a [[org.reactivestreams.Publisher]].
 *
 * If `fanout` is `true`, the materialized `Publisher` will support multiple `Subscriber`s, and
 * the size of the `inputBuffer` configured for this operator becomes the maximum number of
 * elements that the fastest [[org.reactivestreams.Subscriber]] can be ahead of the slowest one
 * before the processing is slowed down due to back pressure.
 *
 * If `fanout` is `false`, the materialized `Publisher` will only support a single `Subscriber`
 * and reject any additional `Subscriber`s.
 */
def asPublisher[T](fanout: Boolean): Sink[T, Publisher[T]] =
  if (fanout)
    fromGraph(new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape[T]("FanoutPublisherSink")))
  else
    fromGraph(new PublisherSink[T](DefaultAttributes.publisherSink, shape[T]("PublisherSink")))
/**
 * A `Sink` that consumes the stream and discards every element.
 */
def ignore: Sink[Any, Future[Done]] = {
  val discarding = GraphStages.IgnoreSink
  Sink.fromGraph(discarding)
}
/**
 * A `Sink` that invokes the given procedure for each received element. The sink is materialized
 * into a [[scala.concurrent.Future]] which is completed with `Success` when the normal end of
 * the stream is reached, or completed with `Failure` if a failure is signaled in the stream.
 */
def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]] = {
  val applied = Flow[T].map(f)
  applied.toMat(Sink.ignore)(Keep.right).named("foreachSink")
}
/**
 * Combines several sinks using a fan-out strategy such as `Broadcast` or `Balance`
 * and returns the result as a single `Sink`.
 */
def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int ⇒ Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed] =
  Sink.fromGraph(GraphDSL.create() { implicit builder ⇒
    import GraphDSL.Implicits._

    // one outlet per sink: the two mandatory sinks plus every additional one
    val fanOut = builder.add(strategy(rest.size + 2))
    fanOut.out(0) ~> first
    fanOut.out(1) ~> second
    // the remaining sinks occupy the outlets from index 2 onwards, in argument order
    rest.iterator.zipWithIndex.foreach {
      case (sink, offset) ⇒ fanOut.out(offset + 2) ~> sink
    }
    new SinkShape(fanOut.in)
  })
/**
 * A `Sink` that applies the given function to each of the elements as they pass in.
 * The sink is materialized into a [[scala.concurrent.Future]].
 *
 * If `f` throws an exception and the supervision decision is
 * [[akka.stream.Supervision.Stop]] the `Future` will be completed with failure.
 *
 * If `f` throws an exception and the supervision decision is
 * [[akka.stream.Supervision.Resume]] or [[akka.stream.Supervision.Restart]] the
 * element is dropped and the stream continues.
 *
 * See also [[Flow.mapAsyncUnordered]]
 */
def foreachParallel[T](parallelism: Int)(f: T ⇒ Unit)(implicit ec: ExecutionContext): Sink[T, Future[Done]] = {
  // every invocation of `f` is wrapped in a Future that runs on the supplied execution context
  val concurrent = Flow[T].mapAsyncUnordered(parallelism)(element ⇒ Future(f(element)))
  concurrent.toMat(Sink.ignore)(Keep.right)
}
/**
 * A `Sink` that invokes the given function for every received element, passing it its previous
 * output (or the given `zero` value) together with the element.
 * The returned [[scala.concurrent.Future]] will be completed with the value of the final
 * function evaluation when the input stream ends, or completed with `Failure`
 * if there is a failure signaled in the stream.
 *
 * @see [[#foldAsync]]
 */
def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] = {
  val folded = Flow[T].fold(zero)(f)
  folded.toMat(Sink.head[U])(Keep.right).named("foldSink")
}
/**
 * A `Sink` that invokes the given asynchronous function for every received element, passing it
 * its previous output (or the given `zero` value) together with the element.
 * The returned [[scala.concurrent.Future]] will be completed with the value of the final
 * function evaluation when the input stream ends, or completed with `Failure`
 * if there is a failure signaled in the stream.
 *
 * @see [[#fold]]
 */
def foldAsync[U, T](zero: U)(f: (U, T) ⇒ Future[U]): Sink[T, Future[U]] = {
  val folded = Flow[T].foldAsync(zero)(f)
  folded.toMat(Sink.head[U])(Keep.right).named("foldAsyncSink")
}
/**
 * A `Sink` that invokes the given function for every received element, passing it its previous
 * output (from the second element on) together with the element.
 * The returned [[scala.concurrent.Future]] will be completed with the value of the final
 * function evaluation when the input stream ends, or completed with `Failure`
 * if there is a failure signaled in the stream.
 *
 * If the stream is empty (i.e. completes before signalling any elements),
 * the reduce operator will fail its downstream with a [[NoSuchElementException]],
 * which is semantically in line with what Scala's standard library collections
 * do in such situations.
 *
 * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
 */
def reduce[T](f: (T, T) ⇒ T): Sink[T, Future[T]] = {
  val reduced = Flow[T].reduce(f)
  reduced.toMat(Sink.head[T])(Keep.right).named("reduceSink")
}
/**
 * A `Sink` that applies the provided function once the flow has completed: with
 * [[scala.util.Success]] on normal completion, or with [[scala.util.Failure]]
 * carrying the cause when the flow fails. If the stage is stopped without having
 * seen either signal, the function is applied to a `Failure` holding an
 * `AbruptStageTerminationException`.
 *
 * The function is applied at most once per materialization, even if it throws.
 *
 * @param callback invoked with the outcome of the stream
 */
def onComplete[T](callback: Try[Done] ⇒ Unit): Sink[T, NotUsed] = {
  def newOnCompleteStage(): GraphStage[FlowShape[T, NotUsed]] = {
    new GraphStage[FlowShape[T, NotUsed]] {
      val in = Inlet[T]("in")
      val out = Outlet[NotUsed]("out")
      override val shape = FlowShape.of(in, out)

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) with InHandler with OutHandler {
          // true once the callback has been handed a termination signal
          var completionSignalled = false

          // elements are never emitted downstream; each signal just requests the next element
          override def onPush(): Unit = pull(in)

          override def onPull(): Unit = pull(in)

          override def onUpstreamFailure(cause: Throwable): Unit = {
            // raise the flag before invoking the callback, so that postStop never
            // invokes it a second time even if the callback itself throws
            completionSignalled = true
            callback(Failure(cause))
            failStage(cause)
          }

          override def onUpstreamFinish(): Unit = {
            // same ordering as in onUpstreamFailure: flag first, callback second
            completionSignalled = true
            callback(Success(Done))
            completeStage()
          }

          override def postStop(): Unit = {
            // stopped without an upstream termination signal: report it as abrupt termination
            if (!completionSignalled) callback(Failure(new AbruptStageTerminationException(this)))
          }

          setHandlers(in, out, this)
        }
    }
  }
  Flow[T].via(newOnCompleteStage()).to(Sink.ignore).named("onCompleteSink")
}
/**
 * INTERNAL API
 *
 * Sends the elements of the stream to the given `ActorRef`.
 * If the target actor terminates the stream will be canceled.
 * When the stream is completed successfully the given `onCompleteMessage`
 * will be sent to the destination actor.
 * When the stream is completed with failure the `onFailureMessage` function will be invoked
 * and its result will be sent to the destination actor.
 *
 * It will request at most `maxInputBufferSize` number of elements from
 * upstream, but there is no back-pressure signal from the destination actor,
 * i.e. if the actor is not consuming the messages fast enough the mailbox
 * of the actor will grow. For potentially slow consumer actors it is recommended
 * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
 * limiting operator in front of this `Sink`.
 */
@InternalApi private[akka] def actorRef[T](ref: ActorRef, onCompleteMessage: Any, onFailureMessage: Throwable ⇒ Any): Sink[T, NotUsed] = {
  val sinkShape = shape[T]("ActorRefSink")
  val module = new ActorRefSink[T](ref, onCompleteMessage, onFailureMessage, DefaultAttributes.actorRefSink, sinkShape)
  fromGraph(module)
}
/**
 * Sends the elements of the stream to the given `ActorRef`.
 * If the target actor terminates the stream will be canceled.
 * When the stream is completed successfully the given `onCompleteMessage`
 * will be sent to the destination actor.
 * When the stream is completed with failure an [[akka.actor.Status.Failure]]
 * message will be sent to the destination actor.
 *
 * It will request at most `maxInputBufferSize` number of elements from
 * upstream, but there is no back-pressure signal from the destination actor,
 * i.e. if the actor is not consuming the messages fast enough the mailbox
 * of the actor will grow. For potentially slow consumer actors it is recommended
 * to use a bounded mailbox with zero `mailbox-push-timeout-time` or use a rate
 * limiting operator in front of this `Sink`.
 */
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed] = {
  // a stream failure is reported to the actor wrapped in Status.Failure
  val toStatusFailure: Throwable ⇒ Any = cause ⇒ Status.Failure(cause)
  val sinkShape = shape[T]("ActorRefSink")
  val module = new ActorRefSink[T](ref, onCompleteMessage, toStatusFailure, DefaultAttributes.actorRefSink, sinkShape)
  fromGraph(module)
}
/**
 * INTERNAL API
 *
 * Sends the elements of the stream to the given `ActorRef`, which sends back a back-pressure signal.
 * The first element is created by calling `onInitMessage` with an `ActorRef` of the actor that
 * expects acknowledgements. Then the stream waits for the acknowledgement message
 * `ackMessage` from the given actor, which means that it is ready to process
 * elements. It also requires an `ackMessage` message after each stream element
 * to make backpressure work.
 *
 * Every message that is sent to the actor is first transformed using `messageAdapter`.
 * This can be used to capture the ActorRef of the actor that expects acknowledgments as
 * well as to transform messages from the stream into the ones that the actor under `ref` handles.
 *
 * If the target actor terminates the stream will be canceled.
 * When the stream is completed successfully the given `onCompleteMessage`
 * will be sent to the destination actor.
 * When the stream is completed with failure, the result of the `onFailureMessage(throwable)`
 * function will be sent to the destination actor.
 */
@InternalApi private[akka] def actorRefWithAck[T](ref: ActorRef, messageAdapter: ActorRef ⇒ T ⇒ Any,
  onInitMessage: ActorRef ⇒ Any, ackMessage: Any, onCompleteMessage: Any,
  onFailureMessage: (Throwable) ⇒ Any): Sink[T, NotUsed] = {
  val stage = new ActorRefBackpressureSinkStage[T](
    ref,
    messageAdapter,
    onInitMessage,
    ackMessage,
    onCompleteMessage,
    onFailureMessage)
  Sink.fromGraph(stage)
}
/**
 * Sends the elements of the stream to the given `ActorRef`, which sends back a back-pressure signal.
 * The first element is always `onInitMessage`; then the stream waits for the acknowledgement
 * message `ackMessage` from the given actor, which means that it is ready to process
 * elements. It also requires an `ackMessage` message after each stream element
 * to make backpressure work.
 *
 * If the target actor terminates the stream will be canceled.
 * When the stream is completed successfully the given `onCompleteMessage`
 * will be sent to the destination actor.
 * When the stream is completed with failure, the result of the `onFailureMessage(throwable)`
 * function will be sent to the destination actor.
 *
 */
def actorRefWithAck[T](ref: ActorRef, onInitMessage: Any, ackMessage: Any, onCompleteMessage: Any,
  onFailureMessage: (Throwable) ⇒ Any = Status.Failure): Sink[T, NotUsed] = {
  // stream elements are forwarded to the actor unchanged
  val passThrough: ActorRef ⇒ T ⇒ Any = _ ⇒ element ⇒ element
  // the init message does not depend on the acknowledging actor
  val constantInit: ActorRef ⇒ Any = _ ⇒ onInitMessage
  actorRefWithAck[T](ref, passThrough, constantInit, ackMessage, onCompleteMessage, onFailureMessage)
}
/**
 * Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] pointing to an Actor
 * created according to the passed in [[akka.actor.Props]]. The Actor created by the `props` must
 * be an [[akka.stream.actor.ActorSubscriber]].
 *
 * @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
 */
@deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
def actorSubscriber[T](props: Props): Sink[T, ActorRef] = {
  // reject Props whose actor class is not an ActorSubscriber before building the sink
  val actorClass = props.actorClass()
  require(classOf[ActorSubscriber].isAssignableFrom(actorClass), "Actor must be ActorSubscriber")
  val sinkShape = shape[T]("ActorSubscriberSink")
  fromGraph(new ActorSubscriberSink[T](props, DefaultAttributes.actorSubscriberSink, sinkShape))
}
/**
 * Creates a `Sink` that is materialized as an [[akka.stream.scaladsl.SinkQueue]].
 * The [[akka.stream.scaladsl.SinkQueue.pull]] method pulls an element from the stream and
 * returns ``Future[Option[T]]``. The `Future` completes when an element is available.
 *
 * Before calling the pull method a second time you need to wait until the previous Future completes.
 * Pull returns a failed future with ''IllegalStateException'' if the previous future has not yet completed.
 *
 * The `Sink` will request at most a number of elements equal to the size of `inputBuffer` from
 * upstream and then stop back pressure. You can configure the size of the input
 * buffer by using the [[Sink.withAttributes]] method.
 *
 * For stream completion you need to pull all elements from [[akka.stream.scaladsl.SinkQueue]],
 * including the last None as completion marker.
 *
 * See also [[akka.stream.scaladsl.SinkQueueWithCancel]]
 */
def queue[T](): Sink[T, SinkQueueWithCancel[T]] = {
  val stage = new QueueSink[T]()
  Sink.fromGraph(stage)
}
/**
 * Creates a real `Sink` upon receiving the first element. The internal `Sink` will not be created
 * if there are no elements, because of completion or error.
 *
 * If upstream completes before an element was received then the `Future` is completed with the
 * value created by `fallback`.
 * If upstream fails before an element was received, `sinkFactory` throws an exception, or
 * materialization of the internal sink fails then the `Future` is completed with the exception.
 * Otherwise the `Future` is completed with the materialized value of the internal sink.
 */
@Deprecated
@deprecated("Use lazyInitAsync instead. (lazyInitAsync no more needs a fallback function and the materialized value more clearly indicates if the internal sink was materialized or not.)", "2.5.11")
def lazyInit[T, M](sinkFactory: T ⇒ Future[Sink[T, M]], fallback: () ⇒ M): Sink[T, Future[M]] = {
  val lazySink = Sink.fromGraph(new LazySink[T, M](sinkFactory))
  lazySink.mapMaterializedValue { materialized ⇒
    materialized.map {
      case Some(innerValue) ⇒ innerValue
      // no internal sink was materialized: fall back to the caller-supplied value
      case None             ⇒ fallback()
    }(ExecutionContexts.sameThreadExecutionContext)
  }
}
/**
 * Creates a real `Sink` upon receiving the first element. The internal `Sink` will not be created
 * if there are no elements, because of completion or error.
 *
 * If upstream completes before an element was received then the `Future` is completed with `None`.
 * If upstream fails before an element was received, `sinkFactory` throws an exception, or
 * materialization of the internal sink fails then the `Future` is completed with the exception.
 * Otherwise the `Future` is completed with the materialized value of the internal sink.
 */
def lazyInitAsync[T, M](sinkFactory: () ⇒ Future[Sink[T, M]]): Sink[T, Future[Option[M]]] = {
  // the first element only triggers the factory; the factory itself does not receive it
  val onFirstElement: T ⇒ Future[Sink[T, M]] = _ ⇒ sinkFactory()
  Sink.fromGraph(new LazySink[T, M](onFirstElement))
}
}