diff --git a/Makefile b/Makefile index 062f2c0..32e7fd3 100644 --- a/Makefile +++ b/Makefile @@ -46,19 +46,19 @@ ifneq (,$(findstring Darwin,$(shell uname))) ifneq (,$(findstring 13,$(shell uname -r))) CPP = clang++ CC = clang - EXTRA_FLAGS += -stdlib=libstdc++ + override EXTRA_FLAGS += -stdlib=libstdc++ endif ifneq (,$(findstring 14,$(shell uname -r))) CPP = clang++ CC = clang - EXTRA_FLAGS += -stdlib=libstdc++ + override EXTRA_FLAGS += -stdlib=libstdc++ endif endif LINUX = 0 ifneq (,$(findstring Linux,$(shell uname))) LINUX = 1 - EXTRA_FLAGS += -Wl,--hash-style=both + override EXTRA_FLAGS += -Wl,--hash-style=both endif MM_DEF = @@ -75,21 +75,25 @@ PTHREAD_DEF = ifeq (1,$(MINGW)) PTHREAD_LIB = - EXTRA_FLAGS += -static-libgcc -static-libstdc++ + override EXTRA_FLAGS += -static-libgcc -static-libstdc++ else PTHREAD_LIB = -lpthread endif +ifeq (1,$(NO_SPINLOCK)) + override EXTRA_FLAGS += -DNO_SPINLOCK +endif + ifeq (1,$(WITH_TBB)) LIBS = $(PTHREAD_LIB) -ltbb -ltbbmalloc_proxy - EXTRA_FLAGS += -DWITH_TBB + override EXTRA_FLAGS += -DWITH_TBB else LIBS = $(PTHREAD_LIB) endif POPCNT_CAPABILITY ?= 1 ifeq (1, $(POPCNT_CAPABILITY)) - EXTRA_FLAGS += -DPOPCNT_CAPABILITY + override EXTRA_FLAGS += -DPOPCNT_CAPABILITY INC += -I third_party endif @@ -110,12 +114,27 @@ ifeq (1,$(WITH_THREAD_PROFILING)) override EXTRA_FLAGS += -DPER_THREAD_TIMING=1 endif +ifeq (1,$(WITH_AFFINITY)) + override EXTRA_FLAGS += -DWITH_AFFINITY=1 +endif + +ifeq (1,$(WITH_QUEUELOCK)) + override EXTRA_FLAGS += -DWITH_QUEUELOCK=1 +endif + ifeq (1,$(WITH_FINE_TIMER)) override EXTRA_FLAGS += -DUSE_FINE_TIMER=1 endif OTHER_CPPS = ccnt_lut.cpp ref_read.cpp alphabet.cpp shmem.cpp \ edit.cpp ebwt.cpp + +ifeq (1,$(WITH_COHORTLOCK)) + LIBS += -lnuma + override EXTRA_FLAGS += -DWITH_COHORTLOCK=1 + OTHER_CPPS += cohort.cpp cpu_numa_info.cpp +endif + ifneq (1,$(WITH_TBB)) OTHER_CPPS += tinythread.cpp endif diff --git a/SeqAn-1.1/seqan/file/file_format_cgviz.h b/SeqAn-1.1/seqan/file/file_format_cgviz.h index 9cd11f0..726851f 100644 --- a/SeqAn-1.1/seqan/file/file_format_cgviz.h +++ b/SeqAn-1.1/seqan/file/file_format_cgviz.h @@ -63,7 +63,6 @@ void _write_impl(TFile& target, Align& align, TStringContainer& SEQAN_CHECKPOINT typedef Align const TAlign; - typedef typename Row::Type TRow; typedef typename Position::Type>::Type TRowsPosition; typedef typename Position::Type TPosition; TRowsPosition row_count = length(rows(align)); diff --git a/SeqAn-1.1/seqan/file/file_format_fasta_align.h b/SeqAn-1.1/seqan/file/file_format_fasta_align.h index 6790f4b..648b4b3 100644 --- a/SeqAn-1.1/seqan/file/file_format_fasta_align.h +++ b/SeqAn-1.1/seqan/file/file_format_fasta_align.h @@ -134,9 +134,6 @@ SEQAN_CHECKPOINT TSize numRows=length(beg_end_length) / 3; resize(rows(align), numRows); //rows - typedef Align TAlign; - typedef typename Row::Type TRow; - for(TSize i=0;i& align, TStringContainer& id typedef Align const TAlign; typedef typename Row::Type TRow; typedef typename Position::Type>::Type TRowsPosition; - typedef typename Position::Type TPosition; TRowsPosition row_count = length(rows(align)); for(TRowsPosition i=0;i= _sz) { // Slow path: bitset needs to be expanded before the // specified bit can be set @@ -123,7 +123,7 @@ class SyncBitset { * synchronization. */ void setOver(uint32_t i) { - GUARD_LOCK(mutex_m); + ThreadSafe _ts(&mutex_m); while(i >= _sz) { // Slow path: bitset needs to be expanded before the // specified bit can be set diff --git a/ebwt_search.cpp b/ebwt_search.cpp index f8910ee..35233ff 100644 --- a/ebwt_search.cpp +++ b/ebwt_search.cpp @@ -1138,7 +1138,7 @@ static Ebwt >* exactSearch_ebwt; static vector >* exactSearch_os; static BitPairReference* exactSearch_refs; #ifdef WITH_TBB -void exactSearchWorker::operator()() { +void exactSearchWorker::operator()() const { #else static void exactSearchWorker(void *vp) { int tid = *((int*)vp); @@ -1218,7 +1218,7 @@ static void exactSearchWorker(void *vp) { * A statefulness-aware worker driver. Uses UnpairedExactAlignerV1. */ #ifdef WITH_TBB -void exactSearchWorkerStateful::operator()() { +void exactSearchWorkerStateful::operator()() const { #else static void exactSearchWorkerStateful(void *vp) { int tid = *((int*)vp); @@ -1402,7 +1402,7 @@ static BitPairReference* mismatchSearch_refs; * A statefulness-aware worker driver. Uses Unpaired/Paired1mmAlignerV1. */ #ifdef WITH_TBB -void mismatchSearchWorkerFullStateful::operator()() { +void mismatchSearchWorkerFullStateful::operator()() const { #else static void mismatchSearchWorkerFullStateful(void *vp) { int tid = *((int*)vp); @@ -1487,7 +1487,7 @@ static void mismatchSearchWorkerFullStateful(void *vp) { return; } #ifdef WITH_TBB -void mismatchSearchWorkerFull::operator()(){ +void mismatchSearchWorkerFull::operator()() const { #else static void mismatchSearchWorkerFull(void *vp){ int tid = *((int*)vp); @@ -1734,7 +1734,7 @@ static BitPairReference* twoOrThreeMismatchSearch_refs; * A statefulness-aware worker driver. Uses UnpairedExactAlignerV1. */ #ifdef WITH_TBB -void twoOrThreeMismatchSearchWorkerStateful::operator()(){ +void twoOrThreeMismatchSearchWorkerStateful::operator()() const { #else static void twoOrThreeMismatchSearchWorkerStateful(void *vp) { int tid = *((int*)vp); @@ -1822,7 +1822,7 @@ static void twoOrThreeMismatchSearchWorkerStateful(void *vp) { return; } #ifdef WITH_TBB -void twoOrThreeMismatchSearchWorkerFull::operator()(){ +void twoOrThreeMismatchSearchWorkerFull::operator()() const { #else static void twoOrThreeMismatchSearchWorkerFull(void *vp) { int tid = *((int*)vp); @@ -2042,7 +2042,7 @@ static int seededQualSearch_qualCutoff; static BitPairReference* seededQualSearch_refs; #ifdef WITH_TBB -void seededQualSearchWorkerFull::operator()(){ +void seededQualSearchWorkerFull::operator()() const { #else static void seededQualSearchWorkerFull(void *vp) { int tid = *((int*)vp); @@ -2268,7 +2268,7 @@ static void seededQualSearchWorkerFull(void *vp) { WORKER_EXIT(); } #ifdef WITH_TBB -void seededQualSearchWorkerFullStateful::operator()(){ +void seededQualSearchWorkerFullStateful::operator()() const { #else static void seededQualSearchWorkerFullStateful(void *vp) { int tid = *((int*)vp); diff --git a/ebwt_search.h b/ebwt_search.h index 61eff40..033e6f4 100644 --- a/ebwt_search.h +++ b/ebwt_search.h @@ -18,7 +18,7 @@ class exactSearchWorkerStateful { public: exactSearchWorkerStateful(const exactSearchWorkerStateful& W): tid(W.tid) {}; exactSearchWorkerStateful(int id):tid(id) {}; - void operator()(); + void operator()() const; }; class exactSearchWorker { @@ -27,7 +27,7 @@ class exactSearchWorker { public: exactSearchWorker(const exactSearchWorker& W): tid(W.tid) {}; exactSearchWorker(int id):tid(id) {}; - void operator()(); + void operator()() const; }; @@ -37,7 +37,7 @@ class mismatchSearchWorkerFull { public: mismatchSearchWorkerFull(const mismatchSearchWorkerFull& W): tid(W.tid) {}; mismatchSearchWorkerFull(int id):tid(id) {}; - void operator()(); + void operator()() const; }; @@ -47,7 +47,7 @@ class mismatchSearchWorkerFullStateful { public: mismatchSearchWorkerFullStateful(const mismatchSearchWorkerFullStateful& W): tid(W.tid) {}; mismatchSearchWorkerFullStateful(int id):tid(id) {}; - void operator()(); + void operator()() const; }; class twoOrThreeMismatchSearchWorkerStateful { @@ -56,7 +56,7 @@ class twoOrThreeMismatchSearchWorkerStateful { public: twoOrThreeMismatchSearchWorkerStateful(const twoOrThreeMismatchSearchWorkerStateful& W): tid(W.tid) {}; twoOrThreeMismatchSearchWorkerStateful(int id):tid(id) {}; - void operator()(); + void operator()() const; }; class twoOrThreeMismatchSearchWorkerFull { @@ -65,7 +65,7 @@ class twoOrThreeMismatchSearchWorkerFull { public: twoOrThreeMismatchSearchWorkerFull(const twoOrThreeMismatchSearchWorkerFull& W): tid(W.tid) {}; twoOrThreeMismatchSearchWorkerFull(int id):tid(id) {}; - void operator()(); + void operator()() const; }; @@ -75,7 +75,7 @@ class seededQualSearchWorkerFullStateful { public: seededQualSearchWorkerFullStateful(const seededQualSearchWorkerFullStateful& W): tid(W.tid) {}; seededQualSearchWorkerFullStateful(int id):tid(id) {}; - void operator()(); + void operator()() const; }; class seededQualSearchWorkerFull { @@ -84,7 +84,7 @@ class seededQualSearchWorkerFull { public: seededQualSearchWorkerFull(const seededQualSearchWorkerFull& W): tid(W.tid) {}; seededQualSearchWorkerFull(int id):tid(id) {}; - void operator()(); + void operator()() const; }; diff --git a/ebwt_search_util.h b/ebwt_search_util.h index 8381fa2..4eca65c 100644 --- a/ebwt_search_util.h +++ b/ebwt_search_util.h @@ -181,7 +181,7 @@ class PartialAlignmentManager { */ void addPartials(uint32_t patid, const vector& ps) { if(ps.size() == 0) return; - GUARD_LOCK(mutex_m); + ThreadSafe _ts(&mutex_m); size_t origPlSz = _partialsList.size(); // Assert that the entry doesn't exist yet assert(_partialsMap.find(patid) == _partialsMap.end()); @@ -238,7 +238,7 @@ class PartialAlignmentManager { */ void getPartials(uint32_t patid, vector& ps) { assert_eq(0, ps.size()); - GUARD_LOCK(mutex_m); + ThreadSafe _ts(&mutex_m); getPartialsUnsync(patid, ps); } diff --git a/hit.h b/hit.h index 5b5aa16..ab399a6 100644 --- a/hit.h +++ b/hit.h @@ -308,6 +308,7 @@ class HitSink { _refnames(refnames), _numWrappers(0), _locks(), + ts_wrap(NULL), INIT_HIT_DUMPS, onePairFile_(onePairFile), sampleMax_(sampleMax), @@ -321,9 +322,8 @@ class HitSink { ssmode_(ios_base::out) { _outs.push_back(out); - vector::iterator it; - - _locks.push_back(new MUTEX_T); + vector::iterator it; + _locks.push_back(new MUTEX_T); initDumps(); } @@ -381,9 +381,8 @@ class HitSink { * lock or any of the per-stream locks will be contended. */ void addWrapper() { - numWrapper_mutex_m.lock(); - _numWrappers++; - numWrapper_mutex_m.unlock(); + ThreadSafe ts(&numWrapper_mutex_m); + _numWrappers++; } /** @@ -437,7 +436,7 @@ class HitSink { out(h.h.first).writeChars(buf, ss.tellp()); } unlock(hs[end-1].h.first); - GUARD_LOCK(main_mutex_m); + ThreadSafe _ts(&main_mutex_m); commitHits(hs); first_ = false; numAligned_++; @@ -551,22 +550,6 @@ class HitSink { } /** - * Lock the monolithic lock for this HitSink. This is useful when, - * for example, outputting a read to an unaligned-read file. - */ - void mainlock() { - main_mutex_m.lock(); - } - - /** - * Unlock the monolithic lock for this HitSink. This is useful - * when, for example, outputting a read to an unaligned-read file. - */ - void mainunlock() { - main_mutex_m.unlock(); - } - - /** * Return true iff this HitSink dumps aligned reads to an output * stream (i.e., iff --alfa or --alfq are specified). */ @@ -609,7 +592,7 @@ class HitSink { if(!p.paired() || onePairFile_) { // Dump unpaired read to an aligned-read file of the same format if(!dumpAlBase_.empty()) { - GUARD_LOCK(dumpAlignLock_); + ThreadSafe _ts(&dumpAlignLock_); if(dumpAl_ == NULL) { assert(dumpAlQv_ == NULL); dumpAl_ = openOf(dumpAlBase_, 0, ""); @@ -628,7 +611,7 @@ class HitSink { // Dump paired-end read to an aligned-read file (or pair of // files) of the same format if(!dumpAlBase_.empty()) { - GUARD_LOCK(dumpAlignLockPE_); + ThreadSafe _ts(&dumpAlignLockPE_); if(dumpAl_1_ == NULL) { assert(dumpAlQv_1_ == NULL); assert(dumpAlQv_2_ == NULL); @@ -663,7 +646,7 @@ class HitSink { if(!p.paired() || onePairFile_) { // Dump unpaired read to an unaligned-read file of the same format if(!dumpUnalBase_.empty()) { - GUARD_LOCK(dumpUnalLock_); + ThreadSafe _ts(&dumpUnalLock_); if(dumpUnal_ == NULL) { assert(dumpUnalQv_ == NULL); dumpUnal_ = openOf(dumpUnalBase_, 0, ""); @@ -682,7 +665,7 @@ class HitSink { // Dump paired-end read to an unaligned-read file (or pair // of files) of the same format if(!dumpUnalBase_.empty()) { - GUARD_LOCK(dumpUnalLockPE_); + ThreadSafe _ts(&dumpUnalLockPE_); if(dumpUnal_1_ == NULL) { assert(dumpUnal_1_ == NULL); assert(dumpUnal_2_ == NULL); @@ -718,7 +701,7 @@ class HitSink { if(!p.paired() || onePairFile_) { // Dump unpaired read to an maxed-out-read file of the same format if(!dumpMaxBase_.empty()) { - GUARD_LOCK(dumpMaxLock_); + ThreadSafe _ts(&dumpMaxLock_); if(dumpMax_ == NULL) { dumpMax_ = openOf(dumpMaxBase_, 0, ""); assert(dumpMax_ != NULL); @@ -735,7 +718,7 @@ class HitSink { // Dump paired-end read to a maxed-out-read file (or pair // of files) of the same format if(!dumpMaxBase_.empty()) { - GUARD_LOCK(dumpMaxLockPE_); + ThreadSafe _ts(&dumpMaxLockPE_); if(dumpMax_1_ == NULL) { assert(dumpMaxQv_1_ == NULL); assert(dumpMaxQv_2_ == NULL); @@ -763,7 +746,7 @@ class HitSink { * want to print a placeholder when output is chained. */ virtual void reportMaxed(vector& hs, PatternSourcePerThread& p) { - GUARD_LOCK(main_mutex_m); + ThreadSafe _ts(&main_mutex_m); numMaxed_++; } @@ -772,7 +755,7 @@ class HitSink { * want to print a placeholder when output is chained. */ virtual void reportUnaligned(PatternSourcePerThread& p) { - GUARD_LOCK(main_mutex_m); + ThreadSafe _ts(&main_mutex_m); numUnaligned_++; } @@ -781,7 +764,7 @@ class HitSink { /// Implementation of hit-report virtual void reportHit(const Hit& h) { assert(h.repOk()); - GUARD_LOCK(main_mutex_m); + ThreadSafe _ts(&main_mutex_m); commitHit(h); first_ = false; if(h.mate > 0) numReportedPaired_++; @@ -809,7 +792,8 @@ class HitSink { */ void lock(size_t refIdx) { size_t strIdx = refIdxToStreamIdx(refIdx); - _locks[strIdx]->lock(); + assert(ts_wrap == NULL); + ts_wrap = new ThreadSafe(_locks[strIdx]); } /** @@ -819,8 +803,8 @@ class HitSink { * specified, each reference has its own reference stream. */ void unlock(size_t refIdx) { - size_t strIdx = refIdxToStreamIdx(refIdx); - _locks[strIdx]->unlock(); + assert(ts_wrap != NULL); + delete ts_wrap; } vector _outs; /// the alignment output stream(s) @@ -831,6 +815,7 @@ class HitSink { vector _locks; /// pthreads mutexes for per-file critical sections MUTEX_T main_mutex_m; /// pthreads mutexes for fields of this object MUTEX_T numWrapper_mutex_m; + ThreadSafe* ts_wrap; /// for mutual exclusion // Output filenames for dumping std::string dumpAlBase_; diff --git a/log.h b/log.h index 93bd5c2..581dee4 100644 --- a/log.h +++ b/log.h @@ -6,16 +6,15 @@ class SyncLogger { public: - SyncLogger() { - } + SyncLogger() : mutex_m() { } void msg(const char *s) { - GUARD_LOCK(mutex_m); + ThreadSafe ts(&mutex_m); std::cout << s << std::endl; } void msg(const std::string& s) { - GUARD_LOCK(mutex_m); + ThreadSafe ts(&mutex_m); std::cout << s << std::endl; } diff --git a/pat.h b/pat.h index 3133932..40eddc4 100644 --- a/pat.h +++ b/pat.h @@ -411,9 +411,8 @@ class PatternSource { * whether locks will be contended. */ void addWrapper() { - lock(); - numWrappers_++; - unlock(); + ThreadSafe ts(&mutex_m); + numWrappers_++; } /** @@ -517,24 +516,6 @@ class PatternSource { virtual void reset() { readCnt_ = 0; } /** - * Concrete subclasses call lock() to enter a critical region. - * What constitutes a critical region depends on the subclass. - */ - void lock() { - if(!doLocking_) return; // no contention - mutex_m.lock(); - } - - /** - * Concrete subclasses call unlock() to exit a critical region - * What constitutes a critical region depends on the subclass. - */ - void unlock() { - if(!doLocking_) return; // no contention - mutex_m.unlock(); - } - - /** * Return the number of reads attempted. */ uint64_t readCnt() const { return readCnt_ - 1; } @@ -621,21 +602,6 @@ class PairedPatternSource { virtual bool nextReadPair(ReadBuf& ra, ReadBuf& rb, uint32_t& patid) = 0; virtual pair readCnt() const = 0; - /** - * Lock this PairedPatternSource, usually because one of its shared - * fields is being updated. - */ - void lock() { - mutex_m.lock(); - } - - /** - * Unlock this PairedPatternSource. - */ - void unlock() { - mutex_m.unlock(); - } - protected: MUTEX_T mutex_m; /// mutex for locking critical regions @@ -696,10 +662,9 @@ class PairedSoloPatternSource : public PairedPatternSource { if(seqan::empty(ra.patFw)) { // If patFw is empty, that's our signal that the // input dried up - lock(); + ThreadSafe ts(&mutex_m); if(cur + 1 > cur_) cur_++; cur = cur_; - unlock(); continue; // on to next pair of PatternSources } ra.seed = genRandSeed(ra.patFw, ra.qual, ra.name, seed_); @@ -796,9 +761,8 @@ class PairedDualPatternSource : public PairedPatternSource { * pair; returns false if ra contains a new unpaired read. */ virtual bool nextReadPair(ReadBuf& ra, ReadBuf& rb, uint32_t& patid) { - lock(); - uint32_t cur = cur_; - unlock(); + // 'cur' indexes the current pair of PatternSources + uint32_t cur = cur_; while(cur < srca_.size()) { if(srcb_[cur] == NULL) { // Patterns from srca_[cur_] are unpaired @@ -806,10 +770,9 @@ class PairedDualPatternSource : public PairedPatternSource { if(seqan::empty(ra.patFw)) { // If patFw is empty, that's our signal that the // input dried up - lock(); + ThreadSafe ts(&mutex_m); if(cur + 1 > cur_) cur_++; cur = cur_; - unlock(); continue; // on to next pair of PatternSources } ra.patid = patid; @@ -819,41 +782,41 @@ class PairedDualPatternSource : public PairedPatternSource { // Patterns from srca_[cur_] and srcb_[cur_] are paired uint32_t patid_a = 0; uint32_t patid_b = 0; + bool cont = false; // Lock to ensure that this thread gets parallel reads // in the two mate files - lock(); - srca_[cur]->nextRead(ra, patid_a); - srcb_[cur]->nextRead(rb, patid_b); - bool cont = false; - // Did the pair obtained fail to match up? - while(patid_a != patid_b) { - // Is either input exhausted? If so, bail. - if(seqan::empty(ra.patFw) || seqan::empty(rb.patFw)) { - seqan::clear(ra.patFw); - if(cur + 1 > cur_) cur_++; - cur = cur_; - cont = true; - break; - } - if(patid_a < patid_b) { - srca_[cur]->nextRead(ra, patid_a); - ra.fixMateName(1); - } else { - srcb_[cur]->nextRead(rb, patid_b); - rb.fixMateName(2); + { + ThreadSafe ts(&mutex_m); + srca_[cur]->nextRead(ra, patid_a); + srcb_[cur]->nextRead(rb, patid_b); + // Did the pair obtained fail to match up? + while(patid_a != patid_b) { + // Is either input exhausted? If so, bail. + if(seqan::empty(ra.patFw) || seqan::empty(rb.patFw)) { + seqan::clear(ra.patFw); + if(cur + 1 > cur_) cur_++; + cur = cur_; + cont = true; + break; + } + if(patid_a < patid_b) { + srca_[cur]->nextRead(ra, patid_a); + ra.fixMateName(1); + } else { + srcb_[cur]->nextRead(rb, patid_b); + rb.fixMateName(2); + } } } - unlock(); if(cont) continue; // on to next pair of PatternSources ra.fixMateName(1); rb.fixMateName(2); if(seqan::empty(ra.patFw)) { // If patFw is empty, that's our signal that the // input dried up - lock(); + ThreadSafe ts(&mutex_m); if(cur + 1 > cur_) cur_++; cur = cur_; - unlock(); continue; // on to next pair of PatternSources } assert_eq(patid_a, patid_b); @@ -1076,35 +1039,35 @@ class RandomPatternSource : public PatternSource { /** Get the next random read and set patid */ virtual void nextReadImpl(ReadBuf& r, uint32_t& patid) { - // Begin critical section - lock(); - if(readCnt_ >= numReads_) { - r.clearAll(); - unlock(); - return; + uint32_t ra; + { + ThreadSafe ts(&mutex_m); + if(readCnt_ >= numReads_) { + r.clearAll(); + return; + } + ra = rand_.nextU32(); + patid = (uint32_t)readCnt_; + readCnt_++; } - uint32_t ra = rand_.nextU32(); - patid = (uint32_t)readCnt_; - readCnt_++; - unlock(); fillRandomRead(r, ra, length_, patid); } /** Get the next random read and set patid */ virtual void nextReadPairImpl(ReadBuf& ra, ReadBuf& rb, uint32_t& patid) { - // Begin critical section - lock(); - if(readCnt_ >= numReads_) { - ra.clearAll(); - rb.clearAll(); - unlock(); - return; + uint32_t rna, rnb; + { + ThreadSafe ts(&mutex_m); + if(readCnt_ >= numReads_) { + ra.clearAll(); + rb.clearAll(); + return; + } + rna = rand_.nextU32(); + rnb = rand_.nextU32(); + patid = (uint32_t)readCnt_; + readCnt_++; } - uint32_t rna = rand_.nextU32(); - uint32_t rnb = rand_.nextU32(); - patid = (uint32_t)readCnt_; - readCnt_++; - unlock(); fillRandomRead(ra, rna, length_, patid); fillRandomRead(rb, rnb, length_, patid); } @@ -1402,9 +1365,8 @@ class VectorPatternSource : public TrimmingPatternSource { virtual void nextReadImpl(ReadBuf& r, uint32_t& patid) { // Let Strings begin at the beginning of the respective bufs r.reset(); - lock(); + ThreadSafe ts(&mutex_m); if(cur_ >= v_.size()) { - unlock(); // Clear all the Strings, as a signal to the caller that // we're out of reads r.clearAll(); @@ -1423,7 +1385,6 @@ class VectorPatternSource : public TrimmingPatternSource { cur_++; readCnt_++; patid = (uint32_t)readCnt_; - unlock(); } /** * This is unused, but implementation is given for completeness. @@ -1436,9 +1397,8 @@ class VectorPatternSource : public TrimmingPatternSource { paired_ = true; cur_ <<= 1; } - lock(); + ThreadSafe ts(&mutex_m); if(cur_ >= v_.size()-1) { - unlock(); // Clear all the Strings, as a signal to the caller that // we're out of reads ra.clearAll(); @@ -1465,7 +1425,6 @@ class VectorPatternSource : public TrimmingPatternSource { cur_++; readCnt_++; patid = (uint32_t)readCnt_; - unlock(); } virtual void reset() { TrimmingPatternSource::reset(); @@ -1480,8 +1439,8 @@ class VectorPatternSource : public TrimmingPatternSource { vector > v_; /// forward sequences vector > quals_; /// quality values parallel to v_ vector > names_; /// names - vector trimmed3_; // names - vector trimmed5_; // names + vector trimmed3_; /// # bases trimmed from 3' end + vector trimmed5_; /// # bases trimmed from 5' end }; /** @@ -1540,7 +1499,7 @@ class BufferedFilePatternSource : public TrimmingPatternSource { virtual void nextReadImpl(ReadBuf& r, uint32_t& patid) { // We are entering a critical region, because we're // manipulating our file handle and filecur_ state - lock(); + ThreadSafe ts(&mutex_m); bool notDone = true; do { read(r, patid); @@ -1550,7 +1509,6 @@ class BufferedFilePatternSource : public TrimmingPatternSource { notDone = seqan::empty(r.patFw) && !fb_.eof(); } while(notDone || (!fb_.eof() && patid < skip_)); if(patid < skip_) { - unlock(); r.clearAll(); assert(seqan::empty(r.patFw)); return; @@ -1574,8 +1532,6 @@ class BufferedFilePatternSource : public TrimmingPatternSource { } filecur_++; } - // Leaving critical region - unlock(); // If r.patFw is empty, then the caller knows that we are // finished with the reads } @@ -1585,7 +1541,7 @@ class BufferedFilePatternSource : public TrimmingPatternSource { virtual void nextReadPairImpl(ReadBuf& ra, ReadBuf& rb, uint32_t& patid) { // We are entering a critical region, because we're // manipulating our file handle and filecur_ state - lock(); + ThreadSafe ts(&mutex_m); bool notDone = true; do { readPair(ra, rb, patid); @@ -1595,7 +1551,6 @@ class BufferedFilePatternSource : public TrimmingPatternSource { notDone = seqan::empty(ra.patFw) && !fb_.eof(); } while(notDone || (!fb_.eof() && patid < skip_)); if(patid < skip_) { - unlock(); ra.clearAll(); rb.clearAll(); assert(seqan::empty(ra.patFw)); @@ -1620,8 +1575,6 @@ class BufferedFilePatternSource : public TrimmingPatternSource { } filecur_++; } - // Leaving critical region - unlock(); // If ra.patFw is empty, then the caller knows that we are // finished with the reads } diff --git a/ptl.hpp b/ptl.hpp new file mode 100644 index 0000000..49b29d3 --- /dev/null +++ b/ptl.hpp @@ -0,0 +1,67 @@ +#ifndef PTLLOCK_H_ +#define PTLLOCK_H_ + +#include +#include + + +//same as number of threads unless that's not-appropriate +const int DEFAULT_NUM_PARTITIONS=120; + +//from https://www.quora.com/How-does-an-MCS-lock-work +//and Lock Cohorting: A General Technique for Designing NUMA Locks +//(DAVID DICE and VIRENDRA J. MARATHE, Oracle Labs NIR SHAVIT, MIT) +//Partition Lock +class PTLLock +{ +public: + PTLLock() + { + this->reset_lock(DEFAULT_NUM_PARTITIONS); + } + + PTLLock(int num_partitions_) + { + this->reset_lock(num_partitions_); + } + + ~PTLLock() + { + delete[] partitions; + } + + void reset_lock(int num_partitions_) + { + assert(num_partitions_>=1); + this->num_partitions = num_partitions_; + //offset by 1 since we're starting grant/request at 1 due to + //defauls to 0 + partitions = new uint64_t [num_partitions_+1](); + request=1; + grant=1; + partitions[request % num_partitions_]=grant; + } + + void lock() + { + uint64_t my_request = request.fetch_and_increment(); + uint64_t my_request_modded = my_request % num_partitions; + while(partitions[my_request_modded] != my_request){} + grant = my_request; + } + + void unlock() + { + uint64_t my_grant = grant + 1; + uint64_t my_grant_modded = my_grant % num_partitions; + partitions[my_grant_modded] = my_grant; + } +private: + tbb::atomic request; + uint64_t grant; + //use a large integer to avoid false sharing + volatile uint64_t* partitions; + int num_partitions; +}; + +#endif diff --git a/sam.cpp b/sam.cpp index 1daf5bb..e445923 100644 --- a/sam.cpp +++ b/sam.cpp @@ -250,12 +250,11 @@ void SAMHitSink::reportSamHits( out(0).writeChars(buf, ss.tellp()); } unlock(0); - mainlock(); + ThreadSafe ts(&main_mutex_m); commitHits(hs); first_ = false; numAligned_++; numReportedPaired_ += (end-start); - mainunlock(); } /** diff --git a/threading.h b/threading.h index 2df6e8a..2ae9746 100644 --- a/threading.h +++ b/threading.h @@ -2,35 +2,57 @@ #define THREADING_H_ #include +#include +#include +#include + #ifdef WITH_TBB +# include "tkt.hpp" +# include "ptl.hpp" # include # include -# include +# include +# ifdef WITH_AFFINITY +# ifdef WITH_COHORTLOCK +# include "cohort.hpp" +# endif +# include +# include +# include +# include +# include +# endif #else # include "tinythread.h" # include "fast_mutex.h" #endif #ifdef NO_SPINLOCK -# ifdef WITH_TBB -# define MUTEX_T tbb::mutex -# else -# define MUTEX_T tthread::mutex -# endif +# ifdef WITH_TBB +# ifdef WITH_QUEUELOCK +# define MUTEX_T tbb::queuing_mutex +# else +# define MUTEX_T tbb::mutex +# endif +# else +# define MUTEX_T tthread::mutex +# endif #else -# ifdef WITH_TBB -# define MUTEX_T tbb::spin_mutex -# else -# define MUTEX_T tthread::fast_mutex -# endif +# ifdef WITH_TBB +# ifdef WITH_AFFINITY +# ifdef WITH_COHORTLOCK +# define MUTEX_T CohortLock +# else +# define MUTEX_T tbb::spin_mutex +# endif +# else +# define MUTEX_T tbb::spin_mutex +# endif +# else +# define MUTEX_T tthread::fast_mutex +# endif #endif /* NO_SPINLOCK */ -#ifdef WITH_TBB -# define GUARD_LOCK(lock) tbb::spin_mutex::scoped_lock tbb_guard(lock) -#else -# define GUARD_LOCK(lock) tthread::lock_guard tt_guard(lock) -#endif - /** * Wrap a lock; obtain lock upon construction, release upon destruction. */ @@ -38,8 +60,15 @@ class ThreadSafe { public: ThreadSafe(MUTEX_T* ptr_mutex, bool locked = true) { if(locked) { +#if WITH_TBB && WITH_QUEUELOCK + //have to use the heap as we can't copy + //the scoped lock + this->ptr_mutex = new MUTEX_T::scoped_lock(*ptr_mutex); +#else +//TODO: need to add special conditional for CohortLock here this->ptr_mutex = ptr_mutex; ptr_mutex->lock(); +#endif } else this->ptr_mutex = NULL; @@ -47,12 +76,121 @@ class ThreadSafe { ~ThreadSafe() { if (ptr_mutex != NULL) - ptr_mutex->unlock(); +#if WITH_TBB && WITH_QUEUELOCK + delete ptr_mutex; } - +#else + ptr_mutex->unlock(); + } +#endif + private: +#if WITH_TBB && WITH_QUEUELOCK + MUTEX_T::scoped_lock* ptr_mutex; +#else MUTEX_T *ptr_mutex; +#endif +}; + +#ifdef WITH_TBB +#ifdef WITH_AFFINITY +//ripped entirely from; +//https://software.intel.com/en-us/blogs/2013/10/31/applying-intel-threading-building-blocks-observers-for-thread-affinity-on-intel +class concurrency_tracker: public tbb::task_scheduler_observer { + tbb::atomic num_threads; +public: + concurrency_tracker() : num_threads() { observe(true); } + /*override*/ void on_scheduler_entry( bool ) { ++num_threads; } + /*override*/ void on_scheduler_exit( bool ) { --num_threads; } + + int get_concurrency() { return num_threads; } +}; + +class pinning_observer: public tbb::task_scheduler_observer { + cpu_set_t *mask; + int ncpus; + + const int pinning_step; + tbb::atomic thread_index; +public: + pinning_observer( int pinning_step=1 ) : pinning_step(pinning_step), thread_index() { + for ( ncpus = sizeof(cpu_set_t)/CHAR_BIT; ncpus < 16*1024 /* some reasonable limit */; ncpus <<= 1 ) { + mask = CPU_ALLOC( ncpus ); + if ( !mask ) break; + const size_t size = CPU_ALLOC_SIZE( ncpus ); + CPU_ZERO_S( size, mask ); + const int err = sched_getaffinity( 0, size, mask ); + if ( !err ) break; + + CPU_FREE( mask ); + mask = NULL; + if ( errno != EINVAL ) break; + } + if ( !mask ) + std::cout << "Warning: Failed to obtain process affinity mask. Thread affinitization is disabled." << std::endl; + } + +/*override*/ void on_scheduler_entry( bool ) { + if ( !mask ) return; + + const size_t size = CPU_ALLOC_SIZE( ncpus ); + const int num_cpus = CPU_COUNT_S( size, mask ); + int thr_idx = +//cwilks: we're one interface version lower than what +//is required for task arena (7000 vs. 7001) +#if USE_TASK_ARENA_CURRENT_SLOT + tbb::task_arena::current_slot(); +#else + thread_index++; +#endif +#if __MIC__ + thr_idx += 1; // To avoid logical thread zero for the master thread on Intel(R) Xeon Phi(tm) +#endif + thr_idx %= num_cpus; // To limit unique number in [0; num_cpus-1] range + + // Place threads with specified step + int cpu_idx = 0; + for ( int i = 0, offset = 0; i= num_cpus ) + cpu_idx = ++offset; + } + + // Find index of 'cpu_idx'-th bit equal to 1 + int mapped_idx = -1; + while ( cpu_idx >= 0 ) { + if ( CPU_ISSET_S( ++mapped_idx, size, mask ) ) + --cpu_idx; + } + + cpu_set_t *target_mask = CPU_ALLOC( ncpus ); + CPU_ZERO_S( size, target_mask ); + CPU_SET_S( mapped_idx, size, target_mask ); + const int err = sched_setaffinity( 0, size, target_mask ); + + //std::cout << "Just set affinity for thread " << thr_idx << "\n"; + if ( err ) { + std::cout << "Failed to set thread affinity!n"; + exit( EXIT_FAILURE ); + } +#if LOG_PINNING + else { + std::stringstream ss; + ss << "Set thread affinity: Thread " << thr_idx << ": CPU " << mapped_idx << std::endl; + std::cerr << ss.str(); + } +#endif + CPU_FREE( target_mask ); + } + + ~pinning_observer() { + if ( mask ) + CPU_FREE( mask ); + } }; #endif +#endif + +#endif diff --git a/timer.h b/timer.h index 94018d6..05bc675 100644 --- a/timer.h +++ b/timer.h @@ -3,6 +3,7 @@ #include #include +#include #include #ifdef USE_FINE_TIMER #include @@ -110,13 +111,15 @@ class Timer { void write(ostream& out) { time_t passed = elapsed(); // Print the message supplied at construction time followed - // by time elapsed formatted HH:MM:SS - unsigned int hours = (unsigned int)((passed / 60) / 60); - unsigned int minutes = (unsigned int)((passed / 60) % 60); - unsigned int seconds = (unsigned int)((passed % 60)); - out << _msg << setfill ('0') << setw (2) << hours << ":" - << setfill ('0') << setw (2) << minutes << ":" - << setfill ('0') << setw (2) << seconds << endl; + // by time elapsed formatted HH:MM:SS + time_t hours = (passed / 60) / 60; + time_t minutes = (passed / 60) % 60; + time_t seconds = (passed % 60); + std::ostringstream oss; + oss << _msg << setfill ('0') << setw (2) << hours << ":" + << setfill ('0') << setw (2) << minutes << ":" + << setfill ('0') << setw (2) << seconds << endl; + out << oss.str().c_str(); } private: @@ -132,13 +135,15 @@ static inline void logTime(std::ostream& os, bool nl = true) { time_t now; time(&now); current = localtime(&now); - os << setfill('0') << setw(2) + std::ostringstream oss; + oss << setfill('0') << setw(2) << current->tm_hour << ":" << setfill('0') << setw(2) << current->tm_min << ":" << setfill('0') << setw(2) << current->tm_sec; - if(nl) os << std::endl; + if(nl) oss << std::endl; + os << oss.str().c_str(); } #endif /*TIMER_H_*/ diff --git a/tkt.hpp b/tkt.hpp new file mode 100644 index 0000000..fdfe5bc --- /dev/null +++ b/tkt.hpp @@ -0,0 +1,47 @@ +#ifndef TKTLOCK_H_ +#define TKTLOCK_H_ + +#include + +//from https://www.quora.com/How-does-an-MCS-lock-work +//and Lock Cohorting: A General Technique for Designing NUMA Locks +//(DAVID DICE and VIRENDRA J. MARATHE, Oracle Labs NIR SHAVIT, MIT) +class TKTLock +{ +public: + TKTLock() + { + //printf("TKTLock initialized\n"); + request=0; + grant=0; + } + + void lock() + { + //printf("lock %p b4 request:%u grant:%u\n",this,request,grant); + uint64_t volatile my_request = request.fetch_and_increment(); + //request.fetch_and_increment(); + //printf("lock %p mid request:%u my_request:%d grant:%u\n",this,request,my_request,grant); + while(my_request != grant) + { + //printf("lock %p mid request:%u my_request:%d grant:%u\r",this,request,my_request,grant); + } + //printf("lock %p af request:%u grant:%u\n",this,request,grant); + } + + void unlock() + { + //printf("unlock %p b4 request:%u grant:%u\n",this,request,grant); + grant++; + //printf("unlock %p af request:%u grant:%u\n",this,request,grant); + } + + //here for compatibility with PTLLock + void reset_lock(int void_){} + +private: + tbb::atomic request; + volatile uint64_t grant; +}; + +#endif