Permalink
Browse files

First implementation of Octobot in Scala.

  • Loading branch information...
0 parents commit 8eaf33a11a19f8a6e45e74d8aa0f469fe861340b @cscotta committed Sep 29, 2010
7 .gitignore
@@ -0,0 +1,7 @@
+*.class
+jar
+jar/*
+build
+build/*
+nbproject
+MANIFEST.MF
24 LICENSE
@@ -0,0 +1,24 @@
+Copyright (c) 2010, C. Scott Andreas
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+ * Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+ * Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+ * Neither the name of the organization nor the names of its contributors
+ may be used to endorse or promote products derived from this software
+ without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
+DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32 README
@@ -0,0 +1,32 @@
+Introduction –
+
+Octobot is a task queue worker designed for reliability, ease of use, and
+throughput.
+
+Octobot can listen on any number of queues, with any number of workers
+processing messages from each. Each queue can be set at a custom priority to
+ensure that more system resources are available for more important tasks. AMQP
+/ RabbitMQ, Redis, and Beanstalk are supported as backends, with an extensible
+architecture to allow for additional backends to be added as needed.
+
+
+Architecture –
+
+Octobotʼs internal architecture is a shared-nothing, threadsafe design that
+allows for any number of concurrent queues and tasks to be processed at once,
+limited only by system resources. Taking full advantage of parallel execution,
+Octobotʼs ability to spawn multiple workers on multiple queues can be used to
+more efficiently process tasks at a higher level of parallelism to offset the
+cost of IO-bound tasks on remote systems such as database reads, writes, and
+queue IO. Octobot also provides for connection pooling to datastores such as
+Cassandra and MongoDB, as well as outbound connection management for AMQP.
+
+As an isolated worker with support for multiple queues, Octobot instances can
+be geographically distributed across multiple datacenters and availability
+regions, assuring that so long as a queue is available, work will get done.
+
+Octobotʼs modular design also makes it easy to add system introspection and
+reporting tools, providing metrics such as job throughput per queue, total
+messages processed per second, and time per message, which can be passed onto
+an external system if additional time calculations are required to determine
+end-to-end messaging performance.
33 build.xml
@@ -0,0 +1,33 @@
+<project>
+ <property environment="env" />
+ <path id="classpath">
+ <fileset dir="./lib" includes="**/*.jar" />
+ </path>
+
+ <target name="clean">
+ <delete dir="build" />
+ <delete dir="test/build" />
+ <delete dir="jar" />
+ </target>
+
+ <target name="compile" depends="clean">
+ <mkdir dir="build" />
+ <exec executable="/bin/sh">
+ <arg value="-c" />
+ <arg value="scalac -cp lib/jvyaml.jar:lib/log4j-1.2.16.jar:lib/json.jar:lib/BeanstalkClient-1.0.4-SNAPSHOT.jar:lib/rabbitmq-client.jar:lib/mail-1.4.3.jar:lib/activation-1.1.1.jar:lib/jedis-1.0.0-RC5.jar:lib/scala-library.jar src/com/urbanairship/octobot/*.scala -d build" />
+ </exec>
+ </target>
+
+ <target name="jar" depends="compile">
+ <mkdir dir="jar" />
+
+ <manifest file="MANIFEST.MF">
+ <attribute name="Built-By" value="${user.name}"/>
+ <attribute name="Main-Class" value="com.urbanairship.octobot.Octobot"/>
+ </manifest>
+
+ <jar destfile="jar/octobot.jar" basedir="build" includes="**/*.class" manifest="MANIFEST.MF">
+ <zipgroupfileset dir="lib" includes="*.jar"/>
+ </jar>
+ </target>
+</project>
25 example-config.yml
@@ -0,0 +1,25 @@
+Octobot:
+ queues:
+ - { name: tacotruck,
+ protocol: beanstalk,
+ host: localhost,
+ port: 11300,
+ workers: 1,
+ priority: 10
+ }
+
+ metrics_port: 1228
+
+ email_enabled: false
+ email_from: ohai@example.com
+ email_to: ohno@itsbroke.com
+ email_hostname: localhost
+ email_server: smtp.gmail.com
+ email_port: 465
+ email_ssl: true
+ email_auth: true
+ email_username: username
+ email_password: password
+
+ # startup_hook: org.example.taquito.StartupHook
+ # shutdown_hook: org.example.taquito.ShutdownHook
BIN lib/BeanstalkClient-1.0.4-SNAPSHOT.jar
Binary file not shown.
BIN lib/activation-1.1.1.jar
Binary file not shown.
BIN lib/commons-io-1.2.jar
Binary file not shown.
BIN lib/jedis-1.0.0-RC5.jar
Binary file not shown.
BIN lib/json.jar
Binary file not shown.
BIN lib/json_simple-1.1.jar
Binary file not shown.
BIN lib/jvyaml.jar
Binary file not shown.
BIN lib/log4j-1.2.16.jar
Binary file not shown.
BIN lib/mail-1.4.3.jar
Binary file not shown.
BIN lib/rabbitmq-client.jar
Binary file not shown.
BIN lib/scala-library.jar
Binary file not shown.
9 log4j.properties
@@ -0,0 +1,9 @@
+# Set root logger level to DEBUG and its only appender to A1.
+log4j.rootLogger=DEBUG, A1
+
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses a PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
17 octobot
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+options[0]="configFile=`pwd`/example-config.yml"
+options[1]="log4j.configuration=`pwd`/log4j.properties"
+
+JVM_OPTS="-server"
+CLASSPATH="-cp ../lib/*:.:../tasks.jar"
+EXECUTABLE="com.urbanairship.octobot.Octobot"
+
+for i in "${options[@]}"
+do
+ JVM_OPTS="$JVM_OPTS -D$i "
+done
+
+echo -e "Launching Octobot...\n"
+echo -e "Options specified: $JVM_OPTS\n"
+cd build && java $JVM_OPTS $CLASSPATH $EXECUTABLE
17 octobot-jar
@@ -0,0 +1,17 @@
+#!/bin/bash
+
+options[0]="configFile=`pwd`/example-config.yml"
+options[1]="log4j.configuration=`pwd`/log4j.properties"
+
+JVM_OPTS="-server"
+EXECUTABLE="-jar octobot.jar"
+CLASSPATH="-cp ../tasks.jar:../lib/scala-library.jar"
+
+for i in "${options[@]}"
+do
+ JVM_OPTS="$JVM_OPTS -D$i "
+done
+
+echo -e "Launching Octobot...\n"
+echo -e "Options specified: $JVM_OPTS\n"
+cd jar && java $JVM_OPTS $CLASSPATH $EXECUTABLE
40 src/com/urbanairship/octobot/Beanstalk.scala
@@ -0,0 +1,40 @@
+package com.urbanairship.octobot
+
+import com.surftools.BeanstalkClientImpl.ClientImpl
+import org.apache.log4j.Logger
+
+// This class handles all interfacing with a Beanstalk in Octobot.
+// It is responsible for connection initialization and management.
+
+object Beanstalk {
+
+ val logger = Logger.getLogger("Beanstalk")
+
+ def getBeanstalkChannel(host: String, port: Int, tube: String) : ClientImpl = {
+ var attempts = 0
+ var client: ClientImpl = null
+ logger.info("Opening connection to Beanstalk tube: '" + tube + "'...")
+
+ while (true) {
+ attempts += 1
+ logger.debug("Attempt #" + attempts)
+ try {
+ client = new ClientImpl(host, port)
+ client.useTube(tube)
+ client.watch(tube)
+ logger.info("Connected to Beanstalk")
+ return client
+ } catch {
+ case ex: Exception => {
+ logger.error("Unable to connect to Beanstalk. Retrying in 5 seconds", ex)
+ Thread.sleep(1000 * 5)
+ }
+ }
+ }
+
+ // This code can never be reached, but the compiler wants it.
+ return client
+ }
+
+}
+
117 src/com/urbanairship/octobot/Introspector.scala
@@ -0,0 +1,117 @@
+package com.urbanairship.octobot
+
+import java.util.HashMap
+import java.util.LinkedList
+import java.net.Socket
+import java.net.ServerSocket
+
+import java.io.OutputStream
+import java.io.IOException
+
+import java.lang.management.RuntimeMXBean
+import java.lang.management.ManagementFactory
+import java.util.ArrayList
+
+import org.json.JSONObject
+import org.json.JSONTokener
+
+import org.apache.log4j.Logger
+import scala.collection.JavaConversions._
+
+
+// This class provides some basic instrumentation for Octobot.
+// It provides a simple socket server listening on an admin port (1228
+// by default). Upon receiving a connection, it prints out a JSON string
+// of information such as such as tasks processed per second, total successes
+// and failures, successes and failures per task / per queue.
+
+class Introspector() extends Runnable {
+
+ var server: ServerSocket = null
+ val mx = ManagementFactory.getRuntimeMXBean()
+ var port = Settings.getAsInt("Octobot", "metrics_port")
+ val logger = Logger.getLogger("Introspector")
+
+ override def run() {
+ if (port < 1) port = 1228
+ try { server = new ServerSocket(port) }
+ catch {
+ case ex: IOException => {
+ logger.error("Introspector: Unable to listen on port: " + port +
+ ". Introspector will be unavailable on this instance.")
+ return
+ }
+ }
+
+ logger.info("Introspector launched on port: " + port)
+
+ while (true) {
+ try {
+ val socket = server.accept()
+ val oos = socket.getOutputStream()
+ oos.write(introspect().getBytes())
+ oos.close()
+ socket.close()
+ } catch {
+ case ex: IOException => {
+ logger.error("Error in accepting Introspector connection. "
+ + "Introspector thread shutting down.", ex)
+ return
+ }
+ }
+ }
+ }
+
+ // Assembles metrics for each task and returns a JSON string.
+ // Warnings suppressed are from building the JSON itself.
+ def introspect() : String = {
+ val metrics = new HashMap[String, Object]
+
+ // Make a quick copy of our runtime metrics data.
+ var instrumentedTasks: ArrayList[String] = null
+ var executionTimes: HashMap[String, LinkedList[Long]] = null
+ var taskSuccesses: HashMap[String, Int] = null
+ var taskFailures: HashMap[String, Int] = null
+ var taskRetries: HashMap[String, Int] = null
+
+ Metrics.metricsLock.synchronized {
+ executionTimes = new HashMap[String, LinkedList[Long]](Metrics.executionTimes)
+ taskSuccesses = new HashMap[String, Int](Metrics.taskSuccesses)
+ taskFailures = new HashMap[String, Int](Metrics.taskFailures)
+ taskRetries = new HashMap[String, Int](Metrics.taskRetries)
+ instrumentedTasks = new ArrayList[String](Metrics.instrumentedTasks)
+ }
+
+ // Build a JSON object for each task we've instrumented.
+ instrumentedTasks.foreach { taskName =>
+ var task = new JSONObject()
+ task.put("successes", taskSuccesses.get(taskName))
+ task.put("failures", taskFailures.get(taskName))
+ task.put("retries", taskRetries.get(taskName))
+ task.put("average_time", average(executionTimes.get(taskName)))
+ metrics.put("task_" + taskName, task)
+ }
+
+ metrics.put("tasks_instrumented", instrumentedTasks.size().asInstanceOf[AnyRef])
+ metrics.put("alive_since", (mx.getUptime() / 1000).asInstanceOf[AnyRef])
+
+ return new JSONObject(metrics).toString
+ }
+
+
+ // Calculate and return the mean execution time of our sample.
+ def average(times: LinkedList[Long]) : Float = {
+ if (times == null) return 0.toFloat
+
+ var timeSum: Long = 0
+ times.foreach(t => timeSum += t)
+
+ // Execution time is reported in nanoseconds, so we divide by 1,000,000
+ // to get to ms. Guard against a divide by zero if no stats are available.
+ val result = if (times.size() > 0)
+ timeSum / times.size() / 1000000f else 0.toFloat
+
+ return result
+ }
+}
+
154 src/com/urbanairship/octobot/MailQueue.scala
@@ -0,0 +1,154 @@
+package com.urbanairship.octobot
+
+import org.apache.log4j.Logger
+import java.util.concurrent.ArrayBlockingQueue
+
+// E-mail Imports
+import javax.mail.Message
+import javax.mail.Session
+import javax.mail.Transport
+import javax.mail.MessagingException
+import javax.mail.PasswordAuthentication
+import javax.mail.internet.InternetAddress
+import javax.mail.internet.MimeMessage
+import java.util.Properties
+
+
+// This singleton class provides an internal queue allowing us to asynchronously
+// send email notifications rather than processing them in main app loop.
+
+object MailQueue extends Runnable {
+
+ val logger = Logger.getLogger("Email Queue")
+ val from = Settings.get("Octobot", "email_from")
+ val recipient = Settings.get("Octobot", "email_to")
+ val server = Settings.get("Octobot", "email_server")
+ val username = Settings.get("Octobot", "email_username")
+ val password = Settings.get("Octobot", "email_password")
+ val port = Settings.getAsInt("Octobot", "email_port")
+ val useSSL = Settings.getAsBoolean("Octobot", "email_ssl")
+ val useAuth = Settings.getAsBoolean("Octobot", "email_auth")
+
+ // This internal queue is backed by an ArrayBlockingQueue. By specifying the
+ // number of messages to be held here before the queue blocks (below), we
+ // provide ourselves a safety threshold in terms of how many messages could
+ // be backed up before we force the delivery of all current waiting messages.
+
+ var messages = new ArrayBlockingQueue[String](100)
+
+ def put(message: String) {
+ messages.put(message)
+ }
+
+ def size() : Int = {
+ messages.size()
+ }
+
+ def remainingCapacity() : Int = {
+ messages.remainingCapacity()
+ }
+
+ // As this thread runs, it consumes messages from the internal queue and
+ // delivers each to the recipients configured in the YML file.
+ override def run() {
+
+ if (!validSettings()) {
+ logger.error("Email settings invalid check your configuration.")
+ return
+ }
+
+ while (true) {
+ deliverMessage(messages.take())
+ }
+ }
+
+ // Delivers email error notificiations.
+ def deliverMessage(message: String) {
+
+ logger.info("Sending error notification to: " + recipient)
+
+ try {
+ val email = prepareEmail()
+ email.setFrom(new InternetAddress(from))
+ email.addRecipient(Message.RecipientType.TO, new InternetAddress(recipient))
+
+ email.setSubject("Task Error Notification")
+ email.setText(message)
+
+ // Send message
+ Transport.send(email)
+ logger.info("Sent error e-mail to " + recipient + ". "
+ + "Message: \n\n" + message)
+
+ } catch {
+ case ex: MessagingException => {
+ logger.error("Error delivering error notification.", ex)
+ }
+ }
+ }
+
+
+ // Prepares a sendable email object based on Octobot's SMTP, SSL, and
+ // Authentication configuration.
+ def prepareEmail() : MimeMessage = {
+ // Prepare our configuration.
+ val properties = System.getProperties()
+ properties.setProperty("mail.smtp.host", server)
+ properties.put("mail.smtp.auth", "true")
+ var session: Session = null
+
+ // Configure SSL.
+ if (useSSL) {
+ properties.put("mail.smtp.socketFactory.port", port.asInstanceOf[AnyRef])
+ properties.put("mail.smtp.starttls.enable","true")
+ properties.put("mail.smtp.socketFactory.fallback", "false")
+ properties.put("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory")
+ }
+
+ // Configure authentication.
+ if (useAuth) {
+ properties.setProperty("mail.smtp.submitter", username)
+ val authenticator = new Authenticator(username, password)
+ session = Session.getInstance(properties, authenticator)
+ } else {
+ session = Session.getDefaultInstance(properties)
+ }
+
+ return new MimeMessage(session)
+ }
+
+
+ // Provides an SMTP authenticator for messages sent with auth.
+ class Authenticator(val user: String, val pass: String, var authenticator: Authenticator)
+ extends javax.mail.Authenticator {
+
+ var authentication: PasswordAuthentication = null
+
+ def this(user: String, pass: String) {
+ this(user, pass, new PasswordAuthentication(username, password).asInstanceOf[Authenticator])
+ }
+
+ override def getPasswordAuthentication() : PasswordAuthentication = {
+ return authentication
+ }
+ }
+
+ def validSettings() : Boolean = {
+ var result = true
+
+ val settings = List(from, recipient, server, port)
+
+ // Validate base settings.
+ settings.foreach { setting =>
+ if (setting == null) result = false
+ }
+
+ // Validate authentication.
+ if (useAuth && (username == null || password == null))
+ result = false
+
+ return result
+ }
+
+}
+
88 src/com/urbanairship/octobot/Metrics.scala
@@ -0,0 +1,88 @@
+package com.urbanairship.octobot
+
+import java.util.ArrayList
+import java.util.HashMap
+import java.util.LinkedList
+
+object Metrics {
+
+ // Keep track of all tasks we've seen executed.
+ val instrumentedTasks = new ArrayList[String]()
+
+ // Keep track of average task throughput (last 10k runs per task).
+ val executionTimes = new HashMap[String, LinkedList[Long]]()
+
+ // Keep track of total successes by task.
+ val taskSuccesses = new HashMap[String, Int]()
+
+ // Keep track of total failures by task.
+ val taskFailures = new HashMap[String, Int]()
+
+ // Keep track of total retries by task.
+ val taskRetries = new HashMap[String, Int]()
+
+ val metricsLock = new Object()
+
+
+ // Updates internal metrics following task execution.
+ def update(task: String, time: Long, status: Boolean, retries: Int) {
+ metricsLock.synchronized {
+ if (!instrumentedTasks.contains(task)) instrumentedTasks.add(task)
+
+ updateExecutionTimes(task, time)
+ updateTaskRetries(task, retries)
+ updateTaskResults(task, status)
+ }
+ }
+
+
+ // Update the list of execution times, keeping the last 10,000 per task.
+ def updateExecutionTimes(task: String, time: Long) {
+ if (!executionTimes.containsKey(task)) {
+ val timeList = new LinkedList[Long]()
+ timeList.addFirst(time)
+ executionTimes.put(task, timeList)
+ } else {
+ val timeList = executionTimes.get(task)
+ if (timeList.size() == 10000) timeList.removeLast()
+ timeList.addFirst(time)
+ executionTimes.put(task, timeList)
+ }
+ }
+
+
+ // Update the number of times this task has been retried.
+ def updateTaskRetries(task: String, retries: Int) {
+ if (retries > 0) {
+ if (!taskRetries.containsKey(task)) {
+ taskRetries.put(task, retries)
+ } else {
+ var retriesForTask = taskRetries.get(task)
+ retriesForTask += retries
+ taskRetries.put(task, retriesForTask)
+ }
+ }
+ }
+
+
+ // Update the number of times this task has succeeded or failed.
+ def updateTaskResults(task: String, status: Boolean) {
+ if (status == true) {
+ if (!taskSuccesses.containsKey(task)) {
+ taskSuccesses.put(task, 1)
+ } else {
+ var success = taskSuccesses.get(task)
+ taskSuccesses.put(task, success + 1)
+ }
+ } else {
+ if (!taskFailures.containsKey(task)) {
+ taskFailures.put(task, 1)
+ } else {
+ var failure = taskFailures.get(task)
+ taskFailures.put(task, failure + 1)
+ }
+ }
+ }
+
+}
+
150 src/com/urbanairship/octobot/Octobot.scala
@@ -0,0 +1,150 @@
+package com.urbanairship.octobot
+
+import java.util.List
+import java.util.HashMap
+import java.lang.reflect.Method
+import java.lang.reflect.InvocationTargetException
+
+import org.apache.log4j.Logger
+import org.apache.log4j.PropertyConfigurator
+import org.apache.log4j.BasicConfigurator
+import scala.collection.JavaConversions._
+
+// The fun starts here!
+
+// This class is the main entry point to the application.
+// It initializes (a) queue consumer thread(s) reponsible for
+// receiving and passing messages on to tasks for execution.
+
+object Octobot extends Application {
+
+ val logger = Logger.getLogger("Octobot")
+
+ override def main(args : Array[String]) {
+
+ // Initialize logging from a log4j configuration file.
+ val configFile = System.getProperty("log4j.configuration")
+ if (configFile != null && !configFile.equals("")) {
+ PropertyConfigurator.configure(configFile)
+ } else {
+ BasicConfigurator.configure()
+ logger.warn("log4j.configuration not set - logging to stdout.")
+ }
+
+ // If a startup hook is configured, call it before launching workers.
+ val startupHook = Settings.get("Octobot", "startup_hook")
+ if (startupHook != null && !startupHook.equals(""))
+ launchStartupHook(startupHook)
+
+ // If a shutdown hook is configured, register it.
+ val shutdownHook = Settings.get("Octobot", "shutdown_hook")
+ if (shutdownHook != null && !shutdownHook.equals(""))
+ registerShutdownHook(shutdownHook)
+
+ val enableEmailErrors = Settings.getAsBoolean("Octobot", "email_enabled")
+ if (enableEmailErrors) {
+ logger.info("Launching email notification queue...")
+ new Thread(MailQueue, "Email Queue").start()
+ }
+
+ logger.info("Launching Introspector...")
+ new Thread(new Introspector(), "Introspector").start()
+
+ logger.info("Launching Workers...")
+ var queues: List[HashMap[String, Any]] = null
+ try {
+ queues = getQueues()
+ } catch {
+ case ex : NullPointerException => {
+ logger.fatal("Error: No valid queues found in Settings. Exiting.")
+ throw new Error("Error: No valid queues found in Settings. Exiting.")
+ }
+ }
+
+ // Start a thread for each queue Octobot is configured to listen on.
+ queues.foreach { queueConf =>
+ // Fetch the number of workers to spawn and their priority.
+ val numWorkers = Settings.getIntFromYML(queueConf.get("workers"), 1)
+ val priority = Settings.getIntFromYML(queueConf.get("priority"), 5)
+
+ val queue = new Queue(queueConf)
+
+ // Spawn worker threads for each queue in our configuration.
+ for (i <- 0 until numWorkers) {
+ var consumer = new QueueConsumer(queue)
+ var worker = new Thread(consumer, "Worker")
+
+ logger.info("Attempting to connect to " + queueConf.get("protocol") +
+ " queue: " + queueConf.get("name") + " with priority " +
+ priority + "/10 " + "(Worker " + (i+1) + "/" + numWorkers + ").")
+
+ worker.setPriority(priority)
+ worker.start()
+ }
+ }
+
+ logger.info("Octobot ready to rock!")
+ }
+
+
+ // Invokes a startup hook registered from the YML config on launch.
+ def launchStartupHook(className: String) {
+ logger.info("Calling Startup Hook: " + className)
+
+ try {
+ val startupHook = Class.forName(className)
+ val method = startupHook.getMethod("run")
+ method.invoke(startupHook.newInstance(), null)
+ } catch {
+ case ex: ClassNotFoundException => {
+ logger.error("Could not find class: " + className + " for the " +
+ "startup hook specified. Please ensure that it exists in your" +
+ " classpath and launch Octobot again. Continuing without" +
+ " executing this hook...")
+ } case ex: NoSuchMethodException => {
+ logger.error("Your startup hook: " + className + " does not " +
+ " properly implement the Runnable interface. Your startup hook must " +
+ " contain a method with the signature: public void run()." +
+ " Continuing without executing this hook...")
+ } case ex: InvocationTargetException => {
+ logger.error("Your startup hook: " + className + " caused an error" +
+ " in execution. Please correct this error and re-launch Octobot." +
+ " Continuing without executing this hook...", ex.getCause())
+ } case ex: Exception => {
+ logger.error("Your startup hook: " + className + " caused an unknown" +
+ " error. Please see the following stacktrace for information.", ex)
+ }
+ }
+ }
+
+
+ // Registers a Runnable to be ran as a shutdown hook when Octobot stops.
+ def registerShutdownHook(className: String) {
+ logger.info("Registering Shutdown Hook: " + className)
+
+ try {
+ val startupHook = Class.forName(className)
+ Runtime.getRuntime().addShutdownHook(new Thread(startupHook.newInstance().asInstanceOf[Runnable]))
+ } catch {
+ case ex: ClassNotFoundException => {
+ logger.error("Could not find class: " + className + " for the " +
+ "shutdown hook specified. Please ensure that it exists in your" +
+ " classpath and launch Octobot again. Continuing without" +
+ " registering this hook...")
+ } case ex: ClassCastException => {
+ logger.error("Your shutdown hook: " + className + " could not be "
+ + "registered due because it does not implement the Runnable "
+ + "interface. Continuing without registering this hook...")
+ } case ex: Exception => {
+ logger.error("Your shutdown hook: " + className + " could not be "
+ + "registered due to an unknown error. Please see the " +
+ "following stacktrace for debugging information.", ex)
+ }
+ }
+ }
+
+ def getQueues() : List[HashMap[String, Any]] = {
+ Settings.configuration.get("Octobot").get("queues").asInstanceOf[List[HashMap[String, Any]]]
+ }
+}
+
32 src/com/urbanairship/octobot/Queue.scala
@@ -0,0 +1,32 @@
+package com.urbanairship.octobot
+
+import java.util.HashMap
+
+class Queue(val queueType: String, val queueName: String, val host: String, val port: Int,
+ val username: String, val password: String, val vhost: String) {
+
+ def this(queueType: String, queueName: String, host: String,
+ port: Int, username: String, password: String) {
+ this(queueType, queueName, host, port, username, password, "/")
+ }
+
+ def this(queueType: String, queueName: String, host: String, port: Int) {
+ this(queueType.toLowerCase, queueName, host, port, null, null, null)
+ }
+
+ def this(config: HashMap[String, Any]) {
+ this(config.get("protocol").asInstanceOf[String].toLowerCase,
+ config.get("name").asInstanceOf[String],
+ config.get("host").asInstanceOf[String],
+ config.get("port").toString.toInt,
+ config.get("username").asInstanceOf[String],
+ config.get("password").asInstanceOf[String],
+ config.get("vhost").asInstanceOf[String])
+ }
+
+ override def toString() : String = {
+ queueType + "/" + queueName + "/" + host + "/" + port + "/" +
+ username + "/" + password + "/" + vhost
+ }
+
+}
277 src/com/urbanairship/octobot/QueueConsumer.scala
@@ -0,0 +1,277 @@
+package com.urbanairship.octobot
+
+// AMQP Support
+import java.io.IOException
+import com.rabbitmq.client.Channel
+import com.rabbitmq.client.Connection
+import com.rabbitmq.client.QueueingConsumer
+
+// Beanstalk Support
+import com.surftools.BeanstalkClient.BeanstalkException
+import com.surftools.BeanstalkClient.Job
+import com.surftools.BeanstalkClientImpl.ClientImpl
+import java.io.PrintWriter
+import java.io.StringWriter
+
+import org.json.JSONObject
+import org.json.JSONTokener
+import org.apache.log4j.Logger
+import redis.clients.jedis.Jedis
+import redis.clients.jedis.JedisPubSub
+
+
+// This thread opens a streaming connection to a queue, which continually
+// pushes messages to Octobot queue workers. The tasks contained within these
+// messages are invoked, then acknowledged and removed from the queue.
+
+class QueueConsumer(val queue: Queue) extends Runnable {
+
+ var channel: Channel = null
+ var connection: Connection = null
+ var consumer: QueueingConsumer = null
+
+ val logger = Logger.getLogger("Queue Consumer")
+ val enableEmailErrors = Settings.getAsBoolean("Octobot", "email_enabled")
+
+ // Fire up the appropriate queue listener and begin invoking tasks!.
+ override def run() {
+ if (queue.queueType.equals("amqp")) {
+ channel = getAMQPChannel(queue)
+ consumeFromAMQP()
+ } else if (queue.queueType.equals("beanstalk")) {
+ consumeFromBeanstalk()
+ } else if (queue.queueType.equals("redis")) {
+ consumeFromRedis()
+ } else {
+ logger.error("Invalid queue type specified: " + queue.queueType)
+ }
+ }
+
+
+ // Attempts to register to receive streaming messages from RabbitMQ.
+ // In the event that RabbitMQ is unavailable the call to getChannel()
+ // will attempt to reconnect. If it fails, the loop simply repeats.
+ def consumeFromAMQP() {
+
+ while (true) {
+ var task: QueueingConsumer.Delivery = null
+ try { task = consumer.nextDelivery() }
+ catch {
+ case ex: Exception => {
+ logger.error("Error in AMQP connection reconnecting.", ex)
+ channel = getAMQPChannel(queue)
+ }
+ }
+
+ // If we've got a message, fetch the body and invoke the task.
+ // Then, send an acknowledgement back to RabbitMQ that we got it.
+ if (task != null && task.getBody() != null) {
+ invokeTask(new String(task.getBody()))
+ try { channel.basicAck(task.getEnvelope().getDeliveryTag(), false) }
+ catch {
+ case ex: IOException => { logger.error("Error ack'ing message.", ex) }
+ }
+ }
+ }
+ }
+
+
+ // Attempt to register to receive messages from Beanstalk and invoke tasks.
+ def consumeFromBeanstalk() {
+ var beanstalkClient = new ClientImpl(queue.host, queue.port)
+ beanstalkClient.watch(queue.queueName)
+ beanstalkClient.useTube(queue.queueName)
+ logger.info("Connected to Beanstalk waiting for jobs.")
+
+ while (true) {
+ var job: Job = null
+ try { job = beanstalkClient.reserve(1) }
+ catch {
+ case ex: BeanstalkException => {
+ logger.error("Beanstalk connection error.", ex)
+ beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host,
+ queue.port, queue.queueName)
+ }
+ }
+
+ if (job != null) {
+ val message = new String(job.getData())
+
+ try { invokeTask(message) }
+ catch {
+ case ex: Exception => {
+ logger.error("Error handling message.", ex)
+ }
+ }
+
+ try { beanstalkClient.delete(job.getJobId()) }
+ catch {
+ case ex: BeanstalkException => {
+ logger.error("Error sending message receipt.", ex)
+ beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host,
+ queue.port, queue.queueName)
+ }
+ }
+ }
+ }
+ }
+
+
+ def consumeFromRedis() {
+ logger.info("Connecting to Redis...")
+ var jedis = new Jedis(queue.host, queue.port)
+ try {
+ jedis.connect()
+ } catch {
+ case ex: IOException => {
+ logger.error("Unable to connect to Redis.", ex)
+ }
+ }
+
+ logger.info("Connected to Redis.")
+
+ jedis.subscribe(new JedisPubSub() {
+ override def onMessage(channel: String, message: String) {
+ invokeTask(message)
+ }
+
+ override def onPMessage(string: String, string1: String, string2: String) {
+ logger.info("onPMessage Triggered - Not implemented.")
+ }
+
+ override def onSubscribe(string: String, i: Int) {
+ logger.info("onSubscribe called - Not implemented.")
+ }
+
+ override def onUnsubscribe(string: String, i: Int) {
+ logger.info("onUnsubscribe Called - Not implemented.")
+ }
+
+ override def onPUnsubscribe(string: String, i: Int) {
+ logger.info("onPUnsubscribe called - Not implemented.")
+ }
+
+ override def onPSubscribe(string: String, i: Int) {
+ logger.info("onPSubscribe Triggered - Not implemented.")
+ }
+ }, queue.queueName)
+ }
+
+
+ // Invokes a task based on the name of the task passed in the message via
+ // reflection, accounting for non-existent tasks and errors while running.
+ def invokeTask(rawMessage: String) : Boolean = {
+ var taskName = ""
+ var message : JSONObject = null
+ var retryCount = 0
+ var retryTimes = 0
+
+ val startedAt = System.nanoTime()
+ var errorMessage: String = null
+ var lastException: Throwable = null
+ var executedSuccessfully = false
+
+ while (!executedSuccessfully && retryCount < retryTimes + 1) {
+ if (retryCount > 0)
+ logger.info("Retrying task. Attempt " + retryCount + " of " + retryTimes)
+
+ try {
+ message = new JSONObject(new JSONTokener(rawMessage))
+ taskName = message.get("task").asInstanceOf[String]
+ if (message.has("retries"))
+ retryTimes = message.get("retries").asInstanceOf[Int]
+ } catch {
+ case ex: Exception => {
+ logger.error("Error: Invalid message received: " + rawMessage, ex)
+ return executedSuccessfully
+ }
+ }
+
+ // Locate the task, then invoke it, supplying our message.
+ // Cache methods after lookup to avoid unnecessary reflection lookups.
+ try {
+ TaskExecutor.execute(taskName, message)
+ executedSuccessfully = true
+ } catch {
+ case ex: ClassNotFoundException => {
+ lastException = ex
+ errorMessage = "Error: Task requested not found: " + taskName
+ logger.error(errorMessage)
+ } case ex: NoClassDefFoundError => {
+ lastException = ex
+ errorMessage = "Error: Task requested not found: " + taskName
+ logger.error(errorMessage, ex)
+ } case ex: NoSuchMethodException => {
+ lastException = ex
+ errorMessage = "Error: Task requested does not have a static run method."
+ logger.error(errorMessage, ex)
+ } case ex: Throwable => {
+ lastException = ex
+ errorMessage = "An error occurred while running the task."
+ logger.error(errorMessage, ex)
+ }
+ }
+
+ if (!executedSuccessfully) retryCount += 1
+ }
+
+ // Deliver an e-mail error notification if enabled.
+ if (enableEmailErrors && !executedSuccessfully) {
+ val email = "Error running task: " + taskName + ".\n\n" +
+ "Attempted executing " + retryCount.toString + " times as specified.\n\n" +
+ "The original input was: \n\n" + rawMessage + "\n\n" +
+ "Here's the error that resulted while running the task:\n\n" +
+ stackToString(lastException)
+
+ MailQueue.put(email)
+ }
+
+ val finishedAt = System.nanoTime()
+ Metrics.update(taskName, finishedAt - startedAt, executedSuccessfully, retryCount)
+
+ return executedSuccessfully
+ }
+
+ // Opens up a connection to RabbitMQ, retrying every five seconds
+ // if the queue server is unavailable.
+ def getAMQPChannel(queue: Queue) : Channel = {
+ var attempts = 0
+ var channel: Channel = null
+ logger.info("Opening connection to AMQP " + queue.vhost + " " + queue.queueName + "...")
+
+ while (true) {
+ attempts += 1
+ logger.debug("Attempt #" + attempts)
+ try {
+ connection = new RabbitMQ(queue).getConnection()
+ channel = connection.createChannel()
+ consumer = new QueueingConsumer(channel)
+ channel.exchangeDeclare(queue.queueName, "direct", true)
+ channel.queueDeclare(queue.queueName, true, false, false, null)
+ channel.queueBind(queue.queueName, queue.queueName, queue.queueName)
+ channel.basicConsume(queue.queueName, false, consumer)
+ logger.info("Connected to RabbitMQ")
+ return channel
+ } catch {
+ case ex: Exception => {
+ logger.error("Cannot connect to AMQP. Retrying in 5 sec.", ex)
+ Thread.sleep(1000 * 5)
+ }
+ }
+ }
+
+ return channel
+ }
+
+ // Converts a stacktrace from task invocation to a string for error logging.
+ def stackToString(e: Throwable) : String = {
+ if (e == null) return "(Null)"
+
+ val stringWriter = new StringWriter()
+ val printWriter = new PrintWriter(stringWriter)
+
+ e.printStackTrace(printWriter)
+ return stringWriter.toString()
+ }
+}
+
57 src/com/urbanairship/octobot/RabbitMQ.scala
@@ -0,0 +1,57 @@
+package com.urbanairship.octobot
+
+import org.apache.log4j.Logger
+import com.rabbitmq.client.Channel
+import java.io.IOException
+import com.rabbitmq.client.Connection
+import com.rabbitmq.client.ConnectionFactory
+
+
+// This class handles all interfacing with AMQP / RabbitMQ in Octobot.
+// It provides basic connection management and returns task channels
+// for placing messages into a remote queue.
+
+class RabbitMQ(val host: String, val port: Int, val username: String, val password: String, val vhost: String) {
+
+ val logger = Logger.getLogger("RabbitMQ")
+ val factory = new ConnectionFactory()
+ factory.setHost(host)
+ factory.setPort(port)
+ factory.setUsername(username)
+ factory.setPassword(password)
+ factory.setVirtualHost(vhost)
+
+ def this(queue: Queue) {
+ this(queue.host, queue.port, queue.username, queue.password, queue.vhost)
+ }
+
+ // Returns a new connection to an AMQP queue.
+ def getConnection() : Connection = {
+ return factory.newConnection()
+ }
+
+ // Returns a live channel for publishing messages.
+ def getTaskChannel() : Channel ={
+ var taskChannel: Channel = null
+
+ var attempts = 0
+ while (true) {
+ attempts += 1
+ logger.info("Attempting to connect to queue: attempt " + attempts)
+ try {
+ val connection = getConnection()
+ taskChannel = connection.createChannel()
+ return taskChannel
+ } catch {
+ case ex: IOException => {
+ logger.error("Error creating AMQP channel, retrying in 5 sec", ex)
+ Thread.sleep(1000 * 5)
+ }
+ }
+ }
+
+ // This statement will never be reached, but makes the compiler happy.
+ return taskChannel
+ }
+}
+
94 src/com/urbanairship/octobot/Settings.scala
@@ -0,0 +1,94 @@
+package com.urbanairship.octobot
+
+import org.jvyaml.YAML
+import java.util.HashMap
+import java.io.FileReader
+import org.apache.log4j.Logger
+
+
+// This class is responsible for loading in configuration data for the
+// application. By default, it searches for a YAML file located at
+// /usr/local/octobot/octobot.yml, unless the JVM environment variable
+// "-DconfigFile=/absolute/path" is specified. These values are accessed
+// as a standard map by calling Settings.get("Octobot", "queues").
+
+// Implemented as a singleton to avoid reading the file in multiple times.
+// Changes to application configuration require a restart to take effect.
+
+object Settings {
+
+ val logger = Logger.getLogger("Settings")
+ var configuration:HashMap[String, HashMap[String, Any]] = null
+
+ // Load the settings once on initialization, and hang onto them.
+ var settingsFile = System.getProperty("configFile")
+ if (settingsFile == null) settingsFile = "/usr/local/octobot/octobot.yml"
+
+ try {
+ configuration = YAML.load(new FileReader(settingsFile)).asInstanceOf[HashMap[String, HashMap[String, Any]]];
+ } catch {
+ case ex: Exception => {
+ logger.warn("Warning: No valid config at " + settingsFile)
+ logger.warn("Please create this file, or set the " +
+ "-DconfigFile=/foo/bar/octobot.yml JVM variable to its location.")
+ logger.warn("Continuing launch with internal defaults.")
+ }
+ }
+
+ def get(category: String, key: String) : String = {
+ var result = ""
+ try {
+ val configCategory = configuration.get(category)
+ result = configCategory.get(key).toString
+ } catch {
+ case ex: NullPointerException => {
+ logger.warn("Warning - unable to load " + category + " / " +
+ key + " from configuration file.")
+ }
+ }
+
+ return result
+ }
+
+ // Fetches a setting from YAML config and converts it to an integer.
+ // No integer settings are autodetected, so that logic is not needed here.
+ def getAsInt(category: String, key: String) : Int = {
+ var result = 0
+ var value : Any = null
+ var configCategory : HashMap[String, Any] = null
+
+ try {
+ configCategory = configuration.get(category)
+ value = configCategory.get(key)
+ if (value != null) result = value.asInstanceOf[Long].intValue
+ } catch {
+ case ex: NullPointerException => {
+ logger.warn("Warning - unable to load " + category + " / " +
+ key + " from configuration file.")
+ }
+ }
+
+ return result
+ }
+
+ def getIntFromYML(obj: Any, defaultValue : Int) : Int = {
+ var result = defaultValue
+
+ try {
+ return obj.toString.toInt
+ } catch {
+ case ex: Exception => {
+ logger.info("Error reading settings.")
+ return defaultValue
+ }
+ }
+ }
+
+
+ // Fetches a setting from YAML config and converts it to a boolean.
+ // No boolean settings are autodetected, so that logic is not needed here.
+ def getAsBoolean(category: String, key: String) : Boolean = {
+ return get(category, key).toBoolean
+ }
+}
+
26 src/com/urbanairship/octobot/TaskExecutor.scala
@@ -0,0 +1,26 @@
+package com.urbanairship.octobot
+
+import java.util.HashMap
+import java.lang.reflect.Method
+import org.json.JSONObject
+
+object TaskExecutor {
+
+ val taskCache = new HashMap[String, Method]
+
+ def execute(taskName: String, message: JSONObject) {
+ var method: Method = null
+
+ if (taskCache.containsKey(taskName)) {
+ method = taskCache.get(taskName)
+ } else {
+ val task = Class.forName(taskName)
+ val klass = new JSONObject().getClass
+ method = task.getMethod("run", klass)
+ taskCache.put(taskName, method)
+ }
+
+ method.invoke(null, message)
+ }
+
+}

0 comments on commit 8eaf33a

Please sign in to comment.