forked from colinsurprenant/redstorm
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
all examples in RedStorm::Examples namespace and descriptive ids
- Loading branch information
1 parent
f482f31
commit 0128e29
Showing
17 changed files
with
388 additions
and
322 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,20 +1,25 @@ | ||
require 'red_storm' | ||
require 'examples/native/random_sentence_spout' | ||
require 'examples/native/split_sentence_bolt' | ||
require 'examples/native/word_count_bolt' | ||
|
||
class ClusterWordCountTopology | ||
RedStorm::Configuration.topology_class = self | ||
module RedStorm | ||
module Examples | ||
class ClusterWordCountTopology | ||
RedStorm::Configuration.topology_class = self | ||
|
||
def start(base_class_path, env) | ||
builder = TopologyBuilder.new | ||
builder.setSpout('1', JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5) | ||
builder.setBolt('2', JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 4).shuffleGrouping('1') | ||
builder.setBolt('3', JRubyBolt.new(base_class_path, "WordCountBolt"), 4).fieldsGrouping('2', Fields.new("word")) | ||
def start(base_class_path, env) | ||
builder = TopologyBuilder.new | ||
builder.setSpout('RandomSentenceSpout', JRubySpout.new(base_class_path, "RedStorm::Examples::RandomSentenceSpout"), 5) | ||
builder.setBolt('SplitSentenceBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::SplitSentenceBolt"), 4).shuffleGrouping('RandomSentenceSpout') | ||
builder.setBolt('WordCountBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::WordCountBolt"), 4).fieldsGrouping('SplitSentenceBolt', Fields.new("word")) | ||
|
||
conf = Config.new | ||
conf.setDebug(true) | ||
conf.setNumWorkers(20); | ||
conf.setMaxSpoutPending(1000); | ||
StormSubmitter.submitTopology("word-count", conf, builder.createTopology); | ||
conf = Config.new | ||
conf.setDebug(true) | ||
conf.setNumWorkers(20); | ||
conf.setMaxSpoutPending(1000); | ||
StormSubmitter.submitTopology("word_count", conf, builder.createTopology); | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,18 @@ | ||
class ExclamationBolt | ||
def prepare(conf, context, collector) | ||
@collector = collector | ||
end | ||
module RedStorm | ||
module Examples | ||
class ExclamationBolt | ||
def prepare(conf, context, collector) | ||
@collector = collector | ||
end | ||
|
||
def execute(tuple) | ||
@collector.emit(tuple, Values.new(tuple.getString(0) + "!!!")) | ||
@collector.ack(tuple) | ||
end | ||
def execute(tuple) | ||
@collector.emit(tuple, Values.new(tuple.getString(0) + "!!!")) | ||
@collector.ack(tuple) | ||
end | ||
|
||
def declare_output_fields(declarer) | ||
declarer.declare(Fields.new("word")) | ||
def declare_output_fields(declarer) | ||
declarer.declare(Fields.new("word")) | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,25 +1,31 @@ | ||
java_import 'backtype.storm.testing.TestWordSpout' | ||
|
||
require 'lib/red_storm' | ||
require 'examples/native/exclamation_bolt' | ||
|
||
# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt | ||
|
||
class LocalExclamationTopology | ||
RedStorm::Configuration.topology_class = self | ||
module RedStorm | ||
module Examples | ||
class LocalExclamationTopology | ||
RedStorm::Configuration.topology_class = self | ||
|
||
def start(base_class_path, env) | ||
builder = TopologyBuilder.new | ||
|
||
builder.setSpout('1', TestWordSpout.new, 10) | ||
builder.setBolt('2', JRubyBolt.new(base_class_path, "ExclamationBolt"), 3).shuffleGrouping('1') | ||
builder.setBolt('3', JRubyBolt.new(base_class_path, "ExclamationBolt"), 2).shuffleGrouping('2') | ||
|
||
conf = Config.new | ||
conf.setDebug(true) | ||
|
||
cluster = LocalCluster.new | ||
cluster.submitTopology("test", conf, builder.createTopology) | ||
sleep(5) | ||
cluster.killTopology("test") | ||
cluster.shutdown | ||
def start(base_class_path, env) | ||
builder = TopologyBuilder.new | ||
|
||
builder.setSpout('TestWordSpout', TestWordSpout.new, 10) | ||
builder.setBolt('ExclamationBolt1', JRubyBolt.new(base_class_path, 'RedStorm::Examples::ExclamationBolt'), 3).shuffleGrouping('TestWordSpout') | ||
builder.setBolt('ExclamationBolt2', JRubyBolt.new(base_class_path, 'RedStorm::Examples::ExclamationBolt'), 3).shuffleGrouping('ExclamationBolt1') | ||
|
||
conf = Config.new | ||
conf.setDebug(true) | ||
|
||
cluster = LocalCluster.new | ||
cluster.submitTopology("exclamation", conf, builder.createTopology) | ||
sleep(5) | ||
cluster.killTopology("exclamation") | ||
cluster.shutdown | ||
end | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,39 +1,45 @@ | ||
java_import 'backtype.storm.testing.TestWordSpout' | ||
|
||
class ExclamationBolt2 | ||
def prepare(conf, context, collector) | ||
@collector = collector | ||
end | ||
require 'lib/red_storm' | ||
|
||
def execute(tuple) | ||
@collector.emit(tuple, Values.new(tuple.getString(0) + "!!!")) | ||
@collector.ack(tuple) | ||
end | ||
module RedStorm | ||
module Examples | ||
class ExclamationBolt2 | ||
def prepare(conf, context, collector) | ||
@collector = collector | ||
end | ||
|
||
def declare_output_fields(declarer) | ||
declarer.declare(Fields.new("word")) | ||
end | ||
end | ||
def execute(tuple) | ||
@collector.emit(tuple, Values.new("!#{tuple.getString(0)}!")) | ||
@collector.ack(tuple) | ||
end | ||
|
||
def declare_output_fields(declarer) | ||
declarer.declare(Fields.new("word")) | ||
end | ||
end | ||
|
||
# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt | ||
# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt | ||
|
||
class LocalExclamationTopology2 | ||
RedStorm::Configuration.topology_class = self | ||
class LocalExclamationTopology2 | ||
RedStorm::Configuration.topology_class = self | ||
|
||
def start(base_class_path, env) | ||
builder = TopologyBuilder.new | ||
|
||
builder.setSpout('1', TestWordSpout.new, 10) | ||
builder.setBolt('2', JRubyBolt.new(base_class_path, "ExclamationBolt2"), 3).shuffleGrouping('1') | ||
builder.setBolt('3', JRubyBolt.new(base_class_path, "ExclamationBolt2"), 2).shuffleGrouping('2') | ||
|
||
conf = Config.new | ||
conf.setDebug(true) | ||
|
||
cluster = LocalCluster.new | ||
cluster.submitTopology("test", conf, builder.createTopology) | ||
sleep(5) | ||
cluster.killTopology("test") | ||
cluster.shutdown | ||
def start(base_class_path, env) | ||
builder = TopologyBuilder.new | ||
|
||
builder.setSpout('TestWordSpout', TestWordSpout.new, 10) | ||
builder.setBolt('ExclamationBolt21', JRubyBolt.new(base_class_path, "RedStorm::Examples::ExclamationBolt2"), 3).shuffleGrouping('TestWordSpout') | ||
builder.setBolt('ExclamationBolt22', JRubyBolt.new(base_class_path, "RedStorm::Examples::ExclamationBolt2"), 2).shuffleGrouping('ExclamationBolt21') | ||
|
||
conf = Config.new | ||
conf.setDebug(true) | ||
|
||
cluster = LocalCluster.new | ||
cluster.submitTopology("exclamation", conf, builder.createTopology) | ||
sleep(5) | ||
cluster.killTopology("exclamation") | ||
cluster.shutdown | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,60 +1,65 @@ | ||
require 'redis' | ||
require 'thread' | ||
require 'lib/red_storm' | ||
require 'examples/native/word_count_bolt' | ||
|
||
# RedisWordSpout reads the Redis queue "test" on localhost:6379 | ||
# and emits each word items pop'ed from the queue. | ||
class RedisWordSpout | ||
def open(conf, context, collector) | ||
@collector = collector | ||
@q = Queue.new | ||
@redis_reader = detach_redis_reader | ||
end | ||
|
||
def next_tuple | ||
# per doc nextTuple should not block, and sleep a bit when there's no data to process. | ||
if @q.size > 0 | ||
@collector.emit(Values.new(@q.pop)) | ||
else | ||
sleep(0.1) | ||
end | ||
end | ||
module RedStorm | ||
module Examples | ||
# RedisWordSpout reads the Redis queue "test" on localhost:6379 | ||
# and emits each word items pop'ed from the queue. | ||
class RedisWordSpout | ||
def open(conf, context, collector) | ||
@collector = collector | ||
@q = Queue.new | ||
@redis_reader = detach_redis_reader | ||
end | ||
|
||
def next_tuple | ||
# per doc nextTuple should not block, and sleep a bit when there's no data to process. | ||
if @q.size > 0 | ||
@collector.emit(Values.new(@q.pop)) | ||
else | ||
sleep(0.1) | ||
end | ||
end | ||
|
||
def declare_output_fields(declarer) | ||
declarer.declare(Fields.new("word")) | ||
end | ||
def declare_output_fields(declarer) | ||
declarer.declare(Fields.new("word")) | ||
end | ||
|
||
private | ||
private | ||
|
||
def detach_redis_reader | ||
Thread.new do | ||
Thread.current.abort_on_exception = true | ||
def detach_redis_reader | ||
Thread.new do | ||
Thread.current.abort_on_exception = true | ||
|
||
redis = Redis.new(:host => "localhost", :port => 6379) | ||
loop do | ||
if data = redis.blpop("test", 0) | ||
@q << data[1] | ||
redis = Redis.new(:host => "localhost", :port => 6379) | ||
loop do | ||
if data = redis.blpop("test", 0) | ||
@q << data[1] | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end | ||
end | ||
|
||
class LocalRedisWordCountTopology | ||
RedStorm::Configuration.topology_class = self | ||
class LocalRedisWordCountTopology | ||
RedStorm::Configuration.topology_class = self | ||
|
||
def start(base_class_path, env) | ||
builder = TopologyBuilder.new | ||
builder.setSpout('1', JRubySpout.new(base_class_path, "RedisWordSpout"), 1) | ||
builder.setBolt('2', JRubyBolt.new(base_class_path, "WordCountBolt"), 3).fieldsGrouping('1', Fields.new("word")) | ||
def start(base_class_path, env) | ||
builder = TopologyBuilder.new | ||
builder.setSpout('RedisWordSpout', JRubySpout.new(base_class_path, "RedStorm::Examples::RedisWordSpout"), 1) | ||
builder.setBolt('WordCountBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::WordCountBolt"), 3).fieldsGrouping('RedisWordSpout', Fields.new("word")) | ||
|
||
conf = Config.new | ||
conf.setDebug(true) | ||
conf.setMaxTaskParallelism(3) | ||
conf = Config.new | ||
conf.setDebug(true) | ||
conf.setMaxTaskParallelism(3) | ||
|
||
cluster = LocalCluster.new | ||
cluster.submitTopology("redis-word-count", conf, builder.createTopology) | ||
sleep(600) | ||
cluster.shutdown | ||
cluster = LocalCluster.new | ||
cluster.submitTopology("redis_word_count", conf, builder.createTopology) | ||
sleep(600) | ||
cluster.shutdown | ||
end | ||
end | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,26 +1,30 @@ | ||
class RandomSentenceSpout | ||
attr_reader :is_distributed | ||
module RedStorm | ||
module Examples | ||
class RandomSentenceSpout | ||
attr_reader :is_distributed | ||
|
||
def initialize | ||
@is_distributed = true | ||
@sentences = [ | ||
"the cow jumped over the moon", | ||
"an apple a day keeps the doctor away", | ||
"four score and seven years ago", | ||
"snow white and the seven dwarfs", | ||
"i am at two with nature" | ||
] | ||
end | ||
def initialize | ||
@is_distributed = true | ||
@sentences = [ | ||
"the cow jumped over the moon", | ||
"an apple a day keeps the doctor away", | ||
"four score and seven years ago", | ||
"snow white and the seven dwarfs", | ||
"i am at two with nature" | ||
] | ||
end | ||
|
||
def open(conf, context, collector) | ||
@collector = collector | ||
end | ||
|
||
def next_tuple | ||
@collector.emit(Values.new(@sentences[rand(@sentences.length)])) | ||
end | ||
def open(conf, context, collector) | ||
@collector = collector | ||
end | ||
def next_tuple | ||
@collector.emit(Values.new(@sentences[rand(@sentences.length)])) | ||
end | ||
|
||
def declare_output_fields(declarer) | ||
declarer.declare(Fields.new("word")) | ||
def declare_output_fields(declarer) | ||
declarer.declare(Fields.new("word")) | ||
end | ||
end | ||
end | ||
end | ||
end |
Oops, something went wrong.