Skip to content

Commit

Permalink
Add V2 Route
Browse files Browse the repository at this point in the history
- Create info route with Akka
  • Loading branch information
dubee committed Jun 19, 2017
1 parent 5632460 commit f213d4e
Show file tree
Hide file tree
Showing 46 changed files with 1,449 additions and 1,242 deletions.
84 changes: 61 additions & 23 deletions common/scala/src/main/scala/whisk/http/BasicHttpService.scala
Expand Up @@ -20,32 +20,51 @@ import scala.concurrent.duration.DurationInt
import scala.language.postfixOps

import akka.actor.Actor
import akka.actor.ActorContext
//import akka.actor.ActorContext
import akka.actor.ActorSystem
import akka.actor.Props
import akka.event.Logging
import akka.io.IO
//import akka.io.IO
import akka.japi.Creator
import akka.pattern.ask
//import akka.pattern.ask
import akka.util.Timeout
import spray.can.Http

import akka.http.scaladsl.server.Directives
//import akka.http.scaladsl.server.directives.DebuggingDirectives
//import akka.http.scaladsl.server.directives.DebuggingDirectives._
//import akka.http.scaladsl.server.directives.LoggingMagnet.forMessageFromFullShow
import akka.http.scaladsl.server.directives.LogEntry

//import akka.http.scaladsl.model.ContentType
//import akka.http.scaladsl.model.HttpEntity
import akka.http.scaladsl.model.HttpResponse
//import akka.http.scaladsl.model.MediaTypes.`text/plain`
//import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarshaller
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.model.HttpRequest
//import akka.http.scaladsl.server.RejectionHandler
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.RejectionHandler
/*import spray.can.Http
import spray.http.ContentType
import spray.http.HttpEntity
import spray.http.HttpRequest
import spray.http.HttpResponse
import spray.http.MediaTypes.`text/plain`
import spray.httpx.SprayJsonSupport.sprayJsonMarshaller
import spray.httpx.marshalling
import spray.httpx.marshalling.ToResponseMarshallable.isMarshallable
//import spray.httpx.marshalling.ToResponseMarshallable.isMarshallable
import spray.routing.AuthenticationFailedRejection
import spray.routing.Directive.pimpApply
import spray.routing.Directives
import spray.routing.HttpService
//import spray.routing.HttpService
import spray.routing.RejectionHandler
import spray.routing.Route
import spray.routing.directives.DebuggingDirectives
import spray.routing.directives.LogEntry
import spray.routing.directives.LoggingMagnet.forMessageFromFullShow
import spray.routing.directives.LoggingMagnet.forMessageFromFullShow*/

import whisk.common.LogMarker
import whisk.common.LogMarkerToken
import whisk.common.Logging
Expand All @@ -57,24 +76,40 @@ import whisk.common.TransactionId
* This trait extends the spray HttpService trait with logging and transaction counting
* facilities common to all OpenWhisk REST services.
*/
trait BasicHttpService extends HttpService with TransactionCounter {
trait BasicHttpService extends Directives with Actor with TransactionCounter {

/**
* Gets the actor context.
*/
implicit def actorRefFactory: ActorContext
//implicit def actorRefFactory: ActorContext

/**
* Gets the logging
*/
implicit def logging: Logging

implicit def myRejectionHandler =
RejectionHandler.default
.mapRejectionResponse {
case res @ HttpResponse(_, _, ent: HttpEntity.Strict, _) =>
// since all Akka default rejection responses are Strict this will handle all rejections
val message = ent.data.utf8String.replaceAll("\"", """\"""")

// we copy the response in order to keep all headers and status code, wrapping the message as hand rolled JSON
// you could the entity using your favourite marshalling library (e.g. spray json or anything else)
res.copy(entity = HttpEntity(ContentTypes.`application/json`, s"""{"rejection": "$message"}"""))

//case CustomRejection(status, cause) :: _ => complete(status, ErrorResponse(cause, transid))

case x => x // pass through all other types of responses
}

/**
* Gets the routes implemented by the HTTP service.
*
* @param transid the id for the transaction (every request is assigned an id)
*/
def routes(implicit transid: TransactionId): Route
def routes: Route

/**
* Gets the log level for a given route. The default is
Expand All @@ -88,21 +123,24 @@ trait BasicHttpService extends HttpService with TransactionCounter {
/**
* Receives a message and runs the router.
*/
def receive = runRoute(
assignId { implicit transid =>
DebuggingDirectives.logRequest(logRequestInfo _) {
DebuggingDirectives.logRequestResponse(logResponseInfo _) {
routes
}
def receive = {
case _ =>

assignId { implicit transid =>
//DebuggingDirectives.logRequest(logRequestInfo _) {
// DebuggingDirectives.logRequestResponse(logResponseInfo _) {
Route.seal(routes)
// }
//}
}
})
}

/** Assigns transaction id to every request. */
protected val assignId = extract(_ => transid())

/** Rejection handler to terminate connection on a bad request. Delegates to Spray handler. */

protected def customRejectionHandler(implicit transid: TransactionId) = RejectionHandler {
/*protected def customRejectionHandler(implicit transid: TransactionId) = RejectionHandler {
case rejections => {
logging.info(this, s"[REJECT] $rejections")
rejections match {
Expand All @@ -111,13 +149,13 @@ trait BasicHttpService extends HttpService with TransactionCounter {
case _ => BasicHttpService.customRejectionHandler.apply(rejections)
}
}
}
}*/

/** Generates log entry for every request. */
protected def logRequestInfo(req: HttpRequest)(implicit tid: TransactionId): LogEntry = {
val m = req.method.toString
val p = req.uri.path.toString
val q = req.uri.query.toString
val q = req.uri.query().toString
val l = loglevelForRoute(p)
LogEntry(s"[$tid] $m $p $q", l)
}
Expand All @@ -143,11 +181,11 @@ object BasicHttpService extends Directives {
val actor = system.actorOf(Props.create(service), s"$name-service")

implicit val timeout = Timeout(5 seconds)
IO(Http)(system) ? Http.Bind(actor, interface, port)
//IO(Http)(system) ? Http.Bind(actor, interface, port)
}

/** Rejection handler to terminate connection on a bad request. Delegates to Spray handler. */
def customRejectionHandler(implicit transid: TransactionId) = RejectionHandler {
/*def customRejectionHandler(implicit transid: TransactionId) = RejectionHandler {
// get default rejection message, package it as an ErrorResponse instance
// which gets serialized into a Json object
case r if RejectionHandler.Default.isDefinedAt(r) => {
Expand All @@ -160,5 +198,5 @@ object BasicHttpService extends Directives {
}
}
case CustomRejection(status, cause) :: _ => complete(status, ErrorResponse(cause, transid))
}
}*/
}
12 changes: 8 additions & 4 deletions common/scala/src/main/scala/whisk/http/BasicRasService.scala
Expand Up @@ -20,17 +20,21 @@ import akka.actor.Actor
import akka.actor.ActorSystem
import akka.event.Logging
import akka.japi.Creator
import spray.httpx.SprayJsonSupport._

//import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport

//import spray.httpx.SprayJsonSupport._

import whisk.common.Logging
import whisk.common.TransactionId
//import whisk.common.TransactionId

/**
* This trait extends the BasicHttpService with a standard "ping" endpoint which
* responds to health queries, intended for monitoring.
*/
trait BasicRasService extends BasicHttpService {

override def routes(implicit transid: TransactionId) = ping
override def routes = ping

override def loglevelForRoute(route: String): Logging.LogLevel = {
if (route == "/ping") {
Expand Down Expand Up @@ -59,7 +63,7 @@ object BasicRasService {
* which extends the BasicRasService trait.
*/
private class RasService(implicit val logging: Logging) extends BasicRasService with Actor {
override def actorRefFactory = context
//override def actorRefFactory = context
}

/**
Expand Down
28 changes: 24 additions & 4 deletions common/scala/src/main/scala/whisk/http/ErrorResponse.scala
Expand Up @@ -16,10 +16,7 @@

package whisk.http

import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

/*
import spray.http.MediaType
import spray.http.StatusCode
import spray.http.StatusCodes.Forbidden
Expand All @@ -31,6 +28,27 @@ import spray.json.DefaultJsonProtocol._
import spray.routing.Directives
import spray.routing.Rejection
import spray.routing.StandardRoute
*/

import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

import akka.http.scaladsl.model.StatusCode
import akka.http.scaladsl.model.StatusCodes.Forbidden
import akka.http.scaladsl.model.StatusCodes.NotFound
import akka.http.scaladsl.model.MediaType
import akka.http.scaladsl.server.Rejection
import akka.http.scaladsl.server.Directives
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport.sprayJsonMarshaller
import akka.http.scaladsl.server.StandardRoute
//import akka.http.marshalling.ToResponseMarshallable.isMarshallable

import spray.json._
import spray.json.DefaultJsonProtocol._


import whisk.common.TransactionId
import whisk.core.entity.SizeError
import whisk.core.entity.ByteSize
Expand Down Expand Up @@ -111,6 +129,8 @@ object Messages {
s"${error.field} larger than allowed: ${error.is.toBytes} > ${error.allowed.toBytes} bytes."
}

val payloadMustBeJSON = "Payload must be JSON formatted."

def truncateLogs(limit: ByteSize) = {
s"Logs were truncated because the total bytes size exceeds the limit of ${limit.toBytes} bytes."
}
Expand Down
65 changes: 34 additions & 31 deletions core/controller/src/main/resources/application.conf
@@ -1,40 +1,43 @@
# common logging configuration see common scala
include "logging"
include "akka-http-version"

# see http://spray.io/documentation/spray-can/configuration/
# descriptions inlined below for convenience
spray.can.server {
# Description:
# If a request hasn't been responded to after the time period set here
# a `spray.http.Timedout` message will be sent to the timeout handler.
# Set to `infinite` to completely disable request timeouts.
#
# Explaining the set value:
# The controller holds connections up to 60s for blocking invokes, and
# all other operations are expected to complete quickly. We allow a grace
# period in addition to the blocking invoke timeout.
request-timeout = 90s
// # http://doc.akka.io/docs/akka-http/current/scala/http/configuration.html
// # descriptions inlined below for convenience
akka.http {
server {
# Description:
# If a request hasn't been responded to after the time period set here
# a `spray.http.Timedout` message will be sent to the timeout handler.
# Set to `infinite` to completely disable request timeouts.
#
# Explaining the set value:
# The controller holds connections up to 60s for blocking invokes, and
# all other operations are expected to complete quickly. We allow a grace
# period in addition to the blocking invoke timeout.
request-timeout = 90s

# Description:
# Enables/disables support for statistics collection and querying.
# Even though stats keeping overhead is small,
# for maximum performance switch off when not needed.
stats-support = off
# Description:
# Enables/disables support for statistics collection and querying.
# Even though stats keeping overhead is small,
# for maximum performance switch off when not needed.
stats-support = off

# Description:
# The time after which an idle connection will be automatically closed.
# Set to `infinite` to completely disable idle connection timeouts.
#
# Explaining the set value:
# This must be greater than the request timeout.
idle-timeout = 120s
# Description:
# The time after which an idle connection will be automatically closed.
# Set to `infinite` to completely disable idle connection timeouts.
#
# Explaining the set value:
# This must be greater than the request timeout.
idle-timeout = 120s

parsing {
# This indirectly puts a bound on the name of entities
# 8k matches nginx default
max-uri-length = 8k
parsing {
# This indirectly puts a bound on the name of entities
# 8k matches nginx default
max-uri-length = 8k

# This is 50MB to allow action attachments
max-content-length = 50m
# This is 50MB to allow action attachments
max-content-length = 50m
}
}
}

0 comments on commit f213d4e

Please sign in to comment.