Skip to content

Commit

Permalink
MDEV-29043 mariabackup --compress hangs
Browse files (browse the repository at this point in the history)
Even though commit b817afa passed
the test mariabackup.compress_qpress, that test turned out to be
too small to reveal one more problem that had previously been prevented
by the existence of ctrl_mutex. I had not realized that there can be
multiple concurrent callers of compress_write(). One of them is the
log copying thread; the others are the data file copying threads
(default: --parallel=1).

By default, there is only one compression worker thread
(--compress-threads=1).

compress_write(): Fix a race condition between threads that would
use the same worker thread object. Make thd->data_avail contain the
thread identifier of the submitter, and add thd->avail_cond to
notify other compress_write() threads that are waiting for a slot.
  • Loading branch information
dr-m committed Aug 19, 2022
1 parent 3216722 commit a1055ab
Showing 1 changed file with 43 additions and 8 deletions.
51 changes: 43 additions & 8 deletions extra/mariabackup/ds_compress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ typedef struct {
pthread_t id;
uint num;
pthread_mutex_t data_mutex;
pthread_cond_t avail_cond;
pthread_cond_t data_cond;
pthread_cond_t done_cond;
my_bool data_avail;
pthread_t data_avail;
my_bool cancelled;
const char *from;
size_t from_len;
Expand Down Expand Up @@ -197,9 +198,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
threads = comp_ctxt->threads;
nthreads = comp_ctxt->nthreads;

const pthread_t self = pthread_self();

ptr = (const char *) buf;
while (len > 0) {
uint max_thread;
bool wait = nthreads == 1;
retry:
bool submitted = false;

/* Send data to worker threads for compression */
for (i = 0; i < nthreads; i++) {
Expand All @@ -208,30 +213,54 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
thd = threads + i;

pthread_mutex_lock(&thd->data_mutex);
if (thd->data_avail == pthread_t(~0UL)) {
} else if (!wait) {
skip:
pthread_mutex_unlock(&thd->data_mutex);
continue;
} else {
for (;;) {
pthread_cond_wait(&thd->avail_cond,
&thd->data_mutex);
if (thd->data_avail
== pthread_t(~0UL)) {
break;
}
goto skip;
}
}

chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
COMPRESS_CHUNK_SIZE : len;
thd->from = ptr;
thd->from_len = chunk_len;

thd->data_avail = TRUE;
thd->data_avail = self;
pthread_cond_signal(&thd->data_cond);
pthread_mutex_unlock(&thd->data_mutex);

submitted = true;
len -= chunk_len;
if (len == 0) {
break;
}
ptr += chunk_len;
}

max_thread = (i < nthreads) ? i : nthreads - 1;
if (!submitted) {
wait = true;
goto retry;
}

/* Reap and stream the compressed data */
for (i = 0; i <= max_thread; i++) {
for (i = 0; i < nthreads; i++) {
thd = threads + i;

pthread_mutex_lock(&thd->data_mutex);
if (thd->data_avail != self) {
pthread_mutex_unlock(&thd->data_mutex);
continue;
}

while (!thd->to_len) {
pthread_cond_wait(&thd->done_cond,
&thd->data_mutex);
Expand All @@ -249,6 +278,8 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
}

thd->to_len = 0;
thd->data_avail = pthread_t(~0UL);
pthread_cond_signal(&thd->avail_cond);
pthread_mutex_unlock(&thd->data_mutex);

if (fail) {
Expand Down Expand Up @@ -336,6 +367,7 @@ destroy_worker_thread(comp_thread_ctxt_t *thd)

pthread_join(thd->id, NULL);

pthread_cond_destroy(&thd->avail_cond);
pthread_cond_destroy(&thd->data_cond);
pthread_cond_destroy(&thd->done_cond);
pthread_mutex_destroy(&thd->data_mutex);
Expand Down Expand Up @@ -363,11 +395,14 @@ create_worker_threads(uint n)

/* Initialize and data mutex and condition var */
if (pthread_mutex_init(&thd->data_mutex, NULL) ||
pthread_cond_init(&thd->avail_cond, NULL) ||
pthread_cond_init(&thd->data_cond, NULL) ||
pthread_cond_init(&thd->done_cond, NULL)) {
goto err;
}

thd->data_avail = pthread_t(~0UL);

if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
thd)) {
msg("compress: pthread_create() failed: "
Expand Down Expand Up @@ -409,13 +444,13 @@ compress_worker_thread_func(void *arg)
pthread_mutex_lock(&thd->data_mutex);

while (1) {
while (!thd->data_avail && !thd->cancelled) {
while (!thd->cancelled
&& (thd->to_len || thd->data_avail == pthread_t(~0UL))) {
pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
}

if (thd->cancelled)
break;
thd->data_avail = FALSE;
thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
&thd->state);

Expand Down

0 comments on commit a1055ab

Please sign in to comment.