Skip to content

Commit

Permalink
Experiment with new scheduler.
Browse files Browse the repository at this point in the history
  • Loading branch information
maierlars committed Apr 3, 2024
1 parent e84f3af commit 35d0162
Show file tree
Hide file tree
Showing 7 changed files with 168 additions and 46 deletions.
10 changes: 5 additions & 5 deletions arangod/Scheduler/Scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ class Scheduler {
// Returns the scheduler's server object
ArangodServer& server() noexcept { return _server; }

struct WorkItemBase {
virtual ~WorkItemBase() = default;
virtual void invoke() = 0;
};

class DelayedWorkItem {
public:
~DelayedWorkItem() {
Expand Down Expand Up @@ -172,11 +177,6 @@ class Scheduler {
protected:
ArangodServer& _server;

struct WorkItemBase {
virtual ~WorkItemBase() = default;
virtual void invoke() = 0;
};

template<typename F>
struct WorkItem final : WorkItemBase, F {
explicit WorkItem(F f)
Expand Down
25 changes: 15 additions & 10 deletions arangod/Scheduler/SchedulerFeature.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
#include "Scheduler/SupervisedScheduler.h"
#ifdef USE_V8
#include "VocBase/Methods/Tasks.h"
#include "ThreadPoolScheduler.h"
#endif

using namespace arangodb::application_features;
Expand Down Expand Up @@ -321,16 +322,20 @@ void SchedulerFeature::prepare() {
// on a DB server we intentionally disable throttling of incoming requests.
// this is because coordinators are the gatekeepers, and they should
// perform all the throttling.
uint64_t ongoingLowPriorityLimit =
ServerState::instance()->isDBServer()
? 0
: static_cast<uint64_t>(_ongoingLowPriorityMultiplier *
_nrMaximalThreads);

auto sched = std::make_unique<SupervisedScheduler>(
server(), _nrMinimalThreads, _nrMaximalThreads, _queueSize, _fifo1Size,
_fifo2Size, _fifo3Size, ongoingLowPriorityLimit,
_unavailabilityQueueFillGrade, _metricsFeature);
// uint64_t ongoingLowPriorityLimit =
// ServerState::instance()->isDBServer()
// ? 0
// : static_cast<uint64_t>(_ongoingLowPriorityMultiplier *
// _nrMaximalThreads);

// auto sched = std::make_unique<SupervisedScheduler>(
// server(), _nrMinimalThreads, _nrMaximalThreads, _queueSize, _fifo1Size,
// _fifo2Size, _fifo3Size, ongoingLowPriorityLimit,
// _unavailabilityQueueFillGrade, _nrMaximalDetachedThreads,
// _metricsFeature);

auto sched =
std::make_unique<ThreadPoolScheduler>(server(), _nrMaximalThreads);

SCHEDULER = sched.get();

Expand Down
22 changes: 4 additions & 18 deletions arangod/Scheduler/SimpleThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ ThreadPool::~ThreadPool() {
_cv.notify_all();
}

void ThreadPool::push(std::unique_ptr<Task>&& task) noexcept {
void ThreadPool::push(std::unique_ptr<WorkItem>&& task) noexcept {
std::unique_lock guard(_mutex);
_tasks.emplace_back(std::move(task));
_cv.notify_one();
}

auto ThreadPool::pop(std::stop_token stoken) noexcept -> std::unique_ptr<Task> {
auto ThreadPool::pop(std::stop_token stoken) noexcept
-> std::unique_ptr<WorkItem> {
std::unique_lock guard(_mutex);
bool more = _cv.wait(guard, stoken, [&] { return !_tasks.empty(); });
if (more) {
Expand All @@ -62,7 +63,7 @@ std::jthread ThreadPool::makeThread() noexcept {
myThreadPool = this;
auto stoken = _stop.get_token();
while (auto item = pop(stoken)) {
item->execute();
item->invoke();

// exit point for detached threads
if (myThreadPool == nullptr) {
Expand All @@ -71,18 +72,3 @@ std::jthread ThreadPool::makeThread() noexcept {
}
});
}

void ThreadPool::detachSelf() noexcept {
std::unique_lock guard(_mutex);
auto self =
std::find_if(_threads.begin(), _threads.end(), [](auto const& other) {
return std::this_thread::get_id() == other.get_id();
});

if (self != _threads.end()) {
auto replacement = makeThread();
std::swap(replacement, *self);
myThreadPool = nullptr; // detach this thread from the thread pool
replacement.detach();
}
}
21 changes: 9 additions & 12 deletions arangod/Scheduler/SimpleThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,25 @@
#include <vector>
#include <deque>

#include "Scheduler.h"

namespace arangodb {

struct ThreadPool {
using WorkItem = Scheduler::WorkItemBase;

explicit ThreadPool(std::size_t threadCount);
~ThreadPool();

void detachSelf() noexcept;

struct Task {
virtual ~Task() = default;
virtual void execute() noexcept = 0;
};
void push(std::unique_ptr<Task>&& task) noexcept;
void push(std::unique_ptr<WorkItem>&& task) noexcept;

template<std::invocable F>
void push(F&& fn) noexcept {
struct LambdaTask : Task {
struct LambdaTask : WorkItem {
F _func;
explicit LambdaTask(F&& func) : _func(std::forward<F>(func)) {}

static_assert(std::is_nothrow_invocable_r_v<void, F>);
void execute() noexcept override { std::forward<F>(_func)(); }
void invoke() noexcept override { std::forward<F>(_func)(); }
};

push(std::make_unique<LambdaTask>(std::forward<F>(fn)));
Expand All @@ -55,12 +52,12 @@ struct ThreadPool {
private:
static thread_local ThreadPool* myThreadPool;

std::unique_ptr<Task> pop(std::stop_token) noexcept;
std::unique_ptr<WorkItem> pop(std::stop_token) noexcept;
std::jthread makeThread() noexcept;

std::mutex _mutex;
std::condition_variable_any _cv;
std::deque<std::unique_ptr<Task>> _tasks;
std::deque<std::unique_ptr<WorkItem>> _tasks;
std::stop_source _stop;
std::vector<std::jthread> _threads;
};
Expand Down
77 changes: 77 additions & 0 deletions arangod/Scheduler/ThreadPoolScheduler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2024 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Business Source License 1.1 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// https://github.com/arangodb/arangodb/blob/devel/LICENSE
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////

#include "ThreadPoolScheduler.h"

using namespace arangodb;

void ThreadPoolScheduler::toVelocyPack(velocypack::Builder& builder) const {}
Scheduler::QueueStatistics ThreadPoolScheduler::queueStatistics() const {
return QueueStatistics();
}
void ThreadPoolScheduler::trackCreateHandlerTask() noexcept {}
void ThreadPoolScheduler::trackBeginOngoingLowPriorityTask() noexcept {}
void ThreadPoolScheduler::trackEndOngoingLowPriorityTask() noexcept {}
void ThreadPoolScheduler::trackQueueTimeViolation() {}
void ThreadPoolScheduler::trackQueueItemSize(std::int64_t int64) noexcept {}

uint64_t ThreadPoolScheduler::getLastLowPriorityDequeueTime() const noexcept {
return 0;
}

void ThreadPoolScheduler::setLastLowPriorityDequeueTime(
uint64_t time) noexcept {}

std::pair<uint64_t, uint64_t>
ThreadPoolScheduler::getNumberLowPrioOngoingAndQueued() const {
return std::pair<uint64_t, uint64_t>();
}

double ThreadPoolScheduler::approximateQueueFillGrade() const { return 0; }

double ThreadPoolScheduler::unavailabilityQueueFillGrade() const { return 0; }

bool ThreadPoolScheduler::queueItem(RequestLane lane,
std::unique_ptr<WorkItemBase> item,
bool bounded) {
auto prio = PriorityRequestLane(lane);
_threadPools[int(prio)]->push(std::move(item));
return true;
}

ThreadPoolScheduler::ThreadPoolScheduler(ArangodServer& server,
uint64_t maxThreads)
: Scheduler(server) {
_threadPools.reserve(4);
_threadPools.emplace_back(
std::make_unique<ThreadPool>(std::min(std::ceil(maxThreads * 0.1), 2.)));
_threadPools.emplace_back(
std::make_unique<ThreadPool>(std::min(std::ceil(maxThreads * 0.6), 8.)));
_threadPools.emplace_back(
std::make_unique<ThreadPool>(std::min(std::ceil(maxThreads * 0.4), 4.)));
_threadPools.emplace_back(
std::make_unique<ThreadPool>(std::min(std::ceil(maxThreads * 0.4), 4.)));
}

void ThreadPoolScheduler::shutdown() {
_stopping = true;
Scheduler::shutdown();
}
55 changes: 55 additions & 0 deletions arangod/Scheduler/ThreadPoolScheduler.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
////////////////////////////////////////////////////////////////////////////////
/// DISCLAIMER
///
/// Copyright 2014-2024 ArangoDB GmbH, Cologne, Germany
/// Copyright 2004-2014 triAGENS GmbH, Cologne, Germany
///
/// Licensed under the Business Source License 1.1 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// https://github.com/arangodb/arangodb/blob/devel/LICENSE
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
/// Copyright holder is ArangoDB GmbH, Cologne, Germany
////////////////////////////////////////////////////////////////////////////////

#include "Scheduler.h"
#include "SimpleThreadPool.h"

namespace arangodb {

struct ThreadPoolScheduler final : Scheduler {
explicit ThreadPoolScheduler(ArangodServer& server, uint64_t maxThreads);
void toVelocyPack(velocypack::Builder& builder) const override;
QueueStatistics queueStatistics() const override;
void trackCreateHandlerTask() noexcept override;
void trackBeginOngoingLowPriorityTask() noexcept override;
void trackEndOngoingLowPriorityTask() noexcept override;
void trackQueueTimeViolation() override;
void trackQueueItemSize(std::int64_t int64) noexcept override;
uint64_t getLastLowPriorityDequeueTime() const noexcept override;
void setLastLowPriorityDequeueTime(uint64_t time) noexcept override;
std::pair<uint64_t, uint64_t> getNumberLowPrioOngoingAndQueued()
const override;
double approximateQueueFillGrade() const override;
double unavailabilityQueueFillGrade() const override;

void shutdown() override;

protected:
bool queueItem(RequestLane lane, std::unique_ptr<WorkItemBase> item,
bool bounded) override;
bool isStopping() override { return _stopping; }

private:
std::atomic<bool> _stopping;
std::vector<std::unique_ptr<ThreadPool>> _threadPools;
};

} // namespace arangodb
4 changes: 3 additions & 1 deletion arangod/arangoserver.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ add_library(arangoserver STATIC
Transaction/ReplicatedContext.cpp
Transaction/SmartContext.cpp
Transaction/StandaloneContext.cpp
Transaction/Status.cpp)
Transaction/Status.cpp
Scheduler/ThreadPoolScheduler.cpp
Scheduler/ThreadPoolScheduler.h)
if (MSVC)
target_sources(arangoserver PRIVATE
RestServer/WindowsServiceFeature.cpp)
Expand Down

0 comments on commit 35d0162

Please sign in to comment.