Skip to content

Commit

Permalink
Merge branch 'neo' into 'master'
Browse files Browse the repository at this point in the history
* commit '50d0d65a8':
  MB-50988: Introduce LimitedConcurrencyTask
  MB-50988: Allow excess ThreadGate::threadUp() calls

Change-Id: Ibb97ac20f2e6312210b00862cc4d1cbac376fa53
  • Loading branch information
daverigby committed Mar 10, 2022
2 parents b1c30d7 + 50d0d65 commit 2db9135
Show file tree
Hide file tree
Showing 8 changed files with 477 additions and 3 deletions.
1 change: 1 addition & 0 deletions engines/ep/CMakeLists.txt
Expand Up @@ -369,6 +369,7 @@ ADD_LIBRARY(ep_objs OBJECT
src/kvstore/kvstore_config.cc
src/kv_bucket.cc
src/kvshard.cc
src/limited_concurrency_task.cc
src/mutation_log.cc
src/mutation_log_entry.cc
src/paging_visitor.cc
Expand Down
36 changes: 36 additions & 0 deletions engines/ep/src/limited_concurrency_task.cc
@@ -0,0 +1,36 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2022-Present Couchbase, Inc.
*
* Use of this software is governed by the Business Source License included
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
* in that file, in accordance with the Business Source License, use of this
* software will be governed by the Apache License, Version 2.0, included in
* the file licenses/APL2.txt.
*/

#include "limited_concurrency_task.h"

LimitedConcurrencyTask::LimitedConcurrencyTask(
EventuallyPersistentEngine* e,
TaskId id,
cb::AwaitableSemaphore& semaphore,
bool completeBeforeShutdown)
: NotifiableTask(e, id, 0 /* initial sleeptime*/, completeBeforeShutdown),
semaphore(semaphore) {
}

void LimitedConcurrencyTask::signal() {
wakeup();
}

cb::SemaphoreGuard<cb::Semaphore*> LimitedConcurrencyTask::acquireOrWait() {
if (semaphore.acquire_or_wait(shared_from_this())) {
// token was acquired! Return an RAII guard holding it, and let
// the task continue.
return {&semaphore, cb::adopt_token};
}
// could not acquire a token. We have been queued as a waiter, and will
// be NotifiableTask::wakeup()-ed when a token becomes available.
return {};
}
95 changes: 95 additions & 0 deletions engines/ep/src/limited_concurrency_task.h
@@ -0,0 +1,95 @@
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
/*
* Copyright 2022-Present Couchbase, Inc.
*
* Use of this software is governed by the Business Source License included
* in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
* in that file, in accordance with the Business Source License, use of this
* software will be governed by the Apache License, Version 2.0, included in
* the file licenses/APL2.txt.
*/

#pragma once

#include <executor/notifiable_task.h>
#include <platform/awaitable_semaphore.h>
#include <platform/semaphore_guard.h>

/**
* Base type for tasks which need to be able to limit how many instances run
* concurrently, like CompactTask.
*
* A cb::AwaitableSemaphore limits how many instances may run. Tasks must
* acquire a token before running. If none are available, the task will snooze
* forever. When tokens become available, the task will be notified to run
* again.
*
* This is not currently transparent to the task - it is required that the
* task call:
*
* bool runInner() override {
* auto guard = acquireOrWait();
* if (!guard) {
* // could not acquire a token, queued for notification.
* // already snooze()-ed forever, just return true to
* // reschedule.
* return true;
* }
* // Do concurrency-limited work
* }
*
* However, a future refactor could avoid this by, for example,
* restructuring as a mixin or re-implementing at the thread pool level.
*/
class LimitedConcurrencyTask : public cb::Waiter, public NotifiableTask {
public:
/**
* Construct a task which will be concurrency limited by the provided
* semaphore.
*
* @param e engine pointer
* @param id task id
* @param semaphore semphore from which a token must be acquired before the
* task can run
* @param completeBeforeShutdown should the task be required to complete
* before shutdown
*/
LimitedConcurrencyTask(EventuallyPersistentEngine* e,
TaskId id,
cb::AwaitableSemaphore& semaphore,
bool completeBeforeShutdown);

/**
* Subtypes should provide an implementation for the task.
*
* Note: overriden in this class for informative/documentation
* purposes, this is part of the NotifiableTask interface.
*/
bool runInner() override = 0;

/**
* Called by cb::AwaitableSemaphore when tokens become available.
*
* Notifies the task to run.
*
* Implements the cb::Waiter interface.
*/
void signal() override;

protected:
/**
* Attempt to acquire a token from the semaphore provided at construction.
*
* If a token is available, return a valid guard. The task should then
* proceed.
*
* If no tokens are available, snooze() the task forever and queue to be
* notified by the semaphore when tokens become available.
*
* @return a guard, valid if a token was acquired and execution can continue
*/
cb::SemaphoreGuard<cb::Semaphore*> acquireOrWait();

// semaphore used to restrict how many tasks can run at once
cb::AwaitableSemaphore& semaphore;
};
1 change: 1 addition & 0 deletions engines/ep/tests/CMakeLists.txt
Expand Up @@ -112,6 +112,7 @@ cb_add_test_executable(ep-engine_ep_unit_tests
module_tests/stream_container_test.cc
module_tests/systemevent_test.cc
module_tests/tagged_ptr_test.cc
module_tests/task_concurrency_test.cc
module_tests/test_helpers.cc
module_tests/vbucket_test.cc
module_tests/vbucket_durability_test.cc
Expand Down

0 comments on commit 2db9135

Please sign in to comment.