diff --git a/.gitignore b/.gitignore index 90ca34324..4d907d4fe 100644 --- a/.gitignore +++ b/.gitignore @@ -37,4 +37,5 @@ example/cluster benchmark/os-disk-write tmp conftest* -doc/build \ No newline at end of file +doc/build +recovery/downgrade_segment_format diff --git a/Makefile.am b/Makefile.am index 024ed5a9e..d89b93730 100644 --- a/Makefile.am +++ b/Makefile.am @@ -254,6 +254,18 @@ benchmark_os_disk_write_LDFLAGS = -luring endif # BENCHMARK_ENABLED +if RECOVERY_ENABLED + +bin_PROGRAMS += \ + recovery/downgrade_segment_format + +recovery_downgrade_segment_format_SOURCES = \ + recovery/downgrade_segment_format.c \ + ${libraft_la_SOURCES} +recovery_downgrade_segment_format_LDFLAGS = $(UV_LIBS) + +endif # RECOVERY_ENABLED + if DEBUG_ENABLED AM_CFLAGS += -Werror -Wall else diff --git a/configure.ac b/configure.ac index d70f94ced..2564b965c 100644 --- a/configure.ac +++ b/configure.ac @@ -54,6 +54,10 @@ AM_CONDITIONAL(EXAMPLE_ENABLED, test "x$enable_example" = "xyes") AC_ARG_ENABLE(benchmark, AS_HELP_STRING([--enable-benchmark[=ARG]], [build the benchmark programs [default=no]])) AM_CONDITIONAL(BENCHMARK_ENABLED, test "x$enable_benchmark" = "xyes") +# The recovery programs are optional. +AC_ARG_ENABLE(recovery, AS_HELP_STRING([--enable-recovery[=ARG]], [build the recovery programs [default=no]])) +AM_CONDITIONAL(RECOVERY_ENABLED, test "x$enable_recovery" = "xyes") + # Whether to enable debugging code. 
AC_ARG_ENABLE(debug, AS_HELP_STRING([--enable-debug[=ARG]], [enable debugging [default=no]])) AM_CONDITIONAL(DEBUG_ENABLED, test "x$enable_debug" = "xyes") diff --git a/recovery/downgrade_segment_format.c b/recovery/downgrade_segment_format.c new file mode 100644 index 000000000..d848d06d1 --- /dev/null +++ b/recovery/downgrade_segment_format.c @@ -0,0 +1,48 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "raft.h" +#include "../src/uv.h" + +/* Downgrade all segments in the data directory named by the single command + * line argument to the legacy on-disk format. Returns 0 on success, 2 on bad + * usage, 1 if out of memory, or the error returned by the conversion. */ +int main(int argc, char *argv[]) +{ + int rv; + char errmsg[RAFT_ERRMSG_BUF_SIZE] = {0}; + struct raft_io io = {0}; + struct uv *uv; + + if (argc != 2) { + fprintf(stderr, "expect a single 'dir' argument.\n"); + return 2; + } + /* Refuse to operate on a silently truncated directory path. */ + if (strlen(argv[1]) >= UV__DIR_LEN) { + fprintf(stderr, "'dir' argument is too long.\n"); + return 2; + } + /* Zero the whole struct so that no field is left indeterminate. */ + uv = calloc(1, sizeof(*uv)); + if (uv == NULL) { + fprintf(stderr, "MALLOC\n"); + return 1; + } + /* Segment helpers report errors through uv->io->errmsg; this assumes + * uv->io is a struct raft_io pointer, as in the library. */ + uv->io = &io; + + strncpy(uv->dir, argv[1], UV__DIR_LEN-1); + uv->dir[UV__DIR_LEN-1] = '\0'; + + rv = UvSegmentConvertDirToFormat(uv, 1, errmsg); + if (rv != 0) { + fprintf(stderr, "downgrading segments failed: %s\n", errmsg); + } else { + printf("downgrading segments success\n"); + } + free(uv); + return rv; +} diff --git a/src/uv.c b/src/uv.c index 974ca01f2..06b830c35 100644 --- a/src/uv.c +++ b/src/uv.c @@ -239,102 +239,6 @@ static void uvClose(struct raft_io *io, raft_io_close_cb cb) uvMaybeFireCloseCb(uv); } -/* Filter the given segment list to find the most recent contiguous chunk of - * closed segments that overlaps with the given snapshot last index. */ -static int uvFilterSegments(struct uv *uv, - raft_index last_index, - const char *snapshot_filename, - struct uvSegmentInfo **segments, - size_t *n) -{ - struct uvSegmentInfo *segment; - size_t i; /* First valid closed segment. */ - size_t j; /* Last valid closed segment. */ - - /* If there are not segments at all, or only open segments, there's nothing - * to do. */ - if (*segments == NULL || (*segments)[0].is_open) { - return 0; - } - - /* Find the index of the most recent closed segment. 
*/ - for (j = 0; j < *n; j++) { - segment = &(*segments)[j]; - if (segment->is_open) { - break; - } - } - assert(j > 0); - j--; - - segment = &(*segments)[j]; - tracef("most recent closed segment is %s", segment->filename); - - /* If the end index of the last closed segment is lower than the last - * snapshot index, there might be no entry that we can keep. We return an - * empty segment list, unless there is at least one open segment, in that - * case we keep everything hoping that they contain all the entries since - * the last closed segment (TODO: we should encode the starting entry in the - * open segment). */ - if (segment->end_index < last_index) { - if (!(*segments)[*n - 1].is_open) { - tracef( - "discarding all closed segments, since most recent is behind " - "last snapshot"); - raft_free(*segments); - *segments = NULL; - *n = 0; - return 0; - } - tracef( - "most recent closed segment %s is behind last snapshot, " - "yet there are open segments", - segment->filename); - } - - /* Now scan the segments backwards, searching for the longest list of - * contiguous closed segments. */ - if (j >= 1) { - for (i = j; i > 0; i--) { - struct uvSegmentInfo *newer; - struct uvSegmentInfo *older; - newer = &(*segments)[i]; - older = &(*segments)[i - 1]; - if (older->end_index != newer->first_index - 1) { - tracef("discarding non contiguous segment %s", older->filename); - break; - } - } - } else { - i = j; - } - - /* Make sure that the first index of the first valid closed segment is not - * greater than the snapshot's last index plus one (so there are no - * missing entries). 
*/ - segment = &(*segments)[i]; - if (segment->first_index > last_index + 1) { - ErrMsgPrintf(uv->io->errmsg, - "closed segment %s is past last snapshot %s", - segment->filename, snapshot_filename); - return RAFT_CORRUPT; - } - - if (i != 0) { - size_t new_n = *n - i; - struct uvSegmentInfo *new_segments; - new_segments = raft_malloc(new_n * sizeof *new_segments); - if (new_segments == NULL) { - return RAFT_NOMEM; - } - memcpy(new_segments, &(*segments)[i], new_n * sizeof *new_segments); - raft_free(*segments); - *segments = new_segments; - *n = new_n; - } - - return 0; -} /* Load the last snapshot (if any) and all entries contained in all segment * files of the data directory. This function can be called recursively, `depth` @@ -346,8 +250,10 @@ static int uvLoadSnapshotAndEntries(struct uv *uv, size_t *n, int depth) { + char snapshot_filename[UV__FILENAME_LEN] = {0}; struct uvSnapshotInfo *snapshots; struct uvSegmentInfo *segments; + raft_index snapshot_index = 0; size_t n_snapshots; size_t n_segments; int rv; @@ -366,7 +272,6 @@ static int uvLoadSnapshotAndEntries(struct uv *uv, /* Load the most recent snapshot, if any. */ if (snapshots != NULL) { - char snapshot_filename[UV__FILENAME_LEN]; *snapshot = RaftHeapMalloc(sizeof **snapshot); if (*snapshot == NULL) { rv = RAFT_NOMEM; @@ -380,57 +285,32 @@ static int uvLoadSnapshotAndEntries(struct uv *uv, goto err; } uvSnapshotFilenameOf(&snapshots[n_snapshots - 1], snapshot_filename); - tracef("most recent snapshot at %lld", (*snapshot)->index); + snapshot_index = (*snapshot)->index; + tracef("most recent snapshot at %lld", snapshot_index); RaftHeapFree(snapshots); snapshots = NULL; - - /* Update the start index. If there are closed segments on disk let's - * make sure that the first index of the first closed segment is not - * greater than the snapshot's last index plus one (so there are no - * missing entries), and update the start index accordingly. 
*/ - rv = uvFilterSegments(uv, (*snapshot)->index, snapshot_filename, - &segments, &n_segments); - if (rv != 0) { - goto err; - } - if (segments != NULL) { - if (segments[0].is_open) { - *start_index = (*snapshot)->index + 1; - } else { - *start_index = segments[0].first_index; - } - } else { - *start_index = (*snapshot)->index + 1; - } } - /* Read data from segments, closing any open segments. */ - if (segments != NULL) { - raft_index last_index; - rv = uvSegmentLoadAll(uv, *start_index, segments, n_segments, entries, - n); - if (rv != 0) { - goto err; - } + /* If there are closed segments on disk let's ensure that the first + * index of the first closed segment is not greater than the snapshot's + * last index plus one (so there are no missing entries). */ + rv = uvSegmentFilter(uv, snapshot_index, snapshot_filename, + &segments, &n_segments); + if (rv != 0) { + goto err; + } - /* Check if all entries that we loaded are actually behind the last - * snapshot. This can happen if the last closed segment was behind the - * last snapshot and there were open segments, but the entries in the - * open segments turned out to be behind the snapshot as well. */ - last_index = *start_index + *n - 1; - if (*snapshot != NULL && last_index < (*snapshot)->index) { - ErrMsgPrintf(uv->io->errmsg, - "last entry on disk has index %llu, which is behind " - "last snapshot's index %llu", - last_index, (*snapshot)->index); - rv = RAFT_CORRUPT; - goto err; - } + *start_index = uvSegmentStartIndex(uv, segments, snapshot_index); - raft_free(segments); - segments = NULL; + /* Read data from segments, closing any open segments. 
*/ + rv = uvSegmentLoadAll(uv, *start_index, snapshot_index, + segments, n_segments, entries, n); + if (rv != 0) { + goto err; } + raft_free(segments); + segments = NULL; return 0; err: diff --git a/src/uv.h b/src/uv.h index 91f05fd73..de261e388 100644 --- a/src/uv.h +++ b/src/uv.h @@ -154,15 +154,36 @@ int uvSegmentLoadClosed(struct uv *uv, struct raft_entry *entries[], size_t *n); -/* Load raft entries from the given segments. The @start_index is the expected - * index of the first entry of the first segment. */ +/* Determine the start index of the next entry to load from the segments. */ +raft_index uvSegmentStartIndex(struct uv *uv, + struct uvSegmentInfo *infos, + raft_index snapshot_index); + +/* Filter the given segment list to find the most recent contiguous chunk of + * closed segments that overlaps with the given snapshot last index. */ +int uvSegmentFilter(struct uv *uv, + raft_index snapshot_index, + const char *snapshot_filename, + struct uvSegmentInfo **segments, + size_t *n); + +/* Load raft entries from the given segments starting from @start_index. + * @snapshot_index is used to perform a sanity check after the segments have + * been loaded. */ int uvSegmentLoadAll(struct uv *uv, const raft_index start_index, + const raft_index snapshot_index, struct uvSegmentInfo *segments, size_t n_segments, struct raft_entry **entries, size_t *n_entries); +/* Converts all segments in the uv dir to the specified format. Valid values for + * @format are 1 or the macros UV__DISK_FORMAT (1) and UV__SEGMENT_DISK_FORMAT_LEGACY (1). + * Returns 0 on success. */ +int UvSegmentConvertDirToFormat(struct uv *uv, int format, + char errmsg[RAFT_ERRMSG_BUF_SIZE]); + /* Return the number of blocks in a segments. */ #define uvSegmentBlocks(UV) (UV->segment_size / UV->block_size) @@ -183,9 +204,9 @@ void uvSegmentBufferInit(struct uvSegmentBuffer *b, size_t block_size); /* Release all memory used by the buffer. 
*/ void uvSegmentBufferClose(struct uvSegmentBuffer *b); -/* Encode the format version at the very beginning of the buffer. This function - * must be called when the buffer is empty. */ -int uvSegmentBufferFormat(struct uvSegmentBuffer *b); +/* Encode the format version and first_index at the very beginning of the + * buffer. This function must be called when the buffer is empty. */ +int uvSegmentBufferFormat(struct uvSegmentBuffer *b, raft_index first_index); /* Extend the segment's buffer by encoding the given entries. * diff --git a/src/uv_append.c b/src/uv_append.c index ba9fba456..e0a73153f 100644 --- a/src/uv_append.c +++ b/src/uv_append.c @@ -165,7 +165,7 @@ static int uvAliveSegmentEncodeEntriesToWriteBuf(struct uvAliveSegment *segment, /* If this is the very first write to the segment, we need to include the * format version */ if (segment->pending.n == 0 && segment->next_block == 0) { - rv = uvSegmentBufferFormat(&segment->pending); + rv = uvSegmentBufferFormat(&segment->pending, segment->first_index); if (rv != 0) { return rv; } diff --git a/src/uv_encoding.c b/src/uv_encoding.c index 8e770f6bf..54d39ddfb 100644 --- a/src/uv_encoding.c +++ b/src/uv_encoding.c @@ -7,6 +7,7 @@ #include "assert.h" #include "byte.h" #include "configuration.h" +#include "err.h" /** * Size of the request preamble. 
@@ -565,3 +566,64 @@ void uvDecodeEntriesBatch(uint8_t *batch, } } } + +size_t uvSizeofSegmentHeader(uint64_t format) +{ + switch (format) { + case UV__SEGMENT_DISK_FORMAT_LEGACY: + return sizeof(uint64_t); // Format version + case UV__SEGMENT_DISK_FORMAT_2: + return sizeof(uint64_t) // Format version + + sizeof(uint64_t) // First Index + + sizeof(uint64_t); // 4 byte CRC + 4 byte unused + default: + return 0; + } +} + +void uvEncodeSegmentHeaderFormat2(uint64_t first_index, void *buf) +{ + void *cursor = buf; + uint32_t crc; + + bytePut64(&cursor, UV__SEGMENT_DISK_FORMAT_2); + bytePut64(&cursor, first_index); + crc = byteCrc32(buf, 2 * sizeof(uint64_t), 0); + bytePut32(&cursor, crc); +} + +int uvDecodeSegmentHeaderValidate(void *buf, size_t n, uint64_t *format, + char errmsg[RAFT_ERRMSG_BUF_SIZE]) +{ + uint32_t crc_read; + uint32_t crc_calculated; + const void *cursor; + size_t crc_offset; + + assert(n >= 8); + *format = byteFlip64(*(uint64_t *)buf); + if (*format == 0) { + return 0; // Potentially all-zeros file. + } + if (*format == UV__SEGMENT_DISK_FORMAT_LEGACY) { + return 0; // Can't do any validation. + } + if (*format != UV__SEGMENT_DISK_FORMAT_2) { + ErrMsgPrintf(errmsg, "unexpected format version %ju", *format); + return RAFT_CORRUPT; // Unknown format. + } + // Format 2 as of here. + if (n < uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2)) { + ErrMsgPrintf(errmsg, "file has only %zu bytes", n); + return RAFT_IOERR; + } + crc_offset = 2 * sizeof(uint64_t); // 8 bytes format, 8 bytes first_index + crc_calculated = byteCrc32(buf, crc_offset, 0); + cursor = (char*)buf + crc_offset; + crc_read = byteGet32(&cursor); + if (crc_calculated != crc_read) { + ErrMsgPrintf(errmsg, "segment header crc mismatch"); + return RAFT_CORRUPT; + } + return 0; +} diff --git a/src/uv_encoding.h b/src/uv_encoding.h index e349fb1cb..061e51e27 100644 --- a/src/uv_encoding.h +++ b/src/uv_encoding.h @@ -10,6 +10,9 @@ /* Current disk format version. 
*/ #define UV__DISK_FORMAT 1 +#define UV__SEGMENT_DISK_FORMAT_LEGACY UV__DISK_FORMAT +#define UV__SEGMENT_DISK_FORMAT_2 2 + int uvEncodeMessage(const struct raft_message *message, uv_buf_t **bufs, unsigned *n_bufs); @@ -56,4 +59,14 @@ void uvEncodeBatchHeader(const struct raft_entry *entries, unsigned n, void *buf); +/* Validates the contents of the segment header in @buf of len @n. + * Returns 0 when the segment header is valid. When the segment is valid, + * @format contains the format version that can potentially be 0 and valid when + * the file contains all 0's. It's the caller's responsibility to do further + * validation. */ +int uvDecodeSegmentHeaderValidate(void *buf, size_t n, uint64_t *format, char errmsg[RAFT_ERRMSG_BUF_SIZE]); + +size_t uvSizeofSegmentHeader(uint64_t format); + +void uvEncodeSegmentHeaderFormat2(uint64_t first_index, void *buf); #endif /* UV_ENCODING_H_ */ diff --git a/src/uv_segment.c b/src/uv_segment.c index e0db51588..f042d48e5 100644 --- a/src/uv_segment.c +++ b/src/uv_segment.c @@ -157,7 +157,8 @@ int uvSegmentKeepTrailing(struct uv *uv, return 0; } -/* Read a segment file and return its format version. */ +/* Read a segment file and return its format version. Validates the header if + * possible.*/ static int uvReadSegmentFile(struct uv *uv, const char *filename, struct raft_buffer *buf, @@ -175,7 +176,13 @@ static int uvReadSegmentFile(struct uv *uv, RaftHeapFree(buf->base); return RAFT_IOERR; } - *format = byteFlip64(*(uint64_t *)buf->base); + + rv = uvDecodeSegmentHeaderValidate(buf->base, buf->len, format, uv->io->errmsg); + if (rv != 0) { + RaftHeapFree(buf->base); + return rv; + } + return 0; } @@ -395,18 +402,22 @@ int uvSegmentLoadClosed(struct uv *uv, if (rv != 0) { goto err; } - if (format != UV__DISK_FORMAT) { - ErrMsgPrintf(uv->io->errmsg, "unexpected format version %ju", format); + + /* Closed segment can't have all 0's content. 
*/ + if (format == 0) { rv = RAFT_CORRUPT; goto err_after_read; } + /* Move beyond the segment header to the data. */ + offset = uvSizeofSegmentHeader(format); + assert(offset != 0); + /* Load all batches in the segment. */ *entries = NULL; *n = 0; last = false; - offset = sizeof format; for (i = 1; !last; i++) { rv = uvLoadEntriesBatch(uv, &buf, &tmp_entries, &tmp_n, &offset, &last); if (rv != 0) { @@ -468,7 +479,73 @@ static bool uvContentHasOnlyTrailingZeros(const struct raft_buffer *buf, return true; } -/* Load all entries contained in an open segment. */ +/* Peek at the first index of an open segment. Returns 0 only if the segment + * has a format 2 header whose encoded first_index is non-zero. */ +static int uvSegmentPeekIndex(struct uv *uv, + struct uvSegmentInfo *info, + raft_index *first_index) +{ + struct raft_buffer buf = {0}; + uint64_t format = 0; + int rv; + + rv = uvReadSegmentFile(uv, info->filename, &buf, &format); + if (rv != 0) { + return rv; + } + + /* Legacy and zeroed segments carry no first index: report failure so + * that the caller falls back to the snapshot index. */ + rv = -1; + if (format == UV__SEGMENT_DISK_FORMAT_2) { + /* uvReadSegmentFile ensures a minimum buf.len */ + *first_index = byteFlip64(*(((uint64_t *)buf.base)+1)); + rv = *first_index == 0 ? RAFT_CORRUPT : 0; + } + + RaftHeapFree(buf.base); + return rv; +} + +raft_index uvSegmentStartIndex(struct uv *uv, + struct uvSegmentInfo *infos, + raft_index snapshot_index) +{ + int rv; + raft_index start_index = 1; //default + + /* No segments, next segment index comes after snapshot. */ + if (infos == NULL) { + start_index = snapshot_index + 1; + return start_index; + } + + /* There are no closed segments. */ + if (infos[0].is_open) { + raft_index first_index = 1; + rv = uvSegmentPeekIndex(uv, &infos[0], &first_index); + /* Peek index success, this means an open segment with an encoded first_index */ + if (rv == 0) { + start_index = first_index; + /* Peek index failed, possibly a legacy open segment. In legacy systems + * all open segments are newer than the present snapshot. 
*/ + } else { + start_index = snapshot_index + 1; + } + /* The are closed segments. */ + } else { + /* No open segments and a snapshot */ + if (snapshot_index != 0) { + start_index = infos[0].first_index; + } + /* No snapshot, expect that a segment containing a log entry with index + * 1 exists. The start_index remains 1. */ + } + + return start_index; +} + +/* Load all entries contained in an open segment and close it. */ static int uvSegmentLoadOpen(struct uv *uv, struct uvSegmentInfo *info, struct raft_entry *entries[], @@ -490,8 +567,6 @@ static int uvSegmentLoadOpen(struct uv *uv, char errmsg[RAFT_ERRMSG_BUF_SIZE]; int rv; - first_index = *next_index; - rv = UvFsFileIsEmpty(uv->dir, info->filename, &empty, errmsg); if (rv != 0) { tracef("check if %s is empty: %s", info->filename, errmsg); @@ -511,22 +586,40 @@ static int uvSegmentLoadOpen(struct uv *uv, goto err; } - /* Check that the format is the expected one, or perhaps 0, indicating that + /* Check that the format is one of the expected ones, or perhaps 0, indicating that * the segment was allocated but never written. */ - offset = sizeof format; - if (format != UV__DISK_FORMAT) { - if (format == 0) { - all_zeros = uvContentHasOnlyTrailingZeros(&buf, offset); - if (all_zeros) { - /* This is equivalent to the empty case, let's remove the - * segment. */ - tracef("remove zeroed open segment %s", info->filename); - remove = true; - RaftHeapFree(buf.base); - buf.base = NULL; - goto done; - } + offset = 0; + switch (format) { + case 0: + offset = sizeof format; + all_zeros = uvContentHasOnlyTrailingZeros(&buf, offset); + if (all_zeros) { + // This is equivalent to the empty case, let's remove the segment. 
+ tracef("remove zeroed open segment %s", info->filename); + remove = true; + RaftHeapFree(buf.base); + buf.base = NULL; + goto done; + } + rv = RAFT_CORRUPT; + ErrMsgPrintf(uv->io->errmsg, "unexpected format version %ju", format); + goto err_after_read; + case UV__SEGMENT_DISK_FORMAT_LEGACY: + offset = sizeof format; + first_index = *next_index; + break; + case UV__SEGMENT_DISK_FORMAT_2: + first_index = byteFlip64(*(((uint64_t *)buf.base)+1)); + offset = uvSizeofSegmentHeader(format); + if (first_index != *next_index) { + ErrMsgPrintf(uv->io->errmsg, "unexpected first_index:%llu expected:%llu", + first_index, *next_index); + rv = RAFT_INVALID; // This is a logical error, don't try to autorecover. + goto err_after_read; } + break; + default: + assert(false); // uvReadSegmentFile should have caught this. ErrMsgPrintf(uv->io->errmsg, "unexpected format version %ju", format); rv = RAFT_CORRUPT; goto err_after_read; @@ -589,42 +682,40 @@ static int uvSegmentLoadOpen(struct uv *uv, rv = RAFT_IOERR; goto err_after_read; } - } else { - char filename[UV__SEGMENT_FILENAME_BUF_SIZE]; - raft_index end_index = *next_index - 1; - - /* At least one entry was loaded */ - assert(end_index >= first_index); - int nb = snprintf(filename, sizeof(filename), UV__CLOSED_TEMPLATE, - first_index, end_index); - if ((nb < 0) || ((size_t)nb >= sizeof(filename))) { - tracef("snprintf failed: %d", nb); - rv = RAFT_IOERR; - goto err; - } + return 0; + } - tracef("finalize %s into %s", info->filename, filename); + /* Close the open segment. 
*/ + char filename[UV__SEGMENT_FILENAME_BUF_SIZE]; + raft_index end_index = *next_index - 1; - rv = UvFsTruncateAndRenameFile(uv->dir, (size_t)offset, info->filename, - filename, errmsg); - if (rv != 0) { - tracef("finalize %s: %s", info->filename, errmsg); - rv = RAFT_IOERR; - goto err; - } + /* At least one entry was loaded */ + assert(end_index >= first_index); + int nb = snprintf(filename, sizeof(filename), UV__CLOSED_TEMPLATE, first_index, end_index); + if ((nb < 0) || ((size_t)nb >= sizeof(filename))) { + tracef("snprintf failed: %d", nb); + rv = RAFT_IOERR; + goto err; + } - info->is_open = false; - info->first_index = first_index; - info->end_index = end_index; - memset(info->filename, '\0', sizeof(info->filename)); - _Static_assert(sizeof(info->filename) >= sizeof(filename), - "Destination buffer too small"); - /* info->filename is zeroed out, info->filename is at least as large as - * filename and we checked that nb < sizeof(filename) -> we won't - * overflow and the result will be zero terminated. */ - memcpy(info->filename, filename, (size_t)nb); + tracef("finalize %s into %s", info->filename, filename); + rv = UvFsTruncateAndRenameFile(uv->dir, (size_t)offset, info->filename, + filename, errmsg); + if (rv != 0) { + tracef("finalize %s: %s", info->filename, errmsg); + rv = RAFT_IOERR; + goto err; } + info->is_open = false; + info->first_index = first_index; + info->end_index = end_index; + memset(info->filename, '\0', sizeof(info->filename)); + _Static_assert(sizeof(info->filename) >= sizeof(filename), "Destination buffer too small"); + /* info->filename is zeroed out, info->filename is at least as large as + * filename and we checked that nb < sizeof(filename) -> we won't + * overflow and the result will be zero terminated. 
*/ + memcpy(info->filename, filename, (size_t)nb); return 0; err_after_batch_load: @@ -638,7 +729,6 @@ static int uvSegmentLoadOpen(struct uv *uv, err: assert(rv != 0); - return rv; } @@ -696,20 +786,21 @@ void uvSegmentBufferClose(struct uvSegmentBuffer *b) } } -int uvSegmentBufferFormat(struct uvSegmentBuffer *b) +int uvSegmentBufferFormat(struct uvSegmentBuffer *b, raft_index first_index) { int rv; - void *cursor; size_t n; assert(b->n == 0); - n = sizeof(uint64_t); + n = uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2); + if (n == 0) { + return -1; + } rv = uvEnsureSegmentBufferIsLargeEnough(b, n); if (rv != 0) { return rv; } b->n = n; - cursor = b->arena.base; - bytePut64(&cursor, UV__DISK_FORMAT); + uvEncodeSegmentHeaderFormat2(first_index, b->arena.base); return 0; } @@ -867,19 +958,122 @@ static void uvRecoverFromCorruptSegment(struct uv *uv, } } +/* Filter the given segment list to find the most recent contiguous chunk of + * closed segments that overlaps with the given snapshot last index. */ +int uvSegmentFilter(struct uv *uv, + raft_index snapshot_index, + const char *snapshot_filename, + struct uvSegmentInfo **segments, + size_t *n) +{ + struct uvSegmentInfo *segment; + size_t i; /* First valid closed segment. */ + size_t j; /* Last valid closed segment. */ + + /* If there are not segments at all, or only open segments, or no snapshot, + * there's nothing to do. */ + if (*segments == NULL || (*segments)[0].is_open || snapshot_index == 0) { + return 0; + } + + /* Find the index of the most recent closed segment. */ + for (j = 0; j < *n; j++) { + segment = &(*segments)[j]; + if (segment->is_open) { + break; + } + } + assert(j > 0); + j--; + + segment = &(*segments)[j]; + tracef("most recent closed segment is %s", segment->filename); + + /* If the end index of the last closed segment is lower than the last + * snapshot index, there might be no entry that we can keep. 
We return an + * empty segment list, unless there is at least one open segment, in that + * case we keep everything hoping that they contain all the entries since + * the last closed segment */ + if (segment->end_index < snapshot_index) { + if (!(*segments)[*n - 1].is_open) { + tracef( + "discarding all closed segments, since most recent is behind " + "last snapshot"); + raft_free(*segments); + *segments = NULL; + *n = 0; + return 0; + } + tracef( + "most recent closed segment %s is behind last snapshot, " + "yet there are open segments", + segment->filename); + } + + /* Now scan the segments backwards, searching for the longest list of + * contiguous closed segments. */ + if (j >= 1) { + for (i = j; i > 0; i--) { + struct uvSegmentInfo *newer; + struct uvSegmentInfo *older; + newer = &(*segments)[i]; + older = &(*segments)[i - 1]; + if (older->end_index != newer->first_index - 1) { + tracef("discarding non contiguous segment %s", older->filename); + break; + } + } + } else { + i = j; + } + + /* Make sure that the first index of the first valid closed segment is not + * greater than the snapshot's last index plus one (so there are no + * missing entries). 
*/ + segment = &(*segments)[i]; + if (segment->first_index > snapshot_index + 1) { + ErrMsgPrintf(uv->io->errmsg, + "closed segment %s is past last snapshot %s", + segment->filename, snapshot_filename); + return RAFT_CORRUPT; + } + + if (i != 0) { + size_t new_n = *n - i; + struct uvSegmentInfo *new_segments; + new_segments = raft_malloc(new_n * sizeof *new_segments); + if (new_segments == NULL) { + return RAFT_NOMEM; + } + memcpy(new_segments, &(*segments)[i], new_n * sizeof *new_segments); + raft_free(*segments); + *segments = new_segments; + *n = new_n; + } + + return 0; +} + int uvSegmentLoadAll(struct uv *uv, const raft_index start_index, + const raft_index snapshot_index, struct uvSegmentInfo *infos, size_t n_infos, struct raft_entry **entries, size_t *n_entries) { raft_index next_index; /* Next entry to load from disk */ + raft_index last_index; /* Index of last log entry loaded from disk */ struct raft_entry *tmp_entries; /* Entries in current segment */ size_t tmp_n; /* Number of entries in current segment */ size_t i; int rv; + /* Nothing to load */ + if (infos == NULL) { + return 0; + } + assert(start_index >= 1); assert(n_infos > 0); @@ -887,12 +1081,10 @@ int uvSegmentLoadAll(struct uv *uv, *n_entries = 0; next_index = start_index; - for (i = 0; i < n_infos; i++) { struct uvSegmentInfo *info = &infos[i]; tracef("load segment %s", info->filename); - if (info->is_open) { rv = uvSegmentLoadOpen(uv, info, entries, n_entries, &next_index); ErrMsgWrapf(uv->io->errmsg, "load open segment %s", info->filename); @@ -939,6 +1131,20 @@ int uvSegmentLoadAll(struct uv *uv, } } + /* Check if all entries that we loaded are actually behind the last + * snapshot. This can happen if the last closed segment was behind the + * last snapshot and there were open segments, but the entries in the + * open segments turned out to be behind the snapshot as well. 
*/ + last_index = start_index + *n_entries - 1; + if (last_index < snapshot_index) { + ErrMsgPrintf(uv->io->errmsg, + "last entry on disk has index %llu, which is behind " + "last snapshot's index %llu", + last_index, snapshot_index); + rv = RAFT_CORRUPT; + goto err; + } + return 0; err: @@ -989,6 +1195,7 @@ static int uvWriteClosedSegment(struct uv *uv, * block */ cap = uv->block_size - (sizeof(uint64_t) /* Format version */ + + sizeof(uint64_t) /* First index */ + sizeof(uint64_t) /* Checksums */ + uvSizeofBatchHeader(1)); if (conf->len > cap) { return RAFT_TOOBIG; @@ -996,7 +1203,7 @@ static int uvWriteClosedSegment(struct uv *uv, uvSegmentBufferInit(&buf, uv->block_size); - rv = uvSegmentBufferFormat(&buf); + rv = uvSegmentBufferFormat(&buf, first_index); if (rv != 0) { return rv; } @@ -1100,7 +1307,7 @@ int uvSegmentTruncate(struct uv *uv, uvSegmentBufferInit(&buf, uv->block_size); - rv = uvSegmentBufferFormat(&buf); + rv = uvSegmentBufferFormat(&buf, segment->first_index); if (rv != 0) { goto out_after_buffer_init; } @@ -1134,4 +1341,86 @@ int uvSegmentTruncate(struct uv *uv, return rv; } +int UvSegmentConvertDirToFormat(struct uv *uv, int target_format, + char errmsg[RAFT_ERRMSG_BUF_SIZE]) +{ + int rv; + struct uvSnapshotInfo *snapshots; + struct uvSegmentInfo *segments; + size_t n_snapshots; + size_t n_segments; + uint64_t format; + bool empty; + struct raft_buffer buf = {0}; + uint8_t format_buf[8] = {1,0,0,0,0,0,0,0}; + struct raft_buffer fmt_buf = {format_buf, 8}; + + if (uv == NULL) { + ErrMsgPrintf(errmsg, "uv is NULL"); + return -1; + } + if (target_format != UV__SEGMENT_DISK_FORMAT_LEGACY) { + ErrMsgPrintf(errmsg, "unknown target format %d", target_format); + return -1; + } + rv = UvList(uv, &snapshots, &n_snapshots, &segments, &n_segments, errmsg); + if (rv != 0) { + ErrMsgPrintf(errmsg, "failed to list segments %d", rv); + return -1; + } + // Not interested in snapshots. + if (snapshots != NULL) { + RaftHeapFree(snapshots); + } + // Nothing to do. 
+ if (segments == NULL) { + ErrMsgPrintf(errmsg, "No segments detected"); + return 0; + } + // Convert segment to desired format and replace original. + for (size_t i = 0; i < n_segments; i++) { + struct uvSegmentInfo *info = &segments[i]; + printf("segment %s: start conversion ...\n", info->filename); + rv = UvFsFileIsEmpty(uv->dir, info->filename, &empty, errmsg); + if (rv != 0) { + ErrMsgPrintf(errmsg, "failed to read %s", info->filename); + goto out; + } + if (empty) { + printf("segment %s: empty\n", info->filename); + continue; + } + rv = uvReadSegmentFile(uv, info->filename, &buf, &format); + if (rv != 0) { + ErrMsgPrintf(errmsg, "failed to read %s", info->filename); + goto out; + } + if (format != UV__SEGMENT_DISK_FORMAT_2) { /* legacy or zeroed: no format 2 header to strip */ + printf("segment %s: nothing to do\n", info->filename); + RaftHeapFree(buf.base); + continue; + } + rv = UvFsRemoveFile(uv->dir, info->filename, errmsg); + if (rv != 0) { + RaftHeapFree(buf.base); + ErrMsgPrintf(errmsg, "failed to remove %s", info->filename); + goto out; + } + // Write the legacy header and data + size_t offset = uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2); + struct raft_buffer bufs[2] = {fmt_buf, {(char*)buf.base + offset, buf.len - offset}}; + rv = UvFsMakeFile(uv->dir, info->filename, bufs, 2, errmsg); + RaftHeapFree(buf.base); + if (rv != 0) { + ErrMsgPrintf(errmsg, "failed to create %s", info->filename); + goto out; + } + printf("segment %s: conversion done\n", info->filename); + } + +out: + RaftHeapFree(segments); + return rv; +} + #undef tracef diff --git a/test/integration/test_uv_load.c b/test/integration/test_uv_load.c index 3e7b53f0a..3f1115e43 100644 --- a/test/integration/test_uv_load.c +++ b/test/integration/test_uv_load.c @@ -2,6 +2,7 @@ #include "../../src/byte.h" #include "../../src/uv.h" +#include "../../src/uv_encoding.h" #include "../lib/runner.h" #include "../lib/uv.h" @@ -50,6 +51,18 @@ struct snapshot uint64_t data; }; +static char *bools[] = { + "0", + "1", + NULL, +}; + +#define 
SEGMENTS_CONVERT "segments-convert" +static MunitParameterEnum convertDirParams[] = { + {SEGMENTS_CONVERT, bools}, + {NULL, NULL}, +}; + #define WORD_SIZE 8 /* Maximum number of blocks a segment can have */ @@ -333,7 +346,9 @@ struct snapshot /* Initialize the raft_io instance, then invoke raft_io->load() and assert that * it returns the given state. If non-NULL, SNAPSHOT points to a struct snapshot * object whose attributes must match the loaded snapshot. ENTRIES_DATA is - * supposed to be the integer stored in the data of first loaded entry. */ + * supposed to be the integer stored in the data of first loaded entry. Converts + * the segments in the directory to legacy format if SEGMENTS_CONVERT parameter + * is non-0. */ #define LOAD(TERM, VOTED_FOR, SNAPSHOT, START_INDEX, ENTRIES_DATA, N_ENTRIES) \ do { \ LOAD_VARS; \ @@ -341,6 +356,15 @@ struct snapshot uint64_t _data = ENTRIES_DATA; \ unsigned _i; \ SETUP_UV; \ + const char *_convert = munit_parameters_get(params, SEGMENTS_CONVERT);\ + if (_convert != NULL && strtoul(_convert, NULL, 0)) { \ + int _rv2; \ + struct uv *_uv = f->io.impl; \ + char _errmsg[RAFT_ERRMSG_BUF_SIZE]; \ + _rv2 = UvSegmentConvertDirToFormat(_uv, \ + UV__SEGMENT_DISK_FORMAT_LEGACY, _errmsg); \ + munit_assert_int(_rv2, ==, 0); \ + } \ _LOAD(TERM, VOTED_FOR, SNAPSHOT, START_INDEX, N_ENTRIES) \ } while (0) @@ -479,7 +503,7 @@ TEST(load, removeUnusableFiles, setUp, tearDown, 0, unusableFilesParams) } /* The data directory has an empty open segment. */ -TEST(load, emptyOpenSegment, setUp, tearDown, 0, NULL) +TEST(load, emptyOpenSegment, setUp, tearDown, 0, convertDirParams) { struct fixture *f = data; DirWriteFile(f->dir, "open-1", NULL, 0); @@ -496,7 +520,7 @@ TEST(load, emptyOpenSegment, setUp, tearDown, 0, NULL) } /* The data directory has a freshly allocated open segment filled with zeros. 
*/ -TEST(load, openSegmentWithTrailingZeros, setUp, tearDown, 0, NULL) +TEST(load, openSegmentWithTrailingZeros, setUp, tearDown, 0, convertDirParams) { struct fixture *f = data; DirWriteFileWithZeros(f->dir, "open-1", 256); @@ -513,7 +537,7 @@ TEST(load, openSegmentWithTrailingZeros, setUp, tearDown, 0, NULL) } /* The data directory has a valid closed and open segments. */ -TEST(load, bothOpenAndClosedSegments, setUp, tearDown, 0, NULL) +TEST(load, bothOpenAndClosedSegments, setUp, tearDown, 0, convertDirParams) { struct fixture *f = data; APPEND(2, 1); @@ -532,13 +556,13 @@ TEST(load, bothOpenAndClosedSegments, setUp, tearDown, 0, NULL) /* The data directory has an allocated open segment which contains non-zero * corrupted data in its second batch. */ -TEST(load, openSegmentWithNonZeroData, setUp, tearDown, 0, NULL) +TEST(load, openSegmentWithNonZeroData, setUp, tearDown, 0, convertDirParams) { struct fixture *f = data; uint64_t corrupt = 123456789; APPEND(2, 1); UNFINALIZE(1, 2, 1); - DirOverwriteFile(f->dir, "open-1", &corrupt, sizeof corrupt, 60); + DirOverwriteFile(f->dir, "open-1", &corrupt, sizeof corrupt, 76); LOAD(0, /* term */ 0, /* voted for */ NULL, /* snapshot */ @@ -555,14 +579,14 @@ TEST(load, openSegmentWithNonZeroData, setUp, tearDown, 0, NULL) /* The data directory has an open segment with a partially written batch that * needs to be truncated. 
*/ -TEST(load, openSegmentWithIncompleteBatch, setUp, tearDown, 0, NULL) +TEST(load, openSegmentWithIncompleteBatch, setUp, tearDown, 0, convertDirParams) { struct fixture *f = data; uint8_t zero[256]; APPEND(2, 1); UNFINALIZE(1, 2, 1); memset(zero, 0, sizeof zero); - DirOverwriteFile(f->dir, "open-1", &zero, sizeof zero, 62); + DirOverwriteFile(f->dir, "open-1", &zero, sizeof zero, 78); LOAD(0, /* term */ 0, /* voted for */ NULL, /* snapshot */ @@ -574,13 +598,17 @@ TEST(load, openSegmentWithIncompleteBatch, setUp, tearDown, 0, NULL) } /* The data directory has an open segment whose first batch is only - * partially written. In that case the segment gets removed. */ + * partially written, only the segment header has been written. + * In that case the segment gets removed. */ TEST(load, openSegmentWithIncompleteFirstBatch, setUp, tearDown, 0, NULL) { + uint32_t crc; struct fixture *f = data; - uint8_t buf[4 * WORD_SIZE] = { - 1, 0, 0, 0, 0, 0, 0, 0, /* Format version */ - 0, 0, 0, 0, 0, 0, 0, 0, /* CRC32 checksums */ + uint8_t buf[6 * WORD_SIZE] = { + 2, 0, 0, 0, 0, 0, 0, 0, /* Format version */ + 1, 0, 0, 0, 0, 0, 0, 0, /* First Index */ + 0, 0, 0, 0, 0, 0, 0, 0, /* Segment Header Checksum */ + 0, 0, 0, 0, 0, 0, 0, 0, /* Batch CRC32 checksums */ 0, 0, 0, 0, 0, 0, 0, 0, /* Number of entries */ 0, 0, 0, 0, 0, 0, 0, 0 /* Batch data */ }; @@ -589,6 +617,13 @@ TEST(load, openSegmentWithIncompleteFirstBatch, setUp, tearDown, 0, NULL) DirOverwriteFile(f->dir, "open-1", buf, sizeof buf, 0); + // Ensure the crc checks out. 
+ uint64_t format = UV__SEGMENT_DISK_FORMAT_2; + uint64_t first_index = 1; + crc = byteCrc32(&format, sizeof(format), 0); + crc = byteCrc32(&first_index, sizeof(first_index), crc); + DirOverwriteFile(f->dir, "open-1", &crc, sizeof crc, 16); + LOAD(0, /* term */ 0, /* voted for */ NULL, /* snapshot */ @@ -600,6 +635,40 @@ TEST(load, openSegmentWithIncompleteFirstBatch, setUp, tearDown, 0, NULL) return MUNIT_OK; } +/* The data directory has an open segment whose segment header checksum + * does not match the checksum of its format version and first index. + * In that case loading fails with RAFT_CORRUPT. */ +TEST(load, openSegmentWithBadCrc, setUp, tearDown, 0, NULL) +{ + uint32_t crc; + struct fixture *f = data; + uint8_t buf[6 * WORD_SIZE] = { + 2, 0, 0, 0, 0, 0, 0, 0, /* Format version */ + 1, 0, 0, 0, 0, 0, 0, 0, /* First Index */ + 0, 0, 0, 0, 0, 0, 0, 0, /* Segment Header Checksum */ + 0, 0, 0, 0, 0, 0, 0, 0, /* Batch CRC32 checksums */ + 0, 0, 0, 0, 0, 0, 0, 0, /* Number of entries */ + 0, 0, 0, 0, 0, 0, 0, 0 /* Batch data */ + }; + APPEND(1, 1); + UNFINALIZE(1, 1, 1); + + DirOverwriteFile(f->dir, "open-1", buf, sizeof buf, 0); + + // Ensure the segment header crc doesn't check out. + uint64_t format = UV__SEGMENT_DISK_FORMAT_2; + uint64_t first_index = 1; + crc = byteCrc32(&format, sizeof(format), 0); + crc = byteCrc32(&first_index, sizeof(first_index), crc); + crc += 1; // Invalidate the crc. + DirOverwriteFile(f->dir, "open-1", &crc, sizeof crc, 16); + + LOAD_ERROR_NO_RECOVER(RAFT_CORRUPT, + "load open segment open-1: segment header crc mismatch"); + + return MUNIT_OK; +} + /* The data directory has two segments, with the second having an entry. */ TEST(load, twoOpenSegments, setUp, tearDown, 0, NULL) { @@ -653,8 +722,26 @@ TEST(load, secondOpenSegmentIsAllZeros, setUp, tearDown, 0, NULL) return MUNIT_OK; } -/* The data directory has two open segments, the first one has a corrupt header. 
- */ +/* The data directory has a closed segment containing all zeros */ +TEST(load, closedSegmentAllZeros, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + APPEND(1, 1); + DirWriteFileWithZeros(f->dir, CLOSED_SEGMENT_FILENAME(1,1), SEGMENT_SIZE); + + LOAD(0, /* term */ + 0, /* voted for */ + NULL, /* snapshot */ + 1, /* start index */ + 1, /* data for first loaded entry */ + 1 /* n entries */ + ); + + munit_assert_false(HAS_CLOSED_SEGMENT_FILE(1, 1)); + return MUNIT_OK; +} + +/* The data directory has two open segments, the first one has a corrupt header. */ TEST(load, twoOpenSegmentsFirstCorrupt, setUp, tearDown, 0, NULL) { struct fixture *f = data; @@ -682,8 +769,31 @@ TEST(load, twoOpenSegmentsFirstCorrupt, setUp, tearDown, 0, NULL) return MUNIT_OK; } -/* The data directory has two open segments, the first one has a corrupt header. - */ +/* The data directory has two open segments, with the second one having an + * unexpected first index. */ +TEST(load, secondOpenSegmentUnexpectedFirstIndex, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + APPEND(1, 1); + APPEND(1, 2); + UNFINALIZE(1, 1, 1); + UNFINALIZE(2, 2, 2); + uint32_t crc; + + /* Write the new first index and ensure the crc checks out. */ + uint64_t format = UV__SEGMENT_DISK_FORMAT_2; + uint64_t first_index = 3; + crc = byteCrc32(&format, sizeof(format), 0); + crc = byteCrc32(&first_index, sizeof(first_index), crc); + DirOverwriteFile(f->dir, "open-2", &first_index, sizeof first_index, 8); + DirOverwriteFile(f->dir, "open-2", &crc, sizeof crc, 16); + + LOAD_ERROR_NO_RECOVER(RAFT_INVALID, + "load open segment open-2: unexpected first_index:3 expected:2"); + return MUNIT_OK; +} + +/* The data directory has two open segments, the first one has a corrupt header. 
*/ TEST(load, twoOpenSegmentsFirstCorruptNoRecovery, setUp, tearDown, 0, NULL) { struct fixture *f = data; @@ -703,6 +813,30 @@ TEST(load, twoOpenSegmentsFirstCorruptNoRecovery, setUp, tearDown, 0, NULL) return MUNIT_OK; } +/* The data directory has a closed and an open segment, with the open segment + * having an unexpected first index. */ +TEST(load, oneCloseOneOpenSegmentUnexpectedFirstIndex, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + uint32_t crc; + APPEND(1, 1); + APPEND(1, 2); + UNFINALIZE(2, 2, 1); + + /* Write the new first index and ensure the crc checks out. */ + uint64_t format = UV__SEGMENT_DISK_FORMAT_2; + uint64_t first_index = 3; + crc = byteCrc32(&format, sizeof(format), 0); + crc = byteCrc32(&first_index, sizeof(first_index), crc); + DirOverwriteFile(f->dir, "open-1", &first_index, sizeof first_index, 8); + DirOverwriteFile(f->dir, "open-1", &crc, sizeof crc, 16); + + LOAD_ERROR_NO_RECOVER(RAFT_INVALID, + "load open segment open-1: unexpected first_index:3 expected:2"); + return MUNIT_OK; +} + + /* The data directory has a valid open segment. 
*/ TEST(load, openSegment, setUp, tearDown, 0, NULL) { @@ -1388,15 +1522,15 @@ TEST(load, APPEND(2, 8); /* Corrupt the last closed segment */ - size_t offset = - WORD_SIZE /* Format version */ + WORD_SIZE / 2 /* Header checksum */; + size_t offset = uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2) + + WORD_SIZE / 2 /* Batch Header checksum */; uint32_t corrupted = 123456789; DirOverwriteFile(f->dir, CLOSED_SEGMENT_FILENAME(8, 9), &corrupted, sizeof corrupted, offset); LOAD_ERROR_NO_RECOVER( RAFT_CORRUPT, "load closed segment 0000000000000008-0000000000000009: entries " - "batch 1 starting at byte 8: data checksum mismatch"); + "batch 1 starting at byte 24: data checksum mismatch"); return MUNIT_OK; } @@ -1421,8 +1555,8 @@ TEST(load, APPEND(2, 8); /* Corrupt the last closed segment */ - size_t offset = - WORD_SIZE /* Format version */ + WORD_SIZE / 2 /* Header checksum */; + size_t offset = uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2) + + WORD_SIZE / 2 /* Header checksum */; uint32_t corrupted = 123456789; DirOverwriteFile(f->dir, CLOSED_SEGMENT_FILENAME(8, 9), &corrupted, sizeof corrupted, offset); @@ -1459,8 +1593,8 @@ TEST(load, UNFINALIZE(9, 9, 1); /* Corrupt the last closed segment */ - size_t offset = - WORD_SIZE /* Format version */ + WORD_SIZE / 2 /* Header checksum */; + size_t offset = uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2) + + WORD_SIZE / 2 /* Header checksum */; uint32_t corrupted = 123456789; DirOverwriteFile(f->dir, CLOSED_SEGMENT_FILENAME(8, 8), &corrupted, sizeof corrupted, offset); @@ -1495,8 +1629,8 @@ TEST(load, UNFINALIZE(9, 9, 1); /* Corrupt the last closed segment */ - size_t offset = - WORD_SIZE /* Format version */ + WORD_SIZE / 2 /* Header checksum */; + size_t offset = uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2) + + WORD_SIZE / 2 /* Batch Header checksum */; uint32_t corrupted = 123456789; DirOverwriteFile(f->dir, CLOSED_SEGMENT_FILENAME(8, 8), &corrupted, sizeof corrupted, offset); @@ -1504,7 +1638,7 @@ TEST(load, 
LOAD_ERROR_NO_RECOVER( RAFT_CORRUPT, "load closed segment 0000000000000008-0000000000000008: entries " - "batch 1 starting at byte 8: data checksum mismatch"); + "batch 1 starting at byte 24: data checksum mismatch"); return MUNIT_OK; } @@ -1523,21 +1657,21 @@ TEST(load, APPEND(2, 6); APPEND(2, 8); - /* Corrupt the second last closed segment */ - size_t offset = - WORD_SIZE /* Format version */ + WORD_SIZE / 2 /* Header checksum */; + /* Corrupt the second last closed segment */ + size_t offset = uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2) + + WORD_SIZE / 2 /* Header checksum */; uint32_t corrupted = 123456789; DirOverwriteFile(f->dir, CLOSED_SEGMENT_FILENAME(6, 7), &corrupted, sizeof corrupted, offset); LOAD_ERROR(RAFT_CORRUPT, "load closed segment 0000000000000006-0000000000000007: entries " - "batch 1 starting at byte 8: data checksum mismatch"); + "batch 1 starting at byte 24: data checksum mismatch"); /* Second load still fails. */ LOAD_ERROR_NO_SETUP( RAFT_CORRUPT, "load closed segment 0000000000000006-0000000000000007: entries " - "batch 1 starting at byte 8: data checksum mismatch"); + "batch 1 starting at byte 24: data checksum mismatch"); return MUNIT_OK; } @@ -1597,17 +1731,32 @@ TEST(load, openSegmentWithIncompleteFormat, setUp, tearDown, 0, NULL) return MUNIT_OK; } +/* The data directory has an open segment which has incomplete first index data. */ +TEST(load, openSegmentWithIncompleteFirstIndex, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + size_t offset = WORD_SIZE + /* Format version */ + WORD_SIZE - 1; /* Short First Index */ + + APPEND(1, 1); + UNFINALIZE(1, 1, 1); + DirTruncateFile(f->dir, "open-1", offset); + LOAD_ERROR(RAFT_IOERR, "load open segment open-1: file has only 15 bytes"); + return MUNIT_OK; +} + /* The data directory has an open segment which has an incomplete batch * preamble. 
*/ TEST(load, openSegmentWithIncompletePreamble, setUp, tearDown, 0, NULL) { struct fixture *f = data; - size_t offset = WORD_SIZE /* Format version */ + WORD_SIZE /* Checksums */; APPEND(1, 1); UNFINALIZE(1, 1, 1); + size_t offset = uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2) + + WORD_SIZE /* Batch Checksums */; DirTruncateFile(f->dir, "open-1", offset); LOAD_ERROR(RAFT_IOERR, - "load open segment open-1: entries batch 1 starting at byte 16: " + "load open segment open-1: entries batch 1 starting at byte 32: " "read preamble: short read: 0 bytes instead of 8"); return MUNIT_OK; } @@ -1616,8 +1765,8 @@ TEST(load, openSegmentWithIncompletePreamble, setUp, tearDown, 0, NULL) TEST(load, openSegmentWithIncompleteBatchHeader, setUp, tearDown, 0, NULL) { struct fixture *f = data; - size_t offset = WORD_SIZE + /* Format version */ - WORD_SIZE + /* Checksums */ + size_t offset = uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2) + + WORD_SIZE + /* Batch Checksums */ WORD_SIZE + /* Number of entries */ WORD_SIZE /* Partial batch header */; @@ -1625,7 +1774,7 @@ TEST(load, openSegmentWithIncompleteBatchHeader, setUp, tearDown, 0, NULL) UNFINALIZE(1, 1, 1); DirTruncateFile(f->dir, "open-1", offset); LOAD_ERROR(RAFT_IOERR, - "load open segment open-1: entries batch 1 starting at byte 8: " + "load open segment open-1: entries batch 1 starting at byte 24: " "read header: short read: 8 bytes instead of 16"); return MUNIT_OK; } @@ -1634,7 +1783,7 @@ TEST(load, openSegmentWithIncompleteBatchHeader, setUp, tearDown, 0, NULL) TEST(load, openSegmentWithIncompleteBatchData, setUp, tearDown, 0, NULL) { struct fixture *f = data; - size_t offset = WORD_SIZE + /* Format version */ + size_t offset = uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2) + WORD_SIZE + /* Checksums */ WORD_SIZE + /* Number of entries */ WORD_SIZE + /* Entry term */ @@ -1645,7 +1794,7 @@ TEST(load, openSegmentWithIncompleteBatchData, setUp, tearDown, 0, NULL) UNFINALIZE(1, 1, 1); DirTruncateFile(f->dir, "open-1", 
offset); LOAD_ERROR(RAFT_IOERR, - "load open segment open-1: entries batch 1 starting at byte 8: " + "load open segment open-1: entries batch 1 starting at byte 24: " "read data: short read: 4 bytes instead of 8"); return MUNIT_OK; } @@ -1654,7 +1803,7 @@ TEST(load, openSegmentWithIncompleteBatchData, setUp, tearDown, 0, NULL) TEST(load, closedSegmentWithCorruptedBatchHeader, setUp, tearDown, 0, NULL) { struct fixture *f = data; - size_t offset = WORD_SIZE /* Format version */; + size_t offset = uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2); uint64_t corrupted = 12345678; APPEND(1, 1); DirOverwriteFile(f->dir, CLOSED_SEGMENT_FILENAME(1, 1), &corrupted, @@ -1662,7 +1811,7 @@ TEST(load, closedSegmentWithCorruptedBatchHeader, setUp, tearDown, 0, NULL) LOAD_ERROR_NO_RECOVER( RAFT_CORRUPT, "load closed segment 0000000000000001-0000000000000001: entries " - "batch 1 starting at byte 8: header checksum mismatch"); + "batch 1 starting at byte 24: header checksum mismatch"); return MUNIT_OK; } @@ -1670,8 +1819,8 @@ TEST(load, closedSegmentWithCorruptedBatchHeader, setUp, tearDown, 0, NULL) TEST(load, closedSegmentWithCorruptedBatchData, setUp, tearDown, 0, NULL) { struct fixture *f = data; - size_t offset = - WORD_SIZE /* Format version */ + WORD_SIZE / 2 /* Header checksum */; + size_t offset = uvSizeofSegmentHeader(UV__SEGMENT_DISK_FORMAT_2) + + WORD_SIZE / 2 /* Header checksum */; uint32_t corrupted = 123456789; APPEND(1, 1); DirOverwriteFile(f->dir, CLOSED_SEGMENT_FILENAME(1, 1), &corrupted, @@ -1679,7 +1828,7 @@ TEST(load, closedSegmentWithCorruptedBatchData, setUp, tearDown, 0, NULL) LOAD_ERROR_NO_RECOVER( RAFT_CORRUPT, "load closed segment 0000000000000001-0000000000000001: entries " - "batch 1 starting at byte 8: data checksum mismatch"); + "batch 1 starting at byte 24: data checksum mismatch"); return MUNIT_OK; } @@ -1713,12 +1862,12 @@ TEST(load, emptyClosedSegment, setUp, tearDown, 0, NULL) TEST(load, closedSegmentWithBadFormat, setUp, tearDown, 0, NULL) { 
struct fixture *f = data; - uint8_t buf[8] = {2, 0, 0, 0, 0, 0, 0, 0}; + uint8_t buf[8] = {255, 0, 0, 0, 0, 0, 0, 0}; DirWriteFile(f->dir, CLOSED_SEGMENT_FILENAME(1, 1), buf, sizeof buf); LOAD_ERROR_NO_RECOVER( RAFT_CORRUPT, "load closed segment 0000000000000001-0000000000000001: " - "unexpected format version 2"); + "unexpected format version 255"); return MUNIT_OK; } @@ -1760,11 +1909,209 @@ TEST(load, openSegmentWithZeroFormatAndThenData, setUp, tearDown, 0, NULL) TEST(load, openSegmentWithBadFormat, setUp, tearDown, 0, NULL) { struct fixture *f = data; - uint8_t version[8] = {2, 0, 0, 0, 0, 0, 0, 0}; + uint8_t version[8] = {255, 0, 0, 0, 0, 0, 0, 0}; APPEND(1, 1); UNFINALIZE(1, 1, 1); DirOverwriteFile(f->dir, "open-1", version, sizeof version, 0); LOAD_ERROR_NO_RECOVER( - RAFT_CORRUPT, "load open segment open-1: unexpected format version 2"); + RAFT_CORRUPT, "load open segment open-1: unexpected format version 255"); + return MUNIT_OK; +} + +/***************************************************************************** + * * + * LEGACY SEGMENTS * + * * + *****************************************************************************/ + +SUITE(loadLegacy) + +/* The data directory has an open segment whose first batch is only + * partially written. In that case the segment gets removed. 
*/ +TEST(loadLegacy, openSegmentWithIncompleteFirstBatch, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + uint8_t buf[4 * WORD_SIZE] = { + 1, 0, 0, 0, 0, 0, 0, 0, /* Format version */ + 0, 0, 0, 0, 0, 0, 0, 0, /* CRC32 checksums */ + 0, 0, 0, 0, 0, 0, 0, 0, /* Number of entries */ + 0, 0, 0, 0, 0, 0, 0, 0 /* Batch data */ + }; + APPEND(1, 1); + UNFINALIZE(1, 1, 1); + + DirOverwriteFile(f->dir, "open-1", buf, sizeof buf, 0); + + LOAD(0, /* term */ + 0, /* voted for */ + NULL, /* snapshot */ + 1, /* start index */ + 0, /* data for first loaded entry */ + 0 /* n entries */ + ); + + return MUNIT_OK; +} + +static int segmentBufferFormatLegacy(struct uvSegmentBuffer *b) +{ + int rv; + void *cursor; + rv = uvSegmentBufferFormat(b, 0); + munit_assert(rv == 0); + munit_assert(b->n == uvSizeofSegmentHeader(2)); + /* Rewind the arena and write legacy format. */ + cursor = b->arena.base; + bytePut64(&cursor, 1); // Write legacy format + b->n = sizeof(uint64_t); + return 0; +} + +static void createLegacySegmentWithEntries(struct fixture *f, + const char *filename, + const struct raft_entry entries[], + unsigned n_entries) +{ + struct uvSegmentBuffer b = {0}; + struct raft_buffer write = {0}; + char err[RAFT_ERRMSG_BUF_SIZE]; + + uvSegmentBufferInit(&b, SEGMENT_BLOCK_SIZE); + munit_assert(segmentBufferFormatLegacy(&b) == 0); + if (entries != NULL) { + munit_assert(uvSegmentBufferAppend(&b, entries, n_entries) == 0); + } + + write.base = b.arena.base; + write.len = b.n; + /* Write file to disk. 
*/ + munit_assert(UvFsMakeFile(f->dir, filename, &write, 1, err) == 0); + munit_assert_true(DirHasFile(f->dir, filename)); + + uvSegmentBufferClose(&b); +} + +TEST(loadLegacy, openSingleEntry, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + uint64_t entry_data = 1; + + struct raft_entry entry = {0}; + entry.term = 1; + entry.type = RAFT_COMMAND; + entry.buf.base = &entry_data; + entry.buf.len = sizeof(entry_data); + entry.batch = NULL; + + createLegacySegmentWithEntries(f, "open-1", &entry, 1); + LOAD(0, /* term (NOT UPDATED IN THIS TEST) */ + 0, /* voted for */ + NULL, /* snapshot */ + 1, /* start index */ + 1, /* data for first loaded entry */ + 1 /* n entries */ + ); + + /* The open segment has been closed. */ + munit_assert_true(HAS_CLOSED_SEGMENT_FILE(1, 1)); + return MUNIT_OK; +} + +TEST(loadLegacy, multipleOpenSingleEntry, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + uint64_t entry_data = 1; + + struct raft_entry entry = {0}; + entry.term = 1; + entry.type = RAFT_COMMAND; + entry.buf.base = &entry_data; + entry.buf.len = sizeof(entry_data); + entry.batch = NULL; + + createLegacySegmentWithEntries(f, "open-1", &entry, 1); + entry_data = 2; + createLegacySegmentWithEntries(f, "open-2", &entry, 1); + entry_data = 3; + createLegacySegmentWithEntries(f, "open-3", &entry, 1); + LOAD(0, /* term (NOT UPDATED IN THIS TEST) */ + 0, /* voted for */ + NULL, /* snapshot */ + 1, /* start index */ + 1, /* data for first loaded entry */ + 3 /* n entries */ + ); + + /* The open segment has been closed. 
*/ + munit_assert_true(HAS_CLOSED_SEGMENT_FILE(1, 1)); + munit_assert_true(HAS_CLOSED_SEGMENT_FILE(2, 2)); + munit_assert_true(HAS_CLOSED_SEGMENT_FILE(3, 3)); + return MUNIT_OK; +} + +TEST(loadLegacy, openMultipleEntries, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + uint64_t entry_data_1 = 1; + uint64_t entry_data_2 = 2; + + struct raft_entry entries[2] = {{0}, {0}}; + entries[0].term = 1; + entries[0].type = RAFT_COMMAND; + entries[0].buf.base = &entry_data_1; + entries[0].buf.len = sizeof(entry_data_1); + entries[0].batch = NULL; + + entries[1].term = 1; + entries[1].type = RAFT_COMMAND; + entries[1].buf.base = &entry_data_2; + entries[1].buf.len = sizeof(entry_data_2); + entries[1].batch = NULL; + + createLegacySegmentWithEntries(f, "open-1", entries, 2); + LOAD(0, /* term (NOT UPDATED IN THIS TEST) */ + 0, /* voted for */ + NULL, /* snapshot */ + 1, /* start index */ + 1, /* data for first loaded entry */ + 2 /* n entries */ + ); + + /* The open segment has been closed. */ + munit_assert_true(HAS_CLOSED_SEGMENT_FILE(1, 2)); + + return MUNIT_OK; +} + +TEST(loadLegacy, closedMultipleEntries, setUp, tearDown, 0, NULL) +{ + struct fixture *f = data; + uint64_t entry_data_1 = 1; + uint64_t entry_data_2 = 2; + + struct raft_entry entries[2] = {{0}, {0}}; + entries[0].term = 1; + entries[0].type = RAFT_COMMAND; + entries[0].buf.base = &entry_data_1; + entries[0].buf.len = sizeof(entry_data_1); + entries[0].batch = NULL; + + entries[1].term = 1; + entries[1].type = RAFT_COMMAND; + entries[1].buf.base = &entry_data_2; + entries[1].buf.len = sizeof(entry_data_2); + entries[1].batch = NULL; + + createLegacySegmentWithEntries(f, CLOSED_SEGMENT_FILENAME(1,2), entries, 2); + LOAD(0, /* term (NOT UPDATED IN THIS TEST) */ + 0, /* voted for */ + NULL, /* snapshot */ + 1, /* start index */ + 1, /* data for first loaded entry */ + 2 /* n entries */ + ); + + /* The legacy closed segment is still present after loading. */ + munit_assert_true(HAS_CLOSED_SEGMENT_FILE(1, 2)); return MUNIT_OK; }