Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
uv: barrier before every snapshot
Browse files Browse the repository at this point in the history
This solves a bug that came up during jepsen tests, it occurs when:
- a server is killed ungracefully (leading to a written open segment in
the directory when the system restarts)
- a snapshot file exists
- no closed segments exist

Because open segments are not closed before writing a snapshot, raft,
when starting up and reading the files in the data directory,
erroneously assumes that all the entries in the open segment are newer
than the entries in the snapshot, while in reality the entries are
already contained in the snapshot, leading to a wrong state.

Closing the current open segments before writing the snapshot ensures
that no old entries can mistakenly be identified as new entries, all
entries in the open segments will be newer than the snapshot.

To close the open segments before making a snapshot, we perform a `barrier`
request, but we need to make a distinction between a 'blocking' and a 'non-blocking'
barrier in order to save performance.
Both barriers close all current open segments before firing the barrier
callback, but a 'non-blocking' barrier allows writes to go through to newly
created open-segments. This non-blocking barrier is used when raft is creating
a snapshot  during regular operation, the blocking barrier is used when installing
snapshots and truncating the log.
  • Loading branch information
Mathieu Borderé committed Feb 11, 2022
1 parent 88c3c89 commit 424de42
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 28 deletions.
3 changes: 2 additions & 1 deletion src/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ typedef void (*UvBarrierCb)(struct UvBarrier *req);
struct UvBarrier
{
void *data; /* User data */
bool blocking; /* Whether this barrier should block future writes */
UvBarrierCb cb; /* Completion callback */
};

Expand All @@ -335,7 +336,7 @@ struct UvBarrier
* that will be appended will have the new index.
*
* - Execution of new writes for subsequent append requests will be blocked
* until UvUnblock is called.
* until UvUnblock is called when the barrier is blocking.
*
* - Wait for all currently pending and inflight append requests against all
* open segments to complete, and for those open segments to be finalized,
Expand Down
5 changes: 2 additions & 3 deletions src/uv_append.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,9 @@ static int uvAppendMaybeStart(struct uv *uv)
return 0;
}

/* If there's a barrier in progress, and it's not waiting for this segment
/* If there's a blocking barrier in progress, and it's not waiting for this segment
* to be finalized, let's wait. */
if (uv->barrier != NULL && segment->barrier != uv->barrier) {
if (uv->barrier != NULL && segment->barrier != uv->barrier && uv->barrier->blocking) {
return 0;
}

Expand Down Expand Up @@ -773,7 +773,6 @@ int UvBarrier(struct uv *uv,

void UvUnblock(struct uv *uv)
{
assert(uv->barrier != NULL);
uv->barrier = NULL;
if (uv->closing) {
uvMaybeFireCloseCb(uv);
Expand Down
30 changes: 14 additions & 16 deletions src/uv_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,14 +573,10 @@ static void uvSnapshotPutAfterWorkCb(uv_work_t *work, int status)
{
struct uvSnapshotPut *put = work->data;
struct uv *uv = put->uv;
bool is_install = put->trailing == 0;
assert(status == 0);
uv->snapshot_put_work.data = NULL;
uvSnapshotPutFinish(put);
if (is_install) {
UvUnblock(uv);
}
uvMaybeFireCloseCb(uv);
UvUnblock(uv);
}

/* Start processing the given put request. */
Expand Down Expand Up @@ -612,7 +608,7 @@ static void uvSnapshotPutBarrierCb(struct UvBarrier *barrier)
}

struct uv *uv = put->uv;
assert(put->trailing == 0);
assert((barrier->blocking && put->trailing == 0) || !barrier->blocking);
put->barrier.data = NULL;
/* If we're closing, abort the request. */
if (uv->closing) {
Expand All @@ -635,6 +631,7 @@ int UvSnapshotPut(struct raft_io *io,
void *cursor;
unsigned crc;
int rv;
raft_index next_index;

uv = io->impl;
assert(!uv->closing);
Expand All @@ -653,6 +650,7 @@ int UvSnapshotPut(struct raft_io *io,
put->meta.timestamp = uv_now(uv->loop);
put->trailing = trailing;
put->barrier.data = put;
put->barrier.blocking = trailing == 0;

req->cb = cb;

Expand All @@ -677,17 +675,17 @@ int UvSnapshotPut(struct raft_io *io,
cursor = &put->meta.header[1];
bytePut64(&cursor, crc);

/* If the trailing parameter is set to 0, it means that we're restoring a
/* - If the trailing parameter is set to 0, it means that we're restoring a
* snapshot. Submit a barrier request setting the next append index to the
* snapshot's last index + 1. */
if (trailing == 0) {
rv = UvBarrier(uv, snapshot->index + 1, &put->barrier,
uvSnapshotPutBarrierCb);
if (rv != 0) {
goto err_after_configuration_encode;
}
} else {
uvSnapshotPutStart(put);
* snapshot's last index + 1.
* - When we are only writing a snapshot during normal operation, we close
* all current open segments. New writes can continue on newly opened
* segments that will only contain entries that are newer than the snapshot,
* and we don't change append_next_index. */
next_index = (trailing == 0) ? (snapshot->index + 1) : uv->append_next_index;
rv = UvBarrier(uv, next_index, &put->barrier, uvSnapshotPutBarrierCb);
if (rv != 0) {
goto err_after_configuration_encode;
}

return 0;
Expand Down
1 change: 1 addition & 0 deletions src/uv_truncate.c
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ int UvTruncate(struct raft_io *io, raft_index index)
truncate->uv = uv;
truncate->index = index;
truncate->barrier.data = truncate;
truncate->barrier.blocking = true;

/* Make sure that we wait for any inflight writes to finish and then close
* the current segment. */
Expand Down
106 changes: 99 additions & 7 deletions test/integration/test_uv_append.c
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,11 @@ TEST(append, ioSetupError, setUp, tearDown, 0, NULL)

struct barrierData
{
int current; /* Count the number of finished AppendEntries RPCs */
int expected; /* Expected number of finished AppendEntries RPCs */
bool done; /* @true if the Barrier CB has fired */
bool expectDone; /* Expect the Barrier CB to have fired or not */
int current; /* Count the number of finished AppendEntries RPCs */
int expected; /* Expected number of finished AppendEntries RPCs */
bool done; /* @true if the Barrier CB has fired */
bool expectDone; /* Expect the Barrier CB to have fired or not */
char** files; /* Expected files in the directory, NULL terminated */
struct uv *uv;
};

Expand All @@ -611,6 +612,13 @@ static void barrierCbCompareCounter(struct UvBarrier *barrier)
struct uv *uv = bd->uv;
UvUnblock(uv);
munit_assert_int(bd->current, ==, bd->expected);
if (bd->files != NULL) {
int i = 0;
while (bd->files[i] != NULL) {
munit_assert_true(DirHasFile(uv->dir, bd->files[i]));
++i;
}
}
}

static void appendCbIncreaseCounterAssertResult(struct raft_io_append *req,
Expand All @@ -624,11 +632,17 @@ static void appendCbIncreaseCounterAssertResult(struct raft_io_append *req,
bd->current += 1;
}

static char* bools[] = { "0", "1", NULL };
static MunitParameterEnum blocking_bool_params[] = {
{ "bool", bools },
{ NULL, NULL },
};

/* Fill up 3 segments worth of AppendEntries RPC's.
* Request a Barrier and expect that the AppendEntries RPC's are finished before
* the Barrier callback is fired.
*/
TEST(append, barrierOpenSegments, setUp, tearDown, 0, NULL)
TEST(append, barrierOpenSegments, setUp, tearDown, 0, blocking_bool_params)
{
struct fixture *f = data;
struct barrierData bd = {0};
Expand All @@ -637,13 +651,17 @@ TEST(append, barrierOpenSegments, setUp, tearDown, 0, NULL)
bd.done = false;
bd.expectDone = false;
bd.uv = f->io.impl;
char* files[] = {"0000000000000001-0000000000000004", "0000000000000005-0000000000000008",
"0000000000000009-0000000000000012", NULL};
bd.files = files;

APPEND_SUBMIT_CB_DATA(0, MAX_SEGMENT_BLOCKS, SEGMENT_BLOCK_SIZE, appendCbIncreaseCounterAssertResult, &bd);
APPEND_SUBMIT_CB_DATA(1, MAX_SEGMENT_BLOCKS, SEGMENT_BLOCK_SIZE, appendCbIncreaseCounterAssertResult, &bd);
APPEND_SUBMIT_CB_DATA(2, MAX_SEGMENT_BLOCKS, SEGMENT_BLOCK_SIZE, appendCbIncreaseCounterAssertResult, &bd);

struct UvBarrier barrier = {0};
barrier.data = (void*) &bd;
barrier.blocking = (bool)strtoul(munit_parameters_get(params, "bool"), NULL, 0);
UvBarrier(f->io.impl, 1, &barrier, barrierCbCompareCounter);

/* Make sure every callback fired */
Expand All @@ -654,10 +672,40 @@ TEST(append, barrierOpenSegments, setUp, tearDown, 0, NULL)
return MUNIT_OK;
}

/* Request a Barrier and expect that the no AppendEntries RPC's are finished before

/* Request a blocking Barrier and expect that the no AppendEntries RPC's are finished before
* the Barrier callback is fired.
*/
TEST(append, barrierNoOpenSegments, setUp, tearDown, 0, NULL)
TEST(append, blockingBarrierNoOpenSegments, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
struct barrierData bd = {0};
bd.current = 0;
bd.expected = 0;
bd.done = false;
bd.expectDone = true;
bd.uv = f->io.impl;

struct UvBarrier barrier = {0};
barrier.data = (void*) &bd;
barrier.blocking = true;
UvBarrier(f->io.impl, 1, &barrier, barrierCbCompareCounter);

APPEND_SUBMIT_CB_DATA(0, MAX_SEGMENT_BLOCKS, SEGMENT_BLOCK_SIZE, appendCbIncreaseCounterAssertResult, &bd);
APPEND_SUBMIT_CB_DATA(1, MAX_SEGMENT_BLOCKS, SEGMENT_BLOCK_SIZE, appendCbIncreaseCounterAssertResult, &bd);
APPEND_SUBMIT_CB_DATA(2, MAX_SEGMENT_BLOCKS, SEGMENT_BLOCK_SIZE, appendCbIncreaseCounterAssertResult, &bd);

/* Make sure every callback fired */
LOOP_RUN_UNTIL(&bd.done);
APPEND_WAIT(0);
APPEND_WAIT(1);
APPEND_WAIT(2);
return MUNIT_OK;
}

/* Request a blocking Barrier and expect that the no AppendEntries RPC's are finished before
* the Barrier callback is fired. */
TEST(append, blockingBarrierSingleOpenSegment, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
struct barrierData bd = {0};
Expand All @@ -666,9 +714,19 @@ TEST(append, barrierNoOpenSegments, setUp, tearDown, 0, NULL)
bd.done = false;
bd.expectDone = true;
bd.uv = f->io.impl;
char* files[] = { "0000000000000001-0000000000000001", NULL };
bd.files = files;

/* Wait until there is at least 1 open segment otherwise
* the barrier Cb is fired immediately. */
APPEND(1, 64);
while (!DirHasFile(f->dir, "open-1")) {
LOOP_RUN(1);
}

struct UvBarrier barrier = {0};
barrier.data = (void*) &bd;
barrier.blocking = true;
UvBarrier(f->io.impl, 1, &barrier, barrierCbCompareCounter);

APPEND_SUBMIT_CB_DATA(0, MAX_SEGMENT_BLOCKS, SEGMENT_BLOCK_SIZE, appendCbIncreaseCounterAssertResult, &bd);
Expand All @@ -682,3 +740,37 @@ TEST(append, barrierNoOpenSegments, setUp, tearDown, 0, NULL)
APPEND_WAIT(2);
return MUNIT_OK;
}

/* Request a non-blocking Barrier and expect the AppendEntries RPC to finish before
* the Barrier callback is fired.
*/
TEST(append, nonBlockingBarrierSingleOpenSegment, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
struct barrierData bd = {0};
bd.current = 0;
bd.expected = 1;
bd.done = false;
bd.expectDone = false;
bd.uv = f->io.impl;
char* files[] = { "0000000000000001-0000000000000001", NULL };
bd.files = files;

/* Wait until there is at least 1 open segment otherwise
* the barrier Cb is fired immediately. */
APPEND(1, 64);
while (!DirHasFile(f->dir, "open-1")) {
LOOP_RUN(1);
}

struct UvBarrier barrier = {0};
barrier.data = (void*) &bd;
barrier.blocking = false;
UvBarrier(f->io.impl, bd.uv->append_next_index, &barrier, barrierCbCompareCounter);
APPEND_SUBMIT_CB_DATA(0, 1, 64, appendCbIncreaseCounterAssertResult, &bd);

/* Make sure every callback fired */
LOOP_RUN_UNTIL(&bd.done);
APPEND_WAIT(0);
return MUNIT_OK;
}
2 changes: 1 addition & 1 deletion test/lib/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#include "../../include/raft.h"

#define FIXTURE_TRACER struct raft_tracer tracer
#define SET_UP_TRACER f->tracer.emit = TracerEmit
#define SET_UP_TRACER f->tracer.emit = TracerEmit; f->tracer.enabled = true
#define TEAR_DOWN_TRACER

void TracerEmit(struct raft_tracer *t,
Expand Down

0 comments on commit 424de42

Please sign in to comment.