Skip to content

Commit

Permalink
Convert vrps.table_lock into a mutex
Browse files Browse the repository at this point in the history
There are no readers, so there's no point in this being a reader-writer
lock.

Still not meant to be a fix for #83/#89. I'm mostly just trying to force
myself to interact with the code in hopes of finding the bug.
  • Loading branch information
ydahhrk committed Feb 2, 2023
1 parent 0b8449e commit 48868d8
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 70 deletions.
20 changes: 20 additions & 0 deletions src/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,26 @@
#include "config.h"
#include "log.h"

/*
 * Aborts the program (through pr_crit()) when @error is nonzero.
 * @function_name is the name of the failed function, for the log message.
 * Meant for errors that are too critical for a graceful recovery.
 */
void
panic_on_fail(int error, char const *function_name)
{
	if (!error)
		return;

	pr_crit("%s() returned error code %d. This is too critical for a graceful recovery; I must die now.",
	    function_name, error);
}

/*
 * pthread_mutex_lock() wrapper; any failure is treated as unrecoverable
 * and kills the program via panic_on_fail().
 */
void
mutex_lock(pthread_mutex_t *lock)
{
	int error;

	error = pthread_mutex_lock(lock);
	panic_on_fail(error, "pthread_mutex_lock");
}

/*
 * pthread_mutex_unlock() wrapper; any failure is treated as unrecoverable
 * and kills the program via panic_on_fail().
 */
void
mutex_unlock(pthread_mutex_t *lock)
{
	int error;

	error = pthread_mutex_unlock(lock);
	panic_on_fail(error, "pthread_mutex_unlock");
}

int
rwlock_read_lock(pthread_rwlock_t *lock)
{
Expand Down
9 changes: 9 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@

#define ARRAY_LEN(array) (sizeof(array) / sizeof((array)[0]))

void panic_on_fail(int, char const *);

/*
* Mutex wrappers. They are just a bunch of boilerplate, and removal of
* unrecoverable resulting error codes.
*/
void mutex_lock(pthread_mutex_t *);
void mutex_unlock(pthread_mutex_t *);

/*
* rwlock wrappers. They are just a bunch of boilerplate, and removal of
* unrecoverable resulting error codes.
Expand Down
33 changes: 19 additions & 14 deletions src/rtr/db/vrps.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,23 @@ static struct thread_pool *pool;
/** Protects @state.base, @state.deltas and @state.serial. */
static pthread_rwlock_t state_lock;

/** Lock to protect ROA table during construction. */
static pthread_rwlock_t table_lock;
/**
* Lock to protect the ROA table while it's being built up.
*
* To be honest, I'm tempted to remove this mutex completely. It currently
* exists because all the threads store their ROAs in the same table, which is
* awkward engineering. Each thread should work on its own table, and the main
thread should join the tables afterwards. This would render the mutex
redundant, as well as rid the relevant code of any concurrency risks.
*
* I'm conflicted about committing to the refactor however, because the change
* would require about twice as much memory and involve the extra joining step.
* And the current implementation is working fine...
*
* Assuming, that is, that #83/#89 isn't a concurrency problem. But I can't
* figure out how it could be.
*/
static pthread_mutex_t table_lock = PTHREAD_MUTEX_INITIALIZER;

int
vrps_init(void)
Expand Down Expand Up @@ -134,17 +149,8 @@ vrps_init(void)
goto revert_deltas;
}

error = pthread_rwlock_init(&table_lock, NULL);
if (error) {
pr_op_err("table pthread_rwlock_init() errored: %s",
strerror(error));
goto revert_state_lock;
}

return 0;

revert_state_lock:
pthread_rwlock_destroy(&state_lock);
revert_deltas:
darray_destroy(state.deltas);
revert_thread_pool:
Expand All @@ -158,7 +164,6 @@ vrps_destroy(void)
thread_pool_destroy(pool);

pthread_rwlock_destroy(&state_lock);
pthread_rwlock_destroy(&table_lock);

if (state.slurm != NULL)
db_slurm_destroy(state.slurm);
Expand All @@ -170,9 +175,9 @@ vrps_destroy(void)

#define WLOCK_HANDLER(cb) \
int error; \
rwlock_write_lock(&table_lock); \
mutex_lock(&table_lock); \
error = cb; \
rwlock_unlock(&table_lock); \
mutex_unlock(&table_lock); \
return error;

int
Expand Down
28 changes: 4 additions & 24 deletions src/rtr/rtr.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,26 +44,6 @@ enum poll_verdict {
PV_STOP,
};

static void
panic_on_fail(int error, char const *function_name)
{
if (error)
pr_crit("%s() returned error code %d. This is too critical for a graceful recovery; I must die now.",
function_name, error);
}

static void
lock_mutex(void)
{
panic_on_fail(pthread_mutex_lock(&lock), "pthread_mutex_lock");
}

static void
unlock_mutex(void)
{
panic_on_fail(pthread_mutex_unlock(&lock), "pthread_mutex_unlock");
}

static void
cleanup_server(struct rtr_server *server)
{
Expand Down Expand Up @@ -724,9 +704,9 @@ fddb_poll(void)
}
}

lock_mutex();
mutex_lock(&lock);
apply_pollfds(pollfds, nclients);
unlock_mutex();
mutex_unlock(&lock);
/* Fall through */

success:
Expand Down Expand Up @@ -810,7 +790,7 @@ rtr_foreach_client(rtr_foreach_client_cb cb, void *arg)
unsigned int i;
int error = 0;

lock_mutex();
mutex_lock(&lock);

ARRAYLIST_FOREACH(&clients, client, i) {
if (client->fd != -1) {
Expand All @@ -820,7 +800,7 @@ rtr_foreach_client(rtr_foreach_client_cb cb, void *arg)
}
}

unlock_mutex();
mutex_unlock(&lock);

return error;
}
45 changes: 13 additions & 32 deletions src/thread/thread_pool.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include "common.h"
#include "log.h"

/*
Expand Down Expand Up @@ -79,26 +80,6 @@ struct thread_pool {
unsigned int thread_ids_len;
};

static void
panic_on_fail(int error, char const *function_name)
{
if (error)
pr_crit("%s() returned error code %d. This is too critical for a graceful recovery; I must die now.",
function_name, error);
}

static void
mutex_lock(struct thread_pool *pool)
{
panic_on_fail(pthread_mutex_lock(&pool->lock), "pthread_mutex_lock");
}

static void
mutex_unlock(struct thread_pool *pool)
{
panic_on_fail(pthread_mutex_unlock(&pool->lock), "pthread_mutex_unlock");
}

/* Wait until the parent sends us work. */
static void
wait_for_parent_signal(struct thread_pool *pool, unsigned int thread_id)
Expand Down Expand Up @@ -203,7 +184,7 @@ tasks_poll(void *arg)
struct thread_pool_task *task;
unsigned int thread_id;

mutex_lock(pool);
mutex_lock(&pool->lock);

pool->thread_count++;
thread_id = pool->thread_count;
Expand All @@ -218,7 +199,7 @@ tasks_poll(void *arg)
/* Claim the work. */
task = task_queue_pull(pool, thread_id);
pool->working_count++;
mutex_unlock(pool);
mutex_unlock(&pool->lock);

if (task != NULL) {
task->cb(task->arg);
Expand All @@ -227,7 +208,7 @@ tasks_poll(void *arg)
task_destroy(task);
}

mutex_lock(pool);
mutex_lock(&pool->lock);
pool->working_count--;

if (pool->stop)
Expand All @@ -237,7 +218,7 @@ tasks_poll(void *arg)
signal_to_parent(pool);
}

mutex_unlock(pool);
mutex_unlock(&pool->lock);
pr_op_debug("Thread %s.%u: Returning.", pool->name, thread_id);
return NULL;
}
Expand Down Expand Up @@ -387,7 +368,7 @@ thread_pool_destroy(struct thread_pool *pool)
pr_op_debug("Destroying thread pool '%s'.", pool->name);

/* Remove all pending work and send the signal to stop it */
mutex_lock(pool);
mutex_lock(&pool->lock);
queue = &(pool->queue);
while (!TAILQ_EMPTY(queue)) {
tmp = TAILQ_FIRST(queue);
Expand All @@ -396,7 +377,7 @@ thread_pool_destroy(struct thread_pool *pool)
}
pool->stop = true;
pthread_cond_broadcast(&pool->parent2worker);
mutex_unlock(pool);
mutex_unlock(&pool->lock);

for (t = 0; t < pool->thread_ids_len; t++)
pthread_join(pool->thread_ids[t], NULL);
Expand Down Expand Up @@ -425,9 +406,9 @@ thread_pool_push(struct thread_pool *pool, char const *task_name,
if (error)
return error;

mutex_lock(pool);
mutex_lock(&pool->lock);
task_queue_push(pool, task);
mutex_unlock(pool);
mutex_unlock(&pool->lock);

/*
* Note: This assumes the threads have already spawned.
Expand All @@ -443,9 +424,9 @@ thread_pool_avail_threads(struct thread_pool *pool)
{
bool result;

mutex_lock(pool);
mutex_lock(&pool->lock);
result = (pool->working_count < pool->thread_ids_len);
mutex_unlock(pool);
mutex_unlock(&pool->lock);

return result;
}
Expand All @@ -454,7 +435,7 @@ thread_pool_avail_threads(struct thread_pool *pool)
void
thread_pool_wait(struct thread_pool *pool)
{
mutex_lock(pool);
mutex_lock(&pool->lock);

/* If the pool has to stop, the wait will happen during the joins. */
while (!pool->stop) {
Expand All @@ -473,5 +454,5 @@ thread_pool_wait(struct thread_pool *pool)
wait_for_worker_signal(pool);
}

mutex_unlock(pool);
mutex_unlock(&pool->lock);
}

0 comments on commit 48868d8

Please sign in to comment.