Skip to content

Commit

Permalink
Simple line-counting program to test the functionality of Storm.
Browse files Browse the repository at this point in the history
  • Loading branch information
alexktang committed Nov 27, 2011
1 parent ce22c63 commit d157a1a
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 0 deletions.
6 changes: 6 additions & 0 deletions commands.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Some useful commands

mvn exec:java -Dexec.mainClass=collabstream.lc.LineCountTopology -Dexec.args='local src/main/resources/lotr.txt' -Dexec.classpathScope=compile
mvn exec:java -Dexec.mainClass=collabstream.lc.LineCountTopology -Dexec.args='local src/main/resources/lotr.txt' -Dexec.classpathScope=compile | grep '########'
mvn compile exec:java -Dexec.mainClass=collabstream.lc.LineCountTopology -Dexec.args='local src/main/resources/lotr.txt' -Dexec.classpathScope=compile
mvn compile exec:java -Dexec.mainClass=collabstream.lc.LineCountTopology -Dexec.args='local src/main/resources/lotr.txt' -Dexec.classpathScope=compile | grep '########'
39 changes: 39 additions & 0 deletions src/main/java/collabstream/lc/CounterBolt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package collabstream.lc;

import java.util.HashSet;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * Bolt that remembers the distinct line numbers it has seen and re-emits each one.
 *
 * <p>Every incoming tuple is expected to carry a {@code "line"} string field and a
 * {@code "lineNum"} integer field. The bolt emits a single-field tuple
 * ({@code "lineNum"}) for each input and acknowledges the input afterwards.
 */
public class CounterBolt implements IRichBolt {
    /** Collector handed over in {@link #prepare}; used for emitting and acking. */
    private OutputCollector collector;

    /** Distinct line numbers observed so far; its size is the reported line count. */
    private HashSet<Integer> lineNumSet = new HashSet<Integer>();

    /** Stores the collector for later use by {@link #execute}. */
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Logs the received line, records its number, then emits the number and acks the input.
     *
     * @param tuple input tuple with {@code "line"} and {@code "lineNum"} fields
     */
    public void execute(Tuple tuple) {
        final String text = tuple.getStringByField("line");
        final Integer number = tuple.getIntegerByField("lineNum");

        StringBuilder message = new StringBuilder("######## CounterBolt.execute: ");
        message.append(number).append(". ").append(text);
        System.out.println(message.toString());

        lineNumSet.add(number);

        // The input tuple is passed as the first argument so the emitted tuple is tied to it.
        Values output = new Values(number);
        collector.emit(tuple, output);
        collector.ack(tuple);
    }

    /** Prints how many distinct line numbers were recorded. */
    public void cleanup() {
        int counted = lineNumSet.size();
        System.out.println("######## CounterBolt.cleanup: counted " + counted + " lines");
    }

    /** Declares the single output field {@code "lineNum"}. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields outputFields = new Fields("lineNum");
        declarer.declare(outputFields);
    }
}
75 changes: 75 additions & 0 deletions src/main/java/collabstream/lc/FileReaderSpout.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package collabstream.lc;

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.LineNumberReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

/**
 * Spout that reads a text file line by line and emits one tuple per line.
 *
 * <p>Each emitted tuple has the fields {@code ("lineNum", "line")}; the line number is
 * also passed as the second argument of {@code emit}, so it is the value reported back
 * through {@link #ack} and {@link #fail}.
 *
 * <p>If the file cannot be opened, the error is logged and the spout emits nothing.
 * Once the end of the file is reached the reader is closed and further calls to
 * {@link #nextTuple} return immediately.
 */
public class FileReaderSpout implements IRichSpout {
    /** Collector handed over in {@link #open}. */
    private SpoutOutputCollector collector;

    /** Path of the file to read. */
    private String fileName;

    /** Reader over {@link #fileName}; {@code null} before open, after close, and after EOF. */
    private LineNumberReader in;

    /**
     * @param fileName path of the file whose lines will be emitted
     */
    public FileReaderSpout(String fileName) {
        this.fileName = fileName;
    }

    /** Reports that this spout is not distributed. */
    public boolean isDistributed() {
        return false;
    }

    /**
     * Stores the collector and opens the input file. A missing file is logged rather
     * than thrown, leaving the spout idle.
     */
    public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            in = new LineNumberReader(new FileReader(fileName));
        } catch (FileNotFoundException e) {
            // Label the failure with the method it actually occurred in.
            System.err.println("######## FileReaderSpout.open: " + e);
        }
    }

    /** Closes the reader if it is open; safe to call more than once. */
    public void close() {
        if (in == null) {
            return;
        }
        try {
            in.close();
        } catch (IOException e) {
            System.err.println("######## FileReaderSpout.close: " + e);
        } finally {
            // Drop the reference so later calls do not touch a closed stream.
            in = null;
        }
    }

    /** Logs the acknowledged message ID. */
    public void ack(Object msgId) {
        System.out.println("######## FileReaderSpout.ack: msgId=" + msgId);
    }

    /** Logs the failed message ID. */
    public void fail(Object msgId) {
        System.out.println("######## FileReaderSpout.fail: msgId=" + msgId);
    }

    /**
     * Reads the next line, if any, and emits it together with its line number.
     * Does nothing when the reader is not open; a read error is logged and the call
     * emits nothing.
     */
    public void nextTuple() {
        if (in == null) {
            return;
        }

        String line;
        try {
            line = in.readLine();
        } catch (IOException e) {
            System.err.println("######## FileReaderSpout.nextTuple: " + e);
            return;
        }

        if (line == null) {
            // End of file: release the handle instead of polling readLine() forever.
            close();
            return;
        }

        // After readLine(), getLineNumber() is the number of the line just read.
        int lineNum = in.getLineNumber();
        System.out.println("######## FileReaderSpout.nextTuple: " + lineNum + ". " + line);
        collector.emit(new Values(lineNum, line), lineNum);
    }

    /** Declares the output fields {@code "lineNum"} and {@code "line"}. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("lineNum", "line"));
    }
}
43 changes: 43 additions & 0 deletions src/main/java/collabstream/lc/LineCountTopology.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package collabstream.lc;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

/**
 * Entry point of a small line-counting topology used to exercise Storm.
 *
 * <p>Four components are wired together:
 * <ul>
 *   <li>{@code FileReaderSpout} (ID 1) reads lines from a file and emits them.</li>
 *   <li>{@code CounterBolt} (ID 2) counts the lines it receives from the spout and emits
 *       their line numbers.</li>
 *   <li>{@code MsgForwarder} (ID 3) forwards both the spout's lines and the counter's
 *       line numbers.</li>
 *   <li>{@code MsgTracker} (ID 4) receives the forwarded messages and tracks which lines
 *       were read and which were counted.</li>
 * </ul>
 */
public class LineCountTopology {
    /**
     * Builds the topology and submits it.
     *
     * @param args {@code args[0]} is the mode ({@code "local"} runs an in-process cluster
     *             for ten seconds; any other value submits via {@code StormSubmitter});
     *             {@code args[1]} is the input file name
     */
    public static void main(String[] args) throws Exception {
        boolean enoughArgs = args.length >= 2;
        if (!enoughArgs) {
            System.err.println("######## Wrong number of arguments");
            System.err.println("######## required args: local|production fileName");
            return;
        }

        final String mode = args[0];
        final String fileName = args[1];

        Config config = new Config();
        TopologyBuilder builder = wire(fileName);

        System.out.println("######## LineCountTopology.main: submitting topology");

        if (!"local".equals(mode)) {
            StormSubmitter.submitTopology("line-count", config, builder.createTopology());
            return;
        }

        // Local mode: run in-process, give the topology time to work, then tear it down.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("line-count", config, builder.createTopology());
        System.out.println("######## LineCountTopology.main: sleeping for 10 secs");
        Utils.sleep(10000);
        cluster.shutdown();
    }

    /** Creates a builder with the spout and the three bolts connected by shuffle groupings. */
    private static TopologyBuilder wire(String fileName) {
        final int fileReaderId = 1;
        final int counterId = 2;
        final int forwarderId = 3;
        final int trackerId = 4;

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(fileReaderId, new FileReaderSpout(fileName));
        builder.setBolt(counterId, new CounterBolt())
                .shuffleGrouping(fileReaderId);
        builder.setBolt(forwarderId, new MsgForwarder(fileReaderId, counterId))
                .shuffleGrouping(fileReaderId)
                .shuffleGrouping(counterId);
        // The tracker subscribes to both streams of the forwarder, which are numbered
        // after the component whose tuples they carry.
        builder.setBolt(trackerId, new MsgTracker(fileReaderId, counterId))
                .shuffleGrouping(forwarderId, fileReaderId)
                .shuffleGrouping(forwarderId, counterId);
        return builder;
    }
}
38 changes: 38 additions & 0 deletions src/main/java/collabstream/lc/MsgForwarder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package collabstream.lc;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

/**
 * Bolt that passes every received tuple's values through unchanged.
 *
 * <p>It declares two output streams, one numbered {@link #fileReaderId} with fields
 * {@code ("lineNum", "line")} and one numbered {@link #counterId} with field
 * {@code ("lineNum")}. Each tuple is re-emitted on the stream whose ID equals the ID of
 * the component that produced it.
 */
public class MsgForwarder implements IRichBolt {
    /** Collector handed over in {@link #prepare}. */
    private OutputCollector collector;

    /** ID used for the output stream with fields {@code ("lineNum", "line")}. */
    public int fileReaderId;

    /** ID used for the output stream with field {@code ("lineNum")}. */
    public int counterId;

    /**
     * @param fileReaderId stream ID for the two-field output stream
     * @param counterId    stream ID for the one-field output stream
     */
    public MsgForwarder(int fileReaderId, int counterId) {
        this.fileReaderId = fileReaderId;
        this.counterId = counterId;
    }

    /** Stores the collector for later use by {@link #execute}. */
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Re-emits the tuple's values and acks the tuple.
     *
     * @param tuple tuple to forward
     */
    public void execute(Tuple tuple) {
        // The emitting component's ID doubles as the ID of the stream to forward on.
        int forwardStream = tuple.getSourceComponent();
        collector.emit(forwardStream, tuple.getValues());
        collector.ack(tuple);
    }

    /** Declares one output stream per forwarded source. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        Fields readerFields = new Fields("lineNum", "line");
        declarer.declareStream(fileReaderId, readerFields);

        Fields counterFields = new Fields("lineNum");
        declarer.declareStream(counterId, counterFields);
    }

    /** Nothing to release. */
    public void cleanup() {
    }
}
66 changes: 66 additions & 0 deletions src/main/java/collabstream/lc/MsgTracker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package collabstream.lc;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Bolt that keeps, per line number, whether a tuple for that line arrived on the
 * stream numbered {@code fileReaderId} ("read") and on the stream numbered
 * {@code counterId} ("counted"). The collected table is printed in ascending
 * line-number order by {@link #cleanup}.
 */
public class MsgTracker implements IRichBolt {
    /** Per-line flags. */
    private static class Record {
        boolean read = false;
        boolean counted = false;

        Record() {}

        /** Renders both flags as 0/1. */
        public String toString() {
            int readBit = read ? 1 : 0;
            int countedBit = counted ? 1 : 0;
            return "read=" + readBit + ", counted=" + countedBit;
        }
    }

    /** Collector handed over in {@link #prepare}; used for acking. */
    private OutputCollector collector;

    /** Stream IDs that mark a tuple as "read" and "counted" respectively. */
    private int fileReaderId, counterId;

    /** Line number to its flags. */
    private HashMap<Integer, Record> lnToRec = new HashMap<Integer, Record>();

    /**
     * @param fileReaderId ID of the stream whose tuples mark a line as read
     * @param counterId    ID of the stream whose tuples mark a line as counted
     */
    public MsgTracker(int fileReaderId, int counterId) {
        this.fileReaderId = fileReaderId;
        this.counterId = counterId;
    }

    /** Stores the collector for later use by {@link #execute}. */
    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    /**
     * Updates the flags of the tuple's line number according to the stream it arrived
     * on, then acks the tuple.
     *
     * @param tuple input tuple with a {@code "lineNum"} integer field
     */
    public void execute(Tuple tuple) {
        Integer key = tuple.getIntegerByField("lineNum");

        // Create the entry on first sight of this line number.
        if (!lnToRec.containsKey(key)) {
            lnToRec.put(key, new Record());
        }
        Record entry = lnToRec.get(key);

        int source = tuple.getSourceStreamId();
        boolean fromReader = (source == fileReaderId);
        if (fromReader) {
            entry.read = true;
        }
        // The reader stream takes precedence, so only check the counter otherwise.
        if (!fromReader && source == counterId) {
            entry.counted = true;
        }

        collector.ack(tuple);
    }

    /** Prints one line per tracked line number, sorted ascending. */
    public void cleanup() {
        ArrayList<Integer> sorted = new ArrayList<Integer>();
        sorted.addAll(lnToRec.keySet());
        Collections.sort(sorted);

        for (int i = 0; i < sorted.size(); i++) {
            Integer lineNum = sorted.get(i);
            Record entry = lnToRec.get(lineNum);
            System.out.println("######## MsgTracker.cleanup: " + lineNum + ". " + entry);
        }
    }

    /** This bolt emits nothing, so no fields are declared. */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
8 changes: 8 additions & 0 deletions src/main/resources/lotr.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Three Rings for the Elven-kings under the sky,
Seven for the Dwarf-lords in their halls of stone,
Nine for Mortal Men doomed to die,
One for the Dark Lord on his dark throne
In the Land of Mordor where the Shadows lie.
One Ring to rule them all, One Ring to find them,
One Ring to bring them all and in the darkness bind them
In the Land of Mordor where the Shadows lie.

0 comments on commit d157a1a

Please sign in to comment.