Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce --replicaof flag #1583

Merged
merged 22 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/facade/reply_builder.cc
talbii marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ namespace facade {

namespace {

using namespace constants;

inline iovec constexpr IoVec(std::string_view s) {
iovec r{const_cast<char*>(s.data()), s.size()};
return r;
}

constexpr char kCRLF[] = "\r\n";
constexpr char kErrPref[] = "-ERR ";
constexpr char kSimplePref[] = "+";

constexpr unsigned kConvFlags =
DoubleToStringConverter::UNIQUE_ZERO | DoubleToStringConverter::EMIT_POSITIVE_EXPONENT_SIGN;

Expand Down
6 changes: 6 additions & 0 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
#include "io/io.h"

namespace facade {
// some constants, used in reply_builder.cc; can be used elsewhere.
namespace constants {
talbii marked this conversation as resolved.
Show resolved Hide resolved
constexpr std::string_view kCRLF = "\r\n";
constexpr std::string_view kErrPref = "-ERR ";
constexpr std::string_view kSimplePref = "+";
}; // namespace constants

// Reply mode allows filtering replies.
enum class ReplyMode {
Expand Down
12 changes: 12 additions & 0 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,18 @@ error_code Replica::Start(ConnectionContext* cntx) {
return {};
} // namespace dfly

error_code Replica::EnableReplication(ConnectionContext* cntx) {
talbii marked this conversation as resolved.
Show resolved Hide resolved
VLOG(1) << "Starting replication";
talbii marked this conversation as resolved.
Show resolved Hide resolved
ProactorBase* mythread = ProactorBase::me();
CHECK(mythread);

talbii marked this conversation as resolved.
Show resolved Hide resolved
state_mask_.store(R_ENABLED); // set replica state to enabled
sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this); // call replication fiber

(*cntx)->SendOk();
return {};
}

void Replica::Stop() {
VLOG(1) << "Stopping replication";
// Stops the loop in MainReplicationFb.
Expand Down
5 changes: 5 additions & 0 deletions src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class Replica : ProtocolClient {
// false if it has failed.
std::error_code Start(ConnectionContext* cntx);

// Sets the server state to have replication enabled.
// It is like Start(), but does not attempt to establish
// a connection right-away, but instead lets MainReplicationFb do the work.
std::error_code EnableReplication(ConnectionContext* cntx);

void Stop(); // thread-safe

void Pause(bool pause);
Expand Down
136 changes: 126 additions & 10 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "facade/reply_builder.h"
#include "io/file_util.h"
#include "io/proc_reader.h"
#include "server/command_registry.h"
Expand Down Expand Up @@ -52,6 +53,18 @@ extern "C" {

using namespace std;

struct ReplicaOfFlag {
string host;
string port;
talbii marked this conversation as resolved.
Show resolved Hide resolved

bool has_value() const {
return !host.empty() && !port.empty();
}
};

static bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err);
static std::string AbslUnparseFlag(const ReplicaOfFlag& flag);
talbii marked this conversation as resolved.
Show resolved Hide resolved

ABSL_FLAG(string, dir, "", "working directory");
ABSL_FLAG(string, dbfilename, "dump-{timestamp}", "the filename to save/load the DB");
ABSL_FLAG(string, requirepass, "",
Expand All @@ -64,11 +77,64 @@ ABSL_FLAG(bool, df_snapshot_format, true,
ABSL_FLAG(int, epoll_file_threads, 0,
"thread size for file workers when running in epoll mode, default is hardware concurrent "
"threads");
ABSL_FLAG(ReplicaOfFlag, replicaof, ReplicaOfFlag{},
"Specifies a host and port which point to a target master "
"to replicate. "
"Format should be <IPv4>:<PORT> or host:<PORT> or [<IPv6>]:<PORT>");

ABSL_DECLARE_FLAG(uint32_t, port);
ABSL_DECLARE_FLAG(bool, cache_mode);
ABSL_DECLARE_FLAG(uint32_t, hz);

bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
#define RETURN_ERROR(m) \
talbii marked this conversation as resolved.
Show resolved Hide resolved
do { \
*err = m; \
return false; \
} while (0)
talbii marked this conversation as resolved.
Show resolved Hide resolved

if (in.empty()) {
*flag = ReplicaOfFlag{};
return true;
talbii marked this conversation as resolved.
Show resolved Hide resolved
}

auto pos = in.find_last_of(':');
if (pos == string::npos)
talbii marked this conversation as resolved.
Show resolved Hide resolved
RETURN_ERROR("missing ':'.");

string_view ip = in.substr(0, pos);
flag->port = in.substr(pos + 1);

if (ip.empty() || flag->port.empty())
RETURN_ERROR("IP/host or port are empty.");

// For IPv6: ip1.front == '[' AND ip1.back == ']'
// For IPv4: ip1.front != '[' AND ip1.back != ']'
// Together, this ip1.front == '[' iff ip1.back == ']', which can be implemented as XNOR (NOT XOR)
if ((ip.front() == '[') ^ (ip.back() == ']'))
RETURN_ERROR("unclosed brackets.");

if (ip.front() == '[') {
if (ip.length() <= 2)
RETURN_ERROR("IPv6 host name is too short.");

flag->host = ip.substr(1, ip.length() - 2);
VLOG(1) << "received IP of type IPv6: " << flag->host;
} else {
flag->host = ip;
VLOG(1) << "received IP of type IPv4 (or a host): " << flag->host;
}

LOG(INFO) << flag->host << " : " << flag->port;

return true;
#undef RETURN_ERROR
}

std::string AbslUnparseFlag(const ReplicaOfFlag& flag) {
return (flag.has_value()) ? absl::StrCat(flag.host, ":", flag.port) : "";
}

namespace dfly {

namespace fs = std::filesystem;
Expand Down Expand Up @@ -526,6 +592,12 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
stats_caching_task_ =
pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); });

// check for '--replicaof' before loading anything
if (ReplicaOfFlag flag = GetFlag(FLAGS_replicaof); flag.has_value()) {
service_.proactor_pool().GetNextProactor()->Await(
[this, &flag]() { this->Replicate(flag.host, flag.port, nullptr); });
}

string flag_dir = GetFlag(FLAGS_dir);
if (IsCloudPath(flag_dir)) {
aws_ = make_unique<cloud::AWS>("s3");
Expand Down Expand Up @@ -1888,7 +1960,8 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
}

void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx, bool flush_db,
bool return_on_connection_fail) {
std::string_view host = ArgS(args, 0);
std::string_view port_s = ArgS(args, 1);
auto& pool = service_.proactor_pool();
Expand Down Expand Up @@ -1946,20 +2019,28 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
return;
}

// Flushing all the data after we marked this instance as replica.
Transaction* transaction = cntx->transaction;
transaction->Schedule();
if (flush_db) {
// Flushing all the data after we marked this instance as replica.
Transaction* transaction = cntx->transaction;
transaction->Schedule();

auto cb = [](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
};
transaction->Execute(std::move(cb), true);
auto cb = [](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
};
transaction->Execute(std::move(cb), true);
}

// Replica sends response in either case. No need to send response in this function.
// It's a bit confusing but simpler.
lk.unlock();
error_code ec = new_replica->Start(cntx);
error_code ec{};

if (return_on_connection_fail) // try to connect, exit on error
ec = new_replica->Start(cntx);
else // set DF to replicate, and forget about it
ec = new_replica->EnableReplication(cntx);

VLOG(1) << "Acquire replica lock";
lk.lock();

Expand All @@ -1976,6 +2057,41 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
}
}

void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
ReplicaOfInternal(args, cntx, true, true);
}

void ServerFamily::Replicate(string& host, string& port, bool* success) {
talbii marked this conversation as resolved.
Show resolved Hide resolved
io::StringSink sink;
ConnectionContext ctxt{&sink, nullptr};

vector<MutableSlice> vargs{
{host.data(), host.size()},
{port.data(), port.size()},
};

CmdArgList args{vargs.data(), vargs.size()};

// we don't flush the database as the context is null
talbii marked this conversation as resolved.
Show resolved Hide resolved
// (and also because there is nothing to flush)
ReplicaOfInternal(args, &ctxt, false, false);
talbii marked this conversation as resolved.
Show resolved Hide resolved

// Check whether replication succeeded

if (success != nullptr) {
using namespace facade::constants;
string_view sv = sink.str();
if (absl::StartsWith(sv, kSimplePref)) {
LOG(INFO) << "Replication success!";
*success = true;
} else {
LOG(ERROR) << "Replication failure!";
LOG(ERROR) << "Error: " << sv.substr(kErrPref.length(), sv.size() - kErrPref.length());
*success = false;
}
}
}

void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Starting take over";
VLOG(1) << "Acquire replica lock";
Expand Down
15 changes: 14 additions & 1 deletion src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ class ServerFamily {
bool AwaitDispatches(absl::Duration timeout,
const std::function<bool(util::Connection*)>& filter);

// Sets the server to replicate another instance. Does not flush the database beforehand!
// Sets 'success' to true/false depending on whether the replication connected successfully or
// not. If 'success' is null, it is ignored.
// ! Note: the method does not modify 'host', 'port' but requires them to be references.
talbii marked this conversation as resolved.
Show resolved Hide resolved
void Replicate(std::string& port, std::string& host, bool* success);

private:
uint32_t shard_count() const {
return shard_set->size();
Expand Down Expand Up @@ -191,7 +197,14 @@ class ServerFamily {

void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);

// Returns the number of loaded keys if successfull.
// REPLICAOF implementation
// If flush_db == true, flushes the database before replicating
// If return_on_connection_fail == true, returns (and does not replicate) on failure to connect to
// master.
void ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx, bool flush_db,
bool return_on_connection_fail);

// Returns the number of loaded keys if successful.
io::Result<size_t> LoadRdb(const std::string& rdb_file);

void SnapshotScheduling(const SnapshotSpec& time);
Expand Down