Skip to content

Commit

Permalink
On service invocation the service parser parse binary data to NFNBina…
Browse files Browse the repository at this point in the history
…ryDataValue now
  • Loading branch information
basilkohler committed Mar 31, 2014
1 parent 135eb94 commit d5c795f
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 108 deletions.
2 changes: 1 addition & 1 deletion nfn/src/main/scala/LambdaToMacros.scala
Expand Up @@ -8,7 +8,7 @@ import language.experimental.macros

import LambdaMacros._
import nfn.{LocalNFNCallExecutor, ComputeWorker}
import nfn.service.impl.{WordCountService, SumService, AddService}
import nfn.service.impl.{WordCountService, AddService}
import scala.concurrent.{ExecutionContext, Await}
import scala.concurrent.duration._
import scala.Some
Expand Down
43 changes: 23 additions & 20 deletions nfn/src/main/scala/NFNMain.scala
Expand Up @@ -42,34 +42,37 @@ object NFNMain extends App {

val ccnWorker = system.actorOf(Props[CCNWorker], name = "ccnworker")


Thread.sleep(2000)
val docname = "/doc/test"
val docdata = "This is a test document with 8 words! but 12 is better".getBytes
val docContent = Content(Seq(docname), docdata)
ccnWorker ! CCNWorker.CCNAddToCache(docContent)

NFNServiceLibrary.nfnPublish(ccnWorker)
Thread.sleep(2000)

val interest = Interest(Seq("call 3 /AddService/Int/Int/rInt 10 2", "NFN"))

val interest = Interest(Seq(s"call 2 /WordCountService/NFNName/rInt $docname", "NFN"))
implicit val timeout = Timeout(20 seconds)
val content = Await.result((ccnWorker ? CCNSendReceive(interest)).mapTo[Content], timeout.duration)
println(s"RESULT: $content")



def repl(nfnSocket: ActorRef) = {
val parser = new LambdaParser()
step
def step: Unit = {
readLine("> ") match {
case "exit" | "quit" | "q" =>
case input @ _ => {
parser.parse(input)
val interest = Interest(Seq(input, "NFN"))
(ccnWorker ? CCNSendReceive(interest)).mapTo[Content] onSuccess {
case content => println(s"RESULT: ${interest.name.mkString("/")} -> '${new String(content.data)}")
}
step
}
}
}
}
// def repl(nfnSocket: ActorRef) = {
// val parser = new LambdaParser()
// step
// def step: Unit = {
// readLine("> ") match {
// case "exit" | "quit" | "q" =>
// case input @ _ => {
// parser.parse(input)
// val interest = Interest(Seq(input, "NFN"))
// (ccnWorker ? CCNSendReceive(interest)).mapTo[Content] onSuccess {
// case content => println(s"RESULT: ${interest.name.mkString("/")} -> '${new String(content.data)}")
// }
// step
// }
// }
// }
// }
}
2 changes: 1 addition & 1 deletion nfn/src/main/scala/nfn/CCNWorker.scala
Expand Up @@ -133,10 +133,10 @@ case class ComputeWorker(ccnWorker: ActorRef) extends Actor {
def handleComputeRequest(computeCmps: Seq[String], interest: Interest, requestor: ActorRef) = {
val futResultContent =
for{callableServ <- NFNService.parseAndFindFromName(computeCmps.mkString(" "), ccnWorker)
servResult <- callableServ.exec
} yield {
// TODO fix the issue of name components vs single names...
logger.warning(s"TODO: fix the issue of name components vs single names...")
val servResult = callableServ.exec
Content(interest.name, servResult.toValueName.name.mkString(" ").getBytes)
}
futResultContent onSuccess {
Expand Down
3 changes: 1 addition & 2 deletions nfn/src/main/scala/nfn/LocalNFNCallExecutor.scala
Expand Up @@ -16,9 +16,8 @@ case class LocalNFNCallExecutor(ccnWorker: ActorRef) extends CallExecutor {

val futValue =
for {callableServ <- NFNService.parseAndFindFromName(call, ccnWorker)
servResult <- callableServ.exec
} yield {
servResult match {
callableServ.exec match {
case NFNIntValue(n) => ConstValue(n)
case res @ _ => throw new Exception(s"LocalNFNCallExecutor: Result of executeCall is not implemented for: $res")
}
Expand Down
120 changes: 83 additions & 37 deletions nfn/src/main/scala/nfn/service/Main.scala
Expand Up @@ -20,13 +20,23 @@ import scala.concurrent.ExecutionContext.Implicits.global
import akka.util.Timeout
import scala.concurrent.duration._

object ContentStore extends Logging {
private var cs: Map[Seq[String], Content] = Map()

def add(content: Content) = cs += content.name -> content

def find(name: Seq[String]):Option[Content] = cs.get(name)

def remote(name: Seq[String]): Unit = cs -= name
}

object NFNServiceLibrary extends Logging {

object NFNServiceLibrary extends Logging {
private var services:Map[String, NFNService] = Map()
private val ccnIf = CCNLite

add(AddService())
add(WordCountService())

def add(serv: NFNService) = {

Expand Down Expand Up @@ -59,22 +69,28 @@ object NFNServiceLibrary extends Logging {
*/
def nfnPublish(ccnWorker: ActorRef) = {
def pinnedData = "pinnedfunction".getBytes
def maybeByteCodeData(serv: NFNService):Option[Array[Byte]] = {
def byteCodeData(serv: NFNService):Array[Byte] = {
val bc = BytecodeLoader.fromClass(serv)
bc match {
case Some(bc) => logger.info(s"nfnPublish: Added bytecode for service $serv")
case None => logger.warn(s"nfnPublush: No bytecode found for unpinned service $serv")
case Some(bc) => bc
case None =>
logger.error(s"nfnPublush: No bytecode found for unpinned service $serv")
pinnedData
}
bc
}

for((name, serv) <- services) {

val serviceContent =
if(serv.pinned) pinnedData
else byteCodeData(serv)

val content = Content(
Seq(name),
if(serv.pinned) pinnedData
else maybeByteCodeData(serv).getOrElse(pinnedData)
serviceContent
)

logger.debug(s"nfnPublish: Adding ${content.name.mkString(" ")} to cache")
ccnWorker ! CCNAddToCache(content)
}
}
Expand All @@ -88,6 +104,12 @@ object NFNServiceLibrary extends Logging {
def derp(foo: Int) = ???
}

case class NFNBinaryDataValue(name: Seq[String], data: Array[Byte]) extends NFNServiceValue {
override def toValueName: NFNName = NFNName(Seq(new String(data)))

override def toNFNName: NFNName = NFNName(name)
}

case class NFNIntValue(amount: Int) extends NFNServiceValue {
def apply = amount

Expand All @@ -104,33 +126,33 @@ case class NFNNameValue(name: NFNName) extends NFNServiceValue{

case class NFNServiceException(msg: String) extends Exception(msg)

case class DollarToChf() extends NFNService {

override def toNFNName:NFNName = NFNName(Seq("DollarToChf/Int/rInt"))

override def parse(unparsedName: String, unparsedValues: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService] = {
val values = unparsedValues match {
case Seq(dollarValueString) => Seq(NFNIntValue(dollarValueString.toInt))
case _ => throw new Exception(s"Service $toNFNName could not parse single Int value from: '$unparsedValues'")
}
val name = NFNName(Seq(unparsedName))
assert(name == this.toNFNName)

val function = { (values: Seq[NFNServiceValue]) =>
values match {
case Seq(dollar: NFNIntValue) => {
Future(NFNIntValue(dollar.amount/2))
}
case _ => throw new NFNServiceException(s"${this.toNFNName} can only be applied to a single NFNIntValue and not $values")
}

}
Future(CallableNFNService(name, values, function))
}
}
//case class DollarToChf() extends NFNService {
//
// override def toNFNName:NFNName = NFNName(Seq("DollarToChf/Int/rInt"))
//
// override def parse(unparsedName: String, unparsedValues: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService] = {
// val values = unparsedValues match {
// case Seq(dollarValueString) => Seq(NFNIntValue(dollarValueString.toInt))
// case _ => throw new Exception(s"Service $toNFNName could not parse single Int value from: '$unparsedValues'")
// }
// val name = NFNName(Seq(unparsedName))
// assert(name == this.toNFNName)
//
// val function = { (values: Seq[NFNServiceValue]) =>
// values match {
// case Seq(dollar: NFNIntValue) => {
// Future(NFNIntValue(dollar.amount/2))
// }
// case _ => throw new NFNServiceException(s"${this.toNFNName} can only be applied to a single NFNIntValue and not $values")
// }
//
// }
// Future(CallableNFNService(name, values, function))
// }
//}

case class CallableNFNService(name: NFNName, values: Seq[NFNServiceValue], function: (Seq[NFNServiceValue]) => Future[NFNServiceValue]) {
def exec:Future[NFNServiceValue] = function(values)
case class CallableNFNService(name: NFNName, values: Seq[NFNServiceValue], function: (Seq[NFNServiceValue]) => NFNServiceValue) {
def exec:NFNServiceValue = function(values)
}

case class NFNName(name: Seq[String]) {
Expand All @@ -148,14 +170,16 @@ trait NFNServiceValue {

trait NFNService {

def parse(name: String, values: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService]
def instantiateCallable(name: NFNName, futValues: Seq[Future[NFNServiceValue]], ccnWorker: ActorRef): Future[CallableNFNService]

def toNFNName: NFNName

def pinned: Boolean = true
}

object NFNService extends Logging {
implicit val timeout = Timeout(20 seconds)

def parseAndFindFromName(name: String, ccnWorker: ActorRef): Future[CallableNFNService] = {

logger.debug(s"Trying to find service for: $name")
Expand All @@ -170,18 +194,40 @@ object NFNService extends Logging {
assert(count == args.size, s"matched name $name is not a valid service call, because arg count (${count + 1}) is not equal to number of args (${args.size}) (currently nfn counts the function name itself as an arg)")
assert(count > 0, s"matched name $name is not a valid service call, because count cannot be 0 or smaller (currently nfn counts the function name itself as an arg)")

// find service
val futServ: Future[NFNService] = NFNServiceLibrary.find(fun) match {
case Some(serv) => Future(serv)
case None => {
val interest = Interest(Seq(fun))
implicit val timeout = Timeout(20 seconds)
(ccnWorker ? CCNSendReceive(interest)).mapTo[Content] map {
val futServiceContent = (ccnWorker ? CCNSendReceive(interest)).mapTo[Content]
futServiceContent map { content =>
// TODO parse service bytecode and create service
logger.error("TODO parse service bytecode and create service")
???
}
}
}

val futCallableServ:Future[CallableNFNService] = futServ flatMap { serv => serv.parse(fun, args, ccnWorker) }
// create or find values
val futArgs: List[Future[NFNServiceValue]] = args map { (arg: String) =>
arg.forall(_.isDigit) match {
case true => Future(NFNIntValue(arg.toInt))
case false => {
val interest = Interest(Seq(arg))


val futContent:Future[Content] = ContentStore.find(interest.name) match {
case Some(content) => Future(content)
case None => (ccnWorker ? CCNSendReceive(interest)).mapTo[Content]
}
futContent map { content =>
NFNBinaryDataValue(content.name, content.data)
}
}
}
}

val futCallableServ:Future[CallableNFNService] = futServ flatMap { serv => serv.instantiateCallable(NFNName(Seq(fun)), futArgs, ccnWorker) }
futCallableServ onSuccess {
case callableServ => logger.debug(s"Found service for request: $callableServ")
}
Expand Down
26 changes: 14 additions & 12 deletions nfn/src/main/scala/nfn/service/impl/AddService.scala
Expand Up @@ -12,29 +12,31 @@ import akka.actor.ActorRef


case class AddService() extends NFNService with Logging {
override def parse(unparsedName: String, unparsedValues: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService] = {
val values = unparsedValues match {
case Seq(lStr, rStr) => Seq(
NFNIntValue(lStr.toInt),
NFNIntValue(rStr.toInt)
)
case _ => throw new Exception(s"Service $toNFNName could not parse two int values from: '$unparsedValues'")
override def instantiateCallable(name: NFNName, futValues: Seq[Future[NFNServiceValue]], ccnWorker: ActorRef): Future[CallableNFNService] = {
val (futLValue, futRValue) = futValues match {
case Seq(lval, rval) => (lval, rval)
case _ => throw new Exception(s"Service $toNFNName can only be applied to two NFNIntValues and not '$futValues'")
}
val name = NFNName(Seq(unparsedName))
assert(name == this.toNFNName)

assert(name == toNFNName, s"Service $toNFNName is created with wrong name $name")

val function = { (values: Seq[NFNServiceValue]) =>
values match {
case Seq(l: NFNIntValue, r: NFNIntValue) => { Future(
case Seq(l: NFNIntValue, r: NFNIntValue) => {
NFNIntValue(l.amount + r.amount)
)}
}
case _ => throw new NFNServiceException(s"${this.toNFNName} can only be applied to a single NFNIntValue and not $values")
}
}
Future(CallableNFNService(name, values, function))

Future.sequence(futValues) map {
case values @ Seq(lval: NFNIntValue, rval: NFNIntValue) => CallableNFNService(name, values, function)
case _ => throw new Exception(s"Service $toNFNName can only be applied to two NFNIntValues and not '$futValues'")
}
}

override def toNFNName: NFNName = NFNName(Seq("/AddService/Int/Int/rInt"))

override def pinned: Boolean = false

}
38 changes: 19 additions & 19 deletions nfn/src/main/scala/nfn/service/impl/SumService.scala
Expand Up @@ -12,22 +12,22 @@ import akka.actor.ActorRef
/**
* Created by basil on 24/03/14.
*/
case class SumService() extends NFNService {
override def parse(unparsedName: String, unparsedValues: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService] = {
val values = unparsedValues.map( (unparsedValue: String) => NFNIntValue(unparsedValue.toInt))
val name = NFNName(Seq(unparsedName))
assert(name == this.toNFNName)

val function = { (values: Seq[NFNServiceValue]) =>
Future(NFNIntValue(
values.map({
case NFNIntValue(i) => i
case _ => throw new NFNServiceException(s"${this.toNFNName} can only be applied to a single NFNIntValue and not $values")
}).sum
))
}
Future(CallableNFNService(name, values, function))
}

override def toNFNName: NFNName = NFNName(Seq("SumService/Int/Int/rInt"))
}
//case class SumService() extends NFNService {
// override def parse(unparsedName: String, unparsedValues: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService] = {
// val values = unparsedValues.map( (unparsedValue: String) => NFNIntValue(unparsedValue.toInt))
// val name = NFNName(Seq(unparsedName))
// assert(name == this.toNFNName)
//
// val function = { (values: Seq[NFNServiceValue]) =>
// Future(NFNIntValue(
// values.map({
// case NFNIntValue(i) => i
// case _ => throw new NFNServiceException(s"${this.toNFNName} can only be applied to a single NFNIntValue and not $values")
// }).sum
// ))
// }
// Future(CallableNFNService(name, values, function))
// }
//
// override def toNFNName: NFNName = NFNName(Seq("SumService/Int/Int/rInt"))
//}

0 comments on commit d5c795f

Please sign in to comment.