From 707d0b5adc069363393294f4e124e6fe7bc01175 Mon Sep 17 00:00:00 2001 From: Mryange Date: Thu, 2 Apr 2026 14:42:15 +0800 Subject: [PATCH] upd --- be/src/common/thread_safety_annotations.h | 105 ++++++++++++++++++ be/src/exec/CMakeLists.txt | 8 ++ be/src/exec/operator/exchange_sink_buffer.cpp | 28 +++-- be/src/exec/operator/exchange_sink_buffer.h | 9 +- 4 files changed, 137 insertions(+), 13 deletions(-) create mode 100644 be/src/common/thread_safety_annotations.h diff --git a/be/src/common/thread_safety_annotations.h b/be/src/common/thread_safety_annotations.h new file mode 100644 index 00000000000000..6a33be004b206f --- /dev/null +++ b/be/src/common/thread_safety_annotations.h @@ -0,0 +1,105 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Thread safety annotation macros and annotated mutex wrappers for +// Clang's -Wthread-safety static analysis. +// Reference: https://clang.llvm.org/docs/ThreadSafetyAnalysis.html + +#pragma once + +#include + +// Enable thread safety attributes only with clang. +// The attributes can be safely erased when compiling with other compilers. +#if defined(__clang__) && (!defined(SWIG)) +#define THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x)) +#else +#define THREAD_ANNOTATION_ATTRIBUTE__(x) // no-op +#endif + +#define TSA_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(capability(x)) + +#define TSA_SCOPED_CAPABILITY THREAD_ANNOTATION_ATTRIBUTE__(scoped_lockable) + +#define TSA_GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x)) + +#define TSA_PT_GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(pt_guarded_by(x)) + +#define TSA_ACQUIRED_BEFORE(...) THREAD_ANNOTATION_ATTRIBUTE__(acquired_before(__VA_ARGS__)) + +#define TSA_ACQUIRED_AFTER(...) THREAD_ANNOTATION_ATTRIBUTE__(acquired_after(__VA_ARGS__)) + +#define TSA_REQUIRES(...) THREAD_ANNOTATION_ATTRIBUTE__(requires_capability(__VA_ARGS__)) + +#define TSA_REQUIRES_SHARED(...) \ + THREAD_ANNOTATION_ATTRIBUTE__(requires_shared_capability(__VA_ARGS__)) + +#define TSA_ACQUIRE(...) THREAD_ANNOTATION_ATTRIBUTE__(acquire_capability(__VA_ARGS__)) + +#define TSA_ACQUIRE_SHARED(...) \ + THREAD_ANNOTATION_ATTRIBUTE__(acquire_shared_capability(__VA_ARGS__)) + +#define TSA_RELEASE(...) THREAD_ANNOTATION_ATTRIBUTE__(release_capability(__VA_ARGS__)) + +#define TSA_RELEASE_SHARED(...) \ + THREAD_ANNOTATION_ATTRIBUTE__(release_shared_capability(__VA_ARGS__)) + +#define TSA_TRY_ACQUIRE(...) THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_capability(__VA_ARGS__)) + +#define TSA_TRY_ACQUIRE_SHARED(...) \ + THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_shared_capability(__VA_ARGS__)) + +#define TSA_EXCLUDES(...) THREAD_ANNOTATION_ATTRIBUTE__(locks_excluded(__VA_ARGS__)) + +#define TSA_ASSERT_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_capability(x)) + +#define TSA_ASSERT_SHARED_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_shared_capability(x)) + +#define TSA_RETURN_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(lock_returned(x)) + +#define TSA_NO_THREAD_SAFETY_ANALYSIS THREAD_ANNOTATION_ATTRIBUTE__(no_thread_safety_analysis) + +// Annotated mutex wrapper for use with Clang thread safety analysis. +// Wraps std::mutex and provides the CAPABILITY annotation so that +// GUARDED_BY / REQUIRES / etc. annotations can reference it. +class TSA_CAPABILITY("mutex") AnnotatedMutex { +public: + void lock() TSA_ACQUIRE() { _mutex.lock(); } + void unlock() TSA_RELEASE() { _mutex.unlock(); } + bool try_lock() TSA_TRY_ACQUIRE(true) { return _mutex.try_lock(); } + + // Access the underlying std::mutex (e.g., for std::condition_variable). + // Use with care — this bypasses thread safety annotations. + std::mutex& native_handle() { return _mutex; } + +private: + std::mutex _mutex; +}; + +// RAII scoped lock guard annotated for thread safety analysis. +template +class TSA_SCOPED_CAPABILITY AnnotatedLockGuard { +public: + explicit AnnotatedLockGuard(MutexType& mu) TSA_ACQUIRE(mu) : _mu(mu) { _mu.lock(); } + ~AnnotatedLockGuard() TSA_RELEASE() { _mu.unlock(); } + + AnnotatedLockGuard(const AnnotatedLockGuard&) = delete; + AnnotatedLockGuard& operator=(const AnnotatedLockGuard&) = delete; + +private: + MutexType& _mu; +}; diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index 10f59e25dbb90e..183b550429bdf7 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -32,6 +32,14 @@ if (WITH_LZO) ) endif() +# Enable Clang thread safety analysis for specific files +if (COMPILER_CLANG) + set_source_files_properties( + ${CMAKE_CURRENT_SOURCE_DIR}/operator/exchange_sink_buffer.cpp + PROPERTIES COMPILE_FLAGS "-Wthread-safety" + ) +endif() + add_library(Exec STATIC ${EXEC_FILES} ) diff --git a/be/src/exec/operator/exchange_sink_buffer.cpp b/be/src/exec/operator/exchange_sink_buffer.cpp index 22a316c9196df0..e3b9b0f0442a7b 100644 --- a/be/src/exec/operator/exchange_sink_buffer.cpp +++ b/be/src/exec/operator/exchange_sink_buffer.cpp @@ -177,9 +177,12 @@ Status ExchangeSinkBuffer::add_block(Channel* channel, TransmitInfo&& request) { } instance_data.package_queue[channel].emplace(std::move(request)); _total_queue_size++; - if (_total_queue_size > _queue_capacity) { - for (auto& dep : _queue_deps) { - dep->block(); + { + AnnotatedLockGuard l(_m); + if (_total_queue_size > _queue_capacity) { + for (auto& dep : _queue_deps) { + dep->block(); + } } } } @@ -377,9 +380,12 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) { } DCHECK_GE(_total_queue_size, requests.size()); _total_queue_size -= (int)requests.size(); - if (_total_queue_size <= _queue_capacity) { - for (auto& dep : _queue_deps) { - dep->set_ready(); + { + AnnotatedLockGuard l(_m); + if (_total_queue_size <= _queue_capacity) { + for (auto& dep : _queue_deps) { + dep->set_ready(); + } } } } else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) { @@ -557,9 +563,12 @@ void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) { } // Try to wake up pipeline after clearing the queue - if (_total_queue_size <= _queue_capacity) { - for (auto& dep : _queue_deps) { - dep->set_ready(); + { + AnnotatedLockGuard l(_m); + if (_total_queue_size <= _queue_capacity) { + for (auto& dep : _queue_deps) { + dep->set_ready(); + } } } @@ -579,6 +588,7 @@ void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins, ins.rpc_channel_is_turn_off = true; auto weak_task_ctx = weak_task_exec_ctx(); if (auto pip_ctx = weak_task_ctx.lock()) { + AnnotatedLockGuard l(_m); for (auto& parent : _parents) { parent->on_channel_finished(ins.id); } diff --git a/be/src/exec/operator/exchange_sink_buffer.h b/be/src/exec/operator/exchange_sink_buffer.h index 706067b9115aa5..bd006325b86807 100644 --- a/be/src/exec/operator/exchange_sink_buffer.h +++ b/be/src/exec/operator/exchange_sink_buffer.h @@ -34,6 +34,7 @@ #include "common/global_types.h" #include "common/status.h" +#include "common/thread_safety_annotations.h" #include "runtime/runtime_state.h" #include "service/backend_options.h" #include "util/brpc_closure.h" @@ -277,7 +278,7 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { void set_dependency(InstanceLoId sender_ins_id, std::shared_ptr queue_dependency, ExchangeSinkLocalState* local_state) { - std::lock_guard l(_m); + AnnotatedLockGuard l(_m); _queue_deps.push_back(queue_dependency); _parents.push_back(local_state); } @@ -330,11 +331,11 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { std::atomic _total_queue_size = 0; // protected the `_queue_deps` and `_parents` - std::mutex _m; + AnnotatedMutex _m; // _queue_deps is used for memory control. - std::vector> _queue_deps; + std::vector> _queue_deps TSA_GUARDED_BY(_m); // The ExchangeSinkLocalState in _parents is only used in _turn_off_channel. - std::vector _parents; + std::vector _parents TSA_GUARDED_BY(_m); const int64_t _exchange_sink_num; bool _send_multi_blocks = false; int _send_multi_blocks_byte_size = 256 * 1024;