Skip to content

Commit

Permalink
Define callback types for folly::ExecutionObserver
Browse files Browse the repository at this point in the history
Summary:
For folly::ExecutionObserver we want to know what type of callback are we dealing with. Currently introducing 4 different types of callbacks:

1. LibeventCallbacks - Those are the ones inside EventHandler::libeventCallback
2. LoopCallbacks -  Those are user defined callbacks, classes derived from LoopCallback and implementing `runLoopCallback` method
3. NotificationQueueCallbacks - Anytime someones does `runInEventBaseThread` from different thread we put this on notification queue and then execute them as a batch
4. Fiber callbacks - Not sure if we really needs this?

Reviewed By: ot

Differential Revision: D54395157

fbshipit-source-id: fe79adf5986f9e8dc79f93fae9f39c457d9961b2
  • Loading branch information
Giorgi Papakerashvili authored and facebook-github-bot committed Mar 16, 2024
1 parent ac06dfd commit 8aceb9b
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 16 deletions.
12 changes: 10 additions & 2 deletions folly/experimental/ExecutionObserver.h
Expand Up @@ -31,6 +31,14 @@ class ExecutionObserver
: public boost::intrusive::list_base_hook<
boost::intrusive::link_mode<boost::intrusive::auto_unlink>> {
public:
enum class CallbackType {
// Owned by EventBase.
Event,
Loop,
NotificationQueue,
// Owned by FiberManager.
Fiber,
};
// Constant time size = false to support auto_unlink behavior, options are
// mutually exclusive
typedef boost::intrusive::
Expand All @@ -44,14 +52,14 @@ class ExecutionObserver
*
* @param id Unique id for the task which is starting.
*/
virtual void starting(uintptr_t id) noexcept = 0;
virtual void starting(uintptr_t id, CallbackType callbackType) noexcept = 0;

/**
* Called just after a task stops executing.
*
* @param id Unique id for the task which stopped.
*/
virtual void stopped(uintptr_t id) noexcept = 0;
virtual void stopped(uintptr_t id, CallbackType callbackType) noexcept = 0;
};

} // namespace folly
16 changes: 12 additions & 4 deletions folly/fibers/FiberManagerInternal-inl.h
Expand Up @@ -138,7 +138,9 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) {

if (!observerList_.empty()) {
for (auto& observer : observerList_) {
observer.starting(reinterpret_cast<uintptr_t>(fiber));
observer.starting(
reinterpret_cast<uintptr_t>(fiber),
folly::ExecutionObserver::CallbackType::Fiber);
}
}

Expand All @@ -160,7 +162,9 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) {
awaitFunc_(*fiber);
awaitFunc_ = nullptr;
for (auto& observer : observerList_) {
observer.stopped(reinterpret_cast<uintptr_t>(fiber));
observer.stopped(
reinterpret_cast<uintptr_t>(fiber),
folly::ExecutionObserver::CallbackType::Fiber);
}
currentFiber_ = nullptr;
fiber->rcontext_ = RequestContext::saveContext();
Expand All @@ -184,7 +188,9 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) {
}
// Make sure LocalData is not accessible from its destructor
for (auto& observer : observerList_) {
observer.stopped(reinterpret_cast<uintptr_t>(fiber));
observer.stopped(
reinterpret_cast<uintptr_t>(fiber),
folly::ExecutionObserver::CallbackType::Fiber);
}

currentFiber_ = nullptr;
Expand All @@ -209,7 +215,9 @@ inline void FiberManager::runReadyFiber(Fiber* fiber) {
}
} else if (fiber->state_ == Fiber::YIELDED) {
for (auto& observer : observerList_) {
observer.stopped(reinterpret_cast<uintptr_t>(fiber));
observer.stopped(
reinterpret_cast<uintptr_t>(fiber),
folly::ExecutionObserver::CallbackType::Fiber);
}
currentFiber_ = nullptr;
fiber->rcontext_ = RequestContext::saveContext();
Expand Down
22 changes: 16 additions & 6 deletions folly/io/async/EventBase.cpp
Expand Up @@ -157,26 +157,31 @@ EventBaseBackend::~EventBaseBackend() {
class ExecutionObserverScopeGuard {
public:
ExecutionObserverScopeGuard(
folly::ExecutionObserver::List* observerList, void* id)
: observerList_(observerList), id_{reinterpret_cast<uintptr_t>(id)} {
folly::ExecutionObserver::List* observerList,
void* id,
folly::ExecutionObserver::CallbackType callbackType)
: observerList_(observerList),
id_{reinterpret_cast<uintptr_t>(id)},
callbackType_(callbackType) {
if (!observerList_->empty()) {
for (auto& observer : *observerList_) {
observer.starting(id_);
observer.starting(id_, callbackType_);
}
}
}

~ExecutionObserverScopeGuard() {
if (!observerList_->empty()) {
for (auto& observer : *observerList_) {
observer.stopped(id_);
observer.stopped(id_, callbackType_);
}
}
}

private:
folly::ExecutionObserver::List* observerList_;
uintptr_t id_;
folly::ExecutionObserver::CallbackType callbackType_;
};
} // namespace

Expand All @@ -187,7 +192,9 @@ class EventBase::FuncRunner {
explicit FuncRunner(EventBase& eventBase) : eventBase_(eventBase) {}
void operator()(Func&& func) noexcept {
ExecutionObserverScopeGuard guard(
&eventBase_.getExecutionObserverList(), &func);
&eventBase_.getExecutionObserverList(),
&func,
folly::ExecutionObserver::CallbackType::NotificationQueue);
std::exchange(func, {})();
}

Expand Down Expand Up @@ -924,7 +931,10 @@ void EventBase::runLoopCallbacks(LoopCallbackList& currentCallbacks) {
LoopCallback* callback = &currentCallbacks.front();
currentCallbacks.pop_front();
folly::RequestContextScopeGuard rctx(std::move(callback->context_));
ExecutionObserverScopeGuard guard(&executionObserverList_, callback);
ExecutionObserverScopeGuard guard(
&executionObserverList_,
callback,
folly::ExecutionObserver::CallbackType::Loop);
callback->runLoopCallback();
}
}
Expand Down
8 changes: 6 additions & 2 deletions folly/io/async/EventHandler.cpp
Expand Up @@ -151,7 +151,9 @@ void EventHandler::libeventCallback(libevent_fd_t fd, short events, void* arg) {
auto& observers = handler->eventBase_->getExecutionObserverList();
if (!observers.empty()) {
for (auto& observer : observers) {
observer.starting(reinterpret_cast<uintptr_t>(handler));
observer.starting(
reinterpret_cast<uintptr_t>(handler),
folly::ExecutionObserver::CallbackType::Event);
}
}

Expand All @@ -162,7 +164,9 @@ void EventHandler::libeventCallback(libevent_fd_t fd, short events, void* arg) {

if (!observers.empty()) {
for (auto& observer : observers) {
observer.stopped(reinterpret_cast<uintptr_t>(handler));
observer.stopped(
reinterpret_cast<uintptr_t>(handler),
folly::ExecutionObserver::CallbackType::Event);
}
}
}
Expand Down
10 changes: 8 additions & 2 deletions folly/io/async/test/EventBaseTestLib.h
Expand Up @@ -163,13 +163,19 @@ FOLLY_ALWAYS_INLINE void scheduleEvents(

class TestObserver : public folly::ExecutionObserver {
public:
virtual void starting(uintptr_t /* id */) noexcept override {
virtual void starting(
uintptr_t /* id */,
folly::ExecutionObserver::CallbackType /* callbackType */) noexcept
override {
if (nestedStart_ == 0) {
nestedStart_ = 1;
}
numStartingCalled_++;
}
virtual void stopped(uintptr_t /* id */) noexcept override {
virtual void stopped(
uintptr_t /* id */,
folly::ExecutionObserver::CallbackType /* callbackType */) noexcept
override {
nestedStart_--;
numStoppedCalled_++;
}
Expand Down

0 comments on commit 8aceb9b

Please sign in to comment.