Skip to content

Commit

Permalink
Switch from OpenMP to std::thread
Browse files Browse the repository at this point in the history
In order to keep threading technologies consistent over all tests, switched to using std::thread.
  • Loading branch information
Dr15Jones committed Nov 23, 2015
1 parent 6e033a8 commit 9144817
Showing 1 changed file with 58 additions and 54 deletions.
112 changes: 58 additions & 54 deletions DataFormats/Common/test/DetSetNewTS_t.cpp
Expand Up @@ -11,14 +11,6 @@
#include<vector>
#include<algorithm>

#ifndef __clang__
// CLANG does not support OpenMP
#include<omp.h>
#else
inline
int omp_get_thread_num() {return 1;}
int omp_get_num_threads(){return 1;}
#endif
#include <mutex>
typedef std::mutex Mutex;
// typedef std::lock_guard<std::mutex> Lock;
Expand Down Expand Up @@ -47,13 +39,31 @@ inline void spinlockSleep(std::atomic<T> const & lock, T val) {
}

// syncronize all threads in a parallel section (for testing purposes)
void sync(std::atomic<int> & all) {
auto sum = omp_get_num_threads(); sum = sum*(sum+1)/2;
all.fetch_add(omp_get_thread_num()+1,std::memory_order_acq_rel);
spinlock(all,sum);

void sync(std::atomic<int> & all, int total) {
++all;
spinlock(all,total);
}

namespace {
unsigned int number_of_threads() {
auto nThreads = std::thread::hardware_concurrency();
return nThreads == 0? 1 : nThreads;
}

template<typename F>
void parallel_run(F iFunc) {
std::vector<std::thread> threads;

auto nThreads = number_of_threads();
for(unsigned int i=0; i<nThreads;++i) {
threads.emplace_back([i,nThreads,iFunc] { iFunc(i, nThreads); } );
}

for(auto& thread: threads) {
thread.join();
}
}
}


struct B{
Expand Down Expand Up @@ -115,12 +125,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION(TestDetSet);
TestDetSet::TestDetSet() : sv(10){
DSTV::data_type v[10] = {0,1,2,3,4,5,6,7,8,9};
std::copy(v,v+10,sv.begin());
std::atomic<int> nt(1);
#pragma omp parallel
{
nt = omp_get_num_threads();
}
nth = nt;
nth = number_of_threads();
}

void read(DSTV const & detsets, bool all=false) {
Expand All @@ -147,24 +152,23 @@ void TestDetSet::infrastructure() {
int a=0;
std::atomic<int> b(0);
std::atomic<int> lock(0);
std::atomic<int> nt(0);
#pragma omp parallel
std::atomic<int> nt(number_of_threads());
parallel_run([&a,&b,&lock,&nt](unsigned int, unsigned int)
{
nt = omp_get_num_threads();
sync(lock);
sync(lock,nt);
a++;
b.fetch_add(1,std::memory_order_acq_rel);;
}
});

if (i==5) std::cout << "threads "<< lock << " " << a << ' ' << b << std::endl;
CPPUNIT_ASSERT(b==nt);
a=0; b=0;

#pragma omp parallel
parallel_run([&a,&b](unsigned int, unsigned int)
{
a++;
b.fetch_add(1,std::memory_order_acq_rel);
}
});
if (i==5) std::cout << "threads "<< lock << " " << a << ' ' << b << std::endl;

nth = nt;
Expand All @@ -183,9 +187,9 @@ void TestDetSet::fillSeq() {
std::atomic<int> idet(0);
std::atomic<int> trial(0);
int maxDet=100*nth;
#pragma omp parallel
parallel_run([&lock,&idet,&trial,&detsets, maxDet](unsigned int, unsigned int numberOfThreads)
{
sync(lock);
sync(lock,numberOfThreads);
while(true) {
int ldet = idet;
if (!(ldet<maxDet)) break;
Expand All @@ -194,24 +198,24 @@ void TestDetSet::fillSeq() {
unsigned int id=20+ldet;
bool done=false;
while(!done) {
try {
{
FF ff(detsets, id); // serialize
ff.push_back(100*ldet+3);
CPPUNIT_ASSERT(detsets.m_data.back().v==(100*ldet+3));
ff.push_back(-(100*ldet+3));
CPPUNIT_ASSERT(detsets.m_data.back().v==-(100*ldet+3));
}
// read(detsets); // cannot read in parallel while filling in this case
done=true;
} catch (edm::Exception const&) {
trial++;;
//read(detsets);
}
try {
{
FF ff(detsets, id); // serialize
ff.push_back(100*ldet+3);
CPPUNIT_ASSERT(detsets.m_data.back().v==(100*ldet+3));
ff.push_back(-(100*ldet+3));
CPPUNIT_ASSERT(detsets.m_data.back().v==-(100*ldet+3));
}
// read(detsets); // cannot read in parallel while filling in this case
done=true;
} catch (edm::Exception const&) {
trial++;;
//read(detsets);
}
}
}
// read(detsets);
}
});

std::cout << idet << ' ' << detsets.size() << std::endl;
read(detsets,true);
Expand Down Expand Up @@ -263,10 +267,10 @@ void TestDetSet::fillPar() {
DST df31 = detsets[31];

std::cout << "start parallel section" << std::endl;
#pragma omp parallel
parallel_run([&lock,&detsets,&idet,maxDet,&g](unsigned int threadNumber, unsigned int numberOfThreads)
{
sync(lock);
if (omp_get_thread_num()%2==0) {
sync(lock,numberOfThreads);
if (threadNumber%2==0) {
DST df = detsets[25]; // everybody!
CPPUNIT_ASSERT(df.id()==25);
CPPUNIT_ASSERT(df.size()==2);
Expand All @@ -275,24 +279,24 @@ void TestDetSet::fillPar() {

}
while(true) {
if (omp_get_thread_num()==0) read(detsets);
if (threadNumber==0) read(detsets);
int ldet = idet.load(std::memory_order_acquire);
if (!(ldet<maxDet)) break;
while(!idet.compare_exchange_weak(ldet,ldet+1,std::memory_order_acq_rel));
if (ldet>=maxDet) break;
unsigned int id=20+ldet;
{
DST df = *detsets.find(id, true);
CPPUNIT_ASSERT(int(g.ntot)>0);
assert(df.id()==id);
assert(df.size()==2);
assert(df[0]==100*(id-20)+3);
DST df = *detsets.find(id, true);
CPPUNIT_ASSERT(int(g.ntot)>0);
assert(df.id()==id);
assert(df.size()==2);
assert(df[0]==100*(id-20)+3);
assert(df[1]==-(100*(id-20)+3));

}
if (omp_get_thread_num()==1) read(detsets);
if (threadNumber==1) read(detsets);
}
}
});
std::cout << "end parallel section" << std::endl;

CPPUNIT_ASSERT(df31.id()==31);
Expand Down

0 comments on commit 9144817

Please sign in to comment.