Permalink
Browse files

Fix and cleanup samples. Add samples rake target as a sort of integra…

…tion test.
  • Loading branch information...
1 parent a612d29 commit 9e6098495b24c8b733d3211fd11989fe75e72536 @mrwalker mrwalker committed Apr 3, 2011
Showing with 161 additions and 130 deletions.
  1. +18 −15 samples/branch.rb
  2. +11 −7 samples/copy.rb
  3. +21 −19 samples/join.rb
  4. +7 −5 samples/logwordcount.rb
  5. +27 −0 samples/project.rb
  6. +13 −11 samples/rename.rb
  7. +0 −25 samples/restrict.rb
  8. +20 −0 samples/scorenames.rb
  9. +0 −20 samples/sortnames.rb
  10. +11 −9 samples/splitter.rb
  11. +21 −19 samples/union.rb
  12. +12 −0 tasks/samples.rake
View
@@ -1,31 +1,34 @@
#! /usr/bin/env jruby
-$:.unshift File.join(File.dirname(__FILE__), '..', 'lib')
+
+$: << File.join(File.dirname(__FILE__), '..', 'lib')
require 'cascading'
require 'samples/cascading'
input = 'samples/data/data2.txt'
output1, output2 = 'output/branch1', 'output/branch2'
-Cascading::Flow.new('copy_to_mysql') do
- source 'extract', tap(input)
+cascade 'branch' do
+ flow 'branch' do
+ source 'input', tap(input)
- assembly 'extract' do
- split 'line', ['name', 'score1', 'score2', 'id'], :pattern => /[.,]*\s+/
+ assembly 'input' do
+ split 'line', ['name', 'score1', 'score2', 'id'], :pattern => /[.,]*\s+/
- branch 'branch1' do
- group_by 'score1' do
- count
+ branch 'branch1' do
+ group_by 'score1' do
+ count
+ end
end
- end
- branch 'branch2' do
- group_by 'score2' do
- count
+ branch 'branch2' do
+ group_by 'score2' do
+ count
+ end
end
end
- end
- sink 'branch1', tap(output1, :sink_mode => :replace)
- sink 'branch2', tap(output2, :sink_mode => :replace)
+ sink 'branch1', tap(output1, :sink_mode => :replace)
+ sink 'branch2', tap(output2, :sink_mode => :replace)
+ end
end.complete(sample_properties)
View
@@ -1,5 +1,5 @@
#! /usr/bin/env jruby
-$:.unshift File.join(File.dirname(__FILE__), '..', 'lib')
+$: << File.join(File.dirname(__FILE__), '..', 'lib')
require 'cascading'
require 'samples/cascading'
@@ -8,11 +8,15 @@
dataUrl = 'http://www.census.gov/genealogy/names/dist.all.last'
system "curl --create-dirs -o #{input} #{dataUrl}" unless File.exists?(input)
-Cascading::Flow.new('copy') do
- source 'copy', tap(input)
- assembly 'copy' do
- rename 'line' => 'value'
- reject 'value:string.indexOf("R") == -1'
+cascade 'copy' do
+ flow 'copy' do
+ source 'input', tap(input)
+
+ assembly 'input' do
+ rename 'line' => 'value'
+ reject 'value:string.indexOf("R") == -1'
+ end
+
+ sink 'input', tap('output/copied', :sink_mode => :replace)
end
- sink 'copy', tap('output/copied', :sink_mode => :replace)
end.complete(sample_properties)
View
@@ -1,5 +1,5 @@
#! /usr/bin/env jruby
-$:.unshift File.join(File.dirname(__FILE__), '..', 'lib')
+$: << File.join(File.dirname(__FILE__), '..', 'lib')
require 'cascading'
require 'samples/cascading'
@@ -9,27 +9,29 @@
input3 = 'samples/data/data_join3.txt'
output = 'output/joined'
-Cascading::Flow.new('join') do
- source 'extract1', tap(input1)
- source 'extract2', tap(input2)
- source 'extract3', tap(input3)
+cascade 'join' do
+ flow 'join' do
+ source 'input1', tap(input1)
+ source 'input2', tap(input2)
+ source 'input3', tap(input3)
- assembly 'extract1' do
- split 'line', ['id', 'name']
- end
+ assembly 'input1' do
+ split 'line', ['id', 'name']
+ end
- assembly 'extract2' do
- split 'line', ['id', 'age']
- end
+ assembly 'input2' do
+ split 'line', ['id', 'age']
+ end
- assembly 'extract3' do
- split 'line', ['id', 'city']
- end
+ assembly 'input3' do
+ split 'line', ['id', 'city']
+ end
- assembly 'join' do
- join 'extract1', 'extract2', 'extract3', :on => 'id'
- project 'id', 'name', 'age', 'city'
- end
+ assembly 'join' do
+ join 'input1', 'input2', 'input3', :on => 'id'
+ project 'id', 'name', 'age', 'city'
+ end
- sink 'join', tap(output, :sink_mode => :replace)
+ sink 'join', tap(output, :sink_mode => :replace)
+ end
end.complete(sample_properties)
View
@@ -1,5 +1,5 @@
#! /usr/bin/env jruby
-$:.unshift File.join(File.dirname(__FILE__), '..', 'lib')
+$: << File.join(File.dirname(__FILE__), '..', 'lib')
require 'cascading'
require 'samples/cascading'
@@ -8,10 +8,11 @@
dataUrl = 'http://www.gutenberg.org/files/20417/20417-8.txt'
system "curl --create-dirs -o #{input} #{dataUrl}" unless File.exists?(input)
-Cascading::Flow.new('logwordcount') do
- source 'logwordcount', tap(input)
+cascade 'logwordcount' do
+ flow 'logwordcount' do
+ source 'input', tap(input)
- assembly 'logwordcount' do
+ assembly 'input' do
# TODO: create a helper for RegexSplitGenerator
each 'line', :function => regex_split_generator('word', :pattern => /[.,]*\s+/)
group_by 'word' do
@@ -20,5 +21,6 @@
group_by 'count', :reverse => true
end
- sink 'logwordcount', tap('output/imported', :sink_mode => :replace)
+ sink 'input', tap('output/imported', :sink_mode => :replace)
+ end
end.complete(sample_properties)
View
@@ -0,0 +1,27 @@
+#! /usr/bin/env jruby
+$: << File.join(File.dirname(__FILE__), '..', 'lib')
+
+# History: "project" (verb) used to be known as "restrict"
+
+require 'cascading'
+require 'samples/cascading'
+
+input = 'samples/data/data2.txt'
+output = 'output/restrict'
+
+cascade 'project' do
+ flow 'project' do
+ source 'input', tap(input)
+
+ assembly 'input' do
+ split 'line', ['name', 'score1', 'score2', 'id'], :output => ['name', 'score1', 'score2', 'id']
+ assert Java::CascadingOperationAssertion::AssertSizeEquals.new(4)
+ project 'name', 'score1', 'score2'
+ assert Java::CascadingOperationAssertion::AssertSizeEquals.new(3)
+ project 'name', 'score2'
+ assert Java::CascadingOperationAssertion::AssertSizeEquals.new(2)
+ end
+
+ sink 'input', tap(output, :sink_mode => :replace)
+ end
+end.complete(sample_properties)
View
@@ -1,22 +1,24 @@
#! /usr/bin/env jruby
-$:.unshift File.join(File.dirname(__FILE__), '..', 'lib')
+$: << File.join(File.dirname(__FILE__), '..', 'lib')
require 'cascading'
require 'samples/cascading'
input = 'samples/data/data2.txt'
output = 'output/rename'
-Cascading::Flow.new('rename') do
- source 'extract', tap(input)
+cascade 'rename' do
+ flow 'rename' do
+ source 'input', tap(input)
- assembly 'extract' do
- split 'line', ['name', 'score1', 'score2', 'id'], :output => ['name', 'score1', 'score2', 'id']
- assert Java::CascadingOperationAssertion::AssertSizeEquals.new(4)
- rename 'name' => 'new_name', 'score1' => 'new_score1', 'score2' => 'new_score2'
- assert Java::CascadingOperationAssertion::AssertSizeEquals.new(4)
- puts "Final field names: #{scope.values_fields.to_a.inspect}"
- end
+ assembly 'input' do
+ split 'line', ['name', 'score1', 'score2', 'id'], :output => ['name', 'score1', 'score2', 'id']
+ assert Java::CascadingOperationAssertion::AssertSizeEquals.new(4)
+ rename 'name' => 'new_name', 'score1' => 'new_score1', 'score2' => 'new_score2'
+ assert Java::CascadingOperationAssertion::AssertSizeEquals.new(4)
+ puts "Final field names: #{scope.values_fields.to_a.inspect}"
+ end
- sink 'extract', tap(output, :sink_mode => :replace)
+ sink 'input', tap(output, :sink_mode => :replace)
+ end
end.complete(sample_properties)
View
@@ -1,25 +0,0 @@
-#! /usr/bin/env jruby
-$:.unshift File.join(File.dirname(__FILE__), '..', 'lib')
-
-# History: "project" (verb) used to be known as "restrict"
-
-require 'cascading'
-require 'samples/cascading'
-
-input = 'samples/data/data2.txt'
-output = 'output/restrict'
-
-Cascading::Flow.new('project') do
- source 'extract', tap(input)
-
- assembly 'extract' do
- split 'line', ['name', 'score1', 'score2', 'id'], :output => ['name', 'score1', 'score2', 'id']
- assert Java::CascadingOperationAssertion::AssertSizeEquals.new(4)
- project 'name', 'score1', 'score2'
- assert Java::CascadingOperationAssertion::AssertSizeEquals.new(3)
- project 'name', 'score2'
- assert Java::CascadingOperationAssertion::AssertSizeEquals.new(2)
- end
-
- sink 'extract', tap(output, :sink_mode => :replace)
-end.complete(sample_properties)
View
@@ -0,0 +1,20 @@
+#! /usr/bin/env jruby
+$: << File.join(File.dirname(__FILE__), '..', 'lib')
+
+require 'cascading'
+require 'samples/cascading'
+
+cascade 'scorenames' do
+ flow 'scorenames' do
+ # You don't have to curl and cache inputs: tap can fetch via HTTP
+ source 'input', tap('http://www.census.gov/genealogy/names/dist.all.last')
+
+ assembly 'input' do
+ split 'line', ['name', 'val1', 'val2', 'id']
+ insert 'val3' => expr('val2:double < 40.0 ? val1:double : val2:double')
+ project 'name', 'val3', 'id'
+ end
+
+ sink 'input', tap('output/sorted', :sink_mode => :replace)
+ end
+end.complete(sample_properties)
View
@@ -1,20 +0,0 @@
-#! /usr/bin/env jruby
-$:.unshift File.join(File.dirname(__FILE__), '..', 'lib')
-
-# Question: does this script actually sort the names?
-
-require 'cascading'
-require 'samples/cascading'
-
-Cascading::Flow.new('fetch') do
- # You don't have to curl and cache inputs: tap can fetch via HTTP
- source 'fetch', tap('http://www.census.gov/genealogy/names/dist.all.last')
-
- assembly 'fetch' do
- split 'line', ['name', 'val1', 'val2', 'id']
- insert 'val3' => expr('val2:double < 40.0 ? val1:double : val2:double')
- project 'name', 'val3', 'id'
- end
-
- sink 'fetch', tap('output/sorted', :sink_mode => :replace)
-end.complete(sample_properties)
View
@@ -1,20 +1,22 @@
#! /usr/bin/env jruby
-$:.unshift File.join(File.dirname(__FILE__), '..', 'lib')
+$: << File.join(File.dirname(__FILE__), '..', 'lib')
require 'cascading'
require 'samples/cascading'
input = 'samples/data/data2.txt'
-Cascading::Flow.new('copy') do
- source 'copy', tap(input)
+cascade 'splitter' do
+ flow 'splitter' do
+ source 'input', tap(input)
- assembly 'copy' do
- split 'line', ['name', 'score1', 'score2', 'id'], :output => ['name', 'score1', 'score2', 'id']
- group_by 'score1' do
- count
+ assembly 'input' do
+ split 'line', ['name', 'score1', 'score2', 'id'], :output => ['name', 'score1', 'score2', 'id']
+ group_by 'score1' do
+ count
+ end
end
- end
- sink 'copy', tap('output/splitted', :sink_mode => :replace)
+ sink 'input', tap('output/splitted', :sink_mode => :replace)
+ end
end.complete(sample_properties)
View
@@ -1,5 +1,5 @@
#! /usr/bin/env jruby
-$:.unshift File.join(File.dirname(__FILE__), '..', 'lib')
+$: << File.join(File.dirname(__FILE__), '..', 'lib')
require 'cascading'
require 'samples/cascading'
@@ -10,30 +10,32 @@
output = 'output/union'
-Cascading::Flow.new('copy_to_mysql') do
- source 'extract', tap(input)
+cascade 'union' do
+ flow 'union' do
+ source 'input', tap(input)
- assembly 'extract' do
- split 'line', ['name', 'score1', 'score2', 'id']
+ assembly 'input' do
+ split 'line', ['name', 'score1', 'score2', 'id']
- branch 'branch1' do
- group_by 'score1', 'name' do
- count
+ branch 'branch1' do
+ group_by 'score1', 'name' do
+ count
+ end
+ rename 'score1' => 'score'
end
- rename 'score1' => 'score'
- end
- branch 'branch2' do
- group_by 'score2', 'name' do
- count
+ branch 'branch2' do
+ group_by 'score2', 'name' do
+ count
+ end
+ rename 'score2' => 'score'
end
- rename 'score2' => 'score'
end
- end
- assembly 'union' do
- union 'branch1', 'branch2'
- end
+ assembly 'union' do
+ union 'branch1', 'branch2'
+ end
- sink 'union', tap(output, :sink_mode => :replace)
+ sink 'union', tap(output, :sink_mode => :replace)
+ end
end.complete(sample_properties)
View
@@ -0,0 +1,12 @@
+namespace :samples do
+ desc 'Run all sample applications'
+ task :run do
+ Dir.glob('samples/*.rb') do |sample|
+ next unless File.executable?(sample)
+ system(sample)
+ end
+ end
+end
+
+desc 'Alias to samples:run'
+task :samples => 'samples:run'

0 comments on commit 9e60984

Please sign in to comment.