Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Submit job to running cluster, migrate to lein2 #17

Merged
merged 12 commits into from

1 participant

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Nov 7, 2012
  1. @mlimotte

    lemur submit and ActionOnFailure

    mlimotte authored
    - New lemur command, submit, to add jobs to a running cluster
    - allow setting of ActionOnFailure
  2. @mlimotte
Commits on Nov 19, 2012
  1. SetActionOnFailure should be called for the StepConfig object

    Brubacher, Federico authored
Commits on Jan 7, 2013
  1. Test for add-action step

    Brubacher, Federico authored
  2. update project.clj to be compatible with lein2

    Brubacher, Federico authored
  3. Require commons from lemur.core.test

    Brubacher, Federico authored
Commits on Jan 25, 2013
  1. the method add-steps now recieves a seq

    Brubacher, Federico authored
  2. Final support for lein 2 (correct versions of bultitude , midje, lein…

    Brubacher, Federico authored
    … midje)
Commits on Feb 19, 2013
  1. @mlimotte

    Merge pull request #1 from fbrubacher/master

    mlimotte authored
    Fixes to submit.  Move to lein2.
  2. @mlimotte
  3. @mlimotte

    Merge remote-tracking branch 'tcc-lemur/master'

    mlimotte authored
    Conflicts:
    	CHANGES.txt
  4. @mlimotte

    Set project version 1.3.0

    mlimotte authored
    - fixes for lein2
    - fixes for action on failure
This page is out of date. Refresh to see the latest.
View
6 CHANGES.txt
@@ -1,5 +1,11 @@
CHANGES
+1.3.0
+
+- Allow explicit action-on-failure setting for defstep
+- New submit command to add a step to a running jobflow
+- Move to lein2
+
1.2.1
- set the VisibleToAllUsers flag on all jobflows
View
2  bin/lemur
@@ -41,7 +41,7 @@ fi
if [ -f $BASE/lemur-*.jar ]; then
LEMUR_CP=`echo $BASE/lemur-*.jar`
else
- LEMUR_CP=$BASE/src/main/clj:$BASE/classes:$BASE/resources
+ LEMUR_CP=$BASE/src/main/clj:$BASE/target/classes:$BASE/resources
fi
DEPS_CP=$BASE/lib/*
View
17 build.xml
@@ -3,7 +3,7 @@
<macrodef name="lein">
<element name="lein-params" implicit="true"/>
<sequential>
- <exec executable="lein" failonerror="yes"
+ <exec executable="lein2" failonerror="yes"
dir="${basedir}">
<env key="JVM_OPTS" value="-Droot.logger=error,file,stdout"/>
<lein-params/>
@@ -11,15 +11,19 @@
</sequential>
</macrodef>
+ <target name="libdir">
+ <lein><arg value="libdir"/></lein>
+ </target>
+
<target name="jar">
<lein><arg value="jar"/></lein>
</target>
- <target name="package" depends="jar">
+ <target name="package" depends="libdir,jar">
<exec logError="true" executable="bash" outputproperty="jar-name">
<arg value="-c"/>
- <arg value="ls lemur-*.jar"/>
+ <arg value="ls target/lemur-*.jar"/>
</exec>
<fail message="Could not find lemur jar">
@@ -29,7 +33,7 @@
<exec logError="true" executable="sed" failonerror="true" inputstring="${jar-name}" outputproperty="jar-version">
<arg value="-e"/>
- <arg value="s/lemur-\(.*\).jar/\1/"/>
+ <arg value="s/.*lemur-\(.*\).jar/\1/"/>
</exec>
<echo message="jar ${jar-version}"/>
@@ -39,11 +43,14 @@
prefix="/lemur-${jar-version}">
<include name="README.md"/>
<include name="project.clj"/>
- <include name="lemur-*.jar"/>
<include name="lib/*.jar"/>
<include name="src/"/>
<include name="examples/"/>
</tarfileset>
+ <tarfileset dir="${basedir}/target"
+ prefix="/lemur-${jar-version}">
+ <include name="lemur-*.jar"/>
+ </tarfileset>
<tarfileset dir="${basedir}"
filemode="755"
prefix="/lemur-${jar-version}">
View
33 project.clj
@@ -1,4 +1,4 @@
-(defproject lemur "1.2.1"
+(defproject lemur "1.3.0"
:description "Lemur is a tool to launch hadoop jobs locally or on EMR
based on a configuration file, referred to as a jobdef."
@@ -8,12 +8,13 @@
["-Dlog4j.configuration=file:resources/log4j.properties"]
[])
- :source-path "src/main/clj"
- :test-path "src/test/clj"
+ :source-paths ["src/main/clj"]
+ :test-paths ["src/test/clj"]
:dependencies [[org.clojure/clojure "1.3.0"]
[org.clojure/tools.logging "0.2.3"]
[org.clojure/data.json "0.1.2"]
+ [bultitude "0.2.0"]
; aws-java-sdk-1.3.3 does not specify the correct httpclient, so we do it explicitly
[org.apache.httpcomponents/httpclient "4.1.1"]
@@ -31,24 +32,24 @@
; Other
[log4j/log4j "1.2.16"]]
- :dev-dependencies [[robert/hooke "1.1.2"] ;for leiningen test-selectors
- [org.clojure/tools.trace "0.7.1"]
- [midje "1.3.1"]
- [lein-midje "1.0.8"]
- [com.offbytwo.iclojure/iclojure "1.1.0"]
- [clojure-source "1.3.0"]]
+ :plugins [[lein-libdir "0.1.0"]]
+ :libdir-path "lib"
- :test-selectors {:default (fn [v] (not (or (:integration v) (:manual v))))
- :integration :integration
- :manual :manual
- :all (fn [v] (not (:manual v)))}
+ :profiles {:dev {:plugins [[lein-midje "2.0.4"]]
+ :dependencies [[midje "1.4.0"]
+ [org.clojure/tools.trace "0.7.3"]
+ [clojure-source "1.3.0"]]}}
:repl-init lemur.repl
:main ^:skip-aot lemur.repl
+ :min-lein-version "2.0.0"
:run-aliases {:lemur lemur.core}
- ; Launch irepl:
- ;java -cp lib/*:lib/dev/* com.offbytwo.iclojure.Main
+ :test-selectors {:default (fn [v] (not (or (:integration v) (:manual v))))
+ :integration :integration
+ :manual :manual
+ :all (fn [v] (not (:manual v)))}
- :aot [lemur.core])
+ :aot [lemur.core]
+ )
View
1  src/main/clj/com/climate/services/aws/common.clj
@@ -5,7 +5,6 @@
[com.climate.shell :as sh]
[clojure.data.json :as json]
[clojure.tools.logging :as log]
- [clojure.tools.trace :as trace]
[clojure.java.io :as io])
(:import
java.io.IOException
View
34 src/main/clj/com/climate/services/aws/emr.clj
@@ -9,6 +9,7 @@
(:import
java.io.File
com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient
+ com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest
com.amazonaws.services.elasticmapreduce.util.StepFactory
com.amazonaws.auth.BasicAWSCredentials
[com.amazonaws.services.elasticmapreduce.model
@@ -224,28 +225,37 @@
(.setActionOnFailure (str ActionOnFailure/TERMINATE_JOB_FLOW))
(.setHadoopJarStep (.newEnableDebuggingStep (StepFactory.)))))
-(defn step-config [name alive? jar-path main-class cli-args & [properties]]
+(defn terminate-flow-id
+ ([jobflow-id]
+ (terminate-flow-id jobflow-id *emr*))
+ ([jobflow-id emr]
+ (.terminateJobFlows emr
+ (TerminateJobFlowsRequest. (java.util.ArrayList. [jobflow-id])))))
+
+(defn step-config [name alive? jar-path main-class cli-args & {:keys [action-on-failure properties]}]
"Create a step to be submitted to EMR.
jar-path is the hadoop job jar, usually an s3:// path.
cli-args is a collection of Strings that are passed as args to main-class (can be nil).
+ action-on-failure is a String or enum com.amazonaws.services.elasticmapreduce.model.ActionOnFailure.
properties is a map of Java properties that are set when the step runs."
(let [sc (StepConfig. name
- (doto
- (HadoopJarStepConfig.)
- (.setJar jar-path)
- (.setMainClass main-class)
- (.setArgs (vec cli-args)) ;collection of strings
- (.setProperties (kv-props properties))))]
- (if alive?
- (doto sc
- (.setActionOnFailure (str ActionOnFailure/CANCEL_AND_WAIT)))
- sc)))
+ (doto
+ (HadoopJarStepConfig.)
+ (.setJar jar-path)
+ (.setMainClass main-class)
+ (.setArgs (vec cli-args)) ;collection of strings
+ (.setProperties (kv-props properties))))]
+ (.setActionOnFailure sc (str (or action-on-failure
+ (and alive? ActionOnFailure/CANCEL_AND_WAIT)
+ ActionOnFailure/TERMINATE_JOB_FLOW)))
+ sc))
(defn add-steps
"Add a step to a running jobflow. Steps is a seq of StepConfig objects.
Use (step-config) to create StepConfig objects."
[jobflow-id steps]
- (AddJobFlowStepsRequest. jobflow-id steps))
+ (let [steps-array (to-array steps)]
+ (.addJobFlowSteps *emr* (AddJobFlowStepsRequest. jobflow-id steps))))
(defn start-job-flow [name steps {:keys [log-uri bootstrap-actions ami-version supported-products visible-to-all-users]
:or {bootstrap-actions [] supported-products [] visible-to-all-users false}
View
1  src/main/clj/lemur/command_line.clj
@@ -243,6 +243,7 @@
[:master-instance-type "Instance type for master" "m1.large"]
[:slave-instance-type "Instance type for slaves" "m1.large"]
[:availability-zone "Amazon availabilty zone" nil]
+ [:jobflow "A jobflow-id to use with the submit command" nil]
[:num-instances "Number of instances (including the master)" "1"]
[:ami-version
"Which AMI to use (see RunJobFlowRequest#setAmiVersion in the AWS Java SDK)"]
View
2  src/main/clj/lemur/common.clj
@@ -33,6 +33,8 @@
java.io.IOException
java.io.File))
+(util/defalias eoval lemur.evaluating-map/eoval)
+
;;; Validators
;TODO make a more extensible model for adding new checks
View
41 src/main/clj/lemur/core.clj
@@ -101,14 +101,13 @@ calls launch - take action (upload files, start cluster, etc)
`(defn ~sym [] (= (context-get :command) ~name))))
(defcommand local?)
+(defcommand submit?)
(defcommand run?)
(defcommand dry-run?)
(defcommand start?)
(defcommand help?)
(defcommand formatted-help?)
-(util/defalias eoval lemur.evaluating-map/eoval)
-
(defn profile?
"Test if the profile x is in use."
[x]
@@ -365,7 +364,9 @@ calls launch - take action (upload files, start cluster, etc)
(:keep-alive? estep)
path-to-jar
(:main-class estep)
- args))))
+ args
+ :action-on-failure (:action-on-failure estep)
+ :properties (:properties estep)))))
coll))
(defn- save-metajob
@@ -466,6 +467,22 @@ calls launch - take action (upload files, start cluster, etc)
(context-set :jobflow-id jobflow-id)
(context-set :request-ts request-ts)))
+(defmethod launch :submit
+ [command eopts cluster steps jobflow-options uploads metajob]
+ ; upload files
+ (util/upload (:show-progress? eopts) (mapcat val (:jobdef-step uploads)) (:dest-working-dir uploads))
+ (util/upload (:show-progress? eopts) (:jars uploads))
+ ; Write details to metajob file
+ (save-metajob cluster steps metajob)
+ ; launch
+ (let [request-ts (System/currentTimeMillis)
+ steps (mk-steps cluster (filter (complement enable-debugging-step?) steps))
+ jobflow-id (:jobflow eopts)]
+ (emr/add-steps jobflow-id steps)
+ (println "JobFlow id:" jobflow-id)
+ (context-set :jobflow-id jobflow-id)
+ (context-set :request-ts request-ts)))
+
(defmethod launch :dry-run
[command eopts cluster steps jobflow-options uploads metajob]
(println "dry-run, not launching."))
@@ -557,7 +574,7 @@ calls launch - take action (upload files, start cluster, etc)
:spot-task-type spot-task-type
:spot-task-bid spot-task-bid
:spot-task-num spot-task-num
- :keep-alive (or (:keep-alive? evaluating-opts) (start?))
+ :keep-alive (or (:keep-alive? evaluating-opts) (start?) (submit?))
:keypair (:keypair evaluating-opts)
:ami-version (:ami-version evaluating-opts)
:hadoop-version (:hadoop-version evaluating-opts)
@@ -579,12 +596,13 @@ calls launch - take action (upload files, start cluster, etc)
(:runtime-jar evaluating-opts)
[])
:bootstrap-actions
- (if-let [src (:scripts-src-path evaluating-opts)]
- [[(s3/slash$ src)
- (s3/s3path (:bucket evaluating-opts) (:std-scripts-prefix evaluating-opts))]]
- [])
+ (let [src (:scripts-src-path evaluating-opts)]
+ (if (and src (not (submit?)))
+ [[(s3/slash$ src)
+ (s3/s3path (:bucket evaluating-opts) (:std-scripts-prefix evaluating-opts))]]
+ []))
:jobdef-cluster
- (parse-upload-property evaluating-opts)
+ (if (submit?) [] (parse-upload-property evaluating-opts))
:jobdef-step
(->> steps
(filter map?)
@@ -614,7 +632,7 @@ calls launch - take action (upload files, start cluster, etc)
(concat [:command :app :comment :username :run-id :jar-src-path :runtime-jar :base-uri]
display-in-metajob))
(doto (DumperOptions.) (.setDefaultFlowStyle DumperOptions$FlowStyle/BLOCK)))
- (if-not (local?)
+ (if-not (or (local?) (submit?))
(vector
(util/mk-yaml "Jobflow Options"
(merge (select-keys evaluating-opts [:emr-name]) (dissoc jobflow-options :bootstrap-actions))
@@ -925,7 +943,7 @@ calls launch - take action (upload files, start cluster, etc)
emr/*emr* (emr/emr aws-creds)
ec2/*ec2* (ec2/ec2 aws-creds)]
(case command
- ("run" "start" "dry-run" "local")
+ ("run" "start" "dry-run" "local" "submit")
(execute-jobdef jobdef-path)
"display-types"
(display-types)
@@ -941,4 +959,3 @@ calls launch - take action (upload files, start cluster, etc)
(quit :msg (slurp (io/resource "help.txt"))))
(quit :msg "Unrecognized lemur command" :cmdspec (context-get :command-spec) :exit-code 1))))
(quit))
-
View
56 src/test/clj/com/climate/services/aws/emr_test.clj
@@ -24,9 +24,13 @@
[ec2 :as ec2]
[common :as awscommon]])
(:import
+ com.amazonaws.services.elasticmapreduce.util.StepFactory
[com.amazonaws.services.elasticmapreduce.model
JobFlowDetail
- JobFlowExecutionStatusDetail]
+ JobFlowExecutionStatusDetail
+ HadoopJarStepConfig
+ ActionOnFailure
+ StepConfig]
java.util.Date))
;; Some tests are labelled as :manual, rather than :integration, because
@@ -37,6 +41,23 @@
(def aws-creds (awscommon/aws-credential-discovery))
+(def ^:dynamic *flow-args*
+ {:bootstrap-actions
+ ; Only publicly available script, so we don't have to upload the others.
+ [(bootstrap "Hadoop Config"
+ "s3://elasticmapreduce/bootstrap-actions/configure-hadoop"
+ ["-m" "mapred.map.tasks.speculative.execution=false"])]
+ :log-uri (str "s3://" bucket)
+ :keypair (:keypair aws-creds) ; the elastic-mapreduce credentials.json file as a keypair entry
+ :ami-version "latest"
+ :num-instances 2
+ :master-type "m1.xlarge"
+ :slave-type "m1.xlarge"
+ :spot-task-type "m1.xlarge"
+ :spot-task-bid "1.00"
+ :spot-task-num 1
+ :keep-alive false})
+
(use-fixtures :once
(fn [f]
(binding [s3/*s3* (s3/s3 aws-creds)
@@ -70,22 +91,7 @@
["-input" (format "s3://%s/data/simple.txt" bucket)
"-output" "/out"
"-mapper" (format "s3://%s/scripts/wc.sh" bucket)])]
- {:bootstrap-actions
- ; Only publicly available script, so we don't have to upload the others.
- [(bootstrap "Hadoop Config"
- "s3://elasticmapreduce/bootstrap-actions/configure-hadoop"
- ["-m" "mapred.map.tasks.speculative.execution=false"])]
- :log-uri (str "s3://" bucket)
- :keypair (:keypair aws-creds) ; the elastic-mapreduce credentials.json file as a keypair entry
- :ami-version "latest"
- :num-instances 2
- :master-type "m1.xlarge"
- :slave-type "m1.xlarge"
- :spot-task-type "m1.xlarge"
- :spot-task-bid "1.00"
- :spot-task-num 1
- ;too dangerous to use keep-alive in tests, so tests are limited-- no (emr/add-step) for example
- :keep-alive false}))))
+ *flow-args*))))
; Specified as a fn rather than a test. This is a hack to force it to run before
; test-wait-on-step. It will fail if the cluster has already COMPLETED.
@@ -159,3 +165,19 @@
(is= "m1.xlarge" spot-task-type)
(is= 20 spot-task-num)
(is= (format "%.3f" expected-bid) spot-task-bid)))
+
+(defn make-dummy-step []
+ (doto (StepConfig.)
+ (.setName "Dummy")
+ (.setActionOnFailure (str ActionOnFailure/TERMINATE_JOB_FLOW))
+ (.setHadoopJarStep (.newEnableDebuggingStep (StepFactory.)))))
+
+(deftest ^{:manual true} test-add-steps-to-existing-flow
+ (testing "emr/add-step"
+ (binding [*flow-args* (conj *flow-args* {:keep-alive true})]
+ (let [jf-id (setup)
+ dummy-steps (make-dummy-step)]
+ (is= 0 (.size (steps-for-jobflow jf-id)))
+ (add-steps jf-id [dummy-steps])
+ (is= 1 (.size (steps-for-jobflow jf-id)))
+ (terminate-flow-id jf-id)))))
View
1  src/test/clj/lemur/core_test.clj
@@ -17,6 +17,7 @@
lemur.core
[lemur.command-line :only [quit]]
[lemur.evaluating-map :only [evaluating-map]]
+ [lemur.common :only [eoval]]
midje.sweet
clojure.test
lemur.test)
Something went wrong with that request. Please try again.