forked from akka/akka-http
/
Http2.scala
224 lines (192 loc) · 10.6 KB
/
Http2.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.http.scaladsl
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
import akka.dispatch.ExecutionContexts
import akka.event.LoggingAdapter
import akka.http.impl.engine.http2.{ AlpnSwitch, Http2AlpnSupport, Http2Blueprint, PriorKnowledgeSwitch }
import akka.http.impl.engine.server.MasterServerTerminator
import akka.http.impl.util.LogByteStringTools.logTLSBidiBySetting
import akka.http.scaladsl.Http.ServerBinding
import akka.http.scaladsl.UseHttp2.{ Negotiated, Never, Always }
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.http.scaladsl.settings.ServerSettings
import akka.stream.TLSProtocol.{ SendBytes, SessionBytes, SslTlsInbound, SslTlsOutbound }
import akka.stream.scaladsl.{ BidiFlow, Flow, Keep, Sink, TLS, TLSPlacebo, Tcp, RunnableGraph }
import akka.stream.{ IgnoreComplete, Materializer }
import akka.util.ByteString
import akka.{ Done, NotUsed }
import com.typesafe.config.Config
import javax.net.ssl.SSLEngine
import scala.concurrent.Future
import scala.util.Success
import scala.util.control.NonFatal
/** Entry point for Http/2 server */
final class Http2Ext(private val config: Config)(implicit val system: ActorSystem)
extends akka.actor.Extension {
// FIXME: won't having the same package as top-level break osgi?
private[this] final val DefaultPortForProtocol = -1 // any negative value
val http = Http(system)
/**
* Handle requests using HTTP/2 immediately, without any TLS or negotiation layer.
*/
private def bindAndHandleWithoutNegotiation(
handler: HttpRequest => Future[HttpResponse],
interface: String,
port: Int,
settings: ServerSettings,
parallelism: Int,
log: LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] = {
if (parallelism == 1)
log.warning("HTTP/2 `bindAndHandleAsync` was called with default parallelism = 1. This means that request handling " +
"concurrency per connection is disabled. This is likely not what you want with HTTP/2.")
val effectivePort = if (port >= 0) port else 80
val serverLayer: Flow[ByteString, ByteString, Future[Done]] =
Flow[HttpRequest]
.watchTermination()(Keep.right)
// FIXME: parallelism should maybe kept in track with SETTINGS_MAX_CONCURRENT_STREAMS so that we don't need
// to buffer requests that cannot be handled in parallel
.via(Http2Blueprint.handleWithStreamIdHeader(parallelism)(handler)(system.dispatcher))
.joinMat(Http2Blueprint.serverStack(settings, log))(Keep.left)
createServerRunnableGraph(interface, effectivePort, settings, () => serverLayer, log).run()
}
private def bindAndHandleConsiderPriorKnowledge(
handler: HttpRequest => Future[HttpResponse],
interface: String,
port: Int,
settings: ServerSettings,
parallelism: Int,
log: LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] = {
if (parallelism == 1)
log.warning("HTTP/2 `bindAndHandleConsiderPriorKnowledge` was called with default parallelism = 1. This means that request handling " +
"concurrency per connection is disabled. This is likely not what you want with HTTP/2.")
val effectivePort = if (port >= 0) port else 80
val http2 = Http2Blueprint.handleWithStreamIdHeader(parallelism)(handler)(system.dispatcher).joinMat(Http2Blueprint.serverStack(settings, log))(Keep.left)
val http1 = Flow[HttpRequest].mapAsync(parallelism)(handler).joinMat(http.serverLayer(settings, None, log) atop TLSPlacebo())(Keep.left)
// FIXME: parallelism should maybe kept in track with SETTINGS_MAX_CONCURRENT_STREAMS so that we don't need
// to buffer requests that cannot be handled in parallel
val fullLayer: Flow[ByteString, ByteString, Future[Done]] =
Flow[ByteString].watchTermination()(Keep.right).viaMat(PriorKnowledgeSwitch(http1, http2))(Keep.left)
createServerRunnableGraph(interface, effectivePort, settings, () => fullLayer, log).run()
}
// TODO: split up similarly to what `Http` does into `serverLayer`, `bindAndHandle`, etc.
def bindAndHandleAsync(
handler: HttpRequest => Future[HttpResponse],
interface: String, port: Int = DefaultPortForProtocol,
connectionContext: ConnectionContext,
settings: ServerSettings = ServerSettings(system),
parallelism: Int = 1,
log: LoggingAdapter = system.log)(implicit fm: Materializer): Future[ServerBinding] = {
connectionContext.http2 match {
case Never => throw new IllegalArgumentException("ConnectionContext HTTP2 support set to Never!")
case _ if connectionContext.isSecure =>
bindAndHandleAsync(handler, interface, port, connectionContext.asInstanceOf[HttpsConnectionContext], settings, parallelism, log)
case Negotiated =>
bindAndHandleConsiderPriorKnowledge(handler, interface, port, settings, parallelism, log)
case Always =>
bindAndHandleWithoutNegotiation(handler, interface, port, settings, parallelism, log)
}
}
private def bindAndHandleAsync(
handler: HttpRequest => Future[HttpResponse],
interface: String, port: Int,
httpsContext: HttpsConnectionContext,
settings: ServerSettings,
parallelism: Int,
log: LoggingAdapter)(implicit fm: Materializer): Future[ServerBinding] = {
// automatically preserves association between request and response by setting the right headers, can use mapAsyncUnordered
val effectivePort = if (port >= 0) port else 443
def http2Layer(): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
Http2Blueprint.serverStack(settings, log) atop
unwrapTls atop
logTLSBidiBySetting("server-plain-text", settings.logUnencryptedNetworkBytes)
// Flow is not reusable because we need a side-channel to transport the protocol
// chosen by ALPN from the SSLEngine to the switching stage
def serverLayer(): BidiFlow[HttpResponse, ByteString, ByteString, HttpRequest, NotUsed] = {
// Mutable cell to transport the chosen protocol from the SSLEngine to
// the switch stage.
// Doesn't need to be volatile because there's a happens-before relationship (enforced by memory barriers)
// between the SSL handshake and sending out the first SessionBytes, and receiving the first SessionBytes
// and reading out the variable.
var chosenProtocol: Option[String] = None
def setChosenProtocol(protocol: String): Unit =
if (chosenProtocol.isEmpty) chosenProtocol = Some(protocol)
else throw new IllegalStateException("ChosenProtocol was set twice. Http2.serverLayer is not reusable.")
def getChosenProtocol(): String = chosenProtocol.getOrElse("h1") // default to http/1, e.g. when ALPN jar is missing
var eng: Option[SSLEngine] = None
def createEngine(): SSLEngine = {
val engine = httpsContext.sslContext.createSSLEngine()
eng = Some(engine)
engine.setUseClientMode(false)
Http2AlpnSupport.applySessionParameters(engine, httpsContext.firstSession)
Http2AlpnSupport.enableForServer(engine, setChosenProtocol)
}
val tls = TLS(() => createEngine, _ => Success(()), IgnoreComplete)
val removeEngineOnTerminate: BidiFlow[ByteString, ByteString, ByteString, ByteString, NotUsed] = {
implicit val ec = fm.executionContext
BidiFlow.fromFlows(
Flow[ByteString],
Flow[ByteString]
.watchTermination()((n, fd) => {
fd.onComplete(_ => eng.foreach(Http2AlpnSupport.cleanupForServer))
n
})
)
}
AlpnSwitch(() => getChosenProtocol, http.serverLayer(settings, None, log), http2Layer()) atop
tls atop removeEngineOnTerminate
}
// Not reusable, see above.
def fullLayer(): Flow[ByteString, ByteString, Future[Done]] = Flow.fromGraph(
Flow[HttpRequest]
.watchTermination()(Keep.right)
// FIXME: parallelism should maybe kept in track with SETTINGS_MAX_CONCURRENT_STREAMS so that we don't need
// to buffer requests that cannot be handled in parallel
.via(Http2Blueprint.handleWithStreamIdHeader(parallelism)(handler)(system.dispatcher))
.joinMat(serverLayer())(Keep.left))
createServerRunnableGraph(interface, effectivePort, settings, fullLayer _, log).run()
}
private def createServerRunnableGraph(
interface: String, port: Int,
settings: ServerSettings,
createLayer: () => Flow[ByteString, ByteString, Future[Done]],
log: LoggingAdapter)(implicit fm: Materializer): RunnableGraph[Future[ServerBinding]] = {
val connections = Tcp().bind(interface, port, settings.backlog, settings.socketOptions, halfClose = false, settings.timeouts.idleTimeout)
val masterTerminator = new MasterServerTerminator(log)
connections.mapAsyncUnordered(settings.maxConnections) {
incoming: Tcp.IncomingConnection =>
try {
createLayer().addAttributes(Http.prepareAttributes(settings, incoming)).joinMat(incoming.flow)(Keep.left)
.run().recover {
// Ignore incoming errors from the connection as they will cancel the binding.
// As far as it is known currently, these errors can only happen if a TCP error bubbles up
// from the TCP layer through the HTTP layer to the Http.IncomingConnection.flow.
// See https://github.com/akka/akka/issues/17992
case NonFatal(ex) =>
Done
}(ExecutionContexts.sameThreadExecutionContext)
} catch {
case NonFatal(e) =>
log.error(e, "Could not materialize handling flow for {}", incoming)
throw e
}
}.mapMaterializedValue {
_.map(tcpBinding => ServerBinding(tcpBinding.localAddress)(
() => tcpBinding.unbind(),
timeout => masterTerminator.terminate(timeout)(fm.executionContext)
))(fm.executionContext)
}.to(Sink.ignore)
}
private val unwrapTls: BidiFlow[ByteString, SslTlsOutbound, SslTlsInbound, ByteString, NotUsed] =
BidiFlow.fromFlows(Flow[ByteString].map(SendBytes), Flow[SslTlsInbound].collect {
case SessionBytes(_, bytes) => bytes
})
}
object Http2 extends ExtensionId[Http2Ext] with ExtensionIdProvider {
override def get(system: ActorSystem): Http2Ext = super.get(system)
def apply()(implicit system: ActorSystem): Http2Ext = super.apply(system)
def lookup(): ExtensionId[_ <: Extension] = Http2
def createExtension(system: ExtendedActorSystem): Http2Ext =
new Http2Ext(system.settings.config getConfig "akka.http")(system)
}