From 6164fd87741182833057a7d0311c08d9e73bcbb7 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Wed, 5 Jun 2019 16:38:19 +0200 Subject: [PATCH] =ht2 consolidate http2 switches and instantiation code in Http2 --- .../http/impl/engine/http2/AlpnSwitch.scala | 32 +-- .../impl/engine/http2/Http2Blueprint.scala | 13 + .../engine/http2/PriorKnowledgeSwitch.scala | 141 ---------- .../main/scala/akka/http/scaladsl/Http2.scala | 250 +++++++----------- 4 files changed, 112 insertions(+), 324 deletions(-) delete mode 100644 akka-http2-support/src/main/scala/akka/http/impl/engine/http2/PriorKnowledgeSwitch.scala diff --git a/akka-http2-support/src/main/scala/akka/http/impl/engine/http2/AlpnSwitch.scala b/akka-http2-support/src/main/scala/akka/http/impl/engine/http2/AlpnSwitch.scala index 560d3131c8f..43e69faaf66 100644 --- a/akka-http2-support/src/main/scala/akka/http/impl/engine/http2/AlpnSwitch.scala +++ b/akka-http2-support/src/main/scala/akka/http/impl/engine/http2/AlpnSwitch.scala @@ -18,25 +18,22 @@ import akka.stream._ /** INTERNAL API */ @InternalApi private[http] object AlpnSwitch { - type HttpServerBidiFlow = BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] + type HttpServerFlow = Flow[SslTlsInbound, SslTlsOutbound, NotUsed] def apply( - chosenProtocolAccessor: () => String, - http1Stack: HttpServerBidiFlow, - http2Stack: HttpServerBidiFlow): HttpServerBidiFlow = - BidiFlow.fromGraph( - new GraphStage[BidiShape[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest]] { + chosenProtocolAccessor: SessionBytes => String, + http1Stack: HttpServerFlow, + http2Stack: HttpServerFlow): HttpServerFlow = + Flow.fromGraph( + new GraphStage[FlowShape[SslTlsInbound, SslTlsOutbound]] { // --- outer ports --- val netIn = Inlet[SslTlsInbound]("AlpnSwitch.netIn") val netOut = Outlet[SslTlsOutbound]("AlpnSwitch.netOut") - - val requestOut = Outlet[HttpRequest]("AlpnSwitch.requestOut") - val responseIn = Inlet[HttpResponse]("AlpnSwitch.responseIn") // --- end of outer ports --- - val shape: BidiShape[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest] = - BidiShape(responseIn, netOut, netIn, requestOut) + val shape: FlowShape[SslTlsInbound, SslTlsOutbound] = + FlowShape(netIn, netOut) def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { logic => @@ -44,9 +41,6 @@ private[http] object AlpnSwitch { // --- inner ports, bound to actual server in install call --- val serverDataIn = new SubSinkInlet[SslTlsOutbound]("ServerImpl.netIn") val serverDataOut = new SubSourceOutlet[SslTlsInbound]("ServerImpl.netOut") - - val serverRequestIn = new SubSinkInlet[HttpRequest]("ServerImpl.serverRequestIn") - val serverResponseOut = new SubSourceOutlet[HttpResponse]("ServerImpl.serverResponseOut") // --- end of inner ports --- override def preStart(): Unit = pull(netIn) @@ -55,7 +49,7 @@ private[http] object AlpnSwitch { def onPush(): Unit = grab(netIn) match { case first @ SessionBytes(session, bytes) => - val chosen = chosenProtocolAccessor() + val chosen = chosenProtocolAccessor(first) chosen match { case "h2" => install(http2Stack.addAttributes(HttpAttributes.tlsSessionInfo(session)), first) case _ => install(http1Stack, first) @@ -68,23 +62,17 @@ private[http] object AlpnSwitch { private val failPush = new InHandler { def onPush(): Unit = throw new IllegalStateException("Wasn't pulled yet") } setHandler(netOut, ignorePull) - setHandler(requestOut, ignorePull) - setHandler(responseIn, failPush) - def install(serverImplementation: HttpServerBidiFlow, firstElement: SslTlsInbound): Unit = { + def install(serverImplementation: HttpServerFlow, firstElement: SslTlsInbound): Unit = { val networkSide = Flow.fromSinkAndSource(serverDataIn.sink, serverDataOut.source) - val userSide = Flow.fromSinkAndSource(serverRequestIn.sink, serverResponseOut.source) connect(netIn, serverDataOut, Some(firstElement)) - connect(responseIn, serverResponseOut, None) connect(serverDataIn, netOut) - connect(serverRequestIn, requestOut) serverImplementation .addAttributes(inheritedAttributes) // propagate attributes to "real" server (such as HttpAttributes) .join(networkSide) - .join(userSide) .run()(interpreter.subFusingMaterializer) } diff --git a/akka-http2-support/src/main/scala/akka/http/impl/engine/http2/Http2Blueprint.scala b/akka-http2-support/src/main/scala/akka/http/impl/engine/http2/Http2Blueprint.scala index 7d172ebdc5f..529880022a8 100644 --- a/akka-http2-support/src/main/scala/akka/http/impl/engine/http2/Http2Blueprint.scala +++ b/akka-http2-support/src/main/scala/akka/http/impl/engine/http2/Http2Blueprint.scala @@ -11,6 +11,7 @@ import akka.http.impl.engine.http2.framing.{ Http2FrameParsing, Http2FrameRender import akka.http.impl.engine.http2.hpack.{ HeaderCompression, HeaderDecompression } import akka.http.impl.engine.parsing.HttpHeaderParser import akka.http.impl.util.StreamUtils +import akka.http.impl.util.LogByteStringTools.logTLSBidiBySetting import akka.http.scaladsl.model._ import akka.http.scaladsl.model.http2.Http2StreamIdHeader import akka.http.scaladsl.settings.{ Http2ServerSettings, ParserSettings, ServerSettings } @@ -20,6 +21,8 @@ import akka.util.ByteString import scala.concurrent.{ ExecutionContext, Future } import FrameEvent._ +import akka.http.scaladsl.Http2 +import akka.stream.TLSProtocol._ /** * Represents one direction of an Http2 substream. @@ -44,6 +47,11 @@ private[http2] final case class ChunkedHttp2SubStream( @InternalApi private[http] object Http2Blueprint { + def serverStackTls(settings: ServerSettings, log: LoggingAdapter): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] = + serverStack(settings, log) atop + unwrapTls atop + logTLSBidiBySetting("server-plain-text", settings.logUnencryptedNetworkBytes) + // format: OFF def serverStack(settings: ServerSettings, log: LoggingAdapter): BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, NotUsed] = httpLayer(settings, log) atop @@ -124,4 +132,9 @@ private[http] object Http2Blueprint { case ParserSettings.ErrorLoggingVerbosity.Simple => log.warning(info.summary) case ParserSettings.ErrorLoggingVerbosity.Full => log.warning(info.formatPretty) } + + private[http2] val unwrapTls: BidiFlow[ByteString, SslTlsOutbound, SslTlsInbound, ByteString, NotUsed] = + BidiFlow.fromFlows(Flow[ByteString].map(SendBytes), Flow[SslTlsInbound].collect { + case SessionBytes(_, bytes) => bytes + }) } diff --git a/akka-http2-support/src/main/scala/akka/http/impl/engine/http2/PriorKnowledgeSwitch.scala b/akka-http2-support/src/main/scala/akka/http/impl/engine/http2/PriorKnowledgeSwitch.scala deleted file mode 100644 index 2bf8a0e71d1..00000000000 --- a/akka-http2-support/src/main/scala/akka/http/impl/engine/http2/PriorKnowledgeSwitch.scala +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.http.impl.engine.http2 - -import javax.net.ssl.SSLException - -import akka.util.ByteString -import akka.NotUsed -import akka.annotation.InternalApi -import akka.http.impl.engine.server.HttpAttributes -import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } -import akka.stream.TLSProtocol.{ SessionBytes, SessionTruncated, SslTlsInbound, SslTlsOutbound } -import akka.stream.scaladsl.Flow -import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } -import akka.stream._ - -/** INTERNAL API */ -@InternalApi -private[http] object PriorKnowledgeSwitch { - type HttpServerFlow = Flow[ByteString, ByteString, NotUsed] - type HttpServerShape = FlowShape[ByteString, ByteString] - - private final val HTTP2_CONNECTION_PREFACE = ByteString("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") - - def apply( - http1Stack: HttpServerFlow, - http2Stack: HttpServerFlow): HttpServerFlow = - Flow.fromGraph( - new GraphStage[HttpServerShape] { - - // --- outer ports --- - val netIn = Inlet[ByteString]("PriorKnowledgeSwitch.netIn") - val netOut = Outlet[ByteString]("PriorKnowledgeSwitch.netOut") - // --- end of outer ports --- - - override val shape: HttpServerShape = - FlowShape(netIn, netOut) - - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - logic => - - // --- inner ports, bound to actual server in install call --- - val serverDataIn = new SubSinkInlet[ByteString]("ServerImpl.netIn") - val serverDataOut = new SubSourceOutlet[ByteString]("ServerImpl.netOut") - // --- end of inner ports --- - - override def preStart(): Unit = pull(netIn) - - setHandler(netIn, new InHandler { - private[this] var grabbed = ByteString.empty - def onPush(): Unit = { - val data = grabbed ++ grab(netIn) - if (data.length >= HTTP2_CONNECTION_PREFACE.length) { // We should know by now - if (data.startsWith(HTTP2_CONNECTION_PREFACE, 0)) - install(http2Stack, data) - else - install(http1Stack, data) - } else if (data.isEmpty || data.startsWith(HTTP2_CONNECTION_PREFACE, 0)) { // Still unknown - grabbed = data - } else { // Not a Prior Knowledge request - install(http1Stack, data) - } - } - }) - - setHandler(netOut, new OutHandler { def onPull(): Unit = () }) // Ignore pull - - def install(serverImplementation: HttpServerFlow, firstElement: ByteString): Unit = { - connect(netIn, serverDataOut, Some(firstElement)) - connect(serverDataIn, netOut) - - serverImplementation - .addAttributes(inheritedAttributes) // propagate attributes to "real" server (such as HttpAttributes) - .join(Flow.fromSinkAndSource(serverDataIn.sink, serverDataOut.source)) // Network side - .run()(interpreter.subFusingMaterializer) - } - - // helpers to connect inlets and outlets also binding completion signals of given ports - def connect[T](in: Inlet[T], out: SubSourceOutlet[T], initialElement: Option[T]): Unit = { - - val firstElementHandler = { - val propagatePull = new OutHandler { override def onPull(): Unit = pull(in) } - - initialElement match { - case Some(ele) if out.isAvailable => - out.push(ele) - propagatePull - case Some(ele) => - new OutHandler { - override def onPull(): Unit = { - out.push(ele) - out.setHandler(propagatePull) - } - } - case None => propagatePull - } - } - - out.setHandler(firstElementHandler) - - setHandler(in, new InHandler { - override def onPush(): Unit = out.push(grab(in)) - - override def onUpstreamFinish(): Unit = { - out.complete() - super.onUpstreamFinish() - } - - override def onUpstreamFailure(ex: Throwable): Unit = { - out.fail(ex) - super.onUpstreamFailure(ex) - } - }) - - if (out.isAvailable) pull(in) // to account for lost pulls during initialization - } - - def connect[T](in: SubSinkInlet[T], out: Outlet[T]): Unit = { - val handler = new InHandler { - override def onPush(): Unit = push(out, in.grab()) - } - - val outHandler = new OutHandler { - override def onPull(): Unit = in.pull() - override def onDownstreamFinish(): Unit = { - in.cancel() - super.onDownstreamFinish() - } - } - - in.setHandler(handler) - setHandler(out, outHandler) - - if (isAvailable(out)) in.pull() // to account for lost pulls during initialization - } - } - } - ) -} diff --git a/akka-http2-support/src/main/scala/akka/http/scaladsl/Http2.scala b/akka-http2-support/src/main/scala/akka/http/scaladsl/Http2.scala index d773de5b381..5e8ab3e95d6 100644 --- a/akka-http2-support/src/main/scala/akka/http/scaladsl/Http2.scala +++ b/akka-http2-support/src/main/scala/akka/http/scaladsl/Http2.scala @@ -7,15 +7,14 @@ package akka.http.scaladsl import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider } import akka.dispatch.ExecutionContexts import akka.event.LoggingAdapter -import akka.http.impl.engine.http2.{ AlpnSwitch, Http2AlpnSupport, Http2Blueprint, PriorKnowledgeSwitch } +import akka.http.impl.engine.http2.{ AlpnSwitch, Http2AlpnSupport, Http2Blueprint, Http2Protocol } import akka.http.impl.engine.server.MasterServerTerminator -import akka.http.impl.util.LogByteStringTools.logTLSBidiBySetting import akka.http.scaladsl.Http.ServerBinding import akka.http.scaladsl.UseHttp2.{ Negotiated, Never, Always } import akka.http.scaladsl.model.{ HttpRequest, HttpResponse } import akka.http.scaladsl.settings.ServerSettings -import akka.stream.TLSProtocol.{ SendBytes, SessionBytes, SslTlsInbound, SslTlsOutbound } -import akka.stream.scaladsl.{ BidiFlow, Flow, Keep, Sink, TLS, TLSPlacebo, Tcp, RunnableGraph } +import akka.stream.TLSProtocol.{ SessionBytes, SslTlsInbound, SslTlsOutbound } +import akka.stream.scaladsl.{ BidiFlow, Flow, Keep, Sink, TLS, TLSPlacebo, Tcp } import akka.stream.{ IgnoreComplete, Materializer } import akka.util.ByteString import akka.{ Done, NotUsed } @@ -35,57 +34,6 @@ final class Http2Ext(private val config: Config)(implicit val system: ActorSyste val http = Http(system) - /** - * Handle requests using HTTP/2 immediately, without any TLS or negotiation layer. - */ - private def bindAndHandleWithoutNegotiation( - handler: HttpRequest => Future[HttpResponse], - interface: String, - port: Int, - settings: ServerSettings, - parallelism: Int, - log: LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] = { - if (parallelism == 1) - log.warning("HTTP/2 `bindAndHandleAsync` was called with default parallelism = 1. This means that request handling " + - "concurrency per connection is disabled. This is likely not what you want with HTTP/2.") - - val effectivePort = if (port >= 0) port else 80 - - val serverLayer: Flow[ByteString, ByteString, Future[Done]] = - Flow[HttpRequest] - .watchTermination()(Keep.right) - // FIXME: parallelism should maybe kept in track with SETTINGS_MAX_CONCURRENT_STREAMS so that we don't need - // to buffer requests that cannot be handled in parallel - .via(Http2Blueprint.handleWithStreamIdHeader(parallelism)(handler)(system.dispatcher)) - .joinMat(Http2Blueprint.serverStack(settings, log))(Keep.left) - - createServerRunnableGraph(interface, effectivePort, settings, () => serverLayer, log).run() - } - - private def bindAndHandleConsiderPriorKnowledge( - handler: HttpRequest => Future[HttpResponse], - interface: String, - port: Int, - settings: ServerSettings, - parallelism: Int, - log: LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] = { - if (parallelism == 1) - log.warning("HTTP/2 `bindAndHandleConsiderPriorKnowledge` was called with default parallelism = 1. This means that request handling " + - "concurrency per connection is disabled. This is likely not what you want with HTTP/2.") - - val effectivePort = if (port >= 0) port else 80 - - val http2 = Http2Blueprint.handleWithStreamIdHeader(parallelism)(handler)(system.dispatcher).joinMat(Http2Blueprint.serverStack(settings, log))(Keep.left) - val http1 = Flow[HttpRequest].mapAsync(parallelism)(handler).joinMat(http.serverLayer(settings, None, log) atop TLSPlacebo())(Keep.left) - - // FIXME: parallelism should maybe kept in track with SETTINGS_MAX_CONCURRENT_STREAMS so that we don't need - // to buffer requests that cannot be handled in parallel - val fullLayer: Flow[ByteString, ByteString, Future[Done]] = - Flow[ByteString].watchTermination()(Keep.right).viaMat(PriorKnowledgeSwitch(http1, http2))(Keep.left) - - createServerRunnableGraph(interface, effectivePort, settings, () => fullLayer, log).run() - } - // TODO: split up similarly to what `Http` does into `serverLayer`, `bindAndHandle`, etc. def bindAndHandleAsync( handler: HttpRequest => Future[HttpResponse], @@ -94,125 +42,105 @@ final class Http2Ext(private val config: Config)(implicit val system: ActorSyste settings: ServerSettings = ServerSettings(system), parallelism: Int = 1, log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[ServerBinding] = { - connectionContext.http2 match { - case Never => throw new IllegalArgumentException("ConnectionContext HTTP2 support set to Never!") - case _ if connectionContext.isSecure => - bindAndHandleAsync(handler, interface, port, connectionContext.asInstanceOf[HttpsConnectionContext], settings, parallelism, log) - case Negotiated => - bindAndHandleConsiderPriorKnowledge(handler, interface, port, settings, parallelism, log) - case Always => - bindAndHandleWithoutNegotiation(handler, interface, port, settings, parallelism, log) - } - } - private def bindAndHandleAsync( - handler: HttpRequest => Future[HttpResponse], - interface: String, port: Int, - httpsContext: HttpsConnectionContext, - settings: ServerSettings, - parallelism: Int, - log: LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] = { - // automatically preserves association between request and response by setting the right headers, can use mapAsyncUnordered - - val effectivePort = if (port >= 0) port else 443 - - def http2Layer(): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] = - Http2Blueprint.serverStack(settings, log) atop - unwrapTls atop - logTLSBidiBySetting("server-plain-text", settings.logUnencryptedNetworkBytes) - - // Flow is not reusable because we need a side-channel to transport the protocol - // chosen by ALPN from the SSLEngine to the switching stage - def serverLayer(): BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, NotUsed] = { - // Mutable cell to transport the chosen protocol from the SSLEngine to - // the switch stage. - // Doesn't need to be volatile because there's a happens-before relationship (enforced by memory barriers) - // between the SSL handshake and sending out the first SessionBytes, and receiving the first SessionBytes - // and reading out the variable. - var chosenProtocol: Option[String] = None - def setChosenProtocol(protocol: String): Unit = - if (chosenProtocol.isEmpty) chosenProtocol = Some(protocol) - else throw new IllegalStateException("ChosenProtocol was set twice. Http2.serverLayer is not reusable.") - def getChosenProtocol(): String = chosenProtocol.getOrElse("h1") // default to http/1, e.g. when ALPN jar is missing - - var eng: Option[SSLEngine] = None - def createEngine(): SSLEngine = { - val engine = httpsContext.sslContext.createSSLEngine() - eng = Some(engine) - engine.setUseClientMode(false) - Http2AlpnSupport.applySessionParameters(engine, httpsContext.firstSession) - Http2AlpnSupport.enableForServer(engine, setChosenProtocol) - } - val tls = TLS(() => createEngine, _ => Success(()), IgnoreComplete) - - val removeEngineOnTerminate: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = { - implicit val ec = fm.executionContext - BidiFlow.fromFlows( - Flow[ByteString], - Flow[ByteString] - .watchTermination()((n, fd) => { - fd.onComplete(_ => eng.foreach(Http2AlpnSupport.cleanupForServer)) - n - }) - ) + val httpPlusSwitching: HttpPlusSwitching = + connectionContext.http2 match { + case Never => throw new IllegalArgumentException("ConnectionContext HTTP2 support set to Never!") + case _ if connectionContext.isSecure => httpsWithAlpn(connectionContext.asInstanceOf[HttpsConnectionContext], fm) + case Negotiated => priorKnowledge + case Always => onlyHttp2 } - AlpnSwitch(() => getChosenProtocol, http.serverLayer(settings, None, log), http2Layer()) atop - tls atop removeEngineOnTerminate - } + val effectivePort = + if (port >= 0) port + else if (connectionContext.isSecure) settings.defaultHttpsPort + else settings.defaultHttpPort - // Not reusable, see above. - def fullLayer(): Flow[ByteString, ByteString, Future[Done]] = Flow.fromGraph( - Flow[HttpRequest] - .watchTermination()(Keep.right) - // FIXME: parallelism should maybe kept in track with SETTINGS_MAX_CONCURRENT_STREAMS so that we don't need - // to buffer requests that cannot be handled in parallel - .via(Http2Blueprint.handleWithStreamIdHeader(parallelism)(handler)(system.dispatcher)) - .joinMat(serverLayer())(Keep.left)) + val http1 = Flow[HttpRequest].mapAsync(parallelism)(handler).joinMat(http.serverLayer(settings, None, log))(Keep.left) + val http2 = Http2Blueprint.handleWithStreamIdHeader(parallelism)(handler)(system.dispatcher).joinMat(Http2Blueprint.serverStackTls(settings, log))(Keep.left) + + val masterTerminator = new MasterServerTerminator(log) - createServerRunnableGraph(interface, effectivePort, settings, fullLayer _, log).run() + Tcp().bind(interface, effectivePort, settings.backlog, settings.socketOptions, halfClose = false, settings.timeouts.idleTimeout) + .mapAsyncUnordered(settings.maxConnections) { + incoming: Tcp.IncomingConnection => + try { + httpPlusSwitching(http1, http2).addAttributes(Http.prepareAttributes(settings, incoming)) + .watchTermination()(Keep.right) + .joinMat(incoming.flow)(Keep.left) + .run().recover { + // Ignore incoming errors from the connection as they will cancel the binding. + // As far as it is known currently, these errors can only happen if a TCP error bubbles up + // from the TCP layer through the HTTP layer to the Http.IncomingConnection.flow. + // See https://github.com/akka/akka/issues/17992 + case NonFatal(ex) => + Done + }(ExecutionContexts.sameThreadExecutionContext) + } catch { + case NonFatal(e) => + log.error(e, "Could not materialize handling flow for {}", incoming) + throw e + } + }.mapMaterializedValue { + _.map(tcpBinding => ServerBinding(tcpBinding.localAddress)( + () => tcpBinding.unbind(), + timeout => masterTerminator.terminate(timeout)(fm.executionContext) + ))(fm.executionContext) + }.to(Sink.ignore).run() } - private def createServerRunnableGraph( - interface: String, port: Int, - settings: ServerSettings, - createLayer: () => Flow[ByteString, ByteString, Future[Done]], - log: LoggingAdapter)(implicit fm: Materializer): RunnableGraph[Future[ServerBinding]] = { + type HttpImplementation = Flow[SslTlsInbound, SslTlsOutbound, NotUsed] + type HttpPlusSwitching = (HttpImplementation, HttpImplementation) => Flow[ByteString, ByteString, NotUsed] - val connections = Tcp().bind(interface, port, settings.backlog, settings.socketOptions, halfClose = false, settings.timeouts.idleTimeout) + def onlyHttp2(http1: HttpImplementation, http2: HttpImplementation): Flow[ByteString, ByteString, NotUsed] = + TLSPlacebo().reversed join http2 - val masterTerminator = new MasterServerTerminator(log) + def priorKnowledge(http1: HttpImplementation, http2: HttpImplementation): Flow[ByteString, ByteString, NotUsed] = { + def chooseProtocol(sessionBytes: SessionBytes): String = + if (sessionBytes.bytes.startsWith(Http2Protocol.ClientConnectionPreface)) "h2" else "http/1.1" - connections.mapAsyncUnordered(settings.maxConnections) { - incoming: Tcp.IncomingConnection => - try { - createLayer().addAttributes(Http.prepareAttributes(settings, incoming)).joinMat(incoming.flow)(Keep.left) - .run().recover { - // Ignore incoming errors from the connection as they will cancel the binding. - // As far as it is known currently, these errors can only happen if a TCP error bubbles up - // from the TCP layer through the HTTP layer to the Http.IncomingConnection.flow. - // See https://github.com/akka/akka/issues/17992 - case NonFatal(ex) => - Done - }(ExecutionContexts.sameThreadExecutionContext) - } catch { - case NonFatal(e) => - log.error(e, "Could not materialize handling flow for {}", incoming) - throw e - } - }.mapMaterializedValue { - _.map(tcpBinding => ServerBinding(tcpBinding.localAddress)( - () => tcpBinding.unbind(), - timeout => masterTerminator.terminate(timeout)(fm.executionContext) - ))(fm.executionContext) - }.to(Sink.ignore) + TLSPlacebo().reversed join + AlpnSwitch(chooseProtocol, http1, http2) } - private val unwrapTls: BidiFlow[ByteString, SslTlsOutbound, SslTlsInbound, ByteString, NotUsed] = - BidiFlow.fromFlows(Flow[ByteString].map(SendBytes), Flow[SslTlsInbound].collect { - case SessionBytes(_, bytes) => bytes - }) + def httpsWithAlpn(httpsContext: HttpsConnectionContext, fm: Materializer)(http1: HttpImplementation, http2: HttpImplementation): Flow[ByteString, ByteString, NotUsed] = { + // Mutable cell to transport the chosen protocol from the SSLEngine to + // the switch stage. + // Doesn't need to be volatile because there's a happens-before relationship (enforced by memory barriers) + // between the SSL handshake and sending out the first SessionBytes, and receiving the first SessionBytes + // and reading out the variable. + var chosenProtocol: Option[String] = None + def setChosenProtocol(protocol: String): Unit = + if (chosenProtocol.isEmpty) chosenProtocol = Some(protocol) + else throw new IllegalStateException("ChosenProtocol was set twice. Http2.serverLayer is not reusable.") + def getChosenProtocol(): String = chosenProtocol.getOrElse("h1") // default to http/1, e.g. when ALPN jar is missing + + var eng: Option[SSLEngine] = None + def createEngine(): SSLEngine = { + val engine = httpsContext.sslContext.createSSLEngine() + eng = Some(engine) + engine.setUseClientMode(false) + Http2AlpnSupport.applySessionParameters(engine, httpsContext.firstSession) + Http2AlpnSupport.enableForServer(engine, setChosenProtocol) + } + val tls = TLS(() => createEngine, _ => Success(()), IgnoreComplete) + + val removeEngineOnTerminate: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = { + implicit val ec = fm.executionContext + BidiFlow.fromFlows( + Flow[ByteString], + Flow[ByteString] + .watchTermination()((n, fd) => { + fd.onComplete(_ => eng.foreach(Http2AlpnSupport.cleanupForServer)) + n + }) + ) + } + AlpnSwitch(_ => getChosenProtocol(), http1, http2) join + tls join + removeEngineOnTerminate + } } object Http2 extends ExtensionId[Http2Ext] with ExtensionIdProvider {