Skip to content
This repository has been archived by the owner on Feb 19, 2021. It is now read-only.

Commit

Permalink
moved to the enricher model
Browse files Browse the repository at this point in the history
  • Loading branch information
kailuowang committed Oct 6, 2016
1 parent b78c599 commit 72a8624
Show file tree
Hide file tree
Showing 58 changed files with 1,137 additions and 933 deletions.
108 changes: 62 additions & 46 deletions distributed/src/main/scala/asobu/distributed/EndPointDefinition.scala
@@ -1,41 +1,37 @@
package asobu.distributed

import akka.actor.{ActorPath, ActorRef}
import asobu.distributed.CustomRequestExtractorDefinition.Interpreter
import asobu.distributed.gateway.Endpoint.Prefix
import asobu.distributed.gateway.RoutesCompilerExtra._
import asobu.distributed.service.ActionExtractor.RemoteExtractor
import asobu.distributed.service.RemoteExtractorDef
import asobu.dsl.{Extractor, ExtractResult}
import asobu.distributed.gateway.enricher.Interpreter
import asobu.dsl.{RequestExtractor, Extractor, ExtractResult}
import play.api.mvc.{AnyContent, Request}
import play.core.routing.RouteParams
import play.routes.compiler.Route
import play.routes.compiler.{PathPattern, HttpVerb, Route}
import shapeless.{HNil, HList}

import scala.concurrent.ExecutionContext

/**
* Endpoint definition by the remote handler
*/
trait EndpointDefinition {
/**
* type of the Message
*/
type T //todo: add serializable bound
val routeInfo: Route
val prefix: Prefix
sealed trait EndpointDefinition {
def verb: HttpVerb
def path: PathPattern
def call: String
def prefix: Prefix //todo: move prefix to Endpoint set
def handlerActor: ActorPath
def clusterRole: String
def version: Option[Int]
def version: Option[Int] //todo: move version to Endpoint set

val defaultPrefix: String = {
if (prefix.value.endsWith("/")) "" else "/"
}

val documentation: (String, String, String) = {
val localPath = if (routeInfo.path.parts.isEmpty) "" else defaultPrefix + routeInfo.path.toString
val localPath = if (path.parts.isEmpty) "" else defaultPrefix + path.toString
val pathInfo = prefix.value + localPath
(routeInfo.verb.toString, pathInfo, routeInfo.call.toString)
(verb.toString, pathInfo, call)
}

def handlerPath = handlerActor.toStringWithoutAddress
Expand All @@ -44,46 +40,66 @@ trait EndpointDefinition {
val (verb, path, _) = documentation
s"$verb $path"
}

def remoteExtractor(interpreter: Interpreter)(implicit ex: ExecutionContext): RemoteExtractor[T]

}

object EndpointDefinition {
type Aux[T0] = EndpointDefinition { type T = T0 }
}
@SerialVersionUID(2L)
case class EndpointDefSimple(
prefix: Prefix,
verb: HttpVerb,
path: PathPattern,
call: String,
handlerActor: ActorPath,
clusterRole: String,
version: Option[Int] = None
) extends EndpointDefinition

@SerialVersionUID(2L)
case class EndpointDefWithEnrichment(
prefix: Prefix,
verb: HttpVerb,
path: PathPattern,
call: String,
enricherDef: RequestEnricherDefinition,
handlerActor: ActorPath,
clusterRole: String,
version: Option[Int] = None
) extends EndpointDefinition

@SerialVersionUID(1L)
case class EndpointDefImpl[LExtracted <: HList, LParam <: HList, LExtra <: HList](
object EndpointDefinition {
def apply(
prefix: Prefix,
routeInfo: Route,
remoteExtractorDef: RemoteExtractorDef[LExtracted, LParam, LExtra],
route: Route,
handlerActor: ActorPath,
clusterRole: String,
version: Option[Int] = None
) extends EndpointDefinition {

type T = LExtracted

def remoteExtractor(interpreter: Interpreter)(implicit ex: ExecutionContext): RemoteExtractor[T] = remoteExtractorDef.extractor(interpreter: Interpreter)
}

/**
* Endpoint that takes no input at all, just match a route path
*/
case class NullaryEndpointDefinition(
): EndpointDefinition =
EndpointDefSimple(
prefix,
route.verb,
route.path,
route.call.toString(),
handlerActor,
clusterRole,
version
)

def apply(
prefix: Prefix,
routeInfo: Route,
route: Route,
enricherDef: RequestEnricherDefinition,
handlerActor: ActorPath,
clusterRole: String,
version: Option[Int] = None
) extends EndpointDefinition {

type T = HNil
def extract(routeParams: RouteParams, request: Request[AnyContent]): ExtractResult[T] = {
import scala.concurrent.ExecutionContext.Implicits.global
ExtractResult.pure(HNil)
}
version: Option[Int]
): EndpointDefinition =
EndpointDefWithEnrichment(
prefix,
route.verb,
route.path,
route.call.toString(),
enricherDef,
handlerActor,
clusterRole,
version
)

def remoteExtractor(interpreter: Interpreter)(implicit ec: ExecutionContext) = Extractor.empty[(RouteParams, Request[AnyContent])]
}
Expand Up @@ -5,17 +5,20 @@ import akka.cluster.{Member, MemberStatus, Cluster}
import akka.cluster.ddata.{LWWMap, LWWMapKey, DistributedData}
import akka.cluster.ddata.Replicator._
import asobu.distributed.EndpointsRegistry.DocDataType
import asobu.distributed.gateway.enricher.Interpreter
import play.api.libs.json.{JsNumber, Json, JsObject}
import concurrent.duration._

trait EndpointsRegistry {
val EndpointsDataKey = EndpointsRegistry.EndpointsDataKey
val EndpointsDocsKey = EndpointsRegistry.EndpointsDocsKey

def writeConsistency: WriteConsistency
def readConsistency: ReadConsistency
def emptyData: LWWMap[EndpointDefinition]
val EndpointsDataKey = LWWMapKey[EndpointDefinition]("endpoints-registry-endpoints")

val EndpointsDocsKey = LWWMapKey[DocDataType]("endpoints-registry-apidocs")

def emptyDocs: LWWMap[DocDataType]
val emptyData = LWWMap.empty[EndpointDefinition]

implicit def node: Cluster
def replicator: ActorRef
Expand All @@ -26,23 +29,21 @@ object EndpointsRegistry {
//Doc json is stored as String (JsObject isn't that serializable after manipulation)
type DocDataType = String

val EndpointsDataKey = LWWMapKey[EndpointDefinition]("endpoints-registry-endpoints")
val EndpointsDocsKey = LWWMapKey[DocDataType]("endpoints-registry-apidocs")
}

case class DefaultEndpointsRegistry(system: ActorSystem) extends EndpointsRegistry {
val timeout = 30.seconds

val writeConsistency = WriteAll(timeout)
val readConsistency = ReadMajority(timeout)
val emptyData = LWWMap.empty[EndpointDefinition]
val emptyDocs = LWWMap.empty[DocDataType]

implicit val node = Cluster(system)
val replicator: ActorRef = DistributedData(system).replicator
}

class EndpointsRegistryUpdater(registry: EndpointsRegistry) extends Actor with ActorLogging {

import EndpointsRegistryUpdater._
import registry._

Expand Down
@@ -0,0 +1,16 @@
package asobu.distributed

trait RequestEnricherDefinition extends Serializable

trait RequestEnricherDefinitionCombinator extends RequestEnricherDefinition

object RequestEnricherDefinition {

case class AndThen[A <: RequestEnricherDefinition, B <: RequestEnricherDefinition](a: A, b: B) extends RequestEnricherDefinitionCombinator
case class OrElse[A <: RequestEnricherDefinition, B <: RequestEnricherDefinition](a: A, b: B) extends RequestEnricherDefinitionCombinator

implicit class RequestEnricherDefinitionSyntax[A <: RequestEnricherDefinition](a: A) {
def andThen[B <: RequestEnricherDefinition](b: B): AndThen[A, B] = AndThen(a, b)
def orElse[B <: RequestEnricherDefinition](b: B): OrElse[A, B] = OrElse(a, b)
}
}

This file was deleted.

Expand Up @@ -3,6 +3,9 @@ package asobu.distributed
import akka.actor.ActorSystem
import scala.collection.JavaConverters._

/**
* Validate if the distributed data is configured correctly.
*/
object SystemValidator {
def validate(system: ActorSystem): Either[String, Unit] = {
val cfg = system.settings.config
Expand Down
Expand Up @@ -9,7 +9,7 @@ import akka.routing.{Group, RoundRobinGroup, FromConfig}
import scala.concurrent.{Future, Promise}

/**
* AkkaCluster enabled Akka routers
* AkkaCluster enabled Akka routers todo: this would not be needed with the new kanaloa backend.
*/
object ClusterRouters {

Expand Down

This file was deleted.

0 comments on commit 72a8624

Please sign in to comment.