Skip to content

Commit

Permalink
Fixes lagom#2135, stop sending duplicate Sec-WebSocket-Protocol heade…
Browse files Browse the repository at this point in the history
…r in response when a subprotocol is defined
  • Loading branch information
borgespires committed Jan 27, 2020
1 parent bf71f88 commit 9e4b21c
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 5 deletions.
Expand Up @@ -113,7 +113,7 @@ class AkkaHttpServiceGateway(
Flow.fromSinkAndSource(Sink.fromSubscriber(subscriber), Source.fromPublisher(publisher)),
chosenSubprotocol
)
webSocketResponse.withHeaders(webSocketResponse.headers ++ filterHeaders(response.headers))
webSocketResponse.withHeaders(concatHeaders(response.headers, webSocketResponse.headers))

case InvalidUpgradeResponse(response, cause) =>
log.debug("WebSocket upgrade response was invalid: {}", cause)
Expand Down Expand Up @@ -191,6 +191,21 @@ class AkkaHttpServiceGateway(
headers.filterNot(header => HeadersToFilter(header.lowercaseName()))
}

private def concatHeaders(
serviceResponseHeaders: immutable.Seq[HttpHeader],
webSocketClientResponseHeaders: immutable.Seq[HttpHeader]
) = {
def addHeader(res: immutable.Seq[HttpHeader], header: HttpHeader) = {
if (res.exists(other => other.lowercaseName().equals(header.lowercaseName()))) {
res
} else {
res :+ header
}
}

filterHeaders(serviceResponseHeaders).foldLeft(webSocketClientResponseHeaders)(addHeader)
}

private val bindingFuture = Http().bindAndHandle(handler, config.host, config.port)

coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "unbind-akka-http-service-gateway") { () =>
Expand Down
Expand Up @@ -14,11 +14,10 @@ import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.model.ws.TextMessage
import akka.http.scaladsl.model.ws.UpgradeToWebSocket
import akka.http.scaladsl.model.ws.WebSocketRequest
import akka.http.scaladsl.model.HttpEntity
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model.HttpResponse
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
Expand Down Expand Up @@ -54,7 +53,17 @@ class AkkaHttpServiceGatewaySpec extends WordSpec with Matchers with BeforeAndAf
case req if req.uri.path.toString() == "/echo-headers" =>
HttpResponse(entity = HttpEntity(req.headers.map(h => s"${h.name()}: ${h.value}").mkString("\n")))
case stream if stream.uri.path.toString() == "/stream" =>
stream.header[UpgradeToWebSocket].get.handleMessages(Flow[Message])
stream
.header[UpgradeToWebSocket]
.get
.handleMessages(
Flow[Message],
stream.headers
.find(_.lowercaseName() == "sec-websocket-protocol")
.map(_.value)
.map(_.split(","))
.map(_.head)
)
},
"localhost",
port = 0
Expand Down Expand Up @@ -118,6 +127,30 @@ class AkkaHttpServiceGatewaySpec extends WordSpec with Matchers with BeforeAndAf
(result should contain).inOrderOnly("Hello", "world")
}

"serve websocket requests with the correct response" in {
val flow = http.webSocketClientFlow(
WebSocketRequest(
s"$gatewayWsUri/stream",
collection.immutable.Seq.empty[HttpHeader],
collection.immutable.Seq("echo")
)
)
val result = Await.result(
Source
.single(TextMessage("hello world!"))
.viaMat(flow)(Keep.right)
.to(Sink.ignore)
.run(),
10.seconds
)

result.response.status should equal(StatusCodes.SwitchingProtocols)
result.response.headers.count(_.lowercaseName() == "sec-websocket-protocol") should equal(1)
result.response.headers.find(_.lowercaseName() == "sec-websocket-protocol").map(_.value()).get should equal(
"echo"
)
}

"serve not found when no ACL matches" in {
val response = Await.result(http.singleRequest(HttpRequest(uri = s"$gatewayUri/notfound")), 10.seconds)
response.status.intValue() should ===(404)
Expand Down

0 comments on commit 9e4b21c

Please sign in to comment.