Skip to content

Commit

Permalink
First fully integrated example with service execution from the network
Browse files Browse the repository at this point in the history
  • Loading branch information
basilkohler committed Mar 21, 2014
1 parent b47eab0 commit 536cc33
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 153 deletions.
24 changes: 13 additions & 11 deletions nfn/ccnliteinterface/src/main/scala/NFNCommunication.scala
Expand Up @@ -14,19 +14,22 @@ import com.typesafe.scalalogging.slf4j.Logging

trait Packet {

def name: String
def name: Seq[String]

def nameComponents:Array[String] = Array(name, "NFN")
def nameComponents:Seq[String] = name ++ Seq("NFN")
}

case class Interest(name: String) extends Packet {
case class Interest(name: Seq[String]) extends Packet {


override def toString = s"Interest('${name}')"
}
case class Content(name: String, data: Array[Byte]) extends Packet {
override def toString = s"Content('$name' => ${new String(data)})"

case class Content(name: Seq[String], data: Array[Byte]) extends Packet {
override def toString = s"Content('$name' => ${new String(data)})"
}


object NFNCommunication extends Logging {

def parseXml(xmlString: String):Packet = {
Expand All @@ -39,7 +42,6 @@ object NFNCommunication extends Logging {

encoding match {
case "string" =>
assert(nameData.size == nameSize, s"Parsed name '$nameData' has not its actual size $nameSize")
nameData
case "binary.base64" =>
new String(DatatypeConverter.parseBase64Binary(nameData))
Expand All @@ -65,24 +67,24 @@ object NFNCommunication extends Logging {
xml match {
case interest @ <interest>{_*}</interest> => {
val nameComponents = parseComponents(interest)
Interest(nameComponents.mkString("/"))
Interest(nameComponents)
}
case content @ <contentobj>{_*}</contentobj> => {
val nameComponents = parseComponents(content)
val contentData = parseContent(content)
Content(nameComponents.mkString("/"), contentData)
Content(nameComponents, contentData)
}
case _ => throw new Exception("XML parser cannot parse:\n" + xml)
}
}

def main(args: Array[String]) = {

val socket = UDPClient()
val socket = UDPClient("NFNSocket", 9000)
val ccnIf = new CCNLiteInterface()

val interest = Interest("add 7 1/NFN")
val binaryInterest: Array[Byte] = ccnIf.mkBinaryInterest(interest.nameComponents)
val interest = Interest(Seq("add 7 1"))
val binaryInterest: Array[Byte] = ccnIf.mkBinaryInterest(interest.nameComponents.toArray)

val f = socket.sendReceive(binaryInterest)
val respInterest = Await.result(f, 1 minute)
Expand Down
10 changes: 6 additions & 4 deletions nfn/ccnliteinterface/src/main/scala/UDPClient.scala
Expand Up @@ -4,7 +4,9 @@ import scala.concurrent._
import ExecutionContext.Implicits.global
import java.net.{DatagramPacket, InetAddress, DatagramSocket}

case class UDPClient(port: Int = 9000, maybeAddr: Option[InetAddress] = None) extends Logging {
case class UDPClient(name: String, port: Int, maybeAddr: Option[InetAddress] = None) extends Logging {

logger.debug(s"$name: opened socket at${maybeAddr.getOrElse("localhost")} $port")

val addr = maybeAddr.getOrElse(InetAddress.getByName(null))
val clientSocket = new DatagramSocket()
Expand All @@ -13,17 +15,17 @@ case class UDPClient(port: Int = 9000, maybeAddr: Option[InetAddress] = None) ex
future {
val packet = new DatagramPacket(data, data.length, addr, port)
clientSocket.send(packet)
logger.debug(s"sent data: $data")
logger.debug(s"$name: sent data: $data")
}
}

def receive(): Future[Array[Byte]] = {
future {
val rcvBuf = new Array[Byte](1024)
val rcvPacket = new DatagramPacket(rcvBuf, rcvBuf.length)
logger.debug("waiting for receive...")
logger.debug(s"$name: waiting for receive...")
clientSocket.receive(rcvPacket)
logger.debug(s"received: $rcvBuf")
logger.debug(s"$name: received: ${new String(rcvBuf)}")
rcvBuf
}
}
Expand Down
69 changes: 45 additions & 24 deletions nfn/ccnliteinterface/src/main/scala/UDPServer.scala
@@ -1,29 +1,49 @@
import akka.actor.{ActorRef, ActorSystem, IOManager}
import akka.io.UdpConnected
import akka.io.{Inet, UdpConnected}
import akka.io.UdpConnected.Connect
import akka.util.ByteString
import java.net.{DatagramPacket, DatagramSocket}
import com.typesafe.scalalogging.slf4j.Logging
import java.net.{InetSocketAddress, DatagramPacket, DatagramSocket}
import scala.concurrent._
import ExecutionContext.Implicits.global

object UDPServer {
def main(args: Array[String]) = {
val serverSocket = new DatagramSocket(9000)
case class UDPServer(name: String, port: Int, host: String, handler: Array[Byte] => Option[Array[Byte]]) extends Thread with Logging {

val rcvData = new Array[Byte](1024)
while(true) {
val rcvPacket = new DatagramPacket(rcvData, rcvData.length)
serverSocket.receive(rcvPacket)
val reqStr = new String(rcvPacket.getData)
println(s"server - rcv: $reqStr")
val addr = new InetSocketAddress(host, port)
logger.debug(s"$name: server socket listening on port $port")

val addr = rcvPacket.getAddress
val port = rcvPacket.getPort
val serverSocket = new DatagramSocket(port)

val respData = rcvData
val rspPacket = new DatagramPacket(respData, respData.length, addr, port)
var running = true

serverSocket.send(rspPacket)
override def run() = {
while (running) {
val rcvData = new Array[Byte](1024)
val rcvPacket = new DatagramPacket(rcvData, rcvData.length)
logger.debug(s"$name: waiting for receive...")
serverSocket.receive(rcvPacket)
logger.debug(s"$name: received ${new String(rcvData)}")
val respData = handler(rcvData) match {
case Some(respData) =>
val rspPacket = new DatagramPacket(respData, respData.length, addr)
serverSocket.send(rspPacket)
logger.debug(s"$name: sent ${new String(respData)}")
case None =>
}
}
}

def close = running = false

// def receive(handler: Array[Byte] => Unit): Future[Unit] = future {
// }
// val reqStr = new String(rcvPacket.getData)
// println(s"server - rcv: $reqStr")
//
// val addr = rcvPacket.getAddress
// val port = rcvPacket.getPort
//
// }
}


Expand Down Expand Up @@ -122,19 +142,20 @@ class Connected(hostname: String, port: Int) extends Actor {
}
}

class NFNWorker extends Actor {
class NFNWorker(name: String) extends Actor with Logging {
def ccnIf = new CCNLiteInterface()

override def receive: Actor.Receive = {
case data: ByteString =>
val byteArr = data.toByteBuffer.array.clone
val unparsedXml = ccnIf.ccnbToXml(byteArr)
NFNCommunication.parseXml(unparsedXml) match {
case interest: Interest =>
println(s"CLI RCV $interest")
case content: Content =>
println(s"CLI RCV $content")
}
logger.info(s"$name: received ${new String(byteArr)}")
// val unparsedXml = ccnIf.ccnbToXml(byteArr)
// NFNCommunication.parseXml(unparsedXml) match {
// case interest: Interest =>
// println(s"CLI RCV $interest")
// case content: Content =>
// println(s"CLI RCV $content")
// }
}
}

Expand Down
Expand Up @@ -6,11 +6,11 @@ import org.scalatest.{GivenWhenThen, Matchers, FlatSpec}
*/
class CCNLiteInterfaceTest extends FlatSpec with Matchers with GivenWhenThen {
val ccnIf = new CCNLiteInterface()
val interest = Interest("/name/interest")
val interest = Interest(Seq("/name/interest"))

s"Interest $interest" should "be converted to ccnb back to xml into interest object" in {
Given("cnnb for name")
val ccnbInterest = ccnIf.mkBinaryInterest(interest.nameComponents)
val ccnbInterest = ccnIf.mkBinaryInterest(interest.nameComponents.toArray)
When("parsed to xml string")
val xmlUnparsed = ccnIf.ccnbToXml(ccnbInterest)
Then("xml parsed to interest")
Expand All @@ -19,11 +19,11 @@ class CCNLiteInterfaceTest extends FlatSpec with Matchers with GivenWhenThen {
resultInterest.name should be (Seq("name", "interest"))
}

val content:Content = Content("/name/content", "testcontent".getBytes)
val content:Content = Content(Seq("/name/content"), "testcontent".getBytes)

s"Content $content" should "be converted to ccnb back to xml into content object" in {
Given("cnnb for name and content")
val ccnbContent = ccnIf.mkBinaryContent(content.nameComponents, content.data)
val ccnbContent = ccnIf.mkBinaryContent(content.nameComponents.toArray, content.data)
When("parsed to xml string")
val xmlUnparsed = ccnIf.ccnbToXml(ccnbContent)
Then("xml parsed to content")
Expand Down

0 comments on commit 536cc33

Please sign in to comment.