Skip to content

Commit

Permalink
raft: introduce persistent raft state
Browse files Browse the repository at this point in the history
The patch introduces a sceleton of Raft module and a method to
persist a Raft state in snapshot, not bound to any space.

Part of #1146
  • Loading branch information
Gerold103 committed Sep 29, 2020
1 parent 764a548 commit 4f0f7c8
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/box/CMakeLists.txt
Expand Up @@ -171,6 +171,7 @@ add_library(box STATIC
port.c
txn.c
txn_limbo.c
raft.c
box.cc
gc.c
checkpoint_schedule.c
Expand Down
8 changes: 8 additions & 0 deletions src/box/box.cc
Expand Up @@ -78,6 +78,7 @@
#include "sequence.h"
#include "sql_stmt_cache.h"
#include "msgpack.h"
#include "raft.h"
#include "trivia/util.h"

static char status[64] = "unknown";
Expand Down Expand Up @@ -384,6 +385,13 @@ apply_wal_row(struct xstream *stream, struct xrow_header *row)
diag_raise();
return;
}
if (iproto_type_is_raft_request(row->type)) {
struct raft_request raft_req;
if (xrow_decode_raft(row, &raft_req) != 0)
diag_raise();
raft_process_recovery(&raft_req);
return;
}
xrow_decode_dml_xc(row, &request, dml_request_key_map(row->type));
if (request.type != IPROTO_NOP) {
struct space *space = space_cache_find_xc(request.space_id);
Expand Down
13 changes: 13 additions & 0 deletions src/box/iproto_constants.h
Expand Up @@ -220,6 +220,8 @@ enum iproto_type {
/** The maximum typecode used for box.stat() */
IPROTO_TYPE_STAT_MAX,

IPROTO_RAFT = 30,

/** A confirmation message for synchronous transactions. */
IPROTO_CONFIRM = 40,
/** A rollback message for synchronous transactions. */
Expand Down Expand Up @@ -259,6 +261,11 @@ enum iproto_type {
/** IPROTO type name by code */
extern const char *iproto_type_strs[];

enum iproto_raft_keys {
IPROTO_RAFT_TERM = 0,
IPROTO_RAFT_VOTE = 1,
};

/**
* Returns IPROTO type name by @a type code.
* @param type IPROTO type.
Expand Down Expand Up @@ -333,6 +340,12 @@ iproto_type_is_synchro_request(uint32_t type)
return type == IPROTO_CONFIRM || type == IPROTO_ROLLBACK;
}

static inline bool
iproto_type_is_raft_request(uint32_t type)
{
return type == IPROTO_RAFT;
}

/** This is an error. */
static inline bool
iproto_type_is_error(uint32_t type)
Expand Down
1 change: 1 addition & 0 deletions src/box/lua/misc.cc
Expand Up @@ -40,6 +40,7 @@
#include "box/tuple.h"
#include "box/tuple_format.h"
#include "box/lua/tuple.h"
#include "box/xrow.h"
#include "mpstream/mpstream.h"

static uint32_t CTID_STRUCT_TUPLE_FORMAT_PTR;
Expand Down
35 changes: 35 additions & 0 deletions src/box/memtx_engine.c
Expand Up @@ -49,6 +49,7 @@
#include "replication.h"
#include "schema.h"
#include "gc.h"
#include "raft.h"

/* sync snapshot every 16MB */
#define SNAP_SYNC_INTERVAL (1 << 24)
Expand Down Expand Up @@ -201,12 +202,25 @@ memtx_engine_recover_snapshot(struct memtx_engine *memtx,
return 0;
}

static int
memtx_engine_recover_raft(const struct xrow_header *row)
{
assert(row->type == IPROTO_RAFT);
struct raft_request req;
if (xrow_decode_raft(row, &req) != 0)
return -1;
raft_process_recovery(&req);
return 0;
}

static int
memtx_engine_recover_snapshot_row(struct memtx_engine *memtx,
struct xrow_header *row)
{
assert(row->bodycnt == 1); /* always 1 for read */
if (row->type != IPROTO_INSERT) {
if (row->type == IPROTO_RAFT)
return memtx_engine_recover_raft(row);
diag_set(ClientError, ER_UNKNOWN_REQUEST_TYPE,
(uint32_t) row->type);
return -1;
Expand Down Expand Up @@ -514,6 +528,7 @@ struct checkpoint {
/** The vclock of the snapshot file. */
struct vclock vclock;
struct xdir dir;
struct raft_request raft;
/**
* Do nothing, just touch the snapshot file - the
* checkpoint already exists.
Expand All @@ -538,6 +553,7 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit)
opts.free_cache = true;
xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts);
vclock_create(&ckpt->vclock);
raft_serialize_for_disk(&ckpt->raft);
ckpt->touch = false;
return ckpt;
}
Expand Down Expand Up @@ -609,6 +625,23 @@ checkpoint_add_space(struct space *sp, void *data)
return 0;
};

static int
checkpoint_write_raft(struct xlog *l, const struct raft_request *req)
{
struct xrow_header row;
struct region *region = &fiber()->gc;
uint32_t svp = region_used(region);
int rc = -1;
if (xrow_encode_raft(&row, region, req) != 0)
goto finish;
if (checkpoint_write_row(l, &row) != 0)
goto finish;
rc = 0;
finish:
region_truncate(region, svp);
return rc;
}

static int
checkpoint_f(va_list ap)
{
Expand Down Expand Up @@ -644,6 +677,8 @@ checkpoint_f(va_list ap)
if (rc != 0)
goto fail;
}
if (checkpoint_write_raft(&snap, &ckpt->raft) != 0)
goto fail;
if (xlog_flush(&snap) < 0)
goto fail;

Expand Down
65 changes: 65 additions & 0 deletions src/box/raft.c
@@ -0,0 +1,65 @@
/*
* Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include "raft.h"

#include "error.h"
#include "journal.h"
#include "xrow.h"
#include "small/region.h"

/** Raft state of this instance. */
struct raft raft = {
.term = 1,
.vote = 0,
};

void
raft_process_recovery(const struct raft_request *req)
{
if (req->term != 0)
raft.term = req->term;
if (req->vote != 0)
raft.vote = req->vote;
}

void
raft_serialize_for_network(struct raft_request *req)
{
req->term = raft.term;
req->vote = raft.vote;
}

void
raft_serialize_for_disk(struct raft_request *req)
{
req->term = raft.term;
req->vote = raft.vote;
}
67 changes: 67 additions & 0 deletions src/box/raft.h
@@ -0,0 +1,67 @@
#pragma once
/*
* Copyright 2010-2020, Tarantool AUTHORS, please see AUTHORS file.
*
* Redistribution and use in source and binary forms, with or
* without modification, are permitted provided that the following
* conditions are met:
*
* 1. Redistributions of source code must retain the above
* copyright notice, this list of conditions and the
* following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials
* provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
* <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
* INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
* THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*/
#include <stdint.h>

#if defined(__cplusplus)
extern "C" {
#endif

struct raft_request;

struct raft {
uint64_t term;
uint32_t vote;
};

extern struct raft raft;

/** Process a raft entry stored in WAL/snapshot. */
void
raft_process_recovery(const struct raft_request *req);

/**
* Save complete Raft state into a request to be sent to other instances of the
* cluster. It is allowed to save anything here, not only persistent state.
*/
void
raft_serialize_for_network(struct raft_request *req);

/**
* Save complete Raft state into a request to be persisted on disk. Only term
* and vote are being persisted.
*/
void
raft_serialize_for_disk(struct raft_request *req);

#if defined(__cplusplus)
}
#endif
56 changes: 56 additions & 0 deletions src/box/xrow.c
Expand Up @@ -958,6 +958,62 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req)
return 0;
}

int
xrow_encode_raft(struct xrow_header *row, struct region *region,
const struct raft_request *r)
{
size_t size = mp_sizeof_map(2) +
mp_sizeof_uint(IPROTO_RAFT_TERM) +
mp_sizeof_uint(r->term) +
mp_sizeof_uint(IPROTO_RAFT_VOTE) +
mp_sizeof_uint(r->vote);
char *buf = region_alloc(region, size);
if (buf == NULL) {
diag_set(OutOfMemory, size, "region_alloc", "buf");
return -1;
}
memset(row, 0, sizeof(*row));
row->type = IPROTO_RAFT;
row->body[0].iov_base = buf;
row->body[0].iov_len = size;
row->group_id = GROUP_LOCAL;
row->bodycnt = 1;
buf = mp_encode_map(buf, 2);
buf = mp_encode_uint(buf, IPROTO_RAFT_TERM);
buf = mp_encode_uint(buf, r->term);
buf = mp_encode_uint(buf, IPROTO_RAFT_VOTE);
buf = mp_encode_uint(buf, r->vote);
return 0;
}

int
xrow_decode_raft(const struct xrow_header *row, struct raft_request *r)
{
/* TODO: handle bad format. */
assert(row->type == IPROTO_RAFT);
assert(row->bodycnt == 1);
assert(row->group_id == GROUP_LOCAL);
memset(r, 0, sizeof(*r));
const char *pos = row->body[0].iov_base;
uint32_t map_size = mp_decode_map(&pos);
for (uint32_t i = 0; i < map_size; ++i)
{
uint64_t key = mp_decode_uint(&pos);
switch (key) {
case IPROTO_RAFT_TERM:
r->term = mp_decode_uint(&pos);
break;
case IPROTO_RAFT_VOTE:
r->vote = mp_decode_uint(&pos);
break;
default:
mp_next(&pos);
break;
}
}
return 0;
}

int
xrow_to_iovec(const struct xrow_header *row, struct iovec *out)
{
Expand Down
12 changes: 12 additions & 0 deletions src/box/xrow.h
Expand Up @@ -264,6 +264,18 @@ xrow_encode_synchro(struct xrow_header *row,
int
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req);

struct raft_request {
uint64_t term;
uint32_t vote;
};

int
xrow_encode_raft(struct xrow_header *row, struct region *region,
const struct raft_request *r);

int
xrow_decode_raft(const struct xrow_header *row, struct raft_request *r);

/**
* CALL/EVAL request.
*/
Expand Down

0 comments on commit 4f0f7c8

Please sign in to comment.