Skip to content

Commit

Permalink
Merge branch 'bugfix4' into 'master' (merge request !336)
Browse files Browse the repository at this point in the history
[feature]getMinbinlogid maybe too time costing : saveMinBinlogId in rocksdb
### MR描述
<!--- 详细描述MR的细节 -->
把minbinlogid保存到rocksdb里面去


### 修改动机和上下文背景
<!--- 为什么需要此修改, 解决了什么问题 -->
<!---如果解决了相关的#issue, 在此处进行关联(#issue, close #issue) -->
getMinbinlogid性能比较差,把minbinlogid保存到rocksdb里面去  

#70  

#52   

### 此MR如何进行测试 ?
<!--- 请描述测试MR的细节 -->
<!--- 包括测试的环境以及执行的测试用例 -->
<!--- 说明 change 如何影响其他部分的代码 etc. -->
参考 #70 #52

### change 类型
<!---你的代码引入了何种类型的change, 在所有关联的复选框前选择"x" -->
- [ ] Bug fix (修复了issue的非侵入式修改)
- [ ] New feature (增加功能的非侵入式修改)
- [ ] Breaking change (修复或者增加特性, 但是会造成现有行为的非预期行为)

### 清单
<!--- 查看下述选项,并进行"x"勾选 -->
<!--- 如果你对所有都不确定, 请随时咨询我们 -->
- [ ] 遵循项目的Code-Style
- [ ] Change 需要文档的修改
- [ ] 我已经进行相关文档的修改
- [ ] 我的MR已经通过的相关流水线测试
  • Loading branch information
TendisDev committed May 13, 2021
2 parents 436aff0 + 245b012 commit 5cccafd
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/tendisplus/replication/repl_util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ Expected<BinlogResult> applySingleTxnV2(Session* sess,
string err = "binlogId:" + to_string(binlogId) +
" can't be smaller than highestBinlogId:" +
to_string(store->getHighestBinlogId());
LOG(ERROR) << err;
LOG(ERROR) << err << " storeid:" << storeId;
return {ErrorCodes::ERR_MANUAL, err};
}

Expand Down
2 changes: 2 additions & 0 deletions src/tendisplus/server/server_params.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,8 @@ ServerParams::ServerParams() {
REGISTER_VARS_DIFF_NAME_DYNAMIC("jeprof-auto-dump", jeprofAutoDump);
REGISTER_VARS_DIFF_NAME_DYNAMIC("compactrange-after-deleterange",
compactRangeAfterDeleteRange);
REGISTER_VARS_DIFF_NAME_DYNAMIC("save-min-binlogid",
saveMinBinlogId);
}

ServerParams::~ServerParams() {
Expand Down
1 change: 1 addition & 0 deletions src/tendisplus/server/server_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ class ServerParams {
int64_t luaStateMaxIdleTime = 60*60*1000; // ms
bool jeprofAutoDump = true;
bool compactRangeAfterDeleteRange = false;
bool saveMinBinlogId = true;
};

extern std::shared_ptr<tendisplus::ServerParams> gParams;
Expand Down
28 changes: 26 additions & 2 deletions src/tendisplus/storage/kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,30 @@ Expected<ReplLogRawV2> RepllogCursorV2::getMinBinlog(Transaction* txn) {
}

Expected<uint64_t> RepllogCursorV2::getMinBinlogId(Transaction* txn) {
if (gParams != nullptr && gParams->saveMinBinlogId) {
RecordKey key(REPLLOGKEYV2_META_CHUNKID, REPLLOGKEYV2_META_DBID,
RecordType::RT_META, "", "");
auto eval = txn->getKV(key.encode());
if (eval.ok()) {
auto v = RecordValue::decode(eval.value());
if (!v.ok()) {
LOG(ERROR) << "binlog META decode error:" << v.status().toString();
return {ErrorCodes::ERR_INTERGER, "binlog META decode error"};
}
if (v.value().getRecordType() != RecordType::RT_META
|| v.value().getValue().size() != sizeof(uint64_t)) {
LOG(ERROR) << "binlog META decode error:" << v.status().toString();
return {ErrorCodes::ERR_INTERGER, "binlog META decode error"};
}
uint64_t binlogId = int64Decode(v.value().getValue().c_str());
return binlogId;
} else if (!eval.ok() && eval.status().code() != ErrorCodes::ERR_NOTFOUND) {
LOG(WARNING) << "get binlog META error:" << eval.status().toString();
return eval.status();
}
DLOG(WARNING) << "binlog META is not exists, will use seek.";
}

auto cursor = txn->createBinlogCursor();
if (!cursor) {
return {ErrorCodes::ERR_INTERNAL, "txn->createBinlogCursor() error"};
Expand Down Expand Up @@ -297,7 +321,7 @@ Expected<Record> AllDataCursor::next() {
auto expRcd = _baseCursor->next();
if (expRcd.ok()) {
Record dataRecord(expRcd.value());
if (dataRecord.getRecordKey().getRecordType() != RecordType::RT_BINLOG) {
if (dataRecord.getRecordKey().getChunkId() < REPLLOGKEYV2_META_CHUNKID) {
return dataRecord;
} else {
return {ErrorCodes::ERR_EXHAUST, "no more AllData"};
Expand All @@ -324,7 +348,7 @@ Expected<std::string> AllDataCursor::key() {
auto expKey = _baseCursor->key();
if (expKey.ok()) {
std::string dataKey = expKey.value();
if (RecordKey::decodeType(dataKey) != RecordType::RT_BINLOG) {
if (RecordKey::decodeChunkId(dataKey) < REPLLOGKEYV2_META_CHUNKID) {
return dataKey;
} else {
return {ErrorCodes::ERR_EXHAUST, "no more AllData"};
Expand Down
2 changes: 2 additions & 0 deletions src/tendisplus/storage/kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ class Transaction {
const std::string& val,
const uint64_t ts = 0) = 0;
virtual Status delKV(const std::string& key, const uint64_t ts = 0) = 0;
virtual Status setKVWithoutBinlog(const std::string& key,
const std::string& val) = 0;
virtual Status addDeleteRangeBinlog(const std::string& begin,
const std::string& end) = 0;
virtual uint64_t getBinlogTime() = 0;
Expand Down
8 changes: 6 additions & 2 deletions src/tendisplus/storage/record.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,20 @@
namespace tendisplus {

const uint32_t VERSIONMETA_CHUNKID = 0XFFFE0000U;
const uint32_t ADMINCMD_CHUNKID = 0XFFFE0001U;
const uint32_t TTLINDEX_CHUNKID = 0XFFFF0000U;
// NOTE(takenliu) data chunkid must smaller than REPLLOGKEYV2_META_CHUNKID
const uint32_t REPLLOGKEYV2_META_CHUNKID = 0XFFFFFE01U;
const uint32_t REPLLOGKEY_CHUNKID = 0XFFFFFF00U;
const uint32_t REPLLOGKEYV2_CHUNKID = 0XFFFFFF01U;
const uint32_t ADMINCMD_CHUNKID = 0XFFFE0001U;

const uint32_t VERSIONMETA_DBID = 0XFFFE0000U;
const uint32_t ADMINCMD_DBID = 0XFFFE0001U;
const uint32_t TTLINDEX_DBID = 0XFFFF0000U;
// NOTE(takenliu) data dbid must smaller than REPLLOGKEYV2_META_DBID
const uint32_t REPLLOGKEYV2_META_DBID = 0XFFFFFE01U;
const uint32_t REPLLOGKEY_DBID = 0XFFFFFF00U;
const uint32_t REPLLOGKEYV2_DBID = 0XFFFFFF01U;
const uint32_t ADMINCMD_DBID = 0XFFFE0001U;

/* NOTE(vinchen): if you want to add new RecordType, make sure you handle
the below functions correctly.
Expand Down
40 changes: 40 additions & 0 deletions src/tendisplus/storage/rocks/rocks_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,17 @@ Status RocksTxn::setKV(const std::string& key,
return {ErrorCodes::ERR_OK, ""};
}

Status RocksTxn::setKVWithoutBinlog(const std::string& key,
const std::string& val) {
RESET_PERFCONTEXT();
// put data into default column family
auto s = _txn->Put(key, val);
if (!s.ok()) {
return {ErrorCodes::ERR_INTERNAL, s.ToString()};
}
return {ErrorCodes::ERR_OK, ""};
}

Status RocksTxn::delKV(const std::string& key, const uint64_t ts) {
if (_replOnly) {
return {ErrorCodes::ERR_INTERNAL, "txn is replOnly"};
Expand Down Expand Up @@ -1371,6 +1382,15 @@ Expected<TruncateBinlogResult> RocksKVStore::truncateBinlogV2(
uint64_t size = 0;
uint64_t cur_ts = msSinceEpoch();

const auto guard = MakeGuard([this, &nextStart, &txn] {
if (_cfg->saveMinBinlogId) {
// NOTE(takenliu): as we keep at leat one binlog,
// so nextStart must be in db.
// NOTE(takenliu): be care of not in the same transaction.
saveMinBinlogId(nextStart, txn);
}
});

// TODO(takenliu) change binlogDelRange scope to (1000, 1000000)
INVARIANT_D(_cfg->binlogDelRange >= 1);
// if binlogDelRange > 1 and needn't save binlog, use deleteRange quickly
Expand Down Expand Up @@ -2415,6 +2435,26 @@ Status RocksKVStore::deleteRangeWithoutBinlog(
return {ErrorCodes::ERR_OK, ""};
}

Status RocksKVStore::saveMinBinlogId(uint64_t id, Transaction* txn) {
RecordKey key(REPLLOGKEYV2_META_CHUNKID, REPLLOGKEYV2_META_DBID,
RecordType::RT_META, "", "");

std::string val;
val.resize(sizeof(uint64_t));
// binlogId
int64Encode(&val[0], id);
RecordValue value(std::move(val), RecordType::RT_META, -1);

INVARIANT(txn != nullptr);
auto s = txn->setKVWithoutBinlog(key.encode(), value.encode());
if (!s.ok()) {
LOG(ERROR) << "setKV failed:" << s.toString();
return s;
}
return s;
}

// [begin, end)
Status RocksKVStore::deleteRangeBinlog(uint64_t begin, uint64_t end) {
ReplLogKeyV2 beginKey(begin);
ReplLogKeyV2 endKey(end);
Expand Down
3 changes: 3 additions & 0 deletions src/tendisplus/storage/rocks/rocks_kvstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ class RocksTxn : public Transaction {
const std::string& val,
const uint64_t ts = 0) final;
Status delKV(const std::string& key, const uint64_t ts = 0) final;
Status setKVWithoutBinlog(const std::string& key,
const std::string& val) final;
Status addDeleteRangeBinlog(const std::string& begin,
const std::string& end) final;
#ifdef BINLOG_V1
Expand Down Expand Up @@ -235,6 +237,7 @@ class RocksKVStore : public KVStore {
const std::string& begin,
const std::string& end);
Status deleteRangeBinlog(uint64_t begin, uint64_t end);
Status saveMinBinlogId(uint64_t id, Transaction* txn = nullptr);

#ifdef BINLOG_V1
Status applyBinlog(const std::list<ReplLog>& txnLog, Transaction* txn) final;
Expand Down
1 change: 1 addition & 0 deletions src/tendisplus/storage/rocks/rocks_kvstore_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ std::shared_ptr<ServerParams> genParams() {
auto cfg = std::make_shared<ServerParams>();
auto s = cfg->parseFile("a.cfg");
EXPECT_EQ(s.ok(), true) << s.toString();
gParams = cfg;
return cfg;
}

Expand Down

0 comments on commit 5cccafd

Please sign in to comment.