Skip to content

Commit

Permalink
Merge pull request #19985 from Dr15Jones/limitedQueue
Browse files Browse the repository at this point in the history
Created LimitedTaskQueue
  • Loading branch information
cmsbuild committed Aug 1, 2017
2 parents 46f824c + c4daddd commit 31281ca
Show file tree
Hide file tree
Showing 3 changed files with 360 additions and 9 deletions.
112 changes: 112 additions & 0 deletions FWCore/Concurrency/interface/LimitedTaskQueue.h
@@ -0,0 +1,112 @@
#ifndef FWCore_Concurrency_LimitedTaskQueue_h
#define FWCore_Concurrency_LimitedTaskQueue_h
// -*- C++ -*-
//
// Package: Concurrency
// Class : LimitedTaskQueue
//
/**\class LimitedTaskQueue LimitedTaskQueue.h "FWCore/Concurrency/interface/LimitedTaskQueue.h"
Description: Runs a set number of tasks from the queue at a time
Usage:
A LimitedTaskQueue is used to provide access to a limited thread-safe resource. You create a LimitedTaskQueue
for the resource. Whenever you need to perform an operation on the resource, you push a 'task' that
does that operation onto the queue. The queue then makes sure to run a limited number of tasks at a time.
The 'tasks' managed by the LimitedTaskQueue are just functor objects which take no arguments and
return no values. The simplest way to create a task is to use a C++11 lambda.
*/
//
// Original Author: Chris Jones
// Created: Thu Feb 21 11:14:39 CST 2013
// $Id$
//

// system include files
#include <atomic>
#include <exception>
#include <memory>
#include <vector>

#include "tbb/task.h"

#include "FWCore/Concurrency/interface/SerialTaskQueue.h"

// user include files

// forward declarations
namespace edm {
class LimitedTaskQueue
{
public:
/// Construct a queue that runs at most iLimit pushed tasks concurrently.
LimitedTaskQueue(unsigned int iLimit):
m_queues{iLimit}
{ }


// ---------- member functions ---------------------------

/// asynchronously pushes functor iAction into queue
/**
* The function will return immediately and iAction will either
* process concurrently with the calling thread or wait until the
* protected resource becomes available or until a CPU becomes available.
* \param[in] iAction Must be a functor that takes no arguments and returns no values.
*/
template<typename T>
void push(const T& iAction);

/// synchronously pushes functor iAction into queue
/**
* The function will wait until iAction has completed before returning.
* If another task is already running on the queue, the system is allowed
* to find another TBB task to execute while waiting for the iAction to finish.
* In that way the core is not idled while waiting.
* \param[in] iAction Must be a functor that takes no arguments and returns no values.
*/
template<typename T>
void pushAndWait(const T& iAction);

private:
// Non-copyable: the queues may hold in-flight tasks, so copying has no
// sensible meaning.
LimitedTaskQueue(const LimitedTaskQueue&) = delete;
const LimitedTaskQueue& operator=(const LimitedTaskQueue&) = delete;

// ---------- member data --------------------------------
// One SerialTaskQueue per allowed concurrent task. A pushed action is
// offered to every queue and executed by exactly one of them (see push()),
// which bounds concurrency at m_queues.size().
std::vector<SerialTaskQueue> m_queues;
};

template<typename T>
void LimitedTaskQueue::push(const T& iAction) {
  // A wrapper task is offered to every underlying serial queue; whichever
  // queue gets to it first claims it via the atomic flag and runs the
  // action, and the wrappers on the other queues become no-ops. Since at
  // most one wrapper per queue can run at a time, at most
  // m_queues.size() claimed actions run concurrently.
  //
  // The flag and a single copy of the action live in one shared state
  // object, so the (possibly heavy) functor is copied once instead of
  // once per queue, with a single allocation.
  struct Shared {
    explicit Shared(const T& iA) : m_action{iA} {}
    std::atomic<bool> m_claimed{false};
    T m_action;
  };
  auto state = std::make_shared<Shared>(iAction);
  for(auto& q : m_queues) {
    q.push([state]() {
      bool expected = false;
      // Only the first wrapper to run wins the exchange and executes.
      if(state->m_claimed.compare_exchange_strong(expected, true)) {
        state->m_action();
      }
    });
  }
}

template<typename T>
void LimitedTaskQueue::pushAndWait(const T& iAction) {
  // The empty_task acts as a latch: ref count 2 means one outstanding
  // decrement, and wait_for_all() returns once the winning wrapper has
  // decremented it. While waiting, TBB may run other tasks on this thread
  // so the core is not idled.
  tbb::empty_task* waitTask = new (tbb::task::allocate_root()) tbb::empty_task;
  waitTask->set_ref_count(2);

  struct Shared {
    std::atomic<bool> m_claimed{false};
    // Written only by the winning wrapper before it decrements the latch;
    // read by the caller only after wait_for_all(), so no race.
    std::exception_ptr m_exception;
  };
  auto state = std::make_shared<Shared>();

  // As in push(): one wrapper per queue, exactly one runs the action.
  for(auto& q : m_queues) {
    q.push([state, waitTask, iAction]() {
      bool expected = false;
      if(state->m_claimed.compare_exchange_strong(expected, true)) {
        try {
          iAction();
        } catch(...) {
          // Capture instead of silently swallowing; rethrown to the
          // synchronous caller below.
          state->m_exception = std::current_exception();
        }
        // Must run even when iAction throws, or wait_for_all() deadlocks.
        waitTask->decrement_ref_count();
      }
    });
  }
  waitTask->wait_for_all();
  tbb::task::destroy(*waitTask);
  if(state->m_exception) {
    std::rethrow_exception(state->m_exception);
  }
}

}

#endif
244 changes: 244 additions & 0 deletions FWCore/Concurrency/test/limitedtaskqueue_t.cppunit.cpp
@@ -0,0 +1,244 @@
//
// LimitedTaskQueue_test.cpp
// DispatchProcessingDemo
//
// Created by Chris Jones on 9/27/11.
//

#include <iostream>

#include <cppunit/extensions/HelperMacros.h>
#include <unistd.h>
#include <memory>
#include <atomic>
#include "tbb/task.h"
#include "FWCore/Concurrency/interface/LimitedTaskQueue.h"
#include "FWCore/Concurrency/interface/FunctorTask.h"

// CppUnit fixture exercising edm::LimitedTaskQueue: ordering with a limit
// of 1, the concurrency bound with larger limits, and a multi-producer
// stress test.
class LimitedTaskQueue_test : public CppUnit::TestFixture {
CPPUNIT_TEST_SUITE(LimitedTaskQueue_test);
CPPUNIT_TEST(testPush);
CPPUNIT_TEST(testPushAndWait);
CPPUNIT_TEST(stressTest);
CPPUNIT_TEST_SUITE_END();

public:
void testPush();
void testPushAndWait();
void stressTest();
// No per-test state to build or tear down.
void setUp(){}
void tearDown(){}
};

CPPUNIT_TEST_SUITE_REGISTRATION( LimitedTaskQueue_test );

void LimitedTaskQueue_test::testPush()
{
// Case 1: limit of 1 — the queue must behave serially, so each task sees
// the counter at exactly its push position.
{
std::atomic<unsigned int> count{0};

edm::LimitedTaskQueue queue{1};
{
// Latch: ref count 1 (for wait_for_all) + 3 (one per pushed task);
// the shared_ptr deleter destroys the tbb task when done.
std::shared_ptr<tbb::task> waitTask{new (tbb::task::allocate_root()) tbb::empty_task{},
[](tbb::task* iTask){tbb::task::destroy(*iTask);} };
waitTask->set_ref_count(1+3);
tbb::task* pWaitTask = waitTask.get();

queue.push([&count,pWaitTask]{
CPPUNIT_ASSERT(count++ == 0);
usleep(10);
pWaitTask->decrement_ref_count();
});

queue.push([&count,pWaitTask]{
CPPUNIT_ASSERT(count++ == 1);
usleep(10);
pWaitTask->decrement_ref_count();
});

queue.push([&count,pWaitTask]{
CPPUNIT_ASSERT(count++ == 2);
usleep(10);
pWaitTask->decrement_ref_count();
});

waitTask->wait_for_all();
CPPUNIT_ASSERT(count==3);
}
}

// Case 2: limit of kMax — tasks may overlap, but the number running at
// once (count is incremented on entry, decremented on exit) must never
// exceed kMax.
{
std::atomic<unsigned int> count{0};

constexpr unsigned int kMax = 2;
edm::LimitedTaskQueue queue{kMax};
{
std::shared_ptr<tbb::task> waitTask{new (tbb::task::allocate_root()) tbb::empty_task{},
[](tbb::task* iTask){tbb::task::destroy(*iTask);} };
waitTask->set_ref_count(1+3);
tbb::task* pWaitTask = waitTask.get();

queue.push([&count,pWaitTask]{
CPPUNIT_ASSERT(count++ < kMax);
usleep(10);
--count;
pWaitTask->decrement_ref_count();
});

queue.push([&count,pWaitTask]{
CPPUNIT_ASSERT(count++ < kMax);
usleep(10);
--count;
pWaitTask->decrement_ref_count();
});

queue.push([&count,pWaitTask]{
CPPUNIT_ASSERT(count++ < kMax);
usleep(10);
--count;
pWaitTask->decrement_ref_count();
});

waitTask->wait_for_all();
// All tasks finished, so the in-flight counter must be back to zero.
CPPUNIT_ASSERT(count==0);
}
}
}

void LimitedTaskQueue_test::testPushAndWait()
{
// Case 1: limit of 1 — two async pushes followed by a pushAndWait; since
// the queue is serial, the waited task runs third and the call returns
// only after all three ran.
{
std::atomic<unsigned int> count{0};

edm::LimitedTaskQueue queue{1};
{
queue.push([&count]{
CPPUNIT_ASSERT(count++ == 0);
usleep(10);
});

queue.push([&count]{
CPPUNIT_ASSERT(count++ == 1);
usleep(10);
});

queue.pushAndWait([&count]{
CPPUNIT_ASSERT(count++ == 2);
usleep(10);
});

// pushAndWait returned, so all three tasks must have completed.
CPPUNIT_ASSERT(count==3);
}
}

// Case 2: limit of kMax — three sequential pushAndWait calls; each must
// respect the concurrency bound, run exactly once, and complete before
// the call returns (checked via countTasksRun ordering).
{
std::atomic<unsigned int> count{0};
std::atomic<unsigned int> countTasksRun{0};
constexpr unsigned int kMax = 2;

edm::LimitedTaskQueue queue{kMax};
{

queue.pushAndWait([&count,&countTasksRun]{
CPPUNIT_ASSERT(count++ < kMax);
usleep(10);
--count;
CPPUNIT_ASSERT( 1==++countTasksRun);
});

queue.pushAndWait([&count,&countTasksRun]{
CPPUNIT_ASSERT(count++ < kMax);
usleep(10);
--count;
CPPUNIT_ASSERT( 2==++countTasksRun);
});

queue.pushAndWait([&count,&countTasksRun]{
CPPUNIT_ASSERT(count++ < kMax);
usleep(10);
--count;
CPPUNIT_ASSERT( 3==++countTasksRun);
});

// Print diagnostics before asserting so a failure in a batch run still
// shows the observed values.
auto c = count.load();
if( c != 0) {
std::cout <<"ERROR count "<< c<<" != 0"<<std::endl;
}
CPPUNIT_ASSERT(count==0);

auto v = countTasksRun.load();
if(v != 3) {
std::cout <<"ERROR # tasks Run "<< v<<" != 3"<<std::endl;
}
CPPUNIT_ASSERT(v==3);
}
}

}

// Stress test: two producers (a TBB task and the main thread) each push
// nTasks tasks concurrently, 99 times over. Verifies every task runs
// exactly once and that no more than kMax ever run simultaneously.
void LimitedTaskQueue_test::stressTest()
{
constexpr unsigned int kMax = 3;
edm::LimitedTaskQueue queue{kMax};

unsigned int index = 100;
const unsigned int nTasks = 1000;
while(0 != --index) {
// Latch ref count 3: one consumed by wait_for_all, one released by the
// producer task's guard, one by the main thread after its pushes. Each
// queued task adds its own increment/decrement pair.
std::shared_ptr<tbb::task> waitTask{new (tbb::task::allocate_root()) tbb::empty_task{},
[](tbb::task* iTask){tbb::task::destroy(*iTask);} };
waitTask->set_ref_count(3);
tbb::task* pWaitTask=waitTask.get();
std::atomic<unsigned int> count{0};
std::atomic<unsigned int> nRunningTasks{0};

// Both producers spin on this flag so they start pushing together.
std::atomic<bool> waitToStart{true};
{
auto j = edm::make_functor_task(tbb::task::allocate_root(),
[&queue,&waitToStart,pWaitTask,&count,&nRunningTasks]{
//gcc 4.7 doesn't preserve the 'atomic' nature of waitToStart in the loop
while(waitToStart.load()) {__sync_synchronize();};
// RAII guard: decrements the latch when this producer finishes pushing.
std::shared_ptr<tbb::task> guard{pWaitTask,[](tbb::task*iTask) {
iTask->decrement_ref_count();}};
for(unsigned int i = 0; i<nTasks;++i) {
pWaitTask->increment_ref_count();
queue.push([i,&count,pWaitTask,&nRunningTasks] {
std::shared_ptr<tbb::task> guard{pWaitTask,[](tbb::task*iTask) {
iTask->decrement_ref_count();}};
// The number of tasks already running must stay below the limit.
auto nrt = nRunningTasks++;
if( nrt >= kMax) {
std::cout <<"ERROR "<<nRunningTasks<< " >= "<<kMax<<std::endl;
}
CPPUNIT_ASSERT(nrt < kMax);
++count;
--nRunningTasks;
});
}
});
tbb::task::enqueue(*j);

// Release both producers, then push this thread's share of the tasks.
waitToStart=false;
for(unsigned int i=0; i<nTasks;++i) {
pWaitTask->increment_ref_count();
queue.push([i,&count,pWaitTask,&nRunningTasks] {
std::shared_ptr<tbb::task> guard{pWaitTask,[](tbb::task*iTask) {
iTask->decrement_ref_count();}};
auto nrt = nRunningTasks++;
if( nrt >= kMax) {
std::cout <<"ERROR "<<nRunningTasks<< " >= "<<kMax<<std::endl;
}
CPPUNIT_ASSERT(nrt < kMax);
++count;
--nRunningTasks;
});
}
// Main thread done producing: release its share of the latch.
pWaitTask->decrement_ref_count();
}
waitTask->wait_for_all();

// Every pushed task ran exactly once and all have drained.
CPPUNIT_ASSERT( 0 == nRunningTasks);
CPPUNIT_ASSERT(2*nTasks==count);
}
}



0 comments on commit 31281ca

Please sign in to comment.