Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/mlimotte/lemur
Browse files Browse the repository at this point in the history
  • Loading branch information
mlimotte committed Feb 19, 2013
2 parents 9cf79fb + 47ee6f6 commit 5e2495b
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 38 deletions.
23 changes: 13 additions & 10 deletions project.clj
Expand Up @@ -8,12 +8,13 @@
["-Dlog4j.configuration=file:resources/log4j.properties"] ["-Dlog4j.configuration=file:resources/log4j.properties"]
[]) [])


:source-path "src/main/clj" :source-paths ["src/main/clj"]
:test-path "src/test/clj" :test-paths ["src/test/clj"]


:dependencies [[org.clojure/clojure "1.3.0"] :dependencies [[org.clojure/clojure "1.3.0"]
[org.clojure/tools.logging "0.2.3"] [org.clojure/tools.logging "0.2.3"]
[org.clojure/data.json "0.1.2"] [org.clojure/data.json "0.1.2"]
[bultitude "0.2.0"]


; aws-java-sdk-1.3.3 does not specify the correct httpclient, so we do it explicitly ; aws-java-sdk-1.3.3 does not specify the correct httpclient, so we do it explicitly
[org.apache.httpcomponents/httpclient "4.1.1"] [org.apache.httpcomponents/httpclient "4.1.1"]
Expand All @@ -33,22 +34,24 @@


:dev-dependencies [[robert/hooke "1.1.2"] ;for leiningen test-selectors :dev-dependencies [[robert/hooke "1.1.2"] ;for leiningen test-selectors
[org.clojure/tools.trace "0.7.1"] [org.clojure/tools.trace "0.7.1"]
[midje "1.3.1"]
[lein-midje "1.0.8"]
[com.offbytwo.iclojure/iclojure "1.1.0"] [com.offbytwo.iclojure/iclojure "1.1.0"]
[clojure-source "1.3.0"]] [clojure-source "1.3.0"]

[org.clojure/tools.trace "0.7.3"]]
:test-selectors {:default (fn [v] (not (or (:integration v) (:manual v))))
:integration :integration
:manual :manual
:all (fn [v] (not (:manual v)))}


:profiles {:dev {:plugins [[lein-midje "2.0.4"]]
:dependencies [[midje "1.4.0"]]}}
:repl-init lemur.repl :repl-init lemur.repl
:main ^:skip-aot lemur.repl :main ^:skip-aot lemur.repl
:min-lein-version "2.0.0"


:run-aliases {:lemur lemur.core} :run-aliases {:lemur lemur.core}


; Launch irepl: ; Launch irepl:
;java -cp lib/*:lib/dev/* com.offbytwo.iclojure.Main ;java -cp lib/*:lib/dev/* com.offbytwo.iclojure.Main


:test-selectors {:default (fn [v] (not (or (:integration v) (:manual v))))
:integration :integration
:manual :manual
:all (fn [v] (not (:manual v)))}

:aot [lemur.core]) :aot [lemur.core])
30 changes: 19 additions & 11 deletions src/main/clj/com/climate/services/aws/emr.clj
Expand Up @@ -9,6 +9,7 @@
(:import (:import
java.io.File java.io.File
com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient
com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest
com.amazonaws.services.elasticmapreduce.util.StepFactory com.amazonaws.services.elasticmapreduce.util.StepFactory
com.amazonaws.auth.BasicAWSCredentials com.amazonaws.auth.BasicAWSCredentials
[com.amazonaws.services.elasticmapreduce.model [com.amazonaws.services.elasticmapreduce.model
Expand Down Expand Up @@ -224,29 +225,36 @@
(.setActionOnFailure (str ActionOnFailure/TERMINATE_JOB_FLOW)) (.setActionOnFailure (str ActionOnFailure/TERMINATE_JOB_FLOW))
(.setHadoopJarStep (.newEnableDebuggingStep (StepFactory.))))) (.setHadoopJarStep (.newEnableDebuggingStep (StepFactory.)))))


(defn terminate-flow-id
([jobflow-id]
(terminate-flow-id jobflow-id *emr*))
([jobflow-id emr]
(.terminateJobFlows emr
(TerminateJobFlowsRequest. (java.util.ArrayList. [jobflow-id])))))

(defn step-config [name alive? jar-path main-class cli-args & {:keys [action-on-failure properties]}] (defn step-config [name alive? jar-path main-class cli-args & {:keys [action-on-failure properties]}]
"Create a step to be submitted to EMR. "Create a step to be submitted to EMR.
jar-path is the hadoop job jar, usually an s3:// path. jar-path is the hadoop job jar, usually an s3:// path.
cli-args is a collection of Strings that are passed as args to main-class (can be nil). cli-args is a collection of Strings that are passed as args to main-class (can be nil).
action-on-failure is a String or enum com.amazonaws.services.elasticmapreduce.model.ActionOnFailure. action-on-failure is a String or enum com.amazonaws.services.elasticmapreduce.model.ActionOnFailure.
properties is a map of Java properties that are set when the step runs." properties is a map of Java properties that are set when the step runs."
(let [sc (StepConfig. name (let [sc (StepConfig. name
(doto (doto
(HadoopJarStepConfig.) (HadoopJarStepConfig.)
(.setJar jar-path) (.setJar jar-path)
(.setMainClass main-class) (.setMainClass main-class)
(.setArgs (vec cli-args)) ;collection of strings (.setArgs (vec cli-args)) ;collection of strings
(.setProperties (kv-props properties)) (.setProperties (kv-props properties))))]
(.setActionOnFailure (str (or action-on-failure (.setActionOnFailure sc (str (or action-on-failure
(and alive? ActionOnFailure/CANCEL_AND_WAIT) (and alive? ActionOnFailure/CANCEL_AND_WAIT)
ActionOnFailure/TERMINATE_JOB_FLOW)))))] ActionOnFailure/TERMINATE_JOB_FLOW)))))
sc))


(defn add-steps (defn add-steps
"Add a step to a running jobflow. Steps is a seq of StepConfig objects. "Add a step to a running jobflow. Steps is a seq of StepConfig objects.
Use (step-config) to create StepConfig objects." Use (step-config) to create StepConfig objects."
[jobflow-id steps] [jobflow-id steps]
(.addJobFlowSteps *emr* (AddJobFlowStepsRequest. jobflow-id steps))) (let [steps-array (to-array steps)]
(.addJobFlowSteps *emr* (AddJobFlowStepsRequest. jobflow-id steps))))


(defn start-job-flow [name steps {:keys [log-uri bootstrap-actions ami-version supported-products] (defn start-job-flow [name steps {:keys [log-uri bootstrap-actions ami-version supported-products]
:or {bootstrap-actions [] supported-products []} :or {bootstrap-actions [] supported-products []}
Expand Down
56 changes: 39 additions & 17 deletions src/test/clj/com/climate/services/aws/emr_test.clj
Expand Up @@ -24,9 +24,13 @@
[ec2 :as ec2] [ec2 :as ec2]
[common :as awscommon]]) [common :as awscommon]])
(:import (:import
com.amazonaws.services.elasticmapreduce.util.StepFactory
[com.amazonaws.services.elasticmapreduce.model [com.amazonaws.services.elasticmapreduce.model
JobFlowDetail JobFlowDetail
JobFlowExecutionStatusDetail] JobFlowExecutionStatusDetail
HadoopJarStepConfig
ActionOnFailure
StepConfig]
java.util.Date)) java.util.Date))


;; Some tests are labelled as :manual, rather than :integration, because ;; Some tests are labelled as :manual, rather than :integration, because
Expand All @@ -37,6 +41,23 @@


(def aws-creds (awscommon/aws-credential-discovery)) (def aws-creds (awscommon/aws-credential-discovery))


(def ^:dynamic *flow-args*
{:bootstrap-actions
; Only publicly available script, so we don't have to upload the others.
[(bootstrap "Hadoop Config"
"s3://elasticmapreduce/bootstrap-actions/configure-hadoop"
["-m" "mapred.map.tasks.speculative.execution=false"])]
:log-uri (str "s3://" bucket)
:keypair (:keypair aws-creds) ; the elastic-mapreduce credentials.json file as a keypair entry
:ami-version "latest"
:num-instances 2
:master-type "m1.xlarge"
:slave-type "m1.xlarge"
:spot-task-type "m1.xlarge"
:spot-task-bid "1.00"
:spot-task-num 1
:keep-alive false})

(use-fixtures :once (use-fixtures :once
(fn [f] (fn [f]
(binding [s3/*s3* (s3/s3 aws-creds) (binding [s3/*s3* (s3/s3 aws-creds)
Expand Down Expand Up @@ -70,22 +91,7 @@
["-input" (format "s3://%s/data/simple.txt" bucket) ["-input" (format "s3://%s/data/simple.txt" bucket)
"-output" "/out" "-output" "/out"
"-mapper" (format "s3://%s/scripts/wc.sh" bucket)])] "-mapper" (format "s3://%s/scripts/wc.sh" bucket)])]
{:bootstrap-actions *flow-args*))))
; Only publicly available script, so we don't have to upload the others.
[(bootstrap "Hadoop Config"
"s3://elasticmapreduce/bootstrap-actions/configure-hadoop"
["-m" "mapred.map.tasks.speculative.execution=false"])]
:log-uri (str "s3://" bucket)
:keypair (:keypair aws-creds) ; the elastic-mapreduce credentials.json file as a keypair entry
:ami-version "latest"
:num-instances 2
:master-type "m1.xlarge"
:slave-type "m1.xlarge"
:spot-task-type "m1.xlarge"
:spot-task-bid "1.00"
:spot-task-num 1
;too dangerous to use keep-alive in tests, so tests are limited-- no (emr/add-step) for example
:keep-alive false}))))


; Specified as a fn rather than a test. This is a hack to force it to run before ; Specified as a fn rather than a test. This is a hack to force it to run before
; test-wait-on-step. It will fail if the cluster has already COMPLETED. ; test-wait-on-step. It will fail if the cluster has already COMPLETED.
Expand Down Expand Up @@ -159,3 +165,19 @@
(is= "m1.xlarge" spot-task-type) (is= "m1.xlarge" spot-task-type)
(is= 20 spot-task-num) (is= 20 spot-task-num)
(is= (format "%.3f" expected-bid) spot-task-bid))) (is= (format "%.3f" expected-bid) spot-task-bid)))

(defn make-dummy-step []
(doto (StepConfig.)
(.setName "Dummy")
(.setActionOnFailure (str ActionOnFailure/TERMINATE_JOB_FLOW))
(.setHadoopJarStep (.newEnableDebuggingStep (StepFactory.)))))

(deftest ^{:manual true} test-add-steps-to-existing-flow
(testing "emr/add-step"
(binding [*flow-args* (conj *flow-args* {:keep-alive true})]
(let [jf-id (setup)
dummy-steps (make-dummy-step)]
(is= 0 (.size (steps-for-jobflow jf-id)))
(add-steps jf-id [dummy-steps])
(is= 1 (.size (steps-for-jobflow jf-id)))
(terminate-flow-id jf-id)))))
1 change: 1 addition & 0 deletions src/test/clj/lemur/core_test.clj
Expand Up @@ -17,6 +17,7 @@
lemur.core lemur.core
[lemur.command-line :only [quit]] [lemur.command-line :only [quit]]
[lemur.evaluating-map :only [evaluating-map]] [lemur.evaluating-map :only [evaluating-map]]
[lemur.common :only [eoval]]
midje.sweet midje.sweet
clojure.test clojure.test
lemur.test) lemur.test)
Expand Down

0 comments on commit 5e2495b

Please sign in to comment.