Skip to content

Commit

Permalink
preliminary work to make various fields configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
alandekok committed Apr 21, 2020
1 parent 27e62e8 commit 0d95390
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 33 deletions.
4 changes: 2 additions & 2 deletions src/lib/io/schedule.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ static void *fr_schedule_worker_thread(void *arg)
}


sw->worker = fr_worker_create(ctx, sw->el, worker_name, sc->log, sc->lvl);
sw->worker = fr_worker_create(ctx, sw->el, worker_name, sc->log, sc->lvl, NULL);
if (!sw->worker) {
PERROR("%s - Failed creating worker", worker_name);
goto fail;
Expand Down Expand Up @@ -435,7 +435,7 @@ fr_schedule_t *fr_schedule_create(TALLOC_CTX *ctx, fr_event_list_t *el,
return NULL;
}

sc->single_worker = fr_worker_create(sc, el, "Worker", sc->log, sc->lvl);
sc->single_worker = fr_worker_create(sc, el, "Worker", sc->log, sc->lvl, NULL);
if (!sc->single_worker) {
PERROR("Failed creating worker");
goto pre_instantiate_st_fail;
Expand Down
61 changes: 31 additions & 30 deletions src/lib/io/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ static _Thread_local fr_worker_t *thread_local_worker;
*/
struct fr_worker_s {
char const *name; //!< name of this worker
fr_worker_config_t config; //!< external configuration

pthread_t thread_id; //!< my thread ID

Expand All @@ -91,15 +92,6 @@ struct fr_worker_s {
uint64_t number; //!< Per worker request id.

int num_channels; //!< actual number of channels
int max_channels; //!< maximum number of channels

int message_set_size; //!< default start number of messages
int ring_buffer_size; //!< default start size for the ring buffers

fr_time_delta_t max_request_time; //!< maximum time a request can be processed

size_t talloc_pool_size; //!< for each REQUEST


fr_heap_t *runnable; //!< current runnable requests which we've spent time processing
fr_heap_t *time_order; //!< time ordered heap of requests
Expand Down Expand Up @@ -211,17 +203,17 @@ static void worker_channel_callback(void *ctx, void const *data, size_t data_siz
fr_assert(ch != NULL);

ok = false;
for (i = 0; i < worker->max_channels; i++) {
for (i = 0; i < worker->config.max_channels; i++) {
fr_assert(worker->channel[i] != ch);

if (worker->channel[i] != NULL) continue;

worker->channel[i] = ch;
DEBUG3("Received channel %p into array entry %d", ch, i);

ms = fr_message_set_create(worker, worker->message_set_size,
ms = fr_message_set_create(worker, worker->config.message_set_size,
sizeof(fr_channel_data_t),
worker->ring_buffer_size);
worker->config.ring_buffer_size);
fr_assert(ms != NULL);
fr_channel_responder_uctx_add(ch, ms);

Expand All @@ -242,7 +234,7 @@ static void worker_channel_callback(void *ctx, void const *data, size_t data_siz
* Locate the signalling channel in the list
* of channels.
*/
for (i = 0; i < worker->max_channels; i++) {
for (i = 0; i < worker->config.max_channels; i++) {
if (!worker->channel[i]) continue;

if (worker->channel[i] != ch) continue;
Expand Down Expand Up @@ -580,7 +572,7 @@ static void worker_max_request_time(UNUSED fr_event_list_t *el, UNUSED fr_time_t
REQUEST_VERIFY(request);

cleanup = request->async->recv_time;
cleanup += worker->max_request_time;
cleanup += worker->config.max_request_time;
if (cleanup > now) break;

/*
Expand Down Expand Up @@ -620,9 +612,9 @@ static void worker_max_request_timer(fr_worker_t *worker)
if (!request) return;

cleanup = request->async->recv_time;
cleanup += worker->max_request_time;
cleanup += worker->config.max_request_time;

DEBUG2("Resetting cleanup timer to +%pV", fr_box_time_delta(worker->max_request_time));
DEBUG2("Resetting cleanup timer to +%pV", fr_box_time_delta(worker->config.max_request_time));
if (fr_event_timer_at(worker, worker->el, &worker->ev_cleanup,
cleanup, worker_max_request_time, worker) < 0) {
ERROR("Failed inserting max_request_time timer");
Expand Down Expand Up @@ -707,6 +699,8 @@ static void worker_request_bootstrap(fr_worker_t *worker, fr_channel_data_t *cd,
TALLOC_CTX *ctx;
fr_listen_t const *listen;

if (fr_heap_num_elements(worker->time_order) >= (uint32_t) worker->config.max_requests) goto nak;

ctx = request = request_alloc(NULL);
if (!request) goto nak;

Expand Down Expand Up @@ -1048,7 +1042,7 @@ void fr_worker_destroy(fr_worker_t *worker)
* the TO_REQUESTOR queue, as we own those. They will be
* automatically freed when our talloc context is freed.
*/
for (i = 0; i < worker->max_channels; i++) {
for (i = 0; i < worker->config.max_channels; i++) {
if (!worker->channel[i]) continue;

fr_channel_responder_ack_close(worker->channel[i]);
Expand All @@ -1066,13 +1060,14 @@ void fr_worker_destroy(fr_worker_t *worker)
* @param[in] el the event list
* @param[in] logger the destination for all logging messages
* @param[in] lvl log level
* @param[in] config various configuration parameters
* @return
* - NULL on error
* - fr_worker_t on success
*/
fr_worker_t *fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl)
fr_worker_t *fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name, fr_log_t const *logger, fr_log_lvl_t lvl,
fr_worker_config_t *config)
{
int max_channels = 64;
fr_worker_t *worker;

worker = talloc_zero(ctx, fr_worker_t);
Expand All @@ -1084,7 +1079,22 @@ fr_worker_t *fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *

worker->name = talloc_strdup(worker, name); /* thread locality */

worker->channel = talloc_zero_array(worker, fr_channel_t *, max_channels);
if (config) worker->config = *config;

#define CHECK_CONFIG(_x, _min, _max) do { \
if (!worker->config._x) worker->config._x = _min; \
if (worker->config._x < _min) worker->config._x = _min; \
if (worker->config._x > _max) worker->config._x = _max; \
} while (0)

CHECK_CONFIG(max_requests,(1 << 20),(1 << 30));
CHECK_CONFIG(max_channels, 64, 1024);
CHECK_CONFIG(talloc_pool_size, 4096, 65536);
CHECK_CONFIG(message_set_size, 1024, 8192);
CHECK_CONFIG(ring_buffer_size, (1 << 17), (1 << 20));
CHECK_CONFIG(max_request_time, fr_time_delta_from_sec(30), fr_time_delta_from_sec(60));

worker->channel = talloc_zero_array(worker, fr_channel_t *, worker->config.max_channels);
if (!worker->channel) {
talloc_free(worker);
goto nomem;
Expand All @@ -1095,15 +1105,6 @@ fr_worker_t *fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *
worker->log = logger;
worker->lvl = lvl;

/*
* @todo make these configurable
*/
worker->max_channels = max_channels;
worker->talloc_pool_size = 4096; /* at least enough for a REQUEST */
worker->message_set_size = 1024;
worker->ring_buffer_size = (1 << 17);
worker->max_request_time = fr_time_delta_from_sec(30);

/*
* The worker thread starts now. Manually initialize it,
* because we're tracking request time, not the time that
Expand Down Expand Up @@ -1352,7 +1353,7 @@ static void worker_verify(fr_worker_t *worker)
fr_assert(worker->dedup != NULL);
(void) talloc_get_type_abort(worker->dedup, rbtree_t);

for (i = 0; i < worker->max_channels; i++) {
for (i = 0; i < worker->config.max_channels; i++) {
if (!worker->channel[i]) continue;

(void) talloc_get_type_abort(worker->channel[i], fr_channel_t);
Expand Down
15 changes: 14 additions & 1 deletion src/lib/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,21 @@ extern "C" {
#endif
extern fr_cmd_table_t cmd_worker_table[];

typedef struct {
int max_requests; //!< max requests this worker will handlex

int max_channels; //!< maximum number of channels

int message_set_size; //!< default start number of messages
int ring_buffer_size; //!< default start size for the ring buffers

fr_time_delta_t max_request_time; //!< maximum time a request can be processed

size_t talloc_pool_size; //!< for each request
} fr_worker_config_t;

fr_worker_t *fr_worker_create(TALLOC_CTX *ctx, fr_event_list_t *el, char const *name,
fr_log_t const *logger, fr_log_lvl_t lvl) CC_HINT(nonnull(2,3,4));
fr_log_t const *logger, fr_log_lvl_t lvl, fr_worker_config_t *config) CC_HINT(nonnull(2,3,4));

void fr_worker_destroy(fr_worker_t *worker) CC_HINT(nonnull);

Expand Down

0 comments on commit 0d95390

Please sign in to comment.