Skip to content

Commit

Permalink
Enum type, made emittable cascade its to_flat
Browse files Browse the repository at this point in the history
  • Loading branch information
Philip (flip) Kromer committed Mar 15, 2009
1 parent c9f4efc commit d6a7dda
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 8 deletions.
15 changes: 10 additions & 5 deletions TODO.textile
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
Utility

* columnizing / reconstituting

* Make this a Gem
* Set up with JRuby
* Allow for direct HDFS operations
* Make the dfs commands slightly less stupid

* provide for a defaults file
* add more standard options
* Allow for combiners
* JobStarter / JobSteps
* DRY up the options and method overrides
* might as well take dumbo's command line args

BUGS:
Expand All @@ -23,18 +22,24 @@ Patterns to implement:

Example graph scripts:

* Multigraph
* Pagerank (done)
* Breadth-first search (mostly done)
* Triangle enumeration (mostly done)
* Breadth-first search
* Triangle enumeration
* Clustering

Example example scripts (from http://www.cloudera.com/resources/learning-mapreduce):

1. Find the [number of] hits by 5 minute timeslot for a website given its access logs.

2. Find the pages with over 1 million hits in day for a website given its access logs.

3. Find the pages that link to each page in a collection of webpages.

4. Calculate the proportion of lines that match a given regular expression for a collection of documents.

5. Sort tabular data by a primary and secondary column.

6. Find the most popular pages for a website given its access logs.

/can use
4 changes: 2 additions & 2 deletions config/wukong.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
# Script defaults
:runner_defaults:
# field separator for sorting
:key_field_separator: "'\\t'"
# :key_field_separator: "'\\t'"
# field separator for partitioning (distributing among reducers)
:output_field_separator: "'\\t'"
# :output_field_separator: "'\\t'"

100 changes: 100 additions & 0 deletions examples/package-local.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env ruby
$: << File.dirname(__FILE__)+'/..'

require 'wukong'

#
# This is so very very kludgey
#
# Input is an 'ls' file, listing files to .bz2 package.
#
# Mapper takes each in turn and creates, within a parallel directory tree under
# ~/pkgd on the HDFS, a .bz2 compressed version of the file.
#
# So, the file
# /user/me/fixd/all-20090103
# is packaged onto the DFS as
# /user/me/pkgd/user/me/fixd/all-20090103
#
# listing=tmp/fixd-all-package-listing
# hdp-rm $listing
# hadoop dfs -lsr fixd | egrep '(part-|\.tsv$)' | hdp-put - $listing ;
#
# ./package.rb --run --rm --map_tasks=1 $listing $pkgd_log
#
module ExportPackager
PKGD_DIR = '/workspace/flip/pkgd'

#
#
class Reducer < Wukong::Streamer::Base
def announce *args
$stdout.puts *args
$stderr.puts *args
end

def handle_existing_target output_filename
return true unless File.exist?(output_filename)
# announce "Exists! #{output_filename}"
# return false
announce "Removing target file #{output_filename}"
begin announce `rm #{output_filename}`
rescue Exception => e ; announce e ; end
true
end

def mkdir_target_safely output_filename
output_dir = File.dirname(output_filename)
announce "Ensuring directory #{output_dir} exists"
begin announce `mkdir -p #{output_dir}`
rescue Exception => e ; announce e ; end
end

def bzip_into_pkgd_file input_filename, output_filename
announce "bzip'ing into #{output_filename}"
announce `( hadoop dfs -cat #{input_filename}/[^_]\** ) | bzip2 -c > #{output_filename}`
end

def gen_output_filename input_filename
input_filename += '.tsv' unless input_filename =~ /.*\.\w{2,}/
"%s/%s.bz2" % [PKGD_DIR, input_filename.gsub(/^\//, '')]
end

def rsync host, local_path, remote_path=nil
remote_path ||= local_path
announce `/usr/bin/rsync -Cuvrtlp #{local_path} #{host}:#{remote_path}`
sleep 5
end

def process input_filename
output_filename = gen_output_filename(input_filename)
handle_existing_target(output_filename) or return
mkdir_target_safely output_filename
bzip_into_pkgd_file input_filename, output_filename
rsync :lab3, output_filename
#
end

def recordize line
# handle ls or straight file list, either
line.split(/\s/).last
end

def stream
super
rsync :lab3, PKGD_DIR+'/'
end
end

class Script < Wukong::Script
def default_options
super.merge :map_tasks => 1,
:max_node_reduce_tasks => 1, # only one reducer per local filesystem
:timeout => 40 * 60 * 1000 # timeout in ms
end
end
# Execute the script
Script.new(nil, Reducer).run
end


1 change: 1 addition & 0 deletions wukong/and_pig/generate/variable_inflections.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class << Bignum ; def typify() 'long' end ; end
class << Float ; def typify() 'float' end ; end
class << String ; def typify() 'chararray' end ; end
class << Symbol ; def typify() self end ; end
class << Date ; def typify() 'long' end ; end

# Array.class_eval do
# def typify()
Expand Down
115 changes: 115 additions & 0 deletions wukong/datatypes/enum.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
require 'active_support/core_ext/class/inheritable_attributes'
require 'active_support/core_ext/duplicable'
module Wukong
module Datatypes
#
# Infinity is bigger than any number
#
#
Infinity = 1.0/0

class Enum
attr_accessor :val
class_inheritable_reader :names
def initialize val
self.val = val
end
# MyEnum[val] is sugar for MyEnum.new(val)
def self.[] *args
new *args
end
def to_i
val
end
def to_s
return nil if val.nil?
self.class.names[val]
end
def inspect
"<#{self.class.to_s} #{to_i} (#{to_s})>"
end
# returns the value corresponding to that string representation
def index *args
# delegate
self.class.names.index *args
end
def to_flat
to_s #to_i
end

#
# Use enumerates to set the class' names
#
# class MyEnum < Enum
# enumerates :one, :two, :three
# end
# MyEnum[1].to_s # => 'one'
#
#
def self.enumerates *names
write_inheritable_attribute :names, names.map(&:to_s)
end

def self.to_sql_str
"ENUM('#{names.join("', '")}')"
end

def self.typify
'chararray'
end
end


#
# Note that bin 0 is
#
class Binned < Enum
class_inheritable_reader :bins, :empty_bin_name
@@empty_bin_name = '(none)'

def bins
self.class.bins
end

# FIXME -- doesn't respect a lower bound.
def initialize val
return super(val) if val.nil?
last_top = bins.first
bins.each_with_index do |bin_top, idx|
return super(idx) if val <= bin_top
end
return super(bins.length)
end

def self.enumerates *bins
options = bins.extract_options!
write_inheritable_attribute :bins, bins
last_top = bins.shift
# bins.unshift bins.first if last_top == -Infinity
names = bins.map do |bin_top|
name = bin_name last_top, bin_top, options
last_top = (last_top.is_a?(Integer) ? bin_top + 1 : bin_top)
name
end
super(*names)
end

#
# Bins
#
def self.bin_name lo_val, hi_val, options = { }
# case lo_val
# when Integer then lo_val = [lo_val+1, hi_val].compact.min
# end
case
when lo_val == -Infinity then "< #{hi_val}"
when hi_val == Infinity then "#{lo_val}+"
when (lo_val == hi_val) then lo_val
else "#{lo_val} - #{hi_val}"
end
end

end
end
end

2 changes: 1 addition & 1 deletion wukong/extensions/emittable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def to_flat include_key=true
else
sort_key = self.class.resource_name
end
[sort_key, *to_a]
[sort_key, *to_a.map(&:to_flat)]
end
end

Expand Down

0 comments on commit d6a7dda

Please sign in to comment.