Skip to content

Commit b817afa

Browse files
committed
MDEV-28689, MDEV-28690: Remove ctrl_mutex
This reverts the revert 4f62dfe and fixes the hang that was introduced when ctrl_mutex was removed. The test mariabackup.compress_qpress covers this code, but the test is skipped if a stand-alone qpress executable is not available. It is not available in many software repositories, possibly because the code base has not been updated since 2010. This was tested with an executable that was compile from the source code at http://www.quicklz.com/qpress-11-source.zip (after adding a missing #include <unistd.h> for the definition of isatty()). Compared to the grandparent commit (before the revert), the changes are as follows: comp_thread_ctxt_t::done_cond: A separate condition for completed compression, signaling that thd->to_len has been updated. compress_write(): Replace some threads[i] with thd. Reset thd->to_len = 0 after consuming the compressed data. compress_worker_thread_func(): After consuming the uncompressed data, set thd->data_avail = FALSE. After compressing, signal thd->done_cond.
1 parent 4f62dfe commit b817afa

File tree

1 file changed

+37
-70
lines changed

1 file changed

+37
-70
lines changed

extra/mariabackup/ds_compress.cc

Lines changed: 37 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
/******************************************************
22
Copyright (c) 2011-2013 Percona LLC and/or its affiliates.
3+
Copyright (c) 2022, MariaDB Corporation.
34
45
Compressing datasink implementation for XtraBackup.
56
@@ -32,11 +33,9 @@ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1335 USA
3233
typedef struct {
3334
pthread_t id;
3435
uint num;
35-
pthread_mutex_t ctrl_mutex;
36-
pthread_cond_t ctrl_cond;
3736
pthread_mutex_t data_mutex;
3837
pthread_cond_t data_cond;
39-
my_bool started;
38+
pthread_cond_t done_cond;
4039
my_bool data_avail;
4140
my_bool cancelled;
4241
const char *from;
@@ -208,14 +207,13 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
208207

209208
thd = threads + i;
210209

211-
pthread_mutex_lock(&thd->ctrl_mutex);
210+
pthread_mutex_lock(&thd->data_mutex);
212211

213212
chunk_len = (len > COMPRESS_CHUNK_SIZE) ?
214213
COMPRESS_CHUNK_SIZE : len;
215214
thd->from = ptr;
216215
thd->from_len = chunk_len;
217216

218-
pthread_mutex_lock(&thd->data_mutex);
219217
thd->data_avail = TRUE;
220218
pthread_cond_signal(&thd->data_cond);
221219
pthread_mutex_unlock(&thd->data_mutex);
@@ -234,32 +232,30 @@ compress_write(ds_file_t *file, const uchar *buf, size_t len)
234232
thd = threads + i;
235233

236234
pthread_mutex_lock(&thd->data_mutex);
237-
while (thd->data_avail == TRUE) {
238-
pthread_cond_wait(&thd->data_cond,
235+
while (!thd->to_len) {
236+
pthread_cond_wait(&thd->done_cond,
239237
&thd->data_mutex);
240238
}
241239

242-
xb_a(threads[i].to_len > 0);
243-
244240
bool fail = ds_write(dest_file, "NEWBNEWB", 8) ||
245241
write_uint64_le(dest_file,
246242
comp_file->bytes_processed);
247-
comp_file->bytes_processed += threads[i].from_len;
243+
comp_file->bytes_processed += thd->from_len;
248244

249245
if (!fail) {
250-
fail = write_uint32_le(dest_file, threads[i].adler) ||
251-
ds_write(dest_file, threads[i].to,
252-
threads[i].to_len);
246+
fail = write_uint32_le(dest_file, thd->adler) ||
247+
ds_write(dest_file, thd->to,
248+
thd->to_len);
253249
}
254250

255-
pthread_mutex_unlock(&threads[i].data_mutex);
251+
thd->to_len = 0;
252+
pthread_mutex_unlock(&thd->data_mutex);
256253

257254
if (fail) {
258255
msg("compress: write to the destination stream "
259256
"failed.");
260257
return 1;
261258
}
262-
pthread_mutex_unlock(&threads[i].ctrl_mutex);
263259
}
264260
}
265261

@@ -329,6 +325,24 @@ write_uint64_le(ds_file_t *file, ulonglong n)
329325
return ds_write(file, tmp, sizeof(tmp));
330326
}
331327

328+
static
329+
void
330+
destroy_worker_thread(comp_thread_ctxt_t *thd)
331+
{
332+
pthread_mutex_lock(&thd->data_mutex);
333+
thd->cancelled = TRUE;
334+
pthread_cond_signal(&thd->data_cond);
335+
pthread_mutex_unlock(&thd->data_mutex);
336+
337+
pthread_join(thd->id, NULL);
338+
339+
pthread_cond_destroy(&thd->data_cond);
340+
pthread_cond_destroy(&thd->done_cond);
341+
pthread_mutex_destroy(&thd->data_mutex);
342+
343+
my_free(thd->to);
344+
}
345+
332346
static
333347
comp_thread_ctxt_t *
334348
create_worker_threads(uint n)
@@ -337,60 +351,36 @@ create_worker_threads(uint n)
337351
uint i;
338352

339353
threads = (comp_thread_ctxt_t *)
340-
my_malloc(sizeof(comp_thread_ctxt_t) * n, MYF(MY_FAE));
354+
my_malloc(n * sizeof *threads, MYF(MY_ZEROFILL|MY_FAE));
341355

342356
for (i = 0; i < n; i++) {
343357
comp_thread_ctxt_t *thd = threads + i;
344358

345359
thd->num = i + 1;
346-
thd->started = FALSE;
347-
thd->cancelled = FALSE;
348-
thd->data_avail = FALSE;
349-
350360
thd->to = (char *) my_malloc(COMPRESS_CHUNK_SIZE +
351361
MY_QLZ_COMPRESS_OVERHEAD,
352362
MYF(MY_FAE));
353363

354-
/* Initialize the control mutex and condition var */
355-
if (pthread_mutex_init(&thd->ctrl_mutex, NULL) ||
356-
pthread_cond_init(&thd->ctrl_cond, NULL)) {
357-
goto err;
358-
}
359-
360364
/* Initialize and data mutex and condition var */
361365
if (pthread_mutex_init(&thd->data_mutex, NULL) ||
362-
pthread_cond_init(&thd->data_cond, NULL)) {
366+
pthread_cond_init(&thd->data_cond, NULL) ||
367+
pthread_cond_init(&thd->done_cond, NULL)) {
363368
goto err;
364369
}
365370

366-
pthread_mutex_lock(&thd->ctrl_mutex);
367-
368371
if (pthread_create(&thd->id, NULL, compress_worker_thread_func,
369372
thd)) {
370373
msg("compress: pthread_create() failed: "
371374
"errno = %d", errno);
372-
pthread_mutex_unlock(&thd->ctrl_mutex);
373375
goto err;
374376
}
375377
}
376378

377-
/* Wait for the threads to start */
378-
for (i = 0; i < n; i++) {
379-
comp_thread_ctxt_t *thd = threads + i;
380-
381-
while (thd->started == FALSE)
382-
pthread_cond_wait(&thd->ctrl_cond, &thd->ctrl_mutex);
383-
pthread_mutex_unlock(&thd->ctrl_mutex);
384-
}
385-
386379
return threads;
387380

388381
err:
389-
while (i > 0) {
390-
comp_thread_ctxt_t *thd;
391-
i--;
392-
thd = threads + i;
393-
pthread_mutex_unlock(&thd->ctrl_mutex);
382+
for (; i; i--) {
383+
destroy_worker_thread(threads + i);
394384
}
395385

396386
my_free(threads);
@@ -404,21 +394,7 @@ destroy_worker_threads(comp_thread_ctxt_t *threads, uint n)
404394
uint i;
405395

406396
for (i = 0; i < n; i++) {
407-
comp_thread_ctxt_t *thd = threads + i;
408-
409-
pthread_mutex_lock(&thd->data_mutex);
410-
threads[i].cancelled = TRUE;
411-
pthread_cond_signal(&thd->data_cond);
412-
pthread_mutex_unlock(&thd->data_mutex);
413-
414-
pthread_join(thd->id, NULL);
415-
416-
pthread_cond_destroy(&thd->data_cond);
417-
pthread_mutex_destroy(&thd->data_mutex);
418-
pthread_cond_destroy(&thd->ctrl_cond);
419-
pthread_mutex_destroy(&thd->ctrl_mutex);
420-
421-
my_free(thd->to);
397+
destroy_worker_thread(threads + i);
422398
}
423399

424400
my_free(threads);
@@ -430,26 +406,16 @@ compress_worker_thread_func(void *arg)
430406
{
431407
comp_thread_ctxt_t *thd = (comp_thread_ctxt_t *) arg;
432408

433-
pthread_mutex_lock(&thd->ctrl_mutex);
434-
435409
pthread_mutex_lock(&thd->data_mutex);
436410

437-
thd->started = TRUE;
438-
pthread_cond_signal(&thd->ctrl_cond);
439-
440-
pthread_mutex_unlock(&thd->ctrl_mutex);
441-
442411
while (1) {
443-
thd->data_avail = FALSE;
444-
pthread_cond_signal(&thd->data_cond);
445-
446412
while (!thd->data_avail && !thd->cancelled) {
447413
pthread_cond_wait(&thd->data_cond, &thd->data_mutex);
448414
}
449415

450416
if (thd->cancelled)
451417
break;
452-
418+
thd->data_avail = FALSE;
453419
thd->to_len = qlz_compress(thd->from, thd->to, thd->from_len,
454420
&thd->state);
455421

@@ -464,6 +430,7 @@ compress_worker_thread_func(void *arg)
464430

465431
thd->adler = adler32(0x00000001, (uchar *) thd->to,
466432
(uInt)thd->to_len);
433+
pthread_cond_signal(&thd->done_cond);
467434
}
468435

469436
pthread_mutex_unlock(&thd->data_mutex);

0 commit comments

Comments
 (0)