Skip to content

Commit

Permalink
Tweak Job Scheduling code layout.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marco van Wieringen committed Feb 17, 2015
1 parent 9231657 commit cca7a5a
Showing 1 changed file with 88 additions and 34 deletions.
122 changes: 88 additions & 34 deletions src/dird/jobq.c
Original file line number Diff line number Diff line change
Expand Up @@ -361,14 +361,12 @@ static int start_server(jobq_t *jq)
return status;
}


/*
* This is the worker thread that serves the job queue.
* When all the resources are acquired for the job,
* it will call the user's engine.
*/
extern "C"
void *jobq_server(void *arg)
extern "C" void *jobq_server(void *arg)
{
struct timespec timeout;
jobq_t *jq = (jobq_t *)arg;
Expand Down Expand Up @@ -402,7 +400,9 @@ void *jobq_server(void *arg)
timedout = true;
break;
} else if (status != 0) {
/* This shouldn't happen */
/*
* This shouldn't happen
*/
Dmsg0(2300, "This shouldn't happen\n");
jq->num_workers--;
V(jq->mutex);
Expand Down Expand Up @@ -430,34 +430,45 @@ void *jobq_server(void *arg)
}
jq->running_jobs->append(je);

/* Attach jcr to this thread while we run the job */
/*
* Attach jcr to this thread while we run the job
*/
jcr->set_killable(true);
set_jcr_in_tsd(jcr);
Dmsg1(2300, "Took jobid=%d from ready and appended to run\n", jcr->JobId);

/* Release job queue lock */
/*
* Release job queue lock
*/
V(jq->mutex);

/* Call user's routine here */
/*
* Call user's routine here
*/
Dmsg3(2300, "Calling user engine for jobid=%d use=%d stat=%c\n", jcr->JobId,
jcr->use_count(), jcr->JobStatus);
jcr->use_count(), jcr->JobStatus);
jq->engine(je->jcr);

/* Job finished detach from thread */
/*
* Job finished detach from thread
*/
remove_jcr_from_tsd(je->jcr);
je->jcr->set_killable(false);

Dmsg2(2300, "Back from user engine jobid=%d use=%d.\n", jcr->JobId,
jcr->use_count());
jcr->use_count());

/* Reacquire job queue lock */
/*
* Reacquire job queue lock
*/
P(jq->mutex);
Dmsg0(200, "Done lock mutex after running job. Release locks.\n");
jq->running_jobs->remove(je);

/*
* Release locks if acquired. Note, they will not have
* been acquired for jobs canceled before they were
* put into the ready queue.
* been acquired for jobs canceled before they were
* put into the ready queue.
*/
if (jcr->acquired_resource_locks) {
dec_read_store(jcr);
Expand All @@ -484,17 +495,19 @@ void *jobq_server(void *arg)
continue; /* go look for more work */
}

/* Clean up and release old jcr */
/*
* Clean up and release old jcr
*/
Dmsg2(2300, "====== Termination job=%d use_cnt=%d\n", jcr->JobId, jcr->use_count());
jcr->SDJobStatus = 0;
V(jq->mutex); /* release internal lock */
free_jcr(jcr);
free(je); /* release job entry */
P(jq->mutex); /* reacquire job queue lock */
}

/*
* If any job in the wait queue can be run,
* move it to the ready queue
* If any job in the wait queue can be run, move it to the ready queue
*/
Dmsg0(2300, "Done check ready, now check wait queue.\n");
if (!jq->waiting_jobs->empty() && !jq->quit) {
Expand All @@ -507,6 +520,7 @@ void *jobq_server(void *arg)
Dmsg2(2300, "JobId %d is running. Look for pri=%d\n",
re->jcr->JobId, Priority);
running_allow_mix = true;

for ( ; re; ) {
Dmsg2(2300, "JobId %d is also running with %s\n",
re->jcr->JobId,
Expand All @@ -523,20 +537,24 @@ void *jobq_server(void *arg)
Priority = je->jcr->JobPriority;
Dmsg1(2300, "No job running. Look for Job pri=%d\n", Priority);
}

/*
* Walk down the list of waiting jobs and attempt
* to acquire the resources it needs.
* Walk down the list of waiting jobs and attempt to acquire the resources it needs.
*/
for ( ; je; ) {
/* je is current job item on the queue, jn is the next one */
/*
* je is current job item on the queue, jn is the next one
*/
JCR *jcr = je->jcr;
jobq_item_t *jn = (jobq_item_t *)jq->waiting_jobs->next(je);

Dmsg4(2300, "Examining Job=%d JobPri=%d want Pri=%d (%s)\n",
jcr->JobId, jcr->JobPriority, Priority,
jcr->res.job->allow_mixed_priority ? "mix" : "no mix");

/* Take only jobs of correct Priority */
/*
* Take only jobs of correct Priority
*/
if (!(jcr->JobPriority == Priority
|| (jcr->JobPriority < Priority &&
jcr->res.job->allow_mixed_priority && running_allow_mix))) {
Expand All @@ -545,7 +563,9 @@ void *jobq_server(void *arg)
}

if (!acquire_resources(jcr)) {
/* If resource conflict, job is canceled */
/*
* If resource conflict, job is canceled
*/
if (!job_canceled(jcr)) {
je = jn; /* point to next waiting job */
continue;
Expand All @@ -554,37 +574,42 @@ void *jobq_server(void *arg)

/*
* Got all locks, now remove it from wait queue and append it
* to the ready queue. Note, we may also get here if the
* job was canceled. Once it is "run", it will quickly
* terminate.
* to the ready queue. Note, we may also get here if the
* job was canceled. Once it is "run", it will quickly terminate.
*/
jq->waiting_jobs->remove(je);
jq->ready_jobs->append(je);
Dmsg1(2300, "moved JobId=%d from wait to ready queue\n", je->jcr->JobId);
je = jn; /* Point to next waiting job */
} /* end for loop */

} /* end if */

Dmsg0(2300, "Done checking wait queue.\n");

/*
* If no more ready work and we are asked to quit, then do it
*/
if (jq->ready_jobs->empty() && jq->quit) {
jq->num_workers--;
if (jq->num_workers == 0) {
Dmsg0(2300, "Wake up destroy routine\n");
/* Wake up destroy routine if he is waiting */

/*
* Wake up destroy routine if he is waiting
*/
pthread_cond_broadcast(&jq->work);
}
break;
}

Dmsg0(2300, "Check for work request\n");

/*
* If no more work requests, and we waited long enough, quit
*/
Dmsg2(2300, "timedout=%d read empty=%d\n", timedout,
jq->ready_jobs->empty());
jq->ready_jobs->empty());

if (jq->ready_jobs->empty() && timedout) {
Dmsg0(2300, "break big loop\n");
jq->num_workers--;
Expand All @@ -595,14 +620,17 @@ void *jobq_server(void *arg)
if (work) {
/*
* If a job is waiting on a Resource, don't consume all
* the CPU time looping looking for work, and even more
* important, release the lock so that a job that has
* terminated can give us the resource.
* the CPU time looping looking for work, and even more
* important, release the lock so that a job that has
* terminated can give us the resource.
*/
V(jq->mutex);
bmicrosleep(2, 0); /* pause for 2 seconds */
P(jq->mutex);
/* Recompute work as something may have changed in last 2 secs */

/*
* Recompute work as something may have changed in last 2 secs
*/
work = !jq->ready_jobs->empty() || !jq->waiting_jobs->empty();
}
Dmsg1(2300, "Loop again. work=%d\n", work);
Expand All @@ -611,6 +639,7 @@ void *jobq_server(void *arg)
Dmsg0(200, "unlock mutex\n");
V(jq->mutex);
Dmsg0(2300, "End jobq_server\n");

return NULL;
}

Expand Down Expand Up @@ -774,6 +803,7 @@ static bool acquire_resources(JCR *jcr)
" Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"),
jcr->res.rstore->name(), jcr->res.rstore_source, jcr->res.wstore->name(), jcr->res.wstore_source);
jcr->setJobStatus(JS_Canceled);

return false;
}
#endif
Expand All @@ -782,6 +812,7 @@ static bool acquire_resources(JCR *jcr)
if (!inc_read_store(jcr)) {
Dmsg1(200, "Fail rncj=%d\n", jcr->res.rstore->NumConcurrentJobs);
jcr->setJobStatus(JS_WaitStoreRes);

return false;
}
}
Expand All @@ -802,6 +833,7 @@ static bool acquire_resources(JCR *jcr)

if (skip_this_jcr) {
jcr->setJobStatus(JS_WaitStoreRes);

return false;
}

Expand All @@ -818,10 +850,13 @@ static bool acquire_resources(JCR *jcr)
if (jcr->res.client->NumConcurrentJobs < jcr->res.client->MaxConcurrentJobs) {
jcr->res.client->NumConcurrentJobs++;
} else {
/* Back out previous locks */
/*
* Back out previous locks
*/
dec_write_store(jcr);
dec_read_store(jcr);
jcr->setJobStatus(JS_WaitClientRes);

return false;
}
}
Expand All @@ -831,15 +866,34 @@ static bool acquire_resources(JCR *jcr)
if (jcr->res.job->NumConcurrentJobs < jcr->res.job->MaxConcurrentJobs) {
jcr->res.job->NumConcurrentJobs++;
} else {
/* Back out previous locks */
/*
* Back out previous locks
*/
dec_write_store(jcr);
dec_read_store(jcr);
jcr->res.client->NumConcurrentJobs--;

/*
* Some Job Types are excluded from the client concurrency as they have no
* interaction with the client at all.
*/
switch (jcr->getJobType()) {
case JT_MIGRATE:
case JT_COPY:
break;
default:
if (jcr->res.client) {
jcr->res.client->NumConcurrentJobs--;
}
break;
}

jcr->setJobStatus(JS_WaitJobRes);

return false;
}

jcr->acquired_resource_locks = true;

return true;
}

Expand Down

0 comments on commit cca7a5a

Please sign in to comment.