Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP
Browse files

Merge branch 'master' of https://github.com/mlimotte/lemur

  • Loading branch information...
commit 9cf79fb6a9cad92bfffe139dc0bbbb3106e5c877 2 parents 5172f98 + d8ebad6
@mlimotte mlimotte authored
View
6 CHANGES.txt
@@ -1,5 +1,11 @@
CHANGES
+1.3.0
+
+- Allow explicit action-on-failure setting for defstep
+
+- New submit command to add a step to a running jobflow
+
1.2.0
- s3/cp now supports S3 directory to S3 directory copies
View
15 src/main/clj/com/climate/services/aws/emr.clj
@@ -224,10 +224,11 @@
(.setActionOnFailure (str ActionOnFailure/TERMINATE_JOB_FLOW))
(.setHadoopJarStep (.newEnableDebuggingStep (StepFactory.)))))
-(defn step-config [name alive? jar-path main-class cli-args & [properties]]
+(defn step-config [name alive? jar-path main-class cli-args & {:keys [action-on-failure properties]}]
"Create a step to be submitted to EMR.
jar-path is the hadoop job jar, usually an s3:// path.
cli-args is a collection of Strings that are passed as args to main-class (can be nil).
+ action-on-failure is a String or enum com.amazonaws.services.elasticmapreduce.model.ActionOnFailure.
properties is a map of Java properties that are set when the step runs."
(let [sc (StepConfig. name
(doto
@@ -235,17 +236,17 @@
(.setJar jar-path)
(.setMainClass main-class)
(.setArgs (vec cli-args)) ;collection of strings
- (.setProperties (kv-props properties))))]
- (if alive?
- (doto sc
- (.setActionOnFailure (str ActionOnFailure/CANCEL_AND_WAIT)))
- sc)))
+ (.setProperties (kv-props properties))
+ (.setActionOnFailure (str (or action-on-failure
+ (and alive? ActionOnFailure/CANCEL_AND_WAIT)
+ ActionOnFailure/TERMINATE_JOB_FLOW)))))]
+ sc))
(defn add-steps
"Add a step to a running jobflow. Steps is a seq of StepConfig objects.
Use (step-config) to create StepConfig objects."
[jobflow-id steps]
- (AddJobFlowStepsRequest. jobflow-id steps))
+ (.addJobFlowSteps *emr* (AddJobFlowStepsRequest. jobflow-id steps)))
(defn start-job-flow [name steps {:keys [log-uri bootstrap-actions ami-version supported-products]
:or {bootstrap-actions [] supported-products []}
View
1  src/main/clj/lemur/command_line.clj
@@ -243,6 +243,7 @@
[:master-instance-type "Instance type for master" "m1.large"]
[:slave-instance-type "Instance type for slaves" "m1.large"]
[:availability-zone "Amazon availabilty zone" nil]
+ [:jobflow "A jobflow-id to use with the submit command" nil]
[:num-instances "Number of instances (including the master)" "1"]
[:ami-version
"Which AMI to use (see RunJobFlowRequest#setAmiVersion in the AWS Java SDK)"]
View
2  src/main/clj/lemur/common.clj
@@ -33,6 +33,8 @@
java.io.IOException
java.io.File))
+(util/defalias eoval lemur.evaluating-map/eoval)
+
;;; Validators
;TODO make a more extensible model for adding new checks
View
40 src/main/clj/lemur/core.clj
@@ -101,14 +101,13 @@ calls launch - take action (upload files, start cluster, etc)
`(defn ~sym [] (= (context-get :command) ~name))))
(defcommand local?)
+(defcommand submit?)
(defcommand run?)
(defcommand dry-run?)
(defcommand start?)
(defcommand help?)
(defcommand formatted-help?)
-(util/defalias eoval lemur.evaluating-map/eoval)
-
(defn profile?
"Test if the profile x is in use."
[x]
@@ -365,7 +364,9 @@ calls launch - take action (upload files, start cluster, etc)
(:keep-alive? estep)
path-to-jar
(:main-class estep)
- args))))
+ args
+ :action-on-failure (:action-on-failure estep)
+ :properties (:properties estep)))))
coll))
(defn- save-metajob
@@ -466,6 +467,22 @@ calls launch - take action (upload files, start cluster, etc)
(context-set :jobflow-id jobflow-id)
(context-set :request-ts request-ts)))
+(defmethod launch :submit
+ [command eopts cluster steps jobflow-options uploads metajob]
+ ; upload files
+ (util/upload (:show-progress? eopts) (mapcat val (:jobdef-step uploads)) (:dest-working-dir uploads))
+ (util/upload (:show-progress? eopts) (:jars uploads))
+ ; Write details to metajob file
+ (save-metajob cluster steps metajob)
+ ; launch
+ (let [request-ts (System/currentTimeMillis)
+ steps (mk-steps cluster (filter (complement enable-debugging-step?) steps))
+ jobflow-id (:jobflow eopts)]
+ (emr/add-steps jobflow-id steps)
+ (println "JobFlow id:" jobflow-id)
+ (context-set :jobflow-id jobflow-id)
+ (context-set :request-ts request-ts)))
+
(defmethod launch :dry-run
[command eopts cluster steps jobflow-options uploads metajob]
(println "dry-run, not launching."))
@@ -557,7 +574,7 @@ calls launch - take action (upload files, start cluster, etc)
:spot-task-type spot-task-type
:spot-task-bid spot-task-bid
:spot-task-num spot-task-num
- :keep-alive (or (:keep-alive? evaluating-opts) (start?))
+ :keep-alive (or (:keep-alive? evaluating-opts) (start?) (submit?))
:keypair (:keypair evaluating-opts)
:ami-version (:ami-version evaluating-opts)
:hadoop-version (:hadoop-version evaluating-opts)
@@ -578,12 +595,13 @@ calls launch - take action (upload files, start cluster, etc)
(:runtime-jar evaluating-opts)
[])
:bootstrap-actions
- (if-let [src (:scripts-src-path evaluating-opts)]
- [[(s3/slash$ src)
- (s3/s3path (:bucket evaluating-opts) (:std-scripts-prefix evaluating-opts))]]
- [])
+ (let [src (:scripts-src-path evaluating-opts)]
+ (if (and src (not (submit?)))
+ [[(s3/slash$ src)
+ (s3/s3path (:bucket evaluating-opts) (:std-scripts-prefix evaluating-opts))]]
+ []))
:jobdef-cluster
- (parse-upload-property evaluating-opts)
+ (if (submit?) [] (parse-upload-property evaluating-opts))
:jobdef-step
(->> steps
(filter map?)
@@ -613,7 +631,7 @@ calls launch - take action (upload files, start cluster, etc)
(concat [:command :app :comment :username :run-id :jar-src-path :runtime-jar :base-uri]
display-in-metajob))
(doto (DumperOptions.) (.setDefaultFlowStyle DumperOptions$FlowStyle/BLOCK)))
- (if-not (local?)
+ (if-not (or (local?) (submit?))
(vector
(util/mk-yaml "Jobflow Options"
(merge (select-keys evaluating-opts [:emr-name]) (dissoc jobflow-options :bootstrap-actions))
@@ -924,7 +942,7 @@ calls launch - take action (upload files, start cluster, etc)
emr/*emr* (emr/emr aws-creds)
ec2/*ec2* (ec2/ec2 aws-creds)]
(case command
- ("run" "start" "dry-run" "local")
+ ("run" "start" "dry-run" "local" "submit")
(execute-jobdef jobdef-path)
"display-types"
(display-types)
Please sign in to comment.
Something went wrong with that request. Please try again.