Skip to content

Commit

Permalink
Jobid/Resvid wraps to zero after the server is restarted multiple times consecutively
Browse files Browse the repository at this point in the history

Signed-off-by: Bhagat-Rajput <bhagatsingh.rajput@altair.com>
  • Loading branch information
Bhagat-Rajput committed Nov 16, 2018
1 parent ed47e9f commit 26032c6
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 27 deletions.
5 changes: 5 additions & 0 deletions src/server/pbsd_init.c
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,11 @@ pbsd_init(int type)
log_err(rc, __func__, msg_init_baddb);
return (-1);
}
/*
* Retrieve the jobidnumber from the database and use it to generate jobids locally;
* see: get_next_svr_sequence_id(void)
*/
svr_jobidnumber = server.sv_qs.sv_jobidnumber;
if (server.sv_attr[(int)SRV_ATR_resource_assn].at_flags &
ATR_VFLAG_SET) {
svr_attr_def[(int)SRV_ATR_resource_assn].at_free(
Expand Down
2 changes: 1 addition & 1 deletion src/server/req_quejob.c
Original file line number Diff line number Diff line change
Expand Up @@ -3236,7 +3236,7 @@ long long get_next_svr_sequence_id(void)
++svr_sequence_window_count;
ret_svr_sequence_id = svr_jobidnumber;
/* sequence window count has reached SEQ_WIN_INCR, reset back to zero */
if (svr_sequence_window_count > SEQ_WIN_INCR) {
if (svr_sequence_window_count >= SEQ_WIN_INCR) {
svr_sequence_window_count = 0;
}
/* If server job limit is over, reset back to zero */
Expand Down
2 changes: 1 addition & 1 deletion src/server/req_shutdown.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ svr_shutdown(int type)

/* Lets start by logging shutdown and saving everything */

/* Saving server jobid number to the database as server is going to shutdown.
/* Saving server jobid number to the database as server is going to shutdown.
* Once the server comes back up, it will start jobid/resvid from this number onwards.
*/
server.sv_qs.sv_jobidnumber = svr_jobidnumber;
Expand Down
88 changes: 63 additions & 25 deletions test/tests/functional/pbs_trillion_jobid.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def stop_and_restart_svr(self, restart_type):
self.assertTrue(self.server.isUp(), restart_msg)

def submit_job(self, sleep=10, lower=0,
upper=0, job_id=None, job_msg=None):
upper=0, job_id=None, job_msg=None, verify=False):
"""
Helper method to submit a normal/array job
and also checks the R state and particular jobid if success,
Expand All @@ -165,6 +165,9 @@ def submit_job(self, sleep=10, lower=0,
:param job_msg : Expected message upon submission failure
:type job_msg : int
:param verify : Checks Job status R
:type verify : boolean(True/False)
"""
arr_flag = False
j = Job(TEST_USER)
Expand All @@ -178,15 +181,15 @@ def submit_job(self, sleep=10, lower=0,
if job_id is not None:
self.assertEqual(jid.split('.')[0], job_id)
if arr_flag:
self.server.expect(JOB, {'job_state': 'B'}, id=jid)
self.server.expect(JOB, {'job_state=R': '%d' % (total_jobs)},
count=True, id=jid, extend='t')
if sleep == 1:
self.server.expect(JOB, 'queue', op=UNSET, id=jid)
if verify:
self.server.expect(JOB, {'job_state': 'B'}, id=jid)
self.server.expect(
JOB,
{'job_state=R': total_jobs},
count=True, id=jid, extend='t')
else:
self.server.expect(JOB, {'job_state': 'R'}, id=jid)
if sleep == 1:
self.server.expect(JOB, 'queue', op=UNSET, id=jid)
if verify:
self.server.expect(JOB, {'job_state': 'R'}, id=jid)
except PbsSubmitError as e:
if job_msg is not None:
# if JobId already exist
Expand Down Expand Up @@ -323,13 +326,14 @@ def test_max_job_sequence_id_wrap(self):
# Check default limit(9999999) and wrap it 0
a = {'resources_available.ncpus': 20}
self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
self.submit_job()
self.submit_job(lower=1, upper=2)
self.submit_job(verify=True)
self.submit_job(lower=1, upper=2, verify=True)
self.submit_resv()
sv_jobidnumber = 9999999 # default
self.set_svr_sv_jobidnumber(sv_jobidnumber)
self.submit_job(job_id='%s' % (sv_jobidnumber))
self.submit_job(lower=1, upper=2, job_id='0[]') # wrap it
self.submit_job(job_id='%s' % (sv_jobidnumber), verify=True)
self.submit_job(lower=1, upper=2, job_id='0[]',
verify=True) # wrap it
self.submit_resv(resv_id='R1')

# Check max limit (999999999999) and wrap it 0
Expand All @@ -342,12 +346,13 @@ def test_max_job_sequence_id_wrap(self):
runas=ROOT_USER,
expect=True)
self.server.expect(SERVER, seq_id)
self.submit_job()
self.submit_job(lower=1, upper=2)
self.submit_job(verify=True)
self.submit_job(lower=1, upper=2, verify=True)
self.submit_resv()
self.set_svr_sv_jobidnumber(sv_jobidnumber)
self.submit_job(job_id='%s' % (sv_jobidnumber))
self.submit_job(lower=1, upper=2, job_id='0[]') # wrap it
self.submit_job(job_id='%s' % (sv_jobidnumber), verify=True)
self.submit_job(lower=1, upper=2, job_id='0[]',
verify=True) # wrap it
self.submit_resv(resv_id='R1')

# Someone set the max_job_sequence_id less than current jobid then also
Expand All @@ -363,8 +368,8 @@ def test_max_job_sequence_id_wrap(self):
self.server.expect(SERVER, seq_id)
sv_jobidnumber = 123456789
self.set_svr_sv_jobidnumber(sv_jobidnumber)
self.submit_job(job_id='%s' % (sv_jobidnumber))
self.submit_job(lower=1, upper=2, job_id='123456790[]')
self.submit_job(job_id='%s' % (sv_jobidnumber), verify=True)
self.submit_job(lower=1, upper=2, job_id='123456790[]', verify=True)
self.submit_resv(resv_id='R123456791')
# Set smaller(12345678) than current jobid(123456790)
sv_jobidnumber = 12345678
Expand All @@ -376,8 +381,8 @@ def test_max_job_sequence_id_wrap(self):
runas=ROOT_USER,
expect=True)
self.server.expect(SERVER, seq_id)
self.submit_job(job_id='0') # wrap it to zero
self.submit_job(lower=1, upper=2, job_id='1[]')
self.submit_job(job_id='0', verify=True) # wrap it to zero
self.submit_job(lower=1, upper=2, job_id='1[]', verify=True)
self.submit_resv(resv_id='R2')

def test_verify_sequence_window(self):
Expand All @@ -387,8 +392,6 @@ def test_verify_sequence_window(self):
"""
# Abruptly kill the server so next jobid should be 1000 after server
# start
a = {'resources_available.ncpus': 15}
self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
self.set_svr_sv_jobidnumber(0)
self.submit_job(job_id='0')
self.submit_job(lower=1, upper=2, job_id='1[]')
Expand All @@ -411,14 +414,18 @@ def test_verify_sequence_window(self):
self.submit_job(lower=1, upper=2, job_id='2004[]')
self.submit_resv(resv_id='R2005')

# Verify the sequence window: in case more than 1001 jobs are submitted,
# all jobs should submit successfully without any duplication error
for _ in xrange(1010):
j = Job(TEST_USER)
self.server.submit(j)

def test_jobid_duplication(self):
"""
Tests the JobId/ResvId duplication after wrap
Job/Resv shouldn't submit because previous
jobs with the same id's are still running
"""
a = {'resources_available.ncpus': 8}
self.server.manager(MGR_CMD_SET, NODE, a, self.mom.shortname)
seq_id = {ATTR_max_job_sequence_id: 99999999}
self.server.manager(
MGR_CMD_SET,
Expand All @@ -444,3 +451,34 @@ def test_jobid_duplication(self):
# Job should submit successfully because all existing ids have been
# passed
self.submit_job(lower=1, upper=2, job_id='3[]')

def test_jobid_resvid_after_multiple_restart(self):
    """
    Check that the Jobid/Resvid sequence does not wrap to 0 when the
    server is restarted several times in a row, first gracefully and
    then abruptly.
    """
    def submit_trio(first_id):
        # One normal job, one array job and one reservation, requested
        # with three consecutive ids starting at first_id. submit_job
        # compares the id it gets back against job_id; submit_resv is
        # presumably doing the same for resv_id.
        self.submit_job(job_id='%s' % first_id)
        self.submit_job(lower=1, upper=2, job_id='%s[]' % (first_id + 1))
        self.submit_resv(resv_id='R%s' % (first_id + 2))

    def restart_twice(restart_type):
        # Two back-to-back restarts of the same kind, with no
        # submissions in between.
        for _ in range(2):
            self.stop_and_restart_svr(restart_type)

    # Whatever id this first job receives becomes the baseline that all
    # later expectations are computed from.
    seed_jid = self.server.submit(Job(TEST_USER))
    base_id = int(seed_jid.split('.')[0])
    submit_trio(base_id + 1)

    # Gracefully stop and start the server twice consecutively; the
    # sequence is expected to continue right where it left off.
    restart_twice('normal')
    submit_trio(base_id + 4)

    # Abruptly kill and start the server twice consecutively.
    restart_twice('kill')
    # Add 1000 for the sequence window buffer and 4 for the jobs that
    # ran already after server start.
    submit_trio(base_id + 1000 + 4)

def tearDown(self):
    """
    Per-test cleanup: ask the server to clean up jobs, then run the
    TestFunctional teardown.
    """
    # Job cleanup is issued first so that it happens before the base
    # class teardown runs.
    self.server.cleanup_jobs()
    # Explicit base-class call, as in the rest of this file's style.
    TestFunctional.tearDown(self)

0 comments on commit 26032c6

Please sign in to comment.