Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1594,7 +1594,37 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
auto expiration_time = file_block->expiration_time();
auto* cell = get_cell(hash, offset, cache_lock);
file_block->cell = nullptr;
DCHECK(cell);
// Holder cleanup can race with prior cache metadata cleanup. In that case,
// skip the duplicate remove instead of touching a detached or replaced cell.
if (cell == nullptr) {
LOG(WARNING) << "remove skipped because cache cell is missing. hash=" << hash.to_string()
<< " offset=" << offset << " size=" << file_block->range().size()
<< " type=" << cache_type_to_string(type)
<< " state=" << FileBlock::state_to_string(file_block->state_unsafe())
<< " expiration_time=" << expiration_time << " sync=" << sync;
return;
}
if (cell->file_block.get() != file_block.get()) {
auto* cell_file_block = cell->file_block.get();
LOG(WARNING)
<< "remove skipped because cache cell points to a different file block. hash="
<< hash.to_string() << " offset=" << offset
<< " size=" << file_block->range().size() << " type=" << cache_type_to_string(type)
<< " state=" << FileBlock::state_to_string(file_block->state_unsafe())
<< " expiration_time=" << expiration_time << " sync=" << sync << " cell_block_hash="
<< (cell_file_block ? cell_file_block->get_hash_value().to_string() : "<null>")
<< " cell_block_offset="
<< (cell_file_block ? std::to_string(cell_file_block->offset()) : "<null>")
<< " cell_block_size="
<< (cell_file_block ? std::to_string(cell_file_block->range().size()) : "<null>")
<< " cell_block_type="
<< (cell_file_block ? cache_type_to_string(cell_file_block->cache_type())
: "<null>")
<< " cell_block_state="
<< (cell_file_block ? FileBlock::state_to_string(cell_file_block->state_unsafe())
: "<null>");
return;
}
DCHECK(cell->queue_iterator);
if (cell->queue_iterator) {
auto& queue = get_queue(file_block->cache_type());
Expand Down
173 changes: 173 additions & 0 deletions be/test/io/cache/block_file_cache_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3110,6 +3110,179 @@ TEST_F(BlockFileCacheTest, remove_directly) {
}
}

TEST_F(BlockFileCacheTest, late_holder_remove_skips_missing_cache_cell) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
fs::create_directories(cache_base_path);

io::FileCacheSettings settings;
settings.query_queue_size = 30;
settings.query_queue_elements = 5;
settings.index_queue_size = 30;
settings.index_queue_elements = 5;
settings.disposable_queue_size = 30;
settings.disposable_queue_elements = 5;
settings.capacity = 90;
settings.max_file_block_size = 30;
settings.max_query_cache_size = 0;

io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
wait_until_cache_ready(cache);

io::CacheContext context;
ReadStatistics rstats;
context.stats = &rstats;
context.cache_type = io::FileCacheType::NORMAL;

auto key = io::BlockFileCache::hash("late-holder-remove-skips-missing-cache-cell");
auto holder = std::make_unique<FileBlocksHolder>(cache.get_or_set(key, 0, 5, context));
auto blocks = fromHolder(*holder);
ASSERT_EQ(blocks.size(), 1);

auto file_block = blocks[0];
ASSERT_EQ(file_block->get_or_set_downloader(), io::FileBlock::get_caller_id());
download(file_block);
file_block->set_deleting();

ASSERT_EQ(cache._cur_cache_size, 5);
{
auto* cache_ptr = &cache;
SCOPED_CACHE_LOCK(cache_ptr->_mutex, cache_ptr);
auto file_it = cache._files.find(key);
ASSERT_NE(file_it, cache._files.end());
auto cell_it = file_it->second.find(0);
ASSERT_NE(cell_it, file_it->second.end());
auto& cell = cell_it->second;
ASSERT_TRUE(cell.queue_iterator.has_value());

auto& queue = cache.get_queue(file_block->cache_type());
queue.remove(*cell.queue_iterator, cache_lock);
cache._cur_cache_size -= file_block->range().size();

file_it->second.erase(cell_it);
if (file_it->second.empty()) {
cache._files.erase(file_it);
}
}

blocks.clear();
ASSERT_EQ(file_block.use_count(), 2);

holder.reset();

EXPECT_EQ(cache._cur_cache_size, 0);
EXPECT_EQ(cache.get_file_blocks_num(io::FileCacheType::NORMAL), 0);

file_block.reset();
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
}

TEST_F(BlockFileCacheTest, late_holder_remove_skips_replaced_cache_cell) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
fs::create_directories(cache_base_path);

io::FileCacheSettings settings;
settings.query_queue_size = 30;
settings.query_queue_elements = 5;
settings.index_queue_size = 30;
settings.index_queue_elements = 5;
settings.disposable_queue_size = 30;
settings.disposable_queue_elements = 5;
settings.capacity = 90;
settings.max_file_block_size = 30;
settings.max_query_cache_size = 0;

io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
wait_until_cache_ready(cache);

io::CacheContext context;
ReadStatistics rstats;
context.stats = &rstats;
context.cache_type = io::FileCacheType::NORMAL;

auto key = io::BlockFileCache::hash("late-holder-remove-skips-replaced-cache-cell");
auto old_holder = std::make_unique<FileBlocksHolder>(cache.get_or_set(key, 0, 5, context));
auto old_blocks = fromHolder(*old_holder);
ASSERT_EQ(old_blocks.size(), 1);

auto old_file_block = old_blocks[0];
ASSERT_EQ(old_file_block->get_or_set_downloader(), io::FileBlock::get_caller_id());
download(old_file_block);
old_file_block->set_deleting();

ASSERT_EQ(cache._cur_cache_size, 5);
{
auto* cache_ptr = &cache;
SCOPED_CACHE_LOCK(cache_ptr->_mutex, cache_ptr);
auto file_it = cache._files.find(key);
ASSERT_NE(file_it, cache._files.end());
auto cell_it = file_it->second.find(0);
ASSERT_NE(cell_it, file_it->second.end());
auto& cell = cell_it->second;
ASSERT_TRUE(cell.queue_iterator.has_value());

auto& queue = cache.get_queue(old_file_block->cache_type());
queue.remove(*cell.queue_iterator, cache_lock);
cache._cur_cache_size -= old_file_block->range().size();

file_it->second.erase(cell_it);
if (file_it->second.empty()) {
cache._files.erase(file_it);
}
}

old_blocks.clear();
ASSERT_EQ(old_file_block.use_count(), 2);

auto new_holder = std::make_unique<FileBlocksHolder>(cache.get_or_set(key, 0, 5, context));
auto new_blocks = fromHolder(*new_holder);
ASSERT_EQ(new_blocks.size(), 1);

auto new_file_block = new_blocks[0];
ASSERT_NE(new_file_block.get(), old_file_block.get());
ASSERT_EQ(cache._cur_cache_size, 5);
{
auto* cache_ptr = &cache;
SCOPED_CACHE_LOCK(cache_ptr->_mutex, cache_ptr);
auto* cell = cache.get_cell(key, 0, cache_lock);
ASSERT_NE(cell, nullptr);
ASSERT_EQ(cell->file_block.get(), new_file_block.get());
}

old_holder.reset();

EXPECT_EQ(old_file_block->cell, nullptr);
EXPECT_EQ(cache._cur_cache_size, 5);
EXPECT_EQ(cache.get_file_blocks_num(io::FileCacheType::NORMAL), 1);
{
auto* cache_ptr = &cache;
SCOPED_CACHE_LOCK(cache_ptr->_mutex, cache_ptr);
auto* cell = cache.get_cell(key, 0, cache_lock);
ASSERT_NE(cell, nullptr);
EXPECT_EQ(cell->file_block.get(), new_file_block.get());
}

new_blocks.clear();
new_file_block->set_deleting();
new_file_block.reset();
new_holder.reset();
old_file_block.reset();

EXPECT_EQ(cache._cur_cache_size, 0);
EXPECT_EQ(cache.get_file_blocks_num(io::FileCacheType::NORMAL), 0);

if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
}

TEST_F(BlockFileCacheTest, test_factory_1) {
std::string cache_path2 = caches_dir / "cache2" / "";
std::string cache_path3 = caches_dir / "cache3" / "";
Expand Down
12 changes: 11 additions & 1 deletion be/test/io/cache/block_file_cache_test_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,16 @@ extern void complete_into_memory(const io::FileBlocksHolder& holder);
extern void test_file_cache(io::FileCacheType cache_type);
extern void test_file_cache_memory_storage(io::FileCacheType cache_type);

inline void wait_until_cache_ready(io::BlockFileCache& cache) {
for (int i = 0; i < 100; ++i) {
if (cache.get_async_open_success()) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
ASSERT_TRUE(cache.get_async_open_success());
}

class BlockFileCacheTest : public testing::Test {
public:
static void SetUpTestSuite() {
Expand Down Expand Up @@ -131,4 +141,4 @@ class BlockFileCacheTest : public testing::Test {
inline static std::unique_ptr<FileCacheFactory> factory = std::make_unique<FileCacheFactory>();
};

} // end of namespace doris::io
} // end of namespace doris::io
Loading