Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,13 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s
segment->_fs = fs;
segment->_file_reader = std::move(file_reader);
st = segment->_open(stats);
} else if (st.is<ErrorCode::CORRUPTION>() &&
reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) {
}

// Three-tier retry for CORRUPTION errors when file cache is enabled.
// This handles CORRUPTION from both open_file() and _parse_footer() (via _open()).
if (st.is<ErrorCode::CORRUPTION>() &&
reader_options.cache_type == io::FileCachePolicy::FILE_BLOCK_CACHE) {
// Tier 1: Clear file cache and retry with cache support (re-downloads from remote).
LOG(WARNING) << "bad segment file may be read from file cache, try to read remote source "
"file directly, file path: "
<< path << " cache_key: " << file_cache_key_str(path);
Expand All @@ -134,6 +139,7 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s
}
TEST_INJECTION_POINT_CALLBACK("Segment::open:corruption1", &st);
if (st.is<ErrorCode::CORRUPTION>()) { // corrupt again
// Tier 2: Bypass cache entirely and read directly from remote storage.
LOG(WARNING) << "failed to try to read remote source file again with cache support,"
<< " try to read from remote directly, "
<< " file path: " << path << " cache_key: " << file_cache_key_str(path);
Expand All @@ -147,6 +153,7 @@ Status Segment::_open(io::FileSystemSPtr fs, const std::string& path, uint32_t s
segment->_file_reader = std::move(file_reader);
st = segment->_open(stats);
if (!st.ok()) {
// Tier 3: Remote source itself is corrupt.
LOG(WARNING) << "failed to try to read remote source file directly,"
<< " file path: " << path
<< " cache_key: " << file_cache_key_str(path);
Expand Down
71 changes: 71 additions & 0 deletions be/test/olap/rowset/segment_v2/segment_corruption_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,77 @@ TEST_F(SegmentCorruptionTest, TestFsSetInCorruptionRetryPath) {
// If we reach here, _fs was correctly set
}

// Test that CORRUPTION from _parse_footer() (after successful open_file()) triggers the
// three-tier retry logic. Before the fix, the retry was in an else-if branch that was
// only reachable when open_file() itself returned CORRUPTION, not when _parse_footer()
// did. This test verifies the fix by:
// 1. Creating a valid segment
// 2. Corrupting the footer magic number on the first parse attempt via sync point
// 3. Verifying the segment opens successfully after cache-bypass retry
TEST_F(SegmentCorruptionTest, TestFooterCorruptionTriggersRetry) {
auto schema = create_schema_with_inverted_index();
RowsetId rowset_id;
rowset_id.init(3);

auto path = create_segment_with_inverted_index(schema, 0, rowset_id);
auto fs = io::global_local_filesystem();

// Use sync point to corrupt the magic number on the first _parse_footer() call.
// This simulates reading corrupt data from file cache while the remote file is fine.
auto* sp = SyncPoint::get_instance();
sp->enable_processing();

int parse_footer_count = 0;
SyncPoint::CallbackGuard guard;
sp->set_call_back(
"Segment::parse_footer:magic_number_corruption",
[&parse_footer_count](auto&& args) {
// Corrupt magic number only on the first attempt to simulate cache corruption.
// Subsequent retries (which bypass or clear cache) will read correct data.
if (parse_footer_count == 0) {
auto* buf = try_any_cast<uint8_t*>(args[0]);
// Corrupt the magic number (last 4 bytes of the 12-byte trailer)
buf[8] = 0xFF;
buf[9] = 0xFF;
buf[10] = 0xFF;
buf[11] = 0xFF;
parse_footer_count++;
}
},
&guard);

std::shared_ptr<Segment> segment;
// Use FILE_BLOCK_CACHE to enable the corruption retry path
io::FileReaderOptions reader_options;
reader_options.cache_type = io::FileCachePolicy::FILE_BLOCK_CACHE;

auto st = Segment::open(fs, path, /*tablet_id=*/100, /*segment_id=*/0, rowset_id, schema,
reader_options, &segment);

sp->disable_processing();

// Verify that the magic number was corrupted on the first attempt
ASSERT_EQ(parse_footer_count, 1) << "Footer corruption should have been injected once";

// The segment should open successfully after retry (tier 1: clear cache + retry)
ASSERT_TRUE(st.ok()) << st.to_string();
ASSERT_NE(segment, nullptr);

// Verify that _fs was correctly set through the retry path
OlapReaderStatistics stats;
StorageReadOptions read_options;
read_options.stats = &stats;

auto indexes = schema->inverted_indexs(schema->column(1));
ASSERT_FALSE(indexes.empty());
const TabletIndex* idx_meta = indexes[0];
std::unique_ptr<IndexIterator> iter;
st = segment->new_index_iterator(schema->column(1), idx_meta, read_options, &iter);
st = segment->_index_file_reader->init(config::inverted_index_read_buffer_size,
&read_options.io_ctx);
ASSERT_TRUE(st.ok()) << st.to_string();
}

// Test normal segment open path
TEST_F(SegmentCorruptionTest, TestFsSetInNormalPath) {
auto schema = create_schema_with_inverted_index();
Expand Down
Loading