diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 339a5a757a230b..e9b9cf934fde6c 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -641,7 +641,10 @@ void CloudTablet::remove_unused_rowsets() { continue; } tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version()); - _rowset_warm_up_states.erase(rs->rowset_id()); + { + LockGuard warmup_wlock(_rowset_warm_up_states_mutex); + _rowset_warm_up_states.erase(rs->rowset_id()); + } rs->clear_cache(); g_unused_rowsets_count << -1; g_unused_rowsets_bytes << -rs->total_disk_size(); @@ -1503,7 +1506,7 @@ Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id, } WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) { - std::shared_lock rlock(_meta_lock); + LockGuard warmup_wlock(_rowset_warm_up_states_mutex); if (!_rowset_warm_up_states.contains(rowset_id)) { return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE}; } @@ -1514,13 +1517,13 @@ WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) { bool CloudTablet::add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpTriggerSource source, std::chrono::steady_clock::time_point start_tp) { - std::lock_guard wlock(_meta_lock); + LockGuard warmup_wlock(_rowset_warm_up_states_mutex); return add_rowset_warmup_state_unlocked(rowset, source, start_tp); } bool CloudTablet::update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource source, RowsetId rowset_id, int64_t delta) { - std::lock_guard wlock(_meta_lock); + LockGuard warmup_wlock(_rowset_warm_up_states_mutex); return update_rowset_warmup_state_inverted_idx_num_unlocked(source, rowset_id, delta); } @@ -1593,7 +1596,7 @@ WarmUpState CloudTablet::complete_rowset_segment_warmup(WarmUpTriggerSource trig RowsetId rowset_id, Status status, int64_t segment_num, int64_t inverted_idx_num) { - std::lock_guard wlock(_meta_lock); + LockGuard warmup_wlock(_rowset_warm_up_states_mutex); auto it = _rowset_warm_up_states.find(rowset_id); if (it == _rowset_warm_up_states.end()) { return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE}; @@ -1621,6 +1624,7 @@ WarmUpState CloudTablet::complete_rowset_segment_warmup(WarmUpTriggerSource trig } bool CloudTablet::is_rowset_warmed_up(const RowsetId& rowset_id) const { + SharedLockGuard warmup_rlock(_rowset_warm_up_states_mutex); auto it = _rowset_warm_up_states.find(rowset_id); if (it == _rowset_warm_up_states.end()) { // The rowset is not in warmup state, which means the rowset has never been warmed up. @@ -1654,6 +1658,7 @@ bool CloudTablet::is_rowset_warmed_up(const RowsetId& rowset_id) const { } void CloudTablet::add_warmed_up_rowset(const RowsetId& rowset_id) { + LockGuard warmup_wlock(_rowset_warm_up_states_mutex); _rowset_warm_up_states[rowset_id] = { .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET, .progress = WarmUpProgress::DONE}, @@ -1662,6 +1667,7 @@ void CloudTablet::add_warmed_up_rowset(const RowsetId& rowset_id) { } void CloudTablet::add_not_warmed_up_rowset(const RowsetId& rowset_id) { + LockGuard warmup_wlock(_rowset_warm_up_states_mutex); _rowset_warm_up_states[rowset_id] = { .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET, .progress = WarmUpProgress::DOING}, @@ -1790,7 +1796,8 @@ void CloudTablet::_submit_inverted_index_download_task(const RowsetSharedPtr& rs }}, .tablet_id = _tablet_meta->tablet_id(), }; - self->update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), 1); + self->update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource::SYNC_ROWSET, + rowset_meta->rowset_id(), 1); _engine.file_cache_block_downloader().submit_download_task(std::move(meta)); g_file_cache_cloud_tablet_submitted_index_num << 1; g_file_cache_cloud_tablet_submitted_index_size << idx_size; @@ -1835,6 +1842,7 @@ void CloudTablet::_add_rowsets_directly(std::vector& rowsets, if (!warm_up_state_updated) { VLOG_DEBUG << "warm up rowset " << rs->version() << "(" << rs->rowset_id() << ") triggerd by sync rowset"; + LockGuard warmup_wlock(_rowset_warm_up_states_mutex); if (!add_rowset_warmup_state_unlocked(*(rs->rowset_meta()), WarmUpTriggerSource::SYNC_ROWSET)) { LOG(INFO) << "found duplicate warmup task for rowset " << rs->rowset_id() diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h index 807ca6207c4156..f280dbf7857e58 100644 --- a/be/src/cloud/cloud_tablet.h +++ b/be/src/cloud/cloud_tablet.h @@ -19,6 +19,7 @@ #include +#include "common/thread_safety_annotations.h" #include "storage/partial_update_info.h" #include "storage/rowset/rowset.h" #include "storage/tablet/base_tablet.h" @@ -366,7 +367,8 @@ class CloudTablet final : public BaseTablet { bool update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource source, RowsetId rowset_id, int64_t delta); bool update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource source, - RowsetId rowset_id, int64_t delta); + RowsetId rowset_id, int64_t delta) + REQUIRES(_rowset_warm_up_states_mutex); WarmUpState complete_rowset_segment_warmup(WarmUpTriggerSource trigger_source, RowsetId rowset_id, Status status, int64_t segment_num, int64_t inverted_idx_num); @@ -394,6 +396,7 @@ class CloudTablet final : public BaseTablet { std::string res; auto add_log = [&](const RowsetSharedPtr& rs) { auto tmp = fmt::format("{}{}", rs->rowset_id().to_string(), rs->version().to_string()); + SharedLockGuard warmup_rlock(_rowset_warm_up_states_mutex); if (_rowset_warm_up_states.contains(rs->rowset_id())) { tmp += fmt::format( ", progress={}, segments_warmed_up={}/{}, inverted_idx_warmed_up={}/{}", @@ -417,7 +420,8 @@ class CloudTablet final : public BaseTablet { bool add_rowset_warmup_state_unlocked( const RowsetMeta& rowset, WarmUpTriggerSource source, - std::chrono::steady_clock::time_point start_tp = std::chrono::steady_clock::now()); + std::chrono::steady_clock::time_point start_tp = std::chrono::steady_clock::now()) + REQUIRES(_rowset_warm_up_states_mutex); // used by capture_rs_reader_xxx functions bool rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const; @@ -520,7 +524,9 @@ class CloudTablet final : public BaseTablet { void update_state(); }; - std::unordered_map _rowset_warm_up_states; + mutable AnnotatedSharedMutex _rowset_warm_up_states_mutex; + std::unordered_map _rowset_warm_up_states + GUARDED_BY(_rowset_warm_up_states_mutex); mutable std::shared_mutex _warmed_up_rowsets_mutex; std::unordered_set _warmed_up_rowsets; diff --git a/be/src/common/thread_safety_annotations.h b/be/src/common/thread_safety_annotations.h index 6cd8d4b0cae45c..6bbdb8ce6546ad 100644 --- a/be/src/common/thread_safety_annotations.h +++ b/be/src/common/thread_safety_annotations.h @@ -22,6 +22,7 @@ #pragma once #include +#include #ifdef BE_TEST namespace doris { @@ -93,6 +94,27 @@ class CAPABILITY("mutex") AnnotatedMutex { std::mutex _mutex; }; +// Annotated shared mutex wrapper for use with Clang thread safety analysis. +// Wraps std::shared_mutex and provides both exclusive and shared capability +// operations so GUARDED_BY / REQUIRES_SHARED / etc. can reference it. +class CAPABILITY("mutex") AnnotatedSharedMutex { +public: + void lock() ACQUIRE() { _mutex.lock(); } + void unlock() RELEASE() { _mutex.unlock(); } + bool try_lock() TRY_ACQUIRE(true) { return _mutex.try_lock(); } + + void lock_shared() ACQUIRE_SHARED() { _mutex.lock_shared(); } + void unlock_shared() RELEASE_SHARED() { _mutex.unlock_shared(); } + bool try_lock_shared() TRY_ACQUIRE_SHARED(true) { return _mutex.try_lock_shared(); } + + // Access the underlying std::shared_mutex (e.g., for std::condition_variable_any). + // Use with care — this bypasses thread safety annotations. + std::shared_mutex& native_handle() { return _mutex; } + +private: + std::shared_mutex _mutex; +}; + // RAII scoped lock guard annotated for thread safety analysis. // In BE_TEST builds, injects a random sleep before acquiring and after // releasing the lock to exercise concurrent code paths. @@ -119,6 +141,32 @@ class SCOPED_CAPABILITY LockGuard { MutexType& _mu; }; +// RAII scoped shared lock guard annotated for thread safety analysis. +// In BE_TEST builds, injects a random sleep before acquiring and after +// releasing the lock to exercise concurrent code paths. +template +class SCOPED_CAPABILITY SharedLockGuard { +public: + explicit SharedLockGuard(MutexType& mu) ACQUIRE_SHARED(mu) : _mu(mu) { +#ifdef BE_TEST + doris::mock_random_sleep(); +#endif + _mu.lock_shared(); + } + ~SharedLockGuard() RELEASE() { + _mu.unlock_shared(); +#ifdef BE_TEST + doris::mock_random_sleep(); +#endif + } + + SharedLockGuard(const SharedLockGuard&) = delete; + SharedLockGuard& operator=(const SharedLockGuard&) = delete; + +private: + MutexType& _mu; +}; + // RAII unique lock annotated for thread safety analysis. // Supports manual lock/unlock while preserving capability tracking. template diff --git a/be/src/runtime/small_file_mgr.cpp b/be/src/runtime/small_file_mgr.cpp index bfcded942f3c9b..b2c2ec42aec899 100644 --- a/be/src/runtime/small_file_mgr.cpp +++ b/be/src/runtime/small_file_mgr.cpp @@ -49,7 +49,7 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(small_file_cache_count, MetricUnit::NOUNIT); SmallFileMgr::SmallFileMgr(ExecEnv* env, const std::string& local_path) : _exec_env(env), _local_path(local_path) { REGISTER_HOOK_METRIC(small_file_cache_count, [this]() { - // std::lock_guard l(_lock); + LockGuard l(_lock); return _file_cache.size(); }); } @@ -91,8 +91,11 @@ Status SmallFileMgr::_load_single_file(const std::string& path, const std::strin int64_t file_id = std::stol(parts[0]); std::string md5 = parts[1]; - if (_file_cache.find(file_id) != _file_cache.end()) { - return Status::InternalError("File with same id is already been loaded: {}", file_id); + { + LockGuard l(_lock); + if (_file_cache.find(file_id) != _file_cache.end()) { + return Status::InternalError("File with same id is already been loaded: {}", file_id); + } } std::string file_md5; @@ -105,12 +108,15 @@ Status SmallFileMgr::_load_single_file(const std::string& path, const std::strin entry.path = path + "/" + file_name; entry.md5 = file_md5; - _file_cache.emplace(file_id, entry); + { + LockGuard l(_lock); + _file_cache.emplace(file_id, entry); + } return Status::OK(); } Status SmallFileMgr::get_file(int64_t file_id, const std::string& md5, std::string* file_path) { - std::unique_lock l(_lock); + UniqueLock l(_lock); // find in cache auto it = _file_cache.find(file_id); if (it != _file_cache.end()) { diff --git a/be/src/runtime/small_file_mgr.h b/be/src/runtime/small_file_mgr.h index c3ed3f4a39e0bc..b8c48c3be695a0 100644 --- a/be/src/runtime/small_file_mgr.h +++ b/be/src/runtime/small_file_mgr.h @@ -19,11 +19,11 @@ #include -#include #include #include #include "common/status.h" +#include "common/thread_safety_annotations.h" namespace doris { @@ -59,14 +59,15 @@ class SmallFileMgr { Status _check_file(const CacheEntry& entry, const std::string& md5); - Status _download_file(int64_t file_id, const std::string& md5, std::string* file_path); + Status _download_file(int64_t file_id, const std::string& md5, std::string* file_path) + REQUIRES(_lock); private: - std::mutex _lock; + AnnotatedMutex _lock; ExecEnv* _exec_env = nullptr; std::string _local_path; // file id -> small file - std::unordered_map _file_cache; + std::unordered_map _file_cache GUARDED_BY(_lock); }; } // end namespace doris