Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce --replicaof flag #1583

Merged
merged 22 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/facade/reply_builder.cc
talbii marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@ namespace facade {

namespace {

using namespace constants;

inline iovec constexpr IoVec(std::string_view s) {
iovec r{const_cast<char*>(s.data()), s.size()};
return r;
}

constexpr char kCRLF[] = "\r\n";
constexpr char kErrPref[] = "-ERR ";
constexpr char kSimplePref[] = "+";

constexpr unsigned kConvFlags =
DoubleToStringConverter::UNIQUE_ZERO | DoubleToStringConverter::EMIT_POSITIVE_EXPONENT_SIGN;

Expand Down
6 changes: 6 additions & 0 deletions src/facade/reply_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@
#include "io/io.h"

namespace facade {
// some constants, used in reply_builder.cc; can be used elsewhere.
namespace constants {
talbii marked this conversation as resolved.
Show resolved Hide resolved
constexpr char kCRLF[] = "\r\n";
constexpr char kErrPref[] = "-ERR ";
constexpr char kSimplePref[] = "+";
talbii marked this conversation as resolved.
Show resolved Hide resolved
}; // namespace constants

// Reply mode allows filtering replies.
enum class ReplyMode {
Expand Down
46 changes: 46 additions & 0 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ ABSL_FLAG(bool, admin_nopass, false,
"If set, would enable open admin access to console on the assigned port, without auth "
"token needed.");

ABSL_FLAG(string, replicaof, "",
"Empty by default. If not empty - specifies an IP and a port which "
talbii marked this conversation as resolved.
Show resolved Hide resolved
"point to a running Dragonfly instance. The current Dragonfly "
"instance will replicate that machine. "
"Format should be <IPv4>:<PORT> or [<IPv6>]:<PORT>.");

ABSL_FLAG(bool, continue_on_replication_fail, false,
"If set to true, failing to start replication using the "
"'--replicaof' flag would not exit Dragonfly. Disabled by default.");
talbii marked this conversation as resolved.
Show resolved Hide resolved

ABSL_FLAG(MaxMemoryFlag, maxmemory, MaxMemoryFlag{},
"Limit on maximum-memory that is used by the database. "
"0 - means the program will automatically determine its maximum memory usage. "
Expand Down Expand Up @@ -590,6 +600,42 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
GenericFamily::Init(&pp_);
server_family_.Init(acceptor, std::move(listeners), &cluster_family_);

if (string flag = GetFlag(FLAGS_replicaof); !flag.empty()) {
talbii marked this conversation as resolved.
Show resolved Hide resolved
string ip, port;
{
auto pos = flag.find_last_of(':');
CHECK(pos != string::npos) << "Invalid format for --replicaof: missing ':'.";
talbii marked this conversation as resolved.
Show resolved Hide resolved

string ip1 = flag.substr(0, pos);
talbii marked this conversation as resolved.
Show resolved Hide resolved
port = flag.substr(pos + 1);

CHECK(!ip1.empty() && !port.empty())
<< "Invalid format for --replicaof: IP or port are empty.";

// For IPv6
CHECK(!((ip1.front() == '[') ^ (ip1.back() == ']')))
<< "Invalid format for --replicaof: unclosed brackets.";
talbii marked this conversation as resolved.
Show resolved Hide resolved

if (ip1.front() == '[') {
ip = ip1.substr(1, ip1.length() - 2);
talbii marked this conversation as resolved.
Show resolved Hide resolved
LOG(INFO) << "Received IPv6: " << ip;
talbii marked this conversation as resolved.
Show resolved Hide resolved
} else {
ip = move(ip1);
LOG(INFO) << "Received IPv4: " << ip;
}
}

LOG(INFO) << "Replicating instance at " << ip << ":" << port;

bool success = false;
pp_[0].Await([this, ip = move(ip), port = move(port), &success]() {
talbii marked this conversation as resolved.
Show resolved Hide resolved
this->server_family_.Replicate(ip, port, &success);
});

if (!success && !GetFlag(FLAGS_continue_on_replication_fail))
exit(EXIT_FAILURE);
}

ChannelStore* cs = new ChannelStore{};
pp_.Await(
[cs](uint32_t index, ProactorBase* pb) { ServerState::tlocal()->UpdateChannelStore(cs); });
Expand Down
52 changes: 43 additions & 9 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "facade/reply_builder.h"
#include "io/file_util.h"
#include "io/proc_reader.h"
#include "server/command_registry.h"
Expand Down Expand Up @@ -1888,7 +1889,8 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
}

void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::ReplicaOfGeneric(CmdArgList args, ConnectionContext* cntx,
bool flush_transactions) {
talbii marked this conversation as resolved.
Show resolved Hide resolved
std::string_view host = ArgS(args, 0);
std::string_view port_s = ArgS(args, 1);
auto& pool = service_.proactor_pool();
Expand Down Expand Up @@ -1946,15 +1948,17 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
return;
}

// Flushing all the data after we marked this instance as replica.
Transaction* transaction = cntx->transaction;
transaction->Schedule();
if (flush_transactions) {
// Flushing all the data after we marked this instance as replica.
Transaction* transaction = cntx->transaction;
transaction->Schedule();

auto cb = [](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
};
transaction->Execute(std::move(cb), true);
auto cb = [](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
};
transaction->Execute(std::move(cb), true);
}
talbii marked this conversation as resolved.
Show resolved Hide resolved

// Replica sends response in either case. No need to send response in this function.
// It's a bit confusing but simpler.
Expand All @@ -1976,6 +1980,36 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
}
}

void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
ReplicaOfGeneric(args, cntx, true);
talbii marked this conversation as resolved.
Show resolved Hide resolved
}

void ServerFamily::Replicate(string ip_s, string port_s, bool* success) {
io::StringSink sink;
ConnectionContext ctxt{&sink, nullptr};

vector<MutableSlice> vargs{
{ip_s.data(), ip_s.size()},
{port_s.data(), port_s.size()},
};

CmdArgList args{vargs.data(), vargs.size()};

// we don't flush transacitons as the context is null
talbii marked this conversation as resolved.
Show resolved Hide resolved
// (and also because there are none to flush)
ReplicaOfGeneric(args, &ctxt, false);

// Check whether replication succeeded
const string& st = sink.str();
if (st.starts_with(facade::constants::kSimplePref)) {
LOG(INFO) << "Replication success!";
*success = true;
} else {
LOG(ERROR) << "Replication failure!";
*success = false;
}
}

void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Starting take over";
VLOG(1) << "Acquire replica lock";
Expand Down
4 changes: 4 additions & 0 deletions src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ class ServerFamily {
bool AwaitDispatches(absl::Duration timeout,
const std::function<bool(util::Connection*)>& filter);

// Sets the server to replicate another instance. Used with --replicaof flag.
talbii marked this conversation as resolved.
Show resolved Hide resolved
void Replicate(std::string ip, std::string port, bool* success);

private:
uint32_t shard_count() const {
return shard_set->size();
Expand Down Expand Up @@ -190,6 +193,7 @@ class ServerFamily {
void Sync(CmdArgList args, ConnectionContext* cntx);

void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);
void ReplicaOfGeneric(CmdArgList args, ConnectionContext* cntx, bool flush_transactions);

// Returns the number of loaded keys if successfull.
io::Result<size_t> LoadRdb(const std::string& rdb_file);
Expand Down
Loading