Swineherd is for running scripts and workflows on filesystems.


A workflow is built from script objects and run on a filesystem.


A script has the following:

  • source – The source file used. These can be Apache Pig scripts, Wukong scripts, even R scripts. You can add your own scripts by subclassing the script class.
  • input – An array of input paths.
  • output – An array of output paths.
  • options – A Ruby hash of options used as command-line args, e.g. {:foo => 'bar'}. How these options are mapped to command-line arguments is up to the particular script class.
  • attributes – A Ruby hash of parameters used for variable substitution. Every script is assumed to be (but not required to be) an eruby template.
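
To make the options and attributes fields concrete, here is a minimal sketch that uses only Ruby's standard erb library. It is not Swineherd's code: render_template and to_cli_args are hypothetical helpers, and the "-p key=value" format is only one plausible mapping, since each script class decides its own.

```ruby
require 'erb'

# Hypothetical helper: substitute attributes into an eruby template.
def render_template(source, attributes)
  ERB.new(source).result_with_hash(attributes)
end

# Hypothetical helper: one possible mapping from an options hash to
# command-line arguments. A real script class may format these differently.
def to_cli_args(options)
  options.map { |key, value| "-p #{key}=#{value}" }.join(' ')
end

template = "links = LOAD '<%= adjlist %>' AS (node:chararray);"
puts render_template(template, {:adjlist => '/tmp/graph.tsv'})
puts to_cli_args({:foo => 'bar', :damp => '0.85f'})
```

The point of keeping the two hashes separate is that attributes change the text of the script itself, while options only change how the interpreter is invoked.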


A workflow is built using rake task objects that do nothing more than run scripts. A workflow

  • can be described with a directed dependency graph
  • has an id which is used to run its tasks idempotently. At the moment it is the responsibility of the running process (or human being) to choose a suitable id.
  • manages intermediate outputs by using the next_output and latest_output methods. See the examples dir for usage.
  • has a working directory in which all intermediate outputs go
    • these are named according to the rake task that created them


Workflows are intended to run on filesystems. At the moment, implemented filesystems are

  • file – Local filesystem. Only thoroughly tested on Ubuntu Linux.
  • hdfs – Hadoop Distributed File System. Uses JRuby and the Apache Hadoop 0.20 API.
  • s3 – Uses the right_aws gem for interacting with Amazon Simple Storage Service (S3).

Using the filesystem:

Paths should be absolute.

# get a new instance of local filesystem and write to it
localfs = FileSystem.get(:file)
localfs.open("mylocalfile", 'w') do |f|
  f.write("Writing a string to a local file")
end

# get a new instance of hadoop filesystem and write to it
hadoopfs = FileSystem.get(:hdfs)
hadoopfs.open("myhadoopfile", 'w') do |f|
  f.write("Writing a string to an hdfs file")
end

# get a new instance of s3 filesystem and write to it
access_key_id     = '1234abcd'
secret_access_key = 'foobar1234'
s3fs = FileSystem.get(:s3, access_key_id, secret_access_key)
s3fs.mkpath 'mys3bucket' # bucket must exist
s3fs.open("mys3bucket/mys3file", 'w') do |f|
  f.write("Writing a string to an s3 file")
end
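
The idea of asking for a filesystem by name and getting back an object with a common interface can be sketched in a few lines. The following is a hypothetical miniature, not Swineherd's code: the MiniFS module, its LocalFS backend, and the BACKENDS table are invented for illustration, and only a local backend is shown so the example runs anywhere.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical miniature of a filesystem registry.
module MiniFS
  # Local-disk backend exposing the two calls used in this sketch.
  class LocalFS
    def open(path, mode = 'r', &block)
      # Create parent directories when writing so callers need no setup.
      FileUtils.mkdir_p(File.dirname(path)) if mode.include?('w')
      File.open(path, mode, &block)
    end

    def exists?(path)
      File.exist?(path)
    end
  end

  # Other backends would be registered here under their own symbols.
  BACKENDS = { :file => LocalFS }

  # Look up a backend class by name and instantiate it.
  def self.get(scheme, *args)
    klass = BACKENDS.fetch(scheme) { raise ArgumentError, "unknown filesystem: #{scheme}" }
    klass.new(*args)
  end
end

fs   = MiniFS.get(:file)
path = File.join(Dir.mktmpdir, 'demo', 'hello.txt') # absolute path
fs.open(path, 'w') { |f| f.write('hello') }
puts fs.exists?(path)
```

Code written against such an interface does not need to know which backend it was handed, which is what allows a workflow to move between local disk, HDFS, and S3.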

Working Example

For the most up-to-date working example, see the examples directory. Here’s a simple example for running PageRank:

#!/usr/bin/env ruby

$LOAD_PATH << '../../lib'
require 'swineherd'        ; include Swineherd
require 'swineherd/script' ; include Swineherd::Script
require 'swineherd/filesystem'

Settings.define :flow_id,     :required => true,                     :description => "Flow id required to make run of workflow unique"
Settings.define :iterations,  :type => Integer,  :default => 10,     :description => "Number of pagerank iterations to run"
Settings.define :hadoop_home, :default => '/usr/local/share/hadoop', :description => "Path to hadoop config"

flow = Workflow.new(Settings.flow_id) do

  # The filesystems we're going to be working with
  hdfs    = Swineherd::FileSystem.get(:hdfs)
  localfs = Swineherd::FileSystem.get(:file)

  # The scripts we're going to use
  initializer = PigScript.new('scripts/pagerank_initialize.pig')
  iterator    = PigScript.new('scripts/pagerank.pig')
  finisher    = WukongScript.new('scripts/cut_off_list.rb')
  plotter     = RScript.new('scripts/histogram.R')

  # Runs simple pig script to initialize pagerank. We must specify the input
  # here as this is the first step in the workflow. The output attribute is to
  # ensure idempotency and the options attribute is the hash that will be
  # converted into command-line args for the pig interpreter.
  task :pagerank_initialize do
    initializer.options = {:adjlist => "/tmp/pagerank_example/seinfeld_network.tsv", :initgrph => next_output(:pagerank_initialize)}
    initializer.run(:hadoop) unless hdfs.exists? latest_output(:pagerank_initialize)
  end

  # Runs multiple iterations of pagerank with another pig script and manages all
  # the intermediate outputs.
  task :pagerank_iterate => [:pagerank_initialize] do
    iterator.options[:damp]           = '0.85f'
    iterator.options[:curr_iter_file] = latest_output(:pagerank_initialize)
    Settings.iterations.times do
      iterator.options[:next_iter_file] = next_output(:pagerank_iterate)
      iterator.run(:hadoop) unless hdfs.exists? latest_output(:pagerank_iterate)
      iterator.options[:curr_iter_file] = latest_output(:pagerank_iterate)
    end
  end

  # Here we use a wukong script to cut off the last field (a big pig bag of
  # links). Notice how every wukong script MUST have an input but pig scripts do
  # not.
  task :cut_off_adjacency_list => [:pagerank_iterate] do
    finisher.input  << latest_output(:pagerank_iterate)
    finisher.output << next_output(:cut_off_adjacency_list)
    finisher.run :hadoop unless hdfs.exists? latest_output(:cut_off_adjacency_list)
  end

  # We want to pull down one result file, merge the part-000.. files into one file
  task :merge_results => [:cut_off_adjacency_list] do
    merged_results = next_output(:merge_results)
    hdfs.merge(latest_output(:cut_off_adjacency_list), merged_results) unless hdfs.exists? merged_results
  end

  # Cat results into a local directory with the same structure
  # eg. #{work_dir}/#{flow_id}/pull_down_results-0.
  # FIXME: Bridging filesystems is kludgey.
  task :pull_down_results => [:merge_results] do
    local_results = next_output(:pull_down_results)
    hdfs.copy_to_local(latest_output(:merge_results), local_results) unless localfs.exists? local_results
  end

  # Plot 2nd column of the result as a histogram (requires R and
  # ggplot2). Note that the output here is a png file but doesn't have that
  # extension. Ensmarten me as to the right way to handle that?
  task :plot_results =>  [:pull_down_results] do
    plotter.attributes = {
      :pagerank_data => latest_output(:pull_down_results),
      :plot_file     => next_output(:plot_results), # <-- this will be a png...
      :raw_rank      => "aes(x=d$V2)"
    }
    plotter.run(:local) unless localfs.exists? latest_output(:plot_results)
  end

end

flow.workdir = "/tmp/pagerank_example"
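
Every task in the example guards its work with an "unless ... exists?" check on the output path, so rerunning a flow with the same id skips the steps that already finished. A minimal local-disk sketch of that guard follows; run_unless_exists is a hypothetical helper, not part of Swineherd.

```ruby
require 'tmpdir'

# Hypothetical helper: run the block only if its output is not already there.
# Returns :ran when the block was executed and :skipped otherwise.
def run_unless_exists(output_path)
  return :skipped if File.exist?(output_path)
  yield output_path
  :ran
end

output = File.join(Dir.mktmpdir, 'merge_results-0')
2.times do
  status = run_unless_exists(output) { |path| File.write(path, "merged\n") }
  puts status
end
```

The guard is only as reliable as the existence check: a step that fails after creating its output path would be skipped on the next run, so partial outputs need to be cleaned up by hand.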


There’s a fun little program to emphasize the ease of using the filesystem abstraction called ‘hdp-tree’:

$: bin/hdp-tree /tmp/my_hdfs_directory
  - my_hdfs_directory: 
      - sub_dir_a: leaf_file_1
      - sub_dir_a: leaf_file_2
      - sub_dir_a: leaf_file_3
  - my_hdfs_directory: 
      - sub_dir_b: leaf_file_1
      - sub_dir_b: leaf_file_2
      - sub_dir_b: leaf_file_3
  - my_hdfs_directory: 
      - sub_dir_c: leaf_file_1
      - sub_dir_c: leaf_file_2
      - sub_dir_c: leaf_file_3
      - sub_dir_c: 
          - sub_sub_dir_a: yet_another_leaf_file
      - sub_dir_c: sub_sub_dir_b
      - sub_dir_c: sub_sub_dir_c

I know, it’s not as pretty as unix tree, but this IS github…
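
The source of hdp-tree is not reproduced here, but the recursion behind a listing like this is short. The sketch below walks a local directory with Ruby's standard library; the tree function and its output shape are assumptions made for illustration, and a version for another filesystem would swap the Dir and File calls for that backend's own listing and directory test.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical sketch: build a nested listing of a local directory.
# Directories become { name => [children] }; files become plain names.
def tree(path)
  children = Dir.children(path).sort.map do |name|
    child = File.join(path, name)
    File.directory?(child) ? tree(child) : name
  end
  { File.basename(path) => children }
end

root = File.join(Dir.mktmpdir, 'my_dir')
FileUtils.mkdir_p(File.join(root, 'sub_dir_a'))
File.write(File.join(root, 'sub_dir_a', 'leaf_file_1'), '')
File.write(File.join(root, 'top_file'), '')
p tree(root)
```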


TODO

  • the next task in a workflow should NOT run if the previous step failed
    • this is made difficult by the fact that a failed pig script sometimes still returns a 0 exit status
    • same for wukong scripts
  • add a job object that implements a not_if function, so that a workflow is constructed of job objects
    • a job will do nothing more than execute the ruby code in its (run?) block, unless not_if is true
    • this way we can put script objects inside a job and only run them under conditions that the user specifies when they create the job
  • implement ftp filesystem interfaces
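
One possible shape for the job object described in the list above is sketched here. Nothing like it exists in the code yet, so the Job class, its constructor, and the :skipped return value are all hypothetical.

```ruby
# Hypothetical Job: runs its block unless the not_if condition holds.
class Job
  def initialize(name, &run_block)
    @name      = name
    @run_block = run_block
    @not_if    = nil
  end

  # Register the skip condition; returns self so calls can be chained.
  def not_if(&condition)
    @not_if = condition
    self
  end

  # Returns :skipped when the condition is true, otherwise the block's result.
  def run
    return :skipped if @not_if && @not_if.call
    @run_block.call
  end
end

done = false
job  = Job.new(:merge) { done = true; :ran }.not_if { done }
puts job.run # first call runs the block
puts job.run # second call is skipped because done is now true
```

Evaluating the condition at run time rather than at construction time is what would let a job react to outputs created by earlier jobs in the same workflow.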