Skip to content

Commit

Permalink
feat: Add support for dynamicaly reconfig rsync-timeout-ms and thrott…
Browse files Browse the repository at this point in the history
…le-bytes-per-second (OpenAtomFoundation#2633)

* 1 added conf item rsync-timeout-ms
2 add support for dynamically modify throttle-bytes-per-second and rsync-timeout-ms
ps: some debug output is waiting to be removed

* remove the debug info in start_master_and_slave.sh

---------

Co-authored-by: chejinge <chejinge@360.cn>

* feat: Add a feature that is IO speed limiting (OpenAtomFoundation#2599)

* add a feature that support IO rate

* update IO rate limit mode

* Name of variable change to rate-limiter-mode from rate_limiter_mode

---------

Co-authored-by: Vachel <vachel@example.com>

* feat: Add a feature which support partitioned index filter (OpenAtomFoundation#2601)

* add a feature which support partitioned index filter

* Name of variable change to enable-partitioned-index-filters from enable_partitioned_index_filters

---------

Co-authored-by: Vachel <vachel@example.com>

* chore(deps): bump golang.org/x/net from 0.17.0 to 0.23.0 in /codis (OpenAtomFoundation#2619)

Bumps [golang.org/x/net](https://github.com/golang/net) from 0.17.0 to 0.23.0.
- [Commits](golang/net@v0.17.0...v0.23.0)

---
updated-dependencies:
- dependency-name: golang.org/x/net
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* fix: Revised CI start script to remove invalid cp command (OpenAtomFoundation#2615)

* revised CI start script to remove invalid cp operation and throw from sed command

* use sed -i.bak instead of two scripts

---------

Co-authored-by: cjh <1271435567@qq.com>

* Update Manual compilation in README (OpenAtomFoundation#2617)

* enable tests

* revised go test

* revised go test2

* add flush db operation for test

* add Ping operation when get conn from poll to clear unread data in the conn(if the conn has)

* 1 reduce the amount of filling data to avoid disk run out

* Revert "add Ping operation when get conn from poll to clear unread data in the conn(if the conn has)"

This reverts commit 6ad70bc.

* removed an debug log

* add an comment in pika.conf

* simplify the calling chain

* revised some logic

* fix compile error

* get timeout value before enter into lock

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: cjh <1271435567@qq.com>
Co-authored-by: chejinge <945997690@qq.com>
Co-authored-by: chejinge <chejinge@360.cn>
Co-authored-by: vacheli <vachelwh@gmail.com>
Co-authored-by: Vachel <vachel@example.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: chenbt <34958405+chenbt-hz@users.noreply.github.com>
  • Loading branch information
8 people committed May 8, 2024
1 parent 1da3073 commit 8aacddd
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 35 deletions.
9 changes: 7 additions & 2 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,14 @@ default-slot-num : 1024
# The cache will be sharded into 2^blob-num-shard-bits shards.
# blob-num-shard-bits : -1

# Rsync Rate limiting configuration 200MB/s
# Rsync Rate limiting configuration [Default value is 200MB/s]
# [USED BY SLAVE] The transmitting speed(Rsync Rate) In full replication is controlled BY SLAVE NODE, You should modify the throttle-bytes-per-second in slave's pika.conf if you wanna change the rsync rate limit.
# [Dynamic Change Supported] send command 'config set throttle-bytes-per-second new_value' to SLAVE NODE can dynamically adjust rsync rate during full sync(use config rewrite can persist the changes).
throttle-bytes-per-second : 207200000

# Rsync timeout in full sync stage[Default value is 1000 ms], unnecessary retries will happen if this value is too small.
# [Dynamic Change Supported] similar to throttle-bytes-per-second, rsync-timeout-ms can be dynamically changed by configset command
# [USED BY SLAVE] Similar to throttle-bytes-per-second, you should change rsync-timeout-ms's value in slave's conf file if it is needed to adjust.
rsync-timeout-ms : 1000
# The valid range for max-rsync-parallel-num is [1, 4].
# If an invalid value is provided, max-rsync-parallel-num will automatically be reset to 4.
max-rsync-parallel-num : 4
Expand Down
14 changes: 12 additions & 2 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class PikaConf : public pstd::BaseConf {
uint64_t MaxTotalWalSize() {
std::shared_lock l(rwlock_);
return max_total_wal_size_;
}
}
int64_t max_client_response_size() {
std::shared_lock l(rwlock_);
return max_client_response_size_;
Expand Down Expand Up @@ -414,7 +414,9 @@ class PikaConf : public pstd::BaseConf {
std::shared_lock l(rwlock_);
return max_rsync_parallel_num_;
}

int64_t rsync_timeout_ms() {
return rsync_timeout_ms_.load(std::memory_order::memory_order_relaxed);
}
// Slow Commands configuration
const std::string GetSlowCmd() {
std::shared_lock l(rwlock_);
Expand Down Expand Up @@ -735,6 +737,13 @@ class PikaConf : public pstd::BaseConf {
TryPushDiffCommands("max-rsync-parallel-num", std::to_string(value));
max_rsync_parallel_num_ = value;
}

void SetRsyncTimeoutMs(int64_t value){
std::lock_guard l(rwlock_);
TryPushDiffCommands("rsync-timeout-ms", std::to_string(value));
rsync_timeout_ms_.store(value);
}

void SetAclPubsubDefault(const std::string& value) {
std::lock_guard l(rwlock_);
TryPushDiffCommands("acl-pubsub-default", value);
Expand Down Expand Up @@ -930,6 +939,7 @@ class PikaConf : public pstd::BaseConf {
// Rsync Rate limiting configuration
int throttle_bytes_per_second_ = 207200000;
int max_rsync_parallel_num_ = kMaxRsyncParallelNum;
std::atomic_int64_t rsync_timeout_ms_ = 1000;
};

#endif
2 changes: 1 addition & 1 deletion include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class SyncSlaveDB : public SyncDB {
void SetLocalIp(const std::string& local_ip);
void StopRsync();
pstd::Status ActivateRsync();
bool IsRsyncRunning() {return rsync_cli_->IsRunning();}
bool IsRsyncRunning() { return rsync_cli_->IsRunning(); }

private:
std::unique_ptr<rsync::RsyncClient> rsync_cli_;
Expand Down
27 changes: 12 additions & 15 deletions include/rsync_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ class RsyncClient : public net::Thread {
}
bool IsIdle() { return state_.load() == IDLE;}
void OnReceive(RsyncService::RsyncResponse* resp);

private:
bool ComparisonUpdate();
Status CopyRemoteFile(const std::string& filename, int index);
Expand Down Expand Up @@ -98,6 +97,7 @@ class RsyncClient : public net::Thread {
std::condition_variable cond_;
std::mutex mu_;


std::string master_ip_;
int master_port_;
int parallel_num_;
Expand Down Expand Up @@ -157,19 +157,18 @@ class WaitObject {
}

pstd::Status Wait(ResponseSPtr& resp) {
pstd::Status s = Status::Timeout("rsync timeout", "timeout");
{
std::unique_lock<std::mutex> lock(mu_);
auto cv_s = cond_.wait_for(lock, std::chrono::seconds(1), [this] {
return resp_.get() != nullptr;
});
if (!cv_s) {
return s;
}
resp = resp_;
s = Status::OK();
auto timeout = g_pika_conf->rsync_timeout_ms();
std::unique_lock<std::mutex> lock(mu_);
auto cv_s = cond_.wait_for(lock, std::chrono::milliseconds(timeout), [this] {
return resp_.get() != nullptr;
});
if (!cv_s) {
std::string timout_info("timeout during(in ms) is ");
timout_info.append(std::to_string(timeout));
return pstd::Status::Timeout("rsync timeout", timout_info);
}
return s;
resp = resp_;
return pstd::Status::OK();
}

void WakeUp(RsyncService::RsyncResponse* resp) {
Expand Down Expand Up @@ -234,12 +233,10 @@ class WaitObjectManager {
}
wo_vec_[index]->WakeUp(resp);
}

private:
std::vector<WaitObject*> wo_vec_;
std::mutex mu_;
};

} // end namespace rsync
#endif

10 changes: 6 additions & 4 deletions include/throttle.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,21 @@ class Throttle {
Throttle() {}
Throttle(size_t throttle_throughput_bytes, size_t check_cycle);
~Throttle();

void ResetThrottleThroughputBytes(size_t new_throughput_bytes_per_s) {
throttle_throughput_bytes_.store(new_throughput_bytes_per_s);
};
size_t ThrottledByThroughput(size_t bytes);
void ReturnUnusedThroughput(size_t acquired, size_t consumed, size_t elaspe_time_us);
static Throttle& GetInstance() {
static Throttle instance(g_pika_conf->throttle_bytes_per_second(), 10);
return instance;
}

private:
private:
std::atomic<size_t> throttle_throughput_bytes_ = 100 * 1024 * 1024;
// the num of tasks doing install_snapshot
std::atomic<size_t> last_throughput_check_time_us_;
std::atomic<size_t> cur_throughput_bytes_;
// user defined check cycles of throughput per second
// check cycles of throughput per second
size_t check_cycle_ = 10;
pstd::Mutex keys_mutex_;
size_t caculate_check_time_us_(int64_t current_time_us, int64_t check_cycle) {
Expand Down
19 changes: 17 additions & 2 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
#include "include/pika_version.h"
#include "include/pika_conf.h"
#include "pstd/include/rsync.h"

#include "include/throttle.h"
using pstd::Status;

extern PikaServer* g_pika_server;
Expand Down Expand Up @@ -2620,7 +2620,22 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'throttle-bytes-per-second'\r\n");
return;
}
g_pika_conf->SetThrottleBytesPerSecond(static_cast<int>(ival));
int32_t new_throughput_limit = static_cast<int>(ival);
g_pika_conf->SetThrottleBytesPerSecond(new_throughput_limit);
//The rate limiter of rsync(Throttle) is used in singleton mode, all db shares the same rate limiter
rsync::Throttle::GetInstance().ResetThrottleThroughputBytes(new_throughput_limit);
LOG(INFO) << "The conf item [throttle-bytes-per-second] is changed by Config Set command. "
"The rsync rate limit now is "
<< new_throughput_limit << "(Which Is Around " << (new_throughput_limit >> 20) << " MB/s)";
res_.AppendStringRaw("+OK\r\n");
} else if(set_item == "rsync-timeout-ms"){
if(pstd::string2int(value.data(), value.size(), &ival) == 0 || ival <= 0){
res_.AppendStringRaw("-ERR Invalid argument \'" + value + "\' for CONFIG SET 'rsync-timeout-ms'\r\n");
return;
}
g_pika_conf->SetRsyncTimeoutMs(ival);
LOG(INFO) << "The conf item [rsync-timeout-ms] is changed by Config Set command. "
"The rsync-timeout-ms now is " << ival << " ms";
res_.AppendStringRaw("+OK\r\n");
} else if (set_item == "max-rsync-parallel-num") {
if ((pstd::string2int(value.data(), value.size(), &ival) == 0) || ival > kMaxRsyncParallelNum || ival <= 0) {
Expand Down
7 changes: 7 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,13 @@ int PikaConf::Load() {
max_rsync_parallel_num_ = kMaxRsyncParallelNum;
}

int64_t tmp_rsync_timeout_ms = -1;
GetConfInt64("rsync-timeout-ms", &tmp_rsync_timeout_ms);
if(tmp_rsync_timeout_ms <= 0){
rsync_timeout_ms_.store(1000);
} else {
rsync_timeout_ms_.store(tmp_rsync_timeout_ms);
}
return ret;
}

Expand Down
2 changes: 1 addition & 1 deletion src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ Status PikaReplicaManager::RunSyncSlaveDBStateMachine() {
Status s = s_db->ActivateRsync();
if (!s.ok()) {
g_pika_server->SetForceFullSync(true);
LOG(WARNING) << "Slave DB: " << s_db->DBName() << " rsync failed! full synchronization will be retried later";
LOG(WARNING) << "Slave DB: " << s_db->DBName() << " rsync failed! full synchronization will be retried later, error info:" << s.ToString();
continue;
}

Expand Down
3 changes: 2 additions & 1 deletion src/rsync_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ Status RsyncClient::CopyRemoteFile(const std::string& filename, int index) {
std::shared_ptr<RsyncResponse> resp = nullptr;
s = wo->Wait(resp);
if (s.IsTimeout() || resp == nullptr) {
LOG(WARNING) << "rsync request timeout";
LOG(WARNING) << s.ToString();
retries++;
continue;
}
Expand Down Expand Up @@ -360,6 +360,7 @@ Status RsyncClient::PullRemoteMeta(std::string* snapshot_uuid, std::set<std::str

if (resp.get() == nullptr || resp->code() != RsyncService::kOk) {
s = Status::IOError("kRsyncMeta request failed! db is not exist or doing bgsave");
LOG(WARNING) << s.ToString() << ", retries:" << retries;
sleep(1);
retries++;
continue;
Expand Down
10 changes: 3 additions & 7 deletions src/throttle.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
#include <algorithm>
#include "pstd/include/env.h"

DEFINE_uint64(raft_minimal_throttle_threshold_mb, 0, "minimal throttle throughput threshold per second");
namespace rsync{
namespace rsync {

Throttle::Throttle(size_t throttle_throughput_bytes, size_t check_cycle)
: throttle_throughput_bytes_(throttle_throughput_bytes),
Expand All @@ -21,9 +20,7 @@ Throttle::~Throttle() {}
size_t Throttle::ThrottledByThroughput(size_t bytes) {
size_t available_size = bytes;
size_t now = pstd::NowMicros();
size_t limit_throughput_bytes_s = std::max(static_cast<uint64_t>(throttle_throughput_bytes_),
FLAGS_raft_minimal_throttle_threshold_mb * 1024 * 1024);
size_t limit_per_cycle = limit_throughput_bytes_s / check_cycle_;
size_t limit_per_cycle = throttle_throughput_bytes_.load() / check_cycle_;
std::unique_lock lock(keys_mutex_);
if (cur_throughput_bytes_ + bytes > limit_per_cycle) {
// reading another |bytes| excceds the limit
Expand Down Expand Up @@ -57,5 +54,4 @@ void Throttle::ReturnUnusedThroughput(size_t acquired, size_t consumed, size_t e
}
cur_throughput_bytes_ = std::max(cur_throughput_bytes_ - (acquired - consumed), size_t(0));
}
}

} // namespace rsync
Loading

0 comments on commit 8aacddd

Please sign in to comment.