Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce --replicaof flag #1583

Merged
merged 22 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ error_code Replica::Start(ConnectionContext* cntx) {
return {};
} // namespace dfly

// Turns replication on without connecting to the master first: the replica is flagged as
// enabled, MainReplicationFb is started on its own fiber, and OK is replied through cntx.
void Replica::EnableReplication(ConnectionContext* cntx) {
  VLOG(1) << "Enabling replication";

  // Flag the replica as enabled.
  state_mask_.store(R_ENABLED);

  // Start the replication fiber.
  sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this);

  (*cntx)->SendOk();
}

void Replica::Stop() {
VLOG(1) << "Stopping replication";
// Stops the loop in MainReplicationFb.
Expand Down Expand Up @@ -1007,9 +1016,7 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d
}

Replica::Info Replica::GetInfo() const {
CHECK(Sock());

return Proactor()->AwaitBrief([this] {
auto f = [this]() {
auto last_io_time = LastIoTime();
for (const auto& flow : shard_flows_) { // Get last io time from all sub flows.
last_io_time = std::max(last_io_time, flow->LastIoTime());
Expand All @@ -1023,7 +1030,21 @@ Replica::Info Replica::GetInfo() const {
res.full_sync_done = (state_mask_.load() & R_SYNC_OK);
res.master_last_io_sec = (ProactorBase::GetMonotonicTimeNs() - last_io_time) / 1000000000UL;
return res;
});
};

if (ABSL_PREDICT_TRUE(Sock()))
talbii marked this conversation as resolved.
Show resolved Hide resolved
return Proactor()->AwaitBrief(f);
else
return ProactorBase::me()->AwaitBrief(f);
/**
* when this branch happens: there is a very short grace period
* where Sock() is not initialized, yet the server can
* receive ROLE/INFO commands. That period happens when launching
* an instance with '--replicaof' and then \b immediately
talbii marked this conversation as resolved.
Show resolved Hide resolved
* sending a command.
*
* In that instance, we have to run f() on the current fiber.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But what's the downside of running in current fiber? Why not always do it as such? And why not run it just via f() instead of awaiting for our own thread?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not entirely sure about that. Presumably it was done so as not to block the current fiber, the same fiber that runs MainReplicationFb.

As for why I run f() using AwaitBrief: I was not sure it would have the same outcome. I'll change it, thanks!

*/
talbii marked this conversation as resolved.
Show resolved Hide resolved
}

std::vector<uint64_t> Replica::GetReplicaOffset() const {
Expand Down
5 changes: 5 additions & 0 deletions src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class Replica : ProtocolClient {
// false if it has failed.
std::error_code Start(ConnectionContext* cntx);

// Marks replication as enabled and starts MainReplicationFb without first trying to
// connect to the master, unlike Start(); establishing the connection is left to that fiber.
// Replies OK through cntx.
void EnableReplication(ConnectionContext* cntx);

void Stop(); // thread-safe

void Pause(bool pause);
Expand Down
134 changes: 116 additions & 18 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern "C" {
#include "base/logging.h"
#include "croncpp.h" // cron::cronexpr
#include "facade/dragonfly_connection.h"
#include "facade/reply_builder.h"
#include "io/file_util.h"
#include "io/proc_reader.h"
#include "server/command_registry.h"
Expand Down Expand Up @@ -54,6 +55,18 @@ extern "C" {

using namespace std;

// Value type of the --replicaof command-line flag: the master's host and port, both kept
// as text exactly as they were parsed.
struct ReplicaOfFlag {
  std::string host;
  std::string port;

  // True only when both the host and the port are non-empty; a default-constructed
  // instance therefore reports "no value".
  [[nodiscard]] bool has_value() const noexcept {
    return !host.empty() && !port.empty();
  }
};

// Flag-parsing hooks for ReplicaOfFlag. They are declared here, ahead of the ABSL_FLAG that
// uses the type, and defined further down in this file.
static bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err);
static std::string AbslUnparseFlag(const ReplicaOfFlag& flag);
talbii marked this conversation as resolved.
Show resolved Hide resolved

ABSL_FLAG(string, dir, "", "working directory");
ABSL_FLAG(string, dbfilename, "dump-{timestamp}", "the filename to save/load the DB");
ABSL_FLAG(string, requirepass, "",
Expand All @@ -68,6 +81,10 @@ ABSL_FLAG(bool, df_snapshot_format, true,
ABSL_FLAG(int, epoll_file_threads, 0,
"thread size for file workers when running in epoll mode, default is hardware concurrent "
"threads");
// --replicaof: master to replicate from. The default is an empty ReplicaOfFlag, for which
// has_value() is false.
ABSL_FLAG(ReplicaOfFlag, replicaof, ReplicaOfFlag{},
"Specifies a host and port which point to a target master "
"to replicate. "
"Format should be <IPv4>:<PORT> or host:<PORT> or [<IPv6>]:<PORT>");

ABSL_DECLARE_FLAG(uint32_t, port);
ABSL_DECLARE_FLAG(bool, cache_mode);
Expand All @@ -76,6 +93,54 @@ ABSL_DECLARE_FLAG(bool, tls);
ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);

bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
#define RETURN_ON_ERROR(cond, m) \
do { \
if ((cond)) { \
*err = m; \
LOG(WARNING) << "Error in parsing arguments for --replicaof: " << m; \
return false; \
} \
} while (0)
talbii marked this conversation as resolved.
Show resolved Hide resolved

if (in.empty()) { // on empty flag "parse" nothing. If we return false then DF exists.
*flag = ReplicaOfFlag{};
return true;
talbii marked this conversation as resolved.
Show resolved Hide resolved
}

auto pos = in.find_last_of(':');
RETURN_ON_ERROR(pos == string::npos, "missing ':'.");

string_view ip = in.substr(0, pos);
flag->port = in.substr(pos + 1);

RETURN_ON_ERROR(ip.empty() || flag->port.empty(), "IP/host or port are empty.");

// For IPv6: ip1.front == '[' AND ip1.back == ']'
// For IPv4: ip1.front != '[' AND ip1.back != ']'
// Together, this ip1.front == '[' iff ip1.back == ']', which can be implemented as XNOR (NOT XOR)
RETURN_ON_ERROR(((ip.front() == '[') ^ (ip.back() == ']')), "unclosed brackets.");

if (ip.front() == '[') {
// shortest possible IPv6 is '::1' (loopback)
RETURN_ON_ERROR(ip.length() <= 2, "IPv6 host name is too short");

flag->host = ip.substr(1, ip.length() - 2);
VLOG(1) << "received IP of type IPv6: " << flag->host;
} else {
flag->host = ip;
VLOG(1) << "received IP of type IPv4 (or a host): " << flag->host;
}

VLOG(1) << "--replicaof: Received " << flag->host << " : " << flag->port;
return true;
#undef RETURN_ON_ERROR
}

// Renders the flag as "<host>:<port>", or as an empty string when the flag has no value.
std::string AbslUnparseFlag(const ReplicaOfFlag& flag) {
  if (!flag.has_value()) {
    return "";
  }
  return absl::StrCat(flag.host, ":", flag.port);
}

namespace dfly {

namespace fs = std::filesystem;
Expand Down Expand Up @@ -452,6 +517,10 @@ void ValidateServerTlsFlags() {
}
}

// True when the (host, port) pair spells "NO" "ONE" in any letter case, i.e. the
// REPLICAOF NO ONE form.
bool IsReplicatingNoOne(string_view host, string_view port) {
  if (!absl::EqualsIgnoreCase(host, "no")) {
    return false;
  }
  return absl::EqualsIgnoreCase(port, "one");
}

} // namespace

std::optional<SnapshotSpec> ParseSaveSchedule(string_view time) {
Expand Down Expand Up @@ -593,6 +662,13 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
stats_caching_task_ =
pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); });

// check for '--replicaof' before loading anything
if (ReplicaOfFlag flag = GetFlag(FLAGS_replicaof); flag.has_value()) {
service_.proactor_pool().GetNextProactor()->Await(
[this, &flag]() { this->Replicate(flag.host, flag.port); });
return; // DONT load any snapshots
}

string flag_dir = GetFlag(FLAGS_dir);
if (IsCloudPath(flag_dir)) {
aws_ = make_unique<cloud::AWS>("s3");
Expand Down Expand Up @@ -1973,12 +2049,10 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
}

void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
std::string_view host = ArgS(args, 0);
std::string_view port_s = ArgS(args, 1);
void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, ConnectionContext* cntx,
ActionOnConnectionFail on_err) {
auto& pool = service_.proactor_pool();

LOG(INFO) << "Replicating " << host << ":" << port_s;
LOG(INFO) << "Replicating " << host << ":" << port_sv;

// We lock to protect global state changes that we perform during the replication setup:
// The replica_ pointer, GlobalState, and the DB itself (we do a flushall txn before syncing).
Expand All @@ -1993,7 +2067,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Acquire replica lock";
unique_lock lk(replicaof_mu_);

if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) {
if (IsReplicatingNoOne(host, port_sv)) {
if (!ServerState::tlocal()->is_master) {
auto repl_ptr = replica_;
CHECK(repl_ptr);
Expand All @@ -2004,12 +2078,15 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
replica_.reset();
}

CHECK(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE) == GlobalState::ACTIVE)
<< "Server is set to replica no one, yet state is not active!";

return (*cntx)->SendOk();
}

uint32_t port;

if (!absl::SimpleAtoi(port_s, &port) || port < 1 || port > 65535) {
if (!absl::SimpleAtoi(port_sv, &port) || port < 1 || port > 65535) {
(*cntx)->SendError(kInvalidIntErr);
return;
}
Expand All @@ -2031,20 +2108,20 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
return;
}

// Flushing all the data after we marked this instance as replica.
Transaction* transaction = cntx->transaction;
transaction->Schedule();

auto cb = [](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
};
transaction->Execute(std::move(cb), true);

// Replica sends response in either case. No need to send response in this function.
// It's a bit confusing but simpler.
lk.unlock();
error_code ec = new_replica->Start(cntx);
error_code ec{};

switch (on_err) {
case ActionOnConnectionFail::kReturnOnError:
ec = new_replica->Start(cntx);
break;
case ActionOnConnectionFail::kContinueReplication: // set DF to replicate, and forget about it
new_replica->EnableReplication(cntx);
break;
};

VLOG(1) << "Acquire replica lock";
lk.lock();

Expand All @@ -2064,6 +2141,27 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
}
}

// REPLICAOF <host> <port> command handler.
//
// For anything other than "NO ONE" the database is flushed (Drakarys with kDbAll) before
// replication is set up through ReplicaOfInternal. The port is validated BEFORE the flush
// so that a malformed command is rejected without wiping data. ReplicaOfInternal performs
// the same check and sends the same error, but it runs only after the flush.
void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
  string_view host = ArgS(args, 0);
  string_view port = ArgS(args, 1);

  // don't flush if input is NO ONE
  if (!IsReplicatingNoOne(host, port)) {
    uint32_t port_num = 0;
    if (!absl::SimpleAtoi(port, &port_num) || port_num < 1 || port_num > 65535) {
      (*cntx)->SendError(kInvalidIntErr);
      return;
    }

    Drakarys(cntx->transaction, DbSlice::kDbAll);
  }

  ReplicaOfInternal(host, port, cntx, ActionOnConnectionFail::kReturnOnError);
}

// Sets this server to replicate host:port without a client connection (called from Init
// for the --replicaof flag). Replies produced along the way are written to an io::NullSink.
// Uses kContinueReplication, so no connection to the master is attempted up front.
void ServerFamily::Replicate(string_view host, string_view port) {
  io::NullSink sink;
  ConnectionContext ctxt{&sink, nullptr};

  // we don't flush the database as the context is null
  // (and also because there is nothing to flush)

  ReplicaOfInternal(host, port, &ctxt, ActionOnConnectionFail::kContinueReplication);
}

void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Starting take over";
VLOG(1) << "Acquire replica lock";
Expand Down
15 changes: 14 additions & 1 deletion src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ class ServerFamily {
bool AwaitDispatches(absl::Duration timeout,
const std::function<bool(util::Connection*)>& filter);

// Sets the server to replicate the instance at host:port. Does not flush the database
// beforehand! Replies are written to an internal null sink rather than to a client.
void Replicate(std::string_view host, std::string_view port);

private:
uint32_t shard_count() const {
return shard_set->size();
Expand Down Expand Up @@ -192,7 +195,17 @@ class ServerFamily {

void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);

// Returns the number of loaded keys if successfull.
// Selects how ReplicaOfInternal handles the initial connection to the master.
enum class ActionOnConnectionFail {
  // Connect right away through Replica::Start(); its error code decides the outcome.
  kReturnOnError,
  // Do not connect up front: Replica::EnableReplication() starts the replication fiber,
  // which keeps working on the connection regardless of an initial failure.
  kContinueReplication,
};

// REPLICAOF implementation shared by the REPLICAOF command handler and Replicate().
// host/port name the master (or "NO"/"ONE" to stop replicating); replies go through cntx;
// on_error selects how the initial connection is handled (see ActionOnConnectionFail).
void ReplicaOfInternal(std::string_view host, std::string_view port, ConnectionContext* cntx,
ActionOnConnectionFail on_error);

// Returns the number of loaded keys if successful.
io::Result<size_t> LoadRdb(const std::string& rdb_file);

void SnapshotScheduling();
Expand Down
Loading
Loading