Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -579,11 +579,13 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true},
.download_done = [=, version = rs_meta.version()](Status st) {
handle_segment_download_done(st, tablet_id, rowset_id, segment_id,
tablet, wait, version, segment_size,
request_ts, handle_ts);
}};
.download_done =
[=, version = rs_meta.version()](Status st) {
handle_segment_download_done(
st, tablet_id, rowset_id, segment_id, tablet, wait,
version, segment_size, request_ts, handle_ts);
},
.tablet_id = tablet_id};

g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size;
Expand All @@ -604,11 +606,13 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
.expiration_time = expiration_time,
.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
.is_warmup = true},
.download_done = [=, version = rs_meta.version()](Status st) {
handle_inverted_index_download_done(
st, tablet_id, rowset_id, segment_id, index_path, tablet, wait,
version, idx_size, request_ts, handle_ts);
}};
.download_done =
[=, version = rs_meta.version()](Status st) {
handle_inverted_index_download_done(
st, tablet_id, rowset_id, segment_id, index_path,
tablet, wait, version, idx_size, request_ts, handle_ts);
},
.tablet_id = tablet_id};
g_file_cache_event_driven_warm_up_submitted_index_num << 1;
g_file_cache_event_driven_warm_up_submitted_index_size << idx_size;
tablet->update_rowset_warmup_state_inverted_idx_num(
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1718,6 +1718,7 @@ void CloudTablet::_submit_segment_download_task(const RowsetSharedPtr& rs,
LOG_WARNING("add rowset warm up error ").error(st);
}
}},
.tablet_id = _tablet_meta->tablet_id(),
});
// clang-format on
}
Expand Down Expand Up @@ -1760,6 +1761,7 @@ void CloudTablet::_submit_inverted_index_download_task(const RowsetSharedPtr& rs
LOG_WARNING("add rowset warm up error ").error(st);
}
}},
.tablet_id = _tablet_meta->tablet_id(),
};
self->update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), 1);
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
Expand Down
16 changes: 11 additions & 5 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
io::FileSystemSPtr file_system,
int64_t expiration_time,
std::shared_ptr<bthread::CountdownEvent> wait,
bool is_index, std::function<void(Status)> done_cb) {
bool is_index, std::function<void(Status)> done_cb,
int64_t tablet_id) {
VLOG_DEBUG << "submit warm up task for file: " << path << ", file_size: " << file_size
<< ", expiration_time: " << expiration_time
<< ", is_index: " << (is_index ? "true" : "false");
Expand Down Expand Up @@ -184,6 +185,7 @@ void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
}
wait->signal();
},
.tablet_id = tablet_id,
});

offset += current_chunk_size;
Expand Down Expand Up @@ -256,7 +258,8 @@ void CloudWarmUpManager::handle_jobs() {
submit_download_tasks(
storage_resource.value()->remote_segment_path(*rs, seg_id),
rs->segment_file_size(cast_set<int>(seg_id)), rs->fs(),
expiration_time, wait, false, [tablet, rs, seg_id](Status st) {
expiration_time, wait, false,
[tablet, rs, seg_id](Status st) {
VLOG_DEBUG << "warmup rowset " << rs->version() << " segment "
<< seg_id << " completed";
if (tablet->complete_rowset_segment_warmup(
Expand All @@ -266,7 +269,8 @@ void CloudWarmUpManager::handle_jobs() {
VLOG_DEBUG << "warmup rowset " << rs->version()
<< " completed";
}
});
},
tablet_id);
}

// 2nd. download inverted index files
Expand Down Expand Up @@ -313,7 +317,8 @@ void CloudWarmUpManager::handle_jobs() {
VLOG_DEBUG << "warmup rowset " << rs->version()
<< " completed";
}
});
},
tablet_id);
}
} else {
if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) {
Expand All @@ -336,7 +341,8 @@ void CloudWarmUpManager::handle_jobs() {
VLOG_DEBUG << "warmup rowset " << rs->version()
<< " completed";
}
});
},
tablet_id);
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_warm_up_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class CloudWarmUpManager {
void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system,
int64_t expiration_time,
std::shared_ptr<bthread::CountdownEvent> wait, bool is_index = false,
std::function<void(Status)> done_cb = nullptr);
std::function<void(Status)> done_cb = nullptr,
int64_t tablet_id = -1);
std::mutex _mtx;
std::condition_variable _cond;
int64_t _cur_job_id {0};
Expand Down
2 changes: 2 additions & 0 deletions be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ void FileCacheBlockDownloader::download_file_cache_block(
.is_warmup = true,
},
.download_done = std::move(download_done),
.tablet_id = meta.tablet_id(),
};
download_segment_file(download_meta);
});
Expand All @@ -300,6 +301,7 @@ void FileCacheBlockDownloader::download_segment_file(const DownloadFileMeta& met
.is_doris_table = true,
.cache_base_path {},
.file_size = meta.file_size,
.tablet_id = meta.tablet_id,
};
auto st = meta.file_system->open_file(meta.path, &file_reader, &opts);
if (!st.ok()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/io/cache/block_file_cache_downloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct DownloadFileMeta {
io::FileSystemSPtr file_system;
IOContext ctx;
std::function<void(Status)> download_done;
int64_t tablet_id {-1};
};

struct DownloadTask {
Expand Down
57 changes: 24 additions & 33 deletions be/src/io/cache/cached_remote_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
#include "runtime/thread_context.h"
#include "runtime/workload_management/io_throttle.h"
#include "service/backend_options.h"
#include "storage/storage_policy.h"
#include "util/bit_util.h"
#include "util/brpc_client_cache.h" // BrpcClientCache
#include "util/client_cache.h"
Expand Down Expand Up @@ -93,7 +92,8 @@ bvar::Adder<uint64_t> g_failed_get_peer_addr_counter(

CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader,
const FileReaderOptions& opts)
: _remote_file_reader(std::move(remote_file_reader)) {
: _tablet_id(opts.tablet_id), _remote_file_reader(std::move(remote_file_reader)) {
DCHECK(!opts.is_doris_table || _tablet_id > 0);
_is_doris_table = opts.is_doris_table;
if (_is_doris_table) {
_cache_hash = BlockFileCache::hash(path().filename().native());
Expand Down Expand Up @@ -159,28 +159,30 @@ std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset, si
}

namespace {
std::optional<int64_t> extract_tablet_id(const std::string& file_path) {
return StorageResource::parse_tablet_id_from_path(file_path);
// Execute S3 read: fetch data straight from the underlying remote file reader
// (the non-peer path) into `buffer`.
//
// empty_start        - offset handed to `read_at` as the read position.
// size               - in/out: used as the length of the destination slice, and its
//                      address is passed to `read_at` (presumably the bytes-read
//                      out-parameter; confirm against FileReader::read_at).
// buffer             - destination storage; the caller must have allocated at least
//                      `size` bytes (not checked here).
// stats              - `remote_read_timer` is handed to SCOPED_RAW_TIMER and
//                      `from_peer_cache` is cleared to mark this as a non-peer read.
// io_ctx             - forwarded unchanged to `read_at`.
// remote_file_reader - reader that performs the actual read.
//
// Returns whatever Status `remote_file_reader->read_at` returns.
Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr<char[]>& buffer,
ReadStatistics& stats, const IOContext* io_ctx,
FileReaderSPtr remote_file_reader) {
// Count every remote read attempt, regardless of its outcome.
s3_read_counter << 1;
// NOTE(review): presumably times the remainder of this scope, i.e. the read below.
SCOPED_RAW_TIMER(&stats.remote_read_timer);
stats.from_peer_cache = false;
return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx);
}

// Get peer connection info from tablet_id
std::pair<std::string, int> get_peer_connection_info(const std::string& file_path) {
std::pair<std::string, int> get_peer_connection_info(int64_t tablet_id,
const std::string& file_path) {
std::string host = "";
int port = 0;

// Try to get tablet_id from actual path and lookup tablet info
if (auto tablet_id = extract_tablet_id(file_path)) {
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) {
host = tablet_info->first;
port = tablet_info->second;
} else {
VLOG_DEBUG << "get peer connection info not found"
<< ", tablet_id=" << *tablet_id << ", file_path=" << file_path;
}
DCHECK(tablet_id > 0);
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
if (auto tablet_info = manager.get_balanced_tablet_info(tablet_id)) {
host = tablet_info->first;
port = tablet_info->second;
} else {
VLOG_DEBUG << "parse tablet id from path failed"
<< "tablet_id=null, file_path=" << file_path;
LOG_EVERY_N(WARNING, 100) << "get peer connection info not found"
<< ", tablet_id=" << tablet_id << ", file_path=" << file_path;
}

DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
Expand All @@ -200,8 +202,8 @@ std::pair<std::string, int> get_peer_connection_info(const std::string& file_pat
Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start,
size_t& size, std::unique_ptr<char[]>& buffer,
const std::string& file_path, size_t file_size, bool is_doris_table,
ReadStatistics& stats, const IOContext* io_ctx) {
auto [host, port] = get_peer_connection_info(file_path);
int64_t tablet_id, ReadStatistics& stats, const IOContext* io_ctx) {
auto [host, port] = get_peer_connection_info(tablet_id, file_path);
VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port
<< ", file_path=" << file_path;

Expand All @@ -224,16 +226,6 @@ Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t
return st;
}

// Execute S3 read: fetch data straight from the underlying remote file reader
// (the non-peer path) into `buffer`.
//
// empty_start        - offset handed to `read_at` as the read position.
// size               - in/out: used as the length of the destination slice, and its
//                      address is passed to `read_at` (presumably the bytes-read
//                      out-parameter; confirm against FileReader::read_at).
// buffer             - destination storage; the caller must have allocated at least
//                      `size` bytes (not checked here).
// stats              - `remote_read_timer` is handed to SCOPED_RAW_TIMER and
//                      `from_peer_cache` is cleared to mark this as a non-peer read.
// io_ctx             - forwarded unchanged to `read_at`.
// remote_file_reader - reader that performs the actual read.
//
// Returns whatever Status `remote_file_reader->read_at` returns.
Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr<char[]>& buffer,
ReadStatistics& stats, const IOContext* io_ctx,
FileReaderSPtr remote_file_reader) {
// Count every remote read attempt, regardless of its outcome.
s3_read_counter << 1;
// NOTE(review): presumably times the remainder of this scope, i.e. the read below.
SCOPED_RAW_TIMER(&stats.remote_read_timer);
stats.from_peer_cache = false;
return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx);
}

} // anonymous namespace

Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockSPtr>& empty_blocks,
Expand All @@ -255,7 +247,7 @@ Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockS
return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
} else {
return execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
this->size(), _is_doris_table, stats, io_ctx);
this->size(), _is_doris_table, _tablet_id, stats, io_ctx);
}
});

Expand All @@ -269,7 +261,7 @@ Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockS
// ATTN: Save original size before peer read, as it may be modified by fetch_blocks, read peer ref size
size_t original_size = size;
auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
this->size(), _is_doris_table, stats, io_ctx);
this->size(), _is_doris_table, _tablet_id, stats, io_ctx);
if (!st.ok()) {
// Restore original size for S3 fallback, as peer read may have modified it
size = original_size;
Expand Down Expand Up @@ -386,8 +378,7 @@ Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t*
s_align_size(offset + already_read, bytes_req - already_read, size());
CacheContext cache_context(io_ctx);
cache_context.stats = &stats;
auto tablet_id = get_tablet_id(path().string());
cache_context.tablet_id = tablet_id.value_or(0);
cache_context.tablet_id = _tablet_id;
MonotonicStopWatch sw;
sw.start();

Expand Down
3 changes: 2 additions & 1 deletion be/src/io/cache/cached_remote_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ class CachedRemoteFileReader final : public FileReader,
void _update_stats(const ReadStatistics& stats, FileCacheStatistics* state,
bool is_inverted_index) const;

bool _is_doris_table;
bool _is_doris_table = false;
int64_t _tablet_id = -1;
FileReaderSPtr _remote_file_reader;
UInt128Wrapper _cache_hash;
BlockFileCache* _cache;
Expand Down
1 change: 1 addition & 0 deletions be/src/io/tools/file_cache_microbench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1496,6 +1496,7 @@ class JobManager {
doris::io::FileReaderOptions reader_opts;
reader_opts.cache_type = doris::io::FileCachePolicy::FILE_BLOCK_CACHE;
reader_opts.is_doris_table = true;
reader_opts.tablet_id = 1; // microbench placeholder

doris::io::FileDescription fd;
std::string obj_path = "s3://" + doris::config::test_s3_bucket + "/";
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/compaction/collection_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ Status CollectionStatistics::process_segment(const RowsetSharedPtr& rowset, int3
rowset_meta->fs(),
std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path)},
tablet_schema->get_inverted_index_storage_format(),
rowset_meta->inverted_index_file_info(seg_id));
rowset_meta->inverted_index_file_info(seg_id), rowset_meta->tablet_id());
RETURN_IF_ERROR(idx_file_reader->init(config::inverted_index_read_buffer_size, io_ctx));

int32_t total_seg_num_docs = 0;
Expand Down
4 changes: 2 additions & 2 deletions be/src/storage/compaction/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -819,7 +819,7 @@ Status Compaction::do_inverted_index_compaction() {
fs,
std::string {InvertedIndexDescriptor::get_index_file_path_prefix(seg_path.value())},
_cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(seg_id));
rowset->rowset_meta()->inverted_index_file_info(seg_id), _tablet->tablet_id());
auto st = index_file_reader->init(config::inverted_index_read_buffer_size);
DBUG_EXECUTE_IF("Compaction::do_inverted_index_compaction_init_inverted_index_file_reader",
{
Expand Down Expand Up @@ -1020,7 +1020,7 @@ static bool check_rowset_has_inverted_index(const RowsetSharedPtr& src_rs, int32
std::string {InvertedIndexDescriptor::get_index_file_path_prefix(
seg_path.value())},
cur_tablet_schema->get_inverted_index_storage_format(),
rowset->rowset_meta()->inverted_index_file_info(i));
rowset->rowset_meta()->inverted_index_file_info(i), tablet->tablet_id());
auto st = index_file_reader->init(config::inverted_index_read_buffer_size);
index_file_path = index_file_reader->get_index_file_path(index_meta);
DBUG_EXECUTE_IF(
Expand Down
10 changes: 6 additions & 4 deletions be/src/storage/index/index_file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ Status IndexFileReader::_init_from(int32_t read_buffer_size, const io::IOContext
DCHECK(_fs != nullptr) << "file system is nullptr, index_file_full_path: "
<< index_file_full_path;
// 2. open file
auto ok = DorisFSDirectory::FSIndexInput::open(
_fs, index_file_full_path.c_str(), index_input, err, read_buffer_size, file_size);
auto ok =
DorisFSDirectory::FSIndexInput::open(_fs, index_file_full_path.c_str(), index_input,
err, read_buffer_size, file_size, _tablet_id);
if (!ok) {
if (err.number() == CL_ERR_FileNotFound) {
return Status::Error<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>(
Expand Down Expand Up @@ -182,8 +183,9 @@ Result<std::unique_ptr<DorisCompoundReader, DirectoryDeleter>> IndexFileReader::
DCHECK(_fs != nullptr)
<< "file system is nullptr, index_file_path: " << index_file_path;
// 2. open file
auto ok = DorisFSDirectory::FSIndexInput::open(
_fs, index_file_path.c_str(), index_input, err, _read_buffer_size, file_size);
auto ok = DorisFSDirectory::FSIndexInput::open(_fs, index_file_path.c_str(),
index_input, err, _read_buffer_size,
file_size, _tablet_id);
if (!ok) {
// now index_input = nullptr
if (err.number() == CL_ERR_FileNotFound) {
Expand Down
7 changes: 5 additions & 2 deletions be/src/storage/index/index_file_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,13 @@ class IndexFileReader {

IndexFileReader(io::FileSystemSPtr fs, std::string index_path_prefix,
InvertedIndexStorageFormatPB storage_format,
InvertedIndexFileInfo idx_file_info = InvertedIndexFileInfo())
InvertedIndexFileInfo idx_file_info = InvertedIndexFileInfo(),
int64_t tablet_id = -1)
: _fs(std::move(fs)),
_index_path_prefix(std::move(index_path_prefix)),
_storage_format(storage_format),
_idx_file_info(idx_file_info) {}
_idx_file_info(idx_file_info),
_tablet_id(tablet_id) {}
virtual ~IndexFileReader() = default;

MOCK_FUNCTION Status init(int32_t read_buffer_size = config::inverted_index_read_buffer_size,
Expand Down Expand Up @@ -90,6 +92,7 @@ class IndexFileReader {
mutable std::shared_mutex _mutex; // Use mutable for const read operations
bool _inited = false;
InvertedIndexFileInfo _idx_file_info;
int64_t _tablet_id = -1;
};

} // namespace segment_v2
Expand Down
Loading
Loading