New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Port Controller from Spray to Akka #2218
Changes from all commits
5d95f83
fdcf8cd
5e7c67c
e7f08f8
ede5000
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,56 +19,56 @@ package whisk.http | |
|
||
import scala.concurrent.duration.DurationInt | ||
import scala.language.postfixOps | ||
import scala.collection.immutable.Seq | ||
import scala.concurrent.Future | ||
|
||
import akka.actor.Actor | ||
import akka.actor.ActorContext | ||
import akka.actor.ActorSystem | ||
import akka.actor.Props | ||
import akka.event.Logging | ||
import akka.io.IO | ||
import akka.japi.Creator | ||
import akka.pattern.ask | ||
import akka.util.Timeout | ||
import spray.can.Http | ||
import spray.http.ContentType | ||
import spray.http.HttpEntity | ||
import spray.http.HttpRequest | ||
import spray.http.HttpResponse | ||
import spray.http.MediaTypes.`text/plain` | ||
import spray.httpx.SprayJsonSupport.sprayJsonMarshaller | ||
import spray.httpx.marshalling | ||
import spray.httpx.marshalling.ToResponseMarshallable.isMarshallable | ||
import spray.routing.AuthenticationFailedRejection | ||
import spray.routing.Directive.pimpApply | ||
import spray.routing.Directives | ||
import spray.routing.HttpService | ||
import spray.routing.RejectionHandler | ||
import spray.routing.Route | ||
import spray.routing.directives.DebuggingDirectives | ||
import spray.routing.directives.LogEntry | ||
import spray.routing.directives.LoggingMagnet.forMessageFromFullShow | ||
import akka.http.scaladsl.server.Directives | ||
import akka.http.scaladsl.server.directives.DebuggingDirectives | ||
import akka.http.scaladsl.server.directives.LogEntry | ||
import akka.http.scaladsl.server.Route | ||
import akka.http.scaladsl.model.HttpRequest | ||
import akka.http.scaladsl.model._ | ||
import akka.http.scaladsl.server.RejectionHandler | ||
import akka.http.scaladsl.server.UnacceptedResponseContentTypeRejection | ||
import akka.http.scaladsl.server.RouteResult.Rejected | ||
import akka.http.scaladsl.Http | ||
|
||
import spray.json._ | ||
|
||
import whisk.common.LogMarker | ||
import whisk.common.LogMarkerToken | ||
import whisk.common.Logging | ||
import whisk.common.LoggingMarkers | ||
import whisk.common.TransactionCounter | ||
import whisk.common.TransactionId | ||
import akka.stream.ActorMaterializer | ||
|
||
/** | ||
* This trait extends the spray HttpService trait with logging and transaction counting | ||
* This trait extends the Akka Directives and Actor with logging and transaction counting | ||
* facilities common to all OpenWhisk REST services. | ||
*/ | ||
trait BasicHttpService extends HttpService with TransactionCounter { | ||
|
||
/** | ||
* Gets the actor context. | ||
*/ | ||
implicit def actorRefFactory: ActorContext | ||
|
||
/** | ||
* Gets the logging | ||
*/ | ||
trait BasicHttpService extends Directives with Actor with TransactionCounter { | ||
implicit def logging: Logging | ||
implicit val materializer = ActorMaterializer() | ||
implicit val actorSystem = context.system | ||
implicit val executionContext = actorSystem.dispatcher | ||
|
||
val port: Int | ||
|
||
/** Rejection handler to terminate connection on a bad request. Delegates to Akka handler. */ | ||
implicit def customRejectionHandler(implicit transid: TransactionId) = | ||
RejectionHandler.default.mapRejectionResponse { | ||
case res @ HttpResponse(_, _, ent: HttpEntity.Strict, _) => | ||
val error = ErrorResponse(ent.data.utf8String, transid).toJson | ||
res.copy(entity = HttpEntity(ContentTypes.`application/json`, error.compactPrint)) | ||
case x => x | ||
} | ||
|
||
/** | ||
* Gets the routes implemented by the HTTP service. | ||
|
@@ -86,39 +86,47 @@ trait BasicHttpService extends HttpService with TransactionCounter { | |
*/ | ||
def loglevelForRoute(route: String): Logging.LogLevel = Logging.InfoLevel | ||
|
||
/** Rejection handler to terminate connection on a bad request. Delegates to Akka handler. */ | ||
val prioritizeRejections = recoverRejections { rejections => | ||
val priorityRejection = rejections.find { | ||
case rejection: UnacceptedResponseContentTypeRejection => true | ||
case _ => false | ||
} | ||
|
||
priorityRejection.map(rejection => Rejected(Seq(rejection))).getOrElse(Rejected(rejections)) | ||
} | ||
|
||
/** | ||
* Receives a message and runs the router. | ||
*/ | ||
def receive = runRoute( | ||
def route: Route = { | ||
assignId { implicit transid => | ||
DebuggingDirectives.logRequest(logRequestInfo _) { | ||
DebuggingDirectives.logRequestResponse(logResponseInfo _) { | ||
routes | ||
handleRejections(customRejectionHandler) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @dubee why did the directive order change? |
||
prioritizeRejections { | ||
DebuggingDirectives.logRequest(logRequestInfo _) { | ||
DebuggingDirectives.logRequestResult(logResponseInfo _) { | ||
toStrictEntity(1.second) { | ||
routes | ||
} | ||
} | ||
} | ||
} | ||
} | ||
}) | ||
} | ||
} | ||
|
||
def receive = { | ||
case _ => | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this needed? Is an Actor required to implement it? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the Actor requires receive to be overridden. Tried not inheriting from the Actor, but it looks like we need access to the Actor context in controller.scala There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you point out where that is? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i'm fairly sure we can eliminate this but it's a bit of a change - I'll send a PR against your branch or we can deal with it later. |
||
|
||
/** Assigns transaction id to every request. */ | ||
protected val assignId = extract(_ => transid()) | ||
|
||
/** Rejection handler to terminate connection on a bad request. Delegates to Spray handler. */ | ||
|
||
protected def customRejectionHandler(implicit transid: TransactionId) = RejectionHandler { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is my understanding correct that the prioritization above lowers the priority on the auth rejection? |
||
case rejections => { | ||
logging.info(this, s"[REJECT] $rejections") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are we losing the rejection log message? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i'm ok dropping this - it's of nominal value. |
||
rejections match { | ||
case AuthenticationFailedRejection(cause, challengeHeaders) :: _ => | ||
BasicHttpService.customRejectionHandler.apply(rejections.takeRight(1)) | ||
case _ => BasicHttpService.customRejectionHandler.apply(rejections) | ||
} | ||
} | ||
} | ||
|
||
/** Generates log entry for every request. */ | ||
protected def logRequestInfo(req: HttpRequest)(implicit tid: TransactionId): LogEntry = { | ||
val m = req.method.toString | ||
val m = req.method.name.toString | ||
val p = req.uri.path.toString | ||
val q = req.uri.query.toString | ||
val q = req.uri.query().toString | ||
val l = loglevelForRoute(p) | ||
LogEntry(s"[$tid] $m $p $q", l) | ||
} | ||
|
@@ -137,29 +145,19 @@ trait BasicHttpService extends HttpService with TransactionCounter { | |
Some(LogEntry(s"[$tid] [$name] $marker", l)) | ||
case _ => None // other kind of responses | ||
} | ||
|
||
val bindingFuture = { | ||
Http().bindAndHandle(route, "0.0.0.0", port) | ||
} | ||
|
||
def shutdown(): Future[Unit] = { | ||
bindingFuture.flatMap(_.unbind()).map(_ => ()) | ||
} | ||
} | ||
|
||
object BasicHttpService extends Directives { | ||
def startService[T <: Actor](system: ActorSystem, name: String, interface: String, port: Integer, service: Creator[T]) = { | ||
def startService[T <: Actor](system: ActorSystem, name: String, interface: String, service: Creator[T]) = { | ||
val actor = system.actorOf(Props.create(service), s"$name-service") | ||
|
||
implicit val timeout = Timeout(5 seconds) | ||
IO(Http)(system) ? Http.Bind(actor, interface, port) | ||
} | ||
|
||
/** Rejection handler to terminate connection on a bad request. Delegates to Spray handler. */ | ||
def customRejectionHandler(implicit transid: TransactionId) = RejectionHandler { | ||
// get default rejection message, package it as an ErrorResponse instance | ||
// which gets serialized into a Json object | ||
case r if RejectionHandler.Default.isDefinedAt(r) => { | ||
ctx => | ||
RejectionHandler.Default(r) { | ||
ctx.withHttpResponseMapped { | ||
case resp @ HttpResponse(_, HttpEntity.NonEmpty(ContentType(`text/plain`, _), msg), _, _) => | ||
resp.withEntity(marshalling.marshalUnsafe(ErrorResponse(msg.asString, transid))) | ||
} | ||
} | ||
} | ||
case CustomRejection(status, cause) :: _ => complete(status, ErrorResponse(cause, transid)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this in particular was used for the auth failure mode #2607. we can delete CustomRejection since it will be unused now - assuming we keep this behavior. |
||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All of those should be removable to complete the pull-request, modulo spray-json.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the above, spray-caching hasn't been ported to Akka yet. There is an issue open for it here: akka/akka-http#213. Will look at replacing spray-client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like spray-client is used by HttpClient.scala, but nothing uses the HttpClient code, so I deleted it.