diff --git a/akka-http-core/src/main/scala/akka/http/javadsl/ServerBuilder.scala b/akka-http-core/src/main/scala/akka/http/javadsl/ServerBuilder.scala index 96e45583ccd..3cf18365500 100644 --- a/akka-http-core/src/main/scala/akka/http/javadsl/ServerBuilder.scala +++ b/akka-http-core/src/main/scala/akka/http/javadsl/ServerBuilder.scala @@ -53,6 +53,11 @@ trait ServerBuilder { */ def enableHttps(context: HttpsConnectionContext): ServerBuilder + /** + * Use custom [[Materializer]] for the binding + */ + def withMaterializer(materializer: Materializer): ServerBuilder + /** * Bind a new HTTP server and use the given asynchronous `handler` * [[akka.stream.javadsl.Flow]] for processing all incoming connections. @@ -129,18 +134,24 @@ trait HandlerProvider { object ServerBuilder { private[http] def apply(interface: String, port: Int, system: ClassicActorSystemProvider): ServerBuilder = - Impl(interface, port, scaladsl.HttpConnectionContext, system.classicSystem.log, ServerSettings.create(system.classicSystem), system) + Impl( + interface, port, + scaladsl.HttpConnectionContext, + system.classicSystem.log, + ServerSettings.create(system.classicSystem), + system, + SystemMaterializer(system).materializer) private case class Impl( - interface: String, - port: Int, - context: ConnectionContext, - log: LoggingAdapter, - settings: ServerSettings, - system: ClassicActorSystemProvider + interface: String, + port: Int, + context: ConnectionContext, + log: LoggingAdapter, + settings: ServerSettings, + system: ClassicActorSystemProvider, + materializer: Materializer ) extends ServerBuilder { private implicit def executionContext: ExecutionContext = system.classicSystem.dispatcher - private implicit def materializer: Materializer = SystemMaterializer(system).materializer private def http: scaladsl.HttpExt = scaladsl.Http(system) def onInterface(newInterface: String): ServerBuilder = copy(interface = newInterface) @@ -149,11 +160,12 @@ object ServerBuilder { def withSettings(newSettings: ServerSettings): ServerBuilder = copy(settings = newSettings) def adaptSettings(f: Function[ServerSettings, ServerSettings]): ServerBuilder = copy(settings = f(settings)) def enableHttps(newContext: HttpsConnectionContext): ServerBuilder = copy(context = newContext) + def withMaterializer(newMaterializer: Materializer): ServerBuilder = copy(materializer = newMaterializer) def bind(handler: Function[HttpRequest, CompletionStage[HttpResponse]]): CompletionStage[ServerBinding] = http.bindAndHandleAsyncImpl( handler.apply(_).asScala, - interface, port, context.asScala, settings.asScala, parallelism = 0, log = log) + interface, port, context.asScala, settings.asScala, parallelism = 0, log = log)(materializer) .map(new ServerBinding(_)).toJava def bind(handlerProvider: HandlerProvider): CompletionStage[ServerBinding] = bind(handlerProvider.handler(system)) @@ -161,13 +173,13 @@ object ServerBuilder { def bindSync(handler: Function[HttpRequest, HttpResponse]): CompletionStage[ServerBinding] = http.bindAndHandleAsyncImpl( req => FastFuture.successful(handler(req).asScala), - interface, port, context.asScala, settings.asScala, parallelism = 0, log) + interface, port, context.asScala, settings.asScala, parallelism = 0, log)(materializer) .map(new ServerBinding(_)).toJava def bindFlow(handlerFlow: Flow[HttpRequest, HttpResponse, _]): CompletionStage[ServerBinding] = http.bindAndHandleImpl( handlerFlow.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala, - interface, port, context.asScala, settings.asScala, log) + interface, port, context.asScala, settings.asScala, log)(materializer) .map(new ServerBinding(_)).toJava def connectionSource(): Source[IncomingConnection, CompletionStage[ServerBinding]] = diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/ServerBuilder.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/ServerBuilder.scala index c1c7cf90967..b4aaa5bc562 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/ServerBuilder.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/ServerBuilder.scala @@ -48,6 +48,11 @@ trait ServerBuilder { */ def enableHttps(context: HttpsConnectionContext): ServerBuilder + /** + * Use custom [[Materializer]] for the binding + */ + def withMaterializer(materializer: Materializer): ServerBuilder + /** * Bind a new HTTP server at the given endpoint and use the given asynchronous `handler` * [[akka.stream.scaladsl.Flow]] for processing all incoming connections. @@ -107,18 +112,26 @@ trait ServerBuilder { @InternalApi private[http] object ServerBuilder { def apply(interface: String, port: Int, system: ClassicActorSystemProvider): ServerBuilder = - Impl(interface, port, scaladsl.HttpConnectionContext, system.classicSystem.log, ServerSettings(system.classicSystem), system) + Impl( + interface, + port, + scaladsl.HttpConnectionContext, + system.classicSystem.log, + ServerSettings(system.classicSystem), + system, + SystemMaterializer(system).materializer + ) private case class Impl( - interface: String, - port: Int, - context: ConnectionContext, - log: LoggingAdapter, - settings: ServerSettings, - system: ClassicActorSystemProvider + interface: String, + port: Int, + context: ConnectionContext, + log: LoggingAdapter, + settings: ServerSettings, + system: ClassicActorSystemProvider, + materializer: Materializer ) extends ServerBuilder { private val http: scaladsl.HttpExt = scaladsl.Http(system) - private implicit val mat: Materializer = SystemMaterializer(system).materializer def onInterface(newInterface: String): ServerBuilder = copy(interface = newInterface) def onPort(newPort: Int): ServerBuilder = copy(port = newPort) @@ -126,15 +139,16 @@ private[http] object ServerBuilder { def withSettings(newSettings: ServerSettings): ServerBuilder = copy(settings = newSettings) def adaptSettings(f: ServerSettings => ServerSettings): ServerBuilder = copy(settings = f(settings)) def enableHttps(newContext: HttpsConnectionContext): ServerBuilder = copy(context = newContext) + def withMaterializer(newMaterializer: Materializer): ServerBuilder = copy(materializer = newMaterializer) def connectionSource(): Source[Http.IncomingConnection, Future[ServerBinding]] = http.bindImpl(interface, port, context, settings, log) def bindFlow(handlerFlow: Flow[HttpRequest, HttpResponse, _]): Future[ServerBinding] = - http.bindAndHandleImpl(handlerFlow, interface, port, context, settings, log) + http.bindAndHandleImpl(handlerFlow, interface, port, context, settings, log)(materializer) def bind(handler: HttpRequest => Future[HttpResponse]): Future[ServerBinding] = - http.bindAndHandleAsyncImpl(handler, interface, port, context, settings, parallelism = 0, log) + http.bindAndHandleAsyncImpl(handler, interface, port, context, settings, parallelism = 0, log)(materializer) def bindSync(handler: HttpRequest => HttpResponse): Future[ServerBinding] = bind(req => FastFuture.successful(handler(req)))