Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 105 additions & 0 deletions be/src/common/thread_safety_annotations.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Thread safety annotation macros and annotated mutex wrappers for
// Clang's -Wthread-safety static analysis.
// Reference: https://clang.llvm.org/docs/ThreadSafetyAnalysis.html

#pragma once

#include <mutex>

// Enable thread safety attributes only with clang.
// The attributes can be safely erased when compiling with other compilers.
#if defined(__clang__) && (!defined(SWIG))
#define THREAD_ANNOTATION_ATTRIBUTE__(x) __attribute__((x))
#else
#define THREAD_ANNOTATION_ATTRIBUTE__(x) // no-op
#endif

#define TSA_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(capability(x))

#define TSA_SCOPED_CAPABILITY THREAD_ANNOTATION_ATTRIBUTE__(scoped_lockable)

#define TSA_GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(guarded_by(x))

#define TSA_PT_GUARDED_BY(x) THREAD_ANNOTATION_ATTRIBUTE__(pt_guarded_by(x))

#define TSA_ACQUIRED_BEFORE(...) THREAD_ANNOTATION_ATTRIBUTE__(acquired_before(__VA_ARGS__))

#define TSA_ACQUIRED_AFTER(...) THREAD_ANNOTATION_ATTRIBUTE__(acquired_after(__VA_ARGS__))

#define TSA_REQUIRES(...) THREAD_ANNOTATION_ATTRIBUTE__(requires_capability(__VA_ARGS__))

#define TSA_REQUIRES_SHARED(...) \
THREAD_ANNOTATION_ATTRIBUTE__(requires_shared_capability(__VA_ARGS__))

#define TSA_ACQUIRE(...) THREAD_ANNOTATION_ATTRIBUTE__(acquire_capability(__VA_ARGS__))

#define TSA_ACQUIRE_SHARED(...) \
THREAD_ANNOTATION_ATTRIBUTE__(acquire_shared_capability(__VA_ARGS__))

#define TSA_RELEASE(...) THREAD_ANNOTATION_ATTRIBUTE__(release_capability(__VA_ARGS__))

#define TSA_RELEASE_SHARED(...) \
THREAD_ANNOTATION_ATTRIBUTE__(release_shared_capability(__VA_ARGS__))

#define TSA_TRY_ACQUIRE(...) THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_capability(__VA_ARGS__))

#define TSA_TRY_ACQUIRE_SHARED(...) \
THREAD_ANNOTATION_ATTRIBUTE__(try_acquire_shared_capability(__VA_ARGS__))

#define TSA_EXCLUDES(...) THREAD_ANNOTATION_ATTRIBUTE__(locks_excluded(__VA_ARGS__))

#define TSA_ASSERT_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_capability(x))

#define TSA_ASSERT_SHARED_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(assert_shared_capability(x))

#define TSA_RETURN_CAPABILITY(x) THREAD_ANNOTATION_ATTRIBUTE__(lock_returned(x))

#define TSA_NO_THREAD_SAFETY_ANALYSIS THREAD_ANNOTATION_ATTRIBUTE__(no_thread_safety_analysis)

// Annotated mutex wrapper for use with Clang thread safety analysis.
// Wraps std::mutex and provides the CAPABILITY annotation so that
// GUARDED_BY / REQUIRES / etc. annotations can reference it.
class TSA_CAPABILITY("mutex") AnnotatedMutex {
public:
void lock() TSA_ACQUIRE() { _mutex.lock(); }
void unlock() TSA_RELEASE() { _mutex.unlock(); }
bool try_lock() TSA_TRY_ACQUIRE(true) { return _mutex.try_lock(); }

// Access the underlying std::mutex (e.g., for std::condition_variable).
// Use with care — this bypasses thread safety annotations.
std::mutex& native_handle() { return _mutex; }

private:
std::mutex _mutex;
};

// RAII scoped lock guard annotated for thread safety analysis.
template <typename MutexType>
class TSA_SCOPED_CAPABILITY AnnotatedLockGuard {
public:
explicit AnnotatedLockGuard(MutexType& mu) TSA_ACQUIRE(mu) : _mu(mu) { _mu.lock(); }
~AnnotatedLockGuard() TSA_RELEASE() { _mu.unlock(); }

AnnotatedLockGuard(const AnnotatedLockGuard&) = delete;
AnnotatedLockGuard& operator=(const AnnotatedLockGuard&) = delete;

private:
MutexType& _mu;
};
8 changes: 8 additions & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ if (WITH_LZO)
)
endif()

# Enable Clang thread safety analysis for specific files
if (COMPILER_CLANG)
set_source_files_properties(
${CMAKE_CURRENT_SOURCE_DIR}/operator/exchange_sink_buffer.cpp
PROPERTIES COMPILE_FLAGS "-Wthread-safety"
)
endif()

add_library(Exec STATIC
${EXEC_FILES}
)
Expand Down
28 changes: 19 additions & 9 deletions be/src/exec/operator/exchange_sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,9 +177,12 @@ Status ExchangeSinkBuffer::add_block(Channel* channel, TransmitInfo&& request) {
}
instance_data.package_queue[channel].emplace(std::move(request));
_total_queue_size++;
if (_total_queue_size > _queue_capacity) {
for (auto& dep : _queue_deps) {
dep->block();
{
AnnotatedLockGuard l(_m);
if (_total_queue_size > _queue_capacity) {
for (auto& dep : _queue_deps) {
dep->block();
}
}
}
}
Expand Down Expand Up @@ -377,9 +380,12 @@ Status ExchangeSinkBuffer::_send_rpc(RpcInstance& instance_data) {
}
DCHECK_GE(_total_queue_size, requests.size());
_total_queue_size -= (int)requests.size();
if (_total_queue_size <= _queue_capacity) {
for (auto& dep : _queue_deps) {
dep->set_ready();
{
AnnotatedLockGuard l(_m);
if (_total_queue_size <= _queue_capacity) {
for (auto& dep : _queue_deps) {
dep->set_ready();
}
}
}
} else if (broadcast_q_ptr && !broadcast_q_ptr->empty()) {
Expand Down Expand Up @@ -557,9 +563,12 @@ void ExchangeSinkBuffer::_set_receiver_eof(RpcInstance& ins) {
}

// Try to wake up pipeline after clearing the queue
if (_total_queue_size <= _queue_capacity) {
for (auto& dep : _queue_deps) {
dep->set_ready();
{
AnnotatedLockGuard l(_m);
if (_total_queue_size <= _queue_capacity) {
for (auto& dep : _queue_deps) {
dep->set_ready();
}
}
}

Expand All @@ -579,6 +588,7 @@ void ExchangeSinkBuffer::_turn_off_channel(RpcInstance& ins,
ins.rpc_channel_is_turn_off = true;
auto weak_task_ctx = weak_task_exec_ctx();
if (auto pip_ctx = weak_task_ctx.lock()) {
AnnotatedLockGuard l(_m);
for (auto& parent : _parents) {
parent->on_channel_finished(ins.id);
}
Expand Down
9 changes: 5 additions & 4 deletions be/src/exec/operator/exchange_sink_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#include "common/global_types.h"
#include "common/status.h"
#include "common/thread_safety_annotations.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "util/brpc_closure.h"
Expand Down Expand Up @@ -277,7 +278,7 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {

void set_dependency(InstanceLoId sender_ins_id, std::shared_ptr<Dependency> queue_dependency,
ExchangeSinkLocalState* local_state) {
std::lock_guard l(_m);
AnnotatedLockGuard l(_m);
_queue_deps.push_back(queue_dependency);
_parents.push_back(local_state);
}
Expand Down Expand Up @@ -330,11 +331,11 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx {
std::atomic<int> _total_queue_size = 0;

// protected the `_queue_deps` and `_parents`
std::mutex _m;
AnnotatedMutex _m;
// _queue_deps is used for memory control.
std::vector<std::shared_ptr<Dependency>> _queue_deps;
std::vector<std::shared_ptr<Dependency>> _queue_deps TSA_GUARDED_BY(_m);
// The ExchangeSinkLocalState in _parents is only used in _turn_off_channel.
std::vector<ExchangeSinkLocalState*> _parents;
std::vector<ExchangeSinkLocalState*> _parents TSA_GUARDED_BY(_m);
const int64_t _exchange_sink_num;
bool _send_multi_blocks = false;
int _send_multi_blocks_byte_size = 256 * 1024;
Expand Down
Loading