Skip to content

Commit

Permalink
Added configurable priorities for the queue
Browse files Browse the repository at this point in the history
"default" is by listener, and then by timestamp

"802.1X" is by state[0], which is the number of rounds we've done
for 802.1X, followed by the "default" if the states are identical

The idea is to prioritize ongoing EAP sessions, so that
users who are partway through authenticating are more likely to
finish.
  • Loading branch information
alandekok committed Dec 1, 2015
1 parent ca542ad commit 4bfd345
Showing 1 changed file with 49 additions and 13 deletions.
62 changes: 49 additions & 13 deletions src/main/threads.c
Expand Up @@ -171,6 +171,9 @@ typedef struct THREAD_POOL {
*/
pthread_mutex_t queue_mutex;

char const *queue_priority;
fr_heap_cmp_t heap_cmp;

uint32_t max_queue_size;
fr_heap_t *heap;
#endif /* WITH_GCD */
Expand All @@ -197,6 +200,7 @@ static const CONF_PARSER thread_config[] = {
{ FR_CONF_POINTER("max_requests_per_server", PW_TYPE_INTEGER, &thread_pool.max_requests_per_thread), .dflt = "0" },
{ FR_CONF_POINTER("cleanup_delay", PW_TYPE_INTEGER, &thread_pool.cleanup_delay), .dflt = "5" },
{ FR_CONF_POINTER("max_queue_size", PW_TYPE_INTEGER, &thread_pool.max_queue_size), .dflt = "65536" },
{ FR_CONF_POINTER("queue_priority", PW_TYPE_STRING, &thread_pool.queue_priority), .dflt = NULL },
# ifdef WITH_STATS
# ifdef WITH_ACCOUNTING
{ FR_CONF_POINTER("auto_limit_acct", PW_TYPE_BOOLEAN, &thread_pool.auto_limit_acct) },
Expand Down Expand Up @@ -777,6 +781,40 @@ static int pid_cmp(void const *one, void const *two)
}
#endif

/*
* Smaller entries go to the top of the heap.
* Larger ones to the bottom of the heap.
*/
static int default_cmp(void const *one, void const *two)
{
REQUEST const *a = one;
REQUEST const *b = two;

if (a->priority < b->priority) return -1;
if (a->priority > b->priority) return +1;

if (timercmp(&a->timestamp, &b->timestamp, < )) return -1;
if (timercmp(&a->timestamp, &b->timestamp, > )) return +1;

return 0;
}


/*
* Prioritize by how far along the EAP session is.
*/
static int state_cmp(void const *one, void const *two)
{
REQUEST const *a = one;
REQUEST const *b = two;

if (a->packet->rounds < b->packet->rounds) return -1;
if (a->packet->rounds > b->packet->rounds) return +1;

return default_cmp(one, two);
}


/** Parse the configuration for the thread pool
*
*/
Expand Down Expand Up @@ -835,22 +873,20 @@ int thread_pool_bootstrap(CONF_SECTION *cs, bool *spawn_workers)
thread_pool.start_threads, thread_pool.max_threads);
return -1;
}
#endif /* WITH_GCD */
return 0;
}

if (!thread_pool.queue_priority ||
(strcmp(thread_pool.queue_priority, "default") == 0)) {
thread_pool.heap_cmp = default_cmp;

static int default_cmp(void const *one, void const *two)
{
REQUEST const *a = one;
REQUEST const *b = two;

if (a->priority < b->priority) return -1;
if (a->priority > b->priority) return +1;
} else if (strcmp(thread_pool.queue_priority, "802.1X") == 0) {
thread_pool.heap_cmp = state_cmp;

if (timercmp(&a->timestamp, &b->timestamp, < )) return -1;
if (timercmp(&a->timestamp, &b->timestamp, > )) return +1;
} else {
ERROR("FATAL: Invalid queue_priority '%s'", thread_pool.queue_priority);
return -1;
}

#endif /* WITH_GCD */
return 0;
}

Expand Down Expand Up @@ -920,7 +956,7 @@ int thread_pool_init(void)
return -1;
}

thread_pool.heap = fr_heap_create(default_cmp, offsetof(REQUEST, heap_id));
thread_pool.heap = fr_heap_create(thread_pool.heap_cmp, offsetof(REQUEST, heap_id));
if (!thread_pool.heap) {
ERROR("FATAL: Failed to initialize the incoming queue.");
return -1;
Expand Down

0 comments on commit 4bfd345

Please sign in to comment.