Skip to content
Browse files

Merge branch 'adding-files-to-distributed-cache'

Conflicts:
	README.txt
	project.clj
	src/clojure_hadoop/config.clj
  • Loading branch information...
2 parents 1e601ba + a82f6d6 commit 9ba8fe8f896ceb7c95ae5748cb2ef6aef2e4b01d @Roxxi Roxxi committed Mar 12, 2013
View
7 README.txt
@@ -142,6 +142,13 @@ Layer 6: clojure-hadoop.defjob - Specifying JobConf parameters
how to set job configuration (JobConf) parameters either via
the commandline, or as part of the defjob definition within the file.
+ Layer 7: clojure-hadoop.config - Adding files and archives
+to the DistributedCache.
+
+ Example file "wordcount7.clj" demonstrates how to specify files
+ and archives for distribution across nodes via the
+ DistributedCache, as well as how to access the files
+ during the mapper-setup or reducer-setup phases.
NOTES
View
9 project.clj
@@ -7,12 +7,15 @@
:comments "Same license as Clojure"}
:dependencies [[org.clojure/clojure "1.3.0"]
[org.apache.hadoop/hadoop-core "1.0.3"]
+ ;; commons-io was omitted in 1.0.3/1.0.4
+ ;; needs to be explicitly included
+ ;; until this is resolved.
+ ;; see http://goo.gl/Trx7A
+ [commons-io "2.4"]
[log4j/log4j "1.2.16" :exclusions [javax.mail/mail
javax.jms/jms
com.sun.jdmk/jmxtools
- com.sun.jmx/jmxri]]
- ]
- :source-paths ["src" "test"]
+ com.sun.jmx/jmxri]]]
:dev-dependencies [[swank-clojure "1.4.2"]]
:aot [clojure-hadoop.config
clojure-hadoop.defjob
View
23 src/clojure_hadoop/config.clj
@@ -1,7 +1,8 @@
(ns clojure-hadoop.config
(:require [clojure-hadoop.imports :as imp]
[clojure-hadoop.load :as load]
- [clojure.string :as str]))
+ [clojure.string :as str])
+ (:import [java.net URI]))
;; This file defines configuration options for clojure-hadoop.
;;
@@ -23,6 +24,7 @@
(imp/import-fs)
(imp/import-mapreduce)
(imp/import-mapreduce-lib)
+(imp/import-filecache)
(def combine-cleanup
"The name of the property that stores the cleanup function name of
@@ -97,9 +99,12 @@
(defmethod conf :params [^Job job key params]
(set-parameters job (var-get (resolve (read-string params)))))
+;; Normalize an argument to a sequence: sequential collections pass
+;; through unchanged, a single value is wrapped in a one-element list.
+(defn- sequentialify [thing-or-things]
+ (if (sequential? thing-or-things) thing-or-things (list thing-or-things)))
+
;; Modify the job or configuration
(defmethod conf :configure [^Job job key fnames]
- (doseq [fname (if (sequential? fnames) fnames (list fnames))]
+ (doseq [fname (sequentialify fnames)]
(if (fn? fname)
(fname job)
((load/load-name fname) job))))
@@ -323,6 +328,16 @@
(SequenceFileOutputFormat/setOutputCompressionType
job SequenceFile$CompressionType/RECORD))))
+;; Add one or more archives to the job's DistributedCache. `value` is
+;; passed through load/load-or-value, so it may name a var holding the
+;; path(s) or be the path string itself; a single path or a sequence of
+;; paths is accepted. Each path string must be a valid java.net.URI.
+(defmethod conf :add-cache-archive [^Job job key value]
+ (doseq [archive-path-string (sequentialify (load/load-or-value value))]
+ (DistributedCache/addCacheArchive (URI. archive-path-string)
+ (configuration job))))
+
+;; Add one or more files to the job's DistributedCache. Mirrors
+;; :add-cache-archive above: `value` goes through load/load-or-value,
+;; single values or sequences are accepted, and each resulting string
+;; must be a valid java.net.URI.
+(defmethod conf :add-cache-file [^Job job key value]
+ (doseq [file-path-string (sequentialify (load/load-or-value value))]
+ (DistributedCache/addCacheFile (URI. file-path-string)
+ (configuration job))))
+
(defmethod conf :batch [^Job job key value]
(let [val (as-str value)]
@@ -382,4 +397,8 @@ Other available options are:
-X<key> <val> Can specify an arbitrary number of key-value pairs to
add to the Job's Configuration by repeating
-X<key1> <val1> -X<key2> <val2> ... -X<keyN> <valN>
+ -add-cache-archive A URI referencing an archive to add to the DistributedCache
+ Can be specified multiple times.
+ -add-cache-file A URI referencing a file to add to the DistributedCache
+ Can be specified multiple times.
"))
View
9 src/clojure_hadoop/imports.clj
@@ -72,6 +72,15 @@
Reducer StatusReporter TaskAttemptContext TaskAttemptID TaskID
TaskInputOutputContext)))
+(defn import-filecache
+ "Imports DistributedCache, TaskDistributedCacheManager and
+ TrackerDistributedCacheManager from the package
+ org.apache.hadoop.filecache into the current namespace."
+ []
+ (import
+ '(org.apache.hadoop.filecache
+ DistributedCache TaskDistributedCacheManager
+ TrackerDistributedCacheManager)))
+
(defn import-mapreduce-lib-input
"Imports all classes/interfaces/exceptions from the package
org.apache.hadoop.mapreduce.lib.input into the current namespace."
View
8 src/clojure_hadoop/load.clj
@@ -10,3 +10,11 @@
(require (symbol ns-name)))
(assert (find-ns (symbol ns-name)))
(deref (resolve (symbol ns-name fn-name)))))
+
+(defn load-or-value
+ "If `s` names a resolvable var, load it via load-name and return
+ the var's value; otherwise return `s` itself unchanged.
+ NOTE(review): (symbol s) assumes `s` parses as a symbol; URI-style
+ strings (e.g. hdfs://host/path) appear to rely on resolve returning
+ nil for them -- confirm."
+ [^String s]
+ (if (resolve (symbol s))
+ (load-name s)
+ s))
View
7 test-resources/wc7-exclude-these-words-too.txt
@@ -0,0 +1,7 @@
+only
+me
+mail
+library
+the
+THE
+this
View
6 test-resources/wc7-exclude-these-words.txt
@@ -0,0 +1,6 @@
+and
+or
+not
+but
+any
+update
View
201 test/clojure_hadoop/examples/wordcount7.clj
@@ -0,0 +1,201 @@
+;; wordcount7 -- example using the distributed cache to cache
+;; files and archives
+;;
+;; This example wordcount program uses defjob like wordcount5, but it
+;; includes demonstrating configuring and using the DistributedCache
+;; to add a file of words to ignore in the map phase.
+;;
+;; In the defjob section below we'll configure two files that
+;; will have a few words each to create a predicate during the
+;; map setup phase, then use the predicate in the my-map
+;; to filter counting specific words.
+;;
+;; The commandline example below shows how we can specify files
+;; to load in the distributed cache via the commandline as well.
+;;
+;; After compiling (see README.txt), run the example like this
+;; (all on one line):
+;;
+;; java -cp examples.jar clojure_hadoop.job \
+;; -job clojure-hadoop.examples.wordcount7/job \
+;; -input README.txt -output out7 \
+;; -add-cache-file <ABSOLUTE-PATH>/test-resources/wc7-exclude-these-words.txt \
+;; -add-cache-file <ABSOLUTE-PATH>/test-resources/wc7-exclude-these-words-too.txt
+;;
+;; The output is plain text, written to out7/part-00000
+;;
+;; All file specifications for the DistributedCache must be strings
+;; that can function as java.net.URI's as that is what is required
+;; to add a file to the DistributedCache. If the files are in HDFS/S3
+;; the URI can be interpreted as a directory location within HDFS/S3.
+;; for example:
+;;
+;; hadoop jar examples.jar clojure_hadoop.job \
+;; -job clojure-hadoop.examples.wordcount7/job \
+;; -input hdfs://<HDFS-ADDRESS>/hdfs/fs/path/to/README.txt -output out7 \
+;; -add-cache-file hdfs://<HDFS-ADDRESS>/hdfs/fs/path/to/wc7-exclude-these-words.txt \
+;; -add-cache-file hdfs://<HDFS-ADDRESS>/hdfs/fs/path/to/wc7-exclude-these-words-too.txt
+;;
+;; For S3 replace hdfs://<HDFS-ADDRESS> with s3://<S3-ADDRESS>
+;;
+;; If running hadoop local to where HDFS is located, you can omit
+;; hdfs://<HDFS-ADDRESS>
+;;
+;; In the later section below, we have an example
+;; of how to specify files to place in the distributed
+;; cache.
+
+(ns clojure-hadoop.examples.wordcount7
+ (:require [clojure-hadoop.wrap :as wrap]
+ [clojure-hadoop.defjob :as defjob]
+ [clojure-hadoop.imports :as imp])
+ (:import (java.util StringTokenizer)
+ (java.net URI))
+ (:require [clojure.string :as string])
+ (:use clojure.test clojure-hadoop.job)
+ (:use [clojure.string :only [join]]))
+
+(imp/import-io)
+(imp/import-mapreduce)
+(imp/import-filecache)
+
+
+;; We're going to make a custom ignore-word? predicate
+;; for each mapper. Read the following function descriptions
+;; from `my-map-setup` upward.
+(def ignore-word?)
+
+(defn distributed-cache-files
+ "Given the job configuration retrieve the list of file paths
+ we've stored in the distributed cache. These will be different
+ paths on each node, thus each mapper needs to ask for the
+ location during the set up phase."
+ [job-conf]
+ (DistributedCache/getLocalCacheFiles job-conf))
+
+
+(defn file-path->lines
+ "Given a Path, return the lines of the file it names as a fully
+ realized sequence of strings; the file is closed before returning."
+ [path]
+ ;; we have to .toString the path to get the absolute path
+ (let [file-name (str path)]
+ (with-open [rdr (clojure.java.io/reader file-name)]
+ ;; force realization of the sequence so we can close the file
+ (doall (line-seq rdr)))))
+
+(defn make-ignore-word?
+ "Given the job configuration, read out the list of files we've
+ supplied to the distributed cache and build a predicate that
+ returns truthy for any word contained in those files (the words
+ to be ignored) and nil otherwise."
+ [job-conf]
+
+ (let [local-file-paths (distributed-cache-files job-conf)
+ string-set (set (mapcat file-path->lines local-file-paths))]
+ ;; remember, sets in clojure can be used as predicates to test membership
+ (fn [string] (string-set string))))
+
+(defn my-map-setup
+ "When we set up the mapper, bind our ignore-word? var to a
+ predicate built from the words-to-ignore files retrieved from
+ this node's distributed cache."
+ [^JobContext context]
+ ;; NOTE(review): alter-var-root mutates the root binding, which is
+ ;; shared JVM-wide -- presumably safe because each Hadoop task runs
+ ;; in its own JVM; confirm under JVM-reuse configurations.
+ (alter-var-root (var ignore-word?)
+ (fn [_]
+ (make-ignore-word? (.getConfiguration context)))))
+
+(defn my-map
+ "Tokenize `value` and emit a [word 1] pair for each token that is
+ not rejected by the ignore-word? predicate."
+ [key value]
+ ;; `key` (the input offset) is deliberately unused
+ (map (fn [token] [token 1])
+ (remove ignore-word? (enumeration-seq (StringTokenizer. value)))))
+
+(defn my-reduce
+ "Sum the counts for one word. `values-fn` is a thunk yielding the
+ seq of counts; returns a single [word total] pair."
+ [key values-fn]
+ [[key (reduce + (values-fn))]])
+
+;; Write a (String key, numeric value) pair to the task context as
+;; Hadoop Text / LongWritable values.
+(defn string-long-writer [^TaskInputOutputContext context ^String key value]
+ (.write context (Text. key) (LongWritable. value)))
+
+;; Convert a reduce-side (Text key, LongWritable values) pair into
+;; [string, thunk-of-longs] -- the shape my-reduce expects.
+(defn string-long-reduce-reader [^Text key wvalues]
+ [(.toString key)
+ (fn [] (map (fn [^LongWritable v] (.get v)) wvalues))])
+
+(defjob/defjob job
+ :map-setup my-map-setup
+ :map my-map
+ :map-reader wrap/int-string-map-reader
+ :map-writer string-long-writer
+ :reduce my-reduce
+ :reduce-reader string-long-reduce-reader
+ :reduce-writer string-long-writer
+ :output-key Text
+ :output-value LongWritable
+ :input-format :text
+ :output-format :text
+ :compress-output false
+ :input "README.txt"
+ :output "tmp/out7"
+ :replace true)
+
+(deftest test-wordcount-7
+ (is (run job)))
+
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+;;; Specifying files for the distributed cache within your
+;;; Map-Reduce job.
+;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
+
+;; NOTE: get-current-directory does not work
+;; when you're running a job like this on hadoop
+;; to reference files. This is purely for the
+;; testing, and as an example.
+(defn- get-current-directory
+ "Canonical path of the process working directory. Only meaningful
+ for local test runs -- see the NOTE above."
+ []
+ (. (java.io.File. ".") getCanonicalPath))
+
+;; If you want to specify files within your code
+;; that reside in HDFS or S3, you need to specify
+;; the absolute path to the files here,
+;; as a vector. Again, get-current-directory
+;; is purely for regression testing.
+;;
+;; In a "typical" case where we want to specify the path within
+;; the code, this would look something like:
+;;
+;; (def file-for-distributed-cache "/absolute/path/to/my/file")
+;;
+;; -or-
+;;
+;; (def files-for-distributed-cache ["/absolute/path/to/file1"
+;; "/absolute/path/to/file2"
+;; ...])
+(def files-for-distributed-cache
+ (let [cwd (get-current-directory)]
+ [(join "/" [cwd "test-resources/wc7-exclude-these-words.txt"])
+ (join "/" [cwd "test-resources/wc7-exclude-these-words-too.txt"])]))
+
+(defjob/defjob configured-job
+ :map-setup my-map-setup
+ :map my-map
+ :map-reader wrap/int-string-map-reader
+ :map-writer string-long-writer
+ :reduce my-reduce
+ :reduce-reader string-long-reduce-reader
+ :reduce-writer string-long-writer
+ :output-key Text
+ :output-value LongWritable
+ :input-format :text
+ :output-format :text
+ :compress-output false
+ :input "README.txt"
+ :output "tmp/out7"
+ :replace true
+ :add-cache-file files-for-distributed-cache)
+
+;; Tests adding files to the DistributedCache via in-file (defjob)
+;; configuration. clojure.test/deftest does not support docstrings:
+;; the former inline string was evaluated and silently discarded, so
+;; it is expressed as a comment instead.
+(deftest test-wordcount-7-via-config
+ (is (run configured-job)))

0 comments on commit 9ba8fe8

Please sign in to comment.
Something went wrong with that request. Please try again.