Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port Controller from Spray to Akka #2218

Merged
merged 5 commits into from Aug 12, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 1 addition & 6 deletions common/scala/build.gradle
Expand Up @@ -13,15 +13,10 @@ dependencies {

compile 'io.spray:spray-caching_2.11:1.3.4'
compile 'io.spray:spray-json_2.11:1.3.3'
compile 'io.spray:spray-can_2.11:1.3.4'
compile 'io.spray:spray-client_2.11:1.3.4'
compile 'io.spray:spray-httpx_2.11:1.3.4'
compile 'io.spray:spray-io_2.11:1.3.4'
compile 'io.spray:spray-routing_2.11:1.3.4'

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All of those should be removable to complete the pull-request, modulo spray-json.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the above, spray-caching hasn't been ported to Akka yet. There is an issue open for it here: akka/akka-http#213. Will look at replacing spray-client.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like spray-client is used by HttpClient.scala, but nothing uses the HttpClient code, so I deleted it.

compile 'com.typesafe.akka:akka-actor_2.11:2.4.16'
compile 'com.typesafe.akka:akka-slf4j_2.11:2.4.16'
compile 'com.typesafe.akka:akka-http-core_2.11:10.0.2'
compile 'com.typesafe.akka:akka-http-core_2.11:10.0.9'
compile 'com.typesafe.akka:akka-http-spray-json_2.11:10.0.2'

compile 'log4j:log4j:1.2.16'
Expand Down
2 changes: 1 addition & 1 deletion common/scala/src/main/resources/application.conf
@@ -1,4 +1,4 @@
# default application configuration file for spray/akka
# default application configuration file for akka
include "logging"

akka.http {
Expand Down
43 changes: 0 additions & 43 deletions common/scala/src/main/scala/whisk/common/HttpClient.scala

This file was deleted.

Expand Up @@ -19,9 +19,11 @@ package whisk.core.entity

import scala.util.Try

import spray.http.StatusCodes.OK
import akka.http.scaladsl.model.StatusCodes.OK

import spray.json._
import spray.json.DefaultJsonProtocol

import whisk.common.Logging
import whisk.http.Messages._

Expand Down
138 changes: 68 additions & 70 deletions common/scala/src/main/scala/whisk/http/BasicHttpService.scala
Expand Up @@ -19,56 +19,56 @@ package whisk.http

import scala.concurrent.duration.DurationInt
import scala.language.postfixOps
import scala.collection.immutable.Seq
import scala.concurrent.Future

import akka.actor.Actor
import akka.actor.ActorContext
import akka.actor.ActorSystem
import akka.actor.Props
import akka.event.Logging
import akka.io.IO
import akka.japi.Creator
import akka.pattern.ask
import akka.util.Timeout
import spray.can.Http
import spray.http.ContentType
import spray.http.HttpEntity
import spray.http.HttpRequest
import spray.http.HttpResponse
import spray.http.MediaTypes.`text/plain`
import spray.httpx.SprayJsonSupport.sprayJsonMarshaller
import spray.httpx.marshalling
import spray.httpx.marshalling.ToResponseMarshallable.isMarshallable
import spray.routing.AuthenticationFailedRejection
import spray.routing.Directive.pimpApply
import spray.routing.Directives
import spray.routing.HttpService
import spray.routing.RejectionHandler
import spray.routing.Route
import spray.routing.directives.DebuggingDirectives
import spray.routing.directives.LogEntry
import spray.routing.directives.LoggingMagnet.forMessageFromFullShow
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.server.directives.DebuggingDirectives
import akka.http.scaladsl.server.directives.LogEntry
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.model.HttpRequest
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.RejectionHandler
import akka.http.scaladsl.server.UnacceptedResponseContentTypeRejection
import akka.http.scaladsl.server.RouteResult.Rejected
import akka.http.scaladsl.Http

import spray.json._

import whisk.common.LogMarker
import whisk.common.LogMarkerToken
import whisk.common.Logging
import whisk.common.LoggingMarkers
import whisk.common.TransactionCounter
import whisk.common.TransactionId
import akka.stream.ActorMaterializer

/**
* This trait extends the spray HttpService trait with logging and transaction counting
* This trait extends the Akka Directives and Actor with logging and transaction counting
* facilities common to all OpenWhisk REST services.
*/
trait BasicHttpService extends HttpService with TransactionCounter {

/**
* Gets the actor context.
*/
implicit def actorRefFactory: ActorContext

/**
* Gets the logging
*/
trait BasicHttpService extends Directives with Actor with TransactionCounter {
implicit def logging: Logging
implicit val materializer = ActorMaterializer()
implicit val actorSystem = context.system
implicit val executionContext = actorSystem.dispatcher

val port: Int

/** Rejection handler to terminate connection on a bad request. Delegates to Akka handler. */
implicit def customRejectionHandler(implicit transid: TransactionId) =
RejectionHandler.default.mapRejectionResponse {
case res @ HttpResponse(_, _, ent: HttpEntity.Strict, _) =>
val error = ErrorResponse(ent.data.utf8String, transid).toJson
res.copy(entity = HttpEntity(ContentTypes.`application/json`, error.compactPrint))
case x => x
}

/**
* Gets the routes implemented by the HTTP service.
Expand All @@ -86,39 +86,47 @@ trait BasicHttpService extends HttpService with TransactionCounter {
*/
def loglevelForRoute(route: String): Logging.LogLevel = Logging.InfoLevel

/** Rejection handler to terminate connection on a bad request. Delegates to Akka handler. */
val prioritizeRejections = recoverRejections { rejections =>
val priorityRejection = rejections.find {
case rejection: UnacceptedResponseContentTypeRejection => true
case _ => false
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Idiomatically these would be:

val priorityRejection = rejections.find { case _: UnacceptedResponseContentTypeRejection => true }
priorityRejection.map(rejection => Rejected(Seq(rejection))).getOrElse(Rejected(rejections))

getOrElse(null) should be avoided at any costs and is in fact not useful in this case. Alternatively you could've omitted that and called priorityRejection.isDefined on the option. Using map though is the safest way here.


priorityRejection.map(rejection => Rejected(Seq(rejection))).getOrElse(Rejected(rejections))
}

/**
* Receives a message and runs the router.
*/
def receive = runRoute(
def route: Route = {
assignId { implicit transid =>
DebuggingDirectives.logRequest(logRequestInfo _) {
DebuggingDirectives.logRequestResponse(logResponseInfo _) {
routes
handleRejections(customRejectionHandler) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dubee why did the directive order change?

prioritizeRejections {
DebuggingDirectives.logRequest(logRequestInfo _) {
DebuggingDirectives.logRequestResult(logResponseInfo _) {
toStrictEntity(1.second) {
routes
}
}
}
}
}
})
}
}

def receive = {
case _ =>
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed? Is an Actor required to implement it?

Copy link
Member Author

@dubee dubee Jul 14, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the Actor requires receive to be overridden. Tried not inheriting from the Actor, but it looks like we need access to the Actor context in controller.scala

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you point out where that is?

Copy link
Member Author

@dubee dubee Jul 31, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

open/common/scala/src/main/scala/whisk/http/BasicHttpService.scala:58: implicit ActorRefFactory required: if outside of an Actor you need an implicit ActorSystem, inside of an actor this should be the implicit ActorContext
    implicit val materializer = ActorMaterializer()
                                                 ^
open/common/scala/src/main/scala/whisk/http/BasicHttpService.scala:59: not found: value context
    implicit val actorSystem = context.system

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm fairly sure we can eliminate this but it's a bit of a change - I'll send a PR against your branch or we can deal with it later.


/** Assigns transaction id to every request. */
protected val assignId = extract(_ => transid())

/** Rejection handler to terminate connection on a bad request. Delegates to Spray handler. */

protected def customRejectionHandler(implicit transid: TransactionId) = RejectionHandler {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is my understanding correct that the prioritization above lowers the priority on the auth rejection?

case rejections => {
logging.info(this, s"[REJECT] $rejections")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we losing the rejection log message?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'm ok dropping this - it's of nominal value.

rejections match {
case AuthenticationFailedRejection(cause, challengeHeaders) :: _ =>
BasicHttpService.customRejectionHandler.apply(rejections.takeRight(1))
case _ => BasicHttpService.customRejectionHandler.apply(rejections)
}
}
}

/** Generates log entry for every request. */
protected def logRequestInfo(req: HttpRequest)(implicit tid: TransactionId): LogEntry = {
val m = req.method.toString
val p = req.uri.path.toString
val q = req.uri.query.toString
val q = req.uri.query().toString
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is calling query side effecting?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that the code won't compile without the parentheses.

missing argument list for method query in class Uri
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `query _` or `query(_,_)` instead of `query`.
            val query = ctx.request.uri.query

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't seem to comment on your above comment for def receive = {. Anyway that is needed for the actor that BasicHttpService extends.

val l = loglevelForRoute(p)
LogEntry(s"[$tid] $m $p $q", l)
}
Expand All @@ -137,29 +145,19 @@ trait BasicHttpService extends HttpService with TransactionCounter {
Some(LogEntry(s"[$tid] [$name] $marker", l))
case _ => None // other kind of responses
}

val bindingFuture = {
Http().bindAndHandle(route, "0.0.0.0", port)
}

def shutdown(): Future[Unit] = {
bindingFuture.flatMap(_.unbind()).map(_ => ())
}
}

object BasicHttpService extends Directives {
def startService[T <: Actor](system: ActorSystem, name: String, interface: String, port: Integer, service: Creator[T]) = {
def startService[T <: Actor](system: ActorSystem, name: String, interface: String, service: Creator[T]) = {
val actor = system.actorOf(Props.create(service), s"$name-service")

implicit val timeout = Timeout(5 seconds)
IO(Http)(system) ? Http.Bind(actor, interface, port)
}

/** Rejection handler to terminate connection on a bad request. Delegates to Spray handler. */
def customRejectionHandler(implicit transid: TransactionId) = RejectionHandler {
// get default rejection message, package it as an ErrorResponse instance
// which gets serialized into a Json object
case r if RejectionHandler.Default.isDefinedAt(r) => {
ctx =>
RejectionHandler.Default(r) {
ctx.withHttpResponseMapped {
case resp @ HttpResponse(_, HttpEntity.NonEmpty(ContentType(`text/plain`, _), msg), _, _) =>
resp.withEntity(marshalling.marshalUnsafe(ErrorResponse(msg.asString, transid)))
}
}
}
case CustomRejection(status, cause) :: _ => complete(status, ErrorResponse(cause, transid))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this in particular was used for the auth failure mode #2607.

we can delete CustomRejection since it will be unused now - assuming we keep this behavior.

}
}
Expand Up @@ -18,7 +18,7 @@
package whisk.http

import akka.event.Logging
import spray.httpx.SprayJsonSupport._

import whisk.common.Logging
import whisk.common.TransactionId

Expand Down
22 changes: 11 additions & 11 deletions common/scala/src/main/scala/whisk/http/ErrorResponse.scala
Expand Up @@ -21,17 +21,17 @@ import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

import spray.http.MediaType
import spray.http.StatusCode
import spray.http.StatusCodes.Forbidden
import spray.http.StatusCodes.NotFound
import spray.httpx.SprayJsonSupport.sprayJsonMarshaller
import spray.httpx.marshalling.ToResponseMarshallable.isMarshallable
import akka.http.scaladsl.model.StatusCode
import akka.http.scaladsl.model.StatusCodes.Forbidden
import akka.http.scaladsl.model.StatusCodes.NotFound
import akka.http.scaladsl.model.MediaType
import akka.http.scaladsl.server.Rejection
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarshaller
import akka.http.scaladsl.server.StandardRoute

import spray.json._
import spray.json.DefaultJsonProtocol._
import spray.routing.Directives
import spray.routing.Rejection
import spray.routing.StandardRoute

import whisk.common.TransactionId
import whisk.core.entity.SizeError
import whisk.core.entity.ByteSize
Expand Down Expand Up @@ -168,7 +168,7 @@ case class ErrorResponse(error: String, code: TransactionId)
/** Custom rejection, wraps status code for response and a cause. */
case class CustomRejection private (status: StatusCode, cause: String) extends Rejection

object ErrorResponse extends Directives {
object ErrorResponse extends Directives with DefaultJsonProtocol {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you know why the json serdes is required here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously we marshaled the ErrorResponse case class ourselves with resp.withEntity(marshalling.marshalUnsafe(ErrorResponse(msg.asString, transid))). This marshalUnsafe method no longer seems to exist in Akka. Now I am calling .toJson on the ErrorResponse.

Ex:

val error = ErrorResponse(ent.data.utf8String, transid).toJson


def terminate(status: StatusCode, error: String)(implicit transid: TransactionId): StandardRoute = {
terminate(status, Option(error) filter { _.trim.nonEmpty } map {
Expand Down