Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix:bitmap cache #2253

Merged
merged 13 commits into from
Jan 24, 2024
2 changes: 1 addition & 1 deletion .github/workflows/pika.yml
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ jobs:
- name: Install Deps
run: |
brew update
brew install --overwrite python autoconf protobuf llvm wget git
brew install --overwrite autoconf protobuf llvm wget git
brew install gcc@10 automake cmake make binutils

- name: Configure CMake
Expand Down
2 changes: 1 addition & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ cache-num : 16
# cache-model 0:cache_none 1:cache_read
cache-model : 1
# cache-type: string, set, zset, list, hash, bit
cache-type: string, set, zset, list, hash
cache-type: string, set, zset, list, hash, bit
chejinge marked this conversation as resolved.
Show resolved Hide resolved

# Maximum number of keys in the zset redis cache
# On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum
Expand Down
7 changes: 6 additions & 1 deletion include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,11 @@ class PikaConf : public pstd::BaseConf {
acl_Log_max_len_ = value;
}

const std::string scache_type() {
std::lock_guard l(rwlock_);
chejinge marked this conversation as resolved.
Show resolved Hide resolved
return pstd::StringConcat(cache_type_, COMMA);
}

int64_t cache_maxmemory() { return cache_maxmemory_; }
void SetSlowCmd(const std::string& value) {
std::lock_guard l(rwlock_);
Expand Down Expand Up @@ -686,7 +691,7 @@ class PikaConf : public pstd::BaseConf {
std::string masterauth_;
std::atomic<bool> classic_mode_;
int databases_ = 0;
int default_slot_num_ = 0;
int default_slot_num_ = 1;
std::vector<DBStruct> db_structs_;
std::string default_db_;
std::string bgsave_path_;
Expand Down
2 changes: 1 addition & 1 deletion include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -372,12 +372,12 @@ const std::string kDBSyncModule = "document";

const std::string kBgsaveInfoFile = "info";

// prefix of pika cache
const std::string PCacheKeyPrefixK = "K";
const std::string PCacheKeyPrefixH = "H";
const std::string PCacheKeyPrefixS = "S";
const std::string PCacheKeyPrefixZ = "Z";
const std::string PCacheKeyPrefixL = "L";
const std::string PCacheKeyPrefixB = "B";


/*
Expand Down
1 change: 0 additions & 1 deletion include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ class LPushxCmd : public Cmd {

private:
std::string key_;
std::string value_;
rocksdb::Status s_;
std::vector<std::string> values_;
void DoInitial() override;
Expand Down
68 changes: 64 additions & 4 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2066,6 +2066,61 @@ void ConfigCmd::ConfigGet(std::string& ret) {
EncodeString(&config_body, g_pika_conf->replication_id());
}


if (pstd::stringmatch(pattern.data(), "cache-num", 1)) {
elements += 2;
EncodeString(&config_body, "cache-num");
EncodeNumber(&config_body, g_pika_conf->GetCacheNum());
}

if (pstd::stringmatch(pattern.data(), "cache-model", 1)) {
elements += 2;
EncodeString(&config_body, "cache-model");
EncodeNumber(&config_body, g_pika_conf->cache_model());
}

if (pstd::stringmatch(pattern.data(), "cache-type", 1)) {
elements += 2;
EncodeString(&config_body, "cache-type");
EncodeString(&config_body, g_pika_conf->scache_type());
}

if (pstd::stringmatch(pattern.data(), "zset-cache-start-direction", 1)) {
elements += 2;
EncodeString(&config_body, "zset-cache-start-direction");
EncodeNumber(&config_body, g_pika_conf->zset_cache_start_pos());
chejinge marked this conversation as resolved.
Show resolved Hide resolved
}

if (pstd::stringmatch(pattern.data(), "zset-cache-field-num-per-key", 1)) {
elements += 2;
EncodeString(&config_body, "zset-cache-field-num-per-key");
EncodeNumber(&config_body, g_pika_conf->zset_cache_field_num_per_key());
}

if (pstd::stringmatch(pattern.data(), "cache-maxmemory", 1)) {
elements += 2;
EncodeString(&config_body, "cache-maxmemory");
EncodeNumber(&config_body, g_pika_conf->cache_maxmemory());
}

if (pstd::stringmatch(pattern.data(), "cache-maxmemory-policy", 1)) {
elements += 2;
EncodeString(&config_body, "cache-maxmemory-policy");
EncodeNumber(&config_body, g_pika_conf->cache_maxmemory_policy());
}

if (pstd::stringmatch(pattern.data(), "cache-maxmemory-samples", 1)) {
elements += 2;
EncodeString(&config_body, "cache-maxmemory-samples");
EncodeNumber(&config_body, g_pika_conf->cache_maxmemory_samples());
}

if (pstd::stringmatch(pattern.data(), "cache-lfu-decay-time", 1)) {
elements += 2;
EncodeString(&config_body, "cache-lfu-decay-time");
EncodeNumber(&config_body, g_pika_conf->cache_lfu_decay_time());
}

if (pstd::stringmatch(pattern.data(), "acl-pubsub-default", 1) != 0) {
elements += 2;
EncodeString(&config_body, "acl-pubsub-default");
Expand Down Expand Up @@ -2121,6 +2176,11 @@ void ConfigCmd::ConfigSet(std::shared_ptr<DB> db) {
"arena-block-size",
"throttle-bytes-per-second",
"max-rsync-parallel-num",
"cache-model",
"cache-type",
"zset-cache-start-direction",
"zset-cache-field-num-per-key",
"cache-lfu-decay-time",
});
res_.AppendStringVector(replyVt);
return;
Expand Down Expand Up @@ -3070,17 +3130,17 @@ void CacheCmd::DoInitial() {
return;
}
if (!strcasecmp(argv_[1].data(), "clear")) {
if (!strcasecmp(argv_[2].data(), "db")) {
if (argv_.size() == 3 && !strcasecmp(argv_[2].data(), "db")) {
condition_ = kCLEAR_DB;
} else if (!strcasecmp(argv_[2].data(), "hitratio")) {
} else if (argv_.size() == 3 && !strcasecmp(argv_[2].data(), "hitratio")) {
condition_ = kCLEAR_HITRATIO;
} else {
res_.SetRes(CmdRes::kErrOther, "Unknown cache subcommand or wrong # of args.");
}
} else if (!strcasecmp(argv_[1].data(), "del")) {
} else if (argv_.size() >= 3 && !strcasecmp(argv_[1].data(), "del")) {
condition_ = kDEL_KEYS;
keys_.assign(argv_.begin() + 2, argv_.end());
} else if (!strcasecmp(argv_[1].data(), "randomkey")) {
} else if (argv_.size() == 2 && !strcasecmp(argv_[1].data(), "randomkey")) {
condition_ = kRANDOM_KEY;
} else {
res_.SetRes(CmdRes::kErrOther, "Unknown cache subcommand or wrong # of args.");
Expand Down
24 changes: 12 additions & 12 deletions src/pika_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ void BitSetCmd::DoThroughDB() {

void BitSetCmd::DoUpdateCache() {
if (s_.ok()) {
std::string CachePrefixKeyB = PCacheKeyPrefixB + key_;
db_->cache()->SetBitIfKeyExist(CachePrefixKeyB, bit_offset_, on_);
std::string CachePrefixKeyK = PCacheKeyPrefixK + key_;
db_->cache()->SetBitIfKeyExist(CachePrefixKeyK, bit_offset_, on_);
}
}

Expand Down Expand Up @@ -94,8 +94,8 @@ void BitGetCmd::Do() {

void BitGetCmd::ReadCache() {
int64_t bit_val = 0;
std::string CachePrefixKeyB = PCacheKeyPrefixB + key_;
auto s = db_->cache()->GetBit(CachePrefixKeyB, bit_offset_, &bit_val);
std::string CachePrefixKeyK = PCacheKeyPrefixK + key_;
auto s = db_->cache()->GetBit(CachePrefixKeyK, bit_offset_, &bit_val);
if (s.ok()) {
res_.AppendInteger(bit_val);
} else if (s.IsNotFound()) {
Expand Down Expand Up @@ -159,11 +159,11 @@ void BitCountCmd::ReadCache() {
int64_t start = static_cast<long>(start_offset_);
int64_t end = static_cast<long>(end_offset_);
rocksdb::Status s;
std::string CachePrefixKeyB = PCacheKeyPrefixB + key_;
std::string CachePrefixKeyK = PCacheKeyPrefixK + key_;
if (count_all_) {
s = db_->cache()->BitCount(CachePrefixKeyB, start, end, &count, 0);
s = db_->cache()->BitCount(CachePrefixKeyK, start, end, &count, 0);
} else {
s = db_->cache()->BitCount(CachePrefixKeyB, start, end, &count, 1);
s = db_->cache()->BitCount(CachePrefixKeyK, start, end, &count, 1);
}

if (s.ok()) {
Expand Down Expand Up @@ -249,13 +249,13 @@ void BitPosCmd::ReadCache() {
int64_t bit = static_cast<long>(bit_val_);
int64_t start = static_cast<long>(start_offset_);
int64_t end = static_cast<long>(end_offset_);\
std::string CachePrefixKeyB = PCacheKeyPrefixB + key_;
std::string CachePrefixKeyK = PCacheKeyPrefixK + key_;
if (pos_all_) {
s = db_->cache()->BitPos(CachePrefixKeyB, bit, &pos);
s = db_->cache()->BitPos(CachePrefixKeyK, bit, &pos);
} else if (!pos_all_ && !endoffset_set_) {
s = db_->cache()->BitPos(CachePrefixKeyB, bit, start, &pos);
s = db_->cache()->BitPos(CachePrefixKeyK, bit, start, &pos);
} else if (!pos_all_ && endoffset_set_) {
s = db_->cache()->BitPos(CachePrefixKeyB, bit, start, end, &pos);
s = db_->cache()->BitPos(CachePrefixKeyK, bit, start, end, &pos);
}
if (s.ok()) {
res_.AppendInteger(pos);
Expand Down Expand Up @@ -329,7 +329,7 @@ void BitOpCmd::DoThroughDB() {
void BitOpCmd::DoUpdateCache() {
if (s_.ok()) {
std::vector<std::string> v;
v.emplace_back(PCacheKeyPrefixB + dest_key_);
v.emplace_back(PCacheKeyPrefixK + dest_key_);
db_->cache()->Del(v);
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHGet, std::move(hgetptr)));
////HGetallCmd
std::unique_ptr<Cmd> hgetallptr =
std::make_unique<HGetallCmd>(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow);
std::make_unique<HGetallCmd>(kCmdNameHGetall, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHGetall, std::move(hgetallptr)));
////HExistsCmd
std::unique_ptr<Cmd> hexistsptr =
Expand Down Expand Up @@ -441,7 +441,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHStrlen, std::move(hstrlenptr)));
////HValsCmd
std::unique_ptr<Cmd> hvalsptr =
std::make_unique<HValsCmd>(kCmdNameHVals, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow);
std::make_unique<HValsCmd>(kCmdNameHVals, 2, kCmdFlagsRead | kCmdFlagsHash | kCmdFlagsSlow | kCmdFlagsUpdateCache | kCmdFlagsDoThroughDB | kCmdFlagsReadCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameHVals, std::move(hvalsptr)));
////HScanCmd
std::unique_ptr<Cmd> hscanptr = std::make_unique<HScanCmd>(
Expand Down Expand Up @@ -672,29 +672,29 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSMove, std::move(smoveptr)));
////SRandmemberCmd
std::unique_ptr<Cmd> srandmemberptr =
std::make_unique<SRandmemberCmd>(kCmdNameSRandmember, -2, kCmdFlagsRead | kCmdFlagsSet|kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache | kCmdFlagsSlow);
std::make_unique<SRandmemberCmd>(kCmdNameSRandmember, -2, kCmdFlagsRead | kCmdFlagsSet | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache | kCmdFlagsSlow);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameSRandmember, std::move(srandmemberptr)));

// BitMap
////bitsetCmd
std::unique_ptr<Cmd> bitsetptr =
std::make_unique<BitSetCmd>(kCmdNameBitSet, 4, kCmdFlagsWrite | kCmdFlagsBit | kCmdFlagsSlow);
std::make_unique<BitSetCmd>(kCmdNameBitSet, 4, kCmdFlagsWrite | kCmdFlagsBit | kCmdFlagsSlow | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitSet, std::move(bitsetptr)));
////bitgetCmd
std::unique_ptr<Cmd> bitgetptr =
std::make_unique<BitGetCmd>(kCmdNameBitGet, 3, kCmdFlagsRead | kCmdFlagsBit | kCmdFlagsSlow);
std::make_unique<BitGetCmd>(kCmdNameBitGet, 3, kCmdFlagsRead | kCmdFlagsBit | kCmdFlagsSlow | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitGet, std::move(bitgetptr)));
////bitcountCmd
std::unique_ptr<Cmd> bitcountptr =
std::make_unique<BitCountCmd>(kCmdNameBitCount, -2, kCmdFlagsRead | kCmdFlagsBit | kCmdFlagsSlow);
std::make_unique<BitCountCmd>(kCmdNameBitCount, -2, kCmdFlagsRead | kCmdFlagsBit | kCmdFlagsSlow | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitCount, std::move(bitcountptr)));
////bitposCmd
std::unique_ptr<Cmd> bitposptr =
std::make_unique<BitPosCmd>(kCmdNameBitPos, -3, kCmdFlagsRead | kCmdFlagsBit | kCmdFlagsSlow);
std::make_unique<BitPosCmd>(kCmdNameBitPos, -3, kCmdFlagsRead | kCmdFlagsBit | kCmdFlagsSlow | kCmdFlagsDoThroughDB | kCmdFlagsReadCache | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitPos, std::move(bitposptr)));
////bitopCmd
std::unique_ptr<Cmd> bitopptr =
std::make_unique<BitOpCmd>(kCmdNameBitOp, -3, kCmdFlagsWrite | kCmdFlagsBit | kCmdFlagsSlow);
std::make_unique<BitOpCmd>(kCmdNameBitOp, -3, kCmdFlagsWrite | kCmdFlagsBit | kCmdFlagsSlow | kCmdFlagsDoThroughDB | kCmdFlagsUpdateCache);
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNameBitOp, std::move(bitopptr)));

// HyperLogLog
Expand Down
15 changes: 10 additions & 5 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ int PikaConf::Load() {
}

GetConfInt("default-slot-num", &default_slot_num_);
if (default_slot_num_ <= 0) {
LOG(FATAL) << "config default-slot-num error,"
<< " it should greater than zero, the actual is: " << default_slot_num_;
}
GetConfStr("dump-path", &bgsave_path_);
bgsave_path_ = bgsave_path_.empty() ? "./dump/" : bgsave_path_;
if (bgsave_path_[bgsave_path_.length() - 1] != '/') {
Expand Down Expand Up @@ -621,7 +617,7 @@ void PikaConf::SetCacheType(const std::string& value) {

int PikaConf::ConfigRewrite() {
// std::string userblacklist = suser_blacklist();

std::string scachetype = scache_type();
std::lock_guard l(rwlock_);
// Only set value for config item that can be config set.
SetConfInt("timeout", timeout_);
Expand Down Expand Up @@ -649,6 +645,7 @@ int PikaConf::ConfigRewrite() {
SetConfStr("compact-cron", compact_cron_);
SetConfStr("compact-interval", compact_interval_);
SetConfStr("disable_auto_compactions", disable_auto_compactions_ ? "true" : "false");
SetConfStr("cache-type", scachetype);
SetConfInt64("least-free-disk-resume-size", least_free_disk_to_resume_);
SetConfInt64("manually-resume-interval", resume_check_interval_);
SetConfDouble("min-check-resume-ratio", min_check_resume_ratio_);
Expand All @@ -669,6 +666,14 @@ int PikaConf::ConfigRewrite() {
SetConfInt64("slotmigrate", slotmigrate_);
// slaveof config item is special
SetConfStr("slaveof", slaveof_);
// cache config
SetConfStr("share-block-cache", share_block_cache_ ? "yes" : "no");
SetConfInt("block-size", block_size_);
SetConfInt("block-cache", block_cache_);
SetConfStr("cache-index-and-filter-blocks", cache_index_and_filter_blocks_ ? "yes" : "no");
SetConfInt("cache-model", cache_model_);
SetConfInt("zset_cache_start_pos", zset_cache_start_pos_);
SetConfInt("zset_cache_field_num_per_key", zset_cache_field_num_per_key_);

if (!diff_commands_.empty()) {
std::vector<pstd::BaseConf::Rep::ConfItem> filtered_items;
Expand Down
2 changes: 1 addition & 1 deletion src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ void PsetexCmd::DoThroughDB() {
void PsetexCmd::DoUpdateCache() {
if (s_.ok()) {
std::string CachePrefixKeyK = PCacheKeyPrefixK + key_;
db_->cache()->WriteKVToCache(CachePrefixKeyK, value_, static_cast<int32_t>(usec_ / 1000));
db_->cache()->Setxx(CachePrefixKeyK, value_, static_cast<int32_t>(usec_ / 1000));
}
}

Expand Down
8 changes: 2 additions & 6 deletions src/pika_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,7 @@ void LPushxCmd::DoThroughDB() {
void LPushxCmd::DoUpdateCache() {
if (s_.ok()) {
std::string CachePrefixKeyL = PCacheKeyPrefixL + key_;
std::vector<std::string> values;
values.push_back(value_);
db_->cache()->LPushx(CachePrefixKeyL, values);
db_->cache()->LPushx(CachePrefixKeyL, values_);
}
}

Expand Down Expand Up @@ -898,8 +896,6 @@ void RPushxCmd::DoThroughDB() {
void RPushxCmd::DoUpdateCache() {
if (s_.ok()) {
std::string CachePrefixKeyL = PCacheKeyPrefixL + key_;
std::vector<std::string> values;
values.push_back(value_);
db_->cache()->RPushx(CachePrefixKeyL, values);
db_->cache()->RPushx(CachePrefixKeyL, values_);
}
}
9 changes: 5 additions & 4 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1742,6 +1742,8 @@ void PikaServer::ResetCacheAsync(uint32_t cache_num, std::shared_ptr<DB> db, cac
}

void PikaServer::ClearCacheDbAsync(std::shared_ptr<DB> db) {
// disable cache temporarily, and restore it after cache cleared
g_pika_conf->SetCacheDisableFlag();
if (PIKA_CACHE_STATUS_OK != db->cache()->CacheStatus()) {
LOG(WARNING) << "can not clear cache in status: " << db->cache()->CacheStatus();
return;
Expand Down Expand Up @@ -1784,8 +1786,9 @@ void PikaServer::DoCacheBGTask(void* arg) {
LOG(WARNING) << "invalid cache task type: " << pCacheTaskArg->task_type;
break;
}

db->cache()->SetCacheStatus(PIKA_CACHE_STATUS_OK);
if (pCacheTaskArg->reenable_cache && pCacheTaskArg->conf) {
if (pCacheTaskArg->reenable_cache) {
pCacheTaskArg->conf->UnsetCacheDisableFlag();
}
}
Expand All @@ -1806,10 +1809,8 @@ void PikaServer::ClearHitRatio(std::shared_ptr<DB> db) {
}

void PikaServer::OnCacheStartPosChanged(int zset_cache_start_pos, std::shared_ptr<DB> db) {
// disable cache temporarily, and restore it after cache cleared
g_pika_conf->SetCacheDisableFlag();
ResetCacheConfig(db);
ClearCacheDbAsyncV2(db);
chejinge marked this conversation as resolved.
Show resolved Hide resolved
ClearCacheDbAsync(db);
}

void PikaServer::ClearCacheDbAsyncV2(std::shared_ptr<DB> db) {
Expand Down
2 changes: 2 additions & 0 deletions tests/integration/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pika_integration

import (
"context"
"sort"
"time"

. "github.com/bsm/ginkgo/v2"
Expand Down Expand Up @@ -305,6 +306,7 @@ var _ = Describe("Hash Commands", func() {
var slice []string
err = client.HVals(ctx, "hash121").ScanSlice(&slice)
Expect(err).NotTo(HaveOccurred())
sort.Strings(slice)
Expect(slice).To(Equal([]string{"hello1", "hello2"}))
})

Expand Down
Loading
Loading