Skip to content

Commit

Permalink
cleanup: removed dead code and wrong comments
Browse files Browse the repository at this point in the history
- wq->idle_workers = 0;              /* no idle threads */
  idle_workers only set to 0 and never changed
  • Loading branch information
franku committed May 30, 2018
1 parent 8909198 commit f89ceba
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 121 deletions.
34 changes: 4 additions & 30 deletions core/src/dird/jobq.cc
Expand Up @@ -67,7 +67,7 @@ static void DecWriteStore(JobControlRecord *jcr);
* Returns: 0 on success
* errno on failure
*/
int JobqInit(jobq_t *jq, int threads, void *(*engine)(void *arg))
int JobqInit(jobq_t *jq, int max_workers, void *(*engine)(void *arg))
{
int status;
jobq_item_t *item = NULL;
Expand Down Expand Up @@ -95,9 +95,8 @@ int JobqInit(jobq_t *jq, int threads, void *(*engine)(void *arg))
return status;
}
jq->quit = false;
jq->max_workers = threads; /* max threads to create */
jq->max_workers = max_workers; /* max threads to create */
jq->num_workers = 0; /* no threads yet */
jq->idle_workers = 0; /* no idle threads */
jq->engine = engine; /* routine to run */
jq->valid = JOBQ_VALID;

Expand Down Expand Up @@ -132,14 +131,6 @@ int JobqDestroy(jobq_t *jq)
*/
if (jq->num_workers > 0) {
jq->quit = true;
if (jq->idle_workers) {
if ((status = pthread_cond_broadcast(&jq->work)) != 0) {
BErrNo be;
Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_broadcast: ERR=%s\n"), be.bstrerror(status));
V(jq->mutex);
return status;
}
}
while (jq->num_workers > 0) {
if ((status = pthread_cond_wait(&jq->work, &jq->mutex)) != 0) {
BErrNo be;
Expand Down Expand Up @@ -367,32 +358,15 @@ static int StartServer(jobq_t *jq)
int status = 0;
pthread_t id;

/*
* If any threads are idle, wake one.
*
* Actually we do a broadcast because on /lib/tls
* these signals seem to get lost from time to time.
*/
if (jq->idle_workers > 0) {
Dmsg0(2300, "Signal worker to wake up\n");
if ((status = pthread_cond_broadcast(&jq->work)) != 0) {
BErrNo be;
Jmsg1(NULL, M_ERROR, 0, _("pthread_cond_signal: ERR=%s\n"), be.bstrerror(status));
return status;
}
} else if (jq->num_workers < jq->max_workers) {
if (jq->num_workers < jq->max_workers) {
Dmsg0(2300, "Create worker thread\n");
/*
* No idle threads so create a new one
*/
SetThreadConcurrency(jq->max_workers + 1);
jq->num_workers++;
if ((status = pthread_create(&id, &jq->attr, jobq_server, (void *)jq)) != 0) {
BErrNo be;
jq->num_workers--;
Jmsg1(NULL, M_ERROR, 0, _("pthread_create: ERR=%s\n"), be.bstrerror(status));
return status;
}
jq->num_workers++;
}
return status;
}
Expand Down
3 changes: 1 addition & 2 deletions core/src/dird/jobq.h
Expand Up @@ -55,15 +55,14 @@ struct jobq_t {
bool quit; /* jobq should quit */
int max_workers; /* max threads */
int num_workers; /* current threads */
int idle_workers; /* idle threads */
void *(*engine)(void *arg); /* user engine */
};

#define JOBQ_VALID 0xdec1993

extern int JobqInit(
jobq_t *wq,
int threads, /* maximum threads */
int max_workers, /* maximum threads */
void *(*engine)(void *) /* engine routine */
);
extern int JobqDestroy(jobq_t *wq);
Expand Down
90 changes: 3 additions & 87 deletions core/src/lib/workq.cc
Expand Up @@ -58,7 +58,7 @@ extern "C" void *workq_server(void *arg);
* Returns: 0 on success
* errno on failure
*/
int WorkqInit(workq_t *wq, int threads, void *(*engine)(void *arg))
int WorkqInit(workq_t *wq, int max_workers, void *(*engine)(void *arg))
{
int status;

Expand All @@ -80,9 +80,8 @@ int WorkqInit(workq_t *wq, int threads, void *(*engine)(void *arg))
}
wq->quit = 0;
wq->first = wq->last = NULL;
wq->max_workers = threads; /* max threads to create */
wq->max_workers = max_workers; /* max threads to create */
wq->num_workers = 0; /* no threads yet */
wq->idle_workers = 0; /* no idle threads */
wq->engine = engine; /* routine to run */
wq->valid = WORKQ_VALID;
return 0;
Expand All @@ -104,17 +103,8 @@ int WorkqDestroy(workq_t *wq)
P(wq->mutex);
wq->valid = 0; /* prevent any more operations */

/*
* If any threads are active, wake them
*/
if (wq->num_workers > 0) {
wq->quit = 1;
if (wq->idle_workers) {
if ((status = pthread_cond_broadcast(&wq->work)) != 0) {
V(wq->mutex);
return status;
}
}
while (wq->num_workers > 0) {
Dmsg1(1400, "active workers: %d. Waiting for them to finish.\n", wq->num_workers);
if ((status = pthread_cond_wait(&wq->work, &wq->mutex)) != 0) {
Expand Down Expand Up @@ -178,16 +168,8 @@ int WorkqAdd(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
wq->last = item;
}

/* if any threads are idle, wake one */
if (wq->idle_workers > 0) {
Dmsg0(1400, "Signal worker\n");
if ((status = pthread_cond_broadcast(&wq->work)) != 0) {
V(wq->mutex);
return status;
}
} else if (wq->num_workers < wq->max_workers) {
if (wq->num_workers < wq->max_workers) {
Dmsg0(1400, "Create worker thread\n");
/* No idle threads so create a new one */
SetThreadConcurrency(wq->max_workers + 1);
if ((status = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
V(wq->mutex);
Expand All @@ -204,72 +186,6 @@ int WorkqAdd(workq_t *wq, void *element, workq_ele_t **work_item, int priority)
return status;
}

/*
* Remove work from a queue
* wq is a queue that was created with workq_init
* work_item is an element of work
*
* Note, it is "removed" by immediately calling a processing routine.
* if you want to cancel it, you need to provide some external means
* of doing so.
*/
int WorkqRemove(workq_t *wq, workq_ele_t *work_item)
{
int status, found = 0;
pthread_t id;
workq_ele_t *item, *prev;

Dmsg0(1400, "WorkqRemove\n");
if (wq->valid != WORKQ_VALID) {
return EINVAL;
}

P(wq->mutex);

for (prev=item=wq->first; item; item=item->next) {
if (item == work_item) {
found = 1;
break;
}
prev = item;
}
if (!found) {
return EINVAL;
}

/* Move item to be first on list */
if (wq->first != work_item) {
prev->next = work_item->next;
if (wq->last == work_item) {
wq->last = prev;
}
work_item->next = wq->first;
wq->first = work_item;
}

/* if any threads are idle, wake one */
if (wq->idle_workers > 0) {
Dmsg0(1400, "Signal worker\n");
if ((status = pthread_cond_broadcast(&wq->work)) != 0) {
V(wq->mutex);
return status;
}
} else {
Dmsg0(1400, "Create worker thread\n");
/* No idle threads so create a new one */
SetThreadConcurrency(wq->max_workers + 1);
if ((status = pthread_create(&id, &wq->attr, workq_server, (void *)wq)) != 0) {
V(wq->mutex);
return status;
}
wq->num_workers++;
}
V(wq->mutex);
Dmsg0(1400, "Return WorkqRemove\n");
return status;
}


/*
* This is the worker thread that serves the work queue.
* In due course, it will call the user's engine.
Expand Down
2 changes: 0 additions & 2 deletions core/src/lib/workq.h
Expand Up @@ -54,7 +54,6 @@ typedef struct workq_tag {
int quit; /* workq should quit */
int max_workers; /* max threads */
int num_workers; /* current threads */
int idle_workers; /* idle threads */
void *(*engine)(void *arg); /* user engine */
} workq_t;

Expand All @@ -67,6 +66,5 @@ extern int WorkqInit(
);
extern int WorkqDestroy(workq_t *wq);
extern int WorkqAdd(workq_t *wq, void *element, workq_ele_t **work_item, int priority);
extern int WorkqRemove(workq_t *wq, workq_ele_t *work_item);

#endif /* BAREOS_LIB_WORKQ_H_ */

0 comments on commit f89ceba

Please sign in to comment.