Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base: 34a77d1870
...
compare: a3f1bf87f6
  • 5 commits
  • 4 files changed
  • 0 commit comments
  • 1 contributor
View
31 README.md
@@ -1,12 +1,12 @@
IronCount
=============
-IronCount provides a framework to manage consumers of Kafka message queues across multiple nodes.
+IronCount provides a framework to manage consumers of Kafka topics across multiple nodes.
Components
-----
IronCount works with three components. Two components are external Kafka and ZooKeeper. IronCount
-has one component WorkloadManager that runs on multiple machines.
+has one component named WorkloadManager that runs on multiple machines.
Workloads
-----
@@ -21,9 +21,10 @@ A Workload is an object that stores several pieces of information.
* properties: A map of properties that can be used for configuration
* active: a flag to start or pause workloads
-In a serialized form it looks like this:
+In a serialized form a workload looks like this:
- {"name":"testworkload"
+ {
+ "name":"testworkload"
,"topic":"topic1"
,"consumerGroup":"group1"
,"messageHandlerName":"com.jointhegrid.ironcount.eventtofile.MessageToFileHandler"
@@ -34,7 +35,7 @@ In a serialized form it looks like this:
}
To start a Workload create a JSON clob like the one above and save it to a file. Then use
-Deploy tool to write this entry to ZooKeeper. At this point WorkloadManagers should notice
+DeployTool to write this entry to ZooKeeper. At this point WorkloadManagers should notice
the changes to zookeeper and start instances of the Workload.
Extending
@@ -71,7 +72,7 @@ Demos
-----
IronCount has some build in demo's to show it's usefulness. The first is MockingBird, which offers Rainbird
-style URL counting, and data persistance to Cassandra. See `com.jointhegrid.ironcount.mockingbird.*` in the
+style URL counting and data persistance to Cassandra. See `com.jointhegrid.ironcount.mockingbird.*` in the
test packages.
public void handleMessage(Message m) {
@@ -92,11 +93,11 @@ test packages.
}
}
-The framework takes care of the transport and queuing and allows the user to focus on application logic.
+The framework takes care of the transport of messages and allows the user to focus on application logic.
-The second demo is a Join similar to s4's join demo. This one is implemented with two Kafka queues,
-a queue named `map` and a queue named `reduce`. The `MapHandler` handles messages from the map queue, processes
-them and then send them to the reduce queue.
+The second demo is a Join similar to s4's join demo. This one is implemented with two Kafka topics,
+a topic named `map` and a topic named `reduce`. The `MapHandler` handles messages from the map topic, processes
+them and then send them to the `reduce` topic.
@Override
public void setWorkload(Workload w) {
@@ -129,10 +130,12 @@ them and then send them to the reduce queue.
("reduce", columns[0], Arrays.asList(table+"|"+row)));
}
- Kafka has the notion of partitioners and the join key is used internally to route
-messages for the same user_id to the same handler. The ReduceHandler writes
-partial aggreggations as Cassandra counters made possible by Kafka's underlying partitioning. This system
-where one Workload creates data for another can be viewed as a pipe or a feedback loop.
+Kafka has the notion of partitioners and the join key is used internally to route
+messages for the same user_id to the same partition of a topic. The ReduceHandler writes
+partial aggreggations as Cassandra counters made possible by Kafka's underlying partitioning.
+
+In this example the map Workload creates data for the reduce Workload. You can imagine this
+as a pipe or a feedback loop.
@Override
public void handleMessage(Message m) {
View
2  pom.xml
@@ -4,7 +4,7 @@
<groupId>com.jointhegrid</groupId>
<artifactId>ironcount</artifactId>
<name>ironcount</name>
- <version>1.0.0-SNAPSHOT</version>
+ <version>1.0.1-SNAPSHOT</version>
<description></description>
<packaging>jar</packaging>
View
6 src/main/java/com/jointhegrid/ironcount/WorkerThread.java
@@ -131,7 +131,9 @@ public void run(){
public void run() {
for(Message message: stream) {
try {
- handler.handleMessage(message);
+ if (goOn){
+ handler.handleMessage(message);
+ }
} catch (Exception ex){
logger.error("worker thread fired exception "+workload+" "+ex);
goOn=false;
@@ -153,6 +155,8 @@ public void run() {
try {
this.m.getWorkerThreads().remove(this);
this.zk.close();
+ executor.shutdown();
+ logger.debug("thread tear down");
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
View
4 src/main/java/com/jointhegrid/ironcount/WorkloadManager.java
@@ -221,7 +221,7 @@ public void considerStartingWorkload(Workload w){
try {
map.writeValue(baos, w);
} catch (IOException ex) {
- System.out.println(ex);
+ logger.error(ex);
}
return baos.toByteArray();
}
@@ -233,7 +233,7 @@ public Workload deserializeWorkload(byte[] b) {
try {
work = (Workload) m.readValue(new String(b), t);
} catch (IOException ex) {
- System.out.println(ex);
+ logger.error(ex);
}
return work;
}

No commit comments for this range

Something went wrong with that request. Please try again.