Skip to content

Commit

Permalink
Port Controller from Spray to Akka (apache#2218)
Browse files Browse the repository at this point in the history
* Port Controller from Spray to Akka
* Increase max-connections and Update JSON Unmarshaller
  • Loading branch information
dubee authored and rabbah committed Aug 12, 2017
1 parent 7e15559 commit a313ac6
Show file tree
Hide file tree
Showing 51 changed files with 1,155 additions and 1,159 deletions.
7 changes: 1 addition & 6 deletions common/scala/build.gradle
Expand Up @@ -13,15 +13,10 @@ dependencies {

compile 'io.spray:spray-caching_2.11:1.3.4'
compile 'io.spray:spray-json_2.11:1.3.3'
compile 'io.spray:spray-can_2.11:1.3.4'
compile 'io.spray:spray-client_2.11:1.3.4'
compile 'io.spray:spray-httpx_2.11:1.3.4'
compile 'io.spray:spray-io_2.11:1.3.4'
compile 'io.spray:spray-routing_2.11:1.3.4'

compile 'com.typesafe.akka:akka-actor_2.11:2.4.16'
compile 'com.typesafe.akka:akka-slf4j_2.11:2.4.16'
compile 'com.typesafe.akka:akka-http-core_2.11:10.0.2'
compile 'com.typesafe.akka:akka-http-core_2.11:10.0.9'
compile 'com.typesafe.akka:akka-http-spray-json_2.11:10.0.2'

compile 'log4j:log4j:1.2.16'
Expand Down
2 changes: 1 addition & 1 deletion common/scala/src/main/resources/application.conf
@@ -1,4 +1,4 @@
# default application configuration file for spray/akka
# default application configuration file for akka
include "logging"

akka.http {
Expand Down
43 changes: 0 additions & 43 deletions common/scala/src/main/scala/whisk/common/HttpClient.scala

This file was deleted.

Expand Up @@ -19,9 +19,11 @@ package whisk.core.entity

import scala.util.Try

import spray.http.StatusCodes.OK
import akka.http.scaladsl.model.StatusCodes.OK

import spray.json._
import spray.json.DefaultJsonProtocol

import whisk.common.Logging
import whisk.http.Messages._

Expand Down
140 changes: 69 additions & 71 deletions common/scala/src/main/scala/whisk/http/BasicHttpService.scala
Expand Up @@ -19,56 +19,56 @@ package whisk.http

import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.collection.immutable.Seq
import scala.concurrent.Future

import akka.actor.Actor
import akka.actor.ActorContext
import akka.actor.ActorSystem
import akka.actor.Props
import akka.event.Logging
import akka.io.IO
import akka.japi.Creator
import akka.pattern.ask
import akka.util.Timeout
import spray.can.Http
import spray.http.ContentType
import spray.http.HttpEntity
import spray.http.HttpRequest
import spray.http.HttpResponse
import spray.http.MediaTypes.`text/plain`
import spray.httpx.SprayJsonSupport.sprayJsonMarshaller
import spray.httpx.marshalling
import spray.httpx.marshalling.ToResponseMarshallable.isMarshallable
import spray.routing.AuthenticationFailedRejection
import spray.routing.Directive.pimpApply
import spray.routing.Directives
import spray.routing.HttpService
import spray.routing.RejectionHandler
import spray.routing.Route
import spray.routing.directives.DebuggingDirectives
import spray.routing.directives.LogEntry
import spray.routing.directives.LoggingMagnet.forMessageFromFullShow
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.http.scaladsl.server.directives.LogEntry
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.RejectionHandler
import akka.http.scaladsl.server.UnacceptedResponseContentTypeRejection
import akka.http.scaladsl.server.RouteResult.Rejected
import akka.http.scaladsl.Http

import spray.json._

import whisk.common.LogMarker
import whisk.common.LogMarkerToken
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionCounter
import whisk.common.TransactionId
import akka.stream.ActorMaterializer

/**
* This trait extends the spray HttpService trait with logging and transaction counting
* This trait extends the Akka Directives and Actor with logging and transaction counting
* facilities common to all OpenWhisk REST services.
*/
trait BasicHttpService extends HttpService with TransactionCounter {

/**
* Gets the actor context.
*/
implicit def actorRefFactory: ActorContext

/**
* Gets the logging
*/
trait BasicHttpService extends Directives with Actor with TransactionCounter {
implicit def logging: Logging
implicit val materializer = ActorMaterializer()
implicit val actorSystem = context.system
implicit val executionContext = actorSystem.dispatcher

val port: Int

/** Rejection handler to terminate connection on a bad request. Delegates to Akka handler. */
implicit def customRejectionHandler(implicit transid: TransactionId) =
RejectionHandler.default.mapRejectionResponse {
case res @ HttpResponse(_, _, ent: HttpEntity.Strict, _) =>
val error = ErrorResponse(ent.data.utf8String, transid).toJson
res.copy(entity = HttpEntity(ContentTypes.`application/json`, error.compactPrint))
case x => x
}

/**
* Gets the routes implemented by the HTTP service.
Expand All @@ -86,39 +86,47 @@ trait BasicHttpService extends HttpService with TransactionCounter {
*/
def loglevelForRoute(route: String): Logging.LogLevel = Logging.InfoLevel

/** Rejection handler to terminate connection on a bad request. Delegates to Akka handler. */
val prioritizeRejections = recoverRejections { rejections =>
val priorityRejection = rejections.find {
case rejection: UnacceptedResponseContentTypeRejection => true
case _ => false
}

priorityRejection.map(rejection => Rejected(Seq(rejection))).getOrElse(Rejected(rejections))
}

/**
* Receives a message and runs the router.
*/
def receive = runRoute(
def route: Route = {
assignId { implicit transid =>
DebuggingDirectives.logRequest(logRequestInfo _) {
DebuggingDirectives.logRequestResponse(logResponseInfo _) {
routes
handleRejections(customRejectionHandler) {
prioritizeRejections {
DebuggingDirectives.logRequest(logRequestInfo _) {
DebuggingDirectives.logRequestResult(logResponseInfo _) {
toStrictEntity(1.second) {
routes
}
}
}
}
}
})
}
}

def receive = {
case _ =>
}

/** Assigns transaction id to every request. */
protected val assignId = extract(_ => transid())

/** Rejection handler to terminate connection on a bad request. Delegates to Spray handler. */

protected def customRejectionHandler(implicit transid: TransactionId) = RejectionHandler {
case rejections => {
logging.info(this, s"[REJECT] $rejections")
rejections match {
case AuthenticationFailedRejection(cause, challengeHeaders) :: _ =>
BasicHttpService.customRejectionHandler.apply(rejections.takeRight(1))
case _ => BasicHttpService.customRejectionHandler.apply(rejections)
}
}
}

/** Generates log entry for every request. */
protected def logRequestInfo(req: HttpRequest)(implicit tid: TransactionId): LogEntry = {
val m = req.method.toString
val m = req.method.name.toString
val p = req.uri.path.toString
val q = req.uri.query.toString
val q = req.uri.query().toString
val l = loglevelForRoute(p)
LogEntry(s"[$tid] $m $p $q", l)
}
Expand All @@ -137,29 +145,19 @@ trait BasicHttpService extends HttpService with TransactionCounter {
Some(LogEntry(s"[$tid] [$name] $marker", l))
case _ => None // other kind of responses
}

val bindingFuture = {
Http().bindAndHandle(route, "0.0.0.0", port)
}

def shutdown(): Future[Unit] = {
bindingFuture.flatMap(_.unbind()).map(_ => ())
}
}

object BasicHttpService extends Directives {
def startService[T <: Actor](system: ActorSystem, name: String, interface: String, port: Integer, service: Creator[T]) = {
def startService[T <: Actor](system: ActorSystem, name: String, interface: String, service: Creator[T]) = {
val actor = system.actorOf(Props.create(service), s"$name-service")

implicit val timeout = Timeout(5 seconds)
IO(Http)(system) ? Http.Bind(actor, interface, port)
}

/** Rejection handler to terminate connection on a bad request. Delegates to Spray handler. */
def customRejectionHandler(implicit transid: TransactionId) = RejectionHandler {
// get default rejection message, package it as an ErrorResponse instance
// which gets serialized into a Json object
case r if RejectionHandler.Default.isDefinedAt(r) => {
ctx =>
RejectionHandler.Default(r) {
ctx.withHttpResponseMapped {
case resp @ HttpResponse(_, HttpEntity.NonEmpty(ContentType(`text/plain`, _), msg), _, _) =>
resp.withEntity(marshalling.marshalUnsafe(ErrorResponse(msg.asString, transid)))
}
}
}
case CustomRejection(status, cause) :: _ => complete(status, ErrorResponse(cause, transid))
}
}
Expand Up @@ -18,7 +18,7 @@
package whisk.http

import akka.event.Logging
import spray.httpx.SprayJsonSupport._

import whisk.common.Logging
import whisk.common.TransactionId

Expand Down
33 changes: 13 additions & 20 deletions common/scala/src/main/scala/whisk/http/ErrorResponse.scala
Expand Up @@ -21,17 +21,16 @@ import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

import spray.http.MediaType
import spray.http.StatusCode
import spray.http.StatusCodes.Forbidden
import spray.http.StatusCodes.NotFound
import spray.httpx.SprayJsonSupport.sprayJsonMarshaller
import spray.httpx.marshalling.ToResponseMarshallable.isMarshallable
import akka.http.scaladsl.model.StatusCode
import akka.http.scaladsl.model.StatusCodes.Forbidden
import akka.http.scaladsl.model.StatusCodes.NotFound
import akka.http.scaladsl.model.MediaType
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarshaller
import akka.http.scaladsl.server.StandardRoute

import spray.json._
import spray.json.DefaultJsonProtocol._
import spray.routing.Directives
import spray.routing.Rejection
import spray.routing.StandardRoute

import whisk.common.TransactionId
import whisk.core.entity.SizeError
import whisk.core.entity.ByteSize
Expand Down Expand Up @@ -107,6 +106,9 @@ object Messages {
/** Error messages for activations. */
val abnormalInitialization = "The action did not initialize and exited unexpectedly."
val abnormalRun = "The action did not produce a valid response and exited unexpectedly."
def badEntityName(value: String) = s"Parameter is not a valid value for a entity name: $value"
def badNamespace(value: String) = s"Parameter is not a valid value for a namespace: $value"
def badEpoch(value: String) = s"Parameter is not a valid value for epoch seconds: $value"

/** Error message for size conformance. */
def entityTooBig(error: SizeError) = {
Expand Down Expand Up @@ -165,10 +167,7 @@ object Messages {
/** Replaces rejections with Json object containing cause and transaction id. */
case class ErrorResponse(error: String, code: TransactionId)

/** Custom rejection, wraps status code for response and a cause. */
case class CustomRejection private (status: StatusCode, cause: String) extends Rejection

object ErrorResponse extends Directives {
object ErrorResponse extends Directives with DefaultJsonProtocol {

def terminate(status: StatusCode, error: String)(implicit transid: TransactionId): StandardRoute = {
terminate(status, Option(error) filter { _.trim.nonEmpty } map {
Expand Down Expand Up @@ -207,9 +206,3 @@ object ErrorResponse extends Directives {
}

}

object CustomRejection {
def apply(status: StatusCode): CustomRejection = {
CustomRejection(status, status.defaultMessage)
}
}

0 comments on commit a313ac6

Please sign in to comment.