diff --git a/src/dird/jobq.c b/src/dird/jobq.c index e1ef533654c..fd2e07b403c 100644 --- a/src/dird/jobq.c +++ b/src/dird/jobq.c @@ -153,10 +153,10 @@ struct wait_pkt { /* * Wait until schedule time arrives before starting. Normally - * this routine is only used for jobs started from the console - * for which the user explicitly specified a start time. Otherwise - * most jobs are put into the job queue only when their - * scheduled time arives. + * this routine is only used for jobs started from the console + * for which the user explicitly specified a start time. Otherwise + * most jobs are put into the job queue only when their + * scheduled time arives. */ extern "C" void *sched_wait(void *arg) @@ -169,12 +169,18 @@ void *sched_wait(void *arg) free(arg); time_t wtime = jcr->sched_time - time(NULL); jcr->setJobStatus(JS_WaitStartTime); - /* Wait until scheduled time arrives */ + + /* + * Wait until scheduled time arrives + */ if (wtime > 0) { Jmsg(jcr, M_INFO, 0, _("Job %s waiting %d seconds for scheduled start time.\n"), jcr->Job, wtime); } - /* Check every 30 seconds if canceled */ + + /* + * Check every 30 seconds if canceled + */ while (wtime > 0) { Dmsg3(2300, "Waiting on sched time, jobid=%d secs=%d use=%d\n", jcr->JobId, wtime, jcr->use_count()); @@ -191,12 +197,13 @@ void *sched_wait(void *arg) jobq_add(jq, jcr); free_jcr(jcr); /* we are done with jcr */ Dmsg0(2300, "Exit sched_wait\n"); + return NULL; } /* - * Add a job to the queue - * jq is a queue that was created with jobq_init + * Add a job to the queue + * jq is a queue that was created with jobq_init */ int jobq_add(jobq_t *jq, JCR *jcr) { @@ -208,7 +215,9 @@ int jobq_add(jobq_t *jq, JCR *jcr) wait_pkt *sched_pkt; if (!jcr->term_wait_inited) { - /* Initialize termination condition variable */ + /* + * Initialize termination condition variable + */ if ((status = pthread_cond_init(&jcr->term_wait, NULL)) != 0) { berrno be; Jmsg1(jcr, M_FATAL, 0, _("Unable to init job cond variable: ERR=%s\n"), be.bstrerror(status)); @@ -246,14 +255,20 @@ int jobq_add(jobq_t *jq, JCR *jcr) } item->jcr = jcr; - /* While waiting in a queue this job is not attached to a thread */ + /* + * While waiting in a queue this job is not attached to a thread + */ set_jcr_in_tsd(INVALID_JCR); if (job_canceled(jcr)) { - /* Add job to ready queue so that it is canceled quickly */ + /* + * Add job to ready queue so that it is canceled quickly + */ jq->ready_jobs->prepend(item); Dmsg1(2300, "Prepended job=%d to ready queue\n", jcr->JobId); } else { - /* Add this job to the wait queue in priority sorted order */ + /* + * Add this job to the wait queue in priority sorted order + */ foreach_dlist(li, jq->waiting_jobs) { Dmsg2(2300, "waiting item jobid=%d priority=%d\n", li->jcr->JobId, li->jcr->JobPriority); @@ -265,14 +280,19 @@ int jobq_add(jobq_t *jq, JCR *jcr) break; } } - /* If not jobs in wait queue, append it */ + + /* + * If not jobs in wait queue, append it + */ if (!inserted) { jq->waiting_jobs->append(item); Dmsg1(2300, "Appended item jobid=%d to waiting queue\n", jcr->JobId); } } - /* Ensure that at least one server looks at the queue. */ + /* + * Ensure that at least one server looks at the queue. + */ status = start_server(jq); V(jq->mutex); @@ -281,13 +301,13 @@ int jobq_add(jobq_t *jq, JCR *jcr) } /* - * Remove a job from the job queue. Used only by cancel_job(). - * jq is a queue that was created with jobq_init - * work_item is an element of work + * Remove a job from the job queue. Used only by cancel_job(). + * jq is a queue that was created with jobq_init + * work_item is an element of work * - * Note, it is "removed" from the job queue. - * If you want to cancel it, you need to provide some external means - * of doing so (e.g. pthread_kill()). + * Note, it is "removed" from the job queue. + * If you want to cancel it, you need to provide some external means + * of doing so (e.g. pthread_kill()). */ int jobq_remove(jobq_t *jq, JCR *jcr) { @@ -313,7 +333,9 @@ int jobq_remove(jobq_t *jq, JCR *jcr) return EINVAL; } - /* Move item to be the first on the list */ + /* + * Move item to be the first on the list + */ jq->waiting_jobs->remove(item); jq->ready_jobs->prepend(item); Dmsg2(2300, "jobq_remove jobid=%d jcr=0x%x moved to ready queue\n", jcr->JobId, jcr); @@ -325,7 +347,6 @@ int jobq_remove(jobq_t *jq, JCR *jcr) return status; } - /* * Start the server thread if it isn't already running */ @@ -335,9 +356,10 @@ static int start_server(jobq_t *jq) pthread_t id; /* - * if any threads are idle, wake one. - * Actually we do a broadcast because on /lib/tls - * these signals seem to get lost from time to time. + * If any threads are idle, wake one. + * + * Actually we do a broadcast because on /lib/tls + * these signals seem to get lost from time to time. */ if (jq->idle_workers > 0) { Dmsg0(2300, "Signal worker to wake up\n"); @@ -348,7 +370,9 @@ static int start_server(jobq_t *jq) } } else if (jq->num_workers < jq->max_workers) { Dmsg0(2300, "Create worker thread\n"); - /* No idle threads so create a new one */ + /* + * No idle threads so create a new one + */ set_thread_concurrency(jq->max_workers + 1); jq->num_workers++; if ((status = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) { @@ -364,7 +388,7 @@ static int start_server(jobq_t *jq) /* * This is the worker thread that serves the job queue. * When all the resources are acquired for the job, - * it will call the user's engine. + * it will call the user's engine. */ extern "C" void *jobq_server(void *arg) { @@ -411,12 +435,14 @@ extern "C" void *jobq_server(void *arg) break; } } + /* * If anything is in the ready queue, run it */ Dmsg0(2300, "Checking ready queue.\n"); while (!jq->ready_jobs->empty() && !jq->quit) { JCR *jcr; + je = (jobq_item_t *)jq->ready_jobs->first(); jcr = je->jcr; jq->ready_jobs->remove(je); @@ -467,26 +493,20 @@ extern "C" void *jobq_server(void *arg) /* * Release locks if acquired. Note, they will not have - * been acquired for jobs canceled before they were - * put into the ready queue. + * been acquired for jobs canceled before they were put into the ready queue. */ if (jcr->acquired_resource_locks) { - dec_read_store(jcr); - dec_write_store(jcr); - if (jcr->res.client) { - /* - * Some Job Types are excluded from the client concurrency as they have no - * interaction with the client at all. - */ - switch (jcr->getJobType()) { - case JT_MIGRATE: - case JT_COPY: - break; - default: + if (!jcr->IgnoreStorageConcurrency) { + dec_read_store(jcr); + dec_write_store(jcr); + } + + if (!jcr->IgnoreClientConcurrency) { + if (jcr->res.client) { jcr->res.client->NumConcurrentJobs--; - break; } } + jcr->res.job->NumConcurrentJobs--; jcr->acquired_resource_locks = false; } @@ -555,9 +575,10 @@ extern "C" void *jobq_server(void *arg) /* * Take only jobs of correct Priority */ - if (!(jcr->JobPriority == Priority - || (jcr->JobPriority < Priority && - jcr->res.job->allow_mixed_priority && running_allow_mix))) { + if (!(jcr->JobPriority == Priority || + (jcr->JobPriority < Priority && + jcr->res.job->allow_mixed_priority && + running_allow_mix))) { jcr->setJobStatus(JS_WaitPriority); break; } @@ -775,15 +796,46 @@ static bool acquire_resources(JCR *jcr) { bool skip_this_jcr = false; + /* + * Set that we didn't acquire any resourse locks yet. + */ jcr->acquired_resource_locks = false; -/* - * Turning this code off is likely to cause some deadlocks, - * but we do not really have enough information here to - * know if this is really a deadlock (it may be a dual drive - * autochanger), and in principle, the SD reservation system - * should detect these deadlocks, so push the work off on it. - */ + + /* + * Some Job Types are excluded from the client and storage concurrency + * as they have no interaction with the client or storage at all. + */ + switch (jcr->getJobType()) { + case JT_MIGRATE: + case JT_COPY: + /* + * Migration/Copy jobs are not counted for client concurrency + * as they do not touch the client at all + */ + jcr->IgnoreClientConcurrency = true; + Dmsg1(200, "Skipping migrate/copy Job %s for client concurrency\n", jcr->Job); + + if (jcr->MigrateJobId == 0) { + /* + * Migration/Copy control jobs are not counted for storage concurrency + * as they do not touch the storage at all + */ + Dmsg1(200, "Skipping migrate/copy Control Job %s for storage concurrency\n", jcr->Job); + jcr->IgnoreStorageConcurrency = true; + } + break; + default: + break; + } + #ifdef xxx + /* + * Turning this code off is likely to cause some deadlocks, + * but we do not really have enough information here to + * know if this is really a deadlock (it may be a dual drive + * autochanger), and in principle, the SD reservation system + * should detect these deadlocks, so push the work off on it. + */ if (jcr->res.rstore && jcr->res.rstore == jcr->res.wstore) { /* possible deadlock */ Jmsg(jcr, M_FATAL, 0, _("Job canceled. Attempt to read and write same device.\n" " Read storage \"%s\" (From %s) -- Write storage \"%s\" (From %s)\n"), @@ -793,45 +845,40 @@ static bool acquire_resources(JCR *jcr) return false; } #endif - if (jcr->res.rstore) { - Dmsg1(200, "Rstore=%s\n", jcr->res.rstore->name()); - if (!inc_read_store(jcr)) { - Dmsg1(200, "Fail rncj=%d\n", jcr->res.rstore->NumConcurrentJobs); - jcr->setJobStatus(JS_WaitStoreRes); - return false; + if (!jcr->IgnoreStorageConcurrency) { + if (jcr->res.rstore) { + Dmsg1(200, "Rstore=%s\n", jcr->res.rstore->name()); + if (!inc_read_store(jcr)) { + Dmsg1(200, "Fail rncj=%d\n", jcr->res.rstore->NumConcurrentJobs); + jcr->setJobStatus(JS_WaitStoreRes); + + return false; + } } - } - if (jcr->res.wstore) { - Dmsg1(200, "Wstore=%s\n", jcr->res.wstore->name()); - if (jcr->res.wstore->NumConcurrentJobs < jcr->res.wstore->MaxConcurrentJobs) { - jcr->res.wstore->NumConcurrentJobs++; - Dmsg1(200, "Inc wncj=%d\n", jcr->res.wstore->NumConcurrentJobs); - } else if (jcr->res.rstore) { - dec_read_store(jcr); - skip_this_jcr = true; - } else { - Dmsg1(200, "Fail wncj=%d\n", jcr->res.wstore->NumConcurrentJobs); - skip_this_jcr = true; + if (jcr->res.wstore) { + Dmsg1(200, "Wstore=%s\n", jcr->res.wstore->name()); + if (jcr->res.wstore->NumConcurrentJobs < jcr->res.wstore->MaxConcurrentJobs) { + jcr->res.wstore->NumConcurrentJobs++; + Dmsg1(200, "Inc wncj=%d\n", jcr->res.wstore->NumConcurrentJobs); + } else if (jcr->res.rstore) { + dec_read_store(jcr); + skip_this_jcr = true; + } else { + Dmsg1(200, "Fail wncj=%d\n", jcr->res.wstore->NumConcurrentJobs); + skip_this_jcr = true; + } } - } - if (skip_this_jcr) { - jcr->setJobStatus(JS_WaitStoreRes); + if (skip_this_jcr) { + jcr->setJobStatus(JS_WaitStoreRes); - return false; + return false; + } } - /* - * Some Job Types are excluded from the client concurrency as they have no - * interaction with the client at all. - */ - switch (jcr->getJobType()) { - case JT_MIGRATE: - case JT_COPY: - break; - default: + if (!jcr->IgnoreClientConcurrency) { if (jcr->res.client) { if (jcr->res.client->NumConcurrentJobs < jcr->res.client->MaxConcurrentJobs) { jcr->res.client->NumConcurrentJobs++; @@ -839,14 +886,15 @@ static bool acquire_resources(JCR *jcr) /* * Back out previous locks */ - dec_write_store(jcr); - dec_read_store(jcr); + if (!jcr->IgnoreStorageConcurrency) { + dec_write_store(jcr); + dec_read_store(jcr); + } jcr->setJobStatus(JS_WaitClientRes); return false; } } - break; } if (jcr->res.job->NumConcurrentJobs < jcr->res.job->MaxConcurrentJobs) { @@ -855,22 +903,15 @@ static bool acquire_resources(JCR *jcr) /* * Back out previous locks */ - dec_write_store(jcr); - dec_read_store(jcr); + if (!jcr->IgnoreStorageConcurrency) { + dec_write_store(jcr); + dec_read_store(jcr); + } - /* - * Some Job Types are excluded from the client concurrency as they have no - * interaction with the client at all. - */ - switch (jcr->getJobType()) { - case JT_MIGRATE: - case JT_COPY: - break; - default: + if (!jcr->IgnoreClientConcurrency) { if (jcr->res.client) { jcr->res.client->NumConcurrentJobs--; } - break; } jcr->setJobStatus(JS_WaitJobRes); diff --git a/src/include/jcr.h b/src/include/jcr.h index ad2d3e63645..ce0b58c44b0 100644 --- a/src/include/jcr.h +++ b/src/include/jcr.h @@ -462,6 +462,8 @@ class JCR { volatile bool sd_msg_thread_done; /* Set when Storage message thread done */ bool IgnoreDuplicateJobChecking; /* Set in migration jobs */ bool IgnoreLevelPoolOverides; /* Set if a cmdline pool was specified */ + bool IgnoreClientConcurrency; /* Set in migration jobs */ + bool IgnoreStorageConcurrency; /* Set in migration jobs */ bool spool_data; /* Spool data in SD */ bool acquired_resource_locks; /* Set if resource locks acquired */ bool term_wait_inited; /* Set when cond var inited */