Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ember Web Socket Client #7261

Open
wants to merge 96 commits into
base: series/0.23
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
96 commits
Select commit Hold shift + click to select a range
6be7693
Fix typo
danghieutrung Jun 5, 2023
e9155c1
Add WebSocket response validation for ember client
danghieutrung Jun 5, 2023
d5a331a
Merge branch 'http4s:series/0.23' into ember-websocket-client
danghieutrung Jun 7, 2023
94eddeb
Add unit tests
danghieutrung Jun 10, 2023
dac0b23
Change to property testing
danghieutrung Jun 12, 2023
85bae5c
Change to property testing
danghieutrung Jun 13, 2023
7816340
Change to property testing
danghieutrung Jun 13, 2023
a224d50
Use pattern matching
danghieutrung Jun 13, 2023
d4505ad
Merge pull request #7138 from danghieutrung/ember-websocket-client
armanbilge Jun 16, 2023
de8fbba
Add build method for WebSocket
danghieutrung Jul 1, 2023
d1db42b
Add createWebSocketKey method
danghieutrung Jul 1, 2023
09fb168
Move shared WebSocket helpers functins
danghieutrung Jul 1, 2023
413049d
Create example WebSocket client
danghieutrung Jul 1, 2023
3d51d45
Update getSocket function
danghieutrung Jul 3, 2023
e8502f7
Update WSConnection interface
danghieutrung Jul 4, 2023
e1a7536
Update WSConnection interface
danghieutrung Jul 10, 2023
44c7add
Use shared function from ember-core WebSocketHelpers
danghieutrung Jul 12, 2023
dd920a1
Add headers
danghieutrung Jul 12, 2023
9c7306c
Update formatting
danghieutrung Jul 12, 2023
223e449
Add WSClient for Ember
danghieutrung Jul 12, 2023
cec189a
Fix formatting
danghieutrung Jul 13, 2023
97b11cb
Delete file
danghieutrung Jul 13, 2023
a9b6c41
Add WebSocketKey object
danghieutrung Jul 14, 2023
ca62628
Use WebSocketKey object for buildWebSocket method
danghieutrung Jul 14, 2023
5034505
Fix formatting
danghieutrung Jul 14, 2023
95b4dff
Fix formatting
danghieutrung Jul 14, 2023
323ad4d
Fix formatting
danghieutrung Jul 14, 2023
84dd6e6
Testing WSConnection interface
danghieutrung Jul 14, 2023
7ee439c
debugging. it works!
armanbilge Jul 14, 2023
d5f84d1
Fix send method
danghieutrung Jul 15, 2023
3823641
Add tests
danghieutrung Jul 15, 2023
35ef412
Fix formatting
danghieutrung Jul 18, 2023
4918133
Fix server address
danghieutrung Jul 18, 2023
9d521e3
Generalize from hardcoded Seexample Sec-WebSocket-Key string
danghieutrung Jul 19, 2023
b7491d9
Fix import order
danghieutrung Jul 19, 2023
612fbe9
Fix import order
danghieutrung Jul 20, 2023
56fb793
Failed connection to wss://ws.postman-echo.com/raw
danghieutrung Jul 23, 2023
7098848
Update WSClient interface
danghieutrung Jul 28, 2023
81f3584
Unable to receive binary frame
danghieutrung Jul 28, 2023
e599d10
Add a test for sending and receiving a binary frame
danghieutrung Jul 31, 2023
d5974d6
Fix formatting
danghieutrung Jul 31, 2023
3e7f226
Add handling for Close Frame
danghieutrung Aug 9, 2023
4663af0
Add test for receving Close Frame
danghieutrung Aug 9, 2023
f0fd00b
Update ember-client/shared/src/main/scala/org/http4s/ember/client/int…
danghieutrung Aug 10, 2023
e8ecfb9
Update ember-client/shared/src/main/scala/org/http4s/ember/client/int…
danghieutrung Aug 10, 2023
a64b724
Fix formatting
danghieutrung Aug 10, 2023
0fb6ad0
Workaround FS2 bug
armanbilge Aug 12, 2023
5c04627
Fix handling for closeFrame
danghieutrung Aug 15, 2023
c088a3e
Add test for receiving closeFrame in high-level connection
danghieutrung Aug 15, 2023
5c12d7a
Modify auto disconnection test
danghieutrung Aug 18, 2023
d468835
Replace queue with channel
danghieutrung Aug 22, 2023
83aad30
Hanging while trying to receive
danghieutrung Aug 22, 2023
3770c53
Automatically send the close frame
danghieutrung Aug 22, 2023
96bde01
Add a test for automatically sending the close frame
danghieutrung Aug 22, 2023
a79466f
Fix formatting
danghieutrung Aug 22, 2023
ea134a6
Merge branch 'series/0.23' into ember-wsclient
armanbilge Aug 25, 2023
54b6f5c
Publish this branch
armanbilge Aug 25, 2023
d65f0f2
Merge branch 'ember-wsclient'
armanbilge Aug 25, 2023
adb63a0
Add buildWebSocket
danghieutrung Aug 25, 2023
f5d7bf9
Add import
danghieutrung Aug 25, 2023
658e0e8
Fix formatting
danghieutrung Aug 25, 2023
3d739a9
Merge pull request #7196 from danghieutrung/ember-wsclient
armanbilge Aug 25, 2023
5e7c0d4
Automatically add WSRequest headers
danghieutrung Aug 26, 2023
ff06d36
Fix formatting
danghieutrung Aug 26, 2023
5f9ee73
Delete println
danghieutrung Aug 26, 2023
b4db366
Deduplicate code
danghieutrung Aug 26, 2023
fb5d8c2
Fix SecureRandom
danghieutrung Aug 27, 2023
738b23f
Merge pull request #7250 from danghieutrung/ember-wsclient-misc
armanbilge Aug 27, 2023
06efc8d
Change WSRequest uri prefix to https
danghieutrung Aug 27, 2023
aa59802
Fix WSRequest Scheme
danghieutrung Aug 28, 2023
a3316c2
Simplify code
danghieutrung Aug 28, 2023
510f500
Change URI scheme to ws
danghieutrung Aug 28, 2023
eb5dd80
Merge pull request #7252 from danghieutrung/ember-wsclient-fix-wsRequ…
armanbilge Aug 28, 2023
800f18c
Merge remote-tracking branch 'upstream/series/0.23' into ember-wsclient
danghieutrung Sep 23, 2023
ef42682
Use closeWithElement
danghieutrung Sep 23, 2023
705d217
Replace Java client with Ember client
danghieutrung Sep 23, 2023
4c6c828
Rename test
danghieutrung Sep 23, 2023
1ca5c6b
Use closeWithElement for channel
danghieutrung Sep 23, 2023
97533ca
Fix closeWithElement bugs
danghieutrung Sep 28, 2023
2b88231
Rename file
danghieutrung Sep 28, 2023
1861b3b
Move test file from jvm to shared
danghieutrung Sep 28, 2023
8f195e9
Remove println
danghieutrung Sep 28, 2023
fa5bc9b
Workaround fs2 bug
danghieutrung Sep 29, 2023
dfdf846
Merge branch 'series/0.23' into ember-wsclient
armanbilge Oct 7, 2023
5ac35a2
Fix deprecated API bug
danghieutrung Oct 11, 2023
faf6524
Fix formatting
danghieutrung Oct 11, 2023
4baa57f
Fix pattern matching
danghieutrung Oct 12, 2023
74bc2de
Fix pattern matching
danghieutrung Oct 13, 2023
daa418f
Fix pattern matching
danghieutrung Oct 14, 2023
742458d
Force HTTP/1 on WebSocket Upgrade Request
danghieutrung Nov 14, 2023
07a45de
Add test for forcing HTTP/1
danghieutrung Nov 18, 2023
f87a5a0
Delete comments
danghieutrung Nov 19, 2023
ae4fc61
Merge branch 'series/0.23' into ember-wsclient
armanbilge Nov 26, 2023
37eedd2
Fix warning
armanbilge Nov 26, 2023
63acc3c
Remove `.only`
armanbilge Nov 26, 2023
a1fcba6
Organize imports
armanbilge Nov 26, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ jobs:
publish:
name: Publish Artifacts
needs: [build]
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.23')
if: github.event_name != 'pull_request' && (startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/series/0.23' || github.ref == 'refs/heads/ember-wsclient')
strategy:
matrix:
os: [ubuntu-latest]
Expand Down
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ ThisBuild / developers += tlGitHubDev("rossabaker", "Ross A. Baker")
ThisBuild / tlCiReleaseBranches := Seq("series/0.23")
ThisBuild / tlSitePublishBranch := Some("series/0.23")

ThisBuild / tlCiReleaseBranches += "ember-wsclient"

ThisBuild / scalafixAll / skip := tlIsScala3.value
ThisBuild / ScalafixConfig / skip := tlIsScala3.value
ThisBuild / Test / scalafixConfig := Some(file(".scalafix.test.conf"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import org.http4s.Response
import org.http4s.client._
import org.http4s.client.middleware.Retry
import org.http4s.client.middleware.RetryPolicy
import org.http4s.client.websocket.WSClient
import org.http4s.ember.client.internal.ClientHelpers
import org.http4s.ember.client.internal.EmberWSClient
import org.http4s.ember.client.internal.WebSocketKey
import org.http4s.ember.core.h2.H2Client
import org.http4s.ember.core.h2.H2Frame
import org.http4s.ember.core.h2.H2Frame.Settings.ConnectionSettings.default
Expand Down Expand Up @@ -246,7 +249,7 @@ final class EmberClientBuilder[F[_]: Async: Network] private (
)
.whenA(timeout.isFinite && timeout >= idleConnectionTime)

def build: Resource[F, Client[F]] =
private def buildHelper(ws: Boolean = false): Resource[F, Client[F]] =
for {
_ <- Resource.eval(verifyTimeoutRelations)
sg <- Resource.pure(sgOpt.getOrElse(Network[F]))
Expand Down Expand Up @@ -334,7 +337,14 @@ final class EmberClientBuilder[F[_]: Async: Network] private (
case _ => Applicative[F].unit
}
}
} yield responseResource._1
_ <- Resource.eval(managed.canBeReused.set(Reusable.DontReuse))
} yield
if (ws)
responseResource._1.withAttribute(
WebSocketKey.webSocketConnection[F],
managed.value.keySocket.socket,
)
else responseResource._1

def unixSocketClient(
request: Request[F],
Expand Down Expand Up @@ -395,6 +405,14 @@ final class EmberClientBuilder[F[_]: Async: Network] private (
new EmberClient(h2Client, pool)
}
}

def build: Resource[F, Client[F]] = buildHelper()

def buildWebSocket: Resource[F, (Client[F], WSClient[F])] =
for {
httpClient <- buildHelper(ws = true)
wsClient <- Resource.eval(EmberWSClient[F](httpClient))
} yield (httpClient, wsClient)
}

object EmberClientBuilder extends EmberClientBuilderCompanionPlatform {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright 2019 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.http4s.ember.client.internal

import cats.MonadThrow
import cats.effect.Async
import cats.effect.implicits._
import cats.effect.kernel.Resource
import cats.effect.std.Queue
import cats.effect.std.SecureRandom
import cats.syntax.all._
import fs2.concurrent.Channel
import org.http4s.Request
import org.http4s._
import org.http4s.client.Client
import org.http4s.client.websocket.WSClient
import org.http4s.client.websocket.WSConnection
import org.http4s.client.websocket.WSFrame
import org.http4s.ember.client.internal.WebSocketHelpers._
import org.http4s.ember.core.WebSocketHelpers._
import org.http4s.ember.core.h2.H2Keys.WebSocketUpgradeIdentifier
import org.http4s.headers.`Sec-WebSocket-Key`
import org.http4s.websocket.WebSocketFrame
import scodec.bits.ByteVector

import java.util.Base64

private[client] object EmberWSClient {
def apply[F[_]](
emberClient: Client[F]
)(implicit F: Async[F]): F[WSClient[F]] =
SecureRandom.javaSecuritySecureRandom[F].map { random =>
WSClient[F](respondToPings = false) { wsRequest =>
for {
randomByteArray <- Resource.eval(random.nextBytes(16))

uriScheme = wsRequest.uri.scheme.map(scheme =>
scheme.value match {
case "wss" => Uri.Scheme.https
case "ws" => Uri.Scheme.http
case _ => scheme
}
)

httpWSRequest = Request[F]()
.withUri(wsRequest.uri.copy(uriScheme))
.withHeaders(
Headers(
upgradeWebSocket,
connectionUpgrade,
supportedWebSocketVersionHeader,
new `Sec-WebSocket-Key`(ByteVector(Base64.getEncoder().encode(randomByteArray))),
)
)
.withMethod(Method.GET)
.withAttribute(WebSocketUpgradeIdentifier, ())

socketOption <- getSocket(emberClient, httpWSRequest)
socket <- socketOption.liftTo[F](new RuntimeException("Not an Ember client")).toResource

closeFrameDeffered <- F.deferred[WebSocketFrame.Close].toResource

clientReceiveQueue <- Queue.bounded[F, WebSocketFrame](100).toResource
clientSendChannel <- Channel.bounded[F, WebSocketFrame](100).toResource

_ <- socket.reads
.through(decodeFrames(true))
.foreach {
case f @ WebSocketFrame.Close(_) =>
closeFrameDeffered.complete(f).ifM(clientReceiveQueue.offer(f), F.unit)
case f =>
closeFrameDeffered.tryGet.flatMap { x =>
if (x.isDefined) F.unit else clientReceiveQueue.offer(f)
}
}
.compile
.drain
.background

sendingFinished <- clientSendChannel.stream
.foreach(f => frameToBytes(f, true).traverse_(c => socket.write(c)))
.compile
.drain
.background

_ <- Resource.onFinalize {
MonadThrow[F]
.fromEither(WebSocketFrame.Close(1000, "Connection automatically closed"))
.flatMap(clientSendChannel.closeWithElement(_)) *> sendingFinished.void
}
} yield new WSConnection[F] {
def receive: F[Option[WSFrame]] = clientReceiveQueue.take.flatMap {
case f @ WebSocketFrame.Close(_) =>
closeChannelWithCloseFrame(clientSendChannel).as(toWSFrame(f).some)
case f =>
toWSFrame(f).some.pure[F]
}
def send(wsf: WSFrame): F[Unit] =
toWebSocketFrame(wsf).flatMap {
case WebSocketFrame.Close(_) =>
closeChannelWithCloseFrame(clientSendChannel)
case f =>
clientSendChannel.send(f).void
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue with void here. Can we add a test to cover that case?

}
def sendMany[G[_], A <: WSFrame](wsfs: G[A])(implicit
evidence$1: cats.Foldable[G]
): F[Unit] = wsfs.traverse_(send(_))
def subprotocol: Option[String] = ???
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright 2019 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.http4s.ember.client.internal

import cats.Applicative
import cats.MonadThrow
import cats.data.EitherT
import cats.data.NonEmptyList
import cats.effect.Concurrent
import cats.effect.MonadCancel
import cats.effect.Resource
import cats.syntax.all._
import fs2.concurrent.Channel
import fs2.io.net.Socket
import org.http4s.Request
import org.http4s.Status
import org.http4s._
import org.http4s.client.Client
import org.http4s.client.websocket.WSFrame
import org.http4s.crypto.Hash
import org.http4s.crypto.HashAlgorithm
import org.http4s.headers._
import org.http4s.websocket.Rfc6455
import org.http4s.websocket.WebSocketFrame
import org.typelevel.ci._
import scodec.bits.ByteVector

private[internal] object WebSocketHelpers {

val supportedWebSocketVersion = 13L

val supportedWebSocketVersionHeader: `Sec-WebSocket-Version` =
`Sec-WebSocket-Version`.unsafeFromLong(
supportedWebSocketVersion
)
val upgradeCi: CIString = ci"upgrade"
val webSocketProtocol: Protocol = Protocol(ci"websocket", None)
val connectionUpgrade: Connection = Connection(NonEmptyList.of(upgradeCi))
val upgradeWebSocket: Upgrade = Upgrade(webSocketProtocol)

def closeChannelWithCloseFrame[F[_]: MonadThrow](
clientSendChannel: Channel[F, WebSocketFrame]
): F[Unit] =
for {
closeFrame <-
MonadThrow[F]
.fromEither(WebSocketFrame.Close(1000, "Connection automatically closed"))

_ <- EitherT(clientSendChannel.closeWithElement(closeFrame))
.getOrRaise(new RuntimeException("Connection already closed"))
} yield ()

def getSocket[F[_]](client: Client[F], request: Request[F])(implicit
F: MonadCancel[F, Throwable]
): Resource[F, Option[Socket[F]]] = {
val webSocketKey = WebSocketKey.webSocketConnection[F]
client
.run(request)
.evalMap { res =>
for {
secWebSocketKeyString <- request.headers
.get[`Sec-WebSocket-Key`]
.liftTo[F](new RuntimeException("Sec-WebSocket-Key header not found"))
.map(_.hashString)
isValid <- validateServerHandshake(res, secWebSocketKeyString)
} yield isValid.toOption *> res.attributes.lookup(webSocketKey)
}
}

def toWebSocketFrame[F[_]: Concurrent](wsFrame: WSFrame): F[WebSocketFrame] =
wsFrame match {
case WSFrame.Close(code, reason) =>
MonadThrow[F].fromEither(WebSocketFrame.Close(code, reason))
case WSFrame.Ping(data) => Applicative[F].pure(WebSocketFrame.Ping(data))
case WSFrame.Pong(data) => Applicative[F].pure(WebSocketFrame.Pong(data))
case WSFrame.Text(data, last) => Applicative[F].pure(WebSocketFrame.Text(data, last))
case WSFrame.Binary(data, last) => Applicative[F].pure(WebSocketFrame.Binary(data, last))
}

def toWSFrame(wsf: WebSocketFrame): WSFrame =
(wsf: @unchecked) match {
case c: WebSocketFrame.Close => WSFrame.Close(c.closeCode, c.reason)
case WebSocketFrame.Ping(data) => WSFrame.Ping(data)
case WebSocketFrame.Pong(data) => WSFrame.Pong(data)
case WebSocketFrame.Text(data, last) => WSFrame.Text(data, last)
case WebSocketFrame.Binary(data, last) => WSFrame.Binary(data, last)
}

/** Validate the opening handshake response from the server
* https://datatracker.ietf.org/doc/html/rfc6455#page-6
*/
def validateServerHandshake[F[_]](
response: Response[F],
secWebSocketKey: String,
)(implicit F: MonadThrow[F]): F[Either[ServerHandshakeError, Unit]] =
for {
secWebSocketAccept <- serverHandshake(response).pure[F]
correctSecWebSocketAccept <- clientHandshake(secWebSocketKey)
validated = secWebSocketAccept.flatMap(s =>
if (s == correctSecWebSocketAccept) Either.unit else Left(InvalidSecWebSocketAccept)
)
} yield validated

private[this] val magic = ByteVector.view(Rfc6455.handshakeMagicBytes)

def clientHandshake[F[_]](
value: String
)(implicit F: MonadThrow[F]): F[ByteVector] = for {
value <- ByteVector.encodeAscii(value).liftTo[F]
digest <- Hash[F].digest(HashAlgorithm.SHA1, value ++ magic)
} yield digest

private def serverHandshake[F[_]](res: Response[F]): Either[ServerHandshakeError, ByteVector] = {
val status = res.status match {
case Status.SwitchingProtocols => Either.unit
case _ => Left(InvalidStatus)
}

val connection = res.headers.get[Connection] match {
case Some(header) if header.hasUpgrade => Either.unit
case _ => Left(UpgradeRequired)
}

val upgrade = res.headers.get[Upgrade] match {
case Some(header) if header.values.contains_(webSocketProtocol) => Either.unit
case _ => Left(UpgradeRequired)
}

val secWebSocketAcceptKey = res.headers.get[`Sec-WebSocket-Accept`] match {
case Some(header) => Right(header.hashedKey)
case None => Left(SecWebSocketAcceptNotFound)
}

(status, connection, upgrade, secWebSocketAcceptKey).mapN {
case (_, _, _, secWebSocketAcceptKey) =>
secWebSocketAcceptKey
}
}

sealed abstract class ServerHandshakeError(val status: Status, val message: String)
case object InvalidStatus
extends ServerHandshakeError(
Status.BadRequest,
"Not found HTTP Status 101 Switching Protocol.",
)
case object UpgradeRequired
extends ServerHandshakeError(
Status.UpgradeRequired,
"Upgrade required for WebSocket communication.",
)
case object SecWebSocketAcceptNotFound
extends ServerHandshakeError(Status.BadRequest, "Sec-WebSocket-Accept header not present.")
case object InvalidSecWebSocketAccept
extends ServerHandshakeError(
Status.BadRequest,
"Sec-WebSocket-Accept does not correspond to the Sec-WebSocket-Key",
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2019 http4s.org
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.http4s.ember.client.internal

import cats.effect.SyncIO
import fs2.io.net.Socket
import org.typelevel.vault._

private[client] object WebSocketKey {

private[this] val wsConnectionInternal: Key[Any] = Key.newKey[SyncIO, Any].unsafeRunSync()
def webSocketConnection[F[_]]: Key[Socket[F]] =
wsConnectionInternal.asInstanceOf[Key[Socket[F]]]
}
Loading