Skip to content

Commit

Permalink
=htc Support UseHttp2.Negotiated for http2 connections over plain h…
Browse files Browse the repository at this point in the history
…ttp (akka#2543)

By looking ahead and deciding on the prior knowledge preface a
potential HTTP/2 client sends whether to use HTTP/1.1 or HTTP/2.

Co-Authored-By: Arnout Engelen <github@bzzt.net>
Co-Authored-By: Johannes Rudolph <johannes.rudolph@gmail.com>
  • Loading branch information
3 people authored and franktominc committed Jun 12, 2019
1 parent 9c3274a commit c00ec4c
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 51 deletions.
8 changes: 4 additions & 4 deletions akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala
Expand Up @@ -21,7 +21,7 @@ import akka.http.impl.engine.server._
import akka.http.impl.engine.ws.WebSocketClientBlueprint
import akka.http.impl.settings.{ ConnectionPoolSetup, HostConnectionPoolSetup }
import akka.http.impl.util.StreamUtils
import akka.http.scaladsl.UseHttp2.{ Always, Never }
import akka.http.scaladsl.UseHttp2.{ Always, Negotiated, Never }
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.headers.Host
import akka.http.scaladsl.model.ws.{ Message, WebSocketRequest, WebSocketUpgradeResponse }
Expand Down Expand Up @@ -318,11 +318,11 @@ class HttpExt private[http] (private val config: Config)(implicit val system: Ex
settings: ServerSettings = ServerSettings(system),
parallelism: Int = 0,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[ServerBinding] = {
val http2Enabled = settings.previewServerSettings.enableHttp2 && connectionContext.http2 != Never
val http2Enabled = settings.previewServerSettings.enableHttp2
val http2Forced = connectionContext.http2 == Always
if (http2Enabled && (connectionContext.isSecure || http2Forced)) {
if (http2Enabled && connectionContext.http2 != Never) {
// We do not support HTTP/2 negotiation for insecure connections (h2c), https://github.com/akka/akka-http/issues/1966
log.debug("Binding server using HTTP/2{}", if (http2Forced) " (forced to be used without TLS)" else "")
log.debug("Binding server using HTTP/2{}", if (http2Forced && !connectionContext.isSecure) " (forced to be used without TLS)" else "")

val definitiveSettings =
if (parallelism > 0) settings.mapHttp2Settings(_.withMaxConcurrentStreams(parallelism))
Expand Down
@@ -0,0 +1,141 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.http.impl.engine.http2

import javax.net.ssl.SSLException

import akka.util.ByteString
import akka.NotUsed
import akka.annotation.InternalApi
import akka.http.impl.engine.server.HttpAttributes
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.stream.TLSProtocol.{ SessionBytes, SessionTruncated, SslTlsInbound, SslTlsOutbound }
import akka.stream.scaladsl.Flow
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
import akka.stream._

/** INTERNAL API */
@InternalApi
private[http] object PriorKnowledgeSwitch {
type HttpServerFlow = Flow[ByteString, ByteString, NotUsed]
type HttpServerShape = FlowShape[ByteString, ByteString]

private final val HTTP2_CONNECTION_PREFACE = ByteString("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")

def apply(
http1Stack: HttpServerFlow,
http2Stack: HttpServerFlow): HttpServerFlow =
Flow.fromGraph(
new GraphStage[HttpServerShape] {

// --- outer ports ---
val netIn = Inlet[ByteString]("PriorKnowledgeSwitch.netIn")
val netOut = Outlet[ByteString]("PriorKnowledgeSwitch.netOut")
// --- end of outer ports ---

override val shape: HttpServerShape =
FlowShape(netIn, netOut)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
logic =>

// --- inner ports, bound to actual server in install call ---
val serverDataIn = new SubSinkInlet[ByteString]("ServerImpl.netIn")
val serverDataOut = new SubSourceOutlet[ByteString]("ServerImpl.netOut")
// --- end of inner ports ---

override def preStart(): Unit = pull(netIn)

setHandler(netIn, new InHandler {
private[this] var grabbed = ByteString.empty
def onPush(): Unit = {
val data = grabbed ++ grab(netIn)
if (data.length >= HTTP2_CONNECTION_PREFACE.length) { // We should know by now
if (data.startsWith(HTTP2_CONNECTION_PREFACE, 0))
install(http2Stack, data)
else
install(http1Stack, data)
} else if (data.isEmpty || data.startsWith(HTTP2_CONNECTION_PREFACE, 0)) { // Still unknown
grabbed = data
} else { // Not a Prior Knowledge request
install(http1Stack, data)
}
}
})

setHandler(netOut, new OutHandler { def onPull(): Unit = () }) // Ignore pull

def install(serverImplementation: HttpServerFlow, firstElement: ByteString): Unit = {
connect(netIn, serverDataOut, Some(firstElement))
connect(serverDataIn, netOut)

serverImplementation
.addAttributes(inheritedAttributes) // propagate attributes to "real" server (such as HttpAttributes)
.join(Flow.fromSinkAndSource(serverDataIn.sink, serverDataOut.source)) // Network side
.run()(interpreter.subFusingMaterializer)
}

// helpers to connect inlets and outlets also binding completion signals of given ports
def connect[T](in: Inlet[T], out: SubSourceOutlet[T], initialElement: Option[T]): Unit = {

val firstElementHandler = {
val propagatePull = new OutHandler { override def onPull(): Unit = pull(in) }

initialElement match {
case Some(ele) if out.isAvailable =>
out.push(ele)
propagatePull
case Some(ele) =>
new OutHandler {
override def onPull(): Unit = {
out.push(ele)
out.setHandler(propagatePull)
}
}
case None => propagatePull
}
}

out.setHandler(firstElementHandler)

setHandler(in, new InHandler {
override def onPush(): Unit = out.push(grab(in))

override def onUpstreamFinish(): Unit = {
out.complete()
super.onUpstreamFinish()
}

override def onUpstreamFailure(ex: Throwable): Unit = {
out.fail(ex)
super.onUpstreamFailure(ex)
}
})

if (out.isAvailable) pull(in) // to account for lost pulls during initialization
}

def connect[T](in: SubSinkInlet[T], out: Outlet[T]): Unit = {
val handler = new InHandler {
override def onPush(): Unit = push(out, in.grab())
}

val outHandler = new OutHandler {
override def onPull(): Unit = in.pull()
override def onDownstreamFinish(): Unit = {
in.cancel()
super.onDownstreamFinish()
}
}

in.setHandler(handler)
setHandler(out, outHandler)

if (isAvailable(out)) in.pull() // to account for lost pulls during initialization
}
}
}
)
}
91 changes: 48 additions & 43 deletions akka-http2-support/src/main/scala/akka/http/scaladsl/Http2.scala
Expand Up @@ -7,15 +7,15 @@ package akka.http.scaladsl
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.dispatch.ExecutionContexts
import akka.event.LoggingAdapter
import akka.http.impl.engine.http2.{ AlpnSwitch, Http2AlpnSupport, Http2Blueprint }
import akka.http.impl.engine.http2.{ AlpnSwitch, Http2AlpnSupport, Http2Blueprint, PriorKnowledgeSwitch }
import akka.http.impl.engine.server.MasterServerTerminator
import akka.http.impl.util.LogByteStringTools.logTLSBidiBySetting
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.UseHttp2.{ Negotiated, Never }
import akka.http.scaladsl.UseHttp2.{ Negotiated, Never, Always }
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.http.scaladsl.settings.ServerSettings
import akka.stream.TLSProtocol.{ SendBytes, SessionBytes, SslTlsInbound, SslTlsOutbound }
import akka.stream.scaladsl.{ BidiFlow, Flow, Keep, Sink, TLS, Tcp }
import akka.stream.scaladsl.{ BidiFlow, Flow, Keep, Sink, TLS, TLSPlacebo, Tcp, RunnableGraph }
import akka.stream.{ IgnoreComplete, Materializer }
import akka.util.ByteString
import akka.{ Done, NotUsed }
Expand Down Expand Up @@ -51,60 +51,56 @@ final class Http2Ext(private val config: Config)(implicit val system: ActorSyste

val effectivePort = if (port >= 0) port else 80

val serverLayer: Flow[ByteString, ByteString, Future[Done]] = Flow.fromGraph(
val serverLayer: Flow[ByteString, ByteString, Future[Done]] =
Flow[HttpRequest]
.watchTermination()(Keep.right)
// FIXME: parallelism should maybe kept in track with SETTINGS_MAX_CONCURRENT_STREAMS so that we don't need
// to buffer requests that cannot be handled in parallel
.via(Http2Blueprint.handleWithStreamIdHeader(parallelism)(handler)(system.dispatcher))
.joinMat(Http2Blueprint.serverStack(settings, log))(Keep.left))
.joinMat(Http2Blueprint.serverStack(settings, log))(Keep.left)

val connections = Tcp().bind(interface, effectivePort, settings.backlog, settings.socketOptions, halfClose = false, settings.timeouts.idleTimeout)
createServerRunnableGraph(interface, effectivePort, settings, () => serverLayer, log).run()
}

val masterTerminator = new MasterServerTerminator(log)
private def bindAndHandleConsiderPriorKnowledge(
handler: HttpRequest => Future[HttpResponse],
interface: String,
port: Int,
settings: ServerSettings,
parallelism: Int,
log: LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] = {
if (parallelism == 1)
log.warning("HTTP/2 `bindAndHandleConsiderPriorKnowledge` was called with default parallelism = 1. This means that request handling " +
"concurrency per connection is disabled. This is likely not what you want with HTTP/2.")

connections.mapAsyncUnordered(settings.maxConnections) {
incoming: Tcp.IncomingConnection =>
try {
serverLayer.addAttributes(Http.prepareAttributes(settings, incoming)).joinMat(incoming.flow)(Keep.left)
.run().recover {
// Ignore incoming errors from the connection as they will cancel the binding.
// As far as it is known currently, these errors can only happen if a TCP error bubbles up
// from the TCP layer through the HTTP layer to the Http.IncomingConnection.flow.
// See https://github.com/akka/akka/issues/17992
case NonFatal(ex) =>
Done
}(ExecutionContexts.sameThreadExecutionContext)
} catch {
case NonFatal(e) =>
log.error(e, "Could not materialize handling flow for {}", incoming)
throw e
}
}.mapMaterializedValue {
_.map(tcpBinding => ServerBinding(tcpBinding.localAddress)(
() => tcpBinding.unbind(),
timeout => masterTerminator.terminate(timeout)(fm.executionContext)
))(fm.executionContext)
}.to(Sink.ignore).run()
val effectivePort = if (port >= 0) port else 80

val http2 = Http2Blueprint.handleWithStreamIdHeader(parallelism)(handler)(system.dispatcher).joinMat(Http2Blueprint.serverStack(settings, log))(Keep.left)
val http1 = Flow[HttpRequest].mapAsync(parallelism)(handler).joinMat(http.serverLayer(settings, None, log) atop TLSPlacebo())(Keep.left)

// FIXME: parallelism should maybe kept in track with SETTINGS_MAX_CONCURRENT_STREAMS so that we don't need
// to buffer requests that cannot be handled in parallel
val fullLayer: Flow[ByteString, ByteString, Future[Done]] =
Flow[ByteString].watchTermination()(Keep.right).viaMat(PriorKnowledgeSwitch(http1, http2))(Keep.left)

createServerRunnableGraph(interface, effectivePort, settings, () => fullLayer, log).run()
}

// TODO: split up similarly to what `Http` does into `serverLayer`, `bindAndHandle`, etc.
def bindAndHandleAsync(
handler: HttpRequest => Future[HttpResponse],
interface: String, port: Int = DefaultPortForProtocol,
connectionContext: ConnectionContext,
settings: ServerSettings = ServerSettings(system),
parallelism: Int = 1,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[ServerBinding] = {
// TODO: split up similarly to what `Http` does into `serverLayer`, `bindAndHandle`, etc.
require(connectionContext.http2 != Never)

if (connectionContext.isSecure) {
bindAndHandleAsync(handler, interface, port, connectionContext.asInstanceOf[HttpsConnectionContext], settings, parallelism, log)
} else {
if (connectionContext.http2 == Negotiated)
// https://github.com/akka/akka-http/issues/1966
throw new NotImplementedError("h2c not supported")
else
connectionContext.http2 match {
case Never => throw new IllegalArgumentException("ConnectionContext HTTP2 support set to Never!")
case _ if connectionContext.isSecure =>
bindAndHandleAsync(handler, interface, port, connectionContext.asInstanceOf[HttpsConnectionContext], settings, parallelism, log)
case Negotiated =>
bindAndHandleConsiderPriorKnowledge(handler, interface, port, settings, parallelism, log)
case Always =>
bindAndHandleWithoutNegotiation(handler, interface, port, settings, parallelism, log)
}
}
Expand Down Expand Up @@ -174,14 +170,23 @@ final class Http2Ext(private val config: Config)(implicit val system: ActorSyste
.via(Http2Blueprint.handleWithStreamIdHeader(parallelism)(handler)(system.dispatcher))
.joinMat(serverLayer())(Keep.left))

val connections = Tcp().bind(interface, effectivePort, settings.backlog, settings.socketOptions, halfClose = false, settings.timeouts.idleTimeout)
createServerRunnableGraph(interface, effectivePort, settings, fullLayer _, log).run()
}

private def createServerRunnableGraph(
interface: String, port: Int,
settings: ServerSettings,
createLayer: () => Flow[ByteString, ByteString, Future[Done]],
log: LoggingAdapter)(implicit fm: Materializer): RunnableGraph[Future[ServerBinding]] = {

val connections = Tcp().bind(interface, port, settings.backlog, settings.socketOptions, halfClose = false, settings.timeouts.idleTimeout)

val masterTerminator = new MasterServerTerminator(log)

connections.mapAsyncUnordered(settings.maxConnections) {
incoming: Tcp.IncomingConnection =>
try {
fullLayer().addAttributes(Http.prepareAttributes(settings, incoming)).joinMat(incoming.flow)(Keep.left)
createLayer().addAttributes(Http.prepareAttributes(settings, incoming)).joinMat(incoming.flow)(Keep.left)
.run().recover {
// Ignore incoming errors from the connection as they will cancel the binding.
// As far as it is known currently, these errors can only happen if a TCP error bubbles up
Expand All @@ -200,7 +205,7 @@ final class Http2Ext(private val config: Config)(implicit val system: ActorSyste
() => tcpBinding.unbind(),
timeout => masterTerminator.terminate(timeout)(fm.executionContext)
))(fm.executionContext)
}.to(Sink.ignore).run()
}.to(Sink.ignore)
}

private val unwrapTls: BidiFlow[ByteString, SslTlsOutbound, SslTlsInbound, ByteString, NotUsed] =
Expand Down
@@ -0,0 +1,82 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.http.impl.engine.http2

import akka.http.scaladsl.model.{ HttpResponse, StatusCodes }
import akka.http.scaladsl.{ Http, HttpConnectionContext, UseHttp2 }
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, HttpProtocols }
import akka.stream.ActorMaterializer
import java.util.Base64
import akka.stream.scaladsl.{ Sink, Source, Tcp }
import akka.testkit.AkkaSpec
import akka.util.ByteString

import scala.concurrent.Future
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ SinkQueue, Keep }

class WithPriorKnowledgeSpec extends AkkaSpec("""
akka.loglevel = warning
akka.loggers = ["akka.testkit.TestEventListener"]
akka.http.server.preview.enable-http2 = on
""") {

implicit val ec = system.dispatcher

"An HTTP server with PriorKnowledge" should {
implicit val mat = ActorMaterializer()

val binding = Http().bindAndHandleAsync(
_ Future.successful(HttpResponse(status = StatusCodes.ImATeapot)),
"127.0.0.1",
port = 0,
HttpConnectionContext(UseHttp2.Negotiated)
).futureValue

"respond to cleartext HTTP/1.1 requests with cleartext HTTP/1.1" in {
val (host, port) = (binding.localAddress.getHostName, binding.localAddress.getPort)
val responseFuture: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = s"http://$host:$port"))
val response = responseFuture.futureValue
response.protocol should be(HttpProtocols.`HTTP/1.1`)
response.status should be(StatusCodes.ImATeapot)
}

"respond to cleartext HTTP/2 requests with cleartext HTTP/2" in {
val (host, port) = (binding.localAddress.getHostName, binding.localAddress.getPort)

val (source, sink) =
Source.queue[String](1000, OverflowStrategy.fail)
.map(str => ByteString(Base64.getDecoder.decode(str)))
.via(Tcp().outgoingConnection(host, port))
.toMat(Sink.queue())(Keep.both)
.run()

// Obtained by converting the input request bytes from curl with --http2-prior-knowledge
// This includes port 9009 as 'authority', which our server accepts.
source.offer("UFJJICogSFRUUC8yLjANCg0KU00NCg0KAAASBAAAAAAAAAMAAABkAARAAAAAAAIAAAAAAAAECAAAAAAAP/8AAQAAHgEFAAAAAYKEhkGKCJ1cC4Fw3HwAf3qIJbZQw6u20uBTAyovKg==").futureValue

// read settings frame
Http2Protocol.FrameType.byId(sink.pull().futureValue.get(3)) should be(Http2Protocol.FrameType.SETTINGS)
// read settings frame
Http2Protocol.FrameType.byId(sink.pull().futureValue.get(3)) should be(Http2Protocol.FrameType.SETTINGS)
// ack settings
source.offer("AAAABAEAAAAA")

val response = readSink(sink).futureValue
val tpe = Http2Protocol.FrameType.byId(response(3))
tpe should be(Http2Protocol.FrameType.HEADERS)
response.map(_.toChar).mkString should include("418")
}
}

private def readSink(sink: SinkQueue[ByteString]): Future[ByteString] = {
sink.pull().flatMap {
case Some(bytes) if bytes.isEmpty =>
readSink(sink)
case Some(bytes) =>
Future.successful(bytes)
}
}
}

0 comments on commit c00ec4c

Please sign in to comment.