
Add files and archives to the DistributedCache via configuration.

This patch enables you to add files and archives to the DistributedCache via commandline options or via configuration within a job. This matters when mappers and reducers need to refer to configuration or relatively small data sets during their execution.

The new file "wordcount7.clj" demonstrates both methods and provides unit tests. The commandline usage can be tested manually; the included test covers specifying files for the cache via job configuration and using them in the mapper.

The commandline help has been updated, as has the README.txt, to cover wordcount7.clj.

Conflicts:
	README.txt
	project.clj
	src/clojure_hadoop/config.clj
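
The two ways of specifying cache entries can be sketched as follows; the option and defjob key names come from this patch, while the paths and job namespace are placeholder values:

```clojure
;; Via the commandline (the flag may be repeated):
;;
;;   java -cp examples.jar clojure_hadoop.job \
;;     -job my.ns/job -input in.txt -output out \
;;     -add-cache-file /absolute/path/words.txt \
;;     -add-cache-file /absolute/path/more-words.txt
;;
;; Via in-job configuration, as a defjob key taking a single
;; string or a vector of strings:
;;
;;   (defjob/defjob job
;;     ...
;;     :add-cache-file ["/absolute/path/words.txt"
;;                      "/absolute/path/more-words.txt"])
```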
1 parent be9a7b8 · commit d02f804bdafb7189c52c2719ecaeaf06f8dad87a · Roxxi committed Mar 11, 2013
8 README.txt
@@ -142,6 +142,14 @@ Layer 6: clojure-hadoop.defjob - Specifying JobConf parameters
how to set job configuration (JobConf) parameters either via
the commandline, or as part of the defjob definition within the file.
+ Layer 7: clojure-hadoop.config - Adding files and archives
+to the DistributedCache.
+
+ Example file "wordcount7.clj" demonstrates how to specify files
+ and archives for distribution across nodes via the
+ DistributedCache, as well as how to access the files
+ during the mapper-setup or reducer-setup phases.
+
NOTES
9 project.clj
@@ -7,12 +7,15 @@
:comments "Same license as Clojure"}
:dependencies [[org.clojure/clojure "1.3.0"]
[org.apache.hadoop/hadoop-core "1.0.3"]
+ ;; commons-io was omitted in 1.0.3/1.0.4
+ ;; needs to be explicitly included
+ ;; until this is resolved.
+ ;; see http://goo.gl/Trx7A
+ [commons-io "2.4"]
[log4j/log4j "1.2.16" :exclusions [javax.mail/mail
javax.jms/jms
com.sun.jdmk/jmxtools
- com.sun.jmx/jmxri]]
- ]
- :source-paths ["src" "test"]
+ com.sun.jmx/jmxri]]]
:dev-dependencies [[swank-clojure "1.4.2"]]
:aot [clojure-hadoop.config
clojure-hadoop.defjob
23 src/clojure_hadoop/config.clj
@@ -1,7 +1,8 @@
(ns clojure-hadoop.config
(:require [clojure-hadoop.imports :as imp]
[clojure-hadoop.load :as load]
- [clojure.string :as str]))
+ [clojure.string :as str])
+ (:import [java.net URI]))
;; This file defines configuration options for clojure-hadoop.
;;
@@ -23,6 +24,7 @@
(imp/import-fs)
(imp/import-mapreduce)
(imp/import-mapreduce-lib)
+(imp/import-filecache)
(def combine-cleanup
"The name of the property that stores the cleanup function name of
@@ -97,9 +99,12 @@
(defmethod conf :params [^Job job key params]
(set-parameters job (var-get (resolve (read-string params)))))
+(defn- sequentialify [thing-or-things]
+ (if (sequential? thing-or-things) thing-or-things (list thing-or-things)))
+
;; Modify the job or configuration
(defmethod conf :configure [^Job job key fnames]
- (doseq [fname (if (sequential? fnames) fnames (list fnames))]
+ (doseq [fname (sequentialify fnames)]
(if (fn? fname)
(fname job)
((load/load-name fname) job))))
@@ -323,6 +328,16 @@
(SequenceFileOutputFormat/setOutputCompressionType
job SequenceFile$CompressionType/RECORD))))
+(defmethod conf :add-cache-archive [^Job job key value]
+ (doseq [archive-path-string (sequentialify (load/load-or-value value))]
+ (DistributedCache/addCacheArchive (URI. archive-path-string)
+ (configuration job))))
+
+(defmethod conf :add-cache-file [^Job job key value]
+ (doseq [file-path-string (sequentialify (load/load-or-value value))]
+ (DistributedCache/addCacheFile (URI. file-path-string)
+ (configuration job))))
+
(defmethod conf :batch [^Job job key value]
(let [val (as-str value)]
@@ -382,4 +397,8 @@ Other available options are:
-X<key> <val> Can specify an arbitrary number of key-value pairs to
add to the Job's Configuration by repeating
-X<key1> <val1> -X<key2> <val2> ... -X<keyN> <valN>
+ -add-cache-archive A URI referencing an archive to add to the DistributedCache
+ Can be specified multiple times.
+ -add-cache-file A URI referencing a file to add to the DistributedCache
+ Can be specified multiple times.
"))
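
The `sequentialify` helper above is what lets `:add-cache-file` and `:add-cache-archive` accept either a single string or a collection of strings. A minimal runnable sketch, with the helper copied from this patch and hypothetical paths:

```clojure
;; Copied from config.clj in this patch: wrap a lone value in a
;; list so callers can always iterate with doseq.
(defn- sequentialify [thing-or-things]
  (if (sequential? thing-or-things) thing-or-things (list thing-or-things)))

;; A single URI string becomes a one-element list:
(sequentialify "hdfs://namenode/path/words.txt")
;; => ("hdfs://namenode/path/words.txt")

;; A vector of strings passes through unchanged:
(sequentialify ["words.txt" "more-words.txt"])
;; => ["words.txt" "more-words.txt"]
```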
9 src/clojure_hadoop/imports.clj
@@ -72,6 +72,15 @@
Reducer StatusReporter TaskAttemptContext TaskAttemptID TaskID
TaskInputOutputContext)))
+(defn import-filecache
+ "Imports all classes/interfaces/exceptions from the package
+ org.apache.hadoop.filecache into the current namespace."
+ []
+ (import
+ '(org.apache.hadoop.filecache
+ DistributedCache TaskDistributedCacheManager
+ TrackerDistributedCacheManager)))
+
(defn import-mapreduce-lib-input
"Imports all classes/interfaces/exceptions from the package
org.apache.hadoop.mapreduce.lib.input into the current namespace."
8 src/clojure_hadoop/load.clj
@@ -10,3 +10,11 @@
(require (symbol ns-name)))
(assert (find-ns (symbol ns-name)))
(deref (resolve (symbol ns-name fn-name)))))
+
+(defn load-or-value
+ "If the value provided is something that can be loaded,
+ it will; otherwise returns the value passed in."
+ [^String s]
+ (if (resolve (symbol s))
+ (load-name s)
+ s))
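
The dispatch in `load-or-value` can be illustrated standalone. This sketch substitutes a plain `resolve`/`deref` for `load/load-name`, and the var name and strings are hypothetical:

```clojure
;; If the string names a resolvable var, return its value;
;; otherwise return the string itself (e.g. a literal path).
(def my-cache-files ["words.txt" "more-words.txt"])

(defn load-or-value-sketch [^String s]
  (if (resolve (symbol s))
    @(resolve (symbol s))   ;; stands in for load/load-name
    s))

(load-or-value-sketch "my-cache-files") ;; => ["words.txt" "more-words.txt"]
(load-or-value-sketch "not-a-var")      ;; => "not-a-var"
```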
7 test-resources/wc7-exclude-these-words-too.txt
@@ -0,0 +1,7 @@
+only
+me
+mail
+library
+the
+THE
+this
6 test-resources/wc7-exclude-these-words.txt
@@ -0,0 +1,6 @@
+and
+or
+not
+but
+any
+update
201 test/clojure_hadoop/examples/wordcount7.clj
@@ -0,0 +1,201 @@
+;; wordcount7 -- example using the distributed cache to cache
+;; files and archives
+;;
+;; This example wordcount program uses defjob like wordcount5, but it
+;; includes demonstrating configuring and using the DistributedCache
+;; to add a file of words to ignore in the map phase.
+;;
+;; In the defjob section below we'll configure two files that
+;; will have a few words each to create a predicate during the
+;; map setup phase, then use the predicate in the my-map
+;; to filter counting specific words.
+;;
+;; The commandline example below shows how we can specify files
+;; to load in the distributed cache via the commandline as well.
+;;
+;; After compiling (see README.txt), run the example like this
+;; (all on one line):
+;;
+;; java -cp examples.jar clojure_hadoop.job \
+;; -job clojure-hadoop.examples.wordcount7/job \
+;; -input README.txt -output out7 \
+;; -add-cache-file <ABSOLUTE-PATH>/test-resources/wc7-exclude-these-words.txt \
+;; -add-cache-file <ABSOLUTE-PATH>/test-resources/wc7-exclude-these-words-too.txt
+;;
+;; The output is plain text, written to out7/part-00000
+;;
+;; All file specifications for the DistributedCache must be strings
+;; that can function as java.net.URIs, as that is what is required
+;; to add a file to the DistributedCache. If the files are in HDFS/S3,
+;; the URI can be interpreted as a directory location within HDFS/S3.
+;; for example:
+;;
+;; hadoop jar examples.jar clojure_hadoop.job \
+;; -job clojure-hadoop.examples.wordcount7/job \
+;; -input hdfs://<HDFS-ADDRESS>/hdfs/fs/path/to/README.txt -output out7 \
+;; -add-cache-file hdfs://<HDFS-ADDRESS>/hdfs/fs/path/to/wc7-exclude-these-words.txt \
+;; -add-cache-file hdfs://<HDFS-ADDRESS>/hdfs/fs/path/to/wc7-exclude-these-words-too.txt
+;;
+;; For S3 replace hdfs://<HDFS-ADDRESS> with s3://<S3-ADDRESS>
+;;
+;; If running hadoop local to where HDFS is located, you can omit
+;; hdfs://<HDFS-ADDRESS>
+;;
+;; In the later section below, we give an example of how to
+;; specify files to place in the distributed cache from within
+;; the job definition itself.
+
+(ns clojure-hadoop.examples.wordcount7
+ (:require [clojure-hadoop.wrap :as wrap]
+ [clojure-hadoop.defjob :as defjob]
+ [clojure-hadoop.imports :as imp])
+ (:import (java.util StringTokenizer)
+ (java.net URI))
+ (:require [clojure.string :as string])
+ (:use clojure.test clojure-hadoop.job)
+ (:use [clojure.string :only [join]]))
+
+(imp/import-io)
+(imp/import-mapreduce)
+(imp/import-filecache)
+
+
+;; We're going to make a custom ignore-word? predicate
+;; for each mapper. Read the following function descriptions
+;; from `my-map-setup` upward.
+(def ignore-word?)
+
+(defn distributed-cache-files
+ "Given the job configuration retrieve the list of file paths
+ we've stored in the distributed cache. These will be different
+ paths on each node, thus each mapper needs to ask for the
+ location during the set up phase."
+ [job-conf]
+ (DistributedCache/getLocalCacheFiles job-conf))
+
+
+(defn file-path->lines
+ "Given a Path, return the lines of the file as a realized
+ sequence"
+ [path]
+ ;; we have to .toString the path to get the absolute path
+ (let [file-name (str path)]
+ (with-open [rdr (clojure.java.io/reader file-name)]
+ ;; force realization of the sequence so we can close the file
+ (doall (line-seq rdr)))))
+
+(defn make-ignore-word?
+ "Given the job configuration, read out the list of files that
+ we've supplied to the distributed cache, and create a predicate
+ that will return false for any strings that are
+ contained in our files."
+ [job-conf]
+
+ (let [local-file-paths (distributed-cache-files job-conf)
+ string-set (set (mapcat file-path->lines local-file-paths))]
+ ;; remember, sets in clojure can be used as predicates to test membership
+ (fn [string] (string-set string))))
+
+(defn my-map-setup
+ "When we set up the mapper, let's now bind our ignore-word? var
+ to a function that's read the files that contains the words to ignore
+ from our cache."
+ [^JobContext context]
+ (alter-var-root (var ignore-word?)
+ (fn [_]
+ (make-ignore-word? (.getConfiguration context)))))
+
+(defn my-map
+ "We'll emit a 1 for each word that we don't end up ignoring,
+ along with that word."
+ [key value]
+ (map (fn [token] [token 1])
+ (remove ignore-word? (enumeration-seq (StringTokenizer. value)))))
+
+(defn my-reduce
+ "Gather by each word and sum the values"
+ [key values-fn]
+ [[key (reduce + (values-fn))]])
+
+(defn string-long-writer [^TaskInputOutputContext context ^String key value]
+ (.write context (Text. key) (LongWritable. value)))
+
+(defn string-long-reduce-reader [^Text key wvalues]
+ [(.toString key)
+ (fn [] (map (fn [^LongWritable v] (.get v)) wvalues))])
+
+(defjob/defjob job
+ :map-setup my-map-setup
+ :map my-map
+ :map-reader wrap/int-string-map-reader
+ :map-writer string-long-writer
+ :reduce my-reduce
+ :reduce-reader string-long-reduce-reader
+ :reduce-writer string-long-writer
+ :output-key Text
+ :output-value LongWritable
+ :input-format :text
+ :output-format :text
+ :compress-output false
+ :input "README.txt"
+ :output "tmp/out7"
+ :replace true)
+
+(deftest test-wordcount-7
+ (is (run job)))
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;; Specifying files for the distributed cache within your
+;;; Map-Reduce job.
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+
+;; NOTE: get-current-directory does not work
+;; when you're running a job like this on hadoop
+;; to reference files. This is purely for the
+;; testing, and as an example.
+(defn- get-current-directory []
+ (. (java.io.File. ".") getCanonicalPath))
+
+;; If you want to specify files within your code
+;; that reside in HDFS or S3, you need to specify
+;; the absolute path to the files here,
+;; as a vector. Again, get-current-directory
+;; is purely for regression testing.
+;;
+;; In a "typical" case where we want to specify the path within
+;; the code, this would look something like:
+;;
+;; (def file-for-distributed-cache "/absolute/path/to/my/file")
+;;
+;; -or-
+;;
+;; (def files-for-distributed-cache ["/absolute/path/to/file1"
+;; "/absolute/path/to/file2"
+;; ...])
+(def files-for-distributed-cache
+ (let [cwd (get-current-directory)]
+ [(join "/" [cwd "test-resources/wc7-exclude-these-words.txt"])
+ (join "/" [cwd "test-resources/wc7-exclude-these-words-too.txt"])]))
+
+(defjob/defjob configured-job
+ :map-setup my-map-setup
+ :map my-map
+ :map-reader wrap/int-string-map-reader
+ :map-writer string-long-writer
+ :reduce my-reduce
+ :reduce-reader string-long-reduce-reader
+ :reduce-writer string-long-writer
+ :output-key Text
+ :output-value LongWritable
+ :input-format :text
+ :output-format :text
+ :compress-output false
+ :input "README.txt"
+ :output "tmp/out7"
+ :replace true
+ :add-cache-file files-for-distributed-cache)
+
+(deftest test-wordcount-7-via-config
+ "Testing adding files to the DistributedCache via
+in-file configuration"
+ (is (run configured-job)))
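
The set-as-predicate idiom that make-ignore-word? relies on can be demonstrated in isolation; the word lists here are hypothetical stand-ins for the cached files' contents:

```clojure
;; A Clojure set is a function that tests membership, so it can be
;; handed directly to remove, just as my-map does with ignore-word?.
(let [ignore-word? (set ["and" "or" "the"])]
  (remove ignore-word? ["the" "quick" "and" "brown" "fox"]))
;; => ("quick" "brown" "fox")
```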
