Skip to content

Commit

Permalink
storm ids are now strings
Browse files Browse the repository at this point in the history
  • Loading branch information
colinsurprenant committed Dec 8, 2011
1 parent 52f8628 commit 851c412
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 78 deletions.
65 changes: 38 additions & 27 deletions lib/red_storm/simple_topology.rb
Expand Up @@ -8,11 +8,11 @@ class SimpleTopology

# Describes one component declared in a topology: the class implementing it,
# the id it is registered under, and its parallelism.
#
# Ids are always kept as strings, whatever type the caller supplies
# (Integer, Symbol, String, ...).
class ComponentDefinition
  attr_reader :clazz, :parallelism, :id

  # @param component_class [Class] class implementing the component
  # @param id [#to_s] component id; stored as a String
  # @param parallelism [Object] parallelism value, stored as given
  def initialize(component_class, id, parallelism)
    @clazz = component_class
    @id = id.to_s
    @parallelism = parallelism
  end

  # Writer kept for callers that reassign ids; it applies the same string
  # coercion as the constructor so the "ids are strings" rule cannot be
  # bypassed after construction.
  def id=(value)
    @id = value.to_s
  end
end
Expand All @@ -28,7 +28,7 @@ def initialize(*args)
end

# Registers an upstream source for this component.
#
# source_id - a component Class (converted through SimpleTopology.underscore)
#             or any other value, which is stored as its string form.
# grouping  - either a Hash such as {:fields => "f1"}, stored as given, or a
#             bare grouping name such as :shuffle, stored as {name => nil}.
#
# Exactly one [id, grouping] pair is appended to @sources per call.
def source(source_id, grouping)
  id = source_id.is_a?(Class) ? SimpleTopology.underscore(source_id) : source_id.to_s
  options = grouping.is_a?(Hash) ? grouping : {grouping => nil}
  @sources << [id, options]
end

def define_grouping(declarer)
Expand Down Expand Up @@ -131,33 +131,44 @@ def start(base_class_path, env)
private

# Validates the ids of a list of component definitions.
#
# Ids are used as given (they are strings by the time they get here), so
# nothing is rewritten; the method only checks that
#   * no two components share the same id, and
#   * every source id referenced by a component is the id of one of the
#     given components.
#
# components - Array of definitions responding to #id and #clazz, and
#              optionally to #sources (an Array of [source_id, grouping]).
#
# Raises RuntimeError "duplicate id in <class> on id=<id>" or
# "cannot resolve <class> source id=<id>" on the first problem found.
def self.resolve_ids!(components)
  ids = components.map(&:id)

  # Walk from the last declared component backwards, so the error names the
  # most recently declared offender.
  components.reverse.each do |component|
    raise("duplicate id in #{component.clazz.name} on id=#{component.id}") if ids.count(component.id) > 1
    next unless component.respond_to?(:sources)

    component.sources.each do |source_id, _grouping|
      raise("cannot resolve #{component.clazz.name} source id=#{source_id}") unless ids.include?(source_id)
    end
  end
end

def self.spouts
Expand Down
81 changes: 30 additions & 51 deletions spec/red_storm/simple_topology_spec.rb
Expand Up @@ -219,8 +219,8 @@ class Topology1 < RedStorm::SimpleTopology
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1").and_return(jruby_spout1)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2").and_return(jruby_spout2)

builder.should_receive("setSpout").with(1, jruby_spout1, 1)
builder.should_receive("setSpout").with(2, jruby_spout2, 1)
builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1)
builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1)
configurator.should_receive(:config).and_return("config")
builder.should_receive(:createTopology).and_return("topology")
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology")
Expand Down Expand Up @@ -265,8 +265,8 @@ class Topology1 < RedStorm::SimpleTopology
bolt_definition2.should_receive(:parallelism).and_return(3)
bolt_definition1.should_receive(:id).any_number_of_times.and_return("id1")
bolt_definition2.should_receive(:id).any_number_of_times.and_return("id2")
bolt_definition1.should_receive(:id=).with(1)
bolt_definition2.should_receive(:id=).with(2)
# bolt_definition1.should_receive(:id=).with('1')
# bolt_definition2.should_receive(:id=).with('2')

configurator.should_receive(:config).and_return("config")
builder.should_receive(:createTopology).and_return("topology")
Expand All @@ -282,80 +282,89 @@ class Topology1 < RedStorm::SimpleTopology
builder = mock(RedStorm::TopologyBuilder)
configurator = mock(RedStorm::SimpleTopology::Configurator)
jruby_bolt = mock(RedStorm::JRubyBolt)
jruby_spout = mock(RedStorm::JRubySpout)
@declarer = mock("InputDeclarer")
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1").and_return(jruby_bolt)
builder.should_receive("setBolt").with(1, jruby_bolt, 1).and_return(@declarer)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1").and_return(jruby_spout)
builder.should_receive("setBolt").with('bolt_class1', jruby_bolt, 1).and_return(@declarer)
builder.should_receive("setSpout").with('1', jruby_spout, 1).and_return(@declarer)
configurator.should_receive(:config).and_return("config")
builder.should_receive(:createTopology).and_return("topology")
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology")
end

it "should support fields" do
  # The spout is declared with a numeric id; ids are stringified, so the
  # grouping is expected against '1', not the Integer 1.
  class Topology1 < RedStorm::SimpleTopology
    spout SpoutClass1, :id => 1
    bolt BoltClass1 do
      source 1, :fields => "f1"
    end
  end

  RedStorm::Fields.should_receive(:new).with("f1").and_return("fields")
  @declarer.should_receive("fieldsGrouping").with('1', "fields")
  Topology1.new.start("base_path", :cluster)
end

it "should support shuffle" do
  # Numeric source id 1 must reach the declarer as the string '1'.
  class Topology1 < RedStorm::SimpleTopology
    spout SpoutClass1, :id => 1
    bolt BoltClass1 do
      source 1, :shuffle
    end
  end

  @declarer.should_receive("shuffleGrouping").with('1')
  Topology1.new.start("base_path", :cluster)
end

it "should support none" do
  # Numeric source id 1 must reach the declarer as the string '1'.
  class Topology1 < RedStorm::SimpleTopology
    spout SpoutClass1, :id => 1
    bolt BoltClass1 do
      source 1, :none
    end
  end

  @declarer.should_receive("noneGrouping").with('1')
  Topology1.new.start("base_path", :cluster)
end

it "should support global" do
  # Numeric source id 1 must reach the declarer as the string '1'.
  class Topology1 < RedStorm::SimpleTopology
    spout SpoutClass1, :id => 1
    bolt BoltClass1 do
      source 1, :global
    end
  end

  @declarer.should_receive("globalGrouping").with('1')
  Topology1.new.start("base_path", :cluster)
end

it "should support all" do
  # Numeric source id 1 must reach the declarer as the string '1'.
  class Topology1 < RedStorm::SimpleTopology
    spout SpoutClass1, :id => 1
    bolt BoltClass1 do
      source 1, :all
    end
  end

  @declarer.should_receive("allGrouping").with('1')
  Topology1.new.start("base_path", :cluster)
end

it "should support direct" do
  # Numeric source id 1 must reach the declarer as the string '1'.
  class Topology1 < RedStorm::SimpleTopology
    spout SpoutClass1, :id => 1
    bolt BoltClass1 do
      source 1, :direct
    end
  end

  @declarer.should_receive("directGrouping").with('1')
  Topology1.new.start("base_path", :cluster)
end
end
Expand Down Expand Up @@ -401,7 +410,7 @@ class Topology1 < RedStorm::SimpleTopology; end
topology.cluster.should == cluster
end

it "should keep numeric ids" do
it "should support explicit numeric ids" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1

Expand All @@ -410,18 +419,12 @@ class Topology1 < RedStorm::SimpleTopology
end
end

Topology1.spouts.first.id.should == 1
Topology1.bolts.first.id.should == 2
Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}]

Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)

Topology1.spouts.first.id.should == 1
Topology1.bolts.first.id.should == 2
Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
Topology1.spouts.first.id.should == '1'
Topology1.bolts.first.id.should == '2'
Topology1.bolts.first.sources.first.should == ['1', {:shuffle => nil}]
end

it "should resolve explicit symbolic ids" do
it "should support explicit string ids" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => "id1"

Expand All @@ -433,15 +436,9 @@ class Topology1 < RedStorm::SimpleTopology
Topology1.spouts.first.id.should == "id1"
Topology1.bolts.first.id.should == "id2"
Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}]

Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)

Topology1.spouts.first.id.should == 1
Topology1.bolts.first.id.should == 2
Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
end

it "should resolve implicit string ids" do
it "should support implicit string ids" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1

Expand All @@ -453,15 +450,9 @@ class Topology1 < RedStorm::SimpleTopology
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}]

Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)

Topology1.spouts.first.id.should == 1
Topology1.bolts.first.id.should == 2
Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
end

it "should resolve implicit symbol ids" do
it "should support implicit symbol ids" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1

Expand All @@ -472,16 +463,10 @@ class Topology1 < RedStorm::SimpleTopology

Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == [:spout_class1, {:shuffle => nil}]

Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)

Topology1.spouts.first.id.should == 1
Topology1.bolts.first.id.should == 2
Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}]
end

it "should resolve implicit class ids" do
it "should support implicit class ids" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1

Expand All @@ -493,12 +478,6 @@ class Topology1 < RedStorm::SimpleTopology
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}]

Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)

Topology1.spouts.first.id.should == 1
Topology1.bolts.first.id.should == 2
Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
end

it "should raise on unresolvable" do
Expand Down Expand Up @@ -526,7 +505,7 @@ class Topology1 < RedStorm::SimpleTopology
Topology1.spouts.first.id.should == "spout_class1"
Topology1.spouts.last.id.should == "spout_class1"

lambda {Topology1.resolve_ids!(Topology1.spouts)}.should raise_error RuntimeError, "duplicate symbolic id in SpoutClass1 on id=spout_class1"
lambda {Topology1.resolve_ids!(Topology1.spouts)}.should raise_error RuntimeError, "duplicate id in SpoutClass1 on id=spout_class1"
end

end
Expand Down

0 comments on commit 851c412

Please sign in to comment.