Skip to content

Commit

Permalink
Merge pull request #1464 from jrudolph/jr/w/dont-require-materializer
Browse files Browse the repository at this point in the history
In top-level API don't require materializer where it's just ignored afterwards
  • Loading branch information
jrudolph committed Oct 11, 2017
2 parents c0fb7b6 + 1f6f3fb commit cf06b03
Show file tree
Hide file tree
Showing 5 changed files with 296 additions and 76 deletions.
Expand Up @@ -20,9 +20,15 @@ private[akka] object HttpAttributes {

private[akka] final case class RemoteAddress(address: InetSocketAddress) extends Attribute

private[akka] def remoteAddress(address: InetSocketAddress) =
private[akka] def remoteAddress(address: Option[InetSocketAddress]): Attributes =
address match {
case Some(addr) remoteAddress(addr)
case None empty
}

private[akka] def remoteAddress(address: InetSocketAddress): Attributes =
Attributes(RemoteAddress(address))

private[akka] val empty =
private[akka] val empty: Attributes =
Attributes()
}
209 changes: 171 additions & 38 deletions akka-http-core/src/main/scala/akka/http/javadsl/Http.scala
Expand Up @@ -48,17 +48,15 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* Constructs a server layer stage using the configured default [[akka.http.javadsl.settings.ServerSettings]]. The returned [[akka.stream.javadsl.BidiFlow]] isn't
* reusable and can only be materialized once.
*/
def serverLayer(materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayer()(materializer))
def serverLayer(): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayerImpl())

/**
* Constructs a server layer stage using the given [[akka.http.javadsl.settings.ServerSettings]]. The returned [[akka.stream.javadsl.BidiFlow]] isn't reusable and
* can only be materialized once.
*/
def serverLayer(
settings: ServerSettings,
materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayer(settings.asScala)(materializer))
def serverLayer(settings: ServerSettings): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayerImpl(settings.asScala))

/**
* Constructs a server layer stage using the given [[akka.http.javadsl.settings.ServerSettings]]. The returned [[akka.stream.javadsl.BidiFlow]] isn't reusable and
Expand All @@ -67,21 +65,56 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
*/
def serverLayer(
settings: ServerSettings,
remoteAddress: Optional[InetSocketAddress],
materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayer(settings.asScala, remoteAddress.asScala)(materializer))
remoteAddress: Optional[InetSocketAddress]): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayerImpl(settings.asScala, remoteAddress.asScala))

/**
* Constructs a server layer stage using the given [[ServerSettings]]. The returned [[akka.stream.javadsl.BidiFlow]] isn't reusable and
* can only be materialized once. The remoteAddress, if provided, will be added as a header to each [[HttpRequest]]
* this layer produces if the `akka.http.server.remote-address-header` configuration option is enabled.
*/
def serverLayer(
settings: ServerSettings,
remoteAddress: Optional[InetSocketAddress],
log: LoggingAdapter): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayerImpl(settings.asScala, remoteAddress.asScala, log))

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def serverLayer(materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayerImpl())

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def serverLayer(
settings: ServerSettings,
materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayerImpl(settings.asScala))

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def serverLayer(
settings: ServerSettings,
remoteAddress: Optional[InetSocketAddress],
materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayerImpl(settings.asScala, remoteAddress.asScala))

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def serverLayer(
settings: ServerSettings,
remoteAddress: Optional[InetSocketAddress],
log: LoggingAdapter,
materializer: Materializer): BidiFlow[HttpResponse, SslTlsOutbound, SslTlsInbound, HttpRequest, NotUsed] =
adaptServerLayer(delegate.serverLayer(settings.asScala, remoteAddress.asScala, log)(materializer))
adaptServerLayer(delegate.serverLayerImpl(settings.asScala, remoteAddress.asScala, log))

/**
* Creates a [[akka.stream.javadsl.Source]] of [[IncomingConnection]] instances which represents a prospective HTTP server binding
Expand All @@ -98,9 +131,9 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* The server will be bound using HTTPS if the [[ConnectHttp]] object is configured with an [[HttpsConnectionContext]],
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bind(connect: ConnectHttp, materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
def bind(connect: ConnectHttp): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
new Source(delegate.bind(connect.host, connect.port, connectionContext)(materializer)
new Source(delegate.bindImpl(connect.host, connect.port, connectionContext)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
}
Expand All @@ -121,11 +154,10 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bind(
connect: ConnectHttp,
settings: ServerSettings,
materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
connect: ConnectHttp,
settings: ServerSettings): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
new Source(delegate.bind(connect.host, connect.port, settings = settings.asScala, connectionContext = connectionContext)(materializer)
new Source(delegate.bindImpl(connect.host, connect.port, settings = settings.asScala, connectionContext = connectionContext)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
}
Expand All @@ -146,16 +178,42 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* or the [[defaultServerHttpContext]] has been configured to be an [[HttpsConnectionContext]].
*/
def bind(
connect: ConnectHttp,
settings: ServerSettings,
log: LoggingAdapter,
materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
connect: ConnectHttp,
settings: ServerSettings,
log: LoggingAdapter): Source[IncomingConnection, CompletionStage[ServerBinding]] = {
val connectionContext = connect.effectiveConnectionContext(defaultServerHttpContext).asScala
new Source(delegate.bind(connect.host, connect.port, connectionContext, settings.asScala, log)(materializer)
new Source(delegate.bindImpl(connect.host, connect.port, connectionContext, settings.asScala, log)
.map(new IncomingConnection(_))
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
}

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def bind(connect: ConnectHttp, materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] =
bind(connect)

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def bind(
connect: ConnectHttp,
settings: ServerSettings,
materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] =
bind(connect, settings)

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def bind(
connect: ConnectHttp,
settings: ServerSettings,
log: LoggingAdapter,
materializer: Materializer): Source[IncomingConnection, CompletionStage[ServerBinding]] =
bind(connect, settings, log)
/**
* Convenience method which starts a new HTTP server at the given endpoint and uses the given `handler`
* [[akka.stream.javadsl.Flow]] for processing all incoming connections.
Expand Down Expand Up @@ -421,8 +479,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def cachedHostConnectionPool[T](host: String, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
cachedHostConnectionPool(ConnectHttp.toHost(host), materializer)
def cachedHostConnectionPool[T](host: String): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
cachedHostConnectionPool(ConnectHttp.toHost(host))

/**
* Returns a [[akka.stream.javadsl.Flow]] which dispatches incoming HTTP requests to the per-ActorSystem pool of outgoing
Expand All @@ -441,20 +499,44 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def cachedHostConnectionPool[T](to: ConnectHttp, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPool[T](to.host, to.port)(materializer).mapMaterializedValue(_.toJava))
def cachedHostConnectionPool[T](to: ConnectHttp): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolImpl[T](to.host, to.port).mapMaterializedValue(_.toJava))

/**
* Same as [[cachedHostConnectionPool]] but with HTTPS encryption.
*
* The given [[ConnectionContext]] will be used for encryption on the connection.
*/
def cachedHostConnectionPoolHttps[T](
to: ConnectHttp,
settings: ConnectionPoolSettings,
log: LoggingAdapter): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolHttpsImpl[T](to.host, to.port, to.effectiveHttpsConnectionContext(defaultClientHttpsContext).asScala, settings.asScala, log)
.mapMaterializedValue(_.toJava))

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def cachedHostConnectionPool[T](host: String, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
cachedHostConnectionPool(host)

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def cachedHostConnectionPool[T](to: ConnectHttp, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
cachedHostConnectionPool(to)

/**
* @deprecated in favor of [[cachedHostConnectionPoolHttps]] that doesn't require an Materializer argument.
*/
@Deprecated
def cachedHostConnectionPool[T](
to: ConnectHttp,
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], HostConnectionPool] =
adaptTupleFlow(delegate.cachedHostConnectionPoolHttps[T](to.host, to.port, to.effectiveHttpsConnectionContext(defaultClientHttpsContext).asScala, settings.asScala, log)(materializer)
.mapMaterializedValue(_.toJava))
cachedHostConnectionPoolHttps(to, settings, log)

/**
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
Expand All @@ -468,8 +550,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
adaptTupleFlow(delegate.superPool[T]()(materializer))
def superPool[T](): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
adaptTupleFlow(delegate.superPoolImpl[T]())

/**
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
Expand All @@ -489,8 +571,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
def superPool[T](
settings: ConnectionPoolSettings,
connectionContext: HttpsConnectionContext,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
adaptTupleFlow(delegate.superPool[T](connectionContext.asScala, settings.asScala, log)(materializer))
log: LoggingAdapter): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
adaptTupleFlow(delegate.superPoolImpl[T](connectionContext.asScala, settings.asScala, log))

/**
* Creates a new "super connection pool flow", which routes incoming requests to a (cached) host connection pool
Expand All @@ -507,10 +589,61 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* In order to allow for easy response-to-request association the flow takes in a custom, opaque context
* object of type `T` from the application which is emitted together with the corresponding response.
*/
def superPool[T](
settings: ConnectionPoolSettings,
log: LoggingAdapter): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
adaptTupleFlow(delegate.superPoolImpl[T](defaultClientHttpsContext.asScala, settings.asScala, log))

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def superPool[T](materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
superPool()

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def superPool[T](
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
adaptTupleFlow(delegate.superPool[T](defaultClientHttpsContext.asScala, settings.asScala, log)(materializer))
superPool(settings, log)

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def superPool[T](
settings: ConnectionPoolSettings,
connectionContext: HttpsConnectionContext,
log: LoggingAdapter, materializer: Materializer): Flow[Pair[HttpRequest, T], Pair[Try[HttpResponse], T], NotUsed] =
superPool(settings, connectionContext, log)

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def singleRequest(request: HttpRequest, materializer: Materializer): CompletionStage[HttpResponse] =
delegate.singleRequestImpl(request.asScala).toJava

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def singleRequest(request: HttpRequest, connectionContext: HttpsConnectionContext, materializer: Materializer): CompletionStage[HttpResponse] =
delegate.singleRequestImpl(request.asScala, connectionContext.asScala).toJava

/**
* @deprecated in favor of method that doesn't require materializer. You can just remove the materializer argument.
*/
@Deprecated
def singleRequest(
request: HttpRequest,
connectionContext: HttpsConnectionContext,
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): CompletionStage[HttpResponse] =
delegate.singleRequestImpl(request.asScala, connectionContext.asScala, settings.asScala, log).toJava

/**
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
Expand All @@ -521,8 +654,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* Note that the request must have either an absolute URI or a valid `Host` header, otherwise
* the future will be completed with an error.
*/
def singleRequest(request: HttpRequest, materializer: Materializer): CompletionStage[HttpResponse] =
delegate.singleRequest(request.asScala)(materializer).toJava
def singleRequest(request: HttpRequest): CompletionStage[HttpResponse] =
delegate.singleRequestImpl(request.asScala).toJava

/**
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
Expand All @@ -533,8 +666,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
* Note that the request must have either an absolute URI or a valid `Host` header, otherwise
* the future will be completed with an error.
*/
def singleRequest(request: HttpRequest, connectionContext: HttpsConnectionContext, materializer: Materializer): CompletionStage[HttpResponse] =
delegate.singleRequest(request.asScala, connectionContext.asScala)(materializer).toJava
def singleRequest(request: HttpRequest, connectionContext: HttpsConnectionContext): CompletionStage[HttpResponse] =
delegate.singleRequestImpl(request.asScala, connectionContext.asScala).toJava

/**
* Fires a single [[HttpRequest]] across the (cached) host connection pool for the request's
Expand All @@ -549,8 +682,8 @@ class Http(system: ExtendedActorSystem) extends akka.actor.Extension {
request: HttpRequest,
connectionContext: HttpsConnectionContext,
settings: ConnectionPoolSettings,
log: LoggingAdapter, materializer: Materializer): CompletionStage[HttpResponse] =
delegate.singleRequest(request.asScala, connectionContext.asScala, settings.asScala, log)(materializer).toJava
log: LoggingAdapter): CompletionStage[HttpResponse] =
delegate.singleRequestImpl(request.asScala, connectionContext.asScala, settings.asScala, log).toJava

/**
* Constructs a WebSocket [[akka.stream.javadsl.BidiFlow]].
Expand Down

0 comments on commit cf06b03

Please sign in to comment.