Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

x-ha-policy and heartbeat support #12

Closed
wants to merge 3 commits into from

1 participant

@jplock

cleaned up my pull request to support x-ha-policy and heartbeat support. This also changes from maven-assembly to maven-shade and upgrades the dependencies where necessary. Thanks for taking a look at this.

jplock added some commits
@jplock jplock Upgraded amqp-client from 2.7.1 to 2.8.7
Upgraded log4j from 1.2.16 to 1.2.17
Upgraded jedis from 2.0.0 to 2.1.0
Upgraded from json-simple from 1.1 to 1.1.1
Upgraded from javax.mail from 1.4.5-rc1 to 1.4.5
Upgraded from metrics-core from 2.0.3 to 2.1.3
Upgraded jetty from 8.1.1.v20120215 to 8.1.7.v20120910
Switched from maven-assembly-plugin to maven-shade-plugin
ef7ca5b
@jplock jplock Added support for specifying the RabbitMQ heartbeat 8192caa
@jplock jplock Added x-ha-policy support b4fec19
@jplock jplock closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Nov 6, 2012
  1. @jplock

    Upgraded amqp-client from 2.7.1 to 2.8.7

    jplock authored
    Upgraded log4j from 1.2.16 to 1.2.17
    Upgraded jedis from 2.0.0 to 2.1.0
    Upgraded from json-simple from 1.1 to 1.1.1
    Upgraded from javax.mail from 1.4.5-rc1 to 1.4.5
    Upgraded from metrics-core from 2.0.3 to 2.1.3
    Upgraded jetty from 8.1.1.v20120215 to 8.1.7.v20120910
    Switched from maven-assembly-plugin to maven-shade-plugin
  2. @jplock
  3. @jplock

    Added x-ha-policy support

    jplock authored
This page is out of date. Refresh to see the latest.
View
6 example-config.yml
@@ -8,7 +8,11 @@ Octobot:
priority: 5,
workers: 1,
username: cilantro,
- password: burrito
+ password: burrito,
+ heartbeat: 10,
+ arguments: {
+ x-ha-policy: all
+ }
}
metrics_port: 1228
View
73 pom.xml
@@ -42,29 +42,29 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>2.7.1</version>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>2.8.7</version>
</dependency>
<dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.16</version>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
</dependency>
<dependency>
- <groupId>redis.clients</groupId>
- <artifactId>jedis</artifactId>
- <version>2.0.0</version>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>2.1.0</version>
</dependency>
<dependency>
- <groupId>com.surftools</groupId>
- <artifactId>BeanstalkClient</artifactId>
- <version>1.4.6</version>
+ <groupId>com.surftools</groupId>
+ <artifactId>BeanstalkClient</artifactId>
+ <version>1.4.6</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
- <version>1.1</version>
+ <version>1.1.1</version>
</dependency>
<dependency>
<groupId>net.java.dev</groupId>
@@ -74,27 +74,27 @@
<dependency>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
- <version>1.4.5-rc1</version>
+ <version>1.4.5</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-core</artifactId>
- <version>2.0.3</version>
+ <version>2.1.3</version>
</dependency>
<dependency>
<groupId>com.yammer.metrics</groupId>
<artifactId>metrics-servlet</artifactId>
- <version>2.0.3</version>
+ <version>2.1.3</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
- <version>8.1.1.v20120215</version>
+ <version>8.1.7.v20120910</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
- <version>8.1.1.v20120215</version>
+ <version>8.1.7.v20120910</version>
</dependency>
</dependencies>
@@ -192,26 +192,36 @@
</configuration>
</plugin>
<plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>2.3</version>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.6</version>
<configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- <archive>
- <manifest>
- <addClasspath>true</addClasspath>
- <mainClass>com.urbanairship.octobot.Octobot</mainClass>
- </manifest>
- </archive>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
</configuration>
<executions>
<execution>
- <id>make-assembly</id>
<phase>package</phase>
<goals>
- <goal>single</goal>
+ <goal>shade</goal>
</goals>
+ <configuration>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ <mainClass>com.urbanairship.octobot.Octobot</mainClass>
+ </transformer>
+ </transformers>
+ </configuration>
</execution>
</executions>
</plugin>
@@ -225,4 +235,3 @@
</extensions>
</build>
</project>
-
View
6 src/main/java/com/urbanairship/octobot/Queue.java
@@ -12,6 +12,8 @@
public String username;
public String password;
public String vhost;
+ public Integer heartbeat;
+ public HashMap<String, Object> arguments;
public Queue(String queueType, String queueName, String host, Integer port,
String username, String password) {
@@ -41,6 +43,10 @@ public Queue(HashMap<String, Object> config) {
if (config.get("port") != null)
this.port = Integer.parseInt(((Long) config.get("port")).toString());
+ if (config.get("heartbeat") != null)
+ this.heartbeat = Integer.parseInt(((Long) config.get("heartbeat")).toString());
+ if (config.get("arguments") != null)
+ this.arguments = (HashMap<String, Object>)config.get("arguments");
}
View
142 src/main/java/com/urbanairship/octobot/QueueConsumer.java
@@ -92,7 +92,7 @@ private void consumeFromBeanstalk() {
try { job = beanstalkClient.reserve(1); }
catch (BeanstalkException e) {
logger.error("Beanstalk connection error.", e);
- beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host,
+ beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host,
queue.port, queue.queueName);
continue;
}
@@ -106,7 +106,7 @@ private void consumeFromBeanstalk() {
try { beanstalkClient.delete(job.getJobId()); }
catch (BeanstalkException e) {
logger.error("Error sending message receipt.", e);
- beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host,
+ beanstalkClient = Beanstalk.getBeanstalkChannel(queue.host,
queue.port, queue.queueName);
}
}
@@ -122,14 +122,14 @@ private void consumeFromRedis() {
} catch (JedisConnectionException e) {
logger.error("Unable to connect to Redis.", e);
}
-
+
logger.info("Connected to Redis.");
jedis.subscribe(new JedisPubSub() {
@Override
- public void onMessage(String channel, String message) {
- invokeTask(message);
- }
+ public void onMessage(String channel, String message) {
+ invokeTask(message);
+ }
@Override
public void onPMessage(String string, String string1, String string2) {
@@ -155,85 +155,85 @@ public void onPUnsubscribe(String string, int i) {
public void onPSubscribe(String string, int i) {
logger.info("onPSubscribe Triggered - Not implemented.");
}
- }, queue.queueName);
+ }, queue.queueName);
}
-// Invokes a task based on the name of the task passed in the message via
-// reflection, accounting for non-existent tasks and errors while running.
-public boolean invokeTask(String rawMessage) {
- String taskName = "";
- JSONObject message;
- int retryCount = 0;
- long retryTimes = 0;
+ // Invokes a task based on the name of the task passed in the message via
+ // reflection, accounting for non-existent tasks and errors while running.
+ public boolean invokeTask(String rawMessage) {
+ String taskName = "";
+ JSONObject message;
+ int retryCount = 0;
+ long retryTimes = 0;
- long startedAt = System.nanoTime();
- String errorMessage = null;
- Throwable lastException = null;
- boolean executedSuccessfully = false;
+ long startedAt = System.nanoTime();
+ String errorMessage = null;
+ Throwable lastException = null;
+ boolean executedSuccessfully = false;
- while (retryCount < retryTimes + 1) {
- if (retryCount > 0)
- logger.info("Retrying task. Attempt " + retryCount + " of " + retryTimes);
+ while (retryCount < retryTimes + 1) {
+ if (retryCount > 0)
+ logger.info("Retrying task. Attempt " + retryCount + " of " + retryTimes);
- try {
- message = (JSONObject) JSONValue.parse(rawMessage);
- taskName = (String) message.get("task");
- if (message.containsKey("retries"))
- retryTimes = (Long) message.get("retries");
- } catch (Exception e) {
- logger.error("Error: Invalid message received: " + rawMessage);
- return executedSuccessfully;
+ try {
+ message = (JSONObject) JSONValue.parse(rawMessage);
+ taskName = (String) message.get("task");
+ if (message.containsKey("retries"))
+ retryTimes = (Long) message.get("retries");
+ } catch (Exception e) {
+ logger.error("Error: Invalid message received: " + rawMessage);
+ return executedSuccessfully;
+ }
+
+ // Locate the task, then invoke it, supplying our message.
+ // Cache methods after lookup to avoid unnecessary reflection lookups.
+ try {
+
+ TaskExecutor.execute(taskName, message);
+ executedSuccessfully = true;
+
+ } catch (ClassNotFoundException e) {
+ lastException = e;
+ errorMessage = "Error: Task requested not found: " + taskName;
+ logger.error(errorMessage);
+ } catch (NoClassDefFoundError e) {
+ lastException = e;
+ errorMessage = "Error: Task requested not found: " + taskName;
+ logger.error(errorMessage, e);
+ } catch (NoSuchMethodException e) {
+ lastException = e;
+ errorMessage = "Error: Task requested does not have a static run method.";
+ logger.error(errorMessage);
+ } catch (Throwable e) {
+ lastException = e;
+ errorMessage = "An error occurred while running the task.";
+ logger.error(errorMessage, e);
+ }
+
+ if (executedSuccessfully) break;
+ else retryCount++;
}
- // Locate the task, then invoke it, supplying our message.
- // Cache methods after lookup to avoid unnecessary reflection lookups.
- try {
+ // Deliver an e-mail error notification if enabled.
+ if (enableEmailErrors && !executedSuccessfully) {
+ String email = "Error running task: " + taskName + ".\n\n"
+ + "Attempted executing " + retryCount + " times as specified.\n\n"
+ + "The original input was: \n\n" + rawMessage + "\n\n"
+ + "Here's the error that resulted while running the task:\n\n"
+ + stackToString(lastException);
- TaskExecutor.execute(taskName, message);
- executedSuccessfully = true;
-
- } catch (ClassNotFoundException e) {
- lastException = e;
- errorMessage = "Error: Task requested not found: " + taskName;
- logger.error(errorMessage);
- } catch (NoClassDefFoundError e) {
- lastException = e;
- errorMessage = "Error: Task requested not found: " + taskName;
- logger.error(errorMessage, e);
- } catch (NoSuchMethodException e) {
- lastException = e;
- errorMessage = "Error: Task requested does not have a static run method.";
- logger.error(errorMessage);
- } catch (Throwable e) {
- lastException = e;
- errorMessage = "An error occurred while running the task.";
- logger.error(errorMessage, e);
+ try { MailQueue.put(email); }
+ catch (InterruptedException e) { }
}
-
- if (executedSuccessfully) break;
- else retryCount++;
- }
- // Deliver an e-mail error notification if enabled.
- if (enableEmailErrors && !executedSuccessfully) {
- String email = "Error running task: " + taskName + ".\n\n"
- + "Attempted executing " + retryCount + " times as specified.\n\n"
- + "The original input was: \n\n" + rawMessage + "\n\n"
- + "Here's the error that resulted while running the task:\n\n"
- + stackToString(lastException);
+ long finishedAt = System.nanoTime();
+ Metrics.update(taskName, finishedAt - startedAt, executedSuccessfully, retryCount);
- try { MailQueue.put(email); }
- catch (InterruptedException e) { }
+ return executedSuccessfully;
}
- long finishedAt = System.nanoTime();
- Metrics.update(taskName, finishedAt - startedAt, executedSuccessfully, retryCount);
-
- return executedSuccessfully;
-}
-
// Opens up a connection to RabbitMQ, retrying every five seconds
// if the queue server is unavailable.
private Channel getAMQPChannel(Queue queue) {
@@ -248,7 +248,7 @@ private Channel getAMQPChannel(Queue queue) {
channel = connection.createChannel();
consumer = new QueueingConsumer(channel);
channel.exchangeDeclare(queue.queueName, "direct", true);
- channel.queueDeclare(queue.queueName, true, false, false, null);
+ channel.queueDeclare(queue.queueName, true, false, false, queue.arguments);
channel.queueBind(queue.queueName, queue.queueName, queue.queueName);
channel.basicConsume(queue.queueName, false, consumer);
logger.info("Connected to RabbitMQ");
View
3  src/main/java/com/urbanairship/octobot/RabbitMQ.java
@@ -22,6 +22,9 @@ public RabbitMQ(Queue queue) {
factory.setUsername(queue.username);
factory.setPassword(queue.password);
factory.setVirtualHost(queue.vhost);
+ if (queue.heartbeat != null) {
+ factory.setRequestedHeartbeat(queue.heartbeat);
+ }
}
// Returns a new connection to an AMQP queue.
Something went wrong with that request. Please try again.