-
Notifications
You must be signed in to change notification settings - Fork 124
/
SwitchOnCancel.scala
74 lines (61 loc) · 1.99 KB
/
SwitchOnCancel.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/*
* Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.grpc.internal
import akka.stream.Attributes
import akka.stream.FanOutShape2
import akka.stream.Inlet
import akka.stream.Outlet
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import akka.util.OptionVal
/**
 * Identity stage that feeds all incoming elements to `mainOut` until that outlet cancels, and from then on
 * sends every element to `failoverOut`, paired with the cause of the cancellation.
 *
 * Demand signalled on `failoverOut` before the switch is not forwarded upstream right away; it is picked up
 * when `mainOut` cancels. Until then only `mainOut` drives the upstream.
 *
 * INTERNAL API
 *
 * @tparam T the type of the elements passing through the stage
 */
final private[akka] class SwitchOnCancel[T] extends GraphStage[FanOutShape2[T, T, (Throwable, T)]] {
  val in: Inlet[T] = Inlet[T]("in")
  val mainOut: Outlet[T] = Outlet[T]("mainOut")
  val failoverOut: Outlet[(Throwable, T)] = Outlet[(Throwable, T)]("failoverOut")

  // a val so the shape is built once instead of on every access
  override val shape: FanOutShape2[T, T, (Throwable, T)] = new FanOutShape2(in, mainOut, failoverOut)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // holds the cancellation cause of mainOut once the switch to failoverOut has happened
    var failedOver: OptionVal[Throwable] = OptionVal.None

    setHandler(
      in,
      new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          failedOver match {
            case OptionVal.Some(error) =>
              // `in` may have been pulled on behalf of mainOut just before it cancelled, so failoverOut
              // may not have signalled demand yet; emit pushes right away when there is demand and
              // otherwise defers the push until failoverOut is pulled
              emit(failoverOut, (error, elem))
            case _ => push(mainOut, elem)
          }
        }
      })

    setHandler(
      mainOut,
      new OutHandler {
        // before the switch mainOut is the only outlet that pulls `in`, so this cannot pull twice
        override def onPull(): Unit =
          pull(in)

        override def onDownstreamFinish(cause: Throwable): Unit = {
          // on downstream cancel or failure switch to second out
          failedOver = OptionVal.Some(cause)
          // forward demand that failoverOut signalled before the switch, unless an element is
          // already on its way because mainOut pulled before cancelling
          if (isAvailable(failoverOut) && !hasBeenPulled(in)) {
            pull(in)
          }
        }
      })

    setHandler(
      failoverOut,
      new OutHandler {
        override def onPull(): Unit = {
          // Before the switch the demand is only remembered (it is visible through isAvailable and picked
          // up in mainOut's onDownstreamFinish): pulling `in` here would either route the element to
          // mainOut without demand or make mainOut's next onPull pull `in` a second time.
          // After the switch `in` may already have been pulled on behalf of mainOut.
          if (failedOver.isDefined && !hasBeenPulled(in)) pull(in)
        }
      })
  }
}