Skip to content

Commit

Permalink
Merge ee7e54f into 54fcc97
Browse files Browse the repository at this point in the history
  • Loading branch information
grondo committed May 10, 2018
2 parents 54fcc97 + ee7e54f commit 63072bd
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 22 deletions.
8 changes: 8 additions & 0 deletions src/bindings/lua/wreck.lua
Expand Up @@ -54,6 +54,9 @@ local default_opts = {
['input'] = { char = "i", arg = "HOW" },
['label-io'] = { char = "l", },
['skip-env'] = { char = "S", },
['epilog'] = { char = "x", arg = "SCRIPT" },
['postscript'] =
{ char = "p", arg = "SCRIPT" },
['options'] = { char = 'o', arg = "OPTIONS.." },
}
Expand Down Expand Up @@ -123,6 +126,9 @@ function wreck:usage()
-E, --error=FILENAME Send stderr to a different location than stdout.
-l, --labelio Prefix lines of output with task id
-S, --skip-env Skip export of environment to job
-x, --epilog=PATH Execute a script after all tasks exit but before
the job state is set to "complete"
-p, --postscript=PATH Execute a script after job state is "complete"
]])
for _,v in pairs (self.extra_options) do
local optstr = v.name .. (v.arg and "="..v.arg or "")
Expand Down Expand Up @@ -378,6 +384,8 @@ function wreck:jobreq ()
["opts.cores-per-task"] = self.opts.c,
["opts.gpus-per-task"] = self.opts.g,
["opts.tasks-per-node"] = self.opts.t,
["epilog.pre"] = self.opts.x,
["epilog.post"] = self.opts.p,
}
if self.opts.o then
for opt in self.opts.o:gmatch ('[^,]+') do
Expand Down
2 changes: 1 addition & 1 deletion src/cmd/flux-wreckrun
Expand Up @@ -189,7 +189,7 @@ local function check_job_completed ()
wreck:die ("job %d failed\n", jobid)
end
if (not taskio or taskio:complete()) and
(state == "complete" or state == "reaped") then
(state == "completing" or state == "complete" or state == "reaped") then
local rc = lwj_return_code (f, wreck, jobid)
if rc == 0 then
wreck:verbose ("%.3fs: All tasks completed successfully.\n",
Expand Down
3 changes: 2 additions & 1 deletion src/common/libjsc/jstatctl.c
Expand Up @@ -75,9 +75,10 @@ static stab_t job_state_tab[] = {
{ J_ALLOCATED, "allocated" },
{ J_RUNREQUEST, "runrequest" },
{ J_STARTING, "starting" },
{ J_STOPPED, "stopped" },
{ J_SYNC, "sync" },
{ J_RUNNING, "running" },
{ J_CANCELLED, "cancelled" },
{ J_COMPLETING, "completing" },
{ J_COMPLETE, "complete" },
{ J_REAPED, "reaped" },
{ J_FAILED, "failed" },
Expand Down
48 changes: 32 additions & 16 deletions src/common/libjsc/jstatctl.h
Expand Up @@ -39,22 +39,38 @@ extern "C" {
* please refer to README.md
*/
typedef enum {
J_NULL = 1, /*!< The state has yet to be assigned */
J_RESERVED, /*!< Reserved by the program execution service */
J_SUBMITTED, /*!< Submitted to the system */
J_PENDING, /*!< Pending */
J_SCHEDREQ, /*!< Resources requested to be selected */
J_SELECTED, /*!< Assigned to requested resource in RDL */
J_ALLOCATED, /*!< Got allocated/contained by the program executoin service */
J_RUNREQUEST,/*!< Requested to be executed */
J_STARTING, /*!< Starting */
J_STOPPED, /*!< Stopped *including init barrier hit for a tool) */
J_RUNNING, /*!< Running */
J_CANCELLED, /*!< Cancelled */
J_COMPLETE, /*!< Completed */
J_REAPED, /*!< Reaped */
J_FAILED, /*!< Failed */
J_FOR_RENT /*!< Space For Rent */
J_NULL = 0, /*!< The state has yet to be assigned */

/* WRECK job initial condition states:
*/
J_RESERVED = 1, /*!< Reserved by the program execution service */
J_SUBMITTED = 2, /*!< Submitted to the system */

/* Scheduler internal states:
*/
J_PENDING = 11, /*!< Pending */
J_SCHEDREQ = 12, /*!< Resources requested to be selected */
J_SELECTED = 13, /*!< Assigned to requested resource in RDL */
J_ALLOCATED = 14, /*!< Got alloc/contained by the program exec service */

/* WRECK job execution states:
*/
J_RUNREQUEST= 21, /*!< Requested to be executed */
J_STARTING = 22, /*!< Starting */
J_SYNC = 23, /*!< Tasks stopped in exec waiting for a tool */
J_RUNNING = 24, /*!< Running */
J_COMPLETING= 26, /*!< Completing (all tasks exited, epilog running) */

/* WRECK job terminal states:
*/
J_CANCELLED = 51, /*!< Cancelled (before execution) */
J_COMPLETE = 52, /*!< Completed */
J_FAILED = 53, /*!< Failed (before exec) */

/* Scheduler post exec states:
*/
J_REAPED = 101, /*!< Reaped */
J_FOR_RENT = 102, /*!< Space For Rent */
} job_state_t;

typedef int (*jsc_handler_f)(const char *base_jcb, void *arg, int errnum);
Expand Down
1 change: 1 addition & 0 deletions src/modules/wreck/Makefile.am
Expand Up @@ -76,6 +76,7 @@ dist_wreckscripts_SCRIPTS = \
lua.d/02-affinity.lua \
lua.d/timeout.lua \
lua.d/output.lua \
lua.d/epilog.lua \
lua.d/input.lua \
lua.d/mvapich.lua \
lua.d/pmi-mapping.lua \
Expand Down
21 changes: 21 additions & 0 deletions src/modules/wreck/lua.d/epilog.lua
@@ -0,0 +1,21 @@
local posix = require 'flux.posix'

-- execute a path from kvs `key`
local function run_kvs (key)
local epilog = wreck.kvsdir [key] or wreck.flux:kvs_get ("lwj."..key)
if not epilog then return end
return os.execute (epilog)
end

function rexecd_complete ()
local rc, err = run_kvs ("epilog.pre")
if not rc then wreck:log_msg ("error: epilog: %s", err) end
end

-- rexecd_exit callback happens after the job is in the complete state
function rexecd_exit ()
local rc, err = run_kvs ("epilog.post")
if not rc then wreck:log_msg ("error: epilog.post: %s", err) end
end

-- vi: ts=4 sw=4 expandtab
3 changes: 3 additions & 0 deletions src/modules/wreck/wrexecd.c
Expand Up @@ -2408,6 +2408,9 @@ int main (int ac, char **av)
}

if (exec_rc == 0) {
rexec_state_change (ctx, "completing");
lua_stack_call (ctx->lua_stack, "rexecd_complete");

rexec_state_change (ctx, "complete");
wlog_msg (ctx, "job complete. exiting...");

Expand Down
2 changes: 2 additions & 0 deletions t/Makefile.am
Expand Up @@ -64,6 +64,7 @@ TESTS = \
t2000-wreck.t \
t2000-wreck-env.t \
t2000-wreck-dummy-sched.t \
t2000-wreck-epilog.t \
t2001-jsc.t \
t2002-pmi.t \
t2003-recurse.t \
Expand Down Expand Up @@ -146,6 +147,7 @@ check_SCRIPTS = \
t2000-wreck.t \
t2000-wreck-env.t \
t2000-wreck-dummy-sched.t \
t2000-wreck-epilog.t \
t2001-jsc.t \
t2002-pmi.t \
t2003-recurse.t \
Expand Down
67 changes: 67 additions & 0 deletions t/t2000-wreck-epilog.t
@@ -0,0 +1,67 @@
#!/bin/sh
#

test_description='Test basic wreck epilog functionality
'
. `dirname $0`/sharness.sh
SIZE=${FLUX_TEST_SIZE:-4}
test_under_flux ${SIZE} wreck

# Return previous job path in kvs
last_job_path() {
flux wreck last-jobid -p
}

epilog_path="$(pwd)/epilog.wait.sh"
kvswait=${SHARNESS_TEST_SRCDIR}/scripts/kvs-watch-until.lua
eventtrace=${SHARNESS_TEST_SRCDIR}/scripts/event-trace.lua

# Create epilog test that will block until an 'epilog.test' event
cat <<EOF >${epilog_path}
#!/bin/sh
flux event sub -c 1 epilog.test
flux event pub epilog.test.done
EOF
chmod +x ${epilog_path}

wait_for_complete() {
$kvswait -vt 5 $1.state 'v == "complete"'
}

test_expect_success 'flux-wreck: global epilog.pre' '
flux kvs put --json lwj.epilog.pre="$epilog_path" &&
flux wreckrun /bin/true &&
LWJ=$(last_job_path) &&
STATE=$(flux kvs get --json ${LWJ}.state) &&
test_debug "echo job state is now ${STATE}" &&
test "$STATE" = "completing" &&
flux event pub epilog.test &&
wait_for_complete $LWJ
'
test_expect_success 'flux-wreck: per-job epilog.pre' '
flux kvs unlink lwj.epilog.pre &&
flux wreckrun -x ${epilog_path} /bin/true &&
LWJ=$(last_job_path) &&
test $(flux kvs get --json ${LWJ}.epilog.pre) = "$epilog_path" &&
STATE=$(flux kvs get --json ${LWJ}.state) &&
test_debug "echo job state is now ${STATE}" &&
test "$STATE" = "completing" &&
flux event pub epilog.test &&
wait_for_complete $LWJ
'
test_expect_success 'flux-wreck: global epilog.post' '
flux kvs put --json lwj.epilog.post="$epilog_path" &&
flux wreckrun /bin/true &&
wait_for_complete $LWJ &&
${eventtrace} -t 5 epilog.test epilog.test.done \
flux event pub epilog.test
'
test_expect_success 'flux-wreck: per-job epilog.post' '
flux kvs unlink lwj.epilog.post &&
flux wreckrun -p "$epilog_path" /bin/true &&
wait_for_complete $LWJ &&
${eventtrace} -t 5 epilog.test epilog.test.done \
flux event pub epilog.test
'

test_done
3 changes: 2 additions & 1 deletion t/t2000-wreck.t
Expand Up @@ -129,11 +129,12 @@ test_expect_success 'wreck: job state events emitted' '
$SHARNESS_TEST_SRCDIR/scripts/event-trace.lua \
wreck.state wreck.state.complete \
flux wreckrun -n${SIZE} /bin/true > output &&
tail -4 output > output_states && # only care about last 4
tail -5 output > output_states && # only care about last 4
cat >expected_states <<-EOF &&
wreck.state.reserved
wreck.state.starting
wreck.state.running
wreck.state.completing
wreck.state.complete
EOF
test_cmp expected_states output_states
Expand Down
8 changes: 5 additions & 3 deletions t/t2001-jsc.t
Expand Up @@ -14,11 +14,13 @@ fi
tr1="null->reserved"
tr2="reserved->starting"
tr3="starting->running"
tr4="running->complete"
tr4="running->completing"
tr5="completing->complete"
trans="$tr1
$tr2
$tr3
$tr4"
$tr4
$tr5"

# Return previous job path in kvs
last_job_path() {
Expand Down Expand Up @@ -205,7 +207,7 @@ test_expect_success 'jstat 8: query detects bad inputs' '
'

test_expect_success 'jstat 9: update state-pair' "
flux jstat update 1 state-pair '{\"state-pair\": {\"ostate\": 13, \"nstate\": 12}}' &&
flux jstat update 1 state-pair '{\"state-pair\": {\"ostate\": 24, \"nstate\": 51}}' &&
flux kvs get --json $(flux wreck kvs-path 1).state > output.9.1 &&
cat >expected.9.1 <<-EOF &&
cancelled
Expand Down

0 comments on commit 63072bd

Please sign in to comment.