From 8aacdddcb5e939d4c90016a1a27b18b42a59976d Mon Sep 17 00:00:00 2001 From: cheniujh <41671101+cheniujh@users.noreply.github.com> Date: Wed, 8 May 2024 20:29:09 +0800 Subject: [PATCH] feat: Add support for dynamicaly reconfig rsync-timeout-ms and throttle-bytes-per-second (#2633) * 1 added conf item rsync-timeout-ms 2 add support for dynamically modify throttle-bytes-per-second and rsync-timeout-ms ps: some debug output is waiting to be removed * remove the debug info in start_master_and_slave.sh --------- Co-authored-by: chejinge * feat: Add a feature that is IO speed limiting (#2599) * add a feature that support IO rate * update IO rate limit mode * Name of variable change to rate-limiter-mode from rate_limiter_mode --------- Co-authored-by: Vachel * feat: Add a feature which support partitioned index filter (#2601) * add a feature which support partitioned index filter * Name of variable change to enable-partitioned-index-filters from enable_partitioned_index_filters --------- Co-authored-by: Vachel * chore(deps): bump golang.org/x/net from 0.17.0 to 0.23.0 in /codis (#2619) Bumps [golang.org/x/net](https://github.com/golang/net) from 0.17.0 to 0.23.0. - [Commits](https://github.com/golang/net/compare/v0.17.0...v0.23.0) --- updated-dependencies: - dependency-name: golang.org/x/net dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> * fix: Revised CI start script to remove invalid cp command (#2615) * revised CI start script to remove invalid cp operation and throw from sed command * use sed -i.bak instead of two scripts --------- Co-authored-by: cjh <1271435567@qq.com> * Update Manual compilation in README (#2617) * enable tests * revised go test * revised go test2 * add flush db operation for test * add Ping operation when get conn from poll to clear unread data in the conn(if the conn has) * 1 reduce the amount of filling data to avoid disk run out * Revert "add Ping operation when get conn from poll to clear unread data in the conn(if the conn has)" This reverts commit 6ad70bc034bb60c525da4a51c00ccb69e6d49e2e. * removed an debug log * add an comment in pika.conf * simplify the calling chain * revised some logic * fix compile error * get timeout value before enter into lock --------- Signed-off-by: dependabot[bot] Co-authored-by: cjh <1271435567@qq.com> Co-authored-by: chejinge <945997690@qq.com> Co-authored-by: chejinge Co-authored-by: vacheli Co-authored-by: Vachel Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: chenbt <34958405+chenbt-hz@users.noreply.github.com> --- conf/pika.conf | 9 +- include/pika_conf.h | 14 +- include/pika_rm.h | 2 +- include/rsync_client.h | 27 ++- include/throttle.h | 10 +- src/pika_admin.cc | 19 +- src/pika_conf.cc | 7 + src/pika_rm.cc | 2 +- src/rsync_client.cc | 3 +- src/throttle.cc | 10 +- tests/integration/rsync_dynamic_reconfig.go | 183 ++++++++++++++++++++ 11 files changed, 251 insertions(+), 35 deletions(-) create mode 100644 tests/integration/rsync_dynamic_reconfig.go diff --git a/conf/pika.conf b/conf/pika.conf index 468d253e8..055208dae 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -466,9 +466,14 @@ default-slot-num : 1024 # The cache will be sharded into 2^blob-num-shard-bits shards. # blob-num-shard-bits : -1 -# Rsync Rate limiting configuration 200MB/s +# Rsync Rate limiting configuration [Default value is 200MB/s] +# [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit. +# [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes). throttle-bytes-per-second : 207200000 - +# Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small. +# [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command +# [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust. +rsync-timeout-ms : 1000 # The valid range for max-rsync-parallel-num is [1, 4]. # If an invalid value is provided, max-rsync-parallel-num will automatically be reset to 4. max-rsync-parallel-num : 4 diff --git a/include/pika_conf.h b/include/pika_conf.h index 9cdf4f280..2a5ed2521 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -165,7 +165,7 @@ class PikaConf : public pstd::BaseConf { uint64_t MaxTotalWalSize() { std::shared_lock l(rwlock_); return max_total_wal_size_; - } + } int64_t max_client_response_size() { std::shared_lock l(rwlock_); return max_client_response_size_; @@ -414,7 +414,9 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return max_rsync_parallel_num_; } - + int64_t rsync_timeout_ms() { + return rsync_timeout_ms_.load(std::memory_order::memory_order_relaxed); + } // Slow Commands configuration const std::string GetSlowCmd() { std::shared_lock l(rwlock_); @@ -735,6 +737,13 @@ class PikaConf : public pstd::BaseConf { TryPushDiffCommands("max-rsync-parallel-num", std::to_string(value)); max_rsync_parallel_num_ = value; } + + void SetRsyncTimeoutMs(int64_t value){ + std::lock_guard l(rwlock_); + TryPushDiffCommands("rsync-timeout-ms", std::to_string(value)); + rsync_timeout_ms_.store(value); + } + void SetAclPubsubDefault(const std::string& value) { std::lock_guard l(rwlock_); TryPushDiffCommands("acl-pubsub-default", value); @@ -930,6 +939,7 @@ class PikaConf : public pstd::BaseConf { // Rsync Rate limiting configuration int throttle_bytes_per_second_ = 207200000; int max_rsync_parallel_num_ = kMaxRsyncParallelNum; + std::atomic_int64_t rsync_timeout_ms_ = 1000; }; #endif diff --git a/include/pika_rm.h b/include/pika_rm.h index 89306a8a0..9cc0c5ef3 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -116,7 +116,7 @@ class SyncSlaveDB : public SyncDB { void SetLocalIp(const std::string& local_ip); void StopRsync(); pstd::Status ActivateRsync(); - bool IsRsyncRunning() {return rsync_cli_->IsRunning();} + bool IsRsyncRunning() { return rsync_cli_->IsRunning(); } private: std::unique_ptr rsync_cli_; diff --git a/include/rsync_client.h b/include/rsync_client.h index da00247e2..2218675ea 100644 --- a/include/rsync_client.h +++ b/include/rsync_client.h @@ -67,7 +67,6 @@ class RsyncClient : public net::Thread { } bool IsIdle() { return state_.load() == IDLE;} void OnReceive(RsyncService::RsyncResponse* resp); - private: bool ComparisonUpdate(); Status CopyRemoteFile(const std::string& filename, int index); @@ -98,6 +97,7 @@ class RsyncClient : public net::Thread { std::condition_variable cond_; std::mutex mu_; + std::string master_ip_; int master_port_; int parallel_num_; @@ -157,19 +157,18 @@ class WaitObject { } pstd::Status Wait(ResponseSPtr& resp) { - pstd::Status s = Status::Timeout("rsync timeout", "timeout"); - { - std::unique_lock lock(mu_); - auto cv_s = cond_.wait_for(lock, std::chrono::seconds(1), [this] { - return resp_.get() != nullptr; - }); - if (!cv_s) { - return s; - } - resp = resp_; - s = Status::OK(); + auto timeout = g_pika_conf->rsync_timeout_ms(); + std::unique_lock lock(mu_); + auto cv_s = cond_.wait_for(lock, std::chrono::milliseconds(timeout), [this] { + return resp_.get() != nullptr; + }); + if (!cv_s) { + std::string timout_info("timeout during(in ms) is "); + timout_info.append(std::to_string(timeout)); + return pstd::Status::Timeout("rsync timeout", timout_info); } - return s; + resp = resp_; + return pstd::Status::OK(); } void WakeUp(RsyncService::RsyncResponse* resp) { @@ -234,7 +233,6 @@ class WaitObjectManager { } wo_vec_[index]->WakeUp(resp); } - private: std::vector wo_vec_; std::mutex mu_; @@ -242,4 +240,3 @@ class WaitObjectManager { } // end namespace rsync #endif - diff --git a/include/throttle.h b/include/throttle.h index 2bdbe6ed7..73184d6c2 100644 --- a/include/throttle.h +++ b/include/throttle.h @@ -18,19 +18,21 @@ class Throttle { Throttle() {} Throttle(size_t throttle_throughput_bytes, size_t check_cycle); ~Throttle(); + + void ResetThrottleThroughputBytes(size_t new_throughput_bytes_per_s) { + throttle_throughput_bytes_.store(new_throughput_bytes_per_s); + }; size_t ThrottledByThroughput(size_t bytes); void ReturnUnusedThroughput(size_t acquired, size_t consumed, size_t elaspe_time_us); static Throttle& GetInstance() { static Throttle instance(g_pika_conf->throttle_bytes_per_second(), 10); return instance; } - - private: +private: std::atomic throttle_throughput_bytes_ = 100 * 1024 * 1024; - // the num of tasks doing install_snapshot std::atomic last_throughput_check_time_us_; std::atomic cur_throughput_bytes_; - // user defined check cycles of throughput per second + // check cycles of throughput per second size_t check_cycle_ = 10; pstd::Mutex keys_mutex_; size_t caculate_check_time_us_(int64_t current_time_us, int64_t check_cycle) { diff --git a/src/pika_admin.cc b/src/pika_admin.cc index f9d7174e4..34a1a4d4f 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -21,7 +21,7 @@ #include "include/pika_version.h" #include "include/pika_conf.h" #include "pstd/include/rsync.h" - +#include "include/throttle.h" using pstd::Status; extern PikaServer* g_pika_server; @@ -2620,7 +2620,22 @@ void ConfigCmd::ConfigSet(std::shared_ptr db) { res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'throttle-bytes-per-second'\r\n"); return; } - g_pika_conf->SetThrottleBytesPerSecond(static_cast(ival)); + int32_t new_throughput_limit = static_cast(ival); + g_pika_conf->SetThrottleBytesPerSecond(new_throughput_limit); + //The rate limiter of rsync(Throttle) is used in singleton mode, all db shares the same rate limiter + rsync::Throttle::GetInstance().ResetThrottleThroughputBytes(new_throughput_limit); + LOG(INFO) << "The conf item [throttle-bytes-per-second] is changed by Config Set command. " + "The rsync rate limit now is " + << new_throughput_limit << "(Which Is Around " << (new_throughput_limit >> 20) << " MB/s)"; + res_.AppendStringRaw("+OK\r\n"); + } else if(set_item == "rsync-timeout-ms"){ + if(pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0){ + res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rsync-timeout-ms'\r\n"); + return; + } + g_pika_conf->SetRsyncTimeoutMs(ival); + LOG(INFO) << "The conf item [rsync-timeout-ms] is changed by Config Set command. " + "The rsync-timeout-ms now is " << ival << " ms"; res_.AppendStringRaw("+OK\r\n"); } else if (set_item == "max-rsync-parallel-num") { if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival > kMaxRsyncParallelNum || ival <= 0) { diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 959bd2819..adc1b26d9 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -641,6 +641,13 @@ int PikaConf::Load() { max_rsync_parallel_num_ = kMaxRsyncParallelNum; } + int64_t tmp_rsync_timeout_ms = -1; + GetConfInt64("rsync-timeout-ms", &tmp_rsync_timeout_ms); + if(tmp_rsync_timeout_ms <= 0){ + rsync_timeout_ms_.store(1000); + } else { + rsync_timeout_ms_.store(tmp_rsync_timeout_ms); + } return ret; } diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 67757272f..3445e9fea 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -966,7 +966,7 @@ Status PikaReplicaManager::RunSyncSlaveDBStateMachine() { Status s = s_db->ActivateRsync(); if (!s.ok()) { g_pika_server->SetForceFullSync(true); - LOG(WARNING) << "Slave DB: " << s_db->DBName() << " rsync failed! full synchronization will be retried later"; + LOG(WARNING) << "Slave DB: " << s_db->DBName() << " rsync failed! full synchronization will be retried later, error info:" << s.ToString(); continue; } diff --git a/src/rsync_client.cc b/src/rsync_client.cc index 0bdbde222..0cf683ba7 100644 --- a/src/rsync_client.cc +++ b/src/rsync_client.cc @@ -201,7 +201,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) { std::shared_ptr resp = nullptr; s = wo->Wait(resp); if (s.IsTimeout() || resp == nullptr) { - LOG(WARNING) << "rsync request timeout"; + LOG(WARNING) << s.ToString(); retries++; continue; } @@ -360,6 +360,7 @@ Status RsyncClient::PullRemoteMeta(std::string* snapshot_uuid, std::setcode() != RsyncService::kOk) { s = Status::IOError("kRsyncMeta request failed! db is not exist or doing bgsave"); + LOG(WARNING) << s.ToString() << ", retries:" << retries; sleep(1); retries++; continue; diff --git a/src/throttle.cc b/src/throttle.cc index 39f93d025..9b956ba78 100644 --- a/src/throttle.cc +++ b/src/throttle.cc @@ -8,8 +8,7 @@ #include #include "pstd/include/env.h" -DEFINE_uint64(raft_minimal_throttle_threshold_mb, 0, "minimal throttle throughput threshold per second"); -namespace rsync{ +namespace rsync { Throttle::Throttle(size_t throttle_throughput_bytes, size_t check_cycle) : throttle_throughput_bytes_(throttle_throughput_bytes), @@ -21,9 +20,7 @@ Throttle::~Throttle() {} size_t Throttle::ThrottledByThroughput(size_t bytes) { size_t available_size = bytes; size_t now = pstd::NowMicros(); - size_t limit_throughput_bytes_s = std::max(static_cast(throttle_throughput_bytes_), - FLAGS_raft_minimal_throttle_threshold_mb * 1024 * 1024); - size_t limit_per_cycle = limit_throughput_bytes_s / check_cycle_; + size_t limit_per_cycle = throttle_throughput_bytes_.load() / check_cycle_; std::unique_lock lock(keys_mutex_); if (cur_throughput_bytes_ + bytes > limit_per_cycle) { // reading another |bytes| excceds the limit @@ -57,5 +54,4 @@ void Throttle::ReturnUnusedThroughput(size_t acquired, size_t consumed, size_t e } cur_throughput_bytes_ = std::max(cur_throughput_bytes_ - (acquired - consumed), size_t(0)); } -} - +} // namespace rsync diff --git a/tests/integration/rsync_dynamic_reconfig.go b/tests/integration/rsync_dynamic_reconfig.go new file mode 100644 index 000000000..c1ec7d84c --- /dev/null +++ b/tests/integration/rsync_dynamic_reconfig.go @@ -0,0 +1,183 @@ +package pika_integration + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "sync" + "time" + + . "github.com/bsm/ginkgo/v2" + . "github.com/bsm/gomega" + "github.com/redis/go-redis/v9" +) + +func RefillMaster(masterAddr string, dataVolumeMB int64, ctx context.Context) { + //the datavolumeMB could not be too large(like 1024MB) or refill shall take a long time to finish + genRandomStr := func(n int, tId int) string { + letters := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + bytes := make([]byte, n) + for i := range bytes { + index := (rand.Intn(1000) + tId) % len(letters) + bytes[i] = letters[index] + } + return string(bytes) + } + writeFun := func(targetAddr string, requestNum int64, wg *sync.WaitGroup, tId int) { + defer wg.Done() + cli := redis.NewClient(PikaOption(targetAddr)) + defer cli.Close() + var i int64 + for i = 0; i < requestNum; i++ { + rKey := genRandomStr(1024, tId) + rValue := genRandomStr(1024, tId) + cli.Set(ctx, rKey, rValue, 0) + } + } + keySize := 1024 + valueSize := 1024 + dataVolumeBytes := dataVolumeMB << 20 + threadNum := 10 + reqNumForEachThead := dataVolumeBytes / int64((keySize + valueSize)) / int64(threadNum) + //fmt.Printf("reqNumForEach:%d\n", reqNumForEachThead) + startTime := time.Now() + var wg sync.WaitGroup + for i := 0; i < threadNum; i++ { + wg.Add(1) + go writeFun(masterAddr, reqNumForEachThead, &wg, i) + } + wg.Wait() + duration := time.Since(startTime) + fmt.Printf("RefillMaster took %s to complete.\n", duration) +} + +func ReleaseRsyncLimit(cli *redis.Client, ctx context.Context) { + //sleep is needed, because the update frequency limit for rsync config is 1 time per 2s + time.Sleep(time.Second * 2) + //fmt.Println("removing rsync limimt") + if err := cli.ConfigSet(ctx, "rsync-timeout-ms", "1000").Err(); err != nil { + fmt.Println("Error setting key:", err) + return + } + time.Sleep(time.Second * 2) + bigRate := 1 << 30 //1GB + if err := cli.ConfigSet(ctx, "throttle-bytes-per-second", strconv.Itoa(bigRate)).Err(); err != nil { + fmt.Println("Error setting key:", err) + return + } + fmt.Println("rsync limit is removed") +} + +func UpdateThrottle(cli *redis.Client, ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + if err := cli.ConfigSet(ctx, "throttle-bytes-per-second", "65535").Err(); err != nil { + fmt.Println("Error setting key:", err) + return + } + time.Sleep(time.Second * 3) + rand.Seed(time.Now().UnixNano()) + for i := 1; i < 200; i++ { + time.Sleep(time.Millisecond * 300) + min := 512 << 10 //512 KB + max := 5 << 20 //5 MB + randomInt := rand.Intn(max-min+1) + min + //do the update throttle bytes, randomly from 64KB to 5MB + if err := cli.ConfigSet(ctx, "throttle-bytes-per-second", strconv.Itoa(randomInt)).Err(); err != nil { + fmt.Println("Error setting key:", err) + return + } + } +} + +func UpdateTimout(cli *redis.Client, ctx context.Context, wg *sync.WaitGroup) { + defer wg.Done() + if err := cli.ConfigSet(ctx, "throttle-bytes-per-second", "65535").Err(); err != nil { + fmt.Println("Error setting key:", err) + return + } + time.Sleep(time.Second * 3) + rand.Seed(time.Now().UnixNano()) + for i := 1; i < 200; i++ { + time.Sleep(time.Millisecond * 300) + min := 20 + max := 200 + randomInt := rand.Intn(max-min+1) + min + //do the update rsync-timeout-ms, randomly from 10 to 100ms + if err := cli.ConfigSet(ctx, "rsync-timeout-ms", strconv.Itoa(randomInt)).Err(); err != nil { + fmt.Println("Error setting key:", err) + return + } + } +} + +var _ = Describe("Rsync Reconfig Test", func() { + ctx := context.TODO() + var ( + slave1 *redis.Client + slave2 *redis.Client + master1 *redis.Client + ) + + BeforeEach(func() { + slave1 = redis.NewClient(PikaOption(SLAVEADDR)) + slave2 = redis.NewClient(PikaOption(SLAVEADDR)) + master1 = redis.NewClient(PikaOption(MASTERADDR)) + }) + + AfterEach(func() { + Expect(slave1.Close()).NotTo(HaveOccurred()) + Expect(slave2.Close()).NotTo(HaveOccurred()) + Expect(master1.Close()).NotTo(HaveOccurred()) + }) + + It("rsync reconfig rsync-timeout-ms, throttle-bytes-per-second", func() { + slave1.SlaveOf(ctx, "no", "one") + slave1.FlushDB(ctx) + master1.FlushDB(ctx) + time.Sleep(3 * time.Second) + RefillMaster(MASTERADDR, 128, ctx) + key1 := "45vs45f4s5d6" + value1 := "afd54g5s4f545" + //set key before sync happened, slave is supposed to fetch it when sync done + err1 := master1.Set(ctx, key1, value1, 0).Err() + Expect(err1).NotTo(HaveOccurred()) + + //limit the rsync to prevent the sync finished before test finished + err2 := slave1.ConfigSet(ctx, "throttle-bytes-per-second", "65535").Err() + Expect(err2).NotTo(HaveOccurred()) + slave1.Do(ctx, "slaveof", "127.0.0.1", "9241", "force") + time.Sleep(time.Second * 2) + + var wg sync.WaitGroup + wg.Add(4) + go UpdateThrottle(slave1, ctx, &wg) + go UpdateTimout(slave1, ctx, &wg) + go UpdateThrottle(slave2, ctx, &wg) + go UpdateTimout(slave2, ctx, &wg) + wg.Wait() + + ReleaseRsyncLimit(slave1, ctx) + //full sync should be done after 20s due to rsync limit is removed + time.Sleep(time.Second * 20) + + key2 := "rekaljfdkslj;" + value2 := "ouifdhgisesdjkf" + err3 := master1.Set(ctx, key2, value2, 0).Err() + Expect(err3).NotTo(HaveOccurred()) + time.Sleep(time.Second * 5) //incr sync should also be done after 5s + + getValue1, err4 := slave1.Get(ctx, key1).Result() + Expect(err4).NotTo(HaveOccurred()) //Get Slave failed after dynamic reset rsync rate and rsync timeout if err not nil + Expect(getValue1).To(Equal(value1)) //Slave Get OK, but didn't fetch expected resp after dynamic reset rsync rate/timeout + getValue2, err5 := slave1.Get(ctx, key2).Result() + Expect(err5).NotTo(HaveOccurred()) //Get Slave failed after dynamic reset rsync rate and rsync timeout if err not nil + Expect(getValue2).To(Equal(value2)) //Slave Get OK, but didn't fetch expected resp after dynamic reset rsync rate/timeout + slave1.SlaveOf(ctx, "no", "one") + //clear the data to avoid disk run out in github action + slave1.FlushDB(ctx) + master1.FlushDB(ctx) + + }) + +})