Skip to content
This repository has been archived by the owner on Mar 4, 2024. It is now read-only.

Commit

Permalink
Add initial raft_recover implementation and test
Browse files Browse the repository at this point in the history
  • Loading branch information
freeekanayaka committed Oct 31, 2019
1 parent fa3a40d commit 5b49e37
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 9 deletions.
2 changes: 2 additions & 0 deletions Makefile.am
Expand Up @@ -79,6 +79,7 @@ test_integration_core_SOURCES = \
test/integration/test_bootstrap.c \
test/integration/test_election.c \
test/integration/test_membership.c \
test/integration/test_recover.c \
test/integration/test_replication.c \
test/integration/test_snapshot.c \
test/integration/test_strerror.c \
Expand Down Expand Up @@ -150,6 +151,7 @@ test_unit_uv_SOURCES += \
test/unit/test_uv_list.c \
test/unit/test_uv_load.c \
test/unit/test_uv_prepare.c \
test/unit/test_uv_recover.c \
test/unit/test_uv_append.c \
test/unit/test_uv_finalize.c \
test/unit/test_uv_snapshot.c \
Expand Down
20 changes: 15 additions & 5 deletions src/fixture.c
@@ -1,10 +1,10 @@
#include "../include/raft/fixture.h"

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "../include/raft/fixture.h"

#include "assert.h"
#include "configuration.h"
#include "entry.h"
Expand Down Expand Up @@ -54,9 +54,9 @@ static char *describeMessage(const struct raft_message *m)
}
return d;
}
# define tracef(MSG, ...) debugf(io->io, MSG, ##__VA_ARGS__)
#define tracef(MSG, ...) debugf(io->io, MSG, ##__VA_ARGS__)
#else
# define tracef(MSG, ...)
#define tracef(MSG, ...)
#endif

/* Maximum number of peer stub instances connected to a certain stub
Expand Down Expand Up @@ -555,6 +555,15 @@ static int ioMethodBootstrap(struct raft_io *raft_io,
return 0;
}

static int ioMethodRecover(struct raft_io *io,
const struct raft_configuration *conf)
{
/* TODO: implement this API */
(void)io;
(void)conf;
return RAFT_IOERR;
}

static int ioMethodSetTerm(struct raft_io *raft_io, const raft_term term)
{
struct io *io = raft_io->impl;
Expand Down Expand Up @@ -669,7 +678,7 @@ static int ioMethodTruncate(struct raft_io *raft_io, raft_index index)
}

static int ioMethodSnapshotPut(struct raft_io *raft_io,
unsigned trailing,
unsigned trailing,
struct raft_io_snapshot_put *req,
const struct raft_snapshot *snapshot,
raft_io_snapshot_put_cb cb)
Expand Down Expand Up @@ -906,6 +915,7 @@ static int ioInit(struct raft_io *raft_io, unsigned index, raft_time *time)
raft_io->close = ioMethodClose;
raft_io->load = ioMethodLoad;
raft_io->bootstrap = ioMethodBootstrap;
raft_io->recover = ioMethodRecover;
raft_io->set_term = ioMethodSetTerm;
raft_io->set_vote = ioMethodSetVote;
raft_io->append = ioMethodAppend;
Expand Down
16 changes: 16 additions & 0 deletions src/raft.c
Expand Up @@ -129,6 +129,22 @@ int raft_bootstrap(struct raft *r, const struct raft_configuration *conf)
return 0;
}

int raft_recover(struct raft *r, const struct raft_configuration *conf)
{
int rv;

if (r->state != RAFT_UNAVAILABLE) {
return RAFT_BUSY;
}

rv = r->io->recover(r->io, conf);
if (rv != 0) {
return rv;
}

return 0;
}

const char *raft_strerror(int errnum)
{
return errCodeToString(errnum);
Expand Down
38 changes: 38 additions & 0 deletions src/uv.c
Expand Up @@ -475,6 +475,43 @@ static int uvBootstrap(struct raft_io *io,
return 0;
}

/* Implementation of raft_io->recover. */
static int uvRecover(struct raft_io *io, const struct raft_configuration *conf)
{
struct uv *uv = io->impl;
struct raft_snapshot *snapshot;
raft_index start_index;
raft_index next_index;
struct raft_entry *entries;
size_t n_entries;
int rv;

/* Load the current state. This also closes any leftover open segment. */
rv = loadSnapshotAndEntries(uv, &snapshot, &start_index, &entries,
&n_entries);
if (rv != 0) {
return rv;
}

/* We don't care about the actual data, just index of the last entry. */
if (snapshot != NULL) {
snapshotDestroy(snapshot);
}
if (entries != NULL) {
entryBatchesDestroy(entries, n_entries);
}

assert(start_index > 0);
next_index = start_index + n_entries;

rv = uvSegmentCreateClosedWithConfiguration(uv, next_index, conf);
if (rv != 0) {
return rv;
}

return 0;
}

/* Implementation of raft_io->set_term. */
static int uvSetVote(struct raft_io *io, const unsigned server_id)
{
Expand Down Expand Up @@ -599,6 +636,7 @@ int raft_uv_init(struct raft_io *io,
io->close = uvClose;
io->load = uvLoad;
io->bootstrap = uvBootstrap;
io->recover = uvRecover;
io->set_term = uvSetTerm;
io->set_vote = uvSetVote;
io->append = uvAppend;
Expand Down
7 changes: 7 additions & 0 deletions src/uv.h
Expand Up @@ -207,6 +207,13 @@ void uvSegmentBufferFinalize(struct uvSegmentBuffer *b, uv_buf_t *out);
* be set accordingly. */
void uvSegmentBufferReset(struct uvSegmentBuffer *b, unsigned retain);

/* Write a closed segment, containing just one entry at the given index
* for the given configuration. */
int uvSegmentCreateClosedWithConfiguration(
struct uv *uv,
raft_index index,
const struct raft_configuration *configuration);

/* Write the first closed segment, containing just one entry for the given
* configuration. */
int uvSegmentCreateFirstClosed(struct uv *uv,
Expand Down
10 changes: 9 additions & 1 deletion src/uv_segment.c
Expand Up @@ -927,6 +927,14 @@ static int writeFirstClosed(struct uv *uv,

int uvSegmentCreateFirstClosed(struct uv *uv,
const struct raft_configuration *configuration)
{
return uvSegmentCreateClosedWithConfiguration(uv, 1, configuration);
}

int uvSegmentCreateClosedWithConfiguration(
struct uv *uv,
raft_index index,
const struct raft_configuration *configuration)
{
struct raft_buffer buf;
char filename[UV__FILENAME_LEN];
Expand All @@ -936,7 +944,7 @@ int uvSegmentCreateFirstClosed(struct uv *uv,
int rv;

/* Render the path */
sprintf(filename, UV__CLOSED_TEMPLATE, (raft_index)1, (raft_index)1);
sprintf(filename, UV__CLOSED_TEMPLATE, index, index);

/* Encode the given configuration. */
rv = configurationEncode(configuration, &buf);
Expand Down
57 changes: 57 additions & 0 deletions test/integration/test_recover.c
@@ -0,0 +1,57 @@
#include "../lib/cluster.h"
#include "../lib/runner.h"

/******************************************************************************
*
* Fixture holding a bootstrapd raft caluster.
*
*****************************************************************************/

struct cluster
{
FIXTURE_CLUSTER;
};

static void *setupCluster(const MunitParameter params[],
MUNIT_UNUSED void *user_data)
{
struct cluster *f = munit_malloc(sizeof *f);
SETUP_CLUSTER(3);
CLUSTER_BOOTSTRAP;
return f;
}

static void tearDownCluster(void *data)
{
struct cluster *f = data;
TEAR_DOWN_CLUSTER;
free(f);
}

/******************************************************************************
*
* Recover tests.
*
*****************************************************************************/

SUITE(raft_recover)

/* Attempting to recover a running instance results in RAFT_BUSY. */
TEST(raft_recover, busy, setupCluster, tearDownCluster, 0, NULL)
{
struct cluster *f = data;
struct raft *raft;
struct raft_configuration configuration;
int rv;

/* Start all servers. */
CLUSTER_START;

raft = CLUSTER_RAFT(0);
CLUSTER_CONFIGURATION(&configuration);
rv = raft_recover(raft, &configuration);
munit_assert_int(rv, ==, RAFT_BUSY);
raft_configuration_close(&configuration);

return MUNIT_OK;
}
3 changes: 0 additions & 3 deletions test/unit/test_uv_append.c
Expand Up @@ -3,8 +3,6 @@
#include "../lib/runner.h"
#include "../lib/uv.h"

TEST_MODULE(uv_append)

/* Maximum number of blocks a segment can have */
#define MAX_SEGMENT_BLOCKS 4

Expand All @@ -27,7 +25,6 @@ struct fixture
static void *setup(const MunitParameter params[], void *user_data)
{
struct fixture *f = munit_malloc(sizeof *f);
(void)user_data;
SETUP_UV;
f->uv->n_blocks = MAX_SEGMENT_BLOCKS;
f->count = 0;
Expand Down
77 changes: 77 additions & 0 deletions test/unit/test_uv_recover.c
@@ -0,0 +1,77 @@
#include "../lib/runner.h"
#include "../lib/uv.h"

/******************************************************************************
*
* Fixture
*
*****************************************************************************/

struct fixture
{
FIXTURE_UV;
};

static void *setupIo(const MunitParameter params[], void *user_data)
{
struct fixture *f = munit_malloc(sizeof *f);
SETUP_UV;
return f;
}

static void tearDownIo(void *data)
{
struct fixture *f = data;
TEAR_DOWN_UV;
free(f);
}

/******************************************************************************
*
* UvRecover
*
*****************************************************************************/

SUITE(UvRecover)

/* Invoke UvRecover and assert that it fails with the given error. */
#define RECOVER_ERROR(RV, CONF) \
{ \
int rv_; \
rv_ = f->io.recover(&f->io, CONF); \
munit_assert_int(rv_, ==, RV); \
}

/* Invoke UvRecover and assert that it succeeds */
#define RECOVER(CONF) RECOVER_ERROR(0, CONF)

/* If the instance has been already initialized, an error is returned. */
/* A new configuration is saved as last entry on disk. */
TEST(UvRecover, newConfiguration, setupIo, tearDownIo, 0, NULL)
{
struct fixture *f = data;
struct raft_configuration configuration1;
struct raft_configuration configuration2;
int rv;

/* Boostrap using an initial configuration */
raft_configuration_init(&configuration1);
rv = raft_configuration_add(&configuration1, 1, "1", true);
munit_assert_int(rv, ==, 0);
rv = raft_configuration_add(&configuration1, 2, "2", true);
munit_assert_int(rv, ==, 0);
rv = f->io.bootstrap(&f->io, &configuration1);
munit_assert_int(rv, ==, 0);

/* Boostrap using a different configuration */
raft_configuration_init(&configuration2);
rv = raft_configuration_add(&configuration2, 1, "1", true);
munit_assert_int(rv, ==, 0);

RECOVER(&configuration2);

raft_configuration_close(&configuration1);
raft_configuration_close(&configuration2);

return 0;
}

0 comments on commit 5b49e37

Please sign in to comment.