Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: spop binlog , rewritten as srem #2541

Merged
merged 8 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 35 additions & 26 deletions include/pika_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,38 @@ class SAddCmd : public Cmd {
void DoInitial() override;
};

class SRemCmd : public Cmd {
public:
SRemCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::SET)) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
return res;
}
void Do() override;
void DoUpdateCache() override;
void DoThroughDB() override;
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new SRemCmd(*this); }

private:
void DoInitial() override;

private:
std::string key_;
std::vector<std::string> members_;
rocksdb::Status s_;
int32_t deleted_ = 0;
};

class SPopCmd : public Cmd {
public:
SPopCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::SET)) {}
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::SET)) {
srem_cmd_ = std::make_shared<SRemCmd>(kCmdNameSRem, -3, kCmdFlagsWrite | kCmdFlagsSet);
}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
Expand All @@ -51,13 +79,18 @@ class SPopCmd : public Cmd {
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new SPopCmd(*this); }
void DoBinlog() override;

private:
void DoInitial() override;

private:
std::string key_;
std::vector<std::string> members_;
// used for write binlog
std::shared_ptr<SRemCmd> srem_cmd_;
int64_t count_ = 1;
rocksdb::Status s_;
void DoInitial() override;
};

class SCardCmd : public Cmd {
Expand Down Expand Up @@ -131,30 +164,6 @@ class SScanCmd : public Cmd {
}
};

class SRemCmd : public Cmd {
public:
SRemCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::SET)) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_);
return res;
}
void Do() override;
void DoUpdateCache() override;
void DoThroughDB() override;
void Split(const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new SRemCmd(*this); }

private:
std::string key_;
std::vector<std::string> members_;
rocksdb::Status s_;
int32_t deleted_ = 0;
void DoInitial() override;
};

class SUnionCmd : public Cmd {
public:
SUnionCmd(const std::string& name, int arity, uint32_t flag)
Expand Down
18 changes: 18 additions & 0 deletions src/pika_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,24 @@ void SPopCmd::DoUpdateCache() {
}
}

void SPopCmd::DoBinlog() {
if (!s_.ok()) {
return;
}

PikaCmdArgsType srem_args;
srem_args.emplace_back("srem");
srem_args.emplace_back(key_);
for (auto m = members_.begin(); m != members_.end(); ++m) {
srem_args.emplace_back(*m);
}

srem_cmd_->Initial(srem_args, db_name_);
srem_cmd_->SetConn(GetConn());
srem_cmd_->SetResp(resp_.lock());
srem_cmd_->DoBinlog();
}

void SCardCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameSCard);
Expand Down
50 changes: 50 additions & 0 deletions tests/integration/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# README
chenbt-hz marked this conversation as resolved.
Show resolved Hide resolved
这是用golang编写的pika 集成测试代码,默认提交代码到pika仓库后会自动运行。

## 本地跑golang集成测试
如果你想在本地运行测试,需要完成以下的准备工作:

### 1.准备程序和配置文件
在../../output/pika目录确保有编译好的pika程序。
(也可以提前编译好mac版本的pika程序,并手动将pika文件拷贝到start_master_and_slave.sh中制定的目录,将pika未改动的conf文件拷贝到test目录;或者直接修改start_master_and_slave.sh启动路径。)

手动执行测试的前提是,已安装ginkgo,例如
```
cd tests/integration/
go get github.com/onsi/ginkgo/v2/ginkgo
go install github.com/onsi/ginkgo/v2/ginkgo
go get github.com/onsi/gomega/...
```

### 2.启动Pika服务
在项目主目录下执行
```
cd test

sh ./integration/start_master_and_slave.sh
```

### 3.运行单测
在test目录下执行
cd integration
sh integrate_test.sh

### 4.运行指定文件的测试


添加环境变量
```
go env |grep GOBIN
export PATH="$PATH:$GOBIN"
```

执行`ginkgo --focus-file="slowlog_test.go" -vv`

ginkgo框架参考 https://onsi.github.io/ginkgo/#mental-model-ginkgo-assumes-specs-are-independent
备注:
--focus-file执行匹配文件
--skip-file过滤不匹配的文件
--focus执行匹配描述的测试
--skip过滤匹配描述的测试
例如,`ginkgo --focus=dog --focus=fish --skip=cat --skip=purple`
则只运行运行It(描述内容中)例如"likes dogs"、"likes dog fish"的单测,而跳过"purple"相关的测试。
36 changes: 36 additions & 0 deletions tests/integration/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,27 @@ func randomSunionstroeThread(ctx *context.Context, clientMaster *redis.Client, w
}
}

func randomSpopstroeThread(ctx *context.Context, clientMaster *redis.Client, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
clientMaster.SAdd(*ctx, "set1", randomString(5))
clientMaster.SAdd(*ctx, "set1", randomString(5))
clientMaster.SAdd(*ctx, "set1", randomString(5))
clientMaster.SAdd(*ctx, "set1", randomString(5))
clientMaster.SAdd(*ctx, "set1", randomString(5))
clientMaster.SAdd(*ctx, "set1", randomString(5))
clientMaster.SPop(*ctx, "set1")

clientMaster.SAdd(*ctx, "set2", randomString(5))
clientMaster.SAdd(*ctx, "set2", randomString(5))
clientMaster.SAdd(*ctx, "set2", randomString(5))
clientMaster.SAdd(*ctx, "set2", randomString(5))
clientMaster.SAdd(*ctx, "set2", randomString(5))
clientMaster.SAdd(*ctx, "set2", randomString(5))
clientMaster.SPopN(*ctx, "set2", int64(randomInt(5)))
}
}

func randomXaddThread(ctx *context.Context, clientMaster *redis.Client, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -566,6 +587,21 @@ var _ = Describe("should replication ", func() {
clientMaster.Del(ctx, "set1", "set2", "set_out")
log.Println("randomSunionstore test success")

log.Println("randomSpopstore test start")
execute(&ctx, clientMaster, 4, randomSpopstroeThread)
master_spopstore_set := clientMaster.SMembers(ctx, "set1")
Expect(master_spopstore_set.Err()).NotTo(HaveOccurred())
slave_spopstore_set := clientSlave.SMembers(ctx, "set1")
Expect(slave_spopstore_set.Err()).NotTo(HaveOccurred())
Expect(master_spopstore_set.Val()).To(Equal(slave_spopstore_set.Val()))
master_spopstore_set2 := clientMaster.SMembers(ctx, "set2")
Expect(master_spopstore_set2.Err()).NotTo(HaveOccurred())
slave_spopstore_set2 := clientSlave.SMembers(ctx, "set2")
Expect(slave_spopstore_set2.Err()).NotTo(HaveOccurred())
Expect(master_spopstore_set2.Val()).To(Equal(slave_spopstore_set2.Val()))
clientMaster.Del(ctx, "set1", "set2")
log.Println("randomSpopstore test success")

// Stream replication test
log.Println("randomXadd test start")
clientMaster.Del(ctx, "mystream")
Expand Down
Loading