Skip to content

Commit

Permalink
create and clean up the network thread
Browse files Browse the repository at this point in the history
  • Loading branch information
alandekok committed Jan 16, 2017
1 parent 59473fa commit fa85ff2
Showing 1 changed file with 110 additions and 33 deletions.
143 changes: 110 additions & 33 deletions src/util/schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,18 @@ RCSID("$Id$")
#define sem_destroy(s) semaphore_destroy(mach_task_self(),*s)
#endif /* __APPLE__ */

#define SEM_WAIT_INTR(_x) do {if (sem_wait(_x) == 0) break;} while (errno == EINTR)

/**
* Track the worker thread status.
* Track the child thread status.
*/
typedef enum fr_schedule_worker_status_t {
FR_WORKER_FREE = 0, //!< worker is free
FR_WORKER_INITIALIZING, //!< initialized, but not running
FR_WORKER_RUNNING, //!< running, and in the worker queue
FR_WORKER_EXITED, //!< exited, and in the done_worker queue
FR_WORKER_FAIL //!< failed, and in the done_worker queue
} fr_schedule_worker_status_t;
typedef enum fr_schedule_child_status_t {
FR_CHILD_FREE = 0, //!< child is free
FR_CHILD_INITIALIZING, //!< initialized, but not running
FR_CHILD_RUNNING, //!< running, and in the running queue
FR_CHILD_EXITED, //!< exited, and in the exited queue
FR_CHILD_FAIL //!< failed, and in the exited queue
} fr_schedule_child_status_t;

/**
* A data structure to track workers.
Expand All @@ -91,7 +93,7 @@ typedef struct fr_schedule_worker_t {

fr_schedule_t *sc; //!< the scheduler we are running under

fr_schedule_worker_status_t status; //!< status of the worker
fr_schedule_child_status_t status; //!< status of the worker
fr_worker_t *worker; //!< the worker data structure
} fr_schedule_worker_t;

Expand All @@ -101,9 +103,9 @@ typedef struct fr_schedule_worker_t {
typedef struct fr_schedule_receiver_t {
pthread_t pthread_id; //!< the thread of this receiver

int kq; //!< the receivers KQ
fr_event_list_t *el; //!< the receivers event list
fr_schedule_t *sc; //!< the scheduler we are running under

fr_schedule_child_status_t status; //!< status of the worker
fr_receiver_t *rc; //!< the receive data structure
} fr_schedule_receiver_t;

Expand Down Expand Up @@ -132,6 +134,8 @@ struct fr_schedule_t {
fr_heap_t *workers; //!< heap of workers
fr_heap_t *done_workers; //!< heap of done workers

fr_schedule_receiver_t *sr; //!< pointer to the (one) network thread

uint32_t num_transports; //!< how many transport layers we have
fr_transport_t **transports; //!< array of active transports.
};
Expand Down Expand Up @@ -193,7 +197,7 @@ static void *fr_schedule_worker_thread(void *arg)
TALLOC_CTX *ctx;
fr_schedule_worker_t *sw = arg;
fr_schedule_t *sc = sw->sc;
fr_schedule_worker_status_t status = FR_WORKER_FAIL;
fr_schedule_child_status_t status = FR_CHILD_FAIL;

ctx = talloc_init("worker");
if (!ctx) goto fail;
Expand All @@ -211,7 +215,7 @@ static void *fr_schedule_worker_thread(void *arg)
goto fail;
}

sw->status = FR_WORKER_RUNNING;
sw->status = FR_CHILD_RUNNING;

PTHREAD_MUTEX_LOCK(&sc->mutex);
(void) fr_heap_insert(sc->workers, sw);
Expand Down Expand Up @@ -241,7 +245,7 @@ static void *fr_schedule_worker_thread(void *arg)
sc->num_workers--;
PTHREAD_MUTEX_UNLOCK(&sc->mutex);

status = FR_WORKER_EXITED;
status = FR_CHILD_EXITED;

fail:
if (ctx) talloc_free(ctx);
Expand All @@ -264,6 +268,58 @@ static void *fr_schedule_worker_thread(void *arg)
}


/** Initialize and run the receiver thread.
*
* @param[in] arg the fr_schedule_receiver_t
* @return NULL
*/
static void *fr_schedule_receiver_thread(void *arg)
{
TALLOC_CTX *ctx;
fr_schedule_receiver_t *sr = arg;
fr_schedule_t *sc = sr->sc;
fr_schedule_child_status_t status = FR_CHILD_FAIL;

ctx = talloc_init("receiver");
if (!ctx) goto fail;

sr->rc = fr_receiver_create(ctx, sc->num_transports, sc->transports);
if (!sr->rc) {
goto fail;
}

sr->status = FR_CHILD_RUNNING;
sem_post(&sc->semaphore);

/*
* Do all of the work.
*/
fr_receiver(sr->rc);

/*
* Talloc ordering issues. We want to be independent of
* how talloc walks it's children, and ensure that some
* things are freed in a specific order.
*/
fr_receiver_destroy(sr->rc);
sr->rc = NULL;

status = FR_CHILD_EXITED;

fail:
if (ctx) talloc_free(ctx);

sr->status = status;

/*
* Tell the scheduler we're done.
*/
sem_post(&sc->semaphore);

return NULL;
}


/** Create a scheduler and spawn the child threads.
*
* @param[in] ctx the talloc context
Expand All @@ -283,8 +339,10 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, int max_inputs, int max_worke
void *worker_thread_ctx)
{
#ifdef HAVE_PTHREAD_H
int i;
int i, num_workers;
int rcode;
pthread_attr_t attr;

#endif
fr_schedule_t *sc;

Expand Down Expand Up @@ -312,6 +370,9 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, int max_inputs, int max_worke
if (!sc->max_inputs && !sc->max_workers) return sc;

#ifdef HAVE_PTHREAD_H
(void) pthread_attr_init(&attr);
(void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

rcode = pthread_mutex_init(&sc->mutex, NULL);
if (rcode != 0) {
talloc_free(sc);
Expand All @@ -336,42 +397,37 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, int max_inputs, int max_worke
/*
* Create all of the workers.
*/
num_workers = 0;
for (i = 0; i < sc->max_workers; i++) {
fr_schedule_worker_t *sw;
pthread_attr_t attr;

(void) pthread_attr_init(&attr);
(void) pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);

/*
* Create a worker "glue" structure
*/
sw = talloc_zero(sc, fr_schedule_worker_t);
if (!sw) {
fr_schedule_destroy(sc);
return NULL;
}
if (!sw) break;

sw->sc = sc;
sw->status = FR_WORKER_INITIALIZING;
sw->status = FR_CHILD_INITIALIZING;

rcode = pthread_create(&sw->pthread_id, &attr, fr_schedule_worker_thread, sc);
rcode = pthread_create(&sw->pthread_id, &attr, fr_schedule_worker_thread, sw);
if (rcode != 0) {
fr_schedule_destroy(sc);
return NULL;
talloc_free(sw);
break;
}

num_workers++;
}

/*
* Wait for all of the workers to start.
*/
for (i = 0; i < sc->max_workers; i++) {
sem_wait(&sc->semaphore);
for (i = 0; i < num_workers; i++) {
SEM_WAIT_INTR(&sc->semaphore);
}

PTHREAD_MUTEX_LOCK(&sc->mutex);
if (sc->num_workers != sc->max_workers) {
int num_workers = sc->num_workers;
int num_workers_exited = sc->num_workers_exited;
fr_schedule_worker_t *sw;

Expand Down Expand Up @@ -407,7 +463,7 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, int max_inputs, int max_worke
* signaled us that they've exited.
*/
for (i = 0; i < num_workers; i++) {
sem_wait(&sc->semaphore);
SEM_WAIT_INTR(&sc->semaphore);
}

talloc_free(sc);
Expand All @@ -417,8 +473,21 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, int max_inputs, int max_worke
PTHREAD_MUTEX_UNLOCK(&sc->mutex);

/*
* @todo create the network threads
* Create the network thread
*/
sc->sr = talloc_zero(sc, fr_schedule_receiver_t);
sc->sr->sc = sc;

rcode = pthread_create(&sc->sr->pthread_id, &attr, fr_schedule_receiver_thread, sc->sr);
if (rcode != 0) goto fail;

SEM_WAIT_INTR(&sc->semaphore);
if (sc->sr->status != FR_CHILD_RUNNING) {
fail:
TALLOC_FREE(sc->sr);
fr_schedule_destroy(sc);
return NULL;
}
#endif

return sc;
Expand Down Expand Up @@ -460,7 +529,15 @@ int fr_schedule_destroy(fr_schedule_t *sc)
* underneath the workers!
*/
for (i = 0; i < num; i++) {
sem_wait(&sc->semaphore);
SEM_WAIT_INTR(&sc->semaphore);
}

/*
* If the network thread is running, tell it to exit.
*/
if (sc->sr->status == FR_CHILD_RUNNING) {
fr_receiver_exit(sc->sr->rc);
SEM_WAIT_INTR(&sc->semaphore);
}

sem_destroy(&sc->semaphore);
Expand Down

0 comments on commit fa85ff2

Please sign in to comment.