Skip to content
Open

test #60989

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions be/src/pipeline/exec/cache_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
->data_queue.set_source_dependency(_shared_state->source_deps.front());
const auto& scan_ranges = info.scan_ranges;
bool hit_cache = false;
if (scan_ranges.size() > 1) {
return Status::InternalError("CacheSourceOperator only support one scan range, plan error");
}

const auto& cache_param = _parent->cast<CacheSourceOperatorX>()._cache_param;
// 1. init the slot orders
Expand All @@ -60,8 +57,20 @@ Status CacheSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {

// 2. build cache key by digest_tablet_id
RETURN_IF_ERROR(QueryCache::build_cache_key(scan_ranges, cache_param, &_cache_key, &_version));
custom_profile()->add_info_string(
"CacheTabletId", std::to_string(scan_ranges[0].scan_range.palo_scan_range.tablet_id));
std::vector<int64_t> cache_tablet_ids;
cache_tablet_ids.reserve(scan_ranges.size());
for (const auto& scan_range : scan_ranges) {
cache_tablet_ids.push_back(scan_range.scan_range.palo_scan_range.tablet_id);
}
std::sort(cache_tablet_ids.begin(), cache_tablet_ids.end());
std::string tablet_ids_str;
for (size_t i = 0; i < cache_tablet_ids.size(); ++i) {
tablet_ids_str += std::to_string(cache_tablet_ids[i]);
if (i < cache_tablet_ids.size() - 1) {
tablet_ids_str += ",";
}
}
custom_profile()->add_info_string("CacheTabletId", tablet_ids_str);
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CacheTabletId profile entry now contains a comma-separated list of multiple tablet ids. Consider renaming the profile key to something plural (e.g. CacheTabletIds) to avoid confusion when debugging profiles.

Suggested change
custom_profile()->add_info_string("CacheTabletId", tablet_ids_str);
custom_profile()->add_info_string("CacheTabletIds", tablet_ids_str);

Copilot uses AI. Check for mistakes.

// 3. lookup the cache and find proper slot order
hit_cache = _global_cache->lookup(_cache_key, _version, &_query_cache_handle);
Expand Down
85 changes: 67 additions & 18 deletions be/src/pipeline/query_cache/query_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,27 +109,76 @@ class QueryCache : public LRUCachePolicy {
static Status build_cache_key(const std::vector<TScanRangeParams>& scan_ranges,
const TQueryCacheParam& cache_param, std::string* cache_key,
int64_t* version) {
if (scan_ranges.size() > 1) {
return Status::InternalError(
"CacheSourceOperator only support one scan range, plan error");
if (scan_ranges.empty()) {
return Status::InternalError("scan_ranges is empty, plan error");
}
auto& scan_range = scan_ranges[0];
DCHECK(scan_range.scan_range.__isset.palo_scan_range);
auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;

std::from_chars(scan_range.scan_range.palo_scan_range.version.data(),
scan_range.scan_range.palo_scan_range.version.data() +
scan_range.scan_range.palo_scan_range.version.size(),
*version);

auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
if (find_tablet == cache_param.tablet_to_range.end()) {
return Status::InternalError("Not find tablet in partition_to_tablets, plan error");

std::string digest;
try {
digest = cache_param.digest;
} catch (const std::exception&) {
return Status::InternalError("digest is invalid, plan error");
}
if (digest.empty()) {
return Status::InternalError("digest is empty, plan error");
}

if (cache_param.tablet_to_range.empty()) {
return Status::InternalError("tablet_to_range is empty, plan error");
}

std::vector<int64_t> tablet_ids;
tablet_ids.reserve(scan_ranges.size());
for (const auto& scan_range : scan_ranges) {
auto tablet_id = scan_range.scan_range.palo_scan_range.tablet_id;
tablet_ids.push_back(tablet_id);
}
Comment on lines +132 to +135
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryCache::build_cache_key reads scan_range.scan_range.palo_scan_range.* without checking scan_range.scan_range.__isset.palo_scan_range (the previous DCHECK is gone). If a non-palo scan range reaches here this will build an invalid key / version; please validate the field is set for every range and return a clear error instead of assuming it.

Copilot uses AI. Check for mistakes.
std::sort(tablet_ids.begin(), tablet_ids.end());

Comment on lines +130 to +137
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryCache::build_cache_key now uses std::sort / std::find_if (and std::from_chars) in this header, but the header doesn't include <algorithm> (and should also explicitly include <charconv> if not guaranteed elsewhere). Depending on include order this can break compilation; please add the required standard headers here.

Copilot uses AI. Check for mistakes.
int64_t first_version = -1;
std::string first_tablet_range;
for (size_t i = 0; i < tablet_ids.size(); ++i) {
auto tablet_id = tablet_ids[i];

auto find_tablet = cache_param.tablet_to_range.find(tablet_id);
if (find_tablet == cache_param.tablet_to_range.end()) {
return Status::InternalError("Not find tablet in partition_to_tablets, plan error");
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message says partition_to_tablets, but the code is actually checking cache_param.tablet_to_range. Please update the message to reference the correct field to make diagnosing plan issues easier.

Suggested change
return Status::InternalError("Not find tablet in partition_to_tablets, plan error");
return Status::InternalError("Not find tablet in tablet_to_range, plan error");

Copilot uses AI. Check for mistakes.
}

auto scan_range_iter =
std::find_if(scan_ranges.begin(), scan_ranges.end(),
[&tablet_id](const TScanRangeParams& range) {
return range.scan_range.palo_scan_range.tablet_id == tablet_id;
});
Comment on lines +148 to +152
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

build_cache_key does a std::find_if(scan_ranges.begin(), scan_ranges.end(), ...) inside a loop over tablet_ids, making it O(n^2) per instance. For large instances this adds avoidable CPU overhead; consider a single pass that builds a map (tablet_id -> parsed version) or collects (tablet_id, version, range) pairs and then sorts.

Copilot uses AI. Check for mistakes.
int64_t current_version = -1;
std::from_chars(scan_range_iter->scan_range.palo_scan_range.version.data(),
scan_range_iter->scan_range.palo_scan_range.version.data() +
scan_range_iter->scan_range.palo_scan_range.version.size(),
current_version);
Comment on lines +154 to +157
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::from_chars(...) parse result is ignored. If version is empty/non-numeric/partially-numeric, current_version can remain -1 (or parse partially) and still be treated as valid, leading to wrong cache versioning. Please check the returned std::from_chars_result (ec/ptr) and fail fast on parse errors.

Suggested change
std::from_chars(scan_range_iter->scan_range.palo_scan_range.version.data(),
scan_range_iter->scan_range.palo_scan_range.version.data() +
scan_range_iter->scan_range.palo_scan_range.version.size(),
current_version);
const auto& version_str = scan_range_iter->scan_range.palo_scan_range.version;
const char* version_begin = version_str.data();
const char* version_end = version_begin + version_str.size();
auto parse_result = std::from_chars(version_begin, version_end, current_version);
if (parse_result.ec != std::errc() || parse_result.ptr != version_end) {
return Status::InternalError("tablet version is invalid, plan error");
}

Copilot uses AI. Check for mistakes.

if (i == 0) {
first_version = current_version;
first_tablet_range = find_tablet->second;
} else {
if (current_version != first_version) {
return Status::InternalError(
"All tablets in one instance must have the same version, plan error");
}
if (find_tablet->second != first_tablet_range) {
return Status::InternalError(
"All tablets in one instance must have the same tablet_to_range, plan "
"error");
}
}
}

*cache_key = cache_param.digest +
std::string(reinterpret_cast<char*>(&tablet_id), sizeof(tablet_id)) +
find_tablet->second;
*version = first_version;

*cache_key = digest;
for (auto tablet_id : tablet_ids) {
*cache_key += std::string(reinterpret_cast<char*>(&tablet_id), sizeof(tablet_id));
}
*cache_key += first_tablet_range;

return Status::OK();
}
Expand Down
170 changes: 168 additions & 2 deletions be/test/pipeline/exec/query_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,24 @@ TEST_F(QueryCacheTest, create_global_cache) {
TEST_F(QueryCacheTest, build_cache_key) {
{
std::vector<TScanRangeParams> scan_ranges;
scan_ranges.push_back({});
scan_ranges.push_back({});
TScanRangeParams scan_range1;
TPaloScanRange palp_scan_range1;
palp_scan_range1.__set_tablet_id(1);
palp_scan_range1.__set_version("100");
Comment on lines +44 to +46
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable name palp_scan_range* looks like a typo (should likely be palo_scan_range* to match the Thrift type TPaloScanRange). Renaming would improve readability and avoid confusion in future test edits.

Copilot uses AI. Check for mistakes.
scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
scan_ranges.emplace_back(scan_range1);

TScanRangeParams scan_range2;
TPaloScanRange palp_scan_range2;
palp_scan_range2.__set_tablet_id(2);
palp_scan_range2.__set_version("100");
scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
scan_ranges.emplace_back(scan_range2);

TQueryCacheParam cache_param;
cache_param.__set_digest("test_digest");
cache_param.tablet_to_range.insert({1, "range_abc"});
cache_param.tablet_to_range.insert({2, "range_xyz"});
std::string cache_key;
int64_t version = 0;
auto st = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version);
Expand All @@ -59,6 +74,7 @@ TEST_F(QueryCacheTest, build_cache_key) {
scan_range.scan_range.__set_palo_scan_range(palp_scan_range);
scan_ranges.push_back(scan_range);
TQueryCacheParam cache_param;
cache_param.__set_digest("test_digest");
std::string cache_key;
int64_t version = 0;
auto st = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version);
Expand Down Expand Up @@ -87,6 +103,156 @@ TEST_F(QueryCacheTest, build_cache_key) {
}
}

TEST_F(QueryCacheTest, build_cache_key_multiple_tablets) {
{
std::vector<TScanRangeParams> scan_ranges;
TScanRangeParams scan_range1;
TPaloScanRange palp_scan_range1;
palp_scan_range1.__set_tablet_id(3);
palp_scan_range1.__set_version("100");
scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
scan_ranges.push_back(scan_range1);

TScanRangeParams scan_range2;
TPaloScanRange palp_scan_range2;
palp_scan_range2.__set_tablet_id(1);
palp_scan_range2.__set_version("100");
scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
scan_ranges.push_back(scan_range2);

TScanRangeParams scan_range3;
TPaloScanRange palp_scan_range3;
palp_scan_range3.__set_tablet_id(2);
palp_scan_range3.__set_version("100");
scan_range3.scan_range.__set_palo_scan_range(palp_scan_range3);
scan_ranges.push_back(scan_range3);

TQueryCacheParam cache_param;
cache_param.__set_digest("test_digest");
cache_param.tablet_to_range.insert({1, "range_abc"});
cache_param.tablet_to_range.insert({2, "range_abc"});
cache_param.tablet_to_range.insert({3, "range_abc"});

std::string cache_key;
int64_t version = 0;
auto st = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version);

EXPECT_TRUE(st.ok());
EXPECT_EQ(version, 100);

int64_t expected_tablet1 = 1;
int64_t expected_tablet2 = 2;
int64_t expected_tablet3 = 3;
std::string expected_key =
"test_digest" +
std::string(reinterpret_cast<char*>(&expected_tablet1), sizeof(expected_tablet1)) +
std::string(reinterpret_cast<char*>(&expected_tablet2), sizeof(expected_tablet2)) +
std::string(reinterpret_cast<char*>(&expected_tablet3), sizeof(expected_tablet3)) +
"range_abc";

EXPECT_EQ(cache_key, expected_key);
}

{
std::vector<TScanRangeParams> scan_ranges;
TScanRangeParams scan_range1;
TPaloScanRange palp_scan_range1;
palp_scan_range1.__set_tablet_id(1);
palp_scan_range1.__set_version("100");
scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
scan_ranges.push_back(scan_range1);

TScanRangeParams scan_range2;
TPaloScanRange palp_scan_range2;
palp_scan_range2.__set_tablet_id(2);
palp_scan_range2.__set_version("200");
scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
scan_ranges.push_back(scan_range2);

TQueryCacheParam cache_param;
cache_param.__set_digest("test_digest");
cache_param.tablet_to_range.insert({1, "range_abc"});
cache_param.tablet_to_range.insert({2, "range_abc"});

std::string cache_key;
int64_t version = 0;
auto st = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version);

EXPECT_FALSE(st.ok());
EXPECT_TRUE(st.msg().find("same version") != std::string::npos);
}

{
std::vector<TScanRangeParams> scan_ranges;
TScanRangeParams scan_range1;
TPaloScanRange palp_scan_range1;
palp_scan_range1.__set_tablet_id(1);
palp_scan_range1.__set_version("100");
scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
scan_ranges.push_back(scan_range1);

TScanRangeParams scan_range2;
TPaloScanRange palp_scan_range2;
palp_scan_range2.__set_tablet_id(2);
palp_scan_range2.__set_version("100");
scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
scan_ranges.push_back(scan_range2);

TQueryCacheParam cache_param;
cache_param.__set_digest("test_digest");
cache_param.tablet_to_range.insert({1, "range_abc"});
cache_param.tablet_to_range.insert({2, "range_xyz"});

std::string cache_key;
int64_t version = 0;
auto st = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version);

EXPECT_FALSE(st.ok());
EXPECT_TRUE(st.msg().find("same tablet_to_range") != std::string::npos);
}

{
std::vector<TScanRangeParams> scan_ranges;
TScanRangeParams scan_range1;
TPaloScanRange palp_scan_range1;
palp_scan_range1.__set_tablet_id(1);
palp_scan_range1.__set_version("100");
scan_range1.scan_range.__set_palo_scan_range(palp_scan_range1);
scan_ranges.push_back(scan_range1);

TScanRangeParams scan_range2;
TPaloScanRange palp_scan_range2;
palp_scan_range2.__set_tablet_id(2);
palp_scan_range2.__set_version("100");
scan_range2.scan_range.__set_palo_scan_range(palp_scan_range2);
scan_ranges.push_back(scan_range2);

TQueryCacheParam cache_param;
cache_param.__set_digest("test_digest");
cache_param.tablet_to_range.insert({1, "range_abc"});
cache_param.tablet_to_range.insert({3, "range_abc"});

std::string cache_key;
int64_t version = 0;
auto st = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version);

EXPECT_FALSE(st.ok());
EXPECT_TRUE(st.msg().find("Not find tablet") != std::string::npos);
}

{
std::vector<TScanRangeParams> scan_ranges;
TQueryCacheParam cache_param;
cache_param.__set_digest("test_digest");
std::string cache_key;
int64_t version = 0;
auto st = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version);

EXPECT_FALSE(st.ok());
EXPECT_TRUE(st.msg().find("empty") != std::string::npos);
}
}

TEST_F(QueryCacheTest, insert_and_lookup) {
std::unique_ptr<QueryCache> query_cache {QueryCache::create_global_cache(1024 * 1024 * 1024)};
std::string cache_key = "be ut";
Expand Down
Loading
Loading