From 837c189e8e0052328ebb62d34e42d67af7da6851 Mon Sep 17 00:00:00 2001 From: Nathan Fischer Date: Wed, 16 May 2018 16:18:26 -0700 Subject: [PATCH] Implement reverseProxy directive (#240) --- .../http/scaladsl/server/Directives.scala | 1 + .../directives/ReverseProxyDirectives.scala | 116 ++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 akka-http/src/main/scala/akka/http/scaladsl/server/directives/ReverseProxyDirectives.scala diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala index 56b68945871..ef557bb1ed8 100644 --- a/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/Directives.scala @@ -37,6 +37,7 @@ trait Directives extends RouteConcatenation with SecurityDirectives with WebSocketDirectives with FramedEntityStreamingDirectives + with ReverseProxyDirectives /** * Collects all default directives into one object for simple importing. diff --git a/akka-http/src/main/scala/akka/http/scaladsl/server/directives/ReverseProxyDirectives.scala b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/ReverseProxyDirectives.scala new file mode 100644 index 00000000000..399f661ba1e --- /dev/null +++ b/akka-http/src/main/scala/akka/http/scaladsl/server/directives/ReverseProxyDirectives.scala @@ -0,0 +1,116 @@ +package akka.http.scaladsl.server.directives + +import scala.concurrent.Future +import scala.util.Success + +import akka.actor.ActorSystem +import akka.http.scaladsl.Http +import akka.http.scaladsl.model.Uri.Authority +import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers._ +import akka.http.scaladsl.server.Directives._ +import akka.http.scaladsl.server.{ RequestContext, Route } +import akka.util.ByteString + +trait ReverseProxyDirectives { + + trait ReverseProxyTargetConfig { + def targetScheme: String + def targetAuthority: Authority + def useUnmatchedPath: Boolean + } + + object ReverseProxyTargetConfig { + def apply(baseUri: Uri, useUnmatchedPath: Boolean): ReverseProxyTargetConfig = ReverseProxyTargetConfigImpl(baseUri, useUnmatchedPath) + } + + private case class ReverseProxyTargetConfigImpl(baseUri: Uri, useUnmatchedPath: Boolean) extends ReverseProxyTargetConfig { + val targetScheme = baseUri.scheme + val targetAuthority = baseUri.authority + } + + trait ReverseProxyTargetMagnet { + def config: ReverseProxyTargetConfig + def httpClient: HttpRequest ⇒ Future[HttpResponse] + } + + object ReverseProxyTargetMagnet { + import scala.language.implicitConversions + + implicit def fromConfig(targetConfig: ReverseProxyTargetConfig)(implicit system: ActorSystem) = + new ReverseProxyTargetMagnet { + val config = targetConfig + val httpClient = Http().singleRequest(_) + } + + implicit def fromUri(uri: Uri)(implicit system: ActorSystem) = fromConfig(ReverseProxyTargetConfigImpl(uri, false)) + } + + def reverseProxy(target: ReverseProxyTargetMagnet): Route = + extractExecutionContext { implicit ec ⇒ + extractMaterializer { implicit mat ⇒ + // todo customize response + def processTimeout(proxyResult: Future[HttpResponse]): HttpRequest ⇒ HttpResponse = _ ⇒ { + proxyResult.andThen { + case Success(proxyResponse) ⇒ proxyResponse.discardEntityBytes() + } + + HttpResponse(StatusCodes.ServiceUnavailable, entity = + HttpEntity.Strict( + ContentTypes.`application/json`, + ByteString("""{"error":"The server was not able to produce a timely response to your request."}""") + ) + ) + } + + extractRequestContext { ctx ⇒ + // we don't need to use request.effectiveUri here since we're going to overwrite the scheme and authority + val incomingUri = ctx.request.uri + val outgoingUri = + (if (target.config.useUnmatchedPath) incomingUri.withPath(ctx.unmatchedPath) else incomingUri) + .withAuthority(target.config.targetAuthority) + .withScheme(target.config.targetScheme) + + val outgoingRequest = ctx.request.withUri(outgoingUri) + val eventualResponse = target.httpClient(outgoingRequest) + + withRequestTimeoutResponse(processTimeout(eventualResponse)) { + mapRequestContext(mapProxyContext) { + complete(eventualResponse) + } + } + } + } + } + + // remove any headers that shouldn't be passed through a proxy + private def mapProxyContext(ctx: RequestContext): RequestContext = { + val incomingHeaders = ctx.request.headers + + val remoteAddressOption = ctx.request.header[`Remote-Address`].map(_.address) + val xForwardedForAddressesOption = ctx.request.header[`X-Forwarded-For`].map(_.addresses) + //TODO: Add `Via` HttpHeader as defined in https://tools.ietf.org/html/rfc2616#section-14.45 in akka.http.scaladsl.model.incomingHeaders + + val updatedXRealIpHeaderOption = ctx.request.header[`X-Real-Ip`] + .map(_.address) + .orElse(xForwardedForAddressesOption.flatMap(_.headOption)) + .orElse(remoteAddressOption) + .map(`X-Real-Ip`(_)) + + val updatedXForwardedForHeaderOption = remoteAddressOption + .map(_ +: xForwardedForAddressesOption.getOrElse(Nil)) + .map(`X-Forwarded-For`(_)) + + val outgoingHeaders = incomingHeaders.flatMap { + case _: `X-Real-Ip` ⇒ updatedXRealIpHeaderOption + case _: `X-Forwarded-For` ⇒ updatedXForwardedForHeaderOption + case _: `Timeout-Access` ⇒ Nil + case _: Connection ⇒ Nil + case h ⇒ Some(h) + } + + ctx.mapRequest(_.withHeaders(outgoingHeaders)) + } +} + +object ReverseProxyDirectives extends ReverseProxyDirectives