Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
...
Checking mergeability… Don't worry, you can still create the pull request.
  • 2 commits
  • 12 files changed
  • 0 commit comments
  • 2 contributors
View
32 examples/simple/ruby_version_topology.rb
@@ -0,0 +1,32 @@
+require 'red_storm'
+
+# this example topology only prints the Ruby version string. No tuple is emitted.
+
+module RedStorm
+ module Examples
+ class VersionSpout < RedStorm::SimpleSpout
+ output_fields :dummy
+ on_init {log.info("****** JRuby version #{RUBY_VERSION}")}
+ on_send {}
+ end
+
+ class RubyVersionTopology < RedStorm::SimpleTopology
+ spout VersionSpout
+
+ configure do |env|
+ debug true
+
+ # set the JRuby version property for this topology. this will only affect remote cluster execution
+ # for local execution use the --1.8|--1.9 switch when launching
+ set "topology.worker.childopts", "-Djruby.compat.version=RUBY1_9"
+ end
+
+ on_submit do |env|
+ if env == :local
+ sleep(1)
+ cluster.shutdown
+ end
+ end
+ end
+ end
+end
View
18 lib/red_storm/application.rb
@@ -13,15 +13,19 @@ def run(args)
if ["install", "examples", "jar"].include?(args[0])
load(TASKS_FILE)
Rake::Task[args.shift].invoke(*args)
- elsif args.size == 2 && ["local"].include?(args[0]) && File.exist?(args[1])
- load(TASKS_FILE)
- Rake::Task['launch'].invoke(*args)
- else
- usage
+ exit
+ elsif args.size >= 2 && args.include?("local")
+ args.delete("local")
+ version = args.delete("--1.8") || args.delete("--1.9") || "--1.8"
+ if args.size == 1
+ file = args[0]
+ load(TASKS_FILE)
+ Rake::Task['launch'].invoke("local", version, file)
+ exit
+ end
end
- else
- usage
end
+ usage
end
end
end
View
1  lib/red_storm/proxy/bolt.rb
@@ -8,6 +8,7 @@
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
+java_import 'org.apache.log4j.Logger'
java_package 'redstorm.proxy'
View
2  lib/red_storm/proxy/spout.rb
@@ -8,6 +8,7 @@
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
+java_import 'org.apache.log4j.Logger'
java_package 'redstorm.proxy'
@@ -25,6 +26,7 @@
# - fail(msg_id)
# - close
#
+
class Spout
java_implements IRichSpout
View
13 lib/red_storm/simple_bolt.rb
@@ -5,6 +5,10 @@ class SimpleBolt
# DSL class methods
+ def self.log
+ @log ||= Logger.getLogger(self.name)
+ end
+
def self.output_fields(*fields)
@fields = fields.map(&:to_s)
end
@@ -27,6 +31,10 @@ def self.on_close(method_name = nil, &close_block)
# DSL instance methods
+ def log
+ self.class.log
+ end
+
def unanchored_emit(*values)
@collector.emit(Values.new(*values))
end
@@ -64,13 +72,12 @@ def declare_output_fields(declarer)
declarer.declare(Fields.new(self.class.fields))
end
- # default optional dsl methods/callbacks
+ private
+ # default noop optional dsl callbacks
def on_init; end
def on_close; end
- private
-
def self.fields
@fields ||= []
end
View
13 lib/red_storm/simple_spout.rb
@@ -9,6 +9,10 @@ def self.set(options = {})
self.spout_options.merge!(options)
end
+ def self.log
+ @log ||= Logger.getLogger(self.name)
+ end
+
def self.output_fields(*fields)
@fields = fields.map(&:to_s)
end
@@ -43,6 +47,10 @@ def emit(*values)
@collector.emit(Values.new(*values))
end
+ def log
+ self.class.log
+ end
+
# Spout proxy interface
def next_tuple
@@ -84,15 +92,14 @@ def fail(msg_id)
instance_exec(msg_id, &self.class.on_fail_block)
end
- # default optional dsl methods/callbacks
+ private
+ # default optional noop dsl methods/callbacks
def on_init; end
def on_close; end
def on_ack(msg_id); end
def on_fail(msg_id); end
- private
-
def self.fields
@fields ||= []
end
View
4 lib/red_storm/simple_topology.rb
@@ -64,6 +64,10 @@ def initialize
@config = Config.new
end
+ def set(attribute, value)
+ @config.put(attribute, value)
+ end
+
def method_missing(sym, *args)
config_method = "set#{self.class.camel_case(sym)}"
@config.send(config_method, *args)
View
4 lib/red_storm/topology_launcher.rb
@@ -21,6 +21,8 @@
java_import 'redstorm.storm.jruby.JRubyBolt'
java_import 'redstorm.storm.jruby.JRubySpout'
+java_import 'org.apache.log4j.Logger'
+
java_package 'redstorm'
# TopologyLauncher is the application entry point when launching a topology. Basically it will
@@ -36,7 +38,7 @@ def self.main(args)
env = args[0].to_sym
class_path = args[1]
- require class_path
+ require "./#{class_path}" # ./ for 1.9 compatibility
topology_name = RedStorm::Configuration.topology_class.respond_to?(:topology_name) ? "/#{RedStorm::Configuration.topology_class.topology_name}" : ''
puts("RedStorm v#{RedStorm::VERSION} starting topology #{RedStorm::Configuration.topology_class.name}#{topology_name} in #{env.to_s} environment")
View
5 lib/tasks/red_storm.rake
@@ -25,9 +25,10 @@ JRUBY_SRC_DIR = "#{RedStorm::REDSTORM_HOME}/lib"
SRC_EXAMPLES = "#{RedStorm::REDSTORM_HOME}/examples"
DST_EXAMPLES = "#{CWD}/examples"
-task :launch, :env, :class_file do |t, args|
+task :launch, :env, :version, :class_file do |t, args|
+ version_token = args[:version] == "--1.9" ? "RUBY1_9" : "RUBY1_8"
gem_home = ENV["GEM_HOME"].to_s.empty? ? " -Djruby.gem.home=`gem env home`" : ""
- command = "java -cp \"#{TARGET_CLASSES_DIR}:#{TARGET_DEPENDENCY_DIR}/*\"#{gem_home} redstorm.TopologyLauncher #{args[:env]} #{args[:class_file]}"
+ command = "java -Djruby.compat.version=#{version_token} -cp \"#{TARGET_CLASSES_DIR}:#{TARGET_DEPENDENCY_DIR}/*\"#{gem_home} redstorm.TopologyLauncher #{args[:env]} #{args[:class_file]}"
puts("launching #{command}")
system(command)
end
View
4 pom.xml
@@ -33,7 +33,7 @@
<dependency>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
- <version>1.6.6</version>
+ <version>1.6.7</version>
</dependency>
</dependencies>
@@ -48,7 +48,7 @@
<artifactItem>
<groupId>org.jruby</groupId>
<artifactId>jruby-complete</artifactId>
- <version>1.6.6</version>
+ <version>1.6.7</version>
<type>jar</type>
<overWrite>false</overWrite>
</artifactItem>
View
94 spec/red_storm/simple_bolt_spec.rb
@@ -11,18 +11,27 @@
describe "interface" do
it "should implement bolt proxy" do
- spout = RedStorm::SimpleBolt.new
- spout.should respond_to :execute
- spout.should respond_to :cleanup
- spout.should respond_to :prepare
- spout.should respond_to :declare_output_fields
+ bolt = RedStorm::SimpleBolt.new
+ bolt.should respond_to :execute
+ bolt.should respond_to :cleanup
+ bolt.should respond_to :prepare
+ bolt.should respond_to :declare_output_fields
end
- it "should implement dsl statement" do
+ it "should implement dsl class statements" do
RedStorm::SimpleBolt.should respond_to :output_fields
RedStorm::SimpleBolt.should respond_to :on_init
RedStorm::SimpleBolt.should respond_to :on_close
RedStorm::SimpleBolt.should respond_to :on_receive
+ RedStorm::SimpleBolt.should respond_to :log
+ end
+
+ it "should implement dsl instance statements" do
+ bolt = RedStorm::SimpleBolt.new
+ bolt.should respond_to :unanchored_emit
+ bolt.should respond_to :anchored_emit
+ bolt.should respond_to :ack
+ bolt.should respond_to :log
end
end
@@ -188,7 +197,6 @@ class Bolt1 < RedStorm::SimpleBolt
Bolt1.send(:ack?).should be_true
Bolt1.send(:anchor?).should be_true
end
-
end
describe "with default method" do
@@ -240,7 +248,6 @@ class Bolt1 < RedStorm::SimpleBolt
Bolt1.send(:ack?).should be_true
Bolt1.send(:anchor?).should be_true
end
-
end
end
@@ -289,6 +296,77 @@ class Bolt1 < RedStorm::SimpleBolt
bolt.cleanup
end
end
+
+ # log specs are mostly the same ats in the spout specs. if these are modified, sync with spout
+ describe "log statement" do
+
+ class Logger; end # mock log4j Logger class which does not exists in the specs context
+
+ describe "in class" do
+ it "should proxy to storm log4j logger" do
+ logger = mock(Logger)
+ Logger.should_receive("getLogger").with("Bolt1").and_return(logger)
+ logger.should_receive(:info).with("test")
+
+ class Bolt1 < RedStorm::SimpleBolt
+ log.info("test")
+ end
+ end
+
+ it "should use own class name as logger id" do
+ logger1 = mock(Logger)
+ logger2 = mock(Logger)
+ Logger.should_receive("getLogger").with("Bolt1").and_return(logger1)
+ Logger.should_receive("getLogger").with("Bolt2").and_return(logger2)
+ logger1.should_receive(:info).with("test1")
+ logger2.should_receive(:info).with("test2")
+
+ class Bolt1 < RedStorm::SimpleBolt
+ log.info("test1")
+ end
+ class Bolt2 < RedStorm::SimpleBolt
+ log.info("test2")
+ end
+ end
+ end
+
+ describe "in instance" do
+ it "should proxy to storm log4j logger" do
+ logger = mock(Logger)
+ Logger.should_receive("getLogger").with("Bolt1").and_return(logger)
+
+ class Bolt1 < RedStorm::SimpleBolt
+ on_init {log.info("test")}
+ end
+
+ logger.should_receive(:info).with("test")
+ bolt = Bolt1.new
+ bolt.prepare(nil, nil, nil)
+ end
+
+ it "should use own class name as logger id" do
+ logger1 = mock(Logger)
+ logger2 = mock(Logger)
+ Logger.should_receive("getLogger").with("Bolt1").and_return(logger1)
+ Logger.should_receive("getLogger").with("Bolt2").and_return(logger2)
+
+ class Bolt1 < RedStorm::SimpleBolt
+ on_init {log.info("test1")}
+ end
+ class Bolt2 < RedStorm::SimpleBolt
+ on_init {log.info("test2")}
+ end
+
+ logger1.should_receive(:info).with("test1")
+ bolt1 = Bolt1.new
+ bolt1.prepare(nil, nil, nil)
+
+ logger2.should_receive(:info).with("test2")
+ bolt2 = Bolt2.new
+ bolt2.prepare(nil, nil, nil)
+ end
+ end
+ end
end
describe "bolt" do
View
80 spec/red_storm/simple_spout_spec.rb
@@ -21,7 +21,7 @@
spout.should respond_to :fail
end
- it "should implement dsl statement" do
+ it "should implement dsl class statement" do
RedStorm::SimpleSpout.should respond_to :set
RedStorm::SimpleSpout.should respond_to :output_fields
RedStorm::SimpleSpout.should respond_to :on_init
@@ -29,7 +29,15 @@
RedStorm::SimpleSpout.should respond_to :on_send
RedStorm::SimpleSpout.should respond_to :on_ack
RedStorm::SimpleSpout.should respond_to :on_fail
+ RedStorm::SimpleSpout.should respond_to :log
end
+
+ it "should implement dsl instance statements" do
+ spout = RedStorm::SimpleSpout.new
+ spout.should respond_to :emit
+ spout.should respond_to :log
+ end
+
end
describe "dsl" do
@@ -316,6 +324,76 @@ def on_fail(msg_id); test_method(msg_id); end
end
end
+ # log specs are mostly the same ats in the bolt specs. if these are modified, sync with bolt
+ describe "log statement" do
+
+ class Logger; end # mock log4j Logger class which does not exists in the specs context
+
+ describe "in class" do
+ it "should proxy to storm log4j logger" do
+ logger = mock(Logger)
+ Logger.should_receive("getLogger").with("Spout1").and_return(logger)
+ logger.should_receive(:info).with("test")
+
+ class Spout1 < RedStorm::SimpleSpout
+ log.info("test")
+ end
+ end
+
+ it "should use own class name as logger id" do
+ logger1 = mock(Logger)
+ logger2 = mock(Logger)
+ Logger.should_receive("getLogger").with("Spout1").and_return(logger1)
+ Logger.should_receive("getLogger").with("Spout2").and_return(logger2)
+ logger1.should_receive(:info).with("test1")
+ logger2.should_receive(:info).with("test2")
+
+ class Spout1 < RedStorm::SimpleSpout
+ log.info("test1")
+ end
+ class Spout2 < RedStorm::SimpleSpout
+ log.info("test2")
+ end
+ end
+ end
+
+ describe "in instance" do
+ it "should proxy to storm log4j logger" do
+ logger = mock(Logger)
+ Logger.should_receive("getLogger").with("Spout1").and_return(logger)
+
+ class Spout1 < RedStorm::SimpleSpout
+ on_init {log.info("test")}
+ end
+
+ logger.should_receive(:info).with("test")
+ spout = Spout1.new
+ spout.open(nil, nil, nil)
+ end
+
+ it "should use own class name as logger id" do
+ logger1 = mock(Logger)
+ logger2 = mock(Logger)
+ Logger.should_receive("getLogger").with("Spout1").and_return(logger1)
+ Logger.should_receive("getLogger").with("Spout2").and_return(logger2)
+
+ class Spout1 < RedStorm::SimpleSpout
+ on_init {log.info("test1")}
+ end
+ class Spout2 < RedStorm::SimpleSpout
+ on_init {log.info("test2")}
+ end
+
+ logger1.should_receive(:info).with("test1")
+ spout1 = Spout1.new
+ spout1.open(nil, nil, nil)
+
+ logger2.should_receive(:info).with("test2")
+ spout2 = Spout2.new
+ spout2.open(nil, nil, nil)
+ end
+ end
+ end
end
describe "spout" do

No commit comments for this range

Something went wrong with that request. Please try again.