Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix assert in SpanHolder::finish() with fibers #49673

Merged
merged 15 commits into from May 16, 2023
18 changes: 13 additions & 5 deletions contrib/boost-cmake/CMakeLists.txt
Expand Up @@ -103,11 +103,19 @@ set (SRCS_CONTEXT
)

if (ARCH_AARCH64)
set (SRCS_CONTEXT ${SRCS_CONTEXT}
"${LIBRARY_DIR}/libs/context/src/asm/jump_arm64_aapcs_elf_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/make_arm64_aapcs_elf_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/ontop_arm64_aapcs_elf_gas.S"
)
if (OS_DARWIN)
set (SRCS_CONTEXT ${SRCS_CONTEXT}
"${LIBRARY_DIR}/libs/context/src/asm/jump_arm64_aapcs_macho_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/make_arm64_aapcs_macho_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/ontop_arm64_aapcs_macho_gas.S"
)
else()
set (SRCS_CONTEXT ${SRCS_CONTEXT}
"${LIBRARY_DIR}/libs/context/src/asm/jump_arm64_aapcs_elf_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/make_arm64_aapcs_elf_gas.S"
"${LIBRARY_DIR}/libs/context/src/asm/ontop_arm64_aapcs_elf_gas.S"
)
endif()
elseif (ARCH_PPC64LE)
set (SRCS_CONTEXT ${SRCS_CONTEXT}
"${LIBRARY_DIR}/libs/context/src/asm/jump_ppc64_sysv_elf_gas.S"
Expand Down
10 changes: 10 additions & 0 deletions src/Common/AsyncTaskExecutor.cpp
Expand Up @@ -3,11 +3,18 @@
namespace DB
{

/// Per-thread record of the fiber that AsyncTaskExecutor::resumeUnlocked() is currently resuming.
/// resumeUnlocked() overwrites it before resuming a fiber and puts the previous value back afterwards.
thread_local FiberInfo current_fiber_info;

AsyncTaskExecutor::AsyncTaskExecutor(std::unique_ptr<AsyncTask> task_) : task(std::move(task_))
{
createFiber();
}

/// Returns a copy of the calling thread's `current_fiber_info` record.
FiberInfo AsyncTaskExecutor::getCurrentFiberInfo()
{
    return current_fiber_info;
}

void AsyncTaskExecutor::resume()
{
if (routine_is_finished)
Expand All @@ -31,7 +38,10 @@ void AsyncTaskExecutor::resume()

void AsyncTaskExecutor::resumeUnlocked()
{
auto parent_fiber_info = current_fiber_info;
current_fiber_info = FiberInfo{&fiber, &parent_fiber_info};
fiber = std::move(fiber).resume();
current_fiber_info = parent_fiber_info;
}

void AsyncTaskExecutor::cancel()
Expand Down
48 changes: 48 additions & 0 deletions src/Common/AsyncTaskExecutor.h
Expand Up @@ -24,6 +24,11 @@ enum class AsyncEventTimeoutType
using AsyncCallback = std::function<void(int, Poco::Timespan, AsyncEventTimeoutType, const std::string &, uint32_t)>;
using ResumeCallback = std::function<void()>;

/// Describes a fiber together with the record of the fiber (or plain thread) that resumed it.
/// Both pointers are null in a default-constructed record.
struct FiberInfo
{
    /// The fiber this record refers to.
    const Fiber * fiber{nullptr};
    /// Record that was current when this fiber was resumed.
    const FiberInfo * parent_fiber_info{nullptr};
};

/// Base class for a task that will be executed in a fiber.
/// It has only one method - run, that takes 2 callbacks:
Expand Down Expand Up @@ -75,6 +80,7 @@ class AsyncTaskExecutor
};
#endif

static FiberInfo getCurrentFiberInfo();
protected:
/// Method that is called in resume() before actual fiber resuming.
/// If it returns false, resume() will return immediately without actual fiber resuming.
Expand Down Expand Up @@ -118,6 +124,48 @@ class AsyncTaskExecutor
std::unique_ptr<AsyncTask> task;
};

/// Simple implementation for fiber local variable.
template <typename T>
struct FiberLocal
{
public:
FiberLocal()
{
/// Initialize main instance for this thread. Instances for fibers will inherit it,
/// (it's needed because main instance could be changed before creating fibers
/// and changes should be visible in fibers).
data[nullptr] = T();
}

T & operator*()
{
return get();
}

T * operator->()
{
return &get();
}

private:
T & get()
{
return getInstanceForFiber(AsyncTaskExecutor::getCurrentFiberInfo());
}

T & getInstanceForFiber(FiberInfo info)
{
auto it = data.find(info.fiber);
/// If it's the first request, we need to initialize instance for the fiber
/// using instance from parent fiber or main thread that created fiber.
if (it == data.end())
it = data.insert({info.fiber, getInstanceForFiber(*info.parent_fiber_info)}).first;
return it->second;
}

std::unordered_map<const Fiber *, T> data;
};

String getSocketTimeoutExceededMessageByTimeoutType(AsyncEventTimeoutType type, Poco::Timespan timeout, const String & socket_description);

}
44 changes: 24 additions & 20 deletions src/Common/OpenTelemetryTraceContext.cpp
Expand Up @@ -7,12 +7,16 @@
#include <Core/Settings.h>
#include <IO/Operators.h>

#include <Common/AsyncTaskExecutor.h>

namespace DB
{
namespace OpenTelemetry
{

thread_local TracingContextOnThread current_thread_trace_context;
/// This code can be executed inside several fibers in one thread,
/// so we should use a fiber-local tracing context.
thread_local FiberLocal<TracingContextOnThread> current_fiber_trace_context;

bool Span::addAttribute(std::string_view name, UInt64 value) noexcept
{
Expand Down Expand Up @@ -104,16 +108,16 @@ bool Span::addAttributeImpl(std::string_view name, std::string_view value) noexc

SpanHolder::SpanHolder(std::string_view _operation_name, SpanKind _kind)
{
if (!current_thread_trace_context.isTraceEnabled())
if (!current_fiber_trace_context->isTraceEnabled())
{
return;
}

/// Use try-catch to make sure the ctor is exception safe.
try
{
this->trace_id = current_thread_trace_context.trace_id;
this->parent_span_id = current_thread_trace_context.span_id;
this->trace_id = current_fiber_trace_context->trace_id;
this->parent_span_id = current_fiber_trace_context->span_id;
this->span_id = thread_local_rng(); // create a new id for this span
this->operation_name = _operation_name;
this->kind = _kind;
Expand All @@ -132,7 +136,7 @@ SpanHolder::SpanHolder(std::string_view _operation_name, SpanKind _kind)
}

/// Set current span as parent of other spans created later on this thread.
current_thread_trace_context.span_id = this->span_id;
current_fiber_trace_context->span_id = this->span_id;
}

void SpanHolder::finish() noexcept
Expand All @@ -141,12 +145,12 @@ void SpanHolder::finish() noexcept
return;

// First of all, restore old value of current span.
assert(current_thread_trace_context.span_id == span_id);
current_thread_trace_context.span_id = parent_span_id;
assert(current_fiber_trace_context->span_id == span_id);
current_fiber_trace_context->span_id = parent_span_id;

try
{
auto log = current_thread_trace_context.span_log.lock();
auto log = current_fiber_trace_context->span_log.lock();

/// The log might be disabled, check it before use
if (log)
Expand Down Expand Up @@ -269,7 +273,7 @@ void TracingContext::serialize(WriteBuffer & buf) const

const TracingContextOnThread & CurrentContext()
{
return current_thread_trace_context;
return *current_fiber_trace_context;
}

void TracingContextOnThread::reset() noexcept
Expand All @@ -291,7 +295,7 @@ TracingContextHolder::TracingContextHolder(
/// If any exception is raised during the construction, the tracing is not enabled on current thread.
try
{
if (current_thread_trace_context.isTraceEnabled())
if (current_fiber_trace_context->isTraceEnabled())
{
///
/// This is not the normal case,
Expand All @@ -304,15 +308,15 @@ TracingContextHolder::TracingContextHolder(
/// So this branch ensures this class can be instantiated multiple times on one same thread safely.
///
this->is_context_owner = false;
this->root_span.trace_id = current_thread_trace_context.trace_id;
this->root_span.parent_span_id = current_thread_trace_context.span_id;
this->root_span.trace_id = current_fiber_trace_context->trace_id;
this->root_span.parent_span_id = current_fiber_trace_context->span_id;
this->root_span.span_id = thread_local_rng();
this->root_span.operation_name = _operation_name;
this->root_span.start_time_us
= std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();

/// Set the root span as parent of other spans created on current thread
current_thread_trace_context.span_id = this->root_span.span_id;
current_fiber_trace_context->span_id = this->root_span.span_id;
return;
}

Expand Down Expand Up @@ -356,10 +360,10 @@ TracingContextHolder::TracingContextHolder(
}

/// Set up trace context on current thread only when the root span is successfully initialized.
current_thread_trace_context = _parent_trace_context;
current_thread_trace_context.span_id = this->root_span.span_id;
current_thread_trace_context.trace_flags = TRACE_FLAG_SAMPLED;
current_thread_trace_context.span_log = _span_log;
*current_fiber_trace_context = _parent_trace_context;
current_fiber_trace_context->span_id = this->root_span.span_id;
current_fiber_trace_context->trace_flags = TRACE_FLAG_SAMPLED;
current_fiber_trace_context->span_log = _span_log;
}

TracingContextHolder::~TracingContextHolder()
Expand All @@ -371,7 +375,7 @@ TracingContextHolder::~TracingContextHolder()

try
{
auto shared_span_log = current_thread_trace_context.span_log.lock();
auto shared_span_log = current_fiber_trace_context->span_log.lock();
if (shared_span_log)
{
try
Expand Down Expand Up @@ -402,11 +406,11 @@ TracingContextHolder::~TracingContextHolder()
if (this->is_context_owner)
{
/// Clear the context on current thread
current_thread_trace_context.reset();
current_fiber_trace_context->reset();
}
else
{
current_thread_trace_context.span_id = this->root_span.parent_span_id;
current_fiber_trace_context->span_id = this->root_span.parent_span_id;
}
}

Expand Down