Skip to content

Commit

Permalink
(selectdb-cloud) Implement inverted check to ensure all useless data …
Browse files Browse the repository at this point in the history
…reclaimed (apache#1826)
  • Loading branch information
platoneko committed Jun 6, 2023
1 parent 38986a7 commit 5b0aba8
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 1 deletion.
2 changes: 2 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ CONF_Strings(recycle_whitelist, ""); // Comma seprated list
CONF_Strings(recycle_blacklist, ""); // Comma seprated list
CONF_mInt32(instance_recycler_worker_pool_size, "10");
CONF_Bool(enable_checker, "false");
// Currently only used for recycler test
CONF_Bool(enable_inverted_check, "false");
CONF_mInt32(check_object_interval_seconds, "259200"); // 72h

CONF_String(test_s3_ak, "ak");
Expand Down
133 changes: 133 additions & 0 deletions cloud/src/recycler/checker.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@

#include "recycler/checker.h"

#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <butil/endpoint.h>
#include <butil/strings/string_split.h>
#include <fmt/core.h>
#include <gen_cpp/olap_file.pb.h>
#include <glog/logging.h>

#include <chrono>
Expand Down Expand Up @@ -30,6 +34,7 @@ extern int32_t recycle_job_lease_expired_ms;
extern int32_t recycle_concurrency;
extern std::vector<std::string> recycle_whitelist;
extern std::vector<std::string> recycle_blacklist;
extern bool enable_inverted_check;
} // namespace config

Checker::Checker(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) {
Expand Down Expand Up @@ -122,6 +127,9 @@ int Checker::start() {
}
if (stopped()) return;
ret = checker->do_check();
if (config::enable_inverted_check) {
if (checker->do_inverted_check() != 0) ret = -1;
}
// If instance checker has been aborted, don't finish this job
if (!checker->stopped()) {
finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(),
Expand Down Expand Up @@ -344,4 +352,129 @@ int InstanceChecker::do_check() {
return num_check_failed == 0 ? 0 : -1;
}

int InstanceChecker::do_inverted_check() {
LOG(INFO) << "begin to inverted check objects instance_id=" << instance_id_;
long num_scanned = 0;
long num_check_failed = 0;
using namespace std::chrono;
auto start_time = steady_clock::now();
std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
auto cost = duration<float>(steady_clock::now() - start_time).count();
LOG(INFO) << "inverted check instance objects finished, cost=" << cost
<< "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
<< " num_check_failed=" << num_check_failed;
});

struct TabletRowsets {
int64_t tablet_id {0};
std::unordered_set<std::string> rowset_ids;
};
TabletRowsets tablet_rowsets_cache;

auto check_object_key = [&](const std::string& obj_key) {
std::vector<std::string> str;
butil::SplitString(obj_key, '/', &str);
// {prefix}/data/{tablet_id}/{rowset_id}_{seg_num}.dat
if (str.size() < 4) {
return -1;
}
int64_t tablet_id = atol((str.end() - 2)->c_str());
if (tablet_id <= 0) {
LOG(WARNING) << "failed to parse tablet_id, key=" << obj_key;
return -1;
}
std::string rowset_id;
if (auto pos = str.back().find('_'); pos != std::string::npos) {
rowset_id = str.back().substr(0, pos);
} else {
LOG(WARNING) << "failed to parse rowset_id, key=" << obj_key;
return -1;
}
if (tablet_rowsets_cache.tablet_id == tablet_id) {
if (tablet_rowsets_cache.rowset_ids.count(rowset_id) > 0) {
return 0;
} else {
LOG(WARNING) << "rowset not exists, key=" << obj_key;
return -1;
}
}
// Get all rowset id of this tablet
tablet_rowsets_cache.tablet_id = tablet_id;
tablet_rowsets_cache.rowset_ids.clear();
std::unique_ptr<Transaction> txn;
int ret = txn_kv_->create_txn(&txn);
if (ret != 0) {
LOG(WARNING) << "failed to create txn";
return -1;
}
std::unique_ptr<RangeGetIterator> it;
auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
do {
ret = txn->get(begin, end, &it);
if (ret != 0) {
LOG(WARNING) << "failed to get rowset kv, ret=" << ret;
return -1;
}
if (!it->has_next()) {
break;
}
while (it->has_next()) {
// recycle corresponding resources
auto [k, v] = it->next();
doris::RowsetMetaPB rowset;
if (!rowset.ParseFromArray(v.data(), v.size())) {
LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
return -1;
}
tablet_rowsets_cache.rowset_ids.insert(rowset.rowset_id_v2());
if (!it->has_next()) {
begin = k;
begin.push_back('\x00'); // Update to next smallest key for iteration
break;
}
}
} while (it->more() && !stopped());
if (tablet_rowsets_cache.rowset_ids.count(rowset_id) > 0) {
return 0;
} else {
LOG(WARNING) << "rowset not exists, key=" << obj_key;
return -1;
}
return 0;
};

for (auto& [_, accessor] : accessor_map_) {
auto s3_accessor = static_cast<S3Accessor*>(accessor.get());
auto client = s3_accessor->s3_client();
auto& conf = s3_accessor->conf();
Aws::S3::Model::ListObjectsV2Request request;
request.WithBucket(conf.bucket).WithPrefix(conf.prefix + "/data/");
bool is_trucated = false;
do {
auto outcome = client->ListObjectsV2(request);
if (!outcome.IsSuccess()) {
LOG(WARNING) << "failed to list objects, endpoint=" << conf.endpoint
<< " bucket=" << conf.bucket << " prefix=" << request.GetPrefix();
return -1;
}
LOG(INFO) << "get " << outcome.GetResult().GetContents().size()
<< " objects, endpoint=" << conf.endpoint << " bucket=" << conf.bucket
<< " prefix=" << request.GetPrefix();
const auto& result = outcome.GetResult();
num_scanned += result.GetContents().size();
for (auto& obj : result.GetContents()) {
if (check_object_key(obj.GetKey()) != 0) {
LOG(WARNING) << "failed to check object key, endpoint=" << conf.endpoint
<< " bucket=" << conf.bucket << " key=" << obj.GetKey();
++num_check_failed;
}
}
is_trucated = result.GetIsTruncated();
request.SetContinuationToken(result.GetNextContinuationToken());
} while (is_trucated && !stopped());
}
return num_check_failed == 0 ? 0 : -1;
}

} // namespace selectdb
6 changes: 5 additions & 1 deletion cloud/src/recycler/checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,12 @@ class InstanceChecker {
explicit InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id);
// Return 0 if success, otherwise error
int init(const InstanceInfoPB& instance);
// Return 0 if success, otherwise error
// Return 0 if success, otherwise failed
int do_check();
// Check whether the objects in the object store of the instance belong to the visible rowsets.
// This function is used to verify that there is no garbage data leakage, should only be called in recycler test.
// Return 0 if success, otherwise failed
int do_inverted_check();
void stop() { stopped_.store(true, std::memory_order_release); }
bool stopped() const { return stopped_.load(std::memory_order_acquire); }

Expand Down
5 changes: 5 additions & 0 deletions cloud/src/recycler/s3_accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ class S3Accessor : public ObjStoreAccessor {

const std::string& path() const override { return path_; }

const std::shared_ptr<Aws::S3::S3Client>& s3_client() const { return s3_client_; }

const S3Conf& conf() const { return conf_; }

// returns 0 for success otherwise error
int init() override;

Expand All @@ -87,6 +91,7 @@ class S3Accessor : public ObjStoreAccessor {
// delete objects which last modified time is less than the input expired time and under the input relative path
// returns 0 for success otherwise error
int delete_expired_objects(const std::string& relative_path, int64_t expired_time) override;

private:
std::string get_key(const std::string& relative_path) const;
// return empty string if the input key does not start with the prefix of S3 conf
Expand Down

0 comments on commit 5b0aba8

Please sign in to comment.