From b96d5c4c0a893c1638798dd861217f1e4d8f2f40 Mon Sep 17 00:00:00 2001 From: talbii <41526934+talbii@users.noreply.github.com> Date: Wed, 9 Aug 2023 14:42:48 +0300 Subject: [PATCH] Revert "introduce `--replicaof` flag (#1583)" This reverts commit 16c2353faf35df164359898a84966be7cb512e6e. --- src/server/replica.cc | 30 +------ src/server/replica.h | 5 -- src/server/server_family.cc | 134 ++++------------------------ src/server/server_family.h | 15 +--- tests/dragonfly/replication_test.py | 130 --------------------------- 5 files changed, 23 insertions(+), 291 deletions(-) diff --git a/src/server/replica.cc b/src/server/replica.cc index 30bcc38ae212..0f3557bfd787 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -117,15 +117,6 @@ error_code Replica::Start(ConnectionContext* cntx) { return {}; } // namespace dfly -void Replica::EnableReplication(ConnectionContext* cntx) { - VLOG(1) << "Enabling replication"; - - state_mask_.store(R_ENABLED); // set replica state to enabled - sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this); // call replication fiber - - (*cntx)->SendOk(); -} - void Replica::Stop() { VLOG(1) << "Stopping replication"; // Stops the loop in MainReplicationFb. @@ -1019,7 +1010,9 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d } Replica::Info Replica::GetInfo() const { - auto f = [this]() { + CHECK(Sock()); + + return Proactor()->AwaitBrief([this] { auto last_io_time = LastIoTime(); for (const auto& flow : shard_flows_) { // Get last io time from all sub flows. 
last_io_time = std::max(last_io_time, flow->LastIoTime()); @@ -1033,22 +1026,7 @@ Replica::Info Replica::GetInfo() const { res.full_sync_done = (state_mask_.load() & R_SYNC_OK); res.master_last_io_sec = (ProactorBase::GetMonotonicTimeNs() - last_io_time) / 1000000000UL; return res; - }; - - if (Sock()) - return Proactor()->AwaitBrief(f); - else { - /** - * when this branch happens: there is a very short grace period - * where Sock() is not initialized, yet the server can - * receive ROLE/INFO commands. That period happens when launching - * an instance with '--replicaof' and then immediately - * sending a command. - * - * In that instance, we have to run f() on the current fiber. - */ - return f(); - } + }); } std::vector Replica::GetReplicaOffset() const { diff --git a/src/server/replica.h b/src/server/replica.h index 569260e3733b..4ef14fb04dbb 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -77,11 +77,6 @@ class Replica : ProtocolClient { // false if it has failed. std::error_code Start(ConnectionContext* cntx); - // Sets the server state to have replication enabled. - // It is like Start(), but does not attempt to establish - // a connection right-away, but instead lets MainReplicationFb do the work. 
- void EnableReplication(ConnectionContext* cntx); - void Stop(); // thread-safe void Pause(bool pause); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 32779e1ec4f1..92dee420fb28 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -25,7 +25,6 @@ extern "C" { #include "base/logging.h" #include "croncpp.h" // cron::cronexpr #include "facade/dragonfly_connection.h" -#include "facade/reply_builder.h" #include "io/file_util.h" #include "io/proc_reader.h" #include "server/command_registry.h" @@ -55,18 +54,6 @@ extern "C" { using namespace std; -struct ReplicaOfFlag { - string host; - string port; - - bool has_value() const { - return !host.empty() && !port.empty(); - } -}; - -static bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err); -static std::string AbslUnparseFlag(const ReplicaOfFlag& flag); - ABSL_FLAG(string, dir, "", "working directory"); ABSL_FLAG(string, dbfilename, "dump-{timestamp}", "the filename to save/load the DB"); ABSL_FLAG(string, requirepass, "", @@ -81,10 +68,6 @@ ABSL_FLAG(bool, df_snapshot_format, true, ABSL_FLAG(int, epoll_file_threads, 0, "thread size for file workers when running in epoll mode, default is hardware concurrent " "threads"); -ABSL_FLAG(ReplicaOfFlag, replicaof, ReplicaOfFlag{}, - "Specifies a host and port which point to a target master " - "to replicate. " - "Format should be : or host: or []:"); ABSL_DECLARE_FLAG(uint32_t, port); ABSL_DECLARE_FLAG(bool, cache_mode); @@ -93,54 +76,6 @@ ABSL_DECLARE_FLAG(bool, tls); ABSL_DECLARE_FLAG(string, tls_ca_cert_file); ABSL_DECLARE_FLAG(string, tls_ca_cert_dir); -bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) { -#define RETURN_ON_ERROR(cond, m) \ - do { \ - if ((cond)) { \ - *err = m; \ - LOG(WARNING) << "Error in parsing arguments for --replicaof: " << m; \ - return false; \ - } \ - } while (0) - - if (in.empty()) { // on empty flag "parse" nothing. 
If we return false then DF exists. - *flag = ReplicaOfFlag{}; - return true; - } - - auto pos = in.find_last_of(':'); - RETURN_ON_ERROR(pos == string::npos, "missing ':'."); - - string_view ip = in.substr(0, pos); - flag->port = in.substr(pos + 1); - - RETURN_ON_ERROR(ip.empty() || flag->port.empty(), "IP/host or port are empty."); - - // For IPv6: ip1.front == '[' AND ip1.back == ']' - // For IPv4: ip1.front != '[' AND ip1.back != ']' - // Together, this ip1.front == '[' iff ip1.back == ']', which can be implemented as XNOR (NOT XOR) - RETURN_ON_ERROR(((ip.front() == '[') ^ (ip.back() == ']')), "unclosed brackets."); - - if (ip.front() == '[') { - // shortest possible IPv6 is '::1' (loopback) - RETURN_ON_ERROR(ip.length() <= 2, "IPv6 host name is too short"); - - flag->host = ip.substr(1, ip.length() - 2); - VLOG(1) << "received IP of type IPv6: " << flag->host; - } else { - flag->host = ip; - VLOG(1) << "received IP of type IPv4 (or a host): " << flag->host; - } - - VLOG(1) << "--replicaof: Received " << flag->host << " : " << flag->port; - return true; -#undef RETURN_ON_ERROR -} - -std::string AbslUnparseFlag(const ReplicaOfFlag& flag) { - return (flag.has_value()) ? 
absl::StrCat(flag.host, ":", flag.port) : ""; -} - namespace dfly { namespace fs = std::filesystem; @@ -518,10 +453,6 @@ void ValidateServerTlsFlags() { } } -bool IsReplicatingNoOne(string_view host, string_view port) { - return absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port, "one"); -} - } // namespace std::optional ParseSaveSchedule(string_view time) { @@ -663,13 +594,6 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vectorAwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); }); - // check for '--replicaof' before loading anything - if (ReplicaOfFlag flag = GetFlag(FLAGS_replicaof); flag.has_value()) { - service_.proactor_pool().GetNextProactor()->Await( - [this, &flag]() { this->Replicate(flag.host, flag.port); }); - return; // DONT load any snapshots - } - string flag_dir = GetFlag(FLAGS_dir); if (IsCloudPath(flag_dir)) { aws_ = make_unique("s3"); @@ -2076,10 +2000,12 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave"); } -void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, ConnectionContext* cntx, - ActionOnConnectionFail on_err) { +void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { + std::string_view host = ArgS(args, 0); + std::string_view port_s = ArgS(args, 1); auto& pool = service_.proactor_pool(); - LOG(INFO) << "Replicating " << host << ":" << port_sv; + + LOG(INFO) << "Replicating " << host << ":" << port_s; // We lock to protect global state changes that we perform during the replication setup: // The replica_ pointer, GlobalState, and the DB itself (we do a flushall txn before syncing). 
@@ -2094,7 +2020,7 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn VLOG(1) << "Acquire replica lock"; unique_lock lk(replicaof_mu_); - if (IsReplicatingNoOne(host, port_sv)) { + if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) { if (!ServerState::tlocal()->is_master) { auto repl_ptr = replica_; CHECK(repl_ptr); @@ -2105,15 +2031,12 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn replica_.reset(); } - CHECK(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE) == GlobalState::ACTIVE) - << "Server is set to replica no one, yet state is not active!"; - return (*cntx)->SendOk(); } uint32_t port; - if (!absl::SimpleAtoi(port_sv, &port) || port < 1 || port > 65535) { + if (!absl::SimpleAtoi(port_s, &port) || port < 1 || port > 65535) { (*cntx)->SendError(kInvalidIntErr); return; } @@ -2135,20 +2058,20 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn return; } - // Replica sends response in either case. No need to send response in this function. - // It's a bit confusing but simpler. - lk.unlock(); - error_code ec{}; + // Flushing all the data after we marked this instance as replica. + Transaction* transaction = cntx->transaction; + transaction->Schedule(); - switch (on_err) { - case ActionOnConnectionFail::kReturnOnError: - ec = new_replica->Start(cntx); - break; - case ActionOnConnectionFail::kContinueReplication: // set DF to replicate, and forget about it - new_replica->EnableReplication(cntx); - break; + auto cb = [](Transaction* t, EngineShard* shard) { + shard->db_slice().FlushDb(DbSlice::kDbAll); + return OpStatus::OK; }; + transaction->Execute(std::move(cb), true); + // Replica sends response in either case. No need to send response in this function. + // It's a bit confusing but simpler. 
+ lk.unlock(); + error_code ec = new_replica->Start(cntx); VLOG(1) << "Acquire replica lock"; lk.lock(); @@ -2168,27 +2091,6 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn } } -void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { - string_view host = ArgS(args, 0); - string_view port = ArgS(args, 1); - - // don't flush if input is NO ONE - if (!IsReplicatingNoOne(host, port)) - Drakarys(cntx->transaction, DbSlice::kDbAll); - - ReplicaOfInternal(host, port, cntx, ActionOnConnectionFail::kReturnOnError); -} - -void ServerFamily::Replicate(string_view host, string_view port) { - io::NullSink sink; - ConnectionContext ctxt{&sink, nullptr}; - - // we don't flush the database as the context is null - // (and also because there is nothing to flush) - - ReplicaOfInternal(host, port, &ctxt, ActionOnConnectionFail::kContinueReplication); -} - void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) { VLOG(1) << "Starting take over"; VLOG(1) << "Acquire replica lock"; diff --git a/src/server/server_family.h b/src/server/server_family.h index 8275a603c4bc..edc3f6ccae24 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -163,9 +163,6 @@ class ServerFamily { bool AwaitDispatches(absl::Duration timeout, const std::function& filter); - // Sets the server to replicate another instance. Does not flush the database beforehand! - void Replicate(std::string_view host, std::string_view port); - private: uint32_t shard_count() const { return shard_set->size(); @@ -195,17 +192,7 @@ class ServerFamily { void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx); - enum ActionOnConnectionFail { - kReturnOnError, // if we fail to connect to master, return to err - kContinueReplication, // continue attempting to connect to master, regardless of initial - // failure - }; - - // REPLICAOF implementation. 
See arguments above - void ReplicaOfInternal(std::string_view host, std::string_view port, ConnectionContext* cntx, - ActionOnConnectionFail on_error); - - // Returns the number of loaded keys if successful. + // Returns the number of loaded keys if successful. io::Result LoadRdb(const std::string& rdb_file); void SnapshotScheduling(); diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 6d30001657af..b4c55ee0bf58 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1371,133 +1371,3 @@ async def test_tls_replication( await c_replica.close() await c_master.close() - - -# busy wait for 'replica' instance to have replication status 'status' -async def wait_for_replica_status(replica: aioredis.Redis, status: str, wait_for_seconds=0.01): - while True: - await asyncio.sleep(wait_for_seconds) - - info = await replica.info("replication") - if info["master_link_status"] == status: - return - - -@pytest.mark.asyncio -async def test_replicaof_flag(df_local_factory): - # tests --replicaof works under normal conditions - master = df_local_factory.create( - port=BASE_PORT, - proactor_threads=2, - ) - - # set up master - master.start() - c_master = aioredis.Redis(port=master.port) - await c_master.set("KEY", b"VALUE") - db_size = await c_master.dbsize() - assert 1 == db_size - - replica = df_local_factory.create( - port=BASE_PORT + 1, - proactor_threads=2, - replicaof=f"localhost:{BASE_PORT}", # start to replicate master - ) - - # set up replica.
check that it is replicating - replica.start() - c_replica = aioredis.Redis(port=replica.port) - - await wait_available_async(c_replica) # give it time to startup - await wait_for_replica_status(c_replica, status="up") # wait until we have a connection - - dbsize = await c_replica.dbsize() - assert 1 == dbsize - - val = await c_replica.get("KEY") - assert b"VALUE" == val - - -@pytest.mark.asyncio -async def test_replicaof_flag_replication_waits(df_local_factory): - # tests --replicaof works when we launch replication before the master - replica = df_local_factory.create( - port=BASE_PORT + 1, - proactor_threads=2, - replicaof=f"localhost:{BASE_PORT}", # start to replicate master - ) - - # set up replica first - replica.start() - c_replica = aioredis.Redis(port=replica.port) - await wait_for_replica_status(c_replica, status="down") - - # check that it is in replica mode, yet status is down - info = await c_replica.info("replication") - assert info["role"] == "replica" - assert info["master_host"] == "localhost" - assert info["master_port"] == BASE_PORT - assert info["master_link_status"] == "down" - - # set up master - master = df_local_factory.create( - port=BASE_PORT, - proactor_threads=2, - ) - - master.start() - c_master = aioredis.Redis(port=master.port) - await c_master.set("KEY", b"VALUE") - db_size = await c_master.dbsize() - assert 1 == db_size - - # check that replication works now - await wait_for_replica_status(c_replica, status="up") - - dbsize = await c_replica.dbsize() - assert 1 == dbsize - - val = await c_replica.get("KEY") - assert b"VALUE" == val - - -@pytest.mark.asyncio -async def test_replicaof_flag_disconnect(df_local_factory): - # test stopping replication when started using --replicaof - master = df_local_factory.create( - port=BASE_PORT, - proactor_threads=2, - ) - - # set up master - master.start() - c_master = aioredis.Redis(port=master.port) - await wait_available_async(c_master) - - await c_master.set("KEY", b"VALUE") - db_size = await 
c_master.dbsize() - assert 1 == db_size - - replica = df_local_factory.create( - port=BASE_PORT + 1, - proactor_threads=2, - replicaof=f"localhost:{BASE_PORT}", # start to replicate master - ) - - # set up replica. check that it is replicating - replica.start() - - c_replica = aioredis.Redis(port=replica.port) - await wait_available_async(c_replica) - await wait_for_replica_status(c_replica, status="up") - - dbsize = await c_replica.dbsize() - assert 1 == dbsize - - val = await c_replica.get("KEY") - assert b"VALUE" == val - - await c_replica.replicaof("no", "one") # disconnect - - role = await c_replica.role() - assert role[0] == b"master"