Skip to content

Commit

Permalink
Merge pull request #3414 from jrudolph/add-with-materializer
Browse files Browse the repository at this point in the history
core: allow customization of materializer with new ServerBuider API
  • Loading branch information
jrudolph committed Aug 4, 2020
2 parents 819d1e3 + 14cd145 commit dec8b67
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 21 deletions.
34 changes: 23 additions & 11 deletions akka-http-core/src/main/scala/akka/http/javadsl/ServerBuilder.scala
Expand Up @@ -53,6 +53,11 @@ trait ServerBuilder {
*/
def enableHttps(context: HttpsConnectionContext): ServerBuilder

/**
* Use custom [[Materializer]] for the binding
*/
def withMaterializer(materializer: Materializer): ServerBuilder

/**
* Bind a new HTTP server and use the given asynchronous `handler`
* [[akka.stream.javadsl.Flow]] for processing all incoming connections.
Expand Down Expand Up @@ -129,18 +134,24 @@ trait HandlerProvider {

object ServerBuilder {
private[http] def apply(interface: String, port: Int, system: ClassicActorSystemProvider): ServerBuilder =
Impl(interface, port, scaladsl.HttpConnectionContext, system.classicSystem.log, ServerSettings.create(system.classicSystem), system)
Impl(
interface, port,
scaladsl.HttpConnectionContext,
system.classicSystem.log,
ServerSettings.create(system.classicSystem),
system,
SystemMaterializer(system).materializer)

private case class Impl(
interface: String,
port: Int,
context: ConnectionContext,
log: LoggingAdapter,
settings: ServerSettings,
system: ClassicActorSystemProvider
interface: String,
port: Int,
context: ConnectionContext,
log: LoggingAdapter,
settings: ServerSettings,
system: ClassicActorSystemProvider,
materializer: Materializer
) extends ServerBuilder {
private implicit def executionContext: ExecutionContext = system.classicSystem.dispatcher
private implicit def materializer: Materializer = SystemMaterializer(system).materializer
private def http: scaladsl.HttpExt = scaladsl.Http(system)

def onInterface(newInterface: String): ServerBuilder = copy(interface = newInterface)
Expand All @@ -149,25 +160,26 @@ object ServerBuilder {
def withSettings(newSettings: ServerSettings): ServerBuilder = copy(settings = newSettings)
def adaptSettings(f: Function[ServerSettings, ServerSettings]): ServerBuilder = copy(settings = f(settings))
def enableHttps(newContext: HttpsConnectionContext): ServerBuilder = copy(context = newContext)
def withMaterializer(newMaterializer: Materializer): ServerBuilder = copy(materializer = newMaterializer)

def bind(handler: Function[HttpRequest, CompletionStage[HttpResponse]]): CompletionStage[ServerBinding] =
http.bindAndHandleAsyncImpl(
handler.apply(_).asScala,
interface, port, context.asScala, settings.asScala, parallelism = 0, log = log)
interface, port, context.asScala, settings.asScala, parallelism = 0, log = log)(materializer)
.map(new ServerBinding(_)).toJava

def bind(handlerProvider: HandlerProvider): CompletionStage[ServerBinding] = bind(handlerProvider.handler(system))

def bindSync(handler: Function[HttpRequest, HttpResponse]): CompletionStage[ServerBinding] =
http.bindAndHandleAsyncImpl(
req => FastFuture.successful(handler(req).asScala),
interface, port, context.asScala, settings.asScala, parallelism = 0, log)
interface, port, context.asScala, settings.asScala, parallelism = 0, log)(materializer)
.map(new ServerBinding(_)).toJava

def bindFlow(handlerFlow: Flow[HttpRequest, HttpResponse, _]): CompletionStage[ServerBinding] =
http.bindAndHandleImpl(
handlerFlow.asInstanceOf[Flow[sm.HttpRequest, sm.HttpResponse, _]].asScala,
interface, port, context.asScala, settings.asScala, log)
interface, port, context.asScala, settings.asScala, log)(materializer)
.map(new ServerBinding(_)).toJava

def connectionSource(): Source[IncomingConnection, CompletionStage[ServerBinding]] =
Expand Down
Expand Up @@ -48,6 +48,11 @@ trait ServerBuilder {
*/
def enableHttps(context: HttpsConnectionContext): ServerBuilder

/**
* Use custom [[Materializer]] for the binding
*/
def withMaterializer(materializer: Materializer): ServerBuilder

/**
* Bind a new HTTP server at the given endpoint and use the given asynchronous `handler`
* [[akka.stream.scaladsl.Flow]] for processing all incoming connections.
Expand Down Expand Up @@ -107,34 +112,43 @@ trait ServerBuilder {
@InternalApi
private[http] object ServerBuilder {
def apply(interface: String, port: Int, system: ClassicActorSystemProvider): ServerBuilder =
Impl(interface, port, scaladsl.HttpConnectionContext, system.classicSystem.log, ServerSettings(system.classicSystem), system)
Impl(
interface,
port,
scaladsl.HttpConnectionContext,
system.classicSystem.log,
ServerSettings(system.classicSystem),
system,
SystemMaterializer(system).materializer
)

private case class Impl(
interface: String,
port: Int,
context: ConnectionContext,
log: LoggingAdapter,
settings: ServerSettings,
system: ClassicActorSystemProvider
interface: String,
port: Int,
context: ConnectionContext,
log: LoggingAdapter,
settings: ServerSettings,
system: ClassicActorSystemProvider,
materializer: Materializer
) extends ServerBuilder {
private val http: scaladsl.HttpExt = scaladsl.Http(system)
private implicit val mat: Materializer = SystemMaterializer(system).materializer

def onInterface(newInterface: String): ServerBuilder = copy(interface = newInterface)
def onPort(newPort: Int): ServerBuilder = copy(port = newPort)
def logTo(newLog: LoggingAdapter): ServerBuilder = copy(log = newLog)
def withSettings(newSettings: ServerSettings): ServerBuilder = copy(settings = newSettings)
def adaptSettings(f: ServerSettings => ServerSettings): ServerBuilder = copy(settings = f(settings))
def enableHttps(newContext: HttpsConnectionContext): ServerBuilder = copy(context = newContext)
def withMaterializer(newMaterializer: Materializer): ServerBuilder = copy(materializer = newMaterializer)

def connectionSource(): Source[Http.IncomingConnection, Future[ServerBinding]] =
http.bindImpl(interface, port, context, settings, log)

def bindFlow(handlerFlow: Flow[HttpRequest, HttpResponse, _]): Future[ServerBinding] =
http.bindAndHandleImpl(handlerFlow, interface, port, context, settings, log)
http.bindAndHandleImpl(handlerFlow, interface, port, context, settings, log)(materializer)

def bind(handler: HttpRequest => Future[HttpResponse]): Future[ServerBinding] =
http.bindAndHandleAsyncImpl(handler, interface, port, context, settings, parallelism = 0, log)
http.bindAndHandleAsyncImpl(handler, interface, port, context, settings, parallelism = 0, log)(materializer)

def bindSync(handler: HttpRequest => HttpResponse): Future[ServerBinding] =
bind(req => FastFuture.successful(handler(req)))
Expand Down

0 comments on commit dec8b67

Please sign in to comment.