diff --git a/nfn/src/main/scala/LambdaToMacros.scala b/nfn/src/main/scala/LambdaToMacros.scala index fec74cc7946f..65b5cc07f7b4 100644 --- a/nfn/src/main/scala/LambdaToMacros.scala +++ b/nfn/src/main/scala/LambdaToMacros.scala @@ -8,7 +8,7 @@ import language.experimental.macros import LambdaMacros._ import nfn.{LocalNFNCallExecutor, ComputeWorker} -import nfn.service.impl.{WordCountService, SumService, AddService} +import nfn.service.impl.{WordCountService, AddService} import scala.concurrent.{ExecutionContext, Await} import scala.concurrent.duration._ import scala.Some diff --git a/nfn/src/main/scala/NFNMain.scala b/nfn/src/main/scala/NFNMain.scala index 2851bdb2e08a..04c6432211ad 100644 --- a/nfn/src/main/scala/NFNMain.scala +++ b/nfn/src/main/scala/NFNMain.scala @@ -42,34 +42,37 @@ object NFNMain extends App { val ccnWorker = system.actorOf(Props[CCNWorker], name = "ccnworker") - Thread.sleep(2000) + val docname = "/doc/test" + val docdata = "This is a test document with 8 words! but 12 is better".getBytes + val docContent = Content(Seq(docname), docdata) + ccnWorker ! CCNWorker.CCNAddToCache(docContent) + NFNServiceLibrary.nfnPublish(ccnWorker) Thread.sleep(2000) - val interest = Interest(Seq("call 3 /AddService/Int/Int/rInt 10 2", "NFN")) - + val interest = Interest(Seq(s"call 2 /WordCountService/NFNName/rInt $docname", "NFN")) implicit val timeout = Timeout(20 seconds) val content = Await.result((ccnWorker ? CCNSendReceive(interest)).mapTo[Content], timeout.duration) println(s"RESULT: $content") - def repl(nfnSocket: ActorRef) = { - val parser = new LambdaParser() - step - def step: Unit = { - readLine("> ") match { - case "exit" | "quit" | "q" => - case input @ _ => { - parser.parse(input) - val interest = Interest(Seq(input, "NFN")) - (ccnWorker ? CCNSendReceive(interest)).mapTo[Content] onSuccess { - case content => println(s"RESULT: ${interest.name.mkString("/")} -> '${new String(content.data)}") - } - step - } - } - } - } +// def repl(nfnSocket: ActorRef) = { +// val parser = new LambdaParser() +// step +// def step: Unit = { +// readLine("> ") match { +// case "exit" | "quit" | "q" => +// case input @ _ => { +// parser.parse(input) +// val interest = Interest(Seq(input, "NFN")) +// (ccnWorker ? CCNSendReceive(interest)).mapTo[Content] onSuccess { +// case content => println(s"RESULT: ${interest.name.mkString("/")} -> '${new String(content.data)}") +// } +// step +// } +// } +// } +// } } diff --git a/nfn/src/main/scala/nfn/CCNWorker.scala b/nfn/src/main/scala/nfn/CCNWorker.scala index 1f1b863a3a7d..26fe9842fc9f 100644 --- a/nfn/src/main/scala/nfn/CCNWorker.scala +++ b/nfn/src/main/scala/nfn/CCNWorker.scala @@ -133,10 +133,10 @@ case class ComputeWorker(ccnWorker: ActorRef) extends Actor { def handleComputeRequest(computeCmps: Seq[String], interest: Interest, requestor: ActorRef) = { val futResultContent = for{callableServ <- NFNService.parseAndFindFromName(computeCmps.mkString(" "), ccnWorker) - servResult <- callableServ.exec } yield { // TODO fix the issue of name components vs single names... logger.warning(s"TODO: fix the issue of name components vs single names...") + val servResult = callableServ.exec Content(interest.name, servResult.toValueName.name.mkString(" ").getBytes) } futResultContent onSuccess { diff --git a/nfn/src/main/scala/nfn/LocalNFNCallExecutor.scala b/nfn/src/main/scala/nfn/LocalNFNCallExecutor.scala index 8e0862155ffb..1b0f4535d6e1 100644 --- a/nfn/src/main/scala/nfn/LocalNFNCallExecutor.scala +++ b/nfn/src/main/scala/nfn/LocalNFNCallExecutor.scala @@ -16,9 +16,8 @@ case class LocalNFNCallExecutor(ccnWorker: ActorRef) extends CallExecutor { val futValue = for {callableServ <- NFNService.parseAndFindFromName(call, ccnWorker) - servResult <- callableServ.exec } yield { - servResult match { + callableServ.exec match { case NFNIntValue(n) => ConstValue(n) case res @ _ => throw new Exception(s"LocalNFNCallExecutor: Result of executeCall is not implemented for: $res") } diff --git a/nfn/src/main/scala/nfn/service/Main.scala b/nfn/src/main/scala/nfn/service/Main.scala index 09717dfd306d..27dab662087a 100644 --- a/nfn/src/main/scala/nfn/service/Main.scala +++ b/nfn/src/main/scala/nfn/service/Main.scala @@ -20,13 +20,23 @@ import scala.concurrent.ExecutionContext.Implicits.global import akka.util.Timeout import scala.concurrent.duration._ +object ContentStore extends Logging { + private var cs: Map[Seq[String], Content] = Map() + + def add(content: Content) = cs += content.name -> content + + def find(name: Seq[String]):Option[Content] = cs.get(name) + + def remote(name: Seq[String]): Unit = cs -= name +} -object NFNServiceLibrary extends Logging { +object NFNServiceLibrary extends Logging { private var services:Map[String, NFNService] = Map() private val ccnIf = CCNLite add(AddService()) + add(WordCountService()) def add(serv: NFNService) = { @@ -59,22 +69,28 @@ object NFNServiceLibrary extends Logging { */ def nfnPublish(ccnWorker: ActorRef) = { def pinnedData = "pinnedfunction".getBytes - def maybeByteCodeData(serv: NFNService):Option[Array[Byte]] = { + def byteCodeData(serv: NFNService):Array[Byte] = { val bc = BytecodeLoader.fromClass(serv) bc match { - case Some(bc) => logger.info(s"nfnPublish: Added bytecode for service $serv") - case None => logger.warn(s"nfnPublush: No bytecode found for unpinned service $serv") + case Some(bc) => bc + case None => + logger.error(s"nfnPublush: No bytecode found for unpinned service $serv") + pinnedData } - bc } for((name, serv) <- services) { + + val serviceContent = + if(serv.pinned) pinnedData + else byteCodeData(serv) + val content = Content( Seq(name), - if(serv.pinned) pinnedData - else maybeByteCodeData(serv).getOrElse(pinnedData) + serviceContent ) + logger.debug(s"nfnPublish: Adding ${content.name.mkString(" ")} to cache") ccnWorker ! CCNAddToCache(content) } } @@ -88,6 +104,12 @@ object NFNServiceLibrary extends Logging { def derp(foo: Int) = ??? } +case class NFNBinaryDataValue(name: Seq[String], data: Array[Byte]) extends NFNServiceValue { + override def toValueName: NFNName = NFNName(Seq(new String(data))) + + override def toNFNName: NFNName = NFNName(name) +} + case class NFNIntValue(amount: Int) extends NFNServiceValue { def apply = amount @@ -104,33 +126,33 @@ case class NFNNameValue(name: NFNName) extends NFNServiceValue{ case class NFNServiceException(msg: String) extends Exception(msg) -case class DollarToChf() extends NFNService { - - override def toNFNName:NFNName = NFNName(Seq("DollarToChf/Int/rInt")) - - override def parse(unparsedName: String, unparsedValues: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService] = { - val values = unparsedValues match { - case Seq(dollarValueString) => Seq(NFNIntValue(dollarValueString.toInt)) - case _ => throw new Exception(s"Service $toNFNName could not parse single Int value from: '$unparsedValues'") - } - val name = NFNName(Seq(unparsedName)) - assert(name == this.toNFNName) - - val function = { (values: Seq[NFNServiceValue]) => - values match { - case Seq(dollar: NFNIntValue) => { - Future(NFNIntValue(dollar.amount/2)) - } - case _ => throw new NFNServiceException(s"${this.toNFNName} can only be applied to a single NFNIntValue and not $values") - } - - } - Future(CallableNFNService(name, values, function)) - } -} +//case class DollarToChf() extends NFNService { +// +// override def toNFNName:NFNName = NFNName(Seq("DollarToChf/Int/rInt")) +// +// override def parse(unparsedName: String, unparsedValues: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService] = { +// val values = unparsedValues match { +// case Seq(dollarValueString) => Seq(NFNIntValue(dollarValueString.toInt)) +// case _ => throw new Exception(s"Service $toNFNName could not parse single Int value from: '$unparsedValues'") +// } +// val name = NFNName(Seq(unparsedName)) +// assert(name == this.toNFNName) +// +// val function = { (values: Seq[NFNServiceValue]) => +// values match { +// case Seq(dollar: NFNIntValue) => { +// Future(NFNIntValue(dollar.amount/2)) +// } +// case _ => throw new NFNServiceException(s"${this.toNFNName} can only be applied to a single NFNIntValue and not $values") +// } +// +// } +// Future(CallableNFNService(name, values, function)) +// } +//} -case class CallableNFNService(name: NFNName, values: Seq[NFNServiceValue], function: (Seq[NFNServiceValue]) => Future[NFNServiceValue]) { - def exec:Future[NFNServiceValue] = function(values) +case class CallableNFNService(name: NFNName, values: Seq[NFNServiceValue], function: (Seq[NFNServiceValue]) => NFNServiceValue) { + def exec:NFNServiceValue = function(values) } case class NFNName(name: Seq[String]) { @@ -148,7 +170,7 @@ trait NFNServiceValue { trait NFNService { - def parse(name: String, values: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService] + def instantiateCallable(name: NFNName, futValues: Seq[Future[NFNServiceValue]], ccnWorker: ActorRef): Future[CallableNFNService] def toNFNName: NFNName @@ -156,6 +178,8 @@ trait NFNService { } object NFNService extends Logging { + implicit val timeout = Timeout(20 seconds) + def parseAndFindFromName(name: String, ccnWorker: ActorRef): Future[CallableNFNService] = { logger.debug(s"Trying to find service for: $name") @@ -170,18 +194,40 @@ object NFNService extends Logging { assert(count == args.size, s"matched name $name is not a valid service call, because arg count (${count + 1}) is not equal to number of args (${args.size}) (currently nfn counts the function name itself as an arg)") assert(count > 0, s"matched name $name is not a valid service call, because count cannot be 0 or smaller (currently nfn counts the function name itself as an arg)") + // find service val futServ: Future[NFNService] = NFNServiceLibrary.find(fun) match { case Some(serv) => Future(serv) case None => { val interest = Interest(Seq(fun)) - implicit val timeout = Timeout(20 seconds) - (ccnWorker ? CCNSendReceive(interest)).mapTo[Content] map { + val futServiceContent = (ccnWorker ? CCNSendReceive(interest)).mapTo[Content] + futServiceContent map { content => + // TODO parse service bytecode and create service + logger.error("TODO parse service bytecode and create service") ??? } } } - val futCallableServ:Future[CallableNFNService] = futServ flatMap { serv => serv.parse(fun, args, ccnWorker) } + // create or find values + val futArgs: List[Future[NFNServiceValue]] = args map { (arg: String) => + arg.forall(_.isDigit) match { + case true => Future(NFNIntValue(arg.toInt)) + case false => { + val interest = Interest(Seq(arg)) + + + val futContent:Future[Content] = ContentStore.find(interest.name) match { + case Some(content) => Future(content) + case None => (ccnWorker ? CCNSendReceive(interest)).mapTo[Content] + } + futContent map { content => + NFNBinaryDataValue(content.name, content.data) + } + } + } + } + + val futCallableServ:Future[CallableNFNService] = futServ flatMap { serv => serv.instantiateCallable(NFNName(Seq(fun)), futArgs, ccnWorker) } futCallableServ onSuccess { case callableServ => logger.debug(s"Found service for request: $callableServ") } diff --git a/nfn/src/main/scala/nfn/service/impl/AddService.scala b/nfn/src/main/scala/nfn/service/impl/AddService.scala index 53d09d12f15b..49e7e6364298 100644 --- a/nfn/src/main/scala/nfn/service/impl/AddService.scala +++ b/nfn/src/main/scala/nfn/service/impl/AddService.scala @@ -12,29 +12,31 @@ import akka.actor.ActorRef case class AddService() extends NFNService with Logging { - override def parse(unparsedName: String, unparsedValues: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService] = { - val values = unparsedValues match { - case Seq(lStr, rStr) => Seq( - NFNIntValue(lStr.toInt), - NFNIntValue(rStr.toInt) - ) - case _ => throw new Exception(s"Service $toNFNName could not parse two int values from: '$unparsedValues'") + override def instantiateCallable(name: NFNName, futValues: Seq[Future[NFNServiceValue]], ccnWorker: ActorRef): Future[CallableNFNService] = { + val (futLValue, futRValue) = futValues match { + case Seq(lval, rval) => (lval, rval) + case _ => throw new Exception(s"Service $toNFNName can only be applied to two NFNIntValues and not '$futValues'") } - val name = NFNName(Seq(unparsedName)) - assert(name == this.toNFNName) + + assert(name == toNFNName, s"Service $toNFNName is created with wrong name $name") val function = { (values: Seq[NFNServiceValue]) => values match { - case Seq(l: NFNIntValue, r: NFNIntValue) => { Future( + case Seq(l: NFNIntValue, r: NFNIntValue) => { NFNIntValue(l.amount + r.amount) - )} + } case _ => throw new NFNServiceException(s"${this.toNFNName} can only be applied to a single NFNIntValue and not $values") } } - Future(CallableNFNService(name, values, function)) + + Future.sequence(futValues) map { + case values @ Seq(lval: NFNIntValue, rval: NFNIntValue) => CallableNFNService(name, values, function) + case _ => throw new Exception(s"Service $toNFNName can only be applied to two NFNIntValues and not '$futValues'") + } } override def toNFNName: NFNName = NFNName(Seq("/AddService/Int/Int/rInt")) override def pinned: Boolean = false + } diff --git a/nfn/src/main/scala/nfn/service/impl/SumService.scala b/nfn/src/main/scala/nfn/service/impl/SumService.scala index e0667e0fa2dc..e69d5ac2b3e8 100644 --- a/nfn/src/main/scala/nfn/service/impl/SumService.scala +++ b/nfn/src/main/scala/nfn/service/impl/SumService.scala @@ -12,22 +12,22 @@ import akka.actor.ActorRef /** * Created by basil on 24/03/14. */ -case class SumService() extends NFNService { - override def parse(unparsedName: String, unparsedValues: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService] = { - val values = unparsedValues.map( (unparsedValue: String) => NFNIntValue(unparsedValue.toInt)) - val name = NFNName(Seq(unparsedName)) - assert(name == this.toNFNName) - - val function = { (values: Seq[NFNServiceValue]) => - Future(NFNIntValue( - values.map({ - case NFNIntValue(i) => i - case _ => throw new NFNServiceException(s"${this.toNFNName} can only be applied to a single NFNIntValue and not $values") - }).sum - )) - } - Future(CallableNFNService(name, values, function)) - } - - override def toNFNName: NFNName = NFNName(Seq("SumService/Int/Int/rInt")) -} +//case class SumService() extends NFNService { +// override def parse(unparsedName: String, unparsedValues: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService] = { +// val values = unparsedValues.map( (unparsedValue: String) => NFNIntValue(unparsedValue.toInt)) +// val name = NFNName(Seq(unparsedName)) +// assert(name == this.toNFNName) +// +// val function = { (values: Seq[NFNServiceValue]) => +// Future(NFNIntValue( +// values.map({ +// case NFNIntValue(i) => i +// case _ => throw new NFNServiceException(s"${this.toNFNName} can only be applied to a single NFNIntValue and not $values") +// }).sum +// )) +// } +// Future(CallableNFNService(name, values, function)) +// } +// +// override def toNFNName: NFNName = NFNName(Seq("SumService/Int/Int/rInt")) +//} diff --git a/nfn/src/main/scala/nfn/service/impl/WordCountService.scala b/nfn/src/main/scala/nfn/service/impl/WordCountService.scala index d479c3fe59e3..3801acb3f392 100644 --- a/nfn/src/main/scala/nfn/service/impl/WordCountService.scala +++ b/nfn/src/main/scala/nfn/service/impl/WordCountService.scala @@ -17,27 +17,23 @@ case class WordCountService() extends NFNService { def countWords(doc: NFNName) = 42 - override def parse(unparsedName: String, unparsedValues: Seq[String], ccnWorker: ActorRef): Future[CallableNFNService] = { - val values = unparsedValues match { - case Seq(docNameString) => Seq( - NFNNameValue(NFNName(Seq(docNameString))) - ) - case _ => throw new Exception(s"Service $toNFNName could not parse single Int value from: '$unparsedValues'") - } - val name = NFNName(Seq(unparsedName)) + override def instantiateCallable(name: NFNName, futValues: Seq[Future[NFNServiceValue]], ccnWorker: ActorRef): Future[CallableNFNService] = { assert(name == this.toNFNName) - val function = { (values: Seq[NFNServiceValue]) => - values match { - case Seq(docName: NFNNameValue) => { Future( - NFNIntValue(countWords(docName.name)) - )} - case _ => throw new NFNServiceException(s"${this.toNFNName} can only be applied to a single NFNIntValue and not $values") + val function: (Seq[NFNServiceValue]) => NFNIntValue = { + case Seq(doc: NFNBinaryDataValue) => { + NFNIntValue( + new String(doc.data).split(" ").size + ) } + case values @ _ => throw new NFNServiceException(s"${this.toNFNName} can only be applied to a single NFNBinaryDataValue and not $values") + } + Future.sequence(futValues) map { values => + CallableNFNService(name, values, function) } - Future(CallableNFNService(name, values, function)) } - override def toNFNName: NFNName = NFNName(Seq("WordCountService/Int/rInt")) + override def toNFNName: NFNName = NFNName(Seq("/WordCountService/NFNName/rInt")) + }