Permalink
Browse files

Added functionalities: SubmitAt, and InvokeAt, for running tasks on s…

…pecific nodes
  • Loading branch information...
1 parent cf3d59f commit 06f9920e6804ede589235a740548cb3a9d951fcc @heathermiller committed Jul 13, 2011
@@ -10,16 +10,18 @@
package scacs
-
-import akka.actor.Actor
+import akka.actor.{Actor, ActorRef}
import Actor._
+import scala.collection.mutable.HashMap
class ClusterService extends Actor{
var allAddresses: List[(String, Int)] = List()
+ var data = new HashMap[Int, Any]
+ var master: ActorRef = null
def receive = {
case Announce(hostname, port) =>
- val master = remote.actorFor(classOf[MasterService].getCanonicalName,hostname,port)
+ master = remote.actorFor(classOf[MasterService].getCanonicalName,hostname,port)
val localhost = remote.address.getHostName()
val localport = remote.address.getPort()
master ! Announce(localhost, localport)
@@ -36,6 +38,15 @@ class ClusterService extends Actor{
newActor !! Nodes(allAddresses)
self.reply()
+ case SubmitAt(_, _, block, input, trackingNumber) =>
+ val result = block(this, input)
+ data += (trackingNumber.get -> result)
+
+ case InvokeAt(_, _, block, input, trackingNumber) =>
+ val result = block(this, input)
+ if (!trackingNumber.isEmpty) data += (trackingNumber.get -> result)
+ self.reply(result)
+
case _ =>
println("[ClusterService] unknown message")
}
@@ -3,6 +3,7 @@ package scacs
import akka.actor.{Actor, ActorRef}
import Actor._
+import scala.collection.mutable.HashMap
import java.util.concurrent.CountDownLatch
class MasterService extends Actor {
@@ -11,11 +12,13 @@ class MasterService extends Actor {
var nodeRefs: List[ActorRef] = List()
def receive = {
+
case ClusterSize(num) =>
numNodes = num
println("[MasterService] waiting for "+
numNodes+" nodes to register")
self.reply()
+
case Announce(newHost, newPort) =>
println("[MasterService] new host "+
newHost+":"+newPort)
@@ -28,17 +31,8 @@ class MasterService extends Actor {
port)
}
nodeRefs foreach { service => service !! Nodes(addresses) }
-
- //test, sends message to first node
- nodeRefs(0) !! Start(classOf[EchoActor])
- val echoActor = remote.actorFor(
- classOf[EchoActor].getCanonicalName,
- addresses(0)._1,
- addresses(0)._2)
- println(echoActor !! "hello")
-
- self.stop()
}
+
case _ =>
println("[MasterService] unknown message")
}
@@ -36,17 +36,36 @@ case class Start(clazz: Class[_ <: Actor])
/**
* Message type used by {{MasterService}} for submitting a task to a node,
- * when it expects to receive a {{trackingNumber}} for a processed data
+ * when it expects to receive a tracking number for a processed data
* item in return, rather than the actual data item.
*
- * @param addresses a list of addresses of the remote nodes
+ * TODO: Variants of this which deal with data partitioning and/or shipping.
+ *
+ * @param host The hostname of the {{ClusterService}} to submit to.
+ * @param port The port number of the {{ClusterService}} to submit to.
+ * @param fun The function to distribute.
+ * @param input The input data to distribute and process.
+ * @trackingNumber The tracking number generated by {{MasterService}} to be used by {{ClusterService}} to locally store result.
*/
-//case class SubmitAt(addresses: List[(String, Int)])
+case class SubmitAt(host: String, port: Int,
+ fun: (ClusterService,Any)=>Any,
+ input: Any,
+ trackingNumber: Option[Int])
/**
* Message type used by {{MasterService}} for submitting a task to a node,
* when it expects to receive directly receive the processed data item in return.
*
- * @param clazz the class of the actor to be started
+ * TODO: Variants of this which deal with data partitioning and/or shipping.
+ *
+ * @param host The hostname of the {{ClusterService}} to submit to.
+ * @param port The port number of the {{ClusterService}} to submit to.
+ * @param fun The function to distribute.
+ * @param input The input data to distribute and process.
+ * @trackingNumber The tracking number generated by {{MasterService}} to be used by {{ClusterService}} to locally store result. This is an optional. If no trackingNumber is provided, then the result is not stored locally by {{ClusterService}}.
*/
-//case class InvokeAt(clazz: Class[_ <: Actor])
+case class InvokeAt(host: String, port: Int,
+ fun: (ClusterService,Any)=>Any,
+ input: Any,
+ trackingNumber: Option[Int] = None)
+

0 comments on commit 06f9920

Please sign in to comment.