-
Couldn't load subscription status.
- Fork 594
Implement reverseProxy directive (#240) #2811
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| /* | ||
| * Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com> | ||
| */ | ||
|
|
||
| package akka.http.scaladsl.model.headers | ||
|
|
||
| import akka.http.impl.util.{ Rendering, ValueRenderable } | ||
|
|
||
| final case class ViaIntermediary(protocolName: Option[String], protocolVersion: String, receivedBy: String) extends ValueRenderable { | ||
| def render[R <: Rendering](r: R): r.type = { | ||
| protocolName.foreach(name => r ~~ s"$name/") | ||
| r ~~ protocolVersion ~~ ' ' ~~ receivedBy | ||
| r | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| /* | ||
| * Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com> | ||
| */ | ||
|
|
||
| package akka.http.scaladsl.server.directives | ||
|
|
||
| import java.net.InetAddress | ||
|
|
||
| import scala.concurrent.Future | ||
| import scala.concurrent.duration.Duration | ||
|
|
||
| import akka.http.scaladsl.model._ | ||
| import akka.http.scaladsl.model.headers._ | ||
| import akka.http.scaladsl.server.RoutingSpec | ||
| import akka.http.scaladsl.server.directives.ReverseProxyDirectives._ | ||
|
|
||
| class ReverseProxyDirectivesSpec extends RoutingSpec { | ||
| "The reverseProxy directive" should { | ||
| "forward requests to the configured target" in new ReverseProxyDirectiveSpecHelper { | ||
|
|
||
| val targetConfig = ReverseProxyTargetConfig("https://target.domain:1234", false) | ||
|
|
||
| Get("http://notthetarget.domain/the/path?foo=bar&bar=baz") ~> reverseProxy(magnet) ~> check { | ||
| status shouldBe StatusCodes.OK | ||
| receivedRequest should not be null | ||
| receivedRequest.effectiveUri(false) shouldBe Uri("https://target.domain:1234/the/path?foo=bar&bar=baz") | ||
| } | ||
| } | ||
|
|
||
| "use unmatched the unmatched path as configured" in new ReverseProxyDirectiveSpecHelper { | ||
|
|
||
| val targetConfig = ReverseProxyTargetConfig("https://target.domain:1234", useUnmatchedPath = true) | ||
|
|
||
| Get("http://notthetarget.domain/the/path?foo=bar&bar=baz") ~> pathPrefix("the")(reverseProxy(magnet)) ~> check { | ||
| status shouldBe StatusCodes.OK | ||
| receivedRequest should not be null | ||
| receivedRequest.effectiveUri(false) shouldBe Uri("https://target.domain:1234/path?foo=bar&bar=baz") | ||
| } | ||
| } | ||
|
|
||
| "strip hop-by-hop headers" in new ReverseProxyDirectiveSpecHelper { | ||
|
|
||
| val targetConfig = ReverseProxyTargetConfig("https://target.domain:1234", false) | ||
|
|
||
| val request = Get("http://notthetarget.domain/the/path?foo=bar&bar=baz") | ||
| .withHeaders( | ||
| `X-Real-Ip`(RemoteAddress(InetAddress.getLocalHost)), | ||
| Connection("close"), | ||
| `Proxy-Authenticate`(HttpChallenge("scheme", "realm")), | ||
| `Proxy-Authorization`(BasicHttpCredentials("username", "password")), | ||
| `Transfer-Encoding`(TransferEncodings.chunked), | ||
| Upgrade(List(UpgradeProtocol("protocol"))) | ||
| ) | ||
|
|
||
| request ~> withRequestTimeout(Duration.Inf)(reverseProxy(magnet)) ~> check { | ||
| status shouldBe StatusCodes.OK | ||
| receivedRequest should not be null | ||
| receivedRequest.header[`Timeout-Access`] should not be defined | ||
| receivedRequest.header[Connection] should not be defined | ||
| receivedRequest.header[`Proxy-Authenticate`] should not be defined | ||
| receivedRequest.header[`Proxy-Authorization`] should not be defined | ||
| receivedRequest.header[`Transfer-Encoding`] should not be defined | ||
| receivedRequest.header[Upgrade] should not be defined | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| trait ReverseProxyDirectiveSpecHelper { | ||
| var receivedRequest: HttpRequest = null | ||
|
|
||
| def targetConfig: ReverseProxyTargetConfig | ||
|
|
||
| val magnet = new ReverseProxyTargetMagnet { | ||
| def config = targetConfig | ||
|
|
||
| val httpClient: HttpRequest => Future[HttpResponse] = request => { | ||
| receivedRequest = request | ||
| Future.successful(HttpResponse()) | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| /* | ||
| * Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com> | ||
| */ | ||
|
|
||
| package akka.http.javadsl.server.directives | ||
|
|
||
| import java.util.concurrent.CompletionStage | ||
| import java.util.function.{ Function => JFunction } | ||
|
|
||
| import akka.annotation.ApiMayChange | ||
| import akka.http.javadsl.model.{ Uri, HttpRequest => JHttpRequest, HttpResponse => JHttpResponse } | ||
| import akka.http.javadsl.server.Route | ||
| import akka.http.scaladsl.model | ||
| import akka.http.scaladsl.model.HttpResponse | ||
| import akka.http.scaladsl.server.directives.ReverseProxyDirectives.{ ReverseProxyTargetConfig, ReverseProxyTargetMagnet } | ||
| import akka.http.scaladsl.server.{ Directives => D } | ||
|
|
||
| import scala.compat.java8.FunctionConverters._ | ||
| import scala.compat.java8.FutureConverters._ | ||
| import scala.concurrent.Future | ||
|
|
||
| abstract class ReverseProxyDirectives extends FramedEntityStreamingDirectives { | ||
| import akka.http.impl.util.JavaMapping.Implicits._ | ||
|
|
||
| private val javaToScalaResponse = ((res: JHttpResponse) => res.asScala).asJava | ||
|
|
||
| @ApiMayChange | ||
| def reverseProxy( | ||
| targetUri: Uri, | ||
| useUnmatchedPath: Boolean, | ||
| httpClient: JFunction[JHttpRequest, CompletionStage[JHttpResponse]] | ||
| ): Route = { | ||
| def _client = httpClient | ||
| RouteAdapter( | ||
| D.reverseProxy(new ReverseProxyTargetMagnet { | ||
| val config = ReverseProxyTargetConfig(targetUri.asScala, useUnmatchedPath) | ||
| val httpClient: model.HttpRequest => Future[model.HttpResponse] = | ||
| req => _client(req).thenApply[HttpResponse](javaToScalaResponse).toScala | ||
| }) | ||
| ) | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,151 @@ | ||
| /* | ||
| * Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com> | ||
| */ | ||
|
|
||
| package akka.http.scaladsl.server.directives | ||
|
|
||
| import scala.concurrent.Future | ||
| import scala.util.Success | ||
| import akka.actor.ActorSystem | ||
| import akka.annotation.ApiMayChange | ||
| import akka.http.scaladsl.Http | ||
| import akka.http.scaladsl.model.Uri.Authority | ||
| import akka.http.scaladsl.model._ | ||
| import akka.http.scaladsl.model.headers._ | ||
| import akka.http.scaladsl.server.Directives._ | ||
| import akka.http.scaladsl.server.Route | ||
| import akka.http.scaladsl.server.directives.ReverseProxyDirectives.{ ReverseProxyTargetConfig, ReverseProxyTargetMagnet } | ||
| import akka.util.ByteString | ||
|
|
||
| trait ReverseProxyDirectives { | ||
|
|
||
| @ApiMayChange | ||
| def reverseProxy(target: ReverseProxyTargetMagnet): Route = | ||
| extractExecutionContext { implicit ec => | ||
| extractMaterializer { implicit mat => | ||
| // todo customize response | ||
| // drain response entity if incoming request timeout occurs | ||
| def processTimeout(proxyResult: Future[HttpResponse]): HttpRequest => HttpResponse = _ => { | ||
| proxyResult.andThen { | ||
| case Success(proxyResponse) => proxyResponse.discardEntityBytes() | ||
| } | ||
|
|
||
| HttpResponse(StatusCodes.ServiceUnavailable, entity = | ||
| HttpEntity.Strict( | ||
| ContentTypes.`application/json`, | ||
| ByteString("""{"error":"The server was not able to produce a timely response to your request."}""") | ||
| ) | ||
| ) | ||
| } | ||
|
|
||
| extractRequestContext { ctx => | ||
| // we don't need to use request.effectiveUri here since we're going to overwrite the scheme and authority | ||
| val incomingUri = ctx.request.uri | ||
| val outgoingUri = | ||
| (if (target.config.useUnmatchedPath) incomingUri.withPath(ctx.unmatchedPath) else incomingUri) | ||
| .withAuthority(target.config.targetAuthority) | ||
| .withScheme(target.config.targetScheme) | ||
|
|
||
| val outgoingRequest = mapProxyRequest(ctx.request, target.config).withUri(outgoingUri) | ||
| val eventualResponse = target.httpClient(outgoingRequest) | ||
|
|
||
| withRequestTimeoutResponse(processTimeout(eventualResponse)) { | ||
| complete(eventualResponse) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| // remove any headers that shouldn't be passed through a proxy | ||
| private def mapProxyRequest(request: HttpRequest, config: ReverseProxyTargetConfig): HttpRequest = { | ||
| val incomingHeaders = request.headers | ||
|
|
||
| val remoteAddressOption = request.header[`Remote-Address`].map(_.address) | ||
| val xForwardedForAddressesOption = request.header[`X-Forwarded-For`].map(_.addresses) | ||
|
|
||
| val updatedXRealIpHeaderOption = request.header[`X-Real-Ip`] | ||
| .map(_.address) | ||
| .orElse(xForwardedForAddressesOption.flatMap(_.headOption)) | ||
| .orElse(remoteAddressOption) | ||
| .map(`X-Real-Ip`(_)) | ||
|
|
||
| val updatedXForwardedForHeaderOption = remoteAddressOption | ||
| .map(_ +: xForwardedForAddressesOption.getOrElse(Nil)) | ||
| .map(`X-Forwarded-For`(_)) | ||
|
|
||
| val intermediary = { | ||
| val (protocol, version) = { | ||
| val parts = request.protocol.value.split('/') | ||
| if (parts(0) == "HTTP" && parts.length > 1) None -> parts(1) | ||
| else if (parts.length > 1) Some(parts(0)) -> parts(1) | ||
| else None -> parts(0) | ||
| } | ||
|
|
||
| config.viaId.map(ViaIntermediary(protocol, version, _)) | ||
| } | ||
|
|
||
| val maybeVia = request.header[Via] | ||
| .map(via => via.copy(intermediaries = via.intermediaries ++ intermediary)) | ||
| .orElse(intermediary.map(i => Via(List(i)))) | ||
|
|
||
| val outgoingHeaders = incomingHeaders.flatMap { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How was this list compiled? Is there an RFC that gives some recommendations? Should the user be able to configure it? E.g. for the ones that are just filtered-out we could give the list in the configuration where we could also document why they are filtered out. What about headers that might carry authentication information? Should they be forwarded automatically? Or should it be opt-in? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hop-by-hop headers are defined in http 1.1. These don't seem to be something the user would configure. I think we should establish that aside from headers that are specifically relevant to proxying, we should establish the default as "the request is forwarded as-is" (which is something that should go in that documentation page you mentioned :)). This way the user doesn't form an expectation that any part of the request will be treated specially, and that they need to modify it in an outer directive if they want to change what is forwarded. For authentication specifically this prevents us from having to define what qualifies as a "default" authentication header/cookie. |
||
| // region hop-by-hop headers https://tools.ietf.org/html/rfc2616#section-13.5.1 | ||
| case _: Connection => Nil | ||
| // keep alive header is not included in modeled headers | ||
| case _: `Proxy-Authenticate` => Nil | ||
| case _: `Proxy-Authorization` => Nil | ||
| case h if h.is("te") => Nil | ||
| case h if h.is("trailers") => Nil | ||
| case _: `Transfer-Encoding` => Nil | ||
| case _: Upgrade => Nil | ||
| // endregion | ||
| case _: `X-Real-Ip` => updatedXRealIpHeaderOption | ||
| case _: `X-Forwarded-For` => updatedXForwardedForHeaderOption | ||
| case _: `Timeout-Access` => Nil | ||
| case _: Via => Nil // added back at the end | ||
| case h => Some(h) | ||
| } ++ maybeVia | ||
|
|
||
| request.withHeaders(outgoingHeaders) | ||
| } | ||
| } | ||
|
|
||
| object ReverseProxyDirectives extends ReverseProxyDirectives { | ||
|
|
||
| trait ReverseProxyTargetConfig { | ||
| def targetScheme: String | ||
| def targetAuthority: Authority | ||
| def useUnmatchedPath: Boolean | ||
| def viaId: Option[String] | ||
| } | ||
|
|
||
| object ReverseProxyTargetConfig { | ||
| def apply(baseUri: Uri, useUnmatchedPath: Boolean): ReverseProxyTargetConfig = ReverseProxyTargetConfigImpl(baseUri, useUnmatchedPath) | ||
| } | ||
|
|
||
| private case class ReverseProxyTargetConfigImpl( | ||
| baseUri: Uri, | ||
| useUnmatchedPath: Boolean, | ||
| viaId: Option[String] = None | ||
| ) extends ReverseProxyTargetConfig { | ||
| val targetScheme = baseUri.scheme | ||
| val targetAuthority = baseUri.authority | ||
| } | ||
|
|
||
| trait ReverseProxyTargetMagnet { | ||
| def config: ReverseProxyTargetConfig | ||
| def httpClient: HttpRequest => Future[HttpResponse] | ||
| } | ||
|
|
||
| object ReverseProxyTargetMagnet { | ||
| import scala.language.implicitConversions | ||
|
|
||
| implicit def fromConfig(targetConfig: ReverseProxyTargetConfig)(implicit system: ActorSystem) = | ||
| new ReverseProxyTargetMagnet { | ||
| val config = targetConfig | ||
| val httpClient: HttpRequest => Future[HttpResponse] = Http().singleRequest(_) | ||
| } | ||
|
|
||
| implicit def fromUri(uri: Uri)(implicit system: ActorSystem) = fromConfig(ReverseProxyTargetConfigImpl(uri, false)) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By now, we'd rather not use magnets if it can be avoided. It seems here it is required to access the
systemto get the client. Maybe with the Akka 2.6 changes to use system more than materializer in many places, we could try to add a system to theRequestContextin the first place? That would be a follow up issue to this one.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you suggesting we keep the magnet for now and remove it in a follow up issue,
or remove it now and use overloaded functions that take an implicit system (removing the implicit parameter in the follow up issue)?
I support either course of action.