Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket support for handhake headers (such as:Sec-WebSocket-Protoco… #1607

Merged
merged 10 commits into from
Dec 22, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -25,51 +25,55 @@ private[blaze] trait WebSocketSupport[F[_]] extends Http1ServerStage[F] {
val ws = resp.attributes.get(org.http4s.server.websocket.websocketKey[F])
logger.debug(s"Websocket key: $ws\nRequest headers: " + req.headers)

if (ws.isDefined) {
val hdrs = req.headers.map(h => (h.name.toString, h.value))
if (WebsocketHandshake.isWebSocketRequest(hdrs)) {
WebsocketHandshake.serverHandshake(hdrs) match {
case Left((code, msg)) =>
logger.info(s"Invalid handshake $code, $msg")
async.unsafeRunAsync {
Response[F](Status.BadRequest)
.withBody(msg)
.map(
_.replaceAllHeaders(
Connection("close".ci),
Header.Raw(headers.`Sec-WebSocket-Version`.name, "13")
))
} {
case Right(resp) =>
IO(super.renderResponse(req, resp, cleanup))
case Left(_) =>
IO.unit
}
ws match {
case None => super.renderResponse(req, resp, cleanup)
case Some(wsContext) =>
val hdrs = req.headers.map(h => (h.name.toString, h.value))
if (WebsocketHandshake.isWebSocketRequest(hdrs)) {
WebsocketHandshake.serverHandshake(hdrs) match {
case Left((code, msg)) =>
logger.info(s"Invalid handshake $code, $msg")
async.unsafeRunAsync {
wsContext.failureResponse
.map(
_.replaceAllHeaders(
Connection("close".ci),
Header.Raw(headers.`Sec-WebSocket-Version`.name, "13")
))
} {
case Right(resp) =>
IO(super.renderResponse(req, resp, cleanup))
case Left(_) =>
IO.unit
}

case Right(hdrs) => // Successful handshake
val sb = new StringBuilder
sb.append("HTTP/1.1 101 Switching Protocols\r\n")
hdrs.foreach {
case (k, v) => sb.append(k).append(": ").append(v).append('\r').append('\n')
}
sb.append('\r').append('\n')
case Right(hdrs) => // Successful handshake
val sb = new StringBuilder
sb.append("HTTP/1.1 101 Switching Protocols\r\n")
hdrs.foreach {
case (k, v) => sb.append(k).append(": ").append(v).append('\r').append('\n')
}

// write the accept headers and reform the pipeline
channelWrite(ByteBuffer.wrap(sb.result().getBytes(ISO_8859_1))).onComplete {
case Success(_) =>
logger.debug("Switching pipeline segments for websocket")
wsContext.headers.foreach(hdr =>
sb.append(hdr.name).append(": ").append(hdr.value).append('\r').append('\n'))

val segment = LeafBuilder(new Http4sWSStage[F](ws.get))
.prepend(new WSFrameAggregator)
.prepend(new WebSocketDecoder(false))
sb.append('\r').append('\n')

this.replaceInline(segment)
// write the accept headers and reform the pipeline
channelWrite(ByteBuffer.wrap(sb.result().getBytes(ISO_8859_1))).onComplete {
case Success(_) =>
logger.debug("Switching pipeline segments for websocket")

case Failure(t) => fatalError(t, "Error writing Websocket upgrade response")
}(executionContext)
}
val segment = LeafBuilder(new Http4sWSStage[F](wsContext.webSocket))
.prepend(new WSFrameAggregator)
.prepend(new WebSocketDecoder(false))

} else super.renderResponse(req, resp, cleanup)
} else super.renderResponse(req, resp, cleanup)
this.replaceInline(segment)

case Failure(t) => fatalError(t, "Error writing Websocket upgrade response")
}(executionContext)
}
} else super.renderResponse(req, resp, cleanup)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.http4s.websocket

import org.http4s.{Headers, Response}

final case class WebSocketContext[F[_]](
webSocket: Websocket[F],
headers: Headers,
failureResponse: F[Response[F]])
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class BlazeWebSocketExampleApp[F[_]](implicit F: Effect[F]) extends StreamApp[F]
case f => F.delay(println(s"Unknown type: $f"))
}
}
WS(toClient, fromClient)
WebSocketBuilder[F].build(toClient, fromClient)

case GET -> Root / "wsecho" =>
val queue = async.unboundedQueue[F, WebSocketFrame]
Expand All @@ -41,7 +41,7 @@ class BlazeWebSocketExampleApp[F[_]](implicit F: Effect[F]) extends StreamApp[F]
queue.flatMap { q =>
val d = q.dequeue.through(echoReply)
val e = q.enqueue
WS(d, e)
WebSocketBuilder[F].build(d, e)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package org.http4s.server.websocket

import cats._
import cats.implicits._
import fs2._
import org.http4s.websocket.WebsocketBits.WebSocketFrame
import org.http4s.websocket.{WebSocketContext, Websocket}
import org.http4s.{AttributeEntry, Headers, Response, Status}

/**
* Build a response which will accept an HTTP websocket upgrade request and initiate a websocket connection using the
* supplied exchange to process and respond to websocket messages.
* @param send The send side of the Exchange represents the outgoing stream of messages that should be sent to the client
* @param receive The receive side of the Exchange is a sink to which the framework will push the incoming websocket messages
* Once both streams have terminated, the server will initiate a close of the websocket connection.
* As defined in the websocket specification, this means the server
* will send a CloseFrame to the client and wait for a CloseFrame in response before closing the
* connection, this ensures that no messages are lost in flight. The server will shutdown the
* connection when it receives the `CloseFrame` message back from the client. The connection will also
* be closed if the client does not respond with a `CloseFrame` after some reasonable amount of
* time.
* Another way of closing the connection is by emitting a `CloseFrame` in the stream of messages
* heading to the client. This method allows one to attach a message to the `CloseFrame` as defined
* by the websocket protocol.
* Unfortunately the current implementation does not quite respect the description above, it violates
* the websocket protocol by terminating the connection immediately upon reception
* of a `CloseFrame`. This bug will be addressed soon in an upcoming release and this message will be
* removed.
* Currently, there is no way for the server to be notified when the connection is closed, neither in
* the case of a normal disconnection such as a Close handshake or due to a connection error. There
* are plans to address this limitation in the future.
* @param headers Handshake response headers, such as such as:Sec-WebSocket-Protocol.
* @param onNonWebSocketRequest The status code to return to a client making a non-websocket HTTP request to this route.
* default: NotImplemented
* @param onHandshakeFailure The status code to return when failing to handle a websocket HTTP request to this route.
* default: BadRequest
*/
case class WebSocketBuilder[F[_]](
send: Stream[F, WebSocketFrame],
receive: Sink[F, WebSocketFrame],
headers: Headers,
onNonWebSocketRequest: F[Response[F]],
onHandshakeFailure: F[Response[F]])
object WebSocketBuilder {

class Builder[F[_]: Monad] {
def build(
send: Stream[F, WebSocketFrame],
receive: Sink[F, WebSocketFrame],
headers: Headers = Headers.empty,
onNonWebSocketRequest: F[Response[F]] =
Response[F](Status.NotImplemented).withBody("This is a WebSocket route."),
onHandshakeFailure: F[Response[F]] =
Response[F](Status.BadRequest).withBody("WebSocket handshake failed.")): F[Response[F]] =
WebSocketBuilder(send, receive, headers, onNonWebSocketRequest, onHandshakeFailure).onNonWebSocketRequest
.map(
_.withAttribute(
AttributeEntry(
websocketKey[F],
WebSocketContext(Websocket(send, receive), headers, onHandshakeFailure))))
}
def apply[F[_]: Monad]: Builder[F] = new Builder[F]
}
19 changes: 13 additions & 6 deletions server/src/main/scala/org/http4s/server/websocket/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ package org.http4s
package server

import cats._
import cats.implicits._
import fs2._
import org.http4s.websocket.Websocket
import org.http4s.websocket.WebSocketContext
import org.http4s.websocket.WebsocketBits.WebSocketFrame

package object websocket {
private[this] object Keys {
val WebSocket: AttributeKey[Any] = AttributeKey[Any]
}
def websocketKey[F[_]]: AttributeKey[Websocket[F]] =
Keys.WebSocket.asInstanceOf[AttributeKey[Websocket[F]]]

def websocketKey[F[_]]: AttributeKey[WebSocketContext[F]] =
Keys.WebSocket.asInstanceOf[AttributeKey[WebSocketContext[F]]]

/**
* Build a response which will accept an HTTP websocket upgrade request and initiate a websocket connection using the
Expand All @@ -38,12 +38,19 @@ package object websocket {
* are plans to address this limitation in the future.
* @param status The status code to return to a client making a non-websocket HTTP request to this route
*/
@deprecated("Use WebSocketBuilder", "0.18.0-M7")
def WS[F[_]](
send: Stream[F, WebSocketFrame],
receive: Sink[F, WebSocketFrame],
status: F[Response[F]])(implicit F: Functor[F]): F[Response[F]] =
status.map(_.withAttribute(AttributeEntry(websocketKey[F], Websocket(send, receive))))
status: F[Response[F]])(implicit F: Monad[F]): F[Response[F]] =
WebSocketBuilder[F].build(
send,
receive,
Headers.empty,
status,
Response[F](Status.BadRequest).withBody("WebSocket handshake failed."))

@deprecated("Use WebSocketBuilder", "0.18.0-M7")
def WS[F[_]](send: Stream[F, WebSocketFrame], receive: Sink[F, WebSocketFrame])(
implicit F: Monad[F],
W: EntityEncoder[F, String]): F[Response[F]] =
Expand Down