Skip to content

Commit

Permalink
Merge branch 'bareos-15.2'
Browse files Browse the repository at this point in the history
  • Loading branch information
Marco van Wieringen committed Apr 25, 2016
2 parents fc7d327 + 031808d commit 98a529b
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 95 deletions.
1 change: 1 addition & 0 deletions src/dird/dird_conf.c
Expand Up @@ -408,6 +408,7 @@ RES_ITEM job_items[] = {
{ "CancelQueuedDuplicates", CFG_TYPE_BOOL, ITEM(res_job.CancelQueuedDuplicates), 0, CFG_ITEM_DEFAULT, "false", NULL, NULL },
{ "CancelRunningDuplicates", CFG_TYPE_BOOL, ITEM(res_job.CancelRunningDuplicates), 0, CFG_ITEM_DEFAULT, "false", NULL, NULL },
{ "SaveFileHistory", CFG_TYPE_BOOL, ITEM(res_job.SaveFileHist), 0, CFG_ITEM_DEFAULT, "true", "14.2.0-", NULL },
{ "FileHistorySize", CFG_TYPE_SIZE64, ITEM(res_job.FileHistSize), 0, CFG_ITEM_DEFAULT, "10000000", "15.2.4-", NULL },
{ "PluginOptions", CFG_TYPE_ALIST_STR, ITEM(res_job.FdPluginOptions), 0, CFG_ITEM_DEPRECATED | CFG_ITEM_ALIAS, NULL, "-12.4.0", NULL },
{ "FdPluginOptions", CFG_TYPE_ALIST_STR, ITEM(res_job.FdPluginOptions), 0, 0, NULL, NULL, NULL },
{ "SdPluginOptions", CFG_TYPE_ALIST_STR, ITEM(res_job.SdPluginOptions), 0, 0, NULL, NULL, NULL },
Expand Down
5 changes: 3 additions & 2 deletions src/dird/dird_conf.h
Expand Up @@ -375,10 +375,11 @@ class JOBRES : public BRSRES {
utime_t MaxDiffInterval; /* Maximum time interval between Diffs */
utime_t DuplicateJobProximity; /* Permitted time between duplicicates */
int64_t spool_size; /* Size of spool file for this job */
int64_t max_bandwidth; /* Speed limit on this job */
int64_t FileHistSize; /* Hint about the size of the expected File history */
int32_t MaxConcurrentJobs; /* Maximum concurrent jobs */
int32_t NumConcurrentJobs; /* Number of concurrent jobs running */
int32_t MaxConcurrentCopies; /* Limit number of concurrent jobs one Copy Job spawns */
bool allow_mixed_priority; /* Allow jobs with higher priority concurrently with this */

MSGSRES *messages; /* How and where to send messages */
SCHEDRES *schedule; /* When -- Automatic schedule */
Expand All @@ -398,6 +399,7 @@ class JOBRES : public BRSRES {
alist *run_cmds; /* Run commands */
alist *RunScripts; /* Run {client} program {after|before} Job */

bool allow_mixed_priority; /* Allow jobs with higher priority concurrently with this */
bool where_use_regexp; /* true if RestoreWhere is a BREGEXP */
bool RescheduleOnError; /* Set to reschedule on error */
bool RescheduleIncompleteJobs; /* Set to reschedule incomplete Jobs */
Expand Down Expand Up @@ -425,7 +427,6 @@ class JOBRES : public BRSRES {
alist *SdPluginOptions; /* Generic SD plugin options used by this Job */
alist *DirPluginOptions; /* Generic DIR plugin options used by this Job */
alist *base; /* Base jobs */
int64_t max_bandwidth; /* Speed limit on this job */

/* Methods */
};
Expand Down
220 changes: 139 additions & 81 deletions src/dird/jobq.c
Expand Up @@ -41,8 +41,7 @@
#include "bareos.h"
#include "dird.h"

static pthread_mutex_t rstore_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t wstore_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

/* Forward referenced functions */
extern "C" void *jobq_server(void *arg);
Expand All @@ -51,6 +50,10 @@ extern "C" void *sched_wait(void *arg);
static int start_server(jobq_t *jq);
static bool acquire_resources(JCR *jcr);
static bool reschedule_job(JCR *jcr, jobq_t *jq, jobq_item_t *je);
static bool inc_client_concurrency(JCR *jcr);
static void dec_client_concurrency(JCR *jcr);
static bool inc_job_concurrency(JCR *jcr);
static void dec_job_concurrency(JCR *jcr);
static bool inc_write_store(JCR *jcr);
static void dec_write_store(JCR *jcr);

Expand Down Expand Up @@ -164,8 +167,7 @@ struct wait_pkt {
* most jobs are put into the job queue only when their
* scheduled time arives.
*/
extern "C"
void *sched_wait(void *arg)
extern "C" void *sched_wait(void *arg)
{
JCR *jcr = ((wait_pkt *)arg)->jcr;
jobq_t *jq = ((wait_pkt *)arg)->jq;
Expand Down Expand Up @@ -502,18 +504,10 @@ extern "C" void *jobq_server(void *arg)
* been acquired for jobs canceled before they were put into the ready queue.
*/
if (jcr->acquired_resource_locks) {
if (!jcr->IgnoreStorageConcurrency) {
dec_read_store(jcr);
dec_write_store(jcr);
}

if (!jcr->IgnoreClientConcurrency) {
if (jcr->res.client) {
jcr->res.client->NumConcurrentJobs--;
}
}

jcr->res.job->NumConcurrentJobs--;
dec_read_store(jcr);
dec_write_store(jcr);
dec_client_concurrency(jcr);
dec_job_concurrency(jcr);
jcr->acquired_resource_locks = false;
}

Expand Down Expand Up @@ -851,65 +845,41 @@ static bool acquire_resources(JCR *jcr)
}
#endif

if (!jcr->IgnoreStorageConcurrency) {
if (jcr->res.rstore) {
if (!inc_read_store(jcr)) {
Dmsg2(200, "Fail to acquire Rstore=%s rncj=%d\n",
jcr->res.rstore->name(), jcr->res.rstore->NumConcurrentJobs);
jcr->setJobStatus(JS_WaitStoreRes);

return false;
}
}

if (jcr->res.wstore) {
if (!inc_write_store(jcr)) {
Dmsg2(200, "Fail to acquire Wstore=%s wncj=%d\n",
jcr->res.wstore->name(), jcr->res.wstore->NumConcurrentJobs);
dec_read_store(jcr);
jcr->setJobStatus(JS_WaitStoreRes);
if (jcr->res.rstore) {
if (!inc_read_store(jcr)) {
jcr->setJobStatus(JS_WaitStoreRes);

return false;
}
return false;
}
}

if (!jcr->IgnoreClientConcurrency) {
if (jcr->res.client) {
if (jcr->res.client->NumConcurrentJobs < jcr->res.client->MaxConcurrentJobs) {
jcr->res.client->NumConcurrentJobs++;
} else {
/*
* Back out previous locks
*/
if (!jcr->IgnoreStorageConcurrency) {
dec_write_store(jcr);
dec_read_store(jcr);
}
jcr->setJobStatus(JS_WaitClientRes);
if (jcr->res.wstore) {
if (!inc_write_store(jcr)) {
dec_read_store(jcr);
jcr->setJobStatus(JS_WaitStoreRes);

return false;
}
return false;
}
}

if (jcr->res.job->NumConcurrentJobs < jcr->res.job->MaxConcurrentJobs) {
jcr->res.job->NumConcurrentJobs++;
} else {
if (!inc_client_concurrency(jcr)) {
/*
* Back out previous locks
*/
if (!jcr->IgnoreStorageConcurrency) {
dec_write_store(jcr);
dec_read_store(jcr);
}
dec_write_store(jcr);
dec_read_store(jcr);
jcr->setJobStatus(JS_WaitClientRes);

if (!jcr->IgnoreClientConcurrency) {
if (jcr->res.client) {
jcr->res.client->NumConcurrentJobs--;
}
}
return false;
}

if (!inc_job_concurrency(jcr)) {
/*
* Back out previous locks
*/
dec_write_store(jcr);
dec_read_store(jcr);
dec_client_concurrency(jcr);
jcr->setJobStatus(JS_WaitJobRes);

return false;
Expand All @@ -920,65 +890,153 @@ static bool acquire_resources(JCR *jcr)
return true;
}

static bool inc_client_concurrency(JCR *jcr)
{
if (!jcr->res.client || jcr->IgnoreClientConcurrency) {
return true;
}

P(mutex);
if (jcr->res.client->NumConcurrentJobs < jcr->res.client->MaxConcurrentJobs) {
jcr->res.client->NumConcurrentJobs++;
Dmsg2(50, "Inc Client=%s rncj=%d\n",
jcr->res.client->name(), jcr->res.client->NumConcurrentJobs);
V(mutex);

return true;
}

V(mutex);

return false;
}

static void dec_client_concurrency(JCR *jcr)
{
if (jcr->IgnoreClientConcurrency) {
return;
}

P(mutex);
if (jcr->res.client) {
jcr->res.client->NumConcurrentJobs--;
Dmsg2(50, "Dec Client=%s rncj=%d\n",
jcr->res.client->name(), jcr->res.client->NumConcurrentJobs);
}
V(mutex);
}

static bool inc_job_concurrency(JCR *jcr)
{
P(mutex);
if (jcr->res.job->NumConcurrentJobs < jcr->res.job->MaxConcurrentJobs) {
jcr->res.job->NumConcurrentJobs++;
Dmsg2(50, "Inc Job=%s rncj=%d\n",
jcr->res.job->name(), jcr->res.job->NumConcurrentJobs);
V(mutex);

return true;
}

V(mutex);

return false;
}

static void dec_job_concurrency(JCR *jcr)
{
P(mutex);
jcr->res.job->NumConcurrentJobs--;
Dmsg2(50, "Dec Job=%s rncj=%d\n",
jcr->res.job->name(), jcr->res.job->NumConcurrentJobs);
V(mutex);
}

/*
* Note: inc_read_store() and dec_read_store() are
* called from select_next_rstore() in src/dird/job.c
*/
bool inc_read_store(JCR *jcr)
{
P(rstore_mutex);
if (jcr->IgnoreStorageConcurrency) {
return true;
}

P(mutex);
if (jcr->res.rstore->NumConcurrentJobs < jcr->res.rstore->MaxConcurrentJobs) {
jcr->res.rstore->NumConcurrentReadJobs++;
jcr->res.rstore->NumConcurrentJobs++;
Dmsg2(200, "Inc Rstore=%s rncj=%d\n",
Dmsg2(50, "Inc Rstore=%s rncj=%d\n",
jcr->res.rstore->name(), jcr->res.rstore->NumConcurrentJobs);
V(rstore_mutex);
V(mutex);

return true;
}
V(rstore_mutex);
V(mutex);

Dmsg2(50, "Fail to acquire Rstore=%s rncj=%d\n",
jcr->res.rstore->name(), jcr->res.rstore->NumConcurrentJobs);

return false;
}

void dec_read_store(JCR *jcr)
{
if (jcr->res.rstore) {
P(rstore_mutex);
if (jcr->res.rstore && !jcr->IgnoreStorageConcurrency) {
P(mutex);
jcr->res.rstore->NumConcurrentReadJobs--; /* back out rstore */
jcr->res.rstore->NumConcurrentJobs--; /* back out rstore */
Dmsg2(200, "Dec Rstore=%s rncj=%d\n",
Dmsg2(50, "Dec Rstore=%s rncj=%d\n",
jcr->res.rstore->name(), jcr->res.rstore->NumConcurrentJobs);
ASSERT(jcr->res.rstore->NumConcurrentReadJobs >= 0);
ASSERT(jcr->res.rstore->NumConcurrentJobs >= 0);
V(rstore_mutex);

if (jcr->res.rstore->NumConcurrentReadJobs < 0) {
Jmsg(jcr, M_FATAL, 0, _("NumConcurrentReadJobs Dec Rstore=%s rncj=%d\n"),
jcr->res.rstore->name(), jcr->res.rstore->NumConcurrentReadJobs);
}

if (jcr->res.rstore->NumConcurrentJobs < 0) {
Jmsg(jcr, M_FATAL, 0, _("NumConcurrentJobs Dec Rstore=%s rncj=%d\n"),
jcr->res.rstore->name(), jcr->res.rstore->NumConcurrentJobs);
}
V(mutex);
}
}

static bool inc_write_store(JCR *jcr)
{
P(wstore_mutex);
if (jcr->IgnoreStorageConcurrency) {
return true;
}

P(mutex);
if (jcr->res.wstore->NumConcurrentJobs < jcr->res.wstore->MaxConcurrentJobs) {
jcr->res.wstore->NumConcurrentJobs++;
Dmsg2(200, "Inc Wstore=%s wncj=%d\n",
Dmsg2(50, "Inc Wstore=%s wncj=%d\n",
jcr->res.wstore->name(), jcr->res.wstore->NumConcurrentJobs);
V(wstore_mutex);
V(mutex);

return true;
}
V(wstore_mutex);
V(mutex);

Dmsg2(50, "Fail to acquire Wstore=%s wncj=%d\n",
jcr->res.wstore->name(), jcr->res.wstore->NumConcurrentJobs);

return false;
}

static void dec_write_store(JCR *jcr)
{
if (jcr->res.wstore) {
P(wstore_mutex);
if (jcr->res.wstore && !jcr->IgnoreStorageConcurrency) {
P(mutex);
jcr->res.wstore->NumConcurrentJobs--;
Dmsg2(200, "Dec Wstore=%s wncj=%d\n",
Dmsg2(50, "Dec Wstore=%s wncj=%d\n",
jcr->res.wstore->name(), jcr->res.wstore->NumConcurrentJobs);
ASSERT(jcr->res.wstore->NumConcurrentJobs >= 0);
V(wstore_mutex);

if (jcr->res.wstore->NumConcurrentJobs < 0) {
Jmsg(jcr, M_FATAL, 0, _("NumConcurrentJobs Dec Wstore=%s wncj=%d\n"),
jcr->res.wstore->name(), jcr->res.wstore->NumConcurrentJobs);
}
V(mutex);
}
}
1 change: 1 addition & 0 deletions src/dird/ndmp_dma_backup.c
Expand Up @@ -545,6 +545,7 @@ bool do_ndmp_backup(JCR *jcr)
nis->FileIndex = cnt + 1;
nis->jcr = jcr;
nis->save_filehist = jcr->res.job->SaveFileHist;
nis->filehist_size = jcr->res.job->FileHistSize;

ndmp_sess.param->log.ctx = nis;
ndmp_sess.param->log_tag = bstrdup("DIR-NDMP");
Expand Down
1 change: 1 addition & 0 deletions src/dird/ndmp_dma_priv.h
Expand Up @@ -98,6 +98,7 @@ struct ndmp_internal_state {
int32_t FileIndex;
char *virtual_filename;
bool save_filehist;
int64_t filehist_size;
void *fhdb_state;
};
typedef struct ndmp_internal_state NIS;
Expand Down

0 comments on commit 98a529b

Please sign in to comment.