-
Notifications
You must be signed in to change notification settings - Fork 20
/
WSClient.scala
182 lines (154 loc) · 6.95 KB
/
WSClient.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
/*
* Copyright 2021 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.http4s.jdkhttpclient
import cats._
import cats.data.Chain
import cats.data.OptionT
import cats.effect._
import cats.implicits._
import fs2.Pipe
import fs2.Stream
import org.http4s.Headers
import org.http4s.Method
import org.http4s.Uri
import scodec.bits.ByteVector
/** A websocket request.
*
* @param uri
* The URI.
* @param headers
* The headers to send. Put your `Sec-Websocket-Protocol` headers here if needed. Some websocket
* clients reject other WS-specific headers.
* @param method
* The method of the intial HTTP request. Ignored by some clients.
*/
case class WSRequest(
uri: Uri,
headers: Headers = Headers.empty,
method: Method = Method.GET
)
sealed trait WSFrame extends Product with Serializable
sealed trait WSControlFrame extends WSFrame
sealed trait WSDataFrame extends WSFrame
object WSFrame {
final case class Close(statusCode: Int, reason: String) extends WSControlFrame
final case class Ping(data: ByteVector) extends WSControlFrame
final case class Pong(data: ByteVector) extends WSControlFrame
final case class Text(data: String, last: Boolean = true) extends WSDataFrame
final case class Binary(data: ByteVector, last: Boolean = true) extends WSDataFrame
}
trait WSConnection[F[_]] {
/** Send a single websocket frame. The sending side of this connection has to be open. */
def send(wsf: WSFrame): F[Unit]
/** Send multiple websocket frames. Equivalent to multiple `send` calls, but at least as fast. */
def sendMany[G[_]: Foldable, A <: WSFrame](wsfs: G[A]): F[Unit]
/** A `Pipe` which sends websocket frames and emits a `()` for each chunk sent. */
final def sendPipe: Pipe[F, WSFrame, Unit] = _.chunks.evalMap(sendMany(_))
/** Wait for a single websocket frame to be received. Returns `None` if the receiving side is
* closed.
*/
def receive: F[Option[WSFrame]]
/** A stream of the incoming websocket frames. */
final def receiveStream: Stream[F, WSFrame] = Stream.repeatEval(receive).unNoneTerminate
/** The negotiated subprotocol, if any. */
def subprotocol: Option[String]
}
trait WSConnectionHighLevel[F[_]] {
/** Send a single websocket frame. The sending side of this connection has to be open. */
def send(wsf: WSDataFrame): F[Unit]
/** Send multiple websocket frames. Equivalent to multiple `send` calls, but at least as fast. */
def sendMany[G[_]: Foldable, A <: WSDataFrame](wsfs: G[A]): F[Unit]
/** A `Pipe` which sends websocket frames and emits a `()` for each chunk sent. */
final def sendPipe: Pipe[F, WSDataFrame, Unit] = _.chunks.evalMap(sendMany(_))
/** Send a Ping frame. */
def sendPing(data: ByteVector = ByteVector.empty): F[Unit]
/** Send a Close frame. The sending side of this connection will be closed. */
def sendClose(reason: String = ""): F[Unit]
/** Wait for a websocket frame to be received. Returns `None` if the receiving side is closed.
* Fragmentation is handled automatically, the `last` attribute can be ignored.
*/
def receive: F[Option[WSDataFrame]]
/** A stream of the incoming websocket frames. */
final def receiveStream: Stream[F, WSDataFrame] = Stream.repeatEval(receive).unNoneTerminate
/** The negotiated subprotocol, if any. */
def subprocotol: Option[String]
/** The close frame, if available. */
def closeFrame: Deferred[F, WSFrame.Close]
}
trait WSClient[F[_]] {
/** Establish a websocket connection. It will be closed automatically if necessary. */
def connect(request: WSRequest): Resource[F, WSConnection[F]]
/** Establish a "high level" websocket connection. You only get to handle Text and Binary frames.
* Pongs will be replied automatically. Received frames are grouped by the `last` attribute. The
* connection will be closed automatically.
*/
def connectHighLevel(request: WSRequest): Resource[F, WSConnectionHighLevel[F]]
}
object WSClient {
def defaultImpl[F[_]](
respondToPings: Boolean
)(f: WSRequest => Resource[F, WSConnection[F]])(implicit F: Concurrent[F]): WSClient[F] =
new WSClient[F] {
override def connect(request: WSRequest) = f(request)
override def connectHighLevel(request: WSRequest) =
for {
recvCloseFrame <- Resource.eval(Deferred[F, WSFrame.Close])
outputOpen <- Resource.eval(Ref[F].of(false))
conn <- f(request)
} yield new WSConnectionHighLevel[F] {
override def send(wsf: WSDataFrame) = conn.send(wsf)
override def sendMany[G[_]: Foldable, A <: WSDataFrame](wsfs: G[A]): F[Unit] =
conn.sendMany(wsfs)
override def sendPing(data: ByteVector) = conn.send(WSFrame.Ping(data))
override def sendClose(reason: String) =
conn.send(WSFrame.Close(1000, reason)) *> outputOpen.set(false)
override def receive: F[Option[WSDataFrame]] = {
def receiveDataFrame: OptionT[F, WSDataFrame] =
OptionT(conn.receive).flatMap { wsf =>
OptionT.liftF(wsf match {
case WSFrame.Ping(data) if respondToPings => conn.send(WSFrame.Pong(data))
case wsf: WSFrame.Close =>
recvCloseFrame.complete(wsf) *> outputOpen.get.flatMap(conn.send(wsf).whenA(_))
case _ => F.unit
}) >> (wsf match {
case wsdf: WSDataFrame => OptionT.pure[F](wsdf)
case _ => receiveDataFrame
})
}
def defrag(text: Chain[String], binary: ByteVector): OptionT[F, WSDataFrame] =
receiveDataFrame.flatMap {
case WSFrame.Text(t, finalFrame) =>
val nextText = text :+ t
if (finalFrame) {
val sb = new StringBuilder(nextText.foldMap(_.length))
nextText.iterator.foreach(sb ++= _)
OptionT.pure[F](WSFrame.Text(sb.mkString))
} else
defrag(nextText, binary)
case WSFrame.Binary(b, finalFrame) =>
val nextBinary = binary ++ b
if (finalFrame)
OptionT.pure[F](WSFrame.Binary(nextBinary))
else
defrag(text, nextBinary)
}
defrag(Chain.empty, ByteVector.empty).value
}
override def subprocotol: Option[String] = conn.subprotocol
override def closeFrame: Deferred[F, WSFrame.Close] = recvCloseFrame
}
}
}