streaming_join

streaming_join provides Ruby classes intended for use in Hadoop
Streaming reducers.  It has been tested with JRuby 1.6+ and Ruby 1.9.2+.

Examples (found in the examples directory) use the test data and
scenarios from here:

http://en.wikipedia.org/wiki/Join_(SQL)

The equivalent SQL for each example is listed in its directory.  The
supported join types are:

inner_join
cross_join
left_outer_join
right_outer_join
full_outer_join
merge_rows

API

As with most map/reduce jobs, there are two basic components:

Mapper

The mapper in a streaming_join job outputs records of the form:

key TAB side_index TAB value

The "left" side uses side_index 0, while the right side uses 1.

In a simple join, here is the left side's mapper output:

1	0	Matsumoto
2	0	Wall
3	0	van Rossum

Here is the other side's output:

1	1	Ruby	1995
2	1	Perl	1987
4	1	Clojure	2007
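A hand-rolled mapper producing records like the ones above could be as
simple as the sketch below (illustration only; the gem's JoinMapper does
this for you, and the input format here is an assumption):

    # Tags each tab-separated input line with a side index so the shuffle
    # groups both tables by join key.  Assumes the key is in column one,
    # e.g. "1\tMatsumoto".
    def tag_line(line, side_index)
      key, rest = line.chomp.split("\t", 2)
      [key, side_index, rest].join("\t")
    end

    # In a real streaming job this would read standard input:
    # STDIN.each_line { |line| puts tag_line(line, 0) }  # 0 = left side

tag_line("1\tMatsumoto", 0) returns "1\t0\tMatsumoto", matching the
left-side records shown above.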

Reducer

Depending on the join style, the reducer will emit combined records.  Using
the above mapper output, an inner join would end up like:

1	Matsumoto	Ruby
2	Wall	Perl

The code would look something like this:

Mapper

require 'streaming_join'

j = JoinMapper.new
j.add_side(/_left/, 0, 1)
j.add_side(/_right/, 0, 1)
j.process_stream

Reducer

require 'streaming_join'

Join.new.process_stream
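Conceptually, an inner-join reducer buffers the left side of each key
and joins it against every right-side record for that key.  The sketch
below illustrates that logic on its own; it is not the gem's actual
implementation, and it assumes a key's side-0 records arrive before its
side-1 records (e.g. via a secondary sort on side_index):

    # Illustrative inner join over reducer input sorted by key, where
    # each line is "key\tside_index\tvalue" as emitted by the mappers.
    def inner_join(lines)
      joined = []
      left_buffer = []   # left-side values for the current key (must fit in memory)
      current_key = nil

      lines.each do |line|
        key, side, value = line.chomp.split("\t", 3)
        if key != current_key
          left_buffer.clear      # new key: discard the previous key's buffer
          current_key = key
        end
        if side == "0"
          left_buffer << value   # buffer the left side
        else
          left_buffer.each { |l| joined << [key, l, value].join("\t") }
        end
      end
      joined
    end

Keys with records on only one side (3 and 4 in the example data) emit
nothing, which is exactly the inner-join behavior shown above.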

See examples/job for more detail and comments.

Current Limitations

- As each key is processed in the reducer, all left-side records for
  that key must fit into memory.  So, when in doubt, put the smaller
  table on the left.
- Only two tables can be joined in a single job.
- Cartesian joins use way too much memory.  Not a priority right now. ;)

Other Notes

- The custom counters used to report statistics can add a
  non-negligible amount of time when processing massive datasets.  To
  disable this feature, pass the optional hash when initializing a
  join object:

  Join.new(report: false).process_stream

  instead of:

  Join.new.process_stream
  
Please let me know if you find this software useful!

--frank