Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
MathieuBordere committed Jan 29, 2021
1 parent f205aaf commit a3ef291
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/client.c
Expand Up @@ -302,7 +302,7 @@ int raft_assign(struct raft *r,
r->leader_state.round_start = r->io->time(r->io);

/* Immediately initiate an AppendEntries request. */
rv = replicationProgress(r, server_index);
rv = replicationProgress(r, server_index, false, false);
if (rv != 0 && rv != RAFT_NOCONNECTION) {
/* This error is not fatal. */
tracef("failed to send append entries to server %u: %s (%d)",
Expand Down
41 changes: 26 additions & 15 deletions src/replication.c
Expand Up @@ -56,6 +56,7 @@ static void sendAppendEntriesCb(struct raft_io_send *send, const int status)
tracef("failed to send append entries to server %u: %s",
req->server_id, raft_strerror(status));
/* Go back to probe mode. */
/* TODO Raft spec says to retry failed RPC's (Extended Raft paper 5.5)*/
progressToProbe(r, i);
} else {
/* Update the last_send timestamp: for a heartbeat_timeout
Expand All @@ -82,7 +83,8 @@ static void sendAppendEntriesCb(struct raft_io_send *send, const int status)
static int sendAppendEntries(struct raft *r,
const unsigned i,
const raft_index prev_index,
const raft_term prev_term)
const raft_term prev_term,
bool heartBeat)
{
struct raft_server *server = &r->configuration.servers[i];
struct raft_message message;
Expand All @@ -94,11 +96,15 @@ static int sendAppendEntries(struct raft *r,
args->term = r->current_term;
args->prev_log_index = prev_index;
args->prev_log_term = prev_term;
args->entries = NULL;
args->n_entries = 0;

/* TODO: implement a limit to the total size of the entries being sent */
rv = logAcquire(&r->log, next_index, &args->entries, &args->n_entries);
if (rv != 0) {
goto err;
if (!heartBeat) {
rv = logAcquire(&r->log, next_index, &args->entries, &args->n_entries);
if (rv != 0) {
goto err;
}
}

/* From Section 3.5:
Expand Down Expand Up @@ -296,7 +302,7 @@ static int sendSnapshot(struct raft *r, const unsigned i)
return rv;
}

int replicationProgress(struct raft *r, unsigned i)
int replicationProgress(struct raft *r, unsigned i, bool heartBeat, bool force)
{
struct raft_server *server = &r->configuration.servers[i];
raft_index snapshot_index = logSnapshotIndex(&r->log);
Expand All @@ -308,7 +314,7 @@ int replicationProgress(struct raft *r, unsigned i)
assert(server->id != r->id);
assert(next_index >= 1);

if (!progressShouldReplicate(r, i)) {
if (!progressShouldReplicate(r, i) && !force) {
return 0;
}

Expand All @@ -329,7 +335,7 @@ int replicationProgress(struct raft *r, unsigned i)
/* We're including the very first entry, so prevIndex and prevTerm are
* null. If the first entry is not available anymore, send the last
* snapshot. */
if (snapshot_index > 0) {
if (snapshot_index > 0 && !heartBeat) {
raft_index last_index = logLastIndex(&r->log);
assert(last_index > 0); /* The log can't be empty */
goto send_snapshot;
Expand All @@ -342,14 +348,14 @@ int replicationProgress(struct raft *r, unsigned i)
prev_index = next_index - 1;
prev_term = logTermOf(&r->log, prev_index);
/* If the entry is not anymore in our log, send the last snapshot. */
if (prev_term == 0) {
if (prev_term == 0 && !heartBeat) {
assert(prev_index < snapshot_index);
tracef("missing entry at index %lld -> send snapshot", prev_index);
goto send_snapshot;
}
}

return sendAppendEntries(r, i, prev_index, prev_term);
return sendAppendEntries(r, i, prev_index, prev_term, heartBeat);

send_snapshot:
return sendSnapshot(r, i);
Expand All @@ -360,7 +366,7 @@ int replicationProgress(struct raft *r, unsigned i)
* This function loops through all followers and triggers replication on them.
*
* It must be called only by leaders. */
static int triggerAll(struct raft *r)
static int triggerAll(struct raft *r, bool heartBeat)
{
unsigned i;
int rv;
Expand All @@ -378,7 +384,7 @@ static int triggerAll(struct raft *r)
server->id != r->leader_state.promotee_id) {
continue;
}
rv = replicationProgress(r, i);
rv = replicationProgress(r, i, heartBeat, false);
if (rv != 0 && rv != RAFT_NOCONNECTION) {
/* This is not a critical failure, let's just log it. */
tracef("failed to send append entries to server %u: %s (%d)",
Expand All @@ -391,7 +397,7 @@ static int triggerAll(struct raft *r)

int replicationHeartbeat(struct raft *r)
{
return triggerAll(r);
return triggerAll(r, true);
}

/* Context for a write log entries request that was submitted by a leader. */
Expand Down Expand Up @@ -592,7 +598,7 @@ int replicationTrigger(struct raft *r, raft_index index)
return rv;
}

return triggerAll(r);
return triggerAll(r, false);
}

/* Helper to be invoked after a promotion of a non-voting server has been
Expand Down Expand Up @@ -655,6 +661,9 @@ static int triggerActualPromotion(struct raft *r)
return rv;
}

/*
* TODO Check if reply with stale term is rejected
*/
int replicationUpdate(struct raft *r,
const struct raft_server *server,
const struct raft_append_entries_result *result)
Expand Down Expand Up @@ -687,7 +696,8 @@ int replicationUpdate(struct raft *r,
if (retry) {
/* Retry, ignoring errors. */
tracef("log mismatch -> send old entries to %u", server->id);
replicationProgress(r, i);
/* Force resend even if we have already sent an AppendEntries RPC during this HeartBeat period */
replicationProgress(r, i, false, true);
}
return 0;
}
Expand Down Expand Up @@ -770,7 +780,8 @@ int replicationUpdate(struct raft *r,
}
/* If this follower is in pipeline mode, send it more entries. */
if (progressState(r, i) == PROGRESS__PIPELINE) {
replicationProgress(r, i);
/* Force send AppendEntries RPC, even if we have already sent one during this HeartBeat period */
replicationProgress(r, i, false, true);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/replication.h
Expand Up @@ -39,7 +39,7 @@ int replicationTrigger(struct raft *r, raft_index index);
index onward (possibly zero).
*
* This function must be called only by leaders. */
int replicationProgress(struct raft *r, unsigned i);
int replicationProgress(struct raft *r, unsigned i, bool heartBeat, bool force);

/* Update the replication state (match and next indexes) for the given server
* using the given AppendEntries RPC result.
Expand Down

0 comments on commit a3ef291

Please sign in to comment.