Skip to content

Commit

Permalink
Merge pull request #59 from dongahn/jsc_validate
Browse files Browse the repository at this point in the history
support for reliable jsc
  • Loading branch information
Don Lipari committed Sep 3, 2015
2 parents d623b0d + f8aad8a commit f912cb2
Show file tree
Hide file tree
Showing 9 changed files with 422 additions and 32 deletions.
4 changes: 3 additions & 1 deletion sched/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@ LIBS = $(FLUX_LIBS) $(RESRC_LIBS)

CONF=../conf/hype.lua

BUILD = schedsrv.so schedplugin1.so
BUILD = schedsrv.so schedplugin1.so flux-waitjob

all: $(BUILD)

schedsrv.so: schedsrv.o xzmalloc.o log.o jsonutil.o
$(CC) -shared -o $@ $^ $(LIBS)
schedplugin1.so: schedplugin1.o xzmalloc.o log.o jsonutil.o
$(CC) -shared -o $@ $^ $(LIBS)
flux-waitjob: flux-waitjob.o log.o jsonutil.o
$(CC) -o $@ $^ $(LIBS)

start:
$(FLUX) start
Expand Down
13 changes: 12 additions & 1 deletion sched/flux-submit
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ if not wreck:parse_cmdline (arg) then
end

-- Set signal handlers
posix.signal[posix.SIGTERM] = posix.signal[posix.SIGINT]
-- posix.signal[posix.SIGTERM] = posix.signal[posix.SIGINT]

-- Start in-program timer:
local tt = timer.new()
Expand Down Expand Up @@ -99,6 +99,17 @@ if not lwj then wreck:die ("f:kvsdir(lwj.%d): %s\n", resp.jobid, err) end
lwj.state = "submitted"
lwj:commit()

--
-- Send "submitted" event:
-- This is so that JSC can be notified of this event through subscribe.
-- Here, this doesn't have to wait for its own event because
-- the schedsrv will not change the state until it sees this event.
--
-- Payload of the "submitted" event: carries the id of the job just created.
local pl = { lwj = resp.jobid }
local rc, err = f:sendevent (pl, "wreck.state.submitted", resp.jobid)
if not rc then
    wreck:die ("sendevent: %s\n", err)
end

os.exit (0)

-- vi: ts=4 sw=4 expandtab
236 changes: 236 additions & 0 deletions sched/flux-waitjob.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*****************************************************************************\
* Copyright (c) 2014 Lawrence Livermore National Security, LLC. Produced at
* the Lawrence Livermore National Laboratory (cf, AUTHORS, DISCLAIMER.LLNS).
* LLNL-CODE-658032 All rights reserved.
*
* This file is part of the Flux resource manager framework.
* For details, see https://github.com/flux-framework.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the Free
* Software Foundation; either version 2 of the license, or (at your option)
* any later version.
*
* Flux is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the IMPLIED WARRANTY OF MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the terms and conditions of the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
* See also: http://www.gnu.org/licenses/
\*****************************************************************************/

#include <stdio.h>
#include <getopt.h>
#include <errno.h>
#include <czmq.h>
#include <json.h>
#include <flux/core.h>

#include "src/common/libutil/shortjson.h"
#include "src/common/libutil/jsonutil.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/xzmalloc.h"

/*
 * Per-handle state of flux-waitjob.  One instance is created lazily by
 * getctx() and attached to the flux handle under the "waitjob" aux key.
 */
typedef struct {
flux_t h;       /* flux handle this context is associated with */
int64_t jobid;  /* id of the job being waited on (set in wait_job_complete) */
char *sync;     /* name of the file to create on completion (-o), or NULL */
} wjctx_t;

/* Handle that sig_handler() closes on SIGINT; set by wait_job_complete(). */
static flux_t sig_flux_h;

/*
 * Short and long option tables handed to getopt_long() in main().
 * NOTE(review): "j:" is listed in OPTIONS but main() has no case for 'j',
 * so it ends up in the default (usage) branch -- confirm whether it is a
 * leftover that should be removed.
 */
#define OPTIONS "hj:o:"
static const struct option longopts[] = {
{"help", no_argument, 0, 'h'},
{"out", required_argument, 0, 'o'},
{ 0, 0, 0, 0 },
};

/*
 * Print the command-line synopsis on stderr and terminate the process
 * with exit status 1.  Never returns.
 */
static void usage (void)
{
    static const char help_text[] =
"Usage: flux-waitjob [OPTIONS] <jobid>\n"
" -h, --help Display this message\n"
" -o, --out=filename Create an empty file when detects jobid completed\n"
    ;

    fputs (help_text, stderr);
    exit (1);
}

/*
 * Destructor for the wjctx_t registered with flux_aux_set() in getctx().
 *
 * Releases the sync file name and the context structure itself.  The
 * context is heap-allocated in getctx() and nothing else in this file
 * frees it, so it must be released here to avoid a leak.
 *
 * arg: the wjctx_t to destroy; NULL is tolerated and ignored.
 */
static void freectx (void *arg)
{
    wjctx_t *ctx = arg;

    if (!ctx)
        return;
    free (ctx->sync);   /* free (NULL) is a no-op, no guard required */
    free (ctx);
}

/*
 * Return the wjctx_t attached to handle h, creating and attaching it on
 * first use.
 *
 * A freshly created context records the handle it belongs to, so that
 * code reading ctx->h (e.g. for logging) gets a valid handle, and starts
 * with no sync file configured.  The context is registered under the
 * "waitjob" aux key with freectx() as its destructor.
 */
static wjctx_t *getctx (flux_t h)
{
    wjctx_t *ctx = (wjctx_t *)flux_aux_get (h, "waitjob");

    if (!ctx) {
        ctx = xzmalloc (sizeof (*ctx));
        ctx->h = h;         /* was never set before: ctx->h users saw garbage/NULL */
        ctx->sync = NULL;
        flux_aux_set (h, "waitjob", ctx, freectx);
    }
    return ctx;
}

/*
 * Signal handler installed for SIGINT by wait_job_complete().
 *
 * On SIGINT it prints a notice on stdout, closes the handle stored in
 * sig_flux_h, shuts logging down and exits with status 0.  Any other
 * signal number is ignored.
 */
static void sig_handler (int s)
{
    if (s != SIGINT)
        return;

    fprintf (stdout, "Exit on INT");
    flux_close (sig_flux_h);
    log_fini ();
    exit (0);
}

/*
 * Create (or truncate) the file named fn so that it exists and is empty.
 *
 * fn: path of the file to create; NULL means "nothing to do".
 *
 * A failure to open the file is reported on stderr and otherwise ignored.
 * fclose() is only called on a stream that was actually opened: passing a
 * NULL stream to fclose() is undefined behaviour.
 */
static void create_outfile (const char *fn)
{
    FILE *fp;

    if (!fn)
        return;
    if (!(fp = fopen (fn, "w"))) {
        fprintf (stderr, "Failed to open %s\n", fn);
        return;
    }
    fclose (fp);
}

/* Read the JSC_JOBID member of jcb into *j; the lookup result is ignored. */
static inline void get_jobid (JSON jcb, int64_t *j)
{
    (void) Jget_int64 (jcb, JSC_JOBID, j);
}

/*
 * Read the old and new job states from the JSC_STATE_PAIR member of jcb
 * into *os and *ns respectively.  Lookup results are not checked.
 */
static inline void get_states (JSON jcb, int64_t *os, int64_t *ns)
{
    JSON state_pair = NULL;

    Jget_obj (jcb, JSC_STATE_PAIR, &state_pair);

    Jget_int64 (state_pair, JSC_STATE_PAIR_OSTATE, os);
    Jget_int64 (state_pair, JSC_STATE_PAIR_NSTATE, ns);
}

/*
 * Job status callback registered through jsc_notify_status() in
 * wait_job_complete().
 *
 * jcbstr: JSON string describing a job state change.
 * arg:    the flux handle given at registration time.
 * errnum: a value > 0 is treated as an error reported by the caller.
 *
 * When the notification is for the job being waited on and its new state
 * is J_COMPLETE, the optional sync file is created and SIGINT is raised;
 * sig_handler() then closes the handle and exits the process.
 *
 * Returns 0 on success, -1 on error.
 */
static int waitjob_cb (const char *jcbstr, void *arg, int errnum)
{
    int64_t os = 0;
    int64_t ns = 0;
    int64_t j = 0;
    JSON jcb = NULL;
    wjctx_t *ctx = NULL;
    flux_t h = (flux_t)arg;

    ctx = getctx (h);
    /* Log through h: nothing in wait_job_complete()/main() sets ctx->h,
     * so relying on that field here could hand flux_log() an invalid
     * handle. */
    if (errnum > 0) {
        flux_log (h, LOG_ERR, "waitjob_cb: errnum passed in");
        return -1;
    }

    if (!(jcb = Jfromstr (jcbstr))) {
        flux_log (h, LOG_ERR, "waitjob_cb: error parsing JSON string");
        return -1;
    }
    get_jobid (jcb, &j);
    get_states (jcb, &os, &ns);
    Jput (jcb);

    if ((j == ctx->jobid) && (ns == J_COMPLETE)) {
        if (ctx->sync)
            create_outfile (ctx->sync);
        raise (SIGINT);
    }

    return 0;
}

/*
 * Wait until job `jobid` reaches the J_COMPLETE state.
 *
 * h:     flux handle used for JSC queries, notifications and the reactor.
 * jobid: id of the job to wait for.
 *
 * The SIGINT handler and the JSC status callback are installed first and
 * the current state is queried afterwards.  Previously the callback and
 * the reactor were only reached when the state query *failed*, so a job
 * that already existed but had not completed was never waited on.
 * NOTE(review): registering before querying is meant to avoid missing a
 * completion that happens between the two steps -- this assumes JSC
 * notifications are queued until the reactor runs; confirm against JSC.
 *
 * If the job has already completed, the optional sync file is created and
 * the function returns at once.  Otherwise the reactor is started; on
 * completion waitjob_cb() raises SIGINT and sig_handler() exits the
 * process.
 *
 * Returns 0 on success, -1 on failure.
 */
int wait_job_complete (flux_t h, int64_t jobid)
{
    int rc = -1;
    JSON jcb = NULL;
    JSON o = NULL;
    char *json_str = NULL;
    int64_t state = J_NULL;
    wjctx_t *ctx = getctx (h);

    sig_flux_h = h;
    ctx->jobid = jobid;

    if (signal (SIGINT, sig_handler) == SIG_ERR)
        goto done;
    if (jsc_notify_status (h, waitjob_cb, (void *)h) != 0) {
        flux_log (h, LOG_ERR, "failed to reg a waitjob CB");
        goto done;
    }

    if (jsc_query_jcb (h, jobid, JSC_STATE_PAIR, &json_str) == 0) {
        /* Only dereference the parsed object when parsing succeeded. */
        if ((jcb = Jfromstr (json_str))) {
            Jget_obj (jcb, JSC_STATE_PAIR, &o);
            Jget_int64 (o, JSC_STATE_PAIR_NSTATE, &state);
            Jput (jcb);
        }
        free (json_str);
        /* int64_t is not necessarily long: cast for a portable format. */
        flux_log (h, LOG_INFO, "%lld already started (%s)",
                  (long long)jobid, jsc_job_num2state (state));
        if (state == J_COMPLETE) {
            flux_log (h, LOG_INFO, "%lld already completed",
                      (long long)jobid);
            if (ctx->sync)
                create_outfile (ctx->sync);
            rc = 0;
            goto done;
        }
    }

    /* Job unknown so far or not yet complete: wait for notifications. */
    if (flux_reactor_start (h) < 0) {
        flux_log (h, LOG_ERR, "error in flux_reactor_start");
        goto done;
    }
    rc = 0;

done:
    return rc;
}

/******************************************************************************
* *
* Main entry point *
* *
******************************************************************************/

/*
 * flux-waitjob [OPTIONS] <jobid>
 *
 * Parses the command line, opens a flux handle and waits for the given
 * job to complete.  With -o/--out, an empty file of that name is created
 * once completion is detected.
 *
 * Exits through usage() (status 1) on a bad command line, including a
 * job id that is not a complete decimal number.  The result of
 * wait_job_complete() is not propagated to the exit status, as before.
 */
int main (int argc, char *argv[])
{
    flux_t h;
    int ch = 0;
    int64_t jobid = -1;
    char *fn = NULL;        /* stays NULL unless -o is given */
    char *endptr = NULL;
    wjctx_t *ctx = NULL;

    log_init ("flux-waitjob");
    while ((ch = getopt_long (argc, argv, OPTIONS, longopts, NULL)) != -1) {
        switch (ch) {
            case 'h': /* --help */
                usage ();
                break;
            case 'o': /* --out */
                free (fn);  /* last -o wins; do not leak earlier ones */
                if (!(fn = strdup (optarg)))
                    err_exit ("strdup");
                break;
            default:
                usage ();
                break;
        }
    }
    if (optind == argc)
        usage ();

    /* Reject empty, non-numeric, trailing-garbage and out-of-range ids
     * instead of silently waiting on job 0. */
    errno = 0;
    jobid = strtoll (argv[optind], &endptr, 10);
    if (errno != 0 || endptr == argv[optind] || *endptr != '\0')
        usage ();

    if (!(h = flux_open (NULL, 0)))
        err_exit ("flux_open");

    ctx = getctx (h);
    /* Hand the string over to the context; freectx() releases it.
     * Previously an uninitialized fn was passed to strdup()/free()
     * whenever -o was not given. */
    ctx->sync = fn;
    fn = NULL;

    flux_log_set_facility (h, "waitjob");
    wait_job_complete (h, jobid);

    flux_close (h);
    log_fini ();

    return 0;
}

/*
* vi:tabstop=4 shiftwidth=4 expandtab
*/
23 changes: 13 additions & 10 deletions sched/schedsrv.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@
#if ENABLE_TIMER_EVENT
static int timer_event_cb (flux_t h, void *arg);
#endif
static int res_event_cb (flux_t h, int t, flux_msg_t **msg, void *arg);
static void res_event_cb (flux_t h, flux_msg_watcher_t *w,
const flux_msg_t *msg, void *arg);
static int job_status_cb (JSON jcb, void *arg, int errnum);

/******************************************************************************
Expand Down Expand Up @@ -100,9 +101,6 @@ typedef struct {
static void freectx (void *arg)
{
ssrvctx_t *ctx = arg;
/* FIXME: we probably need some item free hooked into the lists
* ignore this for a while.
*/
zlist_destroy (&(ctx->p_queue));
zlist_destroy (&(ctx->r_queue));
zlist_destroy (&(ctx->c_queue));
Expand Down Expand Up @@ -385,6 +383,11 @@ static int load_rdl (ssrvctx_t *ctx, int argc, char **argv)
* *
******************************************************************************/

static struct flux_msghandler htab[] = {
{ FLUX_MSGTYPE_EVENT, "sched.res.*", res_event_cb},
FLUX_MSGHANDLER_TABLE_END
};

/*
* Register events, some of which CAN trigger a scheduling loop iteration.
* Currently,
Expand All @@ -398,13 +401,12 @@ static int inline reg_events (ssrvctx_t *ctx)
int rc = 0;
flux_t h = ctx->h;

if (flux_event_subscribe (h, "sched.res.event") < 0) {
if (flux_event_subscribe (h, "sched.res.") < 0) {
flux_log (h, LOG_ERR, "subscribing to event: %s", strerror (errno));
rc = -1;
goto done;
}
if (flux_msghandler_add (h, FLUX_MSGTYPE_EVENT, "sched.res.event",
res_event_cb, (void *)h) < 0) {
if (flux_msg_watcher_addvec (h, htab, (void *)h) < 0) {
flux_log (h, LOG_ERR,
"error registering resource event handler: %s",
strerror (errno));
Expand Down Expand Up @@ -775,7 +777,7 @@ static int action (ssrvctx_t *ctx, flux_lwj_t *job, job_state_t newstate)
flux_log (h, LOG_ERR, "%s: failed to release resources for job %ld",
__FUNCTION__, job->lwj_id);
} else {
flux_msg_t *msg = flux_event_encode ("sched.res.event", NULL);
flux_msg_t *msg = flux_event_encode ("sched.res.freed", NULL);

if (!msg || flux_send (h, msg, 0) < 0) {
flux_log (h, LOG_ERR, "%s: error sending event: %s",
Expand Down Expand Up @@ -816,10 +818,11 @@ static int action (ssrvctx_t *ctx, flux_lwj_t *job, job_state_t newstate)
* For now, the only resource event is raised when a job releases its
* RDL allocation.
*/
static int res_event_cb (flux_t h, int t, flux_msg_t **msg, void *arg)
static void res_event_cb (flux_t h, flux_msg_watcher_t *w,
const flux_msg_t *msg, void *arg)
{
schedule_jobs (getctx ((flux_t)arg));
return 0;
return;
}

#if ENABLE_TIMER_EVENT
Expand Down
1 change: 1 addition & 0 deletions t/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ TESTS = \
lua/t0002-multilevel.t \
lua/t0003-default-tags.t \
lua/t0004-derived-type.t \
t1000-jsc.t \
t2000-fcfs.t \
t2001-fcfs-aware.t \
t2002-easy.t
Expand Down
2 changes: 1 addition & 1 deletion t/sharness.d/flux-sharness.sh
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ test_under_flux() {

TEST_UNDER_FLUX_ACTIVE=t \
TERM=${ORIGINAL_TERM} \
exec flux -M${tdir}/sched start --size=${size} ${quiet} "sh $0 ${flags}"
exec flux -M${tdir}/sched -C"${tdir}/rdl/?.so" -L"${tdir}/rdl/?.lua" start --size=${size} ${quiet} "sh $0 ${flags}"
}


Expand Down
Loading

0 comments on commit f912cb2

Please sign in to comment.