forked from monix/monix-sample
-
Notifications
You must be signed in to change notification settings - Fork 0
/
BackPressuredWebSocketClient.scala
139 lines (126 loc) · 5.26 KB
/
BackPressuredWebSocketClient.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package client
import monix.execution.{Ack, Cancelable}
import monix.reactive.observers.Subscriber
import monix.reactive.{Observable, Observer}
import org.reactivestreams.Subscription
import org.scalajs.dom.raw.MessageEvent
import org.scalajs.dom.{CloseEvent, ErrorEvent, Event, WebSocket}
import scala.concurrent._
import scala.concurrent.duration._
import scala.util.control.NonFatal
/** Creates a connection to a server endpoint that supports the
* reactive-streams back-pressure protocol.
*
* This means that the client will keep sending a number to
* the server, signaling that it is ready for consuming more
* events and so the server is allowed to send a maximum number
* of events equal to the request, or a completion event.
*
* Here we really are making use of the Reactive Streams protocol
* (specified at http://www.reactive-streams.org/), because
* sending a `request(n)` over the wire will be more efficient
* than sending acknowledgements for each processed event.
*
* By doing this back-pressure, it means that the buffers are fixed
* and that the server is able to detect slow clients and do something
* about it, like the server can start dropping messages on the floor.
*
* This example is maybe a little too low-level, implementing
* an `Observable` from scratch, with no helpers. This shouldn't
* be done normally, as you have to know what you're doing.
*/
final class BackPressuredWebSocketClient private (url: String)
extends Observable[String] { self =>
/** An `Observable` that upon subscription will open a
* back-pressured web-socket connection.
*/
private val channel: Observable[String] =
// You can see the "unsafe" word here, and it's no joke. It means that
// you shouldn't do this unless you know what you're doing.
Observable.unsafeCreate { subscriber =>
// Reusing this in 2 places
def closeConnection(webSocket: WebSocket): Unit = {
Utils.log(s"Closing connection to $url")
if (webSocket != null && webSocket.readyState <= 1)
try webSocket.close() catch {
case _: Throwable => ()
}
}
// Converting a Monix Subscriber to a Reactive Subscriber.
// Will make sure to consume the stream by processing and
// requesting events in batches, applying back-pressure and
// respecting its contract.
val downstream = Subscriber.toReactiveSubscriber(subscriber)
try {
// Opening a WebSocket connection using Javascript's API
Utils.log(s"Connecting to $url")
val webSocket = new WebSocket(url)
// Registering on WebSocket's callbacks
webSocket.onopen = (event: Event) => {
// When we've got that connection, we can send the `onSubscribe`
// that basically describes what happens on `request(n)` and
// on `closeConnection` (according to the Reactive Streams protocol)
downstream.onSubscribe(new Subscription {
def cancel(): Unit =
closeConnection(webSocket)
def request(n: Long): Unit = {
webSocket.send(n.toString)
}
})
}
webSocket.onerror = (event: ErrorEvent) => {
// If error, signal it and it will be the last message
downstream.onError(BackPressuredWebSocketClient.Exception(event.message))
}
webSocket.onclose = (event: CloseEvent) => {
// If close, signal it and it will be the last message
downstream.onComplete()
}
webSocket.onmessage = (event: MessageEvent) => {
// Signal next event as usual
downstream.onNext(event.data.asInstanceOf[String])
}
Cancelable(() => closeConnection(webSocket))
} catch {
case NonFatal(ex) =>
// Normally this could be a race condition, meaning that we aren't allowed to
// send `onError` twice and at this point we have no way of knowing if `onError`
// already happened, but this right here is fine, for one because this is Javascript,
// but also because the `downstream` is protected by a concurrent buffer.
downstream.onError(ex)
Cancelable.empty
}
}
/** Subscribers to the WebSocket with a retry logic that keeps
* it connected or re-connecting no matter what (until canceled).
*
* This is actually equivalent to usage of `Observable.unsafeCreate`,
* same as above.
*/
override def unsafeSubscribeFn(subscriber: Subscriber[String]): Cancelable = {
import subscriber.scheduler
channel.unsafeSubscribeFn(new Observer[String] {
def onNext(elem: String): Future[Ack] =
subscriber.onNext(elem)
def onError(ex: Throwable): Unit = {
scheduler.reportFailure(ex)
// Retry connection in a couple of secs
self
.delaySubscription(3.seconds)
.unsafeSubscribeFn(subscriber)
}
def onComplete(): Unit = {
// Retry connection in a couple of secs
self
.delaySubscription(3.seconds)
.unsafeSubscribeFn(subscriber)
}
})
}
}
object BackPressuredWebSocketClient {
def apply(url: String): BackPressuredWebSocketClient = {
new BackPressuredWebSocketClient(url)
}
case class Exception(msg: String) extends RuntimeException(msg)
}