Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,10 @@ void CloudTablet::remove_unused_rowsets() {
continue;
}
tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
_rowset_warm_up_states.erase(rs->rowset_id());
{
LockGuard warmup_wlock(_rowset_warm_up_states_mutex);
_rowset_warm_up_states.erase(rs->rowset_id());
}
rs->clear_cache();
g_unused_rowsets_count << -1;
g_unused_rowsets_bytes << -rs->total_disk_size();
Expand Down Expand Up @@ -1503,7 +1506,7 @@ Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id,
}

WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
std::shared_lock rlock(_meta_lock);
LockGuard warmup_wlock(_rowset_warm_up_states_mutex);
if (!_rowset_warm_up_states.contains(rowset_id)) {
return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
}
Expand All @@ -1514,13 +1517,13 @@ WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {

bool CloudTablet::add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpTriggerSource source,
std::chrono::steady_clock::time_point start_tp) {
std::lock_guard wlock(_meta_lock);
LockGuard warmup_wlock(_rowset_warm_up_states_mutex);
return add_rowset_warmup_state_unlocked(rowset, source, start_tp);
}

bool CloudTablet::update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource source,
RowsetId rowset_id, int64_t delta) {
std::lock_guard wlock(_meta_lock);
LockGuard warmup_wlock(_rowset_warm_up_states_mutex);
return update_rowset_warmup_state_inverted_idx_num_unlocked(source, rowset_id, delta);
}

Expand Down Expand Up @@ -1593,7 +1596,7 @@ WarmUpState CloudTablet::complete_rowset_segment_warmup(WarmUpTriggerSource trig
RowsetId rowset_id, Status status,
int64_t segment_num,
int64_t inverted_idx_num) {
std::lock_guard wlock(_meta_lock);
LockGuard warmup_wlock(_rowset_warm_up_states_mutex);
auto it = _rowset_warm_up_states.find(rowset_id);
if (it == _rowset_warm_up_states.end()) {
return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
Expand Down Expand Up @@ -1621,6 +1624,7 @@ WarmUpState CloudTablet::complete_rowset_segment_warmup(WarmUpTriggerSource trig
}

bool CloudTablet::is_rowset_warmed_up(const RowsetId& rowset_id) const {
SharedLockGuard warmup_rlock(_rowset_warm_up_states_mutex);
auto it = _rowset_warm_up_states.find(rowset_id);
if (it == _rowset_warm_up_states.end()) {
// The rowset is not in warmup state, which means the rowset has never been warmed up.
Expand Down Expand Up @@ -1654,6 +1658,7 @@ bool CloudTablet::is_rowset_warmed_up(const RowsetId& rowset_id) const {
}

void CloudTablet::add_warmed_up_rowset(const RowsetId& rowset_id) {
LockGuard warmup_wlock(_rowset_warm_up_states_mutex);
_rowset_warm_up_states[rowset_id] = {
.state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET,
.progress = WarmUpProgress::DONE},
Expand All @@ -1662,6 +1667,7 @@ void CloudTablet::add_warmed_up_rowset(const RowsetId& rowset_id) {
}

void CloudTablet::add_not_warmed_up_rowset(const RowsetId& rowset_id) {
LockGuard warmup_wlock(_rowset_warm_up_states_mutex);
_rowset_warm_up_states[rowset_id] = {
.state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET,
.progress = WarmUpProgress::DOING},
Expand Down Expand Up @@ -1790,7 +1796,8 @@ void CloudTablet::_submit_inverted_index_download_task(const RowsetSharedPtr& rs
}},
.tablet_id = _tablet_meta->tablet_id(),
};
self->update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), 1);
self->update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource::SYNC_ROWSET,
rowset_meta->rowset_id(), 1);
_engine.file_cache_block_downloader().submit_download_task(std::move(meta));
g_file_cache_cloud_tablet_submitted_index_num << 1;
g_file_cache_cloud_tablet_submitted_index_size << idx_size;
Expand Down Expand Up @@ -1835,6 +1842,7 @@ void CloudTablet::_add_rowsets_directly(std::vector<RowsetSharedPtr>& rowsets,
if (!warm_up_state_updated) {
VLOG_DEBUG << "warm up rowset " << rs->version() << "(" << rs->rowset_id()
<< ") triggerd by sync rowset";
LockGuard warmup_wlock(_rowset_warm_up_states_mutex);
if (!add_rowset_warmup_state_unlocked(*(rs->rowset_meta()),
WarmUpTriggerSource::SYNC_ROWSET)) {
LOG(INFO) << "found duplicate warmup task for rowset " << rs->rowset_id()
Expand Down
12 changes: 9 additions & 3 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <memory>

#include "common/thread_safety_annotations.h"
#include "storage/partial_update_info.h"
#include "storage/rowset/rowset.h"
#include "storage/tablet/base_tablet.h"
Expand Down Expand Up @@ -366,7 +367,8 @@ class CloudTablet final : public BaseTablet {
bool update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource source, RowsetId rowset_id,
int64_t delta);
bool update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource source,
RowsetId rowset_id, int64_t delta);
RowsetId rowset_id, int64_t delta)
REQUIRES(_rowset_warm_up_states_mutex);
WarmUpState complete_rowset_segment_warmup(WarmUpTriggerSource trigger_source,
RowsetId rowset_id, Status status,
int64_t segment_num, int64_t inverted_idx_num);
Expand Down Expand Up @@ -394,6 +396,7 @@ class CloudTablet final : public BaseTablet {
std::string res;
auto add_log = [&](const RowsetSharedPtr& rs) {
auto tmp = fmt::format("{}{}", rs->rowset_id().to_string(), rs->version().to_string());
SharedLockGuard warmup_rlock(_rowset_warm_up_states_mutex);
if (_rowset_warm_up_states.contains(rs->rowset_id())) {
tmp += fmt::format(
", progress={}, segments_warmed_up={}/{}, inverted_idx_warmed_up={}/{}",
Expand All @@ -417,7 +420,8 @@ class CloudTablet final : public BaseTablet {

bool add_rowset_warmup_state_unlocked(
const RowsetMeta& rowset, WarmUpTriggerSource source,
std::chrono::steady_clock::time_point start_tp = std::chrono::steady_clock::now());
std::chrono::steady_clock::time_point start_tp = std::chrono::steady_clock::now())
REQUIRES(_rowset_warm_up_states_mutex);

// used by capture_rs_reader_xxx functions
bool rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const;
Expand Down Expand Up @@ -520,7 +524,9 @@ class CloudTablet final : public BaseTablet {

void update_state();
};
std::unordered_map<RowsetId, RowsetWarmUpInfo> _rowset_warm_up_states;
mutable AnnotatedSharedMutex _rowset_warm_up_states_mutex;
std::unordered_map<RowsetId, RowsetWarmUpInfo> _rowset_warm_up_states
GUARDED_BY(_rowset_warm_up_states_mutex);

mutable std::shared_mutex _warmed_up_rowsets_mutex;
std::unordered_set<RowsetId> _warmed_up_rowsets;
Expand Down
48 changes: 48 additions & 0 deletions be/src/common/thread_safety_annotations.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#pragma once

#include <mutex>
#include <shared_mutex>

#ifdef BE_TEST
namespace doris {
Expand Down Expand Up @@ -93,6 +94,27 @@ class CAPABILITY("mutex") AnnotatedMutex {
std::mutex _mutex;
};

// Annotated shared mutex wrapper for use with Clang thread safety analysis.
// Wraps std::shared_mutex and provides both exclusive and shared capability
// operations so GUARDED_BY / REQUIRES_SHARED / etc. can reference it.
class CAPABILITY("mutex") AnnotatedSharedMutex {
public:
void lock() ACQUIRE() { _mutex.lock(); }
void unlock() RELEASE() { _mutex.unlock(); }
bool try_lock() TRY_ACQUIRE(true) { return _mutex.try_lock(); }

void lock_shared() ACQUIRE_SHARED() { _mutex.lock_shared(); }
void unlock_shared() RELEASE_SHARED() { _mutex.unlock_shared(); }
bool try_lock_shared() TRY_ACQUIRE_SHARED(true) { return _mutex.try_lock_shared(); }

// Access the underlying std::shared_mutex (e.g., for std::condition_variable_any).
// Use with care — this bypasses thread safety annotations.
std::shared_mutex& native_handle() { return _mutex; }

private:
std::shared_mutex _mutex;
};

// RAII scoped lock guard annotated for thread safety analysis.
// In BE_TEST builds, injects a random sleep before acquiring and after
// releasing the lock to exercise concurrent code paths.
Expand All @@ -119,6 +141,32 @@ class SCOPED_CAPABILITY LockGuard {
MutexType& _mu;
};

// RAII scoped shared lock guard annotated for thread safety analysis.
// In BE_TEST builds, injects a random sleep before acquiring and after
// releasing the lock to exercise concurrent code paths.
template <typename MutexType>
class SCOPED_CAPABILITY SharedLockGuard {
public:
explicit SharedLockGuard(MutexType& mu) ACQUIRE_SHARED(mu) : _mu(mu) {
#ifdef BE_TEST
doris::mock_random_sleep();
#endif
_mu.lock_shared();
}
~SharedLockGuard() RELEASE() {
_mu.unlock_shared();
#ifdef BE_TEST
doris::mock_random_sleep();
#endif
}

SharedLockGuard(const SharedLockGuard&) = delete;
SharedLockGuard& operator=(const SharedLockGuard&) = delete;

private:
MutexType& _mu;
};

// RAII unique lock annotated for thread safety analysis.
// Supports manual lock/unlock while preserving capability tracking.
template <typename MutexType>
Expand Down
16 changes: 11 additions & 5 deletions be/src/runtime/small_file_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(small_file_cache_count, MetricUnit::NOUNIT);
SmallFileMgr::SmallFileMgr(ExecEnv* env, const std::string& local_path)
: _exec_env(env), _local_path(local_path) {
REGISTER_HOOK_METRIC(small_file_cache_count, [this]() {
// std::lock_guard<std::mutex> l(_lock);
LockGuard l(_lock);
return _file_cache.size();
});
}
Expand Down Expand Up @@ -91,8 +91,11 @@ Status SmallFileMgr::_load_single_file(const std::string& path, const std::strin
int64_t file_id = std::stol(parts[0]);
std::string md5 = parts[1];

if (_file_cache.find(file_id) != _file_cache.end()) {
return Status::InternalError("File with same id is already been loaded: {}", file_id);
{
LockGuard l(_lock);
if (_file_cache.find(file_id) != _file_cache.end()) {
return Status::InternalError("File with same id is already been loaded: {}", file_id);
}
}

std::string file_md5;
Expand All @@ -105,12 +108,15 @@ Status SmallFileMgr::_load_single_file(const std::string& path, const std::strin
entry.path = path + "/" + file_name;
entry.md5 = file_md5;

_file_cache.emplace(file_id, entry);
{
LockGuard l(_lock);
_file_cache.emplace(file_id, entry);
}
return Status::OK();
}

Status SmallFileMgr::get_file(int64_t file_id, const std::string& md5, std::string* file_path) {
std::unique_lock<std::mutex> l(_lock);
UniqueLock l(_lock);
// find in cache
auto it = _file_cache.find(file_id);
if (it != _file_cache.end()) {
Expand Down
9 changes: 5 additions & 4 deletions be/src/runtime/small_file_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@

#include <stdint.h>

#include <mutex>
#include <string>
#include <unordered_map>

#include "common/status.h"
#include "common/thread_safety_annotations.h"

namespace doris {

Expand Down Expand Up @@ -59,14 +59,15 @@ class SmallFileMgr {

Status _check_file(const CacheEntry& entry, const std::string& md5);

Status _download_file(int64_t file_id, const std::string& md5, std::string* file_path);
Status _download_file(int64_t file_id, const std::string& md5, std::string* file_path)
REQUIRES(_lock);

private:
std::mutex _lock;
AnnotatedMutex _lock;
ExecEnv* _exec_env = nullptr;
std::string _local_path;
// file id -> small file
std::unordered_map<int64_t, CacheEntry> _file_cache;
std::unordered_map<int64_t, CacheEntry> _file_cache GUARDED_BY(_lock);
};

} // end namespace doris
Loading