Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

introduce --replicaof flag #1583

Merged
merged 22 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/facade/reply_builder.cc
talbii marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ namespace facade {

namespace {

constexpr std::string_view kCRLF = "\r\n";
constexpr std::string_view kErrPref = "-ERR ";
constexpr std::string_view kSimplePref = "+";

inline iovec constexpr IoVec(std::string_view s) {
iovec r{const_cast<char*>(s.data()), s.size()};
return r;
}

constexpr char kCRLF[] = "\r\n";
constexpr char kErrPref[] = "-ERR ";
constexpr char kSimplePref[] = "+";

constexpr unsigned kConvFlags =
DoubleToStringConverter::UNIQUE_ZERO | DoubleToStringConverter::EMIT_POSITIVE_EXPONENT_SIGN;

Expand Down
9 changes: 9 additions & 0 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ error_code Replica::Start(ConnectionContext* cntx) {
return {};
} // namespace dfly

void Replica::EnableReplication(ConnectionContext* cntx) {
VLOG(1) << "Enabling replication";

state_mask_.store(R_ENABLED); // set replica state to enabled
sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this); // call replication fiber

(*cntx)->SendOk();
}

void Replica::Stop() {
VLOG(1) << "Stopping replication";
// Stops the loop in MainReplicationFb.
Expand Down
5 changes: 5 additions & 0 deletions src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class Replica : ProtocolClient {
// false if it has failed.
std::error_code Start(ConnectionContext* cntx);

// Sets the server state to have replication enabled.
// It is like Start(), but does not attempt to establish
// a connection right-away, but instead lets MainReplicationFb do the work.
void EnableReplication(ConnectionContext* cntx);

void Stop(); // thread-safe

void Pause(bool pause);
Expand Down
141 changes: 124 additions & 17 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ extern "C" {
#include "base/logging.h"
#include "croncpp.h" // cron::cronexpr
#include "facade/dragonfly_connection.h"
#include "facade/reply_builder.h"
#include "io/file_util.h"
#include "io/proc_reader.h"
#include "server/command_registry.h"
Expand Down Expand Up @@ -54,6 +55,18 @@ extern "C" {

using namespace std;

struct ReplicaOfFlag {
string host;
string port;
talbii marked this conversation as resolved.
Show resolved Hide resolved

bool has_value() const {
return !host.empty() && !port.empty();
}
};

static bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err);
static std::string AbslUnparseFlag(const ReplicaOfFlag& flag);
talbii marked this conversation as resolved.
Show resolved Hide resolved

ABSL_FLAG(string, dir, "", "working directory");
ABSL_FLAG(string, dbfilename, "dump-{timestamp}", "the filename to save/load the DB");
ABSL_FLAG(string, requirepass, "",
Expand All @@ -68,6 +81,10 @@ ABSL_FLAG(bool, df_snapshot_format, true,
ABSL_FLAG(int, epoll_file_threads, 0,
"thread size for file workers when running in epoll mode, default is hardware concurrent "
"threads");
ABSL_FLAG(ReplicaOfFlag, replicaof, ReplicaOfFlag{},
"Specifies a host and port which point to a target master "
"to replicate. "
"Format should be <IPv4>:<PORT> or host:<PORT> or [<IPv6>]:<PORT>");

ABSL_DECLARE_FLAG(uint32_t, port);
ABSL_DECLARE_FLAG(bool, cache_mode);
Expand All @@ -76,6 +93,55 @@ ABSL_DECLARE_FLAG(bool, tls);
ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);

bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
#define RETURN_ON_ERROR(m) \
do { \
*err = m; \
LOG(WARNING) << "Error in parsing arguments for --replicaof: " << m; \
return false; \
} while (0)
talbii marked this conversation as resolved.
Show resolved Hide resolved

if (in.empty()) { // on empty flag "parse" nothing. If we return false then DF exists.
*flag = ReplicaOfFlag{};
return true;
talbii marked this conversation as resolved.
Show resolved Hide resolved
}

auto pos = in.find_last_of(':');
if (pos == string::npos)
talbii marked this conversation as resolved.
Show resolved Hide resolved
RETURN_ON_ERROR("missing ':'.");

string_view ip = in.substr(0, pos);
flag->port = in.substr(pos + 1);

if (ip.empty() || flag->port.empty())
RETURN_ON_ERROR("IP/host or port are empty.");

// For IPv6: ip1.front == '[' AND ip1.back == ']'
// For IPv4: ip1.front != '[' AND ip1.back != ']'
// Together, this ip1.front == '[' iff ip1.back == ']', which can be implemented as XNOR (NOT XOR)
if ((ip.front() == '[') ^ (ip.back() == ']'))
RETURN_ON_ERROR("unclosed brackets.");

if (ip.front() == '[') {
if (ip.length() <= 2)
RETURN_ON_ERROR("IPv6 host name is too short.");

flag->host = ip.substr(1, ip.length() - 2);
VLOG(1) << "received IP of type IPv6: " << flag->host;
} else {
flag->host = ip;
VLOG(1) << "received IP of type IPv4 (or a host): " << flag->host;
}

VLOG(1) << "--replicaof: Received " << flag->host << " : " << flag->port;
return true;
#undef RETURN_ON_ERROR
}

std::string AbslUnparseFlag(const ReplicaOfFlag& flag) {
return (flag.has_value()) ? absl::StrCat(flag.host, ":", flag.port) : "";
}

namespace dfly {

namespace fs = std::filesystem;
Expand Down Expand Up @@ -426,6 +492,17 @@ void SlowLog(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType);
}

void FlushEntireDb(Transaction* transaction) {
talbii marked this conversation as resolved.
Show resolved Hide resolved
transaction->Schedule();

auto cb = [](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
};

transaction->Execute(std::move(cb), true);
}

// Check that if TLS is used at least one form of client authentication is
// enabled. That means either using a password or giving a root
// certificate for authenticating client certificates which will
Expand Down Expand Up @@ -593,6 +670,13 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
stats_caching_task_ =
pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); });

// check for '--replicaof' before loading anything
if (ReplicaOfFlag flag = GetFlag(FLAGS_replicaof); flag.has_value()) {
service_.proactor_pool().GetNextProactor()->Await(
[this, &flag]() { this->Replicate(flag.host, flag.port); });
return; // DONT load any snapshots
}

string flag_dir = GetFlag(FLAGS_dir);
if (IsCloudPath(flag_dir)) {
aws_ = make_unique<cloud::AWS>("s3");
Expand Down Expand Up @@ -1973,12 +2057,10 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
}

void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
std::string_view host = ArgS(args, 0);
std::string_view port_s = ArgS(args, 1);
void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, ConnectionContext* cntx,
ShouldFlushDb should_flush_db, ActionOnConnectionFail on_err) {
auto& pool = service_.proactor_pool();

LOG(INFO) << "Replicating " << host << ":" << port_s;
LOG(INFO) << "Replicating " << host << ":" << port_sv;

// We lock to protect global state changes that we perform during the replication setup:
// The replica_ pointer, GlobalState, and the DB itself (we do a flushall txn before syncing).
Expand All @@ -1993,7 +2075,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Acquire replica lock";
unique_lock lk(replicaof_mu_);

if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) {
if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_sv, "one")) {
if (!ServerState::tlocal()->is_master) {
auto repl_ptr = replica_;
CHECK(repl_ptr);
Expand All @@ -2009,7 +2091,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {

uint32_t port;

if (!absl::SimpleAtoi(port_s, &port) || port < 1 || port > 65535) {
if (!absl::SimpleAtoi(port_sv, &port) || port < 1 || port > 65535) {
(*cntx)->SendError(kInvalidIntErr);
return;
}
Expand All @@ -2031,20 +2113,23 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
return;
}

// Flushing all the data after we marked this instance as replica.
Transaction* transaction = cntx->transaction;
transaction->Schedule();

auto cb = [](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
};
transaction->Execute(std::move(cb), true);
if (should_flush_db == ShouldFlushDb::kFlush)
FlushEntireDb(cntx->transaction);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this be a problem, in that after this function finishes the transaction will be concluded afterwards? Or is it ok to Schedule() an already concluded transaction?

Copy link
Contributor Author

@talbii talbii Aug 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that I am not too sure about, but I'm not sure I understand your concern? The logic of FlushEntireDb (or FLUSHDB or Drakarys..) schedules the transaction first and then executes it, which is fine.

Are you worried that replication continues past the already-executed transaction?

Maybe @romange could chime in, I saw that he originally wrote this code 🙂

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm worried that transactions can't be used more than once, in which case maybe something unintended can happen after we flush the DB, when we try to use the tx again


// Replica sends response in either case. No need to send response in this function.
// It's a bit confusing but simpler.
lk.unlock();
error_code ec = new_replica->Start(cntx);
error_code ec{};

switch (on_err) {
case ActionOnConnectionFail::kReturnOnError:
ec = new_replica->Start(cntx);
break;
case ActionOnConnectionFail::kContinueReplication: // set DF to replicate, and forget about it
new_replica->EnableReplication(cntx);
break;
};

VLOG(1) << "Acquire replica lock";
lk.lock();

Expand All @@ -2064,6 +2149,28 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
}
}

void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
string_view host = ArgS(args, 0);
string_view port = ArgS(args, 1);

ReplicaOfInternal(host, port, cntx, ShouldFlushDb::kFlush,
talbii marked this conversation as resolved.
Show resolved Hide resolved
ActionOnConnectionFail::kReturnOnError);
}

void ServerFamily::Replicate(string_view host, string_view port) {
io::NullSink sink;
ConnectionContext ctxt{&sink, nullptr};

// we don't flush the database as the context is null
talbii marked this conversation as resolved.
Show resolved Hide resolved
// (and also because there is nothing to flush)

ReplicaOfInternal(host, port, &ctxt, ShouldFlushDb::kDontFlush,
ActionOnConnectionFail::kContinueReplication);

CHECK(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE) == GlobalState::ACTIVE)
<< "error in switching state from LOADING to ACTIVE when replicating via --replicaof.";
}

void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Starting take over";
VLOG(1) << "Acquire replica lock";
Expand Down
20 changes: 19 additions & 1 deletion src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,9 @@ class ServerFamily {
bool AwaitDispatches(absl::Duration timeout,
const std::function<bool(util::Connection*)>& filter);

// Sets the server to replicate another instance. Does not flush the database beforehand!
void Replicate(std::string_view host, std::string_view port);

private:
uint32_t shard_count() const {
return shard_set->size();
Expand Down Expand Up @@ -192,7 +195,22 @@ class ServerFamily {

void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);

// Returns the number of loaded keys if successfull.
enum ShouldFlushDb {
kFlush, // flush database before replicating master
kDontFlush,
};

enum ActionOnConnectionFail {
kReturnOnError, // if we fail to connect to master, return to err
kContinueReplication, // continue attempting to connect to master, regardless of initial
// failure
};

// REPLICAOF implementation. See arguments above
void ReplicaOfInternal(std::string_view host, std::string_view port, ConnectionContext* cntx,
ShouldFlushDb should_flush_db, ActionOnConnectionFail on_error);

// Returns the number of loaded keys if successful.
io::Result<size_t> LoadRdb(const std::string& rdb_file);

void SnapshotScheduling();
Expand Down
80 changes: 80 additions & 0 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1371,3 +1371,83 @@ async def test_tls_replication(

await c_replica.close()
await c_master.close()


WAIT_FOR_REPLICATION = 2.0


@pytest.mark.asyncio
async def test_replicaof_flag(df_local_factory):
# tests --replicaof works under normal conditions
master = df_local_factory.create(
port=BASE_PORT,
proactor_threads=2,
)

# set up master
master.start()
c_master = aioredis.Redis(port=master.port)
await c_master.set("KEY", b"VALUE")
db_size = await c_master.dbsize()
assert 1 == db_size

replica = df_local_factory.create(
port=BASE_PORT + 1,
proactor_threads=2,
replicaof=f"localhost:{BASE_PORT}", # start to replicate master
)

# set up replica. check that it is replicating
replica.start()
time.sleep(WAIT_FOR_REPLICATION) # give it time to startup, replication takes longer

c_replica = aioredis.Redis(port=replica.port)

dbsize = await c_replica.dbsize()
assert 1 == dbsize

val = await c_replica.get("KEY")
assert b"VALUE" == val


@pytest.mark.asyncio
async def test_replicaof_flag_replication_waits(df_local_factory):
# tests --replicaof works when we launch replication before the master
replica = df_local_factory.create(
port=BASE_PORT + 1,
proactor_threads=2,
replicaof=f"localhost:{BASE_PORT}", # start to replicate master
)

# set up replica first
replica.start()
time.sleep(WAIT_FOR_REPLICATION)
talbii marked this conversation as resolved.
Show resolved Hide resolved
c_replica = aioredis.Redis(port=replica.port)

# check that it is in replica mode, yet status is down
info = await c_replica.info("replication")
assert info["role"] == "replica"
assert info["master_host"] == "localhost"
assert info["master_port"] == BASE_PORT
assert info["master_link_status"] == "down"

# set up master
master = df_local_factory.create(
port=BASE_PORT,
proactor_threads=2,
)

master.start()
c_master = aioredis.Redis(port=master.port)
await c_master.set("KEY", b"VALUE")
db_size = await c_master.dbsize()
assert 1 == db_size

# check that replication works now
time.sleep(WAIT_FOR_REPLICATION)
talbii marked this conversation as resolved.
Show resolved Hide resolved

dbsize = await c_replica.dbsize()
assert 1 == dbsize

val = await c_replica.get("KEY")
assert b"VALUE" == val
Loading