Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Submit job to running cluster, migrate to lein2 #17

Merged
merged 12 commits into from
Feb 19, 2013
6 changes: 6 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
CHANGES

1.3.0

- Allow explicit action-on-failure setting for defstep
- New submit command to add a step to a running jobflow
- Move to lein2

1.2.1

- set the VisibleToAllUsers flag on all jobflows
Expand Down
2 changes: 1 addition & 1 deletion bin/lemur
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fi
if [ -f $BASE/lemur-*.jar ]; then
LEMUR_CP=`echo $BASE/lemur-*.jar`
else
LEMUR_CP=$BASE/src/main/clj:$BASE/classes:$BASE/resources
LEMUR_CP=$BASE/src/main/clj:$BASE/target/classes:$BASE/resources
fi

DEPS_CP=$BASE/lib/*
Expand Down
17 changes: 12 additions & 5 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,27 @@
<macrodef name="lein">
<element name="lein-params" implicit="true"/>
<sequential>
<exec executable="lein" failonerror="yes"
<exec executable="lein2" failonerror="yes"
dir="${basedir}">
<env key="JVM_OPTS" value="-Droot.logger=error,file,stdout"/>
<lein-params/>
</exec>
</sequential>
</macrodef>

<target name="libdir">
<lein><arg value="libdir"/></lein>
</target>

<target name="jar">
<lein><arg value="jar"/></lein>
</target>

<target name="package" depends="jar">
<target name="package" depends="libdir,jar">

<exec logError="true" executable="bash" outputproperty="jar-name">
<arg value="-c"/>
<arg value="ls lemur-*.jar"/>
<arg value="ls target/lemur-*.jar"/>
</exec>

<fail message="Could not find lemur jar">
Expand All @@ -29,7 +33,7 @@

<exec logError="true" executable="sed" failonerror="true" inputstring="${jar-name}" outputproperty="jar-version">
<arg value="-e"/>
<arg value="s/lemur-\(.*\).jar/\1/"/>
<arg value="s/.*lemur-\(.*\).jar/\1/"/>
</exec>
<echo message="jar ${jar-version}"/>

Expand All @@ -39,11 +43,14 @@
prefix="/lemur-${jar-version}">
<include name="README.md"/>
<include name="project.clj"/>
<include name="lemur-*.jar"/>
<include name="lib/*.jar"/>
<include name="src/"/>
<include name="examples/"/>
</tarfileset>
<tarfileset dir="${basedir}/target"
prefix="/lemur-${jar-version}">
<include name="lemur-*.jar"/>
</tarfileset>
<tarfileset dir="${basedir}"
filemode="755"
prefix="/lemur-${jar-version}">
Expand Down
33 changes: 17 additions & 16 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject lemur "1.2.1"
(defproject lemur "1.3.0"

:description "Lemur is a tool to launch hadoop jobs locally or on EMR
based on a configuration file, referred to as a jobdef."
Expand All @@ -8,12 +8,13 @@
["-Dlog4j.configuration=file:resources/log4j.properties"]
[])

:source-path "src/main/clj"
:test-path "src/test/clj"
:source-paths ["src/main/clj"]
:test-paths ["src/test/clj"]

:dependencies [[org.clojure/clojure "1.3.0"]
[org.clojure/tools.logging "0.2.3"]
[org.clojure/data.json "0.1.2"]
[bultitude "0.2.0"]

; aws-java-sdk-1.3.3 does not specify the correct httpclient, so we do it explicitly
[org.apache.httpcomponents/httpclient "4.1.1"]
Expand All @@ -31,24 +32,24 @@
; Other
[log4j/log4j "1.2.16"]]

:dev-dependencies [[robert/hooke "1.1.2"] ;for leiningen test-selectors
[org.clojure/tools.trace "0.7.1"]
[midje "1.3.1"]
[lein-midje "1.0.8"]
[com.offbytwo.iclojure/iclojure "1.1.0"]
[clojure-source "1.3.0"]]
:plugins [[lein-libdir "0.1.0"]]
:libdir-path "lib"

:test-selectors {:default (fn [v] (not (or (:integration v) (:manual v))))
:integration :integration
:manual :manual
:all (fn [v] (not (:manual v)))}
:profiles {:dev {:plugins [[lein-midje "2.0.4"]]
:dependencies [[midje "1.4.0"]
[org.clojure/tools.trace "0.7.3"]
[clojure-source "1.3.0"]]}}

:repl-init lemur.repl
:main ^:skip-aot lemur.repl
:min-lein-version "2.0.0"

:run-aliases {:lemur lemur.core}

; Launch irepl:
;java -cp lib/*:lib/dev/* com.offbytwo.iclojure.Main
:test-selectors {:default (fn [v] (not (or (:integration v) (:manual v))))
:integration :integration
:manual :manual
:all (fn [v] (not (:manual v)))}

:aot [lemur.core])
:aot [lemur.core]
)
1 change: 0 additions & 1 deletion src/main/clj/com/climate/services/aws/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
[com.climate.shell :as sh]
[clojure.data.json :as json]
[clojure.tools.logging :as log]
[clojure.tools.trace :as trace]
[clojure.java.io :as io])
(:import
java.io.IOException
Expand Down
34 changes: 22 additions & 12 deletions src/main/clj/com/climate/services/aws/emr.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
(:import
java.io.File
com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient
com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest
com.amazonaws.services.elasticmapreduce.util.StepFactory
com.amazonaws.auth.BasicAWSCredentials
[com.amazonaws.services.elasticmapreduce.model
Expand Down Expand Up @@ -224,28 +225,37 @@
(.setActionOnFailure (str ActionOnFailure/TERMINATE_JOB_FLOW))
(.setHadoopJarStep (.newEnableDebuggingStep (StepFactory.)))))

(defn step-config [name alive? jar-path main-class cli-args & [properties]]
(defn terminate-flow-id
([jobflow-id]
(terminate-flow-id jobflow-id *emr*))
([jobflow-id emr]
(.terminateJobFlows emr
(TerminateJobFlowsRequest. (java.util.ArrayList. [jobflow-id])))))

(defn step-config [name alive? jar-path main-class cli-args & {:keys [action-on-failure properties]}]
"Create a step to be submitted to EMR.
jar-path is the hadoop job jar, usually an s3:// path.
cli-args is a collection of Strings that are passed as args to main-class (can be nil).
action-on-failure is a String or enum com.amazonaws.services.elasticmapreduce.model.ActionOnFailure.
properties is a map of Java properties that are set when the step runs."
(let [sc (StepConfig. name
(doto
(HadoopJarStepConfig.)
(.setJar jar-path)
(.setMainClass main-class)
(.setArgs (vec cli-args)) ;collection of strings
(.setProperties (kv-props properties))))]
(if alive?
(doto sc
(.setActionOnFailure (str ActionOnFailure/CANCEL_AND_WAIT)))
sc)))
(doto
(HadoopJarStepConfig.)
(.setJar jar-path)
(.setMainClass main-class)
(.setArgs (vec cli-args)) ;collection of strings
(.setProperties (kv-props properties))))]
(.setActionOnFailure sc (str (or action-on-failure
(and alive? ActionOnFailure/CANCEL_AND_WAIT)
ActionOnFailure/TERMINATE_JOB_FLOW)))
sc))

(defn add-steps
"Add a step to a running jobflow. Steps is a seq of StepConfig objects.
Use (step-config) to create StepConfig objects."
[jobflow-id steps]
(AddJobFlowStepsRequest. jobflow-id steps))
(let [steps-array (to-array steps)]
(.addJobFlowSteps *emr* (AddJobFlowStepsRequest. jobflow-id steps))))

(defn start-job-flow [name steps {:keys [log-uri bootstrap-actions ami-version supported-products visible-to-all-users]
:or {bootstrap-actions [] supported-products [] visible-to-all-users false}
Expand Down
1 change: 1 addition & 0 deletions src/main/clj/lemur/command_line.clj
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@
[:master-instance-type "Instance type for master" "m1.large"]
[:slave-instance-type "Instance type for slaves" "m1.large"]
[:availability-zone "Amazon availabilty zone" nil]
[:jobflow "A jobflow-id to use with the submit command" nil]
[:num-instances "Number of instances (including the master)" "1"]
[:ami-version
"Which AMI to use (see RunJobFlowRequest#setAmiVersion in the AWS Java SDK)"]
Expand Down
2 changes: 2 additions & 0 deletions src/main/clj/lemur/common.clj
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
java.io.IOException
java.io.File))

(util/defalias eoval lemur.evaluating-map/eoval)

;;; Validators

;TODO make a more extensible model for adding new checks
Expand Down
41 changes: 29 additions & 12 deletions src/main/clj/lemur/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,13 @@ calls launch - take action (upload files, start cluster, etc)
`(defn ~sym [] (= (context-get :command) ~name))))

(defcommand local?)
(defcommand submit?)
(defcommand run?)
(defcommand dry-run?)
(defcommand start?)
(defcommand help?)
(defcommand formatted-help?)

(util/defalias eoval lemur.evaluating-map/eoval)

(defn profile?
"Test if the profile x is in use."
[x]
Expand Down Expand Up @@ -365,7 +364,9 @@ calls launch - take action (upload files, start cluster, etc)
(:keep-alive? estep)
path-to-jar
(:main-class estep)
args))))
args
:action-on-failure (:action-on-failure estep)
:properties (:properties estep)))))
coll))

(defn- save-metajob
Expand Down Expand Up @@ -466,6 +467,22 @@ calls launch - take action (upload files, start cluster, etc)
(context-set :jobflow-id jobflow-id)
(context-set :request-ts request-ts)))

(defmethod launch :submit
[command eopts cluster steps jobflow-options uploads metajob]
; upload files
(util/upload (:show-progress? eopts) (mapcat val (:jobdef-step uploads)) (:dest-working-dir uploads))
(util/upload (:show-progress? eopts) (:jars uploads))
; Write details to metajob file
(save-metajob cluster steps metajob)
; launch
(let [request-ts (System/currentTimeMillis)
steps (mk-steps cluster (filter (complement enable-debugging-step?) steps))
jobflow-id (:jobflow eopts)]
(emr/add-steps jobflow-id steps)
(println "JobFlow id:" jobflow-id)
(context-set :jobflow-id jobflow-id)
(context-set :request-ts request-ts)))

(defmethod launch :dry-run
[command eopts cluster steps jobflow-options uploads metajob]
(println "dry-run, not launching."))
Expand Down Expand Up @@ -557,7 +574,7 @@ calls launch - take action (upload files, start cluster, etc)
:spot-task-type spot-task-type
:spot-task-bid spot-task-bid
:spot-task-num spot-task-num
:keep-alive (or (:keep-alive? evaluating-opts) (start?))
:keep-alive (or (:keep-alive? evaluating-opts) (start?) (submit?))
:keypair (:keypair evaluating-opts)
:ami-version (:ami-version evaluating-opts)
:hadoop-version (:hadoop-version evaluating-opts)
Expand All @@ -579,12 +596,13 @@ calls launch - take action (upload files, start cluster, etc)
(:runtime-jar evaluating-opts)
[])
:bootstrap-actions
(if-let [src (:scripts-src-path evaluating-opts)]
[[(s3/slash$ src)
(s3/s3path (:bucket evaluating-opts) (:std-scripts-prefix evaluating-opts))]]
[])
(let [src (:scripts-src-path evaluating-opts)]
(if (and src (not (submit?)))
[[(s3/slash$ src)
(s3/s3path (:bucket evaluating-opts) (:std-scripts-prefix evaluating-opts))]]
[]))
:jobdef-cluster
(parse-upload-property evaluating-opts)
(if (submit?) [] (parse-upload-property evaluating-opts))
:jobdef-step
(->> steps
(filter map?)
Expand Down Expand Up @@ -614,7 +632,7 @@ calls launch - take action (upload files, start cluster, etc)
(concat [:command :app :comment :username :run-id :jar-src-path :runtime-jar :base-uri]
display-in-metajob))
(doto (DumperOptions.) (.setDefaultFlowStyle DumperOptions$FlowStyle/BLOCK)))
(if-not (local?)
(if-not (or (local?) (submit?))
(vector
(util/mk-yaml "Jobflow Options"
(merge (select-keys evaluating-opts [:emr-name]) (dissoc jobflow-options :bootstrap-actions))
Expand Down Expand Up @@ -925,7 +943,7 @@ calls launch - take action (upload files, start cluster, etc)
emr/*emr* (emr/emr aws-creds)
ec2/*ec2* (ec2/ec2 aws-creds)]
(case command
("run" "start" "dry-run" "local")
("run" "start" "dry-run" "local" "submit")
(execute-jobdef jobdef-path)
"display-types"
(display-types)
Expand All @@ -941,4 +959,3 @@ calls launch - take action (upload files, start cluster, etc)
(quit :msg (slurp (io/resource "help.txt"))))
(quit :msg "Unrecognized lemur command" :cmdspec (context-get :command-spec) :exit-code 1))))
(quit))

Loading