Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

New segment format #444

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ example/cluster
benchmark/os-disk-write
tmp
conftest*
doc/build
doc/build
recovery/downgrade_segment_format
12 changes: 12 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,18 @@ benchmark_os_disk_write_LDFLAGS = -luring

endif # BENCHMARK_ENABLED

if RECOVERY_ENABLED

bin_PROGRAMS += \
recovery/downgrade_segment_format

recovery_downgrade_segment_format_SOURCES = \
recovery/downgrade_segment_format.c \
${libraft_la_SOURCES}
recovery_downgrade_segment_format_LDFLAGS = $(UV_LIBS)

endif # RECOVERY_ENABLED

if DEBUG_ENABLED
AM_CFLAGS += -Werror -Wall
else
Expand Down
4 changes: 4 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ AM_CONDITIONAL(EXAMPLE_ENABLED, test "x$enable_example" = "xyes")
AC_ARG_ENABLE(benchmark, AS_HELP_STRING([--enable-benchmark[=ARG]], [build the benchmark programs [default=no]]))
AM_CONDITIONAL(BENCHMARK_ENABLED, test "x$enable_benchmark" = "xyes")

# The recovery programs are optional.
AC_ARG_ENABLE(recovery, AS_HELP_STRING([--enable-recovery[=ARG]], [build the recovery programs [default=no]]))
AM_CONDITIONAL(RECOVERY_ENABLED, test "x$enable_recovery" = "xyes")

# Whether to enable debugging code.
AC_ARG_ENABLE(debug, AS_HELP_STRING([--enable-debug[=ARG]], [enable debugging [default=no]]))
AM_CONDITIONAL(DEBUG_ENABLED, test "x$enable_debug" = "xyes")
Expand Down
30 changes: 30 additions & 0 deletions recovery/downgrade_segment_format.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#include <malloc.h>

#include "raft.h"
#include "../src/uv.h"

int main(int argc, char *argv[])
{
int rv;
char errmsg[RAFT_ERRMSG_BUF_SIZE] = {0};
struct uv *uv = malloc(sizeof(*uv));
if (argc != 2) {
fprintf(stderr, "expect a single 'dir' argument.\n");
return 2;
}
if (uv == NULL) {
fprintf(stderr, "MALLOC\n");
return 1;
}

strncpy(uv->dir, argv[1], UV__DIR_LEN-1);
uv->dir[UV__DIR_LEN-1] = '\0';

rv = UvSegmentConvertDirToFormat(uv, 1, errmsg);
if (rv != 0) {
fprintf(stderr, "downgrading segments failed: %s\n", errmsg);
} else {
printf("downgrading segments success\n");
}
return rv;
}
160 changes: 20 additions & 140 deletions src/uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,102 +239,6 @@ static void uvClose(struct raft_io *io, raft_io_close_cb cb)
uvMaybeFireCloseCb(uv);
}

/* Filter the given segment list to find the most recent contiguous chunk of
* closed segments that overlaps with the given snapshot last index. */
static int uvFilterSegments(struct uv *uv,
raft_index last_index,
const char *snapshot_filename,
struct uvSegmentInfo **segments,
size_t *n)
{
struct uvSegmentInfo *segment;
size_t i; /* First valid closed segment. */
size_t j; /* Last valid closed segment. */

/* If there are not segments at all, or only open segments, there's nothing
* to do. */
if (*segments == NULL || (*segments)[0].is_open) {
return 0;
}

/* Find the index of the most recent closed segment. */
for (j = 0; j < *n; j++) {
segment = &(*segments)[j];
if (segment->is_open) {
break;
}
}
assert(j > 0);
j--;

segment = &(*segments)[j];
tracef("most recent closed segment is %s", segment->filename);

/* If the end index of the last closed segment is lower than the last
* snapshot index, there might be no entry that we can keep. We return an
* empty segment list, unless there is at least one open segment, in that
* case we keep everything hoping that they contain all the entries since
* the last closed segment (TODO: we should encode the starting entry in the
* open segment). */
if (segment->end_index < last_index) {
if (!(*segments)[*n - 1].is_open) {
tracef(
"discarding all closed segments, since most recent is behind "
"last snapshot");
raft_free(*segments);
*segments = NULL;
*n = 0;
return 0;
}
tracef(
"most recent closed segment %s is behind last snapshot, "
"yet there are open segments",
segment->filename);
}

/* Now scan the segments backwards, searching for the longest list of
* contiguous closed segments. */
if (j >= 1) {
for (i = j; i > 0; i--) {
struct uvSegmentInfo *newer;
struct uvSegmentInfo *older;
newer = &(*segments)[i];
older = &(*segments)[i - 1];
if (older->end_index != newer->first_index - 1) {
tracef("discarding non contiguous segment %s", older->filename);
break;
}
}
} else {
i = j;
}

/* Make sure that the first index of the first valid closed segment is not
* greater than the snapshot's last index plus one (so there are no
* missing entries). */
segment = &(*segments)[i];
if (segment->first_index > last_index + 1) {
ErrMsgPrintf(uv->io->errmsg,
"closed segment %s is past last snapshot %s",
segment->filename, snapshot_filename);
return RAFT_CORRUPT;
}

if (i != 0) {
size_t new_n = *n - i;
struct uvSegmentInfo *new_segments;
new_segments = raft_malloc(new_n * sizeof *new_segments);
if (new_segments == NULL) {
return RAFT_NOMEM;
}
memcpy(new_segments, &(*segments)[i], new_n * sizeof *new_segments);
raft_free(*segments);
*segments = new_segments;
*n = new_n;
}

return 0;
}

/* Load the last snapshot (if any) and all entries contained in all segment
* files of the data directory. This function can be called recursively, `depth`
Expand All @@ -346,8 +250,10 @@ static int uvLoadSnapshotAndEntries(struct uv *uv,
size_t *n,
int depth)
{
char snapshot_filename[UV__FILENAME_LEN] = {0};
struct uvSnapshotInfo *snapshots;
struct uvSegmentInfo *segments;
raft_index snapshot_index = 0;
size_t n_snapshots;
size_t n_segments;
int rv;
Expand All @@ -366,7 +272,6 @@ static int uvLoadSnapshotAndEntries(struct uv *uv,

/* Load the most recent snapshot, if any. */
if (snapshots != NULL) {
char snapshot_filename[UV__FILENAME_LEN];
*snapshot = RaftHeapMalloc(sizeof **snapshot);
if (*snapshot == NULL) {
rv = RAFT_NOMEM;
Expand All @@ -380,57 +285,32 @@ static int uvLoadSnapshotAndEntries(struct uv *uv,
goto err;
}
uvSnapshotFilenameOf(&snapshots[n_snapshots - 1], snapshot_filename);
tracef("most recent snapshot at %lld", (*snapshot)->index);
snapshot_index = (*snapshot)->index;
tracef("most recent snapshot at %lld", snapshot_index);
RaftHeapFree(snapshots);
snapshots = NULL;

/* Update the start index. If there are closed segments on disk let's
* make sure that the first index of the first closed segment is not
* greater than the snapshot's last index plus one (so there are no
* missing entries), and update the start index accordingly. */
rv = uvFilterSegments(uv, (*snapshot)->index, snapshot_filename,
&segments, &n_segments);
if (rv != 0) {
goto err;
}
if (segments != NULL) {
if (segments[0].is_open) {
*start_index = (*snapshot)->index + 1;
} else {
*start_index = segments[0].first_index;
}
} else {
*start_index = (*snapshot)->index + 1;
}
}

/* Read data from segments, closing any open segments. */
if (segments != NULL) {
raft_index last_index;
rv = uvSegmentLoadAll(uv, *start_index, segments, n_segments, entries,
n);
if (rv != 0) {
goto err;
}
/* If there are closed segments on disk let's ensure that the first
* index of the first closed segment is not greater than the snapshot's
* last index plus one (so there are no missing entries). */
rv = uvSegmentFilter(uv, snapshot_index, snapshot_filename,
&segments, &n_segments);
if (rv != 0) {
goto err;
}

/* Check if all entries that we loaded are actually behind the last
* snapshot. This can happen if the last closed segment was behind the
* last snapshot and there were open segments, but the entries in the
* open segments turned out to be behind the snapshot as well. */
last_index = *start_index + *n - 1;
if (*snapshot != NULL && last_index < (*snapshot)->index) {
ErrMsgPrintf(uv->io->errmsg,
"last entry on disk has index %llu, which is behind "
"last snapshot's index %llu",
last_index, (*snapshot)->index);
rv = RAFT_CORRUPT;
goto err;
}
*start_index = uvSegmentStartIndex(uv, segments, snapshot_index);

raft_free(segments);
segments = NULL;
/* Read data from segments, closing any open segments. */
rv = uvSegmentLoadAll(uv, *start_index, snapshot_index,
segments, n_segments, entries, n);
if (rv != 0) {
goto err;
}

raft_free(segments);
segments = NULL;
return 0;

err:
Expand Down
31 changes: 26 additions & 5 deletions src/uv.h
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,36 @@ int uvSegmentLoadClosed(struct uv *uv,
struct raft_entry *entries[],
size_t *n);

/* Load raft entries from the given segments. The @start_index is the expected
* index of the first entry of the first segment. */
/* Determine the start index of the next entry to load from the segments. */
raft_index uvSegmentStartIndex(struct uv *uv,
struct uvSegmentInfo *infos,
raft_index snapshot_index);

/* Filter the given segment list to find the most recent contiguous chunk of
* closed segments that overlaps with the given snapshot last index. */
int uvSegmentFilter(struct uv *uv,
raft_index snapshot_index,
const char *snapshot_filename,
struct uvSegmentInfo **segments,
size_t *n);

/* Load raft entries from the given segments starting from @start_index.
* @snapshot_index is used to perform a sanity check after the segments have
* been loaded. */
int uvSegmentLoadAll(struct uv *uv,
const raft_index start_index,
const raft_index snapshot_index,
struct uvSegmentInfo *segments,
size_t n_segments,
struct raft_entry **entries,
size_t *n_entries);

/* Converts all segments in the uv dir to the specified format. Valid values for
* @format are 1 or the macros UV__DISK_FORMAT (1) and UV__SEGMENT_DISK_FORMAT_LEGACY (1).
* Returns 0 on success. */
int UvSegmentConvertDirToFormat(struct uv *uv, int format,
char errmsg[RAFT_ERRMSG_BUF_SIZE]);

/* Return the number of blocks in a segments. */
#define uvSegmentBlocks(UV) (UV->segment_size / UV->block_size)

Expand All @@ -183,9 +204,9 @@ void uvSegmentBufferInit(struct uvSegmentBuffer *b, size_t block_size);
/* Release all memory used by the buffer. */
void uvSegmentBufferClose(struct uvSegmentBuffer *b);

/* Encode the format version at the very beginning of the buffer. This function
* must be called when the buffer is empty. */
int uvSegmentBufferFormat(struct uvSegmentBuffer *b);
/* Encode the format version and first_index at the very beginning of the
* buffer. This function must be called when the buffer is empty. */
int uvSegmentBufferFormat(struct uvSegmentBuffer *b, raft_index first_index);

/* Extend the segment's buffer by encoding the given entries.
*
Expand Down
2 changes: 1 addition & 1 deletion src/uv_append.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ static int uvAliveSegmentEncodeEntriesToWriteBuf(struct uvAliveSegment *segment,
/* If this is the very first write to the segment, we need to include the
* format version */
if (segment->pending.n == 0 && segment->next_block == 0) {
rv = uvSegmentBufferFormat(&segment->pending);
rv = uvSegmentBufferFormat(&segment->pending, segment->first_index);
if (rv != 0) {
return rv;
}
Expand Down
Loading
Loading