-
Notifications
You must be signed in to change notification settings - Fork 787
/
WSClient.scala
264 lines (222 loc) · 10.1 KB
/
WSClient.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
/*
* Copyright 2014 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.http4s.client.websocket
import cats._
import cats.data.Chain
import cats.data.OptionT
import cats.effect._
import cats.effect.kernel.Deferred
import cats.effect.kernel.DeferredSource
import cats.implicits._
import fs2.Pipe
import fs2.Stream
import org.http4s.Headers
import org.http4s.Method
import org.http4s.Uri
import org.http4s.internal.reduceComparisons
import scodec.bits.ByteVector
/** A websocket request.
*
* @param uri
* The URI.
* @param headers
* The headers to send. Put your `Sec-Websocket-Protocol` headers here if needed. Some websocket
* clients reject other WS-specific headers.
* @param method
* The method of the intial HTTP request. Ignored by some clients.
*/
sealed abstract class WSRequest {
def uri: Uri
def headers: Headers
def method: Method
def withUri(uri: Uri): WSRequest
def withHeaders(headers: Headers): WSRequest
def withMethod(method: Method): WSRequest
}
object WSRequest {
def apply(uri: Uri): WSRequest = apply(uri, Headers.empty, Method.GET)
def apply(uri: Uri, headers: Headers, method: Method): WSRequest =
WSRequestImpl(uri, headers, method)
private[this] final case class WSRequestImpl(
override val uri: Uri,
override val headers: Headers,
override val method: Method,
) extends WSRequest {
def withUri(uri: Uri): WSRequestImpl = copy(uri = uri)
def withHeaders(headers: Headers): WSRequestImpl = copy(headers = headers)
def withMethod(method: Method): WSRequestImpl = copy(method = method)
}
implicit val catsHashAndOrderForWSRequest: Hash[WSRequest] with Order[WSRequest] =
new Hash[WSRequest] with Order[WSRequest] {
override def hash(x: WSRequest): Int = x.hashCode
override def compare(x: WSRequest, y: WSRequest): Int =
reduceComparisons(
x.headers.compare(y.headers),
Eval.later(x.method.compare(y.method)),
Eval.later(x.uri.compare(y.uri)),
)
}
implicit val catsShowForWSRequest: Show[WSRequest] =
Show.fromToString
implicit def stdLibOrdering: Ordering[WSRequest] =
catsHashAndOrderForWSRequest.toOrdering
}
sealed trait WSFrame extends Product with Serializable
sealed trait WSControlFrame extends WSFrame
sealed trait WSDataFrame extends WSFrame
object WSFrame {
final case class Close(statusCode: Int, reason: String) extends WSControlFrame
final case class Ping(data: ByteVector) extends WSControlFrame
final case class Pong(data: ByteVector) extends WSControlFrame
final case class Text(data: String, last: Boolean = true) extends WSDataFrame
final case class Binary(data: ByteVector, last: Boolean = true) extends WSDataFrame
}
trait WSConnection[F[_]] { outer =>
/** Send a single websocket frame. The sending side of this connection has to be open. */
def send(wsf: WSFrame): F[Unit]
/** Send multiple websocket frames. Equivalent to multiple `send` calls, but at least as fast. */
def sendMany[G[_]: Foldable, A <: WSFrame](wsfs: G[A]): F[Unit]
/** A `Pipe` which sends websocket frames and emits a `()` for each chunk sent. */
def sendPipe: Pipe[F, WSFrame, Unit] = _.chunks.evalMap(sendMany(_))
/** Wait for a single websocket frame to be received. Returns `None` if the receiving side is
* closed.
*/
def receive: F[Option[WSFrame]]
/** A stream of the incoming websocket frames. */
def receiveStream: Stream[F, WSFrame] = Stream.repeatEval(receive).unNoneTerminate
/** The negotiated subprotocol, if any. */
def subprotocol: Option[String]
def mapK[G[_]](fk: F ~> G): WSConnection[G] = new WSConnection[G] {
def send(wsf: WSFrame): G[Unit] = fk(outer.send(wsf))
def sendMany[H[_]: Foldable, A <: WSFrame](wsfs: H[A]): G[Unit] = fk(outer.sendMany(wsfs))
def receive: G[Option[WSFrame]] = fk(outer.receive)
def subprotocol: Option[String] = outer.subprotocol
}
}
trait WSConnectionHighLevel[F[_]] { outer =>
/** Send a single websocket text frame. The sending side of this connection has to be open. */
def sendText(text: String): F[Unit] = send(WSFrame.Text(text))
/** Send a single websocket binary frame. The sending side of this connection has to be open. */
def sendBinary(bytes: ByteVector): F[Unit] = send(WSFrame.Binary(bytes))
/** Send a single websocket frame. The sending side of this connection has to be open. */
def send(wsf: WSDataFrame): F[Unit]
/** Send multiple websocket frames. Equivalent to multiple `send` calls, but at least as fast. */
def sendMany[G[_]: Foldable, A <: WSDataFrame](wsfs: G[A]): F[Unit]
/** A `Pipe` which sends websocket frames and emits a `()` for each chunk sent. */
def sendPipe: Pipe[F, WSDataFrame, Unit] = _.chunks.evalMap(sendMany(_))
/** Wait for a websocket frame to be received. Returns `None` if the receiving side is closed.
* Fragmentation is handled automatically, the `last` attribute can be ignored.
*/
def receive: F[Option[WSDataFrame]]
/** A stream of the incoming websocket frames. */
def receiveStream: Stream[F, WSDataFrame] = Stream.repeatEval(receive).unNoneTerminate
/** The negotiated subprotocol, if any. */
def subprotocol: Option[String]
/** The close frame, if available. */
def closeFrame: DeferredSource[F, WSFrame.Close]
def mapK[G[_]](fk: F ~> G): WSConnectionHighLevel[G] =
new WSConnectionHighLevel[G] {
def send(wsf: WSDataFrame): G[Unit] = fk(outer.send(wsf))
def sendMany[H[_]: Foldable, A <: WSDataFrame](wsfs: H[A]): G[Unit] = fk(outer.sendMany(wsfs))
def receive: G[Option[WSDataFrame]] = fk(outer.receive)
def subprotocol: Option[String] = outer.subprotocol
def closeFrame: DeferredSource[G, WSFrame.Close] = new DeferredSource[G, WSFrame.Close] {
def get = fk(outer.closeFrame.get)
def tryGet = fk(outer.closeFrame.tryGet)
}
}
}
/** A websocket client capable of establishing [[WSClientHighLevel#connectHighLevel "high level" connections]].
* @see [[WSClient]] for a client also capable of "low-level" connections
*/
trait WSClientHighLevel[F[_]] { outer =>
/** Establish a "high level" websocket connection. You only get to handle Text and Binary frames.
* Pongs will be replied automatically. Received frames are grouped by the `last` attribute. The
* connection will be closed automatically.
*/
def connectHighLevel(request: WSRequest): Resource[F, WSConnectionHighLevel[F]]
def mapK[G[_]](
fk: F ~> G
)(implicit F: MonadCancel[F, _], G: MonadCancel[G, _]): WSClientHighLevel[G] =
new WSClientHighLevel[G] {
def connectHighLevel(request: WSRequest): Resource[G, WSConnectionHighLevel[G]] =
outer.connectHighLevel(request).map(_.mapK(fk)).mapK(fk)
}
}
trait WSClient[F[_]] extends WSClientHighLevel[F] { outer =>
/** Establish a websocket connection. It will be closed automatically if necessary. */
def connect(request: WSRequest): Resource[F, WSConnection[F]]
override def mapK[G[_]](
fk: F ~> G
)(implicit F: MonadCancel[F, _], G: MonadCancel[G, _]): WSClient[G] =
new WSClient[G] {
def connectHighLevel(request: WSRequest): Resource[G, WSConnectionHighLevel[G]] =
outer.connectHighLevel(request).map(_.mapK(fk)).mapK(fk)
def connect(request: WSRequest): Resource[G, WSConnection[G]] =
outer.connect(request).map(_.mapK(fk)).mapK(fk)
}
}
object WSClient {
def apply[F[_]](
respondToPings: Boolean
)(f: WSRequest => Resource[F, WSConnection[F]])(implicit F: Concurrent[F]): WSClient[F] =
new WSClient[F] {
override def connect(request: WSRequest) = f(request)
override def connectHighLevel(request: WSRequest) =
for {
recvCloseFrame <- Resource.eval(Deferred[F, WSFrame.Close])
conn <- f(request)
} yield new WSConnectionHighLevel[F] {
override def send(wsf: WSDataFrame) = conn.send(wsf)
override def sendMany[G[_]: Foldable, A <: WSDataFrame](wsfs: G[A]): F[Unit] =
conn.sendMany(wsfs)
override def receive: F[Option[WSDataFrame]] = {
def receiveDataFrame: OptionT[F, WSDataFrame] =
OptionT(conn.receive).flatMap { wsf =>
OptionT.liftF(wsf match {
case WSFrame.Ping(data) if respondToPings => conn.send(WSFrame.Pong(data))
case wsf: WSFrame.Close =>
recvCloseFrame.complete(wsf) *> conn.send(wsf)
case _ => F.unit
}) >> (wsf match {
case wsdf: WSDataFrame => OptionT.pure[F](wsdf)
case _ => receiveDataFrame
})
}
def defrag(text: Chain[String], binary: ByteVector): OptionT[F, WSDataFrame] =
receiveDataFrame.flatMap {
case WSFrame.Text(t, finalFrame) =>
val nextText = text :+ t
if (finalFrame) {
val sb = new StringBuilder(nextText.foldMap(_.length))
nextText.iterator.foreach(sb ++= _)
OptionT.pure[F](WSFrame.Text(sb.mkString))
} else
defrag(nextText, binary)
case WSFrame.Binary(b, finalFrame) =>
val nextBinary = binary ++ b
if (finalFrame)
OptionT.pure[F](WSFrame.Binary(nextBinary))
else
defrag(text, nextBinary)
}
defrag(Chain.empty, ByteVector.empty).value
}
override def subprotocol: Option[String] = conn.subprotocol
override def closeFrame: DeferredSource[F, WSFrame.Close] = recvCloseFrame
}
}
}