Skip to content

Commit

Permalink
feat: Support ACKs from replica to master (#1243)
Browse files Browse the repository at this point in the history
* feat: Support ACKs from replica to master

* Rework after CR

* Split the acks into a different fiber and remove the PING loop

* const convention

* move around the order.

* revert sleep removal

* Exit ack fiber on cancellation

* Don't send ACKs if server doesn't support it
  • Loading branch information
royjacobson committed May 29, 2023
1 parent 76801fa commit 29c258d
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 27 deletions.
4 changes: 4 additions & 0 deletions src/server/conn_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ namespace dfly {
class EngineShardSet;
class ConnectionContext;
class ChannelStore;
class FlowInfo;

// Stores command id and arguments for delayed invocation.
// Used for storing MULTI/EXEC commands.
Expand Down Expand Up @@ -172,7 +173,10 @@ class ConnectionContext : public facade::ConnectionContext {
void PUnsubscribeAll(bool to_reply);
void ChangeMonitor(bool start); // either start or stop monitor on a given connection

// Whether this connection is a connection from a replica to its master.
bool is_replicating = false;
// Reference to a FlowInfo for this connection if from a master to a replica.
FlowInfo* replication_flow;
bool monitor = false; // when a monitor command is sent over a given connection, we need to aware
// of it as a state for the connection

Expand Down
7 changes: 4 additions & 3 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
absl::InsecureBitGen gen;
string eof_token = GetRandomHex(gen, 40);

cntx->replication_flow = &replica_ptr->flows[flow_id];
replica_ptr->flows[flow_id].conn = cntx->owner();
replica_ptr->flows[flow_id].eof_token = eof_token;
listener_->Migrate(cntx->owner(), shard_set->pool()->at(flow_id));
Expand Down Expand Up @@ -612,17 +613,17 @@ void DflyCmd::Shutdown() {
}
}

void DflyCmd::FlowInfo::TryShutdownSocket() {
void FlowInfo::TryShutdownSocket() {
// Close socket for clean disconnect.
if (conn->socket()->IsOpen()) {
(void)conn->socket()->Shutdown(SHUT_RDWR);
}
}

DflyCmd::FlowInfo::~FlowInfo() {
FlowInfo::~FlowInfo() {
}

DflyCmd::FlowInfo::FlowInfo() {
FlowInfo::FlowInfo() {
}

} // namespace dfly
36 changes: 19 additions & 17 deletions src/server/dflycmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,25 @@ class ServerFamily;
class RdbSaver;
class JournalStreamer;

// Stores information related to a single flow.
struct FlowInfo {
FlowInfo();
~FlowInfo();
// Shutdown associated socket if its still open.
void TryShutdownSocket();

facade::Connection* conn;

Fiber full_sync_fb; // Full sync fiber.
std::unique_ptr<RdbSaver> saver; // Saver used by the full sync phase.
std::unique_ptr<JournalStreamer> streamer;
std::string eof_token;

uint64_t last_acked_lsn;

std::function<void()> cleanup; // Optional cleanup for cancellation.
};

// DflyCmd is responsible for managing replication. A master instance can be connected
// to many replica instances, what is more, each of them can open multiple connections.
// This is why its important to understand replica lifecycle management before making
Expand Down Expand Up @@ -75,23 +94,6 @@ class DflyCmd {
// See header comments for state descriptions.
enum class SyncState { PREPARATION, FULL_SYNC, STABLE_SYNC, CANCELLED };

// Stores information related to a single flow.
struct FlowInfo {
FlowInfo();
~FlowInfo();
// Shutdown associated socket if its still open.
void TryShutdownSocket();

facade::Connection* conn;

Fiber full_sync_fb; // Full sync fiber.
std::unique_ptr<RdbSaver> saver; // Saver used by the full sync phase.
std::unique_ptr<JournalStreamer> streamer;
std::string eof_token;

std::function<void()> cleanup; // Optional cleanup for cancellation.
};

// Stores information related to a single replica.
struct ReplicaInfo {
ReplicaInfo(unsigned flow_count, std::string address, uint32_t listening_port,
Expand Down
6 changes: 6 additions & 0 deletions src/server/journal/serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ void JournalWriter::Write(const journal::Entry& entry) {
switch (entry.opcode) {
case journal::Op::SELECT:
return Write(entry.dbid);
case journal::Op::PING:
return;
case journal::Op::COMMAND:
case journal::Op::EXPIRED:
case journal::Op::MULTI_COMMAND:
Expand Down Expand Up @@ -186,6 +188,10 @@ io::Result<journal::ParsedEntry> JournalReader::ReadEntry() {
entry.dbid = dbid_;
entry.opcode = opcode;

if (opcode == journal::Op::PING) {
return entry;
}

SET_OR_UNEXPECT(ReadUInt<uint64_t>(), entry.txid);
SET_OR_UNEXPECT(ReadUInt<uint32_t>(), entry.shard_cnt);

Expand Down
1 change: 1 addition & 0 deletions src/server/journal/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum class Op : uint8_t {
COMMAND = 10,
MULTI_COMMAND = 11,
EXEC = 12,
PING = 13,
};

struct EntryBase {
Expand Down
63 changes: 59 additions & 4 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ Replica::~Replica() {
if (sync_fb_.IsJoinable()) {
sync_fb_.Join();
}
if (acks_fb_.IsJoinable()) {
acks_fb_.Join();
}
if (execution_fb_.IsJoinable()) {
execution_fb_.Join();
}
Expand Down Expand Up @@ -202,8 +205,11 @@ void Replica::Stop() {

// Make sure the replica fully stopped and did all cleanup,
// so we can freely release resources (connections).
waker_.notifyAll();
if (sync_fb_.IsJoinable())
sync_fb_.Join();
if (acks_fb_.IsJoinable())
acks_fb_.Join();
}

void Replica::Pause(bool pause) {
Expand Down Expand Up @@ -783,6 +789,9 @@ void Replica::JoinAllFlows() {
if (flow->sync_fb_.IsJoinable()) {
flow->sync_fb_.Join();
}
if (flow->acks_fb_.IsJoinable()) {
flow->acks_fb_.Join();
}
if (flow->execution_fb_.IsJoinable()) {
flow->execution_fb_.Join();
}
Expand Down Expand Up @@ -934,6 +943,11 @@ void Replica::StableSyncDflyReadFb(Context* cntx) {

JournalReader reader{&ps, 0};
TransactionReader tx_reader{};

if (master_context_.version > DflyVersion::VER0) {
acks_fb_ = MakeFiber(&Replica::StableSyncDflyAcksFb, this, cntx);
}

while (!cntx->IsCancelled()) {
waker_.await([&]() {
return ((trans_data_queue_.size() < kYieldAfterItemsInQueue) || cntx->IsCancelled());
Expand All @@ -947,16 +961,53 @@ void Replica::StableSyncDflyReadFb(Context* cntx) {

last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();

if (use_multi_shard_exe_sync_) {
InsertTxDataToShardResource(std::move(*tx_data));
if (!tx_data->is_ping) {
if (use_multi_shard_exe_sync_) {
InsertTxDataToShardResource(std::move(*tx_data));
} else {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
}
} else {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
force_ping_ = true;
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
}

waker_.notify();
}
}

void Replica::StableSyncDflyAcksFb(Context* cntx) {
constexpr std::chrono::duration kAckTimeMaxInterval = 3s;
constexpr size_t kAckRecordMaxInterval = 1024;
std::string ack_cmd;
ReqSerializer serializer{sock_.get()};

auto next_ack_tp = std::chrono::steady_clock::now();

while (!cntx->IsCancelled()) {
// Handle ACKs with the master. PING opcodes from the master mean we should immediately
// answer.
uint64_t current_offset = journal_rec_executed_.load(std::memory_order_relaxed);
VLOG(1) << "Sending an ACK with offset=" << current_offset << " forced=" << force_ping_;
ack_cmd = absl::StrCat("REPLCONF ACK ", current_offset);
force_ping_ = false;
next_ack_tp = std::chrono::steady_clock::now() + kAckTimeMaxInterval;
if (auto ec = SendCommand(ack_cmd, &serializer); ec) {
cntx->ReportError(ec);
break;
}
ack_offs_ = current_offset;

waker_.await_until(
[&]() {
return journal_rec_executed_.load(std::memory_order_relaxed) >
ack_offs_ + kAckRecordMaxInterval ||
force_ping_ || cntx->IsCancelled();
},
next_ack_tp);
}
}

void Replica::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) {
if (cntx->IsCancelled()) {
return;
Expand Down Expand Up @@ -1314,6 +1365,9 @@ bool Replica::TransactionData::AddEntry(journal::ParsedEntry&& entry) {
++journal_rec_count;

switch (entry.opcode) {
case journal::Op::PING:
is_ping = true;
return true;
case journal::Op::EXPIRED:
case journal::Op::COMMAND:
commands.push_back(std::move(entry.cmd));
Expand Down Expand Up @@ -1355,7 +1409,8 @@ auto Replica::TransactionReader::NextTxData(JournalReader* reader, Context* cntx

// Check if journal command can be executed right away.
// Expiration checks lock on master, so it never conflicts with running multi transactions.
if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND)
if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND ||
res->opcode == journal::Op::PING)
return TransactionData::FromSingle(std::move(res.value()));

// Otherwise, continue building multi command.
Expand Down
5 changes: 5 additions & 0 deletions src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class Replica {
uint32_t shard_cnt{0};
absl::InlinedVector<journal::ParsedEntry::CmdData, 1> commands{0};
uint32_t journal_rec_count{0}; // Count number of source entries to check offset.
bool is_ping = false; // For Op::PING entries.
};

// Utility for reading TransactionData from a journal reader.
Expand Down Expand Up @@ -162,6 +163,8 @@ class Replica {
// Single flow stable state sync fiber spawned by StartStableSyncFlow.
void StableSyncDflyReadFb(Context* cntx);

void StableSyncDflyAcksFb(Context* cntx);

void StableSyncDflyExecFb(Context* cntx);

private: /* Utility */
Expand Down Expand Up @@ -252,6 +255,8 @@ class Replica {

// MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode.
Fiber sync_fb_;
Fiber acks_fb_;
bool force_ping_ = false;
Fiber execution_fb_;

std::vector<std::unique_ptr<Replica>> shard_flows_;
Expand Down
21 changes: 20 additions & 1 deletion src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1939,15 +1939,34 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Client version for session_id="
<< cntx->conn_state.replicaiton_info.repl_session_id << " is " << version;
cntx->conn_state.replicaiton_info.repl_version = DflyVersion(version);
} else if (cmd == "ACK" && args.size() == 2) {
// Don't send error/Ok back through the socket, because we don't want to interleave with
// the journal writes that we write into the same socket.

if (!cntx->replication_flow) {
LOG(ERROR) << "No replication flow assigned";
return;
}

uint64_t ack;
if (!absl::SimpleAtoi(arg, &ack)) {
LOG(ERROR) << "Bad int in REPLCONF ACK command! arg=" << arg;
return;
}
VLOG(1) << "Received client ACK=" << ack;
cntx->replication_flow->last_acked_lsn = ack;
return;
} else {
VLOG(1) << cmd << " " << arg;
VLOG(1) << cmd << " " << arg << " " << args.size();
goto err;
}
}

(*cntx)->SendOk();
return;

err:
LOG(ERROR) << "Error in receiving command: " << args;
(*cntx)->SendError(kSyntaxErr);
}

Expand Down
7 changes: 5 additions & 2 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ async def test_disconnect_master(df_local_factory, df_seeder_factory, t_master,
seeder = df_seeder_factory.create(port=master.port, keys=n_keys, dbcount=2)

async def crash_master_fs():
await asyncio.sleep(random.random() / 10 + 0.1 * len(replicas))
await asyncio.sleep(random.random() / 10)
master.stop(kill=True)

async def start_master():
Expand All @@ -307,7 +307,8 @@ async def start_master():

# Crash master during full sync, but with all passing initial connection phase
await asyncio.gather(*(c_replica.execute_command("REPLICAOF localhost " + str(master.port))
for c_replica in c_replicas), crash_master_fs())
for c_replica in c_replicas))
await crash_master_fs()

await asyncio.sleep(1 + len(replicas) * 0.5)

Expand Down Expand Up @@ -890,6 +891,8 @@ async def test_role_command(df_local_factory, n_keys=20):
assert await c_replica.execute_command("role") == [
b'replica', b'localhost', bytes(str(master.port), 'ascii'), b'stable_sync']

# This tests that we react fast to socket shutdowns and don't hang on
# things like the ACK or execution fibers.
master.stop()
await asyncio.sleep(0.1)
assert await c_replica.execute_command("role") == [
Expand Down

0 comments on commit 29c258d

Please sign in to comment.