Skip to content

Commit

Permalink
[compress] split check_lib_init into 2 functions
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio M. Di Nitto <fdinitto@redhat.com>
  • Loading branch information
fabbione committed Sep 14, 2017
1 parent e61787a commit 899de76
Showing 1 changed file with 111 additions and 78 deletions.
189 changes: 111 additions & 78 deletions libknet/compress.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,20 +136,11 @@ static int val_level(
return compress_modules_cmds[compress_model].val_level(knet_h, compress_level);
}

static int compress_check_init_lib(knet_handle_t knet_h, int cmp_model, int rate_limit)
/*
* compress_check_lib_is_init needs to be invoked in a locked context!
*/
static int compress_check_lib_is_init(knet_handle_t knet_h, int cmp_model)
{
struct timespec clock_now;
unsigned long long timediff;
int savederrno = 0;

savederrno = pthread_rwlock_rdlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}

/*
* if the module is already loaded and init for this handle,
* we will return and keep the lock to avoid any race condition
Expand All @@ -162,15 +153,36 @@ static int compress_check_init_lib(knet_handle_t knet_h, int cmp_model, int rate
if (compress_modules_cmds[cmp_model].loaded == 1) {
if (compress_modules_cmds[cmp_model].is_init == NULL) {
if (knet_h->compress_int_data[cmp_model] != NULL) {
return 0;
return 1;
}
} else {
if (compress_modules_cmds[cmp_model].is_init(knet_h, cmp_model) == 1) {
return 0;
return 1;
}
}
}

return 0;
}

/*
* compress_load_lib should _always_ be invoked in write lock context
*/
static int compress_load_lib(knet_handle_t knet_h, int cmp_model, int rate_limit)
{
struct timespec clock_now;
unsigned long long timediff;

/*
* checking again for paranoia and because
* compress_check_lib_is_init is usually invoked in read context
* and we need to switch from read to write locking in between.
* another thread might have init the library in the meantime
*/
if (compress_check_lib_is_init(knet_h, cmp_model)) {
return 0;
}

/*
* due to the fact that decompress can load libraries
* on demand, depending on the compress model selected
Expand All @@ -188,38 +200,22 @@ static int compress_check_init_lib(knet_handle_t knet_h, int cmp_model, int rate
clock_gettime(CLOCK_MONOTONIC, &clock_now);
timespec_diff(last_load_failure, clock_now, &timediff);
if (timediff < 10000000000) {
pthread_rwlock_unlock(&shlib_rwlock);
errno = EAGAIN;
return -1;
}
}
}

/*
* need to switch to write lock, load the lib, and return with a write lock
* this is not racy because .init should be written idempotent.
*/
pthread_rwlock_unlock(&shlib_rwlock);
savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}

if (compress_modules_cmds[cmp_model].loaded == 0) {
if (compress_modules_cmds[cmp_model].load_lib(knet_h) < 0) {
clock_gettime(CLOCK_MONOTONIC, &last_load_failure);
pthread_rwlock_unlock(&shlib_rwlock);
return -1;
}
compress_modules_cmds[cmp_model].loaded = 1;
}

if (compress_modules_cmds[cmp_model].init != NULL) {
if (compress_modules_cmds[cmp_model].init(knet_h, cmp_model) < 0) {
pthread_rwlock_unlock(&shlib_rwlock);
return -1;
}
} else {
Expand Down Expand Up @@ -253,58 +249,81 @@ int compress_cfg(
int savederrno = 0, err = 0;
int cmp_model;

log_debug(knet_h, KNET_SUB_COMPRESS,
"Initizializing compress module [%s/%d/%u]",
knet_handle_compress_cfg->compress_model, knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_threshold);

cmp_model = compress_get_model(knet_handle_compress_cfg->compress_model);
if (cmp_model < 0) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s not supported", knet_handle_compress_cfg->compress_model);
errno = EINVAL;
return -1;
}

log_debug(knet_h, KNET_SUB_COMPRESS,
"Initizializing compress module [%s/%d/%u]",
knet_handle_compress_cfg->compress_model, knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_threshold);

if (cmp_model > 0) {
if (compress_modules_cmds[cmp_model].built_in == 0) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress model %s support has not been built in. Please contact your vendor or fix the build", knet_handle_compress_cfg->compress_model);
savederrno = EINVAL;
err = -1;
goto out;
}

if (compress_check_init_lib(knet_h, cmp_model, 0) < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load/init shared lib for model %s: %s",
knet_handle_compress_cfg->compress_model, strerror(errno));
err = -1;
goto out_unlock;
}

if (val_level(knet_h, cmp_model, knet_handle_compress_cfg->compress_level) < 0) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress level %d not supported for model %s",
knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_model);
savederrno = EINVAL;
err = -1;
goto out_unlock;
errno = EINVAL;
return -1;
}

if (knet_handle_compress_cfg->compress_threshold > KNET_MAX_PACKET_SIZE) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress threshold cannot be higher than KNET_MAX_PACKET_SIZE (%d).",
KNET_MAX_PACKET_SIZE);
savederrno = EINVAL;
err = -1;
goto out_unlock;
errno = EINVAL;
return -1;
}

if (knet_handle_compress_cfg->compress_threshold == 0) {
knet_h->compress_threshold = KNET_COMPRESS_THRESHOLD;
log_debug(knet_h, KNET_SUB_COMPRESS, "resetting compression threshold to default (%d)", KNET_COMPRESS_THRESHOLD);
} else {
knet_h->compress_threshold = knet_handle_compress_cfg->compress_threshold;
}

savederrno = pthread_rwlock_rdlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}

if (!compress_check_lib_is_init(knet_h, cmp_model)) {
/*
* need to switch to write lock, load the lib, and return with a write lock
* this is not racy because compress_load_lib is written idempotent.
*/
pthread_rwlock_unlock(&shlib_rwlock);
savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}

if (compress_load_lib(knet_h, cmp_model, 0) < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load library: %s",
strerror(savederrno));
err = -1;
goto out_unlock;
}
}

if (val_level(knet_h, cmp_model, knet_handle_compress_cfg->compress_level) < 0) {
log_err(knet_h, KNET_SUB_COMPRESS, "compress level %d not supported for model %s",
knet_handle_compress_cfg->compress_level, knet_handle_compress_cfg->compress_model);
savederrno = EINVAL;
err = -1;
goto out_unlock;
}

out_unlock:
pthread_rwlock_unlock(&shlib_rwlock);
}
out:

if (!err) {
knet_h->compress_model = cmp_model;
knet_h->compress_level = knet_handle_compress_cfg->compress_level;
Expand Down Expand Up @@ -352,29 +371,18 @@ void compress_fini(
return;
}

/*
* compress does not require compress_check_lib_is_init
* because it's protected by compress_cfg
*/
int compress(
knet_handle_t knet_h,
const unsigned char *buf_in,
const ssize_t buf_in_len,
unsigned char *buf_out,
ssize_t *buf_out_len)
{
int savederrno = 0, err = 0;

if (compress_check_init_lib(knet_h, knet_h->compress_model, 0) < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load/init shared lib (compress) for model %s: %s",
compress_modules_cmds[knet_h->compress_model].model_name, strerror(savederrno));
return -1;
}

err = compress_modules_cmds[knet_h->compress_model].compress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
savederrno = errno;

pthread_rwlock_unlock(&shlib_rwlock);

errno = savederrno;
return err;
return compress_modules_cmds[knet_h->compress_model].compress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
}

int decompress(
Expand All @@ -399,16 +407,41 @@ int decompress(
return -1;
}

if (compress_check_init_lib(knet_h, compress_model, 1) < 0) {
savederrno = errno;
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load/init shared lib (decompress) for model %s: %s",
compress_modules_cmds[compress_model].model_name, strerror(savederrno));
savederrno = pthread_rwlock_rdlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get read lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}

if (!compress_check_lib_is_init(knet_h, compress_model)) {
/*
* need to switch to write lock, load the lib, and return with a write lock
* this is not racy because compress_load_lib is written idempotent.
*/
pthread_rwlock_unlock(&shlib_rwlock);
savederrno = pthread_rwlock_wrlock(&shlib_rwlock);
if (savederrno) {
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to get write lock: %s",
strerror(savederrno));
errno = savederrno;
return -1;
}

if (compress_load_lib(knet_h, compress_model, 1) < 0) {
savederrno = errno;
err = -1;
log_err(knet_h, KNET_SUB_COMPRESS, "Unable to load library: %s",
strerror(savederrno));
goto out_unlock;
}
}

err = compress_modules_cmds[compress_model].decompress(knet_h, buf_in, buf_in_len, buf_out, buf_out_len);
savederrno = errno;

out_unlock:
pthread_rwlock_unlock(&shlib_rwlock);

errno = savederrno;
Expand Down

0 comments on commit 899de76

Please sign in to comment.