Skip to content

Commit

Permalink
schedule: Call module thread detach callbacks before stopping the eve…
Browse files Browse the repository at this point in the history
…nt loop

Fixes issue where we see multiple failures in the trunk API caused by the event loop exiting with requests still being processed.
  • Loading branch information
arr2036 committed Mar 3, 2020
1 parent bc1ab7c commit 1bd268b
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 15 deletions.
11 changes: 10 additions & 1 deletion src/bin/radiusd.c
Expand Up @@ -138,6 +138,15 @@ static int thread_instantiate(TALLOC_CTX *ctx, fr_event_list_t *el, UNUSED void
return 0;
}

/** Explicitly cleanup module/xlat resources
*
*/
static void thread_detach(UNUSED void *uctx)
{
modules_thread_detach();
xlat_thread_detach();
}

#define EXIT_WITH_FAILURE \
do { \
ret = EXIT_FAILURE; \
Expand Down Expand Up @@ -775,7 +784,7 @@ int main(int argc, char *argv[])
}

sc = fr_schedule_create(NULL, el, &default_log, fr_debug_lvl,
thread_instantiate, schedule);
thread_instantiate, thread_detach, schedule);
if (!sc) {
PERROR("Failed starting the scheduler: %s", fr_strerror());
EXIT_WITH_FAILURE;
Expand Down
34 changes: 25 additions & 9 deletions src/lib/io/schedule.c
Expand Up @@ -131,6 +131,7 @@ struct fr_schedule_s {
sem_t network_sem; //!< for inter-thread signaling

fr_schedule_thread_instantiate_t worker_thread_instantiate; //!< thread instantiation callback
fr_schedule_thread_detach_t worker_thread_detach;

fr_dlist_head_t workers; //!< list of workers

Expand Down Expand Up @@ -236,6 +237,15 @@ static void *fr_schedule_worker_thread(void *arg)

INFO("Worker %d exiting", sw->id);

if (sc->worker_thread_detach) sc->worker_thread_detach(NULL); /* Fixme once we figure out what uctx should be */

/*
* Not looping at this point, but may catch timer/fd
* insertions being done after the thread should have
* exited.
*/
if (sw->el) fr_event_loop_exit(sw->el, 1);

/*
* Tell the scheduler we're done.
*/
Expand Down Expand Up @@ -361,19 +371,22 @@ int fr_schedule_pthread_create(pthread_t *thread, void *(*func)(void *), void *a

/** Create a scheduler and spawn the child threads.
*
* @param[in] ctx talloc context.
* @param[in] el event list, only for single-threaded mode.
* @param[in] logger destination for all logging messages.
* @param[in] lvl log level.
* @param[in] ctx talloc context.
* @param[in] el event list, only for single-threaded mode.
* @param[in] logger destination for all logging messages.
* @param[in] lvl log level.
* @param[in] worker_thread_instantiate callback for new worker threads.
* @param[in] config configuration for the scheduler
* @param[in] worker_thread_detach callback to destroy resources
* allocated by worker_thread_instantiate.
* @param[in] config configuration for the scheduler
* @return
* - NULL on error
* - fr_schedule_t new scheduler
*/
fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el,
fr_log_t *logger, fr_log_lvl_t lvl,
fr_schedule_thread_instantiate_t worker_thread_instantiate,
fr_schedule_thread_detach_t worker_thread_detach,
fr_schedule_config_t *config)
{
unsigned int i;
Expand All @@ -392,6 +405,7 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el,
sc->lvl = lvl;

sc->worker_thread_instantiate = worker_thread_instantiate;
sc->worker_thread_detach = worker_thread_detach;
sc->running = true;

/*
Expand All @@ -401,15 +415,15 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el,
sc->single_network = fr_network_create(sc, el, sc->log, sc->lvl);
if (!sc->single_network) {
ERROR("Failed creating network: %s", fr_strerror());
st_fail:
pre_instantiate_st_fail:
talloc_free(sc);
return NULL;
}

sc->single_worker = fr_worker_create(sc, "0", el, sc->log, sc->lvl);
if (!sc->single_worker) {
ERROR("Failed creating worker: %s", fr_strerror());
goto st_fail;
goto pre_instantiate_st_fail;
}

/*
Expand All @@ -423,13 +437,15 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el,

if (sc->worker_thread_instantiate(sc->single_worker, el, subcs) < 0) {
ERROR("Failed calling thread instantiate: %s", fr_strerror());
goto st_fail;
goto pre_instantiate_st_fail;
}
}

if (fr_command_register_hook(NULL, "0", sc->single_worker, cmd_worker_table) < 0) {
ERROR("Failed adding worker commands: %s", fr_strerror());
goto st_fail;
st_fail:
if (sc->worker_thread_detach) sc->worker_thread_detach(NULL);
goto pre_instantiate_st_fail;
}

if (fr_command_register_hook(NULL, "0", sc->single_network, cmd_network_table) < 0) {
Expand Down
7 changes: 7 additions & 0 deletions src/lib/io/schedule.h
Expand Up @@ -54,6 +54,12 @@ extern "C" {
*/
typedef int (*fr_schedule_thread_instantiate_t)(TALLOC_CTX *ctx, fr_event_list_t *el, void *uctx);

/** Explicitly free resources allocated by #fr_schedule_thread_instantiate_t
*
* @param[in] uctx User data passed to callback.
*/
typedef void (*fr_schedule_thread_detach_t)(void *uctx);

typedef struct {
uint32_t max_networks; //!< number of network threads
uint32_t max_workers; //!< number of network threads
Expand All @@ -66,6 +72,7 @@ int fr_schedule_worker_id(void);
int fr_schedule_pthread_create(pthread_t *thread, void *(*func)(void *), void *arg);
fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_log_t *log, fr_log_lvl_t lvl,
fr_schedule_thread_instantiate_t worker_thread_instantiate,
fr_schedule_thread_detach_t worked_thread_detach,
fr_schedule_config_t *config) CC_HINT(nonnull(3));
/* schedulers are async, so there's no fr_schedule_run() */
int fr_schedule_destroy(fr_schedule_t *sc);
Expand Down
10 changes: 5 additions & 5 deletions src/lib/io/worker.c
Expand Up @@ -150,8 +150,11 @@ static void worker_exit(fr_worker_t *worker)
* Don't allow the post event to run
* any more requests. They'll be
* signalled to stop before we exit.
*
* This only has an effect in single
* threaded mode.
*/
fr_event_loop_exit(worker->el, 1);
(void)fr_event_post_delete(worker->el, fr_worker_post_event, worker);
}

/** Handle a control plane message sent to the worker via a channel
Expand Down Expand Up @@ -1035,7 +1038,6 @@ void fr_worker_destroy(fr_worker_t *worker)

fr_channel_responder_ack_close(worker->channel[i]);
}

talloc_free(worker);
}

Expand Down Expand Up @@ -1142,7 +1144,7 @@ void fr_worker(fr_worker_t *worker)
{
WORKER_VERIFY;

while (true) {
while (!worker->exiting) {
bool wait_for_event;
int num_events;

Expand All @@ -1163,8 +1165,6 @@ void fr_worker(fr_worker_t *worker)
*/
num_events = fr_event_corral(worker->el, fr_time(), wait_for_event);
if (num_events < 0) {
if (worker->exiting) return; /* don't complain if we're exiting */

PERROR("Failed corralling events");
break;
}
Expand Down
11 changes: 11 additions & 0 deletions src/lib/server/module.c
Expand Up @@ -1287,6 +1287,17 @@ int modules_thread_instantiate(TALLOC_CTX *ctx, fr_event_list_t *el)
return 0;
}

/** Explicitly call thread_detach and free any module thread instances
*
* Call this function if the module thread instances need to be free explicitly before
* another resource like the even loop is freed.
*/
void modules_thread_detach(void)
{
if (!module_thread_inst_array) return;
TALLOC_FREE(module_thread_inst_array);
}

/** Complete module setup by calling its instantiate function
*
* @param[in] instance of module to complete instantiation for.
Expand Down
2 changes: 2 additions & 0 deletions src/lib/server/module.h
Expand Up @@ -299,6 +299,8 @@ void modules_free(void);

int modules_thread_instantiate(TALLOC_CTX *ctx, fr_event_list_t *el) CC_HINT(nonnull);

void modules_thread_detach(void);

int modules_instantiate(void) CC_HINT(nonnull);

module_instance_t *module_bootstrap(module_instance_t const *parent, CONF_SECTION *cs) CC_HINT(nonnull(2));
Expand Down
2 changes: 2 additions & 0 deletions src/lib/server/xlat.h
Expand Up @@ -330,6 +330,8 @@ int xlat_thread_instantiate(TALLOC_CTX *ctx);

int xlat_instantiate(void);

void xlat_thread_detach(void);

int xlat_bootstrap(xlat_exp_t *root);

void xlat_instances_free(void);
Expand Down
10 changes: 10 additions & 0 deletions src/lib/server/xlat_inst.c
Expand Up @@ -370,6 +370,16 @@ int xlat_thread_instantiate(TALLOC_CTX *ctx)
return 0;
}

/** Destroy any thread specific xlat instances
*
*/
void xlat_thread_detach(void)
{
if (!xlat_thread_inst_tree) return;

TALLOC_FREE(xlat_thread_inst_tree);
}

/** Walk over #xlat_exp_t that require instantiation
*
* @param[in] uctx UNUSED.
Expand Down

0 comments on commit 1bd268b

Please sign in to comment.