diff --git a/conf/pika.conf b/conf/pika.conf index 3fcb5d515..090f71996 100644 --- a/conf/pika.conf +++ b/conf/pika.conf @@ -36,9 +36,18 @@ slow-cmd-pool : no # are dedicated to handling slow user requests. slow-cmd-thread-pool-size : 1 +# Size of the low level thread pool, The threads within this pool +# are dedicated to handling slow user requests. +admin-thread-pool-size : 2 + # Slow cmd list e.g. hgetall, mset slow-cmd-list : +# List of commands considered as administrative. These commands will be handled by the admin thread pool. Modify this list as needed. +# Default commands: info, ping, monitor +# This parameter is only supported by the CONFIG GET command and not by CONFIG SET. +admin-cmd-list : info, ping, monitor + # The number of threads to write DB in slaveNode when replicating. # It's preferable to set slave's sync-thread-num value close to master's thread-pool-size. sync-thread-num : 6 diff --git a/include/pika_conf.h b/include/pika_conf.h index e93a5e7e5..2c0cb17d0 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -61,6 +61,10 @@ class PikaConf : public pstd::BaseConf { std::shared_lock l(rwlock_); return slow_cmd_thread_pool_size_; } + int admin_thread_pool_size() { + std::shared_lock l(rwlock_); + return admin_thread_pool_size_; + } int sync_thread_num() { std::shared_lock l(rwlock_); return sync_thread_num_; @@ -441,6 +445,12 @@ class PikaConf : public pstd::BaseConf { return pstd::Set2String(slow_cmd_set_, ','); } + // Admin Commands configuration + const std::string GetAdminCmd() { + std::shared_lock l(rwlock_); + return pstd::Set2String(admin_cmd_set_, ','); + } + const std::string GetUserBlackList() { std::shared_lock l(rwlock_); return userblacklist_; @@ -451,6 +461,10 @@ class PikaConf : public pstd::BaseConf { return slow_cmd_set_.find(cmd) != slow_cmd_set_.end(); } + bool is_admin_cmd(const std::string& cmd) { + return admin_cmd_set_.find(cmd) != admin_cmd_set_.end(); + } + // Immutable config items, we don't use lock. bool daemonize() { return daemonize_; } std::string pidfile() { return pidfile_; } @@ -489,6 +503,11 @@ class PikaConf : public pstd::BaseConf { slow_cmd_thread_pool_size_ = value; } + void SetAdminThreadPoolSize(const int value) { + std::lock_guard l(rwlock_); + admin_thread_pool_size_ = value; + } + void SetSlaveof(const std::string& value) { std::lock_guard l(rwlock_); TryPushDiffCommands("slaveof", value); @@ -814,6 +833,14 @@ class PikaConf : public pstd::BaseConf { pstd::StringSplit2Set(lower_value, ',', slow_cmd_set_); } + void SetAdminCmd(const std::string& value) { + std::lock_guard l(rwlock_); + std::string lower_value = value; + pstd::StringToLower(lower_value); + TryPushDiffCommands("admin-cmd-list", lower_value); + pstd::StringSplit2Set(lower_value, ',', admin_cmd_set_); + } + void SetCacheType(const std::string &value); void SetCacheDisableFlag() { tmp_cache_disable_flag_ = true; } int zset_cache_start_direction() { return zset_cache_start_direction_; } @@ -832,7 +859,9 @@ class PikaConf : public pstd::BaseConf { int thread_num_ = 0; int thread_pool_size_ = 0; int slow_cmd_thread_pool_size_ = 0; + int admin_thread_pool_size_ = 0; std::unordered_set slow_cmd_set_; + std::unordered_set admin_cmd_set_ = {"info", "ping", "monitor"}; int sync_thread_num_ = 0; int sync_binlog_thread_num_ = 0; int expire_dump_days_ = 3; diff --git a/include/pika_server.h b/include/pika_server.h index 02aaad1bf..1374158f8 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -180,7 +180,7 @@ class PikaServer : public pstd::noncopyable { /* * PikaClientProcessor Process Task */ - void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd); + void ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd); // for info debug size_t ClientProcessorThreadPoolCurQueueSize(); @@ -554,6 +554,7 @@ class PikaServer : public pstd::noncopyable { int worker_num_ = 0; std::unique_ptr pika_client_processor_; std::unique_ptr pika_slow_cmd_thread_pool_; + std::unique_ptr pika_admin_cmd_thread_pool_; std::unique_ptr pika_dispatch_thread_ = nullptr; /* diff --git a/src/pika_admin.cc b/src/pika_admin.cc index 9e974bd7c..81a5a36ab 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -1496,6 +1496,13 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, "slow-cmd-thread-pool-size"); EncodeNumber(&config_body, g_pika_conf->slow_cmd_thread_pool_size()); } + + if (pstd::stringmatch(pattern.data(), "admin-thread-pool-size", 1) != 0) { + elements += 2; + EncodeString(&config_body, "admin-thread-pool-size"); + EncodeNumber(&config_body, g_pika_conf->admin_thread_pool_size()); + } + if (pstd::stringmatch(pattern.data(), "userblacklist", 1) != 0) { elements += 2; EncodeString(&config_body, "userblacklist"); @@ -1506,7 +1513,11 @@ void ConfigCmd::ConfigGet(std::string& ret) { EncodeString(&config_body, "slow-cmd-list"); EncodeString(&config_body, g_pika_conf->GetSlowCmd()); } - + if (pstd::stringmatch(pattern.data(), "admin-cmd-list", 1) != 0) { + elements += 2; + EncodeString(&config_body, "admin-cmd-list"); + EncodeString(&config_body, g_pika_conf->GetAdminCmd()); + } if (pstd::stringmatch(pattern.data(), "sync-thread-num", 1) != 0) { elements += 2; EncodeString(&config_body, "sync-thread-num"); diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index c5f0a0984..1156cc3d9 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -272,7 +272,8 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& std::string opt = argvs[0][0]; pstd::StringToLower(opt); bool is_slow_cmd = g_pika_conf->is_slow_cmd(opt); - g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd); + bool is_admin_cmd = g_pika_conf->is_admin_cmd(opt); + g_pika_server->ScheduleClientPool(&DoBackgroundTask, arg, is_slow_cmd, is_admin_cmd); return; } BatchExecRedisCmd(argvs); diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 3d54e3e89..1e06c9953 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -166,10 +166,25 @@ int PikaConf::Load() { slow_cmd_thread_pool_size_ = 50; } + GetConfInt("admin-thread-pool-size", &admin_thread_pool_size_); + if (admin_thread_pool_size_ <= 0) { + admin_thread_pool_size_ = 2; + } + if (admin_thread_pool_size_ > 4) { + admin_thread_pool_size_ = 4; + } + std::string slow_cmd_list; GetConfStr("slow-cmd-list", &slow_cmd_list); SetSlowCmd(slow_cmd_list); + std::string admin_cmd_list; + GetConfStr("admin-cmd-list", &admin_cmd_list); + if (admin_cmd_list == ""){ + admin_cmd_list = "info, monitor, ping"; + SetAdminCmd(admin_cmd_list); + } + GetConfInt("sync-thread-num", &sync_thread_num_); if (sync_thread_num_ <= 0) { sync_thread_num_ = 3; diff --git a/src/pika_list.cc b/src/pika_list.cc index 1ecf00518..06ae9e24f 100644 --- a/src/pika_list.cc +++ b/src/pika_list.cc @@ -171,7 +171,8 @@ void BlockingBaseCmd::TryToServeBLrPopWithThisKey(const std::string& key, std::s auto* args = new UnblockTaskArgs(key, std::move(db), dispatchThread); bool is_slow_cmd = g_pika_conf->is_slow_cmd("LPOP") || g_pika_conf->is_slow_cmd("RPOP"); - g_pika_server->ScheduleClientPool(&ServeAndUnblockConns, args, is_slow_cmd); + bool is_admin_cmd = false; + g_pika_server->ScheduleClientPool(&ServeAndUnblockConns, args, is_slow_cmd, is_admin_cmd); } void BlockingBaseCmd::ServeAndUnblockConns(void* args) { diff --git a/src/pika_server.cc b/src/pika_server.cc index 450a18001..35efd4674 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -82,6 +82,7 @@ PikaServer::PikaServer() pika_client_processor_ = std::make_unique(g_pika_conf->thread_pool_size(), 100000); pika_slow_cmd_thread_pool_ = std::make_unique(g_pika_conf->slow_cmd_thread_pool_size(), 100000); + pika_admin_cmd_thread_pool_ = std::make_unique(g_pika_conf->admin_thread_pool_size(), 100000); instant_ = std::make_unique(); exit_mutex_.lock(); int64_t lastsave = GetLastSaveTime(g_pika_conf->bgsave_path()); @@ -110,6 +111,7 @@ PikaServer::~PikaServer() { // so we need to delete dispatch before worker. pika_client_processor_->Stop(); pika_slow_cmd_thread_pool_->stop_thread_pool(); + pika_admin_cmd_thread_pool_->stop_thread_pool(); { std::lock_guard l(slave_mutex_); auto iter = slaves_.begin(); @@ -168,6 +170,19 @@ void PikaServer::Start() { LOG(FATAL) << "Start PikaClientProcessor Error: " << ret << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); } + + ret = pika_slow_cmd_thread_pool_->start_thread_pool(); + if (ret != net::kSuccess) { + dbs_.clear(); + LOG(FATAL) << "Start PikaLowLevelThreadPool Error: " << ret + << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); + } + ret = pika_admin_cmd_thread_pool_->start_thread_pool(); + if (ret != net::kSuccess) { + dbs_.clear(); + LOG(FATAL) << "Start PikaAdminThreadPool Error: " << ret + << (ret == net::kCreateThreadError ? ": create thread error " : ": other error"); + } ret = pika_dispatch_thread_->StartThread(); if (ret != net::kSuccess) { dbs_.clear(); @@ -720,11 +735,15 @@ void PikaServer::SetFirstMetaSync(bool v) { first_meta_sync_ = v; } -void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd) { +void PikaServer::ScheduleClientPool(net::TaskFunc func, void* arg, bool is_slow_cmd, bool is_admin_cmd) { if (is_slow_cmd && g_pika_conf->slow_cmd_pool()) { pika_slow_cmd_thread_pool_->Schedule(func, arg); return; } + if (is_admin_cmd) { + pika_admin_cmd_thread_pool_->Schedule(func, arg); + return; + } pika_client_processor_->SchedulePool(func, arg); }