Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
WorkerPool: complete rewrite
Browse files Browse the repository at this point in the history
The old implementation was using 2 thread pools: the main one, used to
schedule and run tasks in secondary threads; and a helper one, used to
schedule a health check routine.

This design wasn't good because EACH.SINGLE.CALL to WorkerPool::post()
would imply TWO new tasks being scheduled:
1. The task itself
2. A timed task to measure execution time

In case the execution time was longer than 3 seconds, the task was
considered "locked", and somehow assumed that it would never recover, so
a new thread would be spawned.

Now picture this: the server is under heavy load and tasks start to get
too much time to finish. The existing implementation would start to
consider all tasks locked, and would spawn threads for all of them,
which soon enough would again take too much time, thus causing an
explosion of thread spawns.

The fact that two tasks are scheduled per post() only exacerbated the
issue.

The new code is much simpler. We realize that, for now, in case of CPU
exhaustion there is not much we can do: tasks will take longer, and
growing the numbr of threads won't necessarily help.

The code is based on the simple but effective ideas shown in several
sources:

* Asio C++ library recipes: A thread pool for executing arbitrary tasks
- http://think-async.com/Asio/Recipes
- https://stackoverflow.com/questions/19500404/how-to-create-a-thread-
pool-using-boost-in-c/19500405#19500405

* Boost Asio timer tutorial
-
https://www.boost.org/doc/libs/1_72_0/doc/html/boost_asio/tutorial.html

versions of Boost >= 1.66.0 have a 'thread_pool' class:
https://www.boost.org/doc/libs/1_66_0/doc/html/boost_asio/reference/thread_pool.html
which maybe would have been interesting for this, but right now we have
to use Boost 1.58.0 (Ubuntu 16.04 Xenial) and Boost 1.65.1 (Ubuntu 18.04
Bionic)
  • Loading branch information
j1elo committed May 22, 2020
1 parent 958193c commit 62020ff
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 170 deletions.
35 changes: 26 additions & 9 deletions CMakeLists.txt
Expand Up @@ -40,23 +40,40 @@ endif()

# Development
#set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-error=unused-function")
#set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=unused-function")

# Decide between std::regex or boost::regex
include(CheckCXXSourceCompiles)
set(CMAKE_REQUIRED_FLAGS ${CMAKE_CXX_FLAGS})
check_cxx_source_compiles(
"
set(CMAKE_REQUIRED_FLAGS "${CMAKE_CXX_FLAGS}")
check_cxx_source_compiles("
#include <iostream>
#include <regex>
int main() {
std::regex re(\"//*\");
std::string orig = \"//\";
std::cout << std::regex_replace(orig, re, \"/\") << std::endl;
return 0;
}
"
}"
HAS_STD_REGEX_REPLACE
)
unset(CMAKE_REQUIRED_FLAGS)

# Detect if pthread_setname is available
include(CheckCSourceCompiles)
set(CMAKE_REQUIRED_LIBRARIES pthread)
check_c_source_compiles("
#include <pthread.h>
int main() {
pthread_setname_np((pthread_t)0, (const char *)0);
return 0;
}"
HAVE_PTHREAD_SETNAME_NP_WITH_TID
)
unset(CMAKE_REQUIRED_LIBRARIES)
if(${HAVE_PTHREAD_SETNAME_NP_WITH_TID})
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -DHAVE_PTHREAD_SETNAME_NP_WITH_TID")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DHAVE_PTHREAD_SETNAME_NP_WITH_TID")
endif()

# Generate file "config.h"
set(VERSION ${PROJECT_VERSION})
Expand All @@ -76,11 +93,11 @@ set(GLIBMM_REQUIRED ^2.37)
include(GenericFind)

if(${HAS_STD_REGEX_REPLACE})
set(BOOST_REQUIRED_COMPONENTS "filesystem system")
generic_find(LIBNAME Boost REQUIRED COMPONENTS filesystem system unit_test_framework)
set(BOOST_REQUIRED_COMPONENTS "filesystem system thread")
generic_find(LIBNAME Boost REQUIRED COMPONENTS filesystem system thread unit_test_framework)
else()
set(BOOST_REQUIRED_COMPONENTS "filesystem system regex")
generic_find(LIBNAME Boost REQUIRED COMPONENTS filesystem system regex unit_test_framework)
set(BOOST_REQUIRED_COMPONENTS "filesystem regex system thread")
generic_find(LIBNAME Boost REQUIRED COMPONENTS filesystem regex system thread unit_test_framework)
endif()

generic_find(LIBNAME gstreamer-1.5 VERSION ${GST_REQUIRED} REQUIRED)
Expand Down
2 changes: 2 additions & 0 deletions debian/control
Expand Up @@ -16,6 +16,7 @@ Build-Depends: debhelper (>= 9),
libboost-regex-dev,
libboost-system-dev,
libboost-test-dev,
libboost-thread-dev,
libglibmm-2.4-dev,
libgstreamer-plugins-base1.5-dev,
libgstreamer1.5-dev,
Expand Down Expand Up @@ -49,6 +50,7 @@ Depends: kms-core (= ${binary:Version}),
libboost-regex-dev,
libboost-system-dev,
libboost-test-dev,
libboost-thread-dev,
libglibmm-2.4-dev,
libgstreamer1.5-dev,
libsigc++-2.0-dev,
Expand Down
5 changes: 3 additions & 2 deletions src/server/implementation/EventHandler.cpp
Expand Up @@ -16,15 +16,16 @@
*/

#include "EventHandler.hpp"
#include <WorkerPool.hpp>
#include "WorkerPool.hpp"

namespace kurento
{

static void
post_task (std::function <void () > cb)
{
static WorkerPool workers (1);
// Use a single thread pool for all EventHandlers
static kurento::WorkerPool workers {};

workers.post (cb);
}
Expand Down
13 changes: 3 additions & 10 deletions src/server/implementation/MediaSet.cpp
Expand Up @@ -33,8 +33,6 @@
GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
#define GST_DEFAULT_NAME "KurentoMediaSet"

const int MEDIASET_THREADS_DEFAULT = 10;

namespace kurento
{

Expand Down Expand Up @@ -134,16 +132,13 @@ void MediaSet::doGarbageCollection ()
}
}

MediaSet::MediaSet()
MediaSet::MediaSet () : workers {}
{
terminated = false;

workers = std::make_shared<WorkerPool>(MEDIASET_THREADS_DEFAULT);

thread = std::thread ( [&] () {
std::unique_lock <std::recursive_mutex> lock (recMutex);


while (!terminated && waitCond.wait_for (lock,
collectorInterval) == std::cv_status::timeout) {

Expand Down Expand Up @@ -176,8 +171,6 @@ MediaSet::~MediaSet ()

lock.unlock();

workers.reset();

if (std::this_thread::get_id() != thread.get_id() ) {
try {
thread.join();
Expand All @@ -200,8 +193,8 @@ MediaSet::post (std::function<void (void) > f)
{
std::unique_lock <std::recursive_mutex> lock (recMutex);

if (!terminated && workers) {
workers->post (f);
if (!terminated) {
workers.post (f);
} else {
lock.unlock();
f();
Expand Down
2 changes: 1 addition & 1 deletion src/server/implementation/MediaSet.hpp
Expand Up @@ -155,7 +155,7 @@ class MediaSet
>
> eventHandler;

std::shared_ptr<WorkerPool> workers;
WorkerPool workers;

static std::chrono::seconds collectorInterval;

Expand Down

0 comments on commit 62020ff

Please sign in to comment.