Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_recorder.h"
#include "util/brpc_client_cache.h" // BrpcClientCache
#include "util/stack_util.h"
#include "util/thrift_server.h"

namespace doris {
Expand Down Expand Up @@ -213,8 +214,12 @@ void CloudBackendService::warm_up_cache_async(TWarmUpCacheAsyncResponse& respons
}
brpc::Controller cntl;
PGetFileCacheMetaRequest brpc_request;
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(),
[&](int64_t tablet_id) { brpc_request.add_tablet_ids(tablet_id); });
std::stringstream ss;
std::for_each(request.tablet_ids.cbegin(), request.tablet_ids.cend(), [&](int64_t tablet_id) {
brpc_request.add_tablet_ids(tablet_id);
ss << tablet_id << ",";
});
VLOG_DEBUG << "tablets set: " << ss.str() << " stack: " << get_stack_trace();
PGetFileCacheMetaResponse brpc_response;

brpc_stub->get_file_cache_meta_by_tablet_id(&cntl, &brpc_request, &brpc_response, nullptr);
Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
#include "util/debug_points.h"
#include "util/stack_util.h"
#include "vec/common/schema_util.h"

namespace doris {
Expand Down Expand Up @@ -382,6 +383,8 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
return;
}

VLOG_DEBUG << "add_rowsets tablet_id=" << tablet_id() << " stack: " << get_stack_trace();

auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& rowsets) {
for (auto& rs : rowsets) {
if (version_overlap || warmup_delta_data) {
Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "common/status.h"
#include "olap/lru_cache.h"
#include "runtime/memory/cache_policy.h"
#include "util/stack_util.h"

namespace doris {
uint64_t g_tablet_report_inactive_duration_ms = 0;
Expand Down Expand Up @@ -174,6 +175,8 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
TabletMap& tablet_map;
};

VLOG_DEBUG << "get_tablet tablet_id=" << tablet_id << " stack: " << get_stack_trace();

auto tablet_id_str = std::to_string(tablet_id);
CacheKey key(tablet_id_str);
auto* handle = _cache->lookup(key);
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "util/brpc_client_cache.h" // BrpcClientCache
#include "util/stack_util.h"
#include "util/thrift_rpc_helper.h"
#include "util/time.h"

Expand Down Expand Up @@ -210,6 +211,7 @@ void CloudWarmUpManager::handle_jobs() {
std::make_shared<bthread::CountdownEvent>(0);

for (int64_t tablet_id : cur_job->tablet_ids) {
VLOG_DEBUG << "Warm up tablet " << tablet_id << " stack: " << get_stack_trace();
if (_cur_job_id == 0) { // The job is canceled
break;
}
Expand Down
37 changes: 30 additions & 7 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "io/cache/fs_file_cache_storage.h"
#include "io/cache/mem_file_cache_storage.h"
#include "util/runtime_profile.h"
#include "util/stack_util.h"
#include "util/stopwatch.hpp"
#include "util/thread.h"
#include "util/time.h"
Expand Down Expand Up @@ -922,6 +923,9 @@ size_t BlockFileCache::try_release() {
std::lock_guard lc(cell->file_block->_mutex);
remove_size += file_block->range().size();
remove(file_block, cache_lock, lc);
VLOG_DEBUG << "try_release " << _cache_base_path
<< " hash=" << file_block->get_hash_value().to_string()
<< " offset=" << file_block->offset();
}
*_evict_by_try_release << remove_size;
LOG(INFO) << "Released " << trash.size() << " blocks in file cache " << _cache_base_path;
Expand Down Expand Up @@ -961,12 +965,16 @@ const LRUQueue& BlockFileCache::get_queue(FileCacheType type) const {
}

void BlockFileCache::remove_file_blocks(std::vector<FileBlockCell*>& to_evict,
std::lock_guard<std::mutex>& cache_lock, bool sync) {
std::lock_guard<std::mutex>& cache_lock, bool sync,
std::string& reason) {
auto remove_file_block_if = [&](FileBlockCell* cell) {
FileBlockSPtr file_block = cell->file_block;
if (file_block) {
std::lock_guard block_lock(file_block->_mutex);
remove(file_block, cache_lock, block_lock, sync);
VLOG_DEBUG << "remove_file_blocks"
<< " hash=" << file_block->get_hash_value().to_string()
<< " offset=" << file_block->offset() << " reason=" << reason;
}
};
std::for_each(to_evict.begin(), to_evict.end(), remove_file_block_if);
Expand Down Expand Up @@ -1228,6 +1236,7 @@ bool BlockFileCache::remove_if_ttl_file_blocks(const UInt128Wrapper& file_key, b
// remove specific cache synchronously, for critical operations
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
std::string reason = "remove_if_cached";
SCOPED_CACHE_LOCK(_mutex, this);
bool is_ttl_file = remove_if_ttl_file_blocks(file_key, true, cache_lock, true);
if (!is_ttl_file) {
Expand All @@ -1242,14 +1251,15 @@ void BlockFileCache::remove_if_cached(const UInt128Wrapper& file_key) {
}
}
}
remove_file_blocks(to_remove, cache_lock, true);
remove_file_blocks(to_remove, cache_lock, true, reason);
}
}

// the async version of remove_if_cached, for background operations
// cache meta is deleted synchronously if not in use, and the block file is deleted asynchronously
// if in use, cache meta will be deleted after use and the block file is then deleted asynchronously
void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
std::string reason = "remove_if_cached_async";
SCOPED_CACHE_LOCK(_mutex, this);
bool is_ttl_file = remove_if_ttl_file_blocks(file_key, true, cache_lock, /*sync*/ false);
if (!is_ttl_file) {
Expand All @@ -1266,7 +1276,7 @@ void BlockFileCache::remove_if_cached_async(const UInt128Wrapper& file_key) {
}
}
}
remove_file_blocks(to_remove, cache_lock, false);
remove_file_blocks(to_remove, cache_lock, false, reason);
}
}

Expand Down Expand Up @@ -1357,7 +1367,9 @@ bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
*(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
}
bool is_sync_removal = !evict_in_advance;
remove_file_blocks(to_evict, cache_lock, is_sync_removal);
std::string reason = std::string("try_reserve_by_time ") +
" evict_in_advance=" + (evict_in_advance ? "true" : "false");
remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);

return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
}
Expand Down Expand Up @@ -1400,7 +1412,9 @@ bool BlockFileCache::try_reserve_from_other_queue_by_size(
*(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
}
bool is_sync_removal = !evict_in_advance;
remove_file_blocks(to_evict, cache_lock, is_sync_removal);
std::string reason = std::string("try_reserve_by_size") +
" evict_in_advance=" + (evict_in_advance ? "true" : "false");
remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
return !is_overflow(removed_size, size, cur_cache_size, evict_in_advance);
}

Expand Down Expand Up @@ -1447,7 +1461,10 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
find_evict_candidates(queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
cur_removed_size, evict_in_advance);
bool is_sync_removal = !evict_in_advance;
remove_file_blocks(to_evict, cache_lock, is_sync_removal);
std::string reason = std::string("try_reserve for cache type ") +
cache_type_to_string(context.cache_type) +
" evict_in_advance=" + (evict_in_advance ? "true" : "false");
remove_file_blocks(to_evict, cache_lock, is_sync_removal, reason);
*(_evict_by_self_lru_metrics_matrix[context.cache_type]) << cur_removed_size;

if (is_overflow(removed_size, size, cur_cache_size, evict_in_advance)) {
Expand Down Expand Up @@ -1483,6 +1500,11 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
*_queue_evict_size_metrics[static_cast<int>(file_block->cache_type())]
<< file_block->range().size();
*_total_evict_size_metrics << file_block->range().size();

VLOG_DEBUG << "Removing file block from cache. hash: " << hash.to_string()
<< ", offset: " << offset << ", size: " << file_block->range().size()
<< ", type: " << cache_type_to_string(type);

if (file_block->state_unlock(block_lock) == FileBlock::State::DOWNLOADED) {
FileCacheKey key;
key.hash = hash;
Expand Down Expand Up @@ -2327,7 +2349,8 @@ bool BlockFileCache::try_reserve_during_async_load(size_t size,
if (index_queue_size != 0) {
collect_eliminate_fragments(get_queue(FileCacheType::INDEX));
}
remove_file_blocks(to_evict, cache_lock, true);
std::string reason = "async load";
remove_file_blocks(to_evict, cache_lock, true, reason);

return !_disk_resource_limit_mode || removed_size >= size;
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/block_file_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,8 @@ class BlockFileCache {
bool is_overflow(size_t removed_size, size_t need_size, size_t cur_cache_size,
bool evict_in_advance) const;

void remove_file_blocks(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&, bool sync);
void remove_file_blocks(std::vector<FileBlockCell*>&, std::lock_guard<std::mutex>&, bool sync,
std::string& reason);

void remove_file_blocks_and_clean_time_maps(std::vector<FileBlockCell*>&,
std::lock_guard<std::mutex>&);
Expand Down
4 changes: 4 additions & 0 deletions be/src/io/fs/s3_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <utility>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "io/cache/block_file_cache.h"
#include "io/fs/err_utils.h"
#include "io/fs/obj_storage_client.h"
#include "io/fs/s3_common.h"
Expand Down Expand Up @@ -115,6 +116,9 @@ Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_rea
size_t bytes_req = result.size;
char* to = result.data;
bytes_req = std::min(bytes_req, _file_size - offset);
VLOG_DEBUG << fmt::format("S3FileReader::read_at_impl offset={} size={} path={} hash={}",
offset, result.size, _path.native(),
io::BlockFileCache::hash(_path.native()).to_string());
VLOG_DEBUG << "enter s3 read_at_impl, off=" << offset << " n=" << bytes_req
<< " req=" << result.size << " file size=" << _file_size;
if (UNLIKELY(bytes_req == 0)) {
Expand Down
Loading