Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Adding the Munge object that glues together Reader and Writer objects.

  • Loading branch information...
commit 512672997aa2a9897025450ea24f4b3e3c7874b9 1 parent 216960a
@JEG2 authored
View
183 lib/mungr/munge.rb
@@ -0,0 +1,183 @@
+# encoding: UTF-8
+
+require "mungr/staged"
+
+module Mungr
+ #
+ # Objects of this class are the basic unit of transformation for Mungr
+ # scripts. Altering some data using one of these objects is a five stage
+ # process:
+ #
+ # 1. A Munge is built and configured
+ # 2. Readers and Writers are added to the Munge or it is chained to another
+ # Munge
+ # 3. The Munge is prepared just before the first munge
+ # 4. Chunks of data are read from the attached Readers, transformed by the
+ # munge code, and passed on in modified form to the attached Writers
+ # 5. The Munge and all attached Writers are finished when all Readers are
+ # exhausted
+ #
+ class Munge < Staged
+ #
+ # Use the +init+ block to build a Munge by assigning code to each of the
+ # three code stages, something like:
+ #
+ # doubler = Munge.new do |m|
+ # m.prepare {
+ # # if needed...
+ # }
+ # m.munge do |context_from_prepare, value|
+ # value * 2
+ # end
+ # m.finish do |context_from_prepare|
+ # # if needed...
+ # end
+ # end
+ #
+ # The prepare() and finish() stages are optional.
+ #
+ # Once built, you attach Readers and Writers then call munge() repeatedly to
+ # process the data in chunks or just call run() once to exhaust all data,
+ # like this:
+ #
+ # numbers = Reader.new do |r|
+ # r.prepare {
+ # (1..100).to_a
+ # }
+ # r.read { |ns|
+ # ns.shift
+ # }
+ # end
+ # doubler.add_reader(numbers)
+ #
+ # file_writer = Writer.new do |w|
+ # w.prepare {
+ # File.open("doubled_numbers.txt", "w")
+ # }
+ # w.write do |f, double|
+ # f.puts double
+ # end
+ # w.finish do |f|
+ # f.close
+ # end
+ # end
+ # doubler.add_writer(file_writer)
+ #
+ # doubler.run
+ #
+ def initialize(*args, &init)
+ @readers = Array.new
+ @munge_code = nil
+ @writers = Array.new
+
+ super
+ end
+
+ #
+ # :call-seq:
+ # add_reader(reader)
+ # add_reader(munge)
+ #
+ # This method can be used to attach one or more Readers to this object.
+ # Input will be fetched from all Readers with each call to munge() and
+ # passed as arguments to the munge code. When all Readers are exhausted,
+ # this object will be marked finished?() as will all attached Writers.
+ #
+ # Alternately, you may set a single Munge (not combined with anything else)
+ # as the Readers for this object. All values returned by a call to the
+ # chained munge() will be treated as inputs for this object's munge code.
+ #
+ # Returns +self+ for method chaining.
+ #
+ def add_reader(reader_or_munge)
+ fail "Already reading from a Munge." if @readers.is_a? self.class
+ case reader_or_munge
+ when self.class
+ if @readers.empty?
+ fail "A Munge used as a Reader cannot have Writers." \
+ if reader_or_munge.has_writers?
+ @readers = reader_or_munge
+ else
+ fail "Cannot mix Readers with a Munge."
+ end
+ else
+ @readers << reader_or_munge
+ end
+ self
+ end
+
+ #
+ # This method can be used to attach one or more Writers to this object.
+ # All values returned by the munge code are passed on as arguments to a
+ # write call for each attached Writer. Furthermore, finish() is called on
+ # all attached Writers when input is exhausted.
+ #
+ def add_writer(writer)
+ @writers << writer
+ self
+ end
+
+ #
+ # Returns +true+ if this object has an attached Writers, +false+ otherwise.
+ # This object can not be chained as the Readers of another Munge if it was
+ # any Writers attached.
+ #
+ def has_writers?
+ not @writers.empty?
+ end
+
+ #
+ # :call-seq:
+ # munge() { |context, data| code_to_transform_one_chunk_of_data(data) }
+ # munge(*input)
+ #
+ # If passed a block, this method sets the code that will be used to
+ # transform a single chunk of input. The altered values returned from this
+ # block will be passed on to all attached Writers as output. The block will
+ # also be passed the context returned from prepare().
+ #
+ # This method, called without block, is also the primary interface for a
+ # munging process. You can just call it repeatedly to read from attached
+ # Readers, transform data, and output values.
+ #
+ def munge(&code)
+ load_or_run(:munge, &code)
+ end
+
+ # Calls munge() repeatedly until finished?() returns +true+.
+ def run
+ munge until finished?
+ end
+
+ #######
+ private
+ #######
+
+ #
+ # This method is the primary interface for transforming data. It will:
+ #
+ # * Return +nil+ if finished?() is now +true+
+ # * Run prepare() unless prepared?() is now +true+
+ # * Run the munge code to transform one chunk of data, pass the transformed
+ # data to all attached Writers, as well as return it
+ # * Run finish() on this object and all attached Writers after all attached
+ # Readers are exhausted
+ #
+ def run_munge_code
+ return nil if finished?
+ prepare unless prepared?
+ inputs = @readers.is_a?(self.class) ? @readers.munge :
+ @readers.map(&:read)
+ if Array(@readers).all?(&:finished?)
+ finish
+ @writers.each(&:finish)
+ else
+ outputs = @munge_code[@context, *inputs]
+ @writers.each do |writer|
+ writer.write(*outputs)
+ end
+ outputs
+ end
+ end
+ end
+end
View
12 lib/mungr/reader.rb
@@ -19,9 +19,15 @@ class Reader < Staged
# three code stages, something like:
#
# file_reader = Reader.new do |r|
- # r.prepare { File.open("my_file.txt") }
- # r.read { |f| f.gets }
- # r.finish { |f| f.close }
+ # r.prepare {
+ # File.open("my_file.txt")
+ # }
+ # r.read { |f|
+ # f.gets
+ # }
+ # r.finish do |f|
+ # f.close
+ # end
# end
#
# The prepare() and finish() stages are optional.
View
3  lib/mungr/staged.rb
@@ -107,9 +107,8 @@ def run_prepare_code
# Executes the finish code. Also flips the finished?() status to +true+.
#
def run_finish_code
- result = @finish_code ? @finish_code[@context] : nil
+ @finish_code ? @finish_code[@context] : nil
@finished = true
- result
end
end
end
View
14 lib/mungr/writer.rb
@@ -18,9 +18,15 @@ class Writer < Staged
# three code stages, something like:
#
# file_writer = Writer.new do |w|
- # w.prepare { File.open("my_file.txt", "w") }
- # w.write { |f, line| f.puts line }
- # w.finish { |f| f.close }
+ # w.prepare {
+ # File.open("my_file.txt", "w")
+ # }
+ # w.write do |f, line|
+ # f.puts line
+ # end
+ # w.finish do |f|
+ # f.close
+ # end
# end
#
# The prepare() and finish() stages are optional.
@@ -41,7 +47,7 @@ def initialize(*args, &init)
#
# :call-seq:
- # write() { |context| code_to_read_one_chunk_of_data() }
+ # write() { |context, data| code_to_write_one_chunk_of_data(data) }
# write(*output)
#
# If passed a block, this method sets the code that will be used to write a
View
17 test/helper.rb
@@ -0,0 +1,17 @@
+# encoding: UTF-8
+
+require "minitest/autorun"
+
+class MiniTest::Unit::TestCase
+ #######
+ private
+ #######
+
+ def reader(&init)
+ @reader = Mungr::Reader.new(&init)
+ end
+
+ def writer(&init)
+ @writer = Mungr::Writer.new(&init)
+ end
+end
View
364 test/test_munge.rb
@@ -0,0 +1,364 @@
+# encoding: UTF-8
+
+require "helper"
+
+require "mungr/munge"
+
+class TestMunge < MiniTest::Unit::TestCase
+ ##############
+ ### Status ###
+ ##############
+
+ def test_a_munge_is_prepared_before_the_first_munge
+ order = Array.new
+ munge do |m|
+ m.prepare { order << :prepared }
+ m.munge do
+ order << :munge
+ end
+ end
+ add_reader(:data)
+ assert(!@munge.prepared?, "The Munge was prepared?() before the munge().")
+ @munge.munge
+ assert( @munge.prepared?,
+ "The Munge was not prepared?() after the munge()." )
+ assert_equal([:prepared, :munge], order)
+ end
+
+ def test_exhausting_all_readers_sets_finished
+ [ [ ], # no readers
+ [[1, 2, 3]],
+ [[1], [1, 2, 3], [1, 2]] ].each do |test_readers|
+ order = Array.new
+ munge do |m|
+ m.munge do
+ order << :munge
+ end
+ m.finish do
+ order << :finish
+ end
+ end
+ test_readers.each do |reader|
+ add_reader(*reader)
+ end
+ assert( !@munge.finished?,
+ "The Munge was finished?() before being exhausted." )
+ finish
+ assert( @munge.finished?,
+ "The Mung was not finished?() after being exhausted." )
+ assert_equal( [:munge] * (test_readers.map(&:size).max || 0) + [:finish],
+ order )
+ end
+ end
+
+ ###############
+ ### Context ###
+ ###############
+
+ def test_any_value_returned_from_prepare_is_forwarded_to_munge_and_finish
+ object = Object.new
+ calls = Array.new
+ munge do |m|
+ m.prepare { object }
+ m.munge do |context, _|
+ calls << context
+ end
+ m.finish do |context|
+ calls << context
+ end
+ end
+ add_reader(:data)
+ finish # trigger munge and finish code
+ assert_equal([object] * 2, calls)
+ end
+
+ ########################
+ ### Managing Readers ###
+ ########################
+
+ def test_add_reader_associates_input_sources_for_the_munge
+ args = Array.new
+ munge do |m|
+ m.munge do |_, i, l|
+ args << [i, l]
+ end
+ end
+ add_reader(1, 2)
+ add_reader(:a, :b, :c)
+ finish
+ assert_equal([[1, :a], [2, :b], [nil, :c]], args)
+ end
+
+ def test_adding_a_munge_after_a_reader_is_an_error
+ munge
+ add_reader # add a normal Reader
+ assert_raises(RuntimeError) do
+ @munge.add_reader(Mungr::Munge.new) # add a Munge
+ end
+ end
+
+ def test_adding_any_kind_of_reader_after_a_munge_is_an_error
+ munge
+ @munge.add_reader(Mungr::Munge.new) # add a Munge
+ assert_raises(RuntimeError) do
+ @munge.add_reader(Mungr::Munge.new) # can't add another
+ end
+ assert_raises(RuntimeError) do
+ add_reader # not even a normal Reader
+ end
+ end
+
+ ########################
+ ### Managing Writers ###
+ ########################
+
+ def test_add_writer_associates_output_sources_for_the_munge
+ args = Array.new
+ munge do |m|
+ m.munge do |_, i|
+ i
+ end
+ end
+ add_reader(1, 2, 3)
+ add_writer do |w|
+ w.write do |_, i|
+ args << [:writer1, i]
+ end
+ end
+ add_writer do |w|
+ w.write do |_, i|
+ args << [:writer2, i]
+ end
+ end
+ finish
+ assert_equal( [ [:writer1, 1], [:writer2, 1],
+ [:writer1, 2], [:writer2, 2],
+ [:writer1, 3], [:writer2, 3] ], args )
+ end
+
+ def test_a_munge_with_a_writer_cannot_be_used_as_a_reader
+ chain = munge
+ add_writer # can no longer be used as a Reader
+ munge
+ assert_raises(RuntimeError) do
+ @munge.add_reader(chain)
+ end
+ end
+
+ ###############
+ ### Munging ###
+ ###############
+
+ def test_calling_munge_a_with_block_sets_the_code_and_further_calls_run_it
+ data = (1..3).to_a
+ written = Array.new
+ munge do |m|
+ m.munge do |_, i|
+ i * 2
+ end
+ end
+ add_reader(*data)
+ add_writer do |w|
+ w.write do |_, i2|
+ written << i2
+ end
+ end
+ finish
+ assert_equal(data.map { |i| i * 2 }, written)
+ end
+
+ def test_each_input_is_passed_as_an_argument_to_munge
+ args = Array.new
+ munge do |m|
+ m.munge do |_, i, l|
+ args << [i, l]
+ end
+ end
+ add_reader(1, 2, 3)
+ add_reader(:a, :b, :c)
+ finish
+ assert_equal([[1, :a], [2, :b], [3, :c]], args)
+ end
+
+ def test_each_value_returned_from_munge_is_passed_as_an_argument_to_write
+ args = Array.new
+ munge do |m|
+ m.munge do |_, i|
+ [i, i * 2, i * 3]
+ end
+ end
+ add_reader(1, 2, 3)
+ add_writer do |w|
+ w.write do |_, i, i2, i3|
+ args << [i, i2, i3]
+ end
+ end
+ finish
+ assert_equal([[1, 2, 3], [2, 4, 6], [3, 6, 9]], args)
+ end
+
+ def test_when_readers_are_exhausted_no_more_munges_or_writes_are_made
+ calls = Array.new
+ munge do |m|
+ m.munge do
+ calls << :munge
+ end
+ end
+ add_reader(1, 2, 3)
+ add_writer do |w|
+ w.write do
+ calls << :write
+ end
+ end
+ 10.times do
+ @munge.munge
+ end
+ assert_equal([:munge, :write] * 3, calls)
+ end
+
+ def test_prepare_is_called_once_before_the_first_munge
+ calls = Array.new
+ munge do |m|
+ m.prepare { calls << :prepare }
+ m.munge do
+ calls << :munge
+ end
+ end
+ add_reader(1, 2, 3)
+ finish
+ assert_equal([:prepare, :munge, :munge, :munge], calls)
+ end
+
+ def test_finish_is_called_once_when_the_input_is_exhausted
+ calls = Array.new
+ munge do |m|
+ m.munge do
+ calls << :munge
+ end
+ m.finish do
+ calls << :finish
+ end
+ end
+ add_reader(1, 2, 3)
+ finish
+ assert_equal([:munge, :munge, :munge, :finish], calls)
+ end
+
+ def test_finish_is_forwarded_to_all_writers
+ munge do |m|
+ m.munge do
+ # do nothing: just defining some munge code
+ end
+ end
+ add_reader(1, 2, 3)
+ add_writer do |w|
+ w.write do
+ # do nothing: just defining some write code
+ end
+ end
+ add_writer do |w|
+ w.write do
+ # do nothing: just defining some write code
+ end
+ end
+ 3.times do
+ @munge.munge
+ end
+ assert( @writers.none?(&:finished?),
+ "A Writer was finished before input was exhausted." )
+ @munge.munge # input is exhausted here
+ assert( @writers.all?(&:finished?),
+ "Writers were not finished after input was exhausted." )
+ end
+
+ ################
+ ### Chaining ###
+ ################
+
+ def test_a_munge_can_read_form_another_munge
+ args = Array.new
+ chain = munge do |m|
+ m.munge do |_, i|
+ i * 2
+ end
+ end
+ add_reader(1, 2, 3)
+ munge do |m|
+ m.munge do |_, i2|
+ args << i2
+ end
+ end
+ @munge.add_reader(chain)
+ finish
+ assert_equal([2, 4, 6], args)
+ end
+
+ def test_a_munge_can_be_a_multireader_source_form_another_munge
+ args = Array.new
+ chain = munge do |m|
+ m.munge do |_, i|
+ [i, i * 2, i * 3]
+ end
+ end
+ add_reader(1, 2, 3)
+ munge do |m|
+ m.munge do |_, i, i2, i3|
+ args << [i, i2, i3]
+ end
+ end
+ @munge.add_reader(chain)
+ finish
+ assert_equal([[1, 2, 3], [2, 4, 6], [3, 6, 9]], args)
+ end
+
+ def test_finish_is_forwarded_through_chained_munges
+ written = Array.new
+ chain = Array.new
+ chain << munge do |m|
+ m.munge do |_, i, l|
+ [i, i * 2, l]
+ end
+ end
+ chain << add_reader(1, 2, 3).last
+ chain << add_reader(:a, :b).last
+ chain << munge do |m|
+ m.munge do |_, i, i2, l|
+ "#{l}: #{i} * 2 = #{i2}"
+ end
+ end
+ @munge.add_reader(chain.first)
+ chain << add_writer do |w|
+ w.write do |_, str|
+ written << str
+ end
+ end.last
+ finish
+ assert( chain.all?(&:finished?),
+ "Not all elements of the chain finished?()." )
+ assert_equal(["a: 1 * 2 = 2", "b: 2 * 2 = 4", ": 3 * 2 = 6"], written)
+ end
+
+ #######
+ private
+ #######
+
+ def munge(&init)
+ @munge = Mungr::Munge.new(&init)
+ end
+
+ def add_reader(*inputs, &init)
+ (@readers ||= Array.new) <<
+ ( inputs.empty? ? reader(&init) :
+ reader { |r| r.read { inputs.shift } } ).tap { |r|
+ @munge.add_reader(r)
+ }
+ end
+
+ def add_writer(&init)
+ (@writers ||= Array.new) << writer(&init).tap { |w| @munge.add_writer(w) }
+ end
+
+ def finish
+ @munge.run
+ end
+end
View
28 test/test_reader.rb
@@ -1,6 +1,6 @@
# encoding: UTF-8
-require "minitest/autorun"
+require "helper"
require "mungr/reader"
@@ -25,18 +25,20 @@ def test_a_reader_is_prepared_before_the_first_read
def test_exhausting_a_reader_sets_finished
order = Array.new
reader do |r|
- r.read {
+ r.read {
order << :read
nil # signal that we are exhausted
}
- r.finish { order << :finished }
+ r.finish do
+ order << :finish
+ end
end
assert( !@reader.finished?,
"The Reader was finished?() before being exhausted." )
@reader.read
assert( @reader.finished?,
- "The Reader was finished?() after being exhausted." )
- assert_equal([:read, :finished], order)
+ "The Reader was not finished?() after being exhausted." )
+ assert_equal([:read, :finish], order)
end
###############
@@ -52,7 +54,9 @@ def test_any_value_returned_from_prepare_is_forwarded_to_read_and_finish
calls << context
nil # signal that we are exhausted
}
- r.finish { |context| calls << context }
+ r.finish do |context|
+ calls << context
+ end
end.read # trigger read and finish code
assert_equal([object] * 2, calls)
end
@@ -106,19 +110,13 @@ def test_finish_is_called_once_when_the_input_is_exhausted
calls << :read
data.shift
}
- r.finish { calls << :finish }
+ r.finish do
+ calls << :finish
+ end
end
5.times do
@reader.read
end
assert_equal([:read, :read, :read, :read, :finish], calls)
end
-
- #######
- private
- #######
-
- def reader(&init)
- @reader = Mungr::Reader.new(&init)
- end
end
View
34 test/test_writer.rb
@@ -1,6 +1,6 @@
# encoding: UTF-8
-require "minitest/autorun"
+require "helper"
require "mungr/writer"
@@ -13,7 +13,9 @@ def test_a_writer_is_prepared_before_the_first_write
order = Array.new
writer do |w|
w.prepare { order << :prepared }
- w.write { order << :write }
+ w.write do
+ order << :write
+ end
end
assert(!@writer.prepared?, "The Writer was prepared?() before the write().")
@writer.write("data")
@@ -30,9 +32,13 @@ def test_any_value_returned_from_prepare_is_forwarded_to_write_and_finish
object = Object.new
calls = Array.new
writer do |w|
- w.prepare { object }
- w.write { |context| calls << context }
- w.finish { |context| calls << context }
+ w.prepare { object }
+ w.write do |context|
+ calls << context
+ end
+ w.finish do |context|
+ calls << context
+ end
end
@writer.write("data") # trigger write code
@writer.finish # trigger finish code
@@ -47,7 +53,9 @@ def test_calling_write_a_with_block_sets_the_code_and_further_calls_run_it
data = (1..3).to_a
written = Array.new
writer do |w|
- w.write { |_, value| written << value }
+ w.write do |_, value|
+ written << value
+ end
end
data.each do |value|
@writer.write(value)
@@ -59,20 +67,14 @@ def test_prepare_is_called_once_before_the_first_read
data = (1..3).to_a
calls = Array.new
writer do |w|
- w.prepare { calls << :prepare }
- w.write { |_, value| calls << value }
+ w.prepare { calls << :prepare }
+ w.write do |_, value|
+ calls << value
+ end
end
data.each do |value|
@writer.write(value)
end
assert_equal([:prepare] + data, calls)
end
-
- #######
- private
- #######
-
- def writer(&init)
- @writer = Mungr::Writer.new(&init)
- end
end
Please sign in to comment.
Something went wrong with that request. Please try again.