Skip to content

Commit

Permalink
Use IOThreadPoolExecutor to create Proxygen worker threads.
Browse files Browse the repository at this point in the history
Summary: Make `RequestWorkerThread` no longer inherit from `WorkerThread` so it can be managed by a `folly::Executor`. Introduce `RequestWorkerThreadNoExecutor` that behaves the same way that `RequestWorkerThread` previously did.

Reviewed By: dddmello, mjoras

Differential Revision: D45195785

fbshipit-source-id: 45e5ff6470b5bcdbb6bcbccecb509f7942c88f10
  • Loading branch information
Keivaun Waugh authored and facebook-github-bot committed May 3, 2023
1 parent 86a686a commit d608c1a
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 31 deletions.
1 change: 1 addition & 0 deletions proxygen/lib/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ add_library(
pools/generators/ServerListGenerator.cpp
sampling/Sampling.cpp
services/RequestWorkerThread.cpp
services/RequestWorkerThreadNoExecutor.cpp
services/Service.cpp
services/WorkerThread.cpp
stats/ResourceStats.cpp
Expand Down
54 changes: 40 additions & 14 deletions proxygen/lib/services/RequestWorkerThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <proxygen/lib/services/RequestWorkerThread.h>

#include <csignal>
#include <folly/io/async/EventBaseManager.h>
#include <proxygen/lib/services/ServiceWorker.h>

Expand All @@ -16,19 +17,15 @@ namespace proxygen {
static const uint32_t requestIdBits = 56;
static const uint64_t requestIdMask = ((1ULL << requestIdBits) - 1);

thread_local RequestWorkerThread* RequestWorkerThread::currentRequestWorker_ =
nullptr;

RequestWorkerThread::RequestWorkerThread(FinishCallback& callback,
uint8_t threadId,
const std::string& evbName)
: WorkerThread(folly::EventBaseManager::get(), evbName),
nextRequestId_(static_cast<uint64_t>(threadId) << requestIdBits),
callback_(callback) {
}

RequestWorkerThread::~RequestWorkerThread() {
// It is important to reset the underlying event base in advance of this
// class' destruction as it may be that there are functions awaiting
// execution that possess a reference to this class.
resetEventBase();
folly::EventBase* evb)
: nextRequestId_(static_cast<uint64_t>(threadId) << requestIdBits),
callback_(callback),
evb_(evb) {
}

uint8_t RequestWorkerThread::getWorkerId() const {
Expand All @@ -50,12 +47,41 @@ void RequestWorkerThread::flushStats() {
}

void RequestWorkerThread::setup() {
WorkerThread::setup();
callback_.workerStarted(this);
CHECK(evb_);
evb_->runImmediatelyOrRunInEventBaseThreadAndWait([&]() {
sigset_t ss;

// Ignore some signals
sigemptyset(&ss);
sigaddset(&ss, SIGHUP);
sigaddset(&ss, SIGINT);
sigaddset(&ss, SIGQUIT);
sigaddset(&ss, SIGUSR1);
sigaddset(&ss, SIGUSR2);
sigaddset(&ss, SIGPIPE);
sigaddset(&ss, SIGALRM);
sigaddset(&ss, SIGTERM);
sigaddset(&ss, SIGCHLD);
sigaddset(&ss, SIGIO);
PCHECK(pthread_sigmask(SIG_BLOCK, &ss, nullptr) == 0);

currentRequestWorker_ = this;
callback_.workerStarted(this);
});
}

void RequestWorkerThread::forceStop() {
forceStopped_.store(true);
evb_->terminateLoopSoon();
}

void RequestWorkerThread::cleanup() {
WorkerThread::cleanup();
LOG(INFO) << "Worker " << getWorkerId() << " in cleanup";
if (!forceStopped_.load()) {
LOG(INFO) << "Looping to finish pending work";
evb_->loop();
}
currentRequestWorker_ = nullptr;
callback_.workerFinished(this);
}

Expand Down
37 changes: 20 additions & 17 deletions proxygen/lib/services/RequestWorkerThread.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
#pragma once

#include <cstdint>
#include <folly/io/async/EventBase.h>
#include <map>
#include <proxygen/lib/services/WorkerThread.h>
#include <wangle/acceptor/LoadShedConfiguration.h>

namespace proxygen {
Expand All @@ -19,10 +19,10 @@ class Service;
class ServiceWorker;

/**
* RequestWorkerThread extends WorkerThread, and also contains a list of
* ServiceWorkers running in this thread.
* RequestWorkerThread intended to be used with a folly::Executor, and also
* contains a list of ServiceWorkers running in this thread.
*/
class RequestWorkerThread : public WorkerThread {
class RequestWorkerThread {
public:
class FinishCallback {
public:
Expand All @@ -41,12 +41,7 @@ class RequestWorkerThread : public WorkerThread {
*/
RequestWorkerThread(FinishCallback& callback,
uint8_t threadId,
const std::string& evbName = std::string());

/**
* Reset the underlying event base prior to WorkerThread destruction.
*/
~RequestWorkerThread() override;
folly::EventBase* evb);

/**
* Return a unique 64bit identifier.
Expand All @@ -59,10 +54,7 @@ class RequestWorkerThread : public WorkerThread {
uint8_t getWorkerId() const;

static RequestWorkerThread* getRequestWorkerThread() {
RequestWorkerThread* self = dynamic_cast<RequestWorkerThread*>(
WorkerThread::getCurrentWorkerThread());
CHECK_NOTNULL(self);
return self;
return currentRequestWorker_;
}

/**
Expand Down Expand Up @@ -105,10 +97,16 @@ class RequestWorkerThread : public WorkerThread {
*/
void flushStats();

private:
void setup() override;
void cleanup() override;
void forceStop();

folly::EventBase* getEventBase() {
return evb_;
}

void setup();
void cleanup();

private:
// The next request id within this thread. The id has its highest byte set to
// the thread id, so is unique across the process.
uint64_t nextRequestId_;
Expand All @@ -122,6 +120,11 @@ class RequestWorkerThread : public WorkerThread {
std::shared_ptr<const wangle::LoadShedConfiguration> loadShedConfig_{nullptr};

FinishCallback& callback_;
folly::EventBase* evb_{nullptr};

static thread_local RequestWorkerThread* currentRequestWorker_;

std::atomic_bool forceStopped_{false};
};

} // namespace proxygen
61 changes: 61 additions & 0 deletions proxygen/lib/services/RequestWorkerThreadNoExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#include <proxygen/lib/services/RequestWorkerThreadNoExecutor.h>

#include <folly/io/async/EventBaseManager.h>
#include <proxygen/lib/services/ServiceWorker.h>

namespace proxygen {

static const uint32_t requestIdBits = 56;
static const uint64_t requestIdMask = ((1ULL << requestIdBits) - 1);

RequestWorkerThreadNoExecutor::RequestWorkerThreadNoExecutor(
FinishCallback& callback, uint8_t threadId, const std::string& evbName)
: WorkerThread(folly::EventBaseManager::get(), evbName),
nextRequestId_(static_cast<uint64_t>(threadId) << requestIdBits),
callback_(callback) {
}

RequestWorkerThreadNoExecutor::~RequestWorkerThreadNoExecutor() {
// It is important to reset the underlying event base in advance of this
// class' destruction as it may be that there are functions awaiting
// execution that possess a reference to this class.
resetEventBase();
}

uint8_t RequestWorkerThreadNoExecutor::getWorkerId() const {
return static_cast<uint8_t>(nextRequestId_ >> requestIdBits);
}

uint64_t RequestWorkerThreadNoExecutor::nextRequestId() {
uint64_t requestId = getRequestWorkerThreadNoExecutor()->nextRequestId_;
getRequestWorkerThreadNoExecutor()->nextRequestId_ =
(requestId & ~requestIdMask) | ((requestId + 1) & requestIdMask);
return requestId;
}

void RequestWorkerThreadNoExecutor::flushStats() {
CHECK(getEventBase()->isInEventBaseThread());
for (auto& p : serviceWorkers_) {
p.second->flushStats();
}
}

void RequestWorkerThreadNoExecutor::setup() {
WorkerThread::setup();
callback_.workerStarted(this);
}

void RequestWorkerThreadNoExecutor::cleanup() {
WorkerThread::cleanup();
callback_.workerFinished(this);
}

} // namespace proxygen
126 changes: 126 additions & 0 deletions proxygen/lib/services/RequestWorkerThreadNoExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <cstdint>
#include <map>
#include <proxygen/lib/services/WorkerThread.h>
#include <wangle/acceptor/LoadShedConfiguration.h>

namespace proxygen {

class Service;
class ServiceWorker;

/**
* RequestWorkerThreadNoExecutor extends WorkerThread, and also contains a list
* of ServiceWorkers running in this thread.
*/
class RequestWorkerThreadNoExecutor : public WorkerThread {
public:
class FinishCallback {
public:
virtual ~FinishCallback() noexcept = default;
virtual void workerStarted(RequestWorkerThreadNoExecutor*) = 0;
virtual void workerFinished(RequestWorkerThreadNoExecutor*) = 0;
};

/**
* Create a new RequestWorkerThreadNoExecutor.
*
* @param proxygen The object to notify when this worker finishes.
* @param threadId A unique ID for this worker.
* @param evbName The event base will ne named to this name (thread name)
*/
RequestWorkerThreadNoExecutor(FinishCallback& callback,
uint8_t threadId,
const std::string& evbName = std::string());

/**
* Reset the underlying event base prior to WorkerThread destruction.
*/
~RequestWorkerThreadNoExecutor() override;

/**
* Return a unique 64bit identifier.
*/
static uint64_t nextRequestId();

/**
* Return unique 8bit worker ID.
*/
[[nodiscard]] uint8_t getWorkerId() const;

static RequestWorkerThreadNoExecutor* getRequestWorkerThreadNoExecutor() {
auto* self = dynamic_cast<RequestWorkerThreadNoExecutor*>(
WorkerThread::getCurrentWorkerThread());
CHECK_NOTNULL(self);
return self;
}

/**
* Track the ServiceWorker objects in-use by this worker.
*/
void addServiceWorker(Service* service, ServiceWorker* sw) {
CHECK(serviceWorkers_.find(service) == serviceWorkers_.end());
serviceWorkers_[service] = sw;
}

/**
* For a given service, returns the ServiceWorker associated with this
* RequestWorkerThreadNoExecutor
*/
ServiceWorker* getServiceWorker(Service* service) const {
auto it = serviceWorkers_.find(service);
CHECK(it != serviceWorkers_.end());
return it->second;
}

/**
* Get/set the worker thread's bound load shed configuration instance.
* Used by derivative classes. Updates are propagated seamlessly via
* the use of swapping such that threads will automatically see updated
* fields on update.
*/
[[nodiscard]] std::shared_ptr<const wangle::LoadShedConfiguration>
getLoadShedConfig() const {
return loadShedConfig_;
}
void setLoadShedConfig(
std::shared_ptr<const wangle::LoadShedConfiguration> loadShedConfig) {
loadShedConfig_.swap(loadShedConfig);
}

/**
* Flush any thread-local stats being tracked by our ServiceWorkers.
*
* This must be invoked from within worker's thread.
*/
void flushStats();

private:
void setup() override;
void cleanup() override;

// The next request id within this thread. The id has its highest byte set to
// the thread id, so is unique across the process.
uint64_t nextRequestId_;

// The ServiceWorkers executing in this worker
folly::F14ValueMap<Service*, ServiceWorker*> serviceWorkers_;

// Every worker instance has their own version of load shed config.
// This enables every request worker thread, and derivative there of,
// to both access and update this field in a thread-safe way.
std::shared_ptr<const wangle::LoadShedConfiguration> loadShedConfig_{nullptr};

FinishCallback& callback_;
};

} // namespace proxygen
7 changes: 7 additions & 0 deletions proxygen/lib/services/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <proxygen/lib/services/Service.h>

#include <proxygen/lib/services/RequestWorkerThread.h>
#include <proxygen/lib/services/RequestWorkerThreadNoExecutor.h>
#include <proxygen/lib/services/ServiceWorker.h>

namespace proxygen {
Expand All @@ -25,6 +26,12 @@ void Service::addServiceWorker(std::unique_ptr<ServiceWorker> worker,
workers_.emplace_back(std::move(worker));
}

void Service::addServiceWorker(std::unique_ptr<ServiceWorker> worker,
RequestWorkerThreadNoExecutor* reqWorker) {
reqWorker->addServiceWorker(this, worker.get());
workers_.emplace_back(std::move(worker));
}

void Service::clearServiceWorkers() {
workers_.clear();
}
Expand Down
3 changes: 3 additions & 0 deletions proxygen/lib/services/Service.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ namespace proxygen {

class ServiceWorker;
class RequestWorkerThread;
class RequestWorkerThreadNoExecutor;

/*
* A Service object represents a network service running in proxygen.
Expand Down Expand Up @@ -159,6 +160,8 @@ class Service {
*/
void addServiceWorker(std::unique_ptr<ServiceWorker> worker,
RequestWorkerThread* reqWorker);
void addServiceWorker(std::unique_ptr<ServiceWorker> worker,
RequestWorkerThreadNoExecutor* reqWorker);

/**
* List of workers
Expand Down

0 comments on commit d608c1a

Please sign in to comment.