Skip to content
This repository has been archived by the owner on Feb 19, 2021. It is now read-only.

Commit

Permalink
Take full control of EndpointDefinition data presentation
Browse files Browse the repository at this point in the history
  • Loading branch information
kailuowang committed Oct 7, 2016
1 parent c432832 commit 05b9287
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 67 deletions.
Expand Up @@ -20,6 +20,7 @@ import asobu.dsl.CatsInstances._
import scala.concurrent.{ExecutionContext, Future, duration}, duration._
import scala.reflect.ClassTag
import protocol._
import EndpointDefinition._

trait EndpointRoute {
def unapply(requestHeader: Request[AnyContent]): Option[RouteParams]
Expand All @@ -43,16 +44,15 @@ abstract class Endpoint(
ec: ExecutionContext
) extends EndpointRoute with EndpointHandler {
implicit val ak: Timeout = 10.minutes //todo: find the right place to configure this
import definition._

private val handlerRef: ActorRef = {
val props = bridgeProps(handlerPath, definition.clusterRole)
val props = bridgeProps(definition.handlerPath, definition.clusterRole)
//a random name allows some redundancy in this router.
val bridgeActorName = definition.clusterRole + handlerActor.name.replace("$", "") + ThreadLocalRandom.current().nextInt(1000)
val bridgeActorName = definition.clusterRole + definition.handlerActorPath.name.replace("$", "") + ThreadLocalRandom.current().nextInt(1000)
arf.actorOf(props, bridgeActorName)
}

def shutdown(): Unit = if (handlerRef != handlerActor) handlerRef ! PoisonPill
def shutdown(): Unit = handlerRef ! PoisonPill

def unapply(request: Request[AnyContent]): Option[RouteParams] = {
// queryString's parser parses an empty string as Map("" -> Seq()), so we replace query strings made up of all empty values
Expand All @@ -72,7 +72,6 @@ abstract class Endpoint(
// or have distributed request have the reply to Address and then send it to handlerRef as the implicit sender.
def handle(request: GateWayRequest): Future[Result] = {
import akka.pattern.ask
// import ExecutionContext.Implicits.global

val dRequestER: ExtractResult[DRequest] = {
val pathParamsErrors = request.routeParam.path.collect {
Expand Down Expand Up @@ -105,8 +104,8 @@ abstract class Endpoint(
}

private lazy val routeExtractors: ParamsExtractor = {
val localParts = if (path.parts.nonEmpty) StaticPart(defaultPrefix) +: path.parts else Nil
routing.Route(verb.value, routing.PathPattern(toCPart(StaticPart(prefix.value) +: localParts)))
val localParts = if (definition.path.nonEmpty) StaticPart(definition.defaultPrefix) +: definition.pathPattern.parts else Nil
routing.Route(definition.verb.value, routing.PathPattern(toCPart(StaticPart(definition.prefix.value) +: localParts)))
}

private def toCPart(parts: Seq[PathPart]): Seq[routing.PathPart] = parts map {
Expand Down
Expand Up @@ -2,74 +2,95 @@ package asobu.distributed.protocol

import akka.actor.ActorPath
import asobu.distributed.RequestEnricherDefinition
import asobu.distributed.protocol.Prefix
import play.routes.compiler.{HttpVerb, PathPattern, Route}
import play.routes.compiler._
import EndpointDefinition._

@SerialVersionUID(2L)
@SerialVersionUID(1L)
case class EndpointDefinition(
prefix: Prefix,
verb: HttpVerb,
path: PathPattern,
call: String,
handlerActor: ActorPath,
clusterRole: String,
enricherDef: Option[RequestEnricherDefinition] = None,
version: Option[Int] = None
) {
prefix: Prefix,
verb: Verb,
path: Seq[PathPart],
call: String,
handlerAddress: HandlerAddress,
clusterRole: String,
enricherDef: Option[RequestEnricherDefinition] = None,
version: Option[Int] = None
)

val defaultPrefix: String = {
if (prefix.value.endsWith("/")) "" else "/"
}
@SerialVersionUID(1L)
case class Verb(value: String)

val documentation: (String, String, String) = {
val localPath = if (path.parts.isEmpty) "" else defaultPrefix + path.toString
val pathInfo = prefix.value + localPath
(verb.toString, pathInfo, call)
}
sealed trait PathPart

def handlerPath = handlerActor.toStringWithoutAddress
@SerialVersionUID(1L)
case class StaticPathPart(value: String) extends PathPart

val id: String = {
val (verb, path, _) = documentation
s"$verb $path"
}
@SerialVersionUID(1L)
case class DynamicPathPart(name: String, constraint: String, encode: Boolean) extends PathPart

}
@SerialVersionUID(1L)
case class HandlerAddress(value: String) extends AnyVal

@SerialVersionUID(1L)
class Prefix private (val value: String) extends AnyVal

object EndpointDefinition {

case class Verb(value: String)
implicit class EndpointDefinitionOps(val ed: EndpointDefinition) {
import ed._
lazy val defaultPrefix: String = {
if (prefix.value.endsWith("/")) "" else "/"
}

lazy val pathPattern = PathPattern(path.map {
case StaticPathPart(v) StaticPart(v)
case DynamicPathPart(n, c, e) DynamicPart(n, c, e)
})

lazy val documentation: (String, String, String) = {
val localPath = if (pathPattern.parts.isEmpty) "" else defaultPrefix + pathPattern.toString
val pathInfo = prefix.value + localPath
(verb.toString, pathInfo, call)
}

def handlerActorPath = ActorPath.fromString(handlerAddress.value)

def handlerPath = handlerActorPath.toStringWithoutAddress

sealed trait PathPath
case class StaticPathPart(value: String) extends PathPath
case class DynamicPathPart(name: String, constraint: String, encode: Boolean) extends PathPath
case class HandlerAddress(value: String) extends AnyVal
lazy val id: String = {
val (verb, path, _) = documentation
s"$verb $path"
}
}

implicit class actorPathToHandlerAddress(actorPath: ActorPath) {
def handlerAddress: HandlerAddress = HandlerAddress(actorPath.toStringWithAddress(actorPath.address))
}

def apply(
prefix: Prefix,
route: Route,
handlerActor: ActorPath,
handlerAddress: HandlerAddress,
clusterRole: String,
enricherDef: Option[RequestEnricherDefinition],
version: Option[Int]
): EndpointDefinition =
EndpointDefinition(
prefix,
route.verb,
route.path,
Verb(route.verb.value),
route.path.parts.map {
case StaticPart(v) StaticPathPart(v)
case DynamicPart(n, c, e) DynamicPathPart(n, c, e)
},
route.call.toString(),
handlerActor,
handlerAddress,
clusterRole,
enricherDef,
version
)

}

@SerialVersionUID(1L)
class Prefix private (val value: String) extends AnyVal

object Prefix {
val root = apply("/")
def apply(value: String): Prefix = {
Expand Down
25 changes: 13 additions & 12 deletions distributed/src/main/scala/asobu/distributed/service/Action.scala
Expand Up @@ -5,18 +5,17 @@ import akka.cluster.Cluster
import Action.UnrecognizedMessage
import akka.stream.Materializer
import akka.util.ByteString
import asobu.distributed.protocol.EndpointDefinition
import asobu.distributed.protocol.{HandlerAddress, EndpointDefinition, Prefix}
import asobu.distributed.service.extractors.DRequestExtractor
import asobu.distributed.{DResult, DRequest}
import asobu.distributed.protocol.Prefix
import asobu.distributed._
import play.api.http.HttpEntity
import play.api.mvc.{AnyContent, ResponseHeader, Result}
import play.core.routing.RouteParams
import play.routes.compiler.Route
import scala.concurrent.Future
import scala.util.parsing.input.Positional

import EndpointDefinition._
trait Action {
type TMessage

Expand All @@ -26,7 +25,7 @@ trait Action {

def handlerActor()(implicit sys: ActorSystem): ActorRef = {
assert(!Cluster(sys).selfRoles.isEmpty, "Endpoint must be declared in node with a role in an Akka cluster")
sys.actorOf(Props(new RemoteHandler).withDeploy(Deploy.local), name + "_Handler")
sys.actorOf(Props(new RemoteHandler), name + "_Handler")
}

def enricherDefinition: Option[RequestEnricherDefinition]
Expand All @@ -35,14 +34,16 @@ trait Action {
route: Route,
prefix: Prefix,
version: Option[Int]
)(implicit sys: ActorSystem): EndpointDefinition = EndpointDefinition(
prefix,
route,
handlerActor().path,
Cluster(sys).selfRoles.head,
enricherDefinition,
version
)
)(implicit sys: ActorSystem): EndpointDefinition = {
EndpointDefinition(
prefix,
route,
handlerActor().path.handlerAddress,
Cluster(sys).selfRoles.head,
enricherDefinition,
version
)
}

class RemoteHandler extends Actor {
import context.dispatcher
Expand Down
Expand Up @@ -2,7 +2,7 @@ package asobu.distributed.gateway

import akka.actor._
import asobu.distributed.gateway.enricher.DisabledInterpreter
import asobu.distributed.protocol.{Prefix, EndpointDefinition}
import asobu.distributed.protocol.{HandlerAddress, Prefix, EndpointDefinition}
import asobu.distributed.util.{EndpointUtil, TestClusterActorSystem}
import asobu.distributed.util
import asobu.distributed.gateway.Endpoint.EndpointFactory
Expand Down Expand Up @@ -37,7 +37,7 @@ object EndpointSpec extends PlaySpecification with Mockito {
EndpointDefinition(
prefix,
route,
ActorPath.fromString("akka://my-sys/user/service-a/worker1"),
HandlerAddress("akka://my-sys/user/service-a/worker1"),
"role",
None,
None
Expand Down
Expand Up @@ -12,6 +12,7 @@ import shapeless.HNil
import concurrent.duration._
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import EndpointDefinition._

class EndpointsRouterSpec extends SpecWithActorCluster {

Expand All @@ -30,7 +31,7 @@ class EndpointsRouterSpec extends SpecWithActorCluster {
val worker2 = TestProbe()
val createEndpointDef = (route: Route, prefix: Prefix) {
val path = if (route.path.toString.contains("ep1")) worker1.ref.path else worker2.ref.path
EndpointDefinition(prefix, route, path, role, None, None)
EndpointDefinition(prefix, route, path.handlerAddress, role, None, None)
}

val endpointDefs = EndpointUtil.parseEndpoints(routeString)(createEndpointDef)
Expand Down
Expand Up @@ -4,8 +4,7 @@ import java.io.InvalidClassException

import akka.actor.{ActorRef, ExtendedActorSystem, UnhandledMessage}
import asobu.distributed._
import asobu.distributed.protocol.Prefix
import asobu.distributed.protocol.EndpointDefinition
import asobu.distributed.protocol.{Verb, Prefix, EndpointDefinition}
import asobu.distributed.util.{EndpointUtil, MockRoute, SpecWithActorCluster}
import akka.actor.ActorDSL._
import akka.cluster.Cluster
Expand All @@ -18,6 +17,7 @@ import play.routes.compiler.HttpVerb
class EndpointsRouterUpdaterSpec extends SpecWithActorCluster {
import scala.concurrent.ExecutionContext.Implicits.global
import EndpointUtil._
import EndpointDefinition._

def mockEndpointDef(
version: Option[Int],
Expand All @@ -30,10 +30,10 @@ class EndpointsRouterUpdaterSpec extends SpecWithActorCluster {

EndpointDefinition(
Prefix("/"),
HttpVerb(verb),
Verb(verb),
pathOf(pathParts),
"call",
handler.path,
handler.path.handlerAddress,
"test",
None,
version
Expand Down
Expand Up @@ -4,7 +4,7 @@ import akka.actor.ActorRefFactory
import asobu.distributed.gateway.Endpoint.EndpointFactory
import asobu.distributed.gateway.HandlerBridgeProps
import asobu.distributed.gateway.enricher.DisabledInterpreter
import asobu.distributed.protocol.{Prefix, EndpointDefinition}
import asobu.distributed.protocol.{StaticPathPart, PathPart, Prefix, EndpointDefinition}
import asobu.distributed.service.EndpointRoutesParser

import play.routes.compiler.{StaticPart, PathPattern, Route}
Expand Down Expand Up @@ -39,6 +39,5 @@ object EndpointUtil {
EndpointFactory[Nothing](HandlerBridgeProps.default)
}

def pathOf(pathParts: List[String] = List("abc", "ep1")): PathPattern =
PathPattern(pathParts.map(StaticPart))
def pathOf(pathParts: List[String] = List("abc", "ep1")): Seq[PathPart] = pathParts.map(StaticPathPart)
}

0 comments on commit 05b9287

Please sign in to comment.