Skip to content

Commit

Permalink
Split the acks into a different fiber and remove the PING loop
Browse files Browse the repository at this point in the history
  • Loading branch information
royjacobson committed May 22, 2023
1 parent b686514 commit cf82beb
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 39 deletions.
18 changes: 0 additions & 18 deletions src/server/journal/journal_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ JournalSlice::JournalSlice() {

// Tears down the slice. The shard file must already be released by the time
// the destructor runs (enforced by the CHECK below).
JournalSlice::~JournalSlice() {
  CHECK(!shard_file_);

  // Signal the ping fiber first so that it leaves its timed wait in
  // PingsLoop, and only then wait for it to finish.
  pings_done_.Notify();
  if (!pings_fb_.IsJoinable()) {
    return;
  }
  pings_fb_.Join();
}

void JournalSlice::Init(unsigned index) {
Expand All @@ -55,7 +51,6 @@ void JournalSlice::Init(unsigned index) {

slice_index_ = index;
ring_buffer_.emplace(128); // TODO: to make it configurable
pings_fb_ = MakeFiber(&JournalSlice::PingsLoop, this, 3000ms);
}

std::error_code JournalSlice::Open(std::string_view dir) {
Expand Down Expand Up @@ -165,18 +160,5 @@ void JournalSlice::UnregisterOnChange(uint32_t id) {
change_cb_arr_.erase(it);
}

void JournalSlice::PingsLoop(std::chrono::milliseconds period_ms) {
Entry entry(/*txid=*/0, Op::PING, /*dbid=*/0,
/*shard_cnt=*/1, /*pl=*/{});
while (!lameduck_) {
VLOG(1) << "Sending PING to journal writer.";
AddLogRecord(entry, true);
if (pings_done_.WaitFor(period_ms)) {
VLOG(1) << "Finished running periodic acks";
return;
}
}
}

} // namespace journal
} // namespace dfly
5 changes: 0 additions & 5 deletions src/server/journal/journal_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ class JournalSlice {
return !change_cb_arr_.empty();
}

void PingsLoop(std::chrono::milliseconds period_ms);

private:
struct RingItem;

Expand All @@ -70,9 +68,6 @@ class JournalSlice {
std::error_code status_ec_;

bool lameduck_ = false;

Fiber pings_fb_;
Done pings_done_;
};

} // namespace journal
Expand Down
55 changes: 41 additions & 14 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ Replica::~Replica() {
if (sync_fb_.IsJoinable()) {
sync_fb_.Join();
}
if (acks_fb_.IsJoinable()) {
acks_fb_.Join();
}
if (execution_fb_.IsJoinable()) {
execution_fb_.Join();
}
Expand Down Expand Up @@ -202,8 +205,11 @@ void Replica::Stop() {

// Make sure the replica fully stopped and did all cleanup,
// so we can freely release resources (connections).
waker_.notifyAll();
if (sync_fb_.IsJoinable())
sync_fb_.Join();
if (acks_fb_.IsJoinable())
acks_fb_.Join();
}

void Replica::Pause(bool pause) {
Expand Down Expand Up @@ -752,6 +758,9 @@ void Replica::JoinAllFlows() {
if (flow->sync_fb_.IsJoinable()) {
flow->sync_fb_.Join();
}
if (flow->acks_fb_.IsJoinable()) {
flow->acks_fb_.Join();
}
if (flow->execution_fb_.IsJoinable()) {
flow->execution_fb_.Join();
}
Expand Down Expand Up @@ -903,9 +912,8 @@ void Replica::StableSyncDflyReadFb(Context* cntx) {

JournalReader reader{&ps, 0};
TransactionReader tx_reader{};
std::string ack_cmd;
time_t last_ack = 0;
ReqSerializer serializer{sock_.get()};

acks_fb_ = MakeFiber(&Replica::StableSyncDflyAcksFb, this, cntx);

while (!cntx->IsCancelled()) {
waker_.await([&]() {
Expand All @@ -927,24 +935,43 @@ void Replica::StableSyncDflyReadFb(Context* cntx) {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
}
} else {
force_ping_ = true;
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
}

waker_.notify();
}
}

// Fiber body that reports replication progress to the master during stable sync.
//
// Each iteration sends "REPLCONF ACK <offset>", where <offset> is the current
// value of journal_rec_executed_, and then waits on waker_ until one of:
//   - more than ACK_RECORD_MAX_INTERVAL records were executed since the last ack,
//   - force_ping_ was set (StableSyncDflyReadFb sets it),
//   - the context was cancelled,
//   - ACK_TIME_MAX_INTERVAL elapsed since the last ack was prepared.
//
// A failed send is reported through cntx->ReportError and terminates the fiber.
void Replica::StableSyncDflyAcksFb(Context* cntx) {
  constexpr std::chrono::duration ACK_TIME_MAX_INTERVAL = 3s;
  constexpr size_t ACK_RECORD_MAX_INTERVAL = 1024;
  std::string ack_cmd;
  ReqSerializer serializer{sock_.get()};

  while (!cntx->IsCancelled()) {
    // Handle ACKs with the master. PING opcodes from the master mean we should immediately
    // answer.
    const uint64_t current_offset = journal_rec_executed_.load(std::memory_order_relaxed);
    VLOG(1) << "Sending an ACK with offset=" << current_offset << " forced=" << force_ping_;
    ack_cmd = absl::StrCat("REPLCONF ACK ", current_offset);

    // Clear the flag and fix the deadline before sending: if SendCommand
    // suspends this fiber, a PING flagged by the read fiber in the meantime
    // must stay pending and trigger another ack instead of being wiped out.
    force_ping_ = false;
    const auto next_ack_tp = std::chrono::steady_clock::now() + ACK_TIME_MAX_INTERVAL;

    if (auto ec = SendCommand(ack_cmd, &serializer); ec) {
      cntx->ReportError(ec);
      break;
    }
    ack_offs_ = current_offset;

    // Cancellation is part of the predicate so that the notifyAll() issued by
    // Stop() actually wakes this fiber instead of leaving it asleep until the
    // timeout expires.
    waker_.await_until(
        [&]() {
          return journal_rec_executed_.load(std::memory_order_relaxed) >
                     ack_offs_ + ACK_RECORD_MAX_INTERVAL ||
                 force_ping_ || cntx->IsCancelled();
        },
        next_ack_tp);
  }
}

Expand Down
4 changes: 4 additions & 0 deletions src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ class Replica {
// Single flow stable state sync fiber spawned by StartStableSyncFlow.
void StableSyncDflyReadFb(Context* cntx);

void StableSyncDflyAcksFb(Context* cntx);

void StableSyncDflyExecFb(Context* cntx);

private: /* Utility */
Expand Down Expand Up @@ -247,6 +249,8 @@ class Replica {

// MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode.
Fiber sync_fb_;
Fiber acks_fb_;
bool force_ping_ = false;
Fiber execution_fb_;

std::vector<std::unique_ptr<Replica>> shard_flows_;
Expand Down
4 changes: 2 additions & 2 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,6 @@ async def test_disconnect_master(df_local_factory, df_seeder_factory, t_master,
seeder = df_seeder_factory.create(port=master.port, keys=n_keys, dbcount=2)

# Helper coroutine: sleeps for a randomized delay that grows with
# len(replicas), then calls master.stop(kill=True).
async def crash_master_fs():
await asyncio.sleep(random.random() / 10 + 0.1 * len(replicas))
master.stop(kill=True)

async def start_master():
Expand All @@ -307,7 +306,8 @@ async def start_master():

# Crash the master during full sync, after all replicas have passed the initial connection phase
await asyncio.gather(*(c_replica.execute_command("REPLICAOF localhost " + str(master.port))
for c_replica in c_replicas), crash_master_fs())
for c_replica in c_replicas))
await crash_master_fs()

await asyncio.sleep(1 + len(replicas) * 0.5)

Expand Down

0 comments on commit cf82beb

Please sign in to comment.