Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport #50034 to 23.4: Fix assert in SpanHolder::finish() with fibers attempt 2 #50371

Merged
merged 1 commit into from May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/Client/ConnectionEstablisher.cpp
Expand Up @@ -116,7 +116,7 @@ ConnectionEstablisherAsync::ConnectionEstablisherAsync(
epoll.add(timeout_descriptor.getDescriptor());
}

void ConnectionEstablisherAsync::Task::run(AsyncCallback async_callback, ResumeCallback)
void ConnectionEstablisherAsync::Task::run(AsyncCallback async_callback, SuspendCallback)
{
connection_establisher_async.reset();
connection_establisher_async.connection_establisher.setAsyncCallback(async_callback);
Expand Down
2 changes: 1 addition & 1 deletion src/Client/ConnectionEstablisher.h
Expand Up @@ -91,7 +91,7 @@ class ConnectionEstablisherAsync : public AsyncTaskExecutor

ConnectionEstablisherAsync & connection_establisher_async;

void run(AsyncCallback async_callback, ResumeCallback suspend_callback) override;
void run(AsyncCallback async_callback, SuspendCallback suspend_callback) override;
};

void cancelAfter() override;
Expand Down
2 changes: 1 addition & 1 deletion src/Client/PacketReceiver.cpp
Expand Up @@ -57,7 +57,7 @@ bool PacketReceiver::checkTimeout()
return true;
}

void PacketReceiver::Task::run(AsyncCallback async_callback, ResumeCallback suspend_callback)
void PacketReceiver::Task::run(AsyncCallback async_callback, SuspendCallback suspend_callback)
{
while (true)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Client/PacketReceiver.h
Expand Up @@ -57,7 +57,7 @@ class PacketReceiver : public AsyncTaskExecutor

PacketReceiver & receiver;

void run(AsyncCallback async_callback, ResumeCallback suspend_callback) override;
void run(AsyncCallback async_callback, SuspendCallback suspend_callback) override;
};

/// When epoll file descriptor is ready, check if it's an expired timeout.
Expand Down
36 changes: 7 additions & 29 deletions src/Common/AsyncTaskExecutor.cpp
Expand Up @@ -3,18 +3,11 @@
namespace DB
{

thread_local FiberInfo current_fiber_info;

AsyncTaskExecutor::AsyncTaskExecutor(std::unique_ptr<AsyncTask> task_) : task(std::move(task_))
{
createFiber();
}

FiberInfo AsyncTaskExecutor::getCurrentFiberInfo()
{
return current_fiber_info;
}

void AsyncTaskExecutor::resume()
{
if (routine_is_finished)
Expand All @@ -38,10 +31,7 @@ void AsyncTaskExecutor::resume()

void AsyncTaskExecutor::resumeUnlocked()
{
auto parent_fiber_info = current_fiber_info;
current_fiber_info = FiberInfo{&fiber, &parent_fiber_info};
fiber = std::move(fiber).resume();
current_fiber_info = parent_fiber_info;
fiber.resume();
}

void AsyncTaskExecutor::cancel()
Expand Down Expand Up @@ -69,30 +59,19 @@ struct AsyncTaskExecutor::Routine
struct AsyncCallback
{
AsyncTaskExecutor & executor;
Fiber & fiber;
SuspendCallback suspend_callback;

void operator()(int fd, Poco::Timespan timeout, AsyncEventTimeoutType type, const std::string & desc, uint32_t events)
{
executor.processAsyncEvent(fd, timeout, type, desc, events);
fiber = std::move(fiber).resume();
suspend_callback();
executor.clearAsyncEvent();
}
};

struct ResumeCallback
{
Fiber & fiber;

void operator()()
{
fiber = std::move(fiber).resume();
}
};

Fiber operator()(Fiber && sink)
void operator()(SuspendCallback suspend_callback)
{
auto async_callback = AsyncCallback{executor, sink};
auto suspend_callback = ResumeCallback{sink};
auto async_callback = AsyncCallback{executor, suspend_callback};
try
{
executor.task->run(async_callback, suspend_callback);
Expand All @@ -110,18 +89,17 @@ struct AsyncTaskExecutor::Routine
}

executor.routine_is_finished = true;
return std::move(sink);
}
};

void AsyncTaskExecutor::createFiber()
{
fiber = boost::context::fiber(std::allocator_arg_t(), fiber_stack, Routine{*this});
fiber = Fiber(fiber_stack, Routine{*this});
}

void AsyncTaskExecutor::destroyFiber()
{
boost::context::fiber to_destroy = std::move(fiber);
Fiber to_destroy = std::move(fiber);
}

String getSocketTimeoutExceededMessageByTimeoutType(AsyncEventTimeoutType type, Poco::Timespan timeout, const String & socket_description)
Expand Down
47 changes: 2 additions & 45 deletions src/Common/AsyncTaskExecutor.h
Expand Up @@ -22,7 +22,7 @@ enum class AsyncEventTimeoutType
};

using AsyncCallback = std::function<void(int, Poco::Timespan, AsyncEventTimeoutType, const std::string &, uint32_t)>;
using ResumeCallback = std::function<void()>;
using SuspendCallback = std::function<void()>;

struct FiberInfo
{
Expand All @@ -38,7 +38,7 @@ struct FiberInfo
struct AsyncTask
{
public:
virtual void run(AsyncCallback async_callback, ResumeCallback suspend_callback) = 0;
virtual void run(AsyncCallback async_callback, SuspendCallback suspend_callback) = 0;
virtual ~AsyncTask() = default;
};

Expand Down Expand Up @@ -80,7 +80,6 @@ class AsyncTaskExecutor
};
#endif

static FiberInfo getCurrentFiberInfo();
protected:
/// Method that is called in resume() before actual fiber resuming.
/// If it returns false, resume() will return immediately without actual fiber resuming.
Expand Down Expand Up @@ -124,48 +123,6 @@ class AsyncTaskExecutor
std::unique_ptr<AsyncTask> task;
};

/// Simple implementation for fiber local variable.
template <typename T>
struct FiberLocal
{
public:
FiberLocal()
{
/// Initialize main instance for this thread. Instances for fibers will inherit it,
/// (it's needed because main instance could be changed before creating fibers
/// and changes should be visible in fibers).
data[nullptr] = T();
}

T & operator*()
{
return get();
}

T * operator->()
{
return &get();
}

private:
T & get()
{
return getInstanceForFiber(AsyncTaskExecutor::getCurrentFiberInfo());
}

T & getInstanceForFiber(FiberInfo info)
{
auto it = data.find(info.fiber);
/// If it's the first request, we need to initialize instance for the fiber
/// using instance from parent fiber or main thread that created fiber.
if (it == data.end())
it = data.insert({info.fiber, getInstanceForFiber(*info.parent_fiber_info)}).first;
return it->second;
}

std::unordered_map<const Fiber *, T> data;
};

String getSocketTimeoutExceededMessageByTimeoutType(AsyncEventTimeoutType type, Poco::Timespan timeout, const String & socket_description);

}
144 changes: 143 additions & 1 deletion src/Common/Fiber.h
Expand Up @@ -3,5 +3,147 @@
/// BOOST_USE_ASAN, BOOST_USE_TSAN and BOOST_USE_UCONTEXT should be correctly defined for sanitizers.
#include <base/defines.h>
#include <boost/context/fiber.hpp>
#include <map>

/// Class wrapper for boost::context::fiber.
/// It tracks current executing fiber for thread and
/// supports storing fiber-specific data
/// that will be destroyed on fiber destructor.
class Fiber
{
private:
using Impl = boost::context::fiber;
using FiberPtr = Fiber *;
template <typename T> friend class FiberLocal;

public:
template< typename StackAlloc, typename Fn>
Fiber(StackAlloc && salloc, Fn && fn) : impl(std::allocator_arg_t(), std::forward<StackAlloc>(salloc), RoutineImpl(std::forward<Fn>(fn)))
{
}

Fiber() = default;

Fiber(Fiber && other) = default;
Fiber & operator=(Fiber && other) = default;

Fiber(const Fiber &) = delete;
Fiber & operator =(const Fiber &) = delete;

explicit operator bool() const
{
return impl.operator bool();
}

void resume()
{
/// Update information about current executing fiber.
FiberPtr & current_fiber = getCurrentFiber();
FiberPtr parent_fiber = current_fiber;
current_fiber = this;
impl = std::move(impl).resume();
/// Restore parent fiber.
current_fiber = parent_fiber;
}

private:
template <typename Fn>
struct RoutineImpl
{
struct SuspendCallback
{
Impl & impl;

void operator()()
{
impl = std::move(impl).resume();
}
};

explicit RoutineImpl(Fn && fn_) : fn(std::move(fn_))
{
}

Impl operator()(Impl && sink)
{
SuspendCallback suspend_callback{sink};
fn(suspend_callback);
return std::move(sink);
}

Fn fn;
};

static FiberPtr & getCurrentFiber()
{
thread_local static FiberPtr current_fiber;
return current_fiber;
}

/// Special wrapper to store data in uniquer_ptr.
struct DataWrapper
{
virtual ~DataWrapper() = default;
};

using DataPtr = std::unique_ptr<DataWrapper>;

/// Get reference to fiber-specific data by key
/// (the pointer to the structure that uses this data).
DataPtr & getLocalData(void * key)
{
return local_data[key];
}

Impl && release()
{
return std::move(impl);
}

Impl impl;
std::map<void *, DataPtr> local_data;
};

/// Implementation for fiber local variable.
/// If we are in fiber, it returns fiber local data,
/// otherwise it returns it's single field.
/// Fiber local data is destroyed in Fiber destructor.
/// Implementation is similar to boost::fiber::fiber_specific_ptr
/// (we cannot use it because we don't use boost::fiber API.
template <typename T>
class FiberLocal
{
public:
T & operator*()
{
return get();
}

T * operator->()
{
return &get();
}

private:
struct DataWrapperImpl : public Fiber::DataWrapper
{
T impl;
};

T & get()
{
Fiber * current_fiber = Fiber::getCurrentFiber();
if (!current_fiber)
return main_instance;

Fiber::DataPtr & ptr = current_fiber->getLocalData(this);
/// Initialize instance on first request.
if (!ptr)
ptr = std::make_unique<DataWrapperImpl>();

return dynamic_cast<DataWrapperImpl *>(ptr.get())->impl;
}

T main_instance;
};

using Fiber = boost::context::fiber;