Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,15 @@ public class TotalWordCounter implements IBasicBolt {
private static final Random RANDOM = new Random();
private BigInteger total = BigInteger.ZERO;

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context) {
}

/*
* Just output the word value with a count of 1.
* The HBaseBolt will handle incrementing the counter.
*/
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
total = total.add(new BigInteger(input.getValues().get(1).toString()));
collector.emit(tuple(total.toString()));
Expand All @@ -48,10 +50,12 @@ public void execute(Tuple input, BasicOutputCollector collector) {
}
}

@Override
public void cleanup() {
LOG.info("Final total = " + total);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("total"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,25 @@
public class WordCounter implements IBasicBolt {


@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context) {
}

/*
* Just output the word value with a count of 1.
* The HBaseBolt will handle incrementing the counter.
*/
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
collector.emit(tuple(input.getValues().get(0), 1));
}

@Override
public void cleanup() {

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,29 +39,35 @@ public boolean isDistributed() {
return this.isDistributed;
}

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}

@Override
public void close() {

}

@Override
public void nextTuple() {
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
this.collector.emit(new Values(word), UUID.randomUUID());
Thread.yield();
}

@Override
public void ack(Object msgId) {

}

@Override
public void fail(Object msgId) {

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,16 +118,19 @@ public static class SentenceSpout extends BaseRichSpout {
private int count = 0;
private long total = 0L;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence", "timestamp"));
}

@Override
public void open(Map<String, Object> config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
this.pending = new ConcurrentHashMap<UUID, Values>();
}

@Override
public void nextTuple() {
Values values = new Values(sentences[index], System.currentTimeMillis());
UUID msgId = UUID.randomUUID();
Expand All @@ -146,10 +149,12 @@ public void nextTuple() {
Thread.yield();
}

@Override
public void ack(Object msgId) {
this.pending.remove(msgId);
}

@Override
public void fail(Object msgId) {
System.out.println("**** RESENDING FAILED TUPLE");
this.collector.emit(this.pending.get(msgId), msgId);
Expand All @@ -161,15 +166,18 @@ public static class MyBolt extends BaseRichBolt {
private HashMap<String, Long> counts = null;
private OutputCollector collector;

@Override
public void prepare(Map<String, Object> config, TopologyContext context, OutputCollector collector) {
this.counts = new HashMap<String, Long>();
this.collector = collector;
}

@Override
public void execute(Tuple tuple) {
collector.ack(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// this bolt does not emit anything
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,16 +124,19 @@ public static class SentenceSpout extends BaseRichSpout {
private int count = 0;
private long total = 0L;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence", "timestamp"));
}

@Override
public void open(Map<String, Object> config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
this.pending = new ConcurrentHashMap<UUID, Values>();
}

@Override
public void nextTuple() {
Values values = new Values(sentences[index], System.currentTimeMillis());
UUID msgId = UUID.randomUUID();
Expand All @@ -152,11 +155,13 @@ public void nextTuple() {
Thread.yield();
}

@Override
public void ack(Object msgId) {
// System.out.println("ACK");
this.pending.remove(msgId);
}

@Override
public void fail(Object msgId) {
System.out.println("**** RESENDING FAILED TUPLE");
this.collector.emit(this.pending.get(msgId), msgId);
Expand All @@ -169,16 +174,19 @@ public static class MyBolt extends BaseRichBolt {
private HashMap<String, Long> counts = null;
private OutputCollector collector;

@Override
public void prepare(Map<String, Object> config, TopologyContext context, OutputCollector collector) {
this.counts = new HashMap<String, Long>();
this.collector = collector;
}

@Override
public void execute(Tuple tuple) {
collector.ack(tuple);
}


@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// this bolt does not emit anything
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,12 @@ public UserDataSpout withDataFile (String filePath) {
return this;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(this.outputFields));
}

@Override
public void open(Map<String, Object> config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
Expand All @@ -135,6 +137,7 @@ public void open(Map<String, Object> config, TopologyContext context,
}
}

@Override
public void nextTuple() {
String line;
try {
Expand All @@ -161,10 +164,12 @@ public void nextTuple() {
}
}

@Override
public void ack(Object msgId) {
this.pending.remove(msgId);
}

@Override
public void fail(Object msgId) {
System.out.println("**** RESENDING FAILED TUPLE");
this.collector.emit(this.pending.get(msgId), msgId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,19 @@ public static class UserDataSpout extends BaseRichSpout {
private int count = 0;
private long total = 0L;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","name","phone","street","city","state"));
}

@Override
public void open(Map<String, Object> config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
this.pending = new ConcurrentHashMap<UUID, Values>();
}

@Override
public void nextTuple() {
String[] user = sentences[index].split(",");
Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
Expand All @@ -129,10 +132,12 @@ public void nextTuple() {
Thread.yield();
}

@Override
public void ack(Object msgId) {
this.pending.remove(msgId);
}

@Override
public void fail(Object msgId) {
System.out.println("**** RESENDING FAILED TUPLE");
this.collector.emit(this.pending.get(msgId), msgId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,19 @@ public static class UserDataSpout extends BaseRichSpout {
private int count = 0;
private long total = 0L;

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","name","phone","street","city","state"));
}

@Override
public void open(Map<String, Object> config, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
this.pending = new ConcurrentHashMap<UUID, Values>();
}

@Override
public void nextTuple() {
String[] user = sentences[index].split(",");
Values values = new Values(Integer.parseInt(user[0]),user[1],user[2],user[3],user[4],user[5]);
Expand All @@ -130,10 +133,12 @@ public void nextTuple() {
}
}

@Override
public void ack(Object msgId) {
this.pending.remove(msgId);
}

@Override
public void fail(Object msgId) {
System.out.println("**** RESENDING FAILED TUPLE");
this.collector.emit(this.pending.get(msgId), msgId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,35 @@ public boolean isDistributed() {
return this.isDistributed;
}

@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}

@Override
public void close() {

}

@Override
public void nextTuple() {
final Random rand = new Random();
final Values row = rows.get(rand.nextInt(rows.size() - 1));
this.collector.emit(row);
Thread.yield();
}

@Override
public void ack(Object msgId) {

}

@Override
public void fail(Object msgId) {

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("user_id","user_name","create_date"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@ public GenericBolt(String name, boolean autoAck, boolean autoAnchor) {
this(name, autoAck, autoAnchor, null);
}

@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context,
OutputCollector collector) {
this.collector = collector;

}

@Override
public void execute(Tuple input) {
LOG.debug("[" + this.name + "] Received message: " + input);

Expand All @@ -94,10 +96,12 @@ public void execute(Tuple input) {

}

@Override
public void cleanup() {

}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
if (this.declaredFields != null) {
declarer.declare(this.declaredFields);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
@SuppressWarnings("serial")
public class JsonTupleProducer implements JmsTupleProducer {

@Override
public Values toTuple(Message msg) throws JMSException {
if(msg instanceof TextMessage){
String json = ((TextMessage) msg).getText();
Expand All @@ -51,6 +52,7 @@ public Values toTuple(Message msg) throws JMSException {
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("json"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ public SpringJmsProvider(String appContextClasspathResource, String connectionFa
this.destination = (Destination)context.getBean(destinationBean);
}

@Override
public ConnectionFactory connectionFactory() throws Exception {
return this.connectionFactory;
}

@Override
public Destination destination() throws Exception {
return this.destination;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public static void main(String[] args) throws Exception {
new KafkaSpoutTopologyMainWildcardTopicsLocal().runExample();
}

@Override
protected KafkaSpoutTopologyMainNamedTopics getTopology() {
return new KafkaSpoutTopologyMainWildcardTopics();
}
Expand Down
Loading