Permalink
Browse files

Update to enable passing of the parsed JSON as a Map to a task.

  • Loading branch information...
1 parent 83a926e commit a0cbd18a71ef30f7fc7bff3c3f80d3928366cbba @cscotta committed Oct 4, 2010
@@ -4,6 +4,8 @@ import java.io.{PrintWriter, StringWriter}
import com.twitter.json._
import org.apache.log4j.Logger
import com.urbanairship.octobot.consumers._
+import scala.collection.JavaConversions._
+
// This thread opens a streaming connection to a queue, which continually
// pushes messages to Octobot queue workers. The tasks contained within these
@@ -33,7 +35,7 @@ object QueueConsumer {
// reflection, accounting for non-existent tasks and errors while running.
def invokeTask(rawMessage: String) : Boolean = {
var taskName = ""
- var message: Map[String, AnyVal] = null
+ var message: Map[String, Object] = null
var retryCount = 0
var retryTimes = 0
@@ -48,7 +50,7 @@ object QueueConsumer {
logger.info("Retrying task. Attempt " + retryCount + " of " + retryTimes)
try {
- message = Json.parse(rawMessage).asInstanceOf[Map[String, AnyVal]]
+ message = Json.parse(rawMessage).asInstanceOf[Map[String, Object]]
taskName = message.get("task").get.toString
if (message.contains("retries"))
@@ -1,20 +1,22 @@
package com.urbanairship.octobot
+import java.util.Map
import com.twitter.json._
import java.lang.reflect.Method
-import scala.collection.mutable.HashMap
+import scala.collection.mutable
+import scala.collection.JavaConversions._
// This class is responsible for the actual invocation of a task.
// Given a task name and a JSON object to be passed onto it, the "execute"
// method looks up the task and method to invoke based on the name and calls it,
// then caches the method lookup.
object TaskExecutor {
- val taskCache = new HashMap[String, Method]
- val argClass = new HashMap[String, AnyVal].getClass
+ val taskCache = new mutable.HashMap[String, Method]
+ val argClass = Class.forName("java.util.Map")
// Invokes a task identified by class name with a message.
- def execute(taskName: String, message: Map[String, AnyVal]) {
+ def execute(taskName: String, message: Map[String, Object]) {
val method: Method = {
taskCache.getOrElse(taskName, {

0 comments on commit a0cbd18

Please sign in to comment.