Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #17 from mlimotte/master

Submit job to running cluster, migrate to lein2
  • Loading branch information...
commit 026999151c9e1728472d4f40ec29752d6033d42c 2 parents a76c7a0 + a8ce265
Marc Limotte mlimotte authored
6 CHANGES.txt
View
@@ -1,5 +1,11 @@
CHANGES
+1.3.0
+
+- Allow explicit action-on-failure setting for defstep
+- New submit command to add a step to a running jobflow
+- Move to lein2
+
1.2.1
- set the VisibleToAllUsers flag on all jobflows
2  bin/lemur
View
@@ -41,7 +41,7 @@ fi
if [ -f $BASE/lemur-*.jar ]; then
LEMUR_CP=`echo $BASE/lemur-*.jar`
else
- LEMUR_CP=$BASE/src/main/clj:$BASE/classes:$BASE/resources
+ LEMUR_CP=$BASE/src/main/clj:$BASE/target/classes:$BASE/resources
fi
DEPS_CP=$BASE/lib/*
17 build.xml
View
@@ -3,7 +3,7 @@
<macrodef name="lein">
<element name="lein-params" implicit="true"/>
<sequential>
- <exec executable="lein" failonerror="yes"
+ <exec executable="lein2" failonerror="yes"
dir="${basedir}">
<env key="JVM_OPTS" value="-Droot.logger=error,file,stdout"/>
<lein-params/>
@@ -11,15 +11,19 @@
</sequential>
</macrodef>
+ <target name="libdir">
+ <lein><arg value="libdir"/></lein>
+ </target>
+
<target name="jar">
<lein><arg value="jar"/></lein>
</target>
- <target name="package" depends="jar">
+ <target name="package" depends="libdir,jar">
<exec logError="true" executable="bash" outputproperty="jar-name">
<arg value="-c"/>
- <arg value="ls lemur-*.jar"/>
+ <arg value="ls target/lemur-*.jar"/>
</exec>
<fail message="Could not find lemur jar">
@@ -29,7 +33,7 @@
<exec logError="true" executable="sed" failonerror="true" inputstring="${jar-name}" outputproperty="jar-version">
<arg value="-e"/>
- <arg value="s/lemur-\(.*\).jar/\1/"/>
+ <arg value="s/.*lemur-\(.*\).jar/\1/"/>
</exec>
<echo message="jar ${jar-version}"/>
@@ -39,11 +43,14 @@
prefix="/lemur-${jar-version}">
<include name="README.md"/>
<include name="project.clj"/>
- <include name="lemur-*.jar"/>
<include name="lib/*.jar"/>
<include name="src/"/>
<include name="examples/"/>
</tarfileset>
+ <tarfileset dir="${basedir}/target"
+ prefix="/lemur-${jar-version}">
+ <include name="lemur-*.jar"/>
+ </tarfileset>
<tarfileset dir="${basedir}"
filemode="755"
prefix="/lemur-${jar-version}">
33 project.clj
View
@@ -1,4 +1,4 @@
-(defproject lemur "1.2.1"
+(defproject lemur "1.3.0"
:description "Lemur is a tool to launch hadoop jobs locally or on EMR
based on a configuration file, referred to as a jobdef."
@@ -8,12 +8,13 @@
["-Dlog4j.configuration=file:resources/log4j.properties"]
[])
- :source-path "src/main/clj"
- :test-path "src/test/clj"
+ :source-paths ["src/main/clj"]
+ :test-paths ["src/test/clj"]
:dependencies [[org.clojure/clojure "1.3.0"]
[org.clojure/tools.logging "0.2.3"]
[org.clojure/data.json "0.1.2"]
+ [bultitude "0.2.0"]
; aws-java-sdk-1.3.3 does not specify the correct httpclient, so we do it explicitly
[org.apache.httpcomponents/httpclient "4.1.1"]
@@ -31,24 +32,24 @@
; Other
[log4j/log4j "1.2.16"]]
- :dev-dependencies [[robert/hooke "1.1.2"] ;for leiningen test-selectors
- [org.clojure/tools.trace "0.7.1"]
- [midje "1.3.1"]
- [lein-midje "1.0.8"]
- [com.offbytwo.iclojure/iclojure "1.1.0"]
- [clojure-source "1.3.0"]]
+ :plugins [[lein-libdir "0.1.0"]]
+ :libdir-path "lib"
- :test-selectors {:default (fn [v] (not (or (:integration v) (:manual v))))
- :integration :integration
- :manual :manual
- :all (fn [v] (not (:manual v)))}
+ :profiles {:dev {:plugins [[lein-midje "2.0.4"]]
+ :dependencies [[midje "1.4.0"]
+ [org.clojure/tools.trace "0.7.3"]
+ [clojure-source "1.3.0"]]}}
:repl-init lemur.repl
:main ^:skip-aot lemur.repl
+ :min-lein-version "2.0.0"
:run-aliases {:lemur lemur.core}
- ; Launch irepl:
- ;java -cp lib/*:lib/dev/* com.offbytwo.iclojure.Main
+ :test-selectors {:default (fn [v] (not (or (:integration v) (:manual v))))
+ :integration :integration
+ :manual :manual
+ :all (fn [v] (not (:manual v)))}
- :aot [lemur.core])
+ :aot [lemur.core]
+ )
1  src/main/clj/com/climate/services/aws/common.clj
View
@@ -5,7 +5,6 @@
[com.climate.shell :as sh]
[clojure.data.json :as json]
[clojure.tools.logging :as log]
- [clojure.tools.trace :as trace]
[clojure.java.io :as io])
(:import
java.io.IOException
34 src/main/clj/com/climate/services/aws/emr.clj
View
@@ -9,6 +9,7 @@
(:import
java.io.File
com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient
+ com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest
com.amazonaws.services.elasticmapreduce.util.StepFactory
com.amazonaws.auth.BasicAWSCredentials
[com.amazonaws.services.elasticmapreduce.model
@@ -224,28 +225,37 @@
(.setActionOnFailure (str ActionOnFailure/TERMINATE_JOB_FLOW))
(.setHadoopJarStep (.newEnableDebuggingStep (StepFactory.)))))
-(defn step-config [name alive? jar-path main-class cli-args & [properties]]
+(defn terminate-flow-id
+  "Ask EMR to terminate the jobflow with the given id.
+  With one argument the currently bound *emr* client is used; with two, the
+  supplied emr client is used instead.
+  Returns whatever the client's terminateJobFlows call returns."
+  ([jobflow-id]
+    (terminate-flow-id jobflow-id *emr*))
+  ([jobflow-id emr]
+    (let [ids (java.util.ArrayList. [jobflow-id])
+          request (TerminateJobFlowsRequest. ids)]
+      (.terminateJobFlows emr request))))
+
+(defn step-config
"Create a step to be submitted to EMR.
jar-path is the hadoop job jar, usually an s3:// path.
cli-args is a collection of Strings that are passed as args to main-class (can be nil).
+ The remaining options are keyword args, e.g. :action-on-failure x :properties y.
+ action-on-failure is a String or enum com.amazonaws.services.elasticmapreduce.model.ActionOnFailure.
+ When it is not given, the step uses CANCEL_AND_WAIT if alive? is truthy and
+ TERMINATE_JOB_FLOW otherwise.
properties is a map of Java properties that are set when the step runs.
+ Returns the configured StepConfig."
+ [name alive? jar-path main-class cli-args & {:keys [action-on-failure properties]}]
(let [sc (StepConfig. name
- (doto
- (HadoopJarStepConfig.)
- (.setJar jar-path)
- (.setMainClass main-class)
- (.setArgs (vec cli-args)) ;collection of strings
- (.setProperties (kv-props properties))))]
- (if alive?
- (doto sc
- (.setActionOnFailure (str ActionOnFailure/CANCEL_AND_WAIT)))
- sc)))
+ (doto
+ (HadoopJarStepConfig.)
+ (.setJar jar-path)
+ (.setMainClass main-class)
+ (.setArgs (vec cli-args)) ;collection of strings
+ (.setProperties (kv-props properties))))]
+ ; An explicit action-on-failure wins; otherwise fall back on alive? as described above.
+ (.setActionOnFailure sc (str (or action-on-failure
+ (and alive? ActionOnFailure/CANCEL_AND_WAIT)
+ ActionOnFailure/TERMINATE_JOB_FLOW)))
+ sc))
(defn add-steps
"Add a step to a running jobflow. Steps is a seq of StepConfig objects.
Use (step-config) to create StepConfig objects."
[jobflow-id steps]
- (AddJobFlowStepsRequest. jobflow-id steps))
+ ; Submit the request through the currently bound *emr* client (the old body
+ ; only built the request object). steps is passed to the request as-is; the
+ ; array copy that used to be bound here was never used, so it is gone.
+ (.addJobFlowSteps *emr* (AddJobFlowStepsRequest. jobflow-id steps)))
(defn start-job-flow [name steps {:keys [log-uri bootstrap-actions ami-version supported-products visible-to-all-users]
:or {bootstrap-actions [] supported-products [] visible-to-all-users false}
1  src/main/clj/lemur/command_line.clj
View
@@ -243,6 +243,7 @@
[:master-instance-type "Instance type for master" "m1.large"]
[:slave-instance-type "Instance type for slaves" "m1.large"]
[:availability-zone "Amazon availabilty zone" nil]
+ [:jobflow "A jobflow-id to use with the submit command" nil]
[:num-instances "Number of instances (including the master)" "1"]
[:ami-version
"Which AMI to use (see RunJobFlowRequest#setAmiVersion in the AWS Java SDK)"]
2  src/main/clj/lemur/common.clj
View
@@ -33,6 +33,8 @@
java.io.IOException
java.io.File))
+(util/defalias eoval lemur.evaluating-map/eoval)
+
;;; Validators
;TODO make a more extensible model for adding new checks
41 src/main/clj/lemur/core.clj
View
@@ -101,14 +101,13 @@ calls launch - take action (upload files, start cluster, etc)
`(defn ~sym [] (= (context-get :command) ~name))))
(defcommand local?)
+(defcommand submit?)
(defcommand run?)
(defcommand dry-run?)
(defcommand start?)
(defcommand help?)
(defcommand formatted-help?)
-(util/defalias eoval lemur.evaluating-map/eoval)
-
(defn profile?
"Test if the profile x is in use."
[x]
@@ -365,7 +364,9 @@ calls launch - take action (upload files, start cluster, etc)
(:keep-alive? estep)
path-to-jar
(:main-class estep)
- args))))
+ args
+ :action-on-failure (:action-on-failure estep)
+ :properties (:properties estep)))))
coll))
(defn- save-metajob
@@ -466,6 +467,22 @@ calls launch - take action (upload files, start cluster, etc)
(context-set :jobflow-id jobflow-id)
(context-set :request-ts request-ts)))
+(defmethod launch :submit ; launch for dispatch value :submit: adds steps to the jobflow given by the :jobflow option
+ [command eopts cluster steps jobflow-options uploads metajob]
+ ; Upload the files listed under :jobdef-step (to :dest-working-dir) and under :jars in uploads
+ (util/upload (:show-progress? eopts) (mapcat val (:jobdef-step uploads)) (:dest-working-dir uploads))
+ (util/upload (:show-progress? eopts) (:jars uploads))
+ ; Write details to metajob file
+ (save-metajob cluster steps metajob)
+ ; Add the steps, then record the jobflow id and the request timestamp in the context
+ (let [request-ts (System/currentTimeMillis)
+ steps (mk-steps cluster (filter (complement enable-debugging-step?) steps)) ; enable-debugging steps are dropped before the step configs are built
+ jobflow-id (:jobflow eopts)] ; NOTE(review): nothing here checks that :jobflow was supplied -- confirm a nil id is rejected upstream
+ (emr/add-steps jobflow-id steps)
+ (println "JobFlow id:" jobflow-id)
+ (context-set :jobflow-id jobflow-id)
+ (context-set :request-ts request-ts)))
+
(defmethod launch :dry-run
[command eopts cluster steps jobflow-options uploads metajob]
(println "dry-run, not launching."))
@@ -557,7 +574,7 @@ calls launch - take action (upload files, start cluster, etc)
:spot-task-type spot-task-type
:spot-task-bid spot-task-bid
:spot-task-num spot-task-num
- :keep-alive (or (:keep-alive? evaluating-opts) (start?))
+ :keep-alive (or (:keep-alive? evaluating-opts) (start?) (submit?))
:keypair (:keypair evaluating-opts)
:ami-version (:ami-version evaluating-opts)
:hadoop-version (:hadoop-version evaluating-opts)
@@ -579,12 +596,13 @@ calls launch - take action (upload files, start cluster, etc)
(:runtime-jar evaluating-opts)
[])
:bootstrap-actions
- (if-let [src (:scripts-src-path evaluating-opts)]
- [[(s3/slash$ src)
- (s3/s3path (:bucket evaluating-opts) (:std-scripts-prefix evaluating-opts))]]
- [])
+ (let [src (:scripts-src-path evaluating-opts)]
+ (if (and src (not (submit?)))
+ [[(s3/slash$ src)
+ (s3/s3path (:bucket evaluating-opts) (:std-scripts-prefix evaluating-opts))]]
+ []))
:jobdef-cluster
- (parse-upload-property evaluating-opts)
+ (if (submit?) [] (parse-upload-property evaluating-opts))
:jobdef-step
(->> steps
(filter map?)
@@ -614,7 +632,7 @@ calls launch - take action (upload files, start cluster, etc)
(concat [:command :app :comment :username :run-id :jar-src-path :runtime-jar :base-uri]
display-in-metajob))
(doto (DumperOptions.) (.setDefaultFlowStyle DumperOptions$FlowStyle/BLOCK)))
- (if-not (local?)
+ (if-not (or (local?) (submit?))
(vector
(util/mk-yaml "Jobflow Options"
(merge (select-keys evaluating-opts [:emr-name]) (dissoc jobflow-options :bootstrap-actions))
@@ -925,7 +943,7 @@ calls launch - take action (upload files, start cluster, etc)
emr/*emr* (emr/emr aws-creds)
ec2/*ec2* (ec2/ec2 aws-creds)]
(case command
- ("run" "start" "dry-run" "local")
+ ("run" "start" "dry-run" "local" "submit")
(execute-jobdef jobdef-path)
"display-types"
(display-types)
@@ -941,4 +959,3 @@ calls launch - take action (upload files, start cluster, etc)
(quit :msg (slurp (io/resource "help.txt"))))
(quit :msg "Unrecognized lemur command" :cmdspec (context-get :command-spec) :exit-code 1))))
(quit))
-
56 src/test/clj/com/climate/services/aws/emr_test.clj
View
@@ -24,9 +24,13 @@
[ec2 :as ec2]
[common :as awscommon]])
(:import
+ com.amazonaws.services.elasticmapreduce.util.StepFactory
[com.amazonaws.services.elasticmapreduce.model
JobFlowDetail
- JobFlowExecutionStatusDetail]
+ JobFlowExecutionStatusDetail
+ HadoopJarStepConfig
+ ActionOnFailure
+ StepConfig]
java.util.Date))
;; Some tests are labelled as :manual, rather than :integration, because
@@ -37,6 +41,23 @@
(def aws-creds (awscommon/aws-credential-discovery))
+(def ^:dynamic *flow-args* ; jobflow options shared by these tests; dynamic so a test can rebind it (e.g. to set :keep-alive true)
+ {:bootstrap-actions
+ ; Only publicly available script, so we don't have to upload the others.
+ [(bootstrap "Hadoop Config"
+ "s3://elasticmapreduce/bootstrap-actions/configure-hadoop"
+ ["-m" "mapred.map.tasks.speculative.execution=false"])]
+ :log-uri (str "s3://" bucket)
+ :keypair (:keypair aws-creds) ; the elastic-mapreduce credentials.json file as a keypair entry
+ :ami-version "latest"
+ :num-instances 2
+ :master-type "m1.xlarge"
+ :slave-type "m1.xlarge"
+ :spot-task-type "m1.xlarge"
+ :spot-task-bid "1.00"
+ :spot-task-num 1
+ :keep-alive false})
+
(use-fixtures :once
(fn [f]
(binding [s3/*s3* (s3/s3 aws-creds)
@@ -70,22 +91,7 @@
["-input" (format "s3://%s/data/simple.txt" bucket)
"-output" "/out"
"-mapper" (format "s3://%s/scripts/wc.sh" bucket)])]
- {:bootstrap-actions
- ; Only publicly available script, so we don't have to upload the others.
- [(bootstrap "Hadoop Config"
- "s3://elasticmapreduce/bootstrap-actions/configure-hadoop"
- ["-m" "mapred.map.tasks.speculative.execution=false"])]
- :log-uri (str "s3://" bucket)
- :keypair (:keypair aws-creds) ; the elastic-mapreduce credentials.json file as a keypair entry
- :ami-version "latest"
- :num-instances 2
- :master-type "m1.xlarge"
- :slave-type "m1.xlarge"
- :spot-task-type "m1.xlarge"
- :spot-task-bid "1.00"
- :spot-task-num 1
- ;too dangerous to use keep-alive in tests, so tests are limited-- no (emr/add-step) for example
- :keep-alive false}))))
+ *flow-args*))))
; Specified as a fn rather than a test. This is a hack to force it to run before
; test-wait-on-step. It will fail if the cluster has already COMPLETED.
@@ -159,3 +165,19 @@
(is= "m1.xlarge" spot-task-type)
(is= 20 spot-task-num)
(is= (format "%.3f" expected-bid) spot-task-bid)))
+
+(defn make-dummy-step
+  "Build a StepConfig named Dummy whose hadoop jar step is the StepFactory
+  enable-debugging step and whose action on failure is TERMINATE_JOB_FLOW."
+  []
+  (let [debugging-jar-step (.newEnableDebuggingStep (StepFactory.))
+        on-failure (str ActionOnFailure/TERMINATE_JOB_FLOW)]
+    (doto (StepConfig.)
+      (.setName "Dummy")
+      (.setActionOnFailure on-failure)
+      (.setHadoopJarStep debugging-jar-step))))
+
+(deftest ^{:manual true} test-add-steps-to-existing-flow
+  (testing "emr/add-step"
+    ; Rebind the shared flow options with :keep-alive turned on for this test only.
+    (binding [*flow-args* (conj *flow-args* {:keep-alive true})]
+      (let [jf-id (setup)
+            dummy-step (make-dummy-step)]
+        ; The jobflow was started with :keep-alive true, so this test is responsible
+        ; for shutting it down. Terminate in finally so that an exception from
+        ; add-steps or steps-for-jobflow cannot leave the cluster running.
+        (try
+          (is= 0 (.size (steps-for-jobflow jf-id)))
+          (add-steps jf-id [dummy-step])
+          (is= 1 (.size (steps-for-jobflow jf-id)))
+          (finally
+            (terminate-flow-id jf-id)))))))
1  src/test/clj/lemur/core_test.clj
View
@@ -17,6 +17,7 @@
lemur.core
[lemur.command-line :only [quit]]
[lemur.evaluating-map :only [evaluating-map]]
+ [lemur.common :only [eoval]]
midje.sweet
clojure.test
lemur.test)
Please sign in to comment.
Something went wrong with that request. Please try again.