Skip to content

Commit

Permalink
Revert "Add possibility of resuming multinode jobs (openpbs#1955)"
Browse files: browse the repository at this point in the history
This reverts commit fc02d4c.
  • Loading branch information
SudeshnaMoulik committed Sep 18, 2020
1 parent dd524eb commit d589dc1
Show file tree
Hide file tree
Showing 9 changed files with 12 additions and 295 deletions.
6 changes: 0 additions & 6 deletions src/include/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,6 @@ struct job {
int ji_stderr; /* socket for stderr */
int ji_ports[2]; /* ports for stdout/err */
enum bg_hook_request ji_hook_running_bg_on; /* set when hook starts in the background*/
int ji_msconnected; /* 0 - not connected, 1 - connected */
pbs_list_head ji_multinodejobs; /* links to recovered multinode jobs */
#else /* END Mom ONLY - start Server ONLY */
struct batch_request *ji_pmt_preq; /* outstanding preempt job request for deleting jobs */
int ji_discarding; /* discarding job */
Expand Down Expand Up @@ -594,8 +592,6 @@ struct job {
#ifdef PBS_MOM
tm_host_id ji_nodeidx; /* my node id */
tm_task_id ji_taskidx; /* generate task id's for job */
int ji_stdout;
int ji_stderr;
#if MOM_ALPS
long ji_reservation;
/* ALPS reservation identifier */
Expand Down Expand Up @@ -749,8 +745,6 @@ typedef struct infoent {
#define IM_EXEC_PROLOGUE 24
#define IM_CRED 25
#define IM_PMIX 26
#define IM_RECONNECT_TO_MS 27
#define IM_JOIN_RECOV_JOB 28

#define IM_ERROR 99
#define IM_ERROR2 100
Expand Down
2 changes: 1 addition & 1 deletion src/include/mom_func.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ extern pid_t fork_me(int sock);
extern ssize_t readpipe(int pfd, void *vptr, size_t nbytes);
extern ssize_t writepipe(int pfd, void *vptr, size_t nbytes);
extern int get_la(double *);
extern void init_abort_jobs(int, pbs_list_head *);
extern void init_abort_jobs(int);
extern void checkret(char **spot, int len);
extern void mom_nice(void);
extern void mom_unnice(void);
Expand Down
23 changes: 3 additions & 20 deletions src/resmom/catch_child.c
Original file line number Diff line number Diff line change
Expand Up @@ -443,9 +443,6 @@ scan_for_exiting(void)
for (pjob = (job *)GET_NEXT(svr_alljobs); pjob; pjob = nxjob) {
nxjob = (job *)GET_NEXT(pjob->ji_alljobs);

if (pjob->ji_numnodes > 1 && !pjob->ji_msconnected && pjob->ji_nodeid) /* assume that MS has a connection to itself at all times */
continue;

/*
** If a restart is active, skip this job since
** not all of the tasks may have started yet.
Expand Down Expand Up @@ -941,12 +938,11 @@ send_hellosvr(int stream)
* terminated and requeued.
*
* @param [in] recover - Specify recovering mode for MoM.
* @param [in] multinode_jobs - Pointer to list of pointers to recovered multinode jobs
*
*/

void
init_abort_jobs(int recover, pbs_list_head *multinode_jobs)
init_abort_jobs(int recover)
{
DIR *dir;
int i, sisters;
Expand All @@ -962,8 +958,6 @@ init_abort_jobs(int recover, pbs_list_head *multinode_jobs)
extern char *path_checkpoint;
extern char *path_spool;

CLEAR_HEAD((*multinode_jobs));

dir = opendir(path_jobs);
if (dir == NULL) {
log_event(PBSEVENT_ERROR, PBS_EVENTCLASS_SERVER, LOG_ALERT,
Expand Down Expand Up @@ -1027,10 +1021,8 @@ init_abort_jobs(int recover, pbs_list_head *multinode_jobs)
*/
if ((pj->ji_qs.ji_svrflags & JOB_SVFLG_HERE) == 0) {
/* I am sister, junk the job files */
if( recover != 2 ) {
mom_deljob(pj);
continue;
}
mom_deljob(pj);
continue;
}

sisters = pj->ji_numnodes - 1;
Expand Down Expand Up @@ -1129,15 +1121,6 @@ init_abort_jobs(int recover, pbs_list_head *multinode_jobs)

if (mom_do_poll(pj))
append_link(&mom_polljobs, &pj->ji_jobque, pj);

if (sisters > 0)
append_link(multinode_jobs, &pj->ji_multinodejobs, pj);

if (pj->ji_qs.ji_svrflags & JOB_SVFLG_HERE) {
/* I am MS */
pj->ji_stdout = pj->ji_ports[0] = pj->ji_extended.ji_ext.ji_stdout;
pj->ji_stderr = pj->ji_ports[1] = pj->ji_extended.ji_ext.ji_stdout;
}
}
}
if (errno != 0 && errno != ENOENT) {
Expand Down
33 changes: 1 addition & 32 deletions src/resmom/mom_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>

#include <unistd.h>
#include <dirent.h>
Expand Down Expand Up @@ -118,7 +117,6 @@ extern char *msg_err_malloc;
extern int
write_pipe_data(int upfds, void *data, int data_size);
char task_fmt[] = "/%8.8X";
extern void resume_multinode(job *pjob);


/* Function pointers
Expand Down Expand Up @@ -2317,7 +2315,7 @@ term_job(job *pjob)
int num;

for (num=0, np = pjob->ji_hosts;
num < pjob->ji_numnodes;
num<pjob->ji_numnodes;
num++, np++) {
if (np->hn_stream >= 0) {
np->hn_stream = -1;
Expand Down Expand Up @@ -2382,7 +2380,6 @@ im_eof(int stream, int ret)
np->hn_stream = -1;
if (np->hn_eof_ts == 0)
np->hn_eof_ts = time(0);
pjob->ji_msconnected = 0;

/*
** In case connection to pbs_comm is down/recently established, do not kill a job that is actually running.
Expand Down Expand Up @@ -2496,7 +2493,6 @@ check_ms(int stream, job *pjob)
np->hn_stream = stream;
}
np->hn_eof_ts = 0;
pjob->ji_msconnected = 1;
return FALSE;
}

Expand Down Expand Up @@ -3024,27 +3020,6 @@ im_request(int stream, int version)
BAIL("fromtask")
switch (command) {

case IM_JOIN_RECOV_JOB:
reply = 1;

hnodenum = disrsi(stream, &ret);
BAIL("JOINJOB nodenum")

np = NULL;
/* job should already exist */
pjob = find_job(jobid);
if( pjob == NULL ) {
SEND_ERR(PBSE_SYSTEM)
goto done;
}
pjob->ji_stdout = disrsi(stream, &ret);
BAIL("JOINJOB stdout")
pjob->ji_stderr = disrsi(stream, &ret);
BAIL("JOINJOB stderr")
pjob->ji_qs.ji_un.ji_momt.ji_exuid = pjob->ji_grpcache->gc_uid;
pjob->ji_qs.ji_un.ji_momt.ji_exgid = pjob->ji_grpcache->gc_gid;
pjob->ji_msconnected = 1;
goto done;
case IM_JOIN_JOB:
/*
** Sender is mom superior sending a job structure to me.
Expand Down Expand Up @@ -3089,7 +3064,6 @@ im_request(int stream, int version)
info = disrcs(stream, &len, &ret);
BAIL("JOINJOB credential")
}
pjob->ji_msconnected = 1;

pjob->ji_numnodes = hnodenum;
CLEAR_HEAD(lhead);
Expand Down Expand Up @@ -4056,7 +4030,6 @@ im_request(int stream, int version)
op->oe_u.oe_tm.oe_node = pvnodeid;
op->oe_u.oe_tm.oe_event = event;
op->oe_u.oe_tm.oe_taskid = fromtask;
task_save(ptask);
reply = 0;
}
break;
Expand Down Expand Up @@ -5430,10 +5403,6 @@ im_request(int stream, int version)
if (ret != DIS_SUCCESS)
goto err;
break;
case IM_RECONNECT_TO_MS:
if (pjob->ji_qs.ji_svrflags & JOB_SVFLG_HERE)
resume_multinode(pjob);
break;

default:
sprintf(log_buffer, "unknown command %d sent", command);
Expand Down
28 changes: 5 additions & 23 deletions src/resmom/mom_main.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ DWORD g_dwCurrentState = SERVICE_START_PENDING;
HANDLE hStop = NULL;
#endif /* WIN32 */
extern void mom_vnlp_report(vnl_t *vnl, char *header);
extern void resume_multinode(job *pjob);

int alien_attach = 0; /* attach alien procs */
int alien_kill = 0; /* kill alien procs */
Expand Down Expand Up @@ -607,7 +606,7 @@ extern void dep_cleanup(void);

/* External Functions */
extern void catch_child(int);
extern void init_abort_jobs(int, pbs_list_head *);
extern void init_abort_jobs(int);
extern void scan_for_exiting(void);
#ifdef NAS /* localmod 015 */
extern int to_size(char *, struct size_value *);
Expand Down Expand Up @@ -6301,7 +6300,6 @@ mom_over_limit(job *pjob)
* check attr value limits of job
*
* @param[in] pjob - pointer to job
* @param[in] recover - recovering mode for MoM
*
* @return int
* @retval 0 Failure
Expand All @@ -6310,7 +6308,7 @@ mom_over_limit(job *pjob)
*/

int
job_over_limit(job *pjob, int recover)
job_over_limit(job *pjob)
{
attribute *attr;
attribute *used;
Expand Down Expand Up @@ -6343,7 +6341,7 @@ job_over_limit(job *pjob, int recover)

/* special case EOF */
if (pnode->hn_sister == SISTER_EOF) {
if ((reliable_job_node_find(&pjob->ji_failed_node_list,pnode->hn_host) != NULL) || (do_tolerate_node_failures(pjob)) || recover == 2) {
if ((reliable_job_node_find(&pjob->ji_failed_node_list,pnode->hn_host) != NULL) || (do_tolerate_node_failures(pjob))) {
snprintf(log_buffer, sizeof(log_buffer), "ignoring node EOF %d from failed mom %s as job is tolerant of node failures", pjob->ji_nodekill, pnode->hn_host?pnode->hn_host:"");
log_event(PBSEVENT_DEBUG3, PBS_EVENTCLASS_JOB, LOG_DEBUG, pjob->ji_qs.ji_jobid, log_buffer);
return 0;
Expand Down Expand Up @@ -8485,10 +8483,8 @@ main(int argc, char *argv[])
log_err(c, msg_daemonname, "unable to recover vnode to host mapping");
}

pbs_list_link multinode_jobs;

/* recover & abort Jobs which were under MOM's control */
init_abort_jobs(recover, &multinode_jobs);
init_abort_jobs(recover);

/* deploy periodic hooks */
mom_hook_input_init(&hook_input);
Expand Down Expand Up @@ -8542,20 +8538,6 @@ main(int argc, char *argv[])
if (time_now > time_next_hello) {
send_hellosvr(server_stream);
time_next_hello = time_now + time_delta_hellosvr(MOM_DELTA_NORMAL);
if (server_stream != -1) {
job *m_job;
for (m_job = (job *)GET_NEXT(multinode_jobs); m_job;
m_job = (job *)GET_NEXT(m_job->ji_multinodejobs)) {
if (m_job->ji_qs.ji_svrflags & JOB_SVFLG_HERE) {
/* I am MS */
resume_multinode(m_job);
} else {
/* I am sister */
send_sisters(m_job, IM_RECONNECT_TO_MS, NULL);
}
}
CLEAR_HEAD(multinode_jobs);
}
}
} else
send_pending_updates();
Expand Down Expand Up @@ -8945,7 +8927,7 @@ main(int argc, char *argv[])
if (c & (JOB_SVFLG_OVERLMT1 | JOB_SVFLG_OVERLMT2 | JOB_SVFLG_TERMJOB))
continue;

if (job_over_limit(pjob, recover)) {
if (job_over_limit(pjob)) {

char *kill_msg;
log_event(PBSEVENT_JOB | PBSEVENT_FORCE,
Expand Down
48 changes: 0 additions & 48 deletions src/resmom/start_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -5927,52 +5927,6 @@ job_nodes(struct job *pjob)
return job_nodes_inner(pjob, NULL);
}

/**
* @brief
* Resume a multinode job after one or more sisters have been restarted.
*
* Sends an IM_JOIN_RECOV_JOB message on the stream of every entry of
* pjob->ji_hosts from index 1 up to ji_numnodes - 1. Each message carries
* pjob->ji_numnodes followed by pjob->ji_ports[0] and pjob->ji_ports[1].
* Returns immediately, without sending anything, when pjob->ji_hosts is NULL.
*
* @param[in] pjob - job pointer
*
* @return Void
*
*/

void resume_multinode(job *pjob)
{
/* no host table for this job: nothing to send */
if (pjob->ji_hosts == NULL)
return;

int com = IM_JOIN_RECOV_JOB;
hnodent *np = NULL;
eventent *ep = NULL;
int i;
/* index 0 is skipped -- presumably the MS's own entry; TODO confirm */
for(i = 1; i < pjob->ji_numnodes; i++) {
np = &pjob->ji_hosts[i];

/* the first event is allocated fresh; each later one is duplicated from the previous event */
if( i == 1 )
ep = event_alloc(pjob, com, -1, np, TM_NULL_EVENT, TM_NULL_TASK);
else
ep = event_dup(ep, pjob, np);

/* no event could be obtained: call exec_bail() with JOB_EXEC_FAIL1 and stop; remaining nodes are not contacted */
if (ep == NULL) {
exec_bail(pjob, JOB_EXEC_FAIL1, NULL);
return;
}

/* NOTE(review): hn_stream is used without checking for -1 -- confirm a valid stream is guaranteed here */
int stream = np->hn_stream;
im_compose(stream, pjob->ji_qs.ji_jobid,
get_jattr_str(pjob, JOB_ATR_Cookie),
com, ep->ee_event, TM_NULL_TASK, IM_OLD_PROTOCOL_VER);
/* payload order: node count, ji_ports[0], ji_ports[1]; return values of the writes are deliberately discarded */
(void)diswsi(stream, pjob->ji_numnodes);
(void)diswsi(stream, pjob->ji_ports[0]);
(void)diswsi(stream, pjob->ji_ports[1]);
dis_flush(stream);
#if defined(PBS_SECURITY) && (PBS_SECURITY == KRB5)
/* NOTE(review): called once per loop iteration although it takes the whole job -- confirm repetition is intended */
send_cred_sisters(pjob);
#endif
}
}

/**
* @brief
* start_exec() - start execution of a job
Expand Down Expand Up @@ -6235,8 +6189,6 @@ start_exec(job *pjob)
}
pjob->ji_stdout = socks[0];
pjob->ji_stderr = socks[1];
pjob->ji_extended.ji_ext.ji_stdout = pjob->ji_ports[0];
pjob->ji_extended.ji_ext.ji_stderr = pjob->ji_ports[1];
}

for (i = 1; i < nodenum; i++) {
Expand Down
6 changes: 0 additions & 6 deletions src/server/job_func.c
Original file line number Diff line number Diff line change
Expand Up @@ -348,10 +348,6 @@ job_alloc(void)
pj->ji_stderr = 0;
pj->ji_setup = NULL;
pj->ji_momsubt = 0;
pj->ji_msconnected = 0;
CLEAR_HEAD(pj->ji_multinodejobs);
pj->ji_extended.ji_ext.ji_stdout = 0;
pj->ji_extended.ji_ext.ji_stderr = 0;
#else /* SERVER */
pj->ji_discarding = 0;
pj->ji_prunreq = NULL;
Expand Down Expand Up @@ -579,8 +575,6 @@ job_free(job *pj)
if (job_free_extra != NULL)
job_free_extra(pj);

CLEAR_HEAD(pj->ji_multinodejobs);

#ifdef WIN32
if (pj->ji_hJob) {
CloseHandle(pj->ji_hJob);
Expand Down
4 changes: 2 additions & 2 deletions test/fw/ptl/lib/pbs_testlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -13340,14 +13340,14 @@ def stop(self, sig=None):
raise PbsServiceError(rc=e.rc, rv=e.rv, msg=e.msg)
return True

def restart(self, args=None):
def restart(self):
"""
Restart the PBS mom
"""
if self.isUp():
if not self.stop():
return False
return self.start(args=args)
return self.start()

def log_match(self, msg=None, id=None, n=50, tail=True, allmatch=False,
regexp=False, max_attempts=None, interval=None,
Expand Down

0 comments on commit d589dc1

Please sign in to comment.