Skip to content

Commit

Permalink
Merge pull request #11 from Roxxi/arbitrary-command-line-args
Browse files Browse the repository at this point in the history
Specifying key-value pairs for the JobConf via the commandline
  • Loading branch information
alexott committed Mar 16, 2013
2 parents cce1174 + 1e601ba commit be9a7b8
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 1 deletion.
11 changes: 11 additions & 0 deletions README.txt
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ Layer 5: clojure-hadoop.defjob
See the example files "wordcount4.clj" and "wordcount5.clj" for
demonstrations of this macro.

Layer 6: clojure-hadoop.defjob - Specifying JobConf parameters

Often its necessary to specify parameters in the job's
configuration to in order to enable dynamic map/reduce jobs.
Hadoop natively enables this through the -D<key>=<value>
commandline specification.

Using the convenient defjob macro, "wordcount6.clj" demonstrates
how to set job configuration (JobConf) parameters either via
the commandline, or as part of the defjob defintion within the file.


NOTES

Expand Down
1 change: 1 addition & 0 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
com.sun.jdmk/jmxtools
com.sun.jmx/jmxri]]
]
:source-paths ["src" "test"]
:dev-dependencies [[swank-clojure "1.4.2"]]
:aot [clojure-hadoop.config
clojure-hadoop.defjob
Expand Down
22 changes: 21 additions & 1 deletion src/clojure_hadoop/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,21 @@
"Returns the configuration for the job."
[^Job job] (.getConfiguration job))

(defmulti conf (fn [job key value] key))
(defn- commandline-job-conf-param? [key]
(= (first (as-str key)) \X))

(defmulti conf
(fn [job key value]
(or (and (commandline-job-conf-param? key) :X)
key)))

;; allow users to specify parameters via the commandline
;; to set in the job's configuration
;; e.g. -Xmy.foo.value myfoovalue
;; would yield
;; (.set (configuration job) "my.foo.value" "myfoovalue")
(defmethod conf :X [^Job job key value]
(.set (configuration job) (subs (as-str key) 1) value))

(defmethod conf :job [^Job job key value]
(cond
Expand All @@ -77,6 +91,9 @@
(doseq [[param value] params]
(.set (configuration job) param value)))

;; If you specify your parameters as a map bound to a var
;; you can specify the name of the var with the :params
;; to load the key-value pairs in the configuration.
(defmethod conf :params [^Job job key params]
(set-parameters job (var-get (resolve (read-string params)))))

Expand Down Expand Up @@ -362,4 +379,7 @@ Other available options are:
-output-compressor Compression class or \"gzip\",\"bzip2\",\"default\"
-compression-type For seqfiles, compress \"block\",\"record\",\"none\"
-batch If \"false\" (default), run interactively, else 'submit'
-X<key> <val> Can specify an arbitrary number of key-value pairs to
add to the Job's Configuration by repeating
-X<key1> <val1> -X<key2> <val2> ... -X<keyN> <valN>
"))
117 changes: 117 additions & 0 deletions test/clojure_hadoop/examples/wordcount6.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
;; wordcount6 -- example customized defjob
;;
;; This example wordcount program uses defjob like wordcount5, but it
;; includes an example of specifying additional parameters at the
;; commandline to set values in the JobConf Configuration
;; via -X<configuration-key> <configuration value>.
;;
;; It also includes an example of how to read the values during
;; the map and reduce setup phases and shows how they can be referenced
;; during the map and reduce execution phases.
;;
;;
;;
;; After compiling (see README.txt), run the example like this
;; (all on one line):
;;
;; java -cp examples.jar clojure_hadoop.job \
;; -job clojure-hadoop.examples.wordcount6/job \
;; -input README.txt -output out6 \
;; -Xmy.custom.repeat.string hello \
;; -Xmy.custom.multiplier 11
;;
;;
;; The output is plain text, written to out5/part-00000
;;
;; During the map phase we'll not only output a 1 for each
;; string we read in our input file, but also output
;; an additional 1 for whatever string we supply as
;; our my.custom.repeat.string parameter.
;;
;; During the reduce phase, we'll mulitply the count
;; of each word by the multiplier specified by
;; my.custom.multiplier value
;;
;;
;; Note: Parameters expressed at the commandline
;; will overwrite values expressed in the defjob
;; mappings expressed below.

(ns clojure-hadoop.examples.wordcount6
(:require [clojure-hadoop.wrap :as wrap]
[clojure-hadoop.defjob :as defjob]
[clojure-hadoop.imports :as imp])
(:import (java.util StringTokenizer))
(:use clojure.test clojure-hadoop.job))

(imp/import-io)
(imp/import-mapreduce)


(def ^:dynamic *bonus-word*)

(defn my-map-setup [^JobContext context]
(alter-var-root
(var *bonus-word*)
(fn [_]
(.get (.getConfiguration context) "my.custom.repeat.string"))))

(defn my-map [key value]
(mapcat
(fn [token] [[token 1] [*bonus-word* 1]])
(enumeration-seq (StringTokenizer. value))))

(def ^:dynamic *multiplyer*)

(defn my-reduce-setup [^JobContext context]
(alter-var-root
(var *multiplyer*)
(fn [_]
(read-string (.get (.getConfiguration context) "my.custom.multiplier")))))


(defn my-reduce [key values-fn]
[[key (* *multiplyer* (reduce + (values-fn)))]])

(defn string-long-writer [^TaskInputOutputContext context ^String key value]
(.write context (Text. key) (LongWritable. value)))

(defn string-long-reduce-reader [^Text key wvalues]
[(.toString key)
(fn [] (map (fn [^LongWritable v] (.get v)) wvalues))])

;; Specifying the same parameters to use in a unit-test context
;; when we're not passing in any commandline args.
(def my-config-params
{"my.custom.repeat.string" "unit-test"
"my.custom.multiplier" "7"})

(defjob/defjob job
:map-setup my-map-setup
:map my-map
:map-reader wrap/int-string-map-reader
:map-writer string-long-writer
:reduce-setup my-reduce-setup
:reduce my-reduce
:reduce-reader string-long-reduce-reader
:reduce-writer string-long-writer
:output-key Text
:output-value LongWritable
:input-format :text
:output-format :text
:compress-output false
:input "README.txt"
:output "tmp/out6"
:replace true
;; If we want to set parameters in the job configuration
;; as part of the program, we can do so with the
;; params field, then reference them above;
;; the same way as happens in the commandline case.
;;
;; This is a great way to set defaults if commandline params
;; are omitted, because these params will be processed first.
;; then the commandline args will be layered over.
:params my-config-params)

(deftest test-wordcount-6
(is (run job)))

0 comments on commit be9a7b8

Please sign in to comment.