Skip to content

Commit

Permalink
Revert "introduce --replicaof flag (#1583)"
Browse files Browse the repository at this point in the history
This reverts commit 16c2353.
  • Loading branch information
talbii committed Aug 9, 2023
1 parent 16c2353 commit b96d5c4
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 291 deletions.
30 changes: 4 additions & 26 deletions src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,6 @@ error_code Replica::Start(ConnectionContext* cntx) {
return {};
} // namespace dfly

void Replica::EnableReplication(ConnectionContext* cntx) {
VLOG(1) << "Enabling replication";

state_mask_.store(R_ENABLED); // set replica state to enabled
sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this); // call replication fiber

(*cntx)->SendOk();
}

void Replica::Stop() {
VLOG(1) << "Stopping replication";
// Stops the loop in MainReplicationFb.
Expand Down Expand Up @@ -1019,7 +1010,9 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d
}

Replica::Info Replica::GetInfo() const {
auto f = [this]() {
CHECK(Sock());

return Proactor()->AwaitBrief([this] {
auto last_io_time = LastIoTime();
for (const auto& flow : shard_flows_) { // Get last io time from all sub flows.
last_io_time = std::max(last_io_time, flow->LastIoTime());
Expand All @@ -1033,22 +1026,7 @@ Replica::Info Replica::GetInfo() const {
res.full_sync_done = (state_mask_.load() & R_SYNC_OK);
res.master_last_io_sec = (ProactorBase::GetMonotonicTimeNs() - last_io_time) / 1000000000UL;
return res;
};

if (Sock())
return Proactor()->AwaitBrief(f);
else {
/**
* when this branch happens: there is a very short grace period
* where Sock() is not initialized, yet the server can
* receive ROLE/INFO commands. That period happens when launching
* an instance with '--replicaof' and then immediately
* sending a command.
*
* In that instance, we have to run f() on the current fiber.
*/
return f();
}
});
}

std::vector<uint64_t> Replica::GetReplicaOffset() const {
Expand Down
5 changes: 0 additions & 5 deletions src/server/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ class Replica : ProtocolClient {
// false if it has failed.
std::error_code Start(ConnectionContext* cntx);

// Sets the server state to have replication enabled.
// It is like Start(), but does not attempt to establish
// a connection right-away, but instead lets MainReplicationFb do the work.
void EnableReplication(ConnectionContext* cntx);

void Stop(); // thread-safe

void Pause(bool pause);
Expand Down
134 changes: 18 additions & 116 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ extern "C" {
#include "base/logging.h"
#include "croncpp.h" // cron::cronexpr
#include "facade/dragonfly_connection.h"
#include "facade/reply_builder.h"
#include "io/file_util.h"
#include "io/proc_reader.h"
#include "server/command_registry.h"
Expand Down Expand Up @@ -55,18 +54,6 @@ extern "C" {

using namespace std;

struct ReplicaOfFlag {
string host;
string port;

bool has_value() const {
return !host.empty() && !port.empty();
}
};

static bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err);
static std::string AbslUnparseFlag(const ReplicaOfFlag& flag);

ABSL_FLAG(string, dir, "", "working directory");
ABSL_FLAG(string, dbfilename, "dump-{timestamp}", "the filename to save/load the DB");
ABSL_FLAG(string, requirepass, "",
Expand All @@ -81,10 +68,6 @@ ABSL_FLAG(bool, df_snapshot_format, true,
ABSL_FLAG(int, epoll_file_threads, 0,
"thread size for file workers when running in epoll mode, default is hardware concurrent "
"threads");
ABSL_FLAG(ReplicaOfFlag, replicaof, ReplicaOfFlag{},
"Specifies a host and port which point to a target master "
"to replicate. "
"Format should be <IPv4>:<PORT> or host:<PORT> or [<IPv6>]:<PORT>");

ABSL_DECLARE_FLAG(uint32_t, port);
ABSL_DECLARE_FLAG(bool, cache_mode);
Expand All @@ -93,54 +76,6 @@ ABSL_DECLARE_FLAG(bool, tls);
ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);

bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
#define RETURN_ON_ERROR(cond, m) \
do { \
if ((cond)) { \
*err = m; \
LOG(WARNING) << "Error in parsing arguments for --replicaof: " << m; \
return false; \
} \
} while (0)

if (in.empty()) { // on empty flag "parse" nothing. If we return false then DF exists.
*flag = ReplicaOfFlag{};
return true;
}

auto pos = in.find_last_of(':');
RETURN_ON_ERROR(pos == string::npos, "missing ':'.");

string_view ip = in.substr(0, pos);
flag->port = in.substr(pos + 1);

RETURN_ON_ERROR(ip.empty() || flag->port.empty(), "IP/host or port are empty.");

// For IPv6: ip1.front == '[' AND ip1.back == ']'
// For IPv4: ip1.front != '[' AND ip1.back != ']'
// Together, this ip1.front == '[' iff ip1.back == ']', which can be implemented as XNOR (NOT XOR)
RETURN_ON_ERROR(((ip.front() == '[') ^ (ip.back() == ']')), "unclosed brackets.");

if (ip.front() == '[') {
// shortest possible IPv6 is '::1' (loopback)
RETURN_ON_ERROR(ip.length() <= 2, "IPv6 host name is too short");

flag->host = ip.substr(1, ip.length() - 2);
VLOG(1) << "received IP of type IPv6: " << flag->host;
} else {
flag->host = ip;
VLOG(1) << "received IP of type IPv4 (or a host): " << flag->host;
}

VLOG(1) << "--replicaof: Received " << flag->host << " : " << flag->port;
return true;
#undef RETURN_ON_ERROR
}

std::string AbslUnparseFlag(const ReplicaOfFlag& flag) {
return (flag.has_value()) ? absl::StrCat(flag.host, ":", flag.port) : "";
}

namespace dfly {

namespace fs = std::filesystem;
Expand Down Expand Up @@ -518,10 +453,6 @@ void ValidateServerTlsFlags() {
}
}

bool IsReplicatingNoOne(string_view host, string_view port) {
return absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port, "one");
}

} // namespace

std::optional<SnapshotSpec> ParseSaveSchedule(string_view time) {
Expand Down Expand Up @@ -663,13 +594,6 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
stats_caching_task_ =
pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); });

// check for '--replicaof' before loading anything
if (ReplicaOfFlag flag = GetFlag(FLAGS_replicaof); flag.has_value()) {
service_.proactor_pool().GetNextProactor()->Await(
[this, &flag]() { this->Replicate(flag.host, flag.port); });
return; // DONT load any snapshots
}

string flag_dir = GetFlag(FLAGS_dir);
if (IsCloudPath(flag_dir)) {
aws_ = make_unique<cloud::AWS>("s3");
Expand Down Expand Up @@ -2076,10 +2000,12 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
}

void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, ConnectionContext* cntx,
ActionOnConnectionFail on_err) {
void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
std::string_view host = ArgS(args, 0);
std::string_view port_s = ArgS(args, 1);
auto& pool = service_.proactor_pool();
LOG(INFO) << "Replicating " << host << ":" << port_sv;

LOG(INFO) << "Replicating " << host << ":" << port_s;

// We lock to protect global state changes that we perform during the replication setup:
// The replica_ pointer, GlobalState, and the DB itself (we do a flushall txn before syncing).
Expand All @@ -2094,7 +2020,7 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
VLOG(1) << "Acquire replica lock";
unique_lock lk(replicaof_mu_);

if (IsReplicatingNoOne(host, port_sv)) {
if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) {
if (!ServerState::tlocal()->is_master) {
auto repl_ptr = replica_;
CHECK(repl_ptr);
Expand All @@ -2105,15 +2031,12 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
replica_.reset();
}

CHECK(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE) == GlobalState::ACTIVE)
<< "Server is set to replica no one, yet state is not active!";

return (*cntx)->SendOk();
}

uint32_t port;

if (!absl::SimpleAtoi(port_sv, &port) || port < 1 || port > 65535) {
if (!absl::SimpleAtoi(port_s, &port) || port < 1 || port > 65535) {
(*cntx)->SendError(kInvalidIntErr);
return;
}
Expand All @@ -2135,20 +2058,20 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
return;
}

// Replica sends response in either case. No need to send response in this function.
// It's a bit confusing but simpler.
lk.unlock();
error_code ec{};
// Flushing all the data after we marked this instance as replica.
Transaction* transaction = cntx->transaction;
transaction->Schedule();

switch (on_err) {
case ActionOnConnectionFail::kReturnOnError:
ec = new_replica->Start(cntx);
break;
case ActionOnConnectionFail::kContinueReplication: // set DF to replicate, and forget about it
new_replica->EnableReplication(cntx);
break;
auto cb = [](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
};
transaction->Execute(std::move(cb), true);

// Replica sends response in either case. No need to send response in this function.
// It's a bit confusing but simpler.
lk.unlock();
error_code ec = new_replica->Start(cntx);
VLOG(1) << "Acquire replica lock";
lk.lock();

Expand All @@ -2168,27 +2091,6 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
}
}

void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
string_view host = ArgS(args, 0);
string_view port = ArgS(args, 1);

// don't flush if input is NO ONE
if (!IsReplicatingNoOne(host, port))
Drakarys(cntx->transaction, DbSlice::kDbAll);

ReplicaOfInternal(host, port, cntx, ActionOnConnectionFail::kReturnOnError);
}

void ServerFamily::Replicate(string_view host, string_view port) {
io::NullSink sink;
ConnectionContext ctxt{&sink, nullptr};

// we don't flush the database as the context is null
// (and also because there is nothing to flush)

ReplicaOfInternal(host, port, &ctxt, ActionOnConnectionFail::kContinueReplication);
}

void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Starting take over";
VLOG(1) << "Acquire replica lock";
Expand Down
15 changes: 1 addition & 14 deletions src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,6 @@ class ServerFamily {
bool AwaitDispatches(absl::Duration timeout,
const std::function<bool(util::Connection*)>& filter);

// Sets the server to replicate another instance. Does not flush the database beforehand!
void Replicate(std::string_view host, std::string_view port);

private:
uint32_t shard_count() const {
return shard_set->size();
Expand Down Expand Up @@ -195,17 +192,7 @@ class ServerFamily {

void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);

enum ActionOnConnectionFail {
kReturnOnError, // if we fail to connect to master, return to err
kContinueReplication, // continue attempting to connect to master, regardless of initial
// failure
};

// REPLICAOF implementation. See arguments above
void ReplicaOfInternal(std::string_view host, std::string_view port, ConnectionContext* cntx,
ActionOnConnectionFail on_error);

// Returns the number of loaded keys if successful.
// Returns the number of loaded keys if successfull.
io::Result<size_t> LoadRdb(const std::string& rdb_file);

void SnapshotScheduling();
Expand Down

0 comments on commit b96d5c4

Please sign in to comment.