Permalink
Browse files

reorganization of output locks so everything is buffered

  • Loading branch information...
1 parent 0ef1568 commit 78442216fc8c815496f56ee9df73c5116f073320 @BenLangmead committed Apr 15, 2017
Showing with 610 additions and 779 deletions.
  1. +59 −70 ebwt_search.cpp
  2. +5 −7 hit.cpp
  3. +380 −404 hit.h
  4. +111 −195 sam.cpp
  5. +55 −103 sam.h
View
@@ -1062,16 +1062,16 @@ createPatsrcFactory(PatternComposer& _patsrc, int tid, uint32_t max_buf) {
* global params and return a pointer to it.
*/
static HitSinkPerThreadFactory*
-createSinkFactory(HitSink& _sink) {
+createSinkFactory(HitSink& _sink, size_t threadId) {
HitSinkPerThreadFactory *sink = NULL;
if(!strata) {
// Unstratified
if(!allHits) {
// First N good; "good" inherently ignores strata
- sink = new NGoodHitSinkPerThreadFactory(_sink, khits, mhits);
+ sink = new NGoodHitSinkPerThreadFactory(_sink, khits, mhits, defaultMapq, threadId);
} else {
// All hits, spanning strata
- sink = new AllHitSinkPerThreadFactory(_sink, mhits);
+ sink = new AllHitSinkPerThreadFactory(_sink, mhits, defaultMapq, threadId);
}
} else {
// Stratified
@@ -1080,12 +1080,12 @@ createSinkFactory(HitSink& _sink) {
assert(stateful);
// Buffer best hits, assuming they're arriving in best-
// to-worst order
- sink = new NBestFirstStratHitSinkPerThreadFactory(_sink, khits, mhits);
+ sink = new NBestFirstStratHitSinkPerThreadFactory(_sink, khits, mhits, defaultMapq, threadId);
} else {
assert(stateful);
// Buffer best hits, assuming they're arriving in best-
// to-worst order
- sink = new NBestFirstStratHitSinkPerThreadFactory(_sink, 0xffffffff/2, mhits);
+ sink = new NBestFirstStratHitSinkPerThreadFactory(_sink, 0xffffffff/2, mhits, defaultMapq, threadId);
}
}
assert(sink != NULL);
@@ -1119,9 +1119,8 @@ static void exactSearchWorker(void *vp) {
// Per-thread initialization
PatternSourcePerThreadFactory *patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch);
PatternSourcePerThread *patsrc = patsrcFact->create();
- HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink);
+ HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid);
HitSinkPerThread* sink = sinkFact->create();
- sink->set_thread_id(tid);
EbwtSearchParams<String<Dna> > params(
*sink, // HitSink
os, // reference sequences
@@ -1207,7 +1206,7 @@ static void exactSearchWorkerStateful(void *vp) {
// Global initialization
PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch);
- HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink);
+ HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid);
ChunkPool *pool = new ChunkPool(chunkSz * 1024, chunkPoolMegabytes * 1024 * 1024, chunkVerbose);
UnpairedExactAlignerV1Factory alSEfact(
@@ -1325,11 +1324,10 @@ static void exactSearch(PatternComposer& _patsrc,
}
exactSearch_refs = refs;
#ifdef WITH_TBB
- //tbb::task_group tbb_grp;
- AutoArray<std::thread*> threads(nthreads+1);
+ AutoArray<std::thread*> threads(nthreads);
#else
- AutoArray<tthread::thread*> threads(nthreads+1);
- AutoArray<int> tids(nthreads+1);
+ AutoArray<tthread::thread*> threads(nthreads);
+ AutoArray<int> tids(nthreads);
#endif
#ifdef WITH_TBB
@@ -1345,34 +1343,32 @@ static void exactSearch(PatternComposer& _patsrc,
ts.tv_sec=0;
ts.tv_nsec = mil * 1000000L;
- for(int i = 1; i <= nthreads; i++) {
+ for(int i = 0; i < nthreads; i++) {
#ifdef WITH_TBB
thread_tracking_pair tp;
tp.tid = i;
tp.done = &all_threads_done;
if(stateful) {
- //tbb_grp.run(exactSearchWorkerStateful(i));
threads[i] = new std::thread(exactSearchWorkerStateful, (void*) &tp);
} else {
- //tbb_grp.run(exactSearchWorker(i));
threads[i] = new std::thread(exactSearchWorker, (void*) &tp);
}
threads[i]->detach();
nanosleep(&ts, (struct timespec *) NULL);
}
while(all_threads_done < nthreads);
- //tbb_grp.wait();
#else
tids[i] = i;
if(stateful) {
- threads[i] = new tthread::thread(exactSearchWorkerStateful, (void*)&tids[i]);
+ threads[i] = new tthread::thread(exactSearchWorkerStateful, (void*)&tids[i]);
} else {
- threads[i] = new tthread::thread(exactSearchWorker, (void*)&tids[i]);
+ threads[i] = new tthread::thread(exactSearchWorker, (void*)&tids[i]);
}
}
- for(int i = 1; i <= nthreads; i++)
- threads[i]->join();
+ for(int i = 0; i < nthreads; i++) {
+ threads[i]->join();
+ }
#endif
}
if(refs != NULL) delete refs;
@@ -1417,7 +1413,7 @@ static void mismatchSearchWorkerFullStateful(void *vp) {
// Global initialization
PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch);
- HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink);
+ HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid);
ChunkPool *pool = new ChunkPool(chunkSz * 1024, chunkPoolMegabytes * 1024 * 1024, chunkVerbose);
Unpaired1mmAlignerV1Factory alSEfact(
@@ -1509,9 +1505,8 @@ static void mismatchSearchWorkerFull(void *vp){
// Per-thread initialization
PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch);
PatternSourcePerThread* patsrc = patsrcFact->create();
- HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink);
+ HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid);
HitSinkPerThread* sink = sinkFact->create();
- sink->set_thread_id(tid);
EbwtSearchParams<String<Dna> > params(
*sink, // HitSinkPerThread
os, // reference sequences
@@ -1625,35 +1620,34 @@ static void mismatchSearchFull(PatternComposer& _patsrc,
mismatchSearch_refs = refs;
#ifdef WITH_TBB
- //tbb::task_group tbb_grp;
- AutoArray<std::thread*> threads(nthreads+1);
+ AutoArray<std::thread*> threads(nthreads);
#else
- AutoArray<tthread::thread*> threads(nthreads+1);
- AutoArray<int> tids(nthreads+1);
+ AutoArray<tthread::thread*> threads(nthreads);
+ AutoArray<int> tids(nthreads);
#endif
#ifdef WITH_TBB
tbb::atomic<int> all_threads_done;
all_threads_done = 0;
#endif
- CHUD_START();
- {
+ CHUD_START();
+ {
Timer _t(cerr, "Time for 1-mismatch full-index search: ", timing);
int mil = 10;
struct timespec ts = {0};
ts.tv_sec=0;
ts.tv_nsec = mil * 1000000L;
- for(int i = 1; i <= nthreads; i++) {
+ for(int i = 0; i < nthreads; i++) {
#ifdef WITH_TBB
thread_tracking_pair tp;
tp.tid = i;
tp.done = &all_threads_done;
if(stateful) {
- threads[i] = new std::thread(mismatchSearchWorkerFullStateful, (void*) &tp);
+ threads[i] = new std::thread(mismatchSearchWorkerFullStateful, (void*)&tp);
} else {
- threads[i] = new std::thread(mismatchSearchWorkerFull, (void*) &tp);
+ threads[i] = new std::thread(mismatchSearchWorkerFull, (void*)&tp);
}
threads[i]->detach();
nanosleep(&ts, (struct timespec *) NULL);
@@ -1662,16 +1656,17 @@ static void mismatchSearchFull(PatternComposer& _patsrc,
#else
tids[i] = i;
if(stateful) {
- threads[i] = new tthread::thread(mismatchSearchWorkerFullStateful, (void*)&tids[i]);
+ threads[i] = new tthread::thread(mismatchSearchWorkerFullStateful, (void*)&tids[i]);
} else {
- threads[i] = new tthread::thread(mismatchSearchWorkerFull, (void*)&tids[i]);
+ threads[i] = new tthread::thread(mismatchSearchWorkerFull, (void*)&tids[i]);
}
}
- for(int i = 1; i <= nthreads; i++)
- threads[i]->join();
+ for(int i = 0; i < nthreads; i++) {
+ threads[i]->join();
+ }
#endif
- }
+ }
if(refs != NULL) delete refs;
}
@@ -1780,7 +1775,7 @@ static void twoOrThreeMismatchSearchWorkerStateful(void *vp) {
// Global initialization
PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch);
- HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink);
+ HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid);
ChunkPool *pool = new ChunkPool(chunkSz * 1024, chunkPoolMegabytes * 1024 * 1024, chunkVerbose);
Unpaired23mmAlignerV1Factory alSEfact(
@@ -1868,11 +1863,10 @@ static void twoOrThreeMismatchSearchWorkerFull(void *vp) {
HitSink& _sink = *twoOrThreeMismatchSearch_sink;
vector<String<Dna5> >& os = *twoOrThreeMismatchSearch_os;
bool two = twoOrThreeMismatchSearch_two;
- PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch);
+ PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch);
PatternSourcePerThread* patsrc = patsrcFact->create();
- HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink);
+ HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid);
HitSinkPerThread* sink = sinkFact->create();
- sink->set_thread_id(tid);
/* Per-thread initialization */
EbwtSearchParams<String<Dna> > params(
*sink, /* HitSink */
@@ -2036,57 +2030,54 @@ static void twoOrThreeMismatchSearchFull(
twoOrThreeMismatchSearch_two = two;
#ifdef WITH_TBB
- //tbb::task_group tbb_grp;
- AutoArray<std::thread*> threads(nthreads+1);
+ AutoArray<std::thread*> threads(nthreads);
#else
- AutoArray<tthread::thread*> threads(nthreads+1);
- AutoArray<int> tids(nthreads+1);
+ AutoArray<tthread::thread*> threads(nthreads);
+ AutoArray<int> tids(nthreads);
#endif
#ifdef WITH_TBB
tbb::atomic<int> all_threads_done;
all_threads_done = 0;
#endif
- CHUD_START();
- {
+ CHUD_START();
+ {
Timer _t(cerr, "End-to-end 2/3-mismatch full-index search: ", timing);
int mil = 10;
struct timespec ts = {0};
ts.tv_sec=0;
ts.tv_nsec = mil * 1000000L;
- for(int i = 1; i <= nthreads; i++) {
+ for(int i = 0; i < nthreads; i++) {
#ifdef WITH_TBB
thread_tracking_pair tp;
tp.tid = i;
tp.done = &all_threads_done;
if(stateful) {
- //tbb_grp.run(twoOrThreeMismatchSearchWorkerStateful(i));
threads[i] = new std::thread(twoOrThreeMismatchSearchWorkerStateful, (void*) &tp);
} else {
- //tbb_grp.run(twoOrThreeMismatchSearchWorkerFull(i));
threads[i] = new std::thread(twoOrThreeMismatchSearchWorkerFull, (void*) &tp);
}
threads[i]->detach();
nanosleep(&ts, (struct timespec *) NULL);
}
while(all_threads_done < nthreads);
- //tbb_grp.wait();
#else
tids[i] = i;
if(stateful) {
- threads[i] = new tthread::thread(twoOrThreeMismatchSearchWorkerStateful, (void*)&tids[i]);
+ threads[i] = new tthread::thread(twoOrThreeMismatchSearchWorkerStateful, (void*)&tids[i]);
} else {
- threads[i] = new tthread::thread(twoOrThreeMismatchSearchWorkerFull, (void*)&tids[i]);
+ threads[i] = new tthread::thread(twoOrThreeMismatchSearchWorkerFull, (void*)&tids[i]);
}
}
- for(int i = 1; i <= nthreads; i++)
- threads[i]->join();
+ for(int i = 0; i < nthreads; i++) {
+ threads[i]->join();
+ }
#endif
- }
+ }
if(refs != NULL) delete refs;
return;
}
@@ -2118,9 +2109,8 @@ static void seededQualSearchWorkerFull(void *vp) {
int qualCutoff = seededQualSearch_qualCutoff;
PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch);
PatternSourcePerThread* patsrc = patsrcFact->create();
- HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink);
+ HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid);
HitSinkPerThread* sink = sinkFact->create();
- sink->set_thread_id(tid);
/* Per-thread initialization */
EbwtSearchParams<String<Dna> > params(
*sink, /* HitSink */
@@ -2356,7 +2346,7 @@ static void seededQualSearchWorkerFullStateful(void *vp) {
// Global initialization
PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch);
- HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink);
+ HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid);
ChunkPool *pool = new ChunkPool(chunkSz * 1024, chunkPoolMegabytes * 1024 * 1024, chunkVerbose);
AlignerMetrics *metrics = NULL;
@@ -2500,10 +2490,10 @@ static void seededQualCutoffSearchFull(
#ifdef WITH_TBB
//tbb::task_group tbb_grp;
- AutoArray<std::thread*> threads(nthreads+1);
+ AutoArray<std::thread*> threads(nthreads);
#else
- AutoArray<tthread::thread*> threads(nthreads+1);
- AutoArray<int> tids(nthreads+1);
+ AutoArray<tthread::thread*> threads(nthreads);
+ AutoArray<int> tids(nthreads);
#endif
#ifdef WITH_TBB
@@ -2527,7 +2517,7 @@ static void seededQualCutoffSearchFull(
ts.tv_sec=0;
ts.tv_nsec = mil * 1000000L;
- for(int i = 1; i <= nthreads; i++) {
+ for(int i = 0; i < nthreads; i++) {
#ifdef WITH_TBB
thread_tracking_pair tp;
tp.tid = i;
@@ -2544,14 +2534,15 @@ static void seededQualCutoffSearchFull(
#else
tids[i] = i;
if(stateful) {
- threads[i] = new tthread::thread(seededQualSearchWorkerFullStateful, (void*)&tids[i]);
+ threads[i] = new tthread::thread(seededQualSearchWorkerFullStateful, (void*)&tids[i]);
} else {
- threads[i] = new tthread::thread(seededQualSearchWorkerFull, (void*)&tids[i]);
+ threads[i] = new tthread::thread(seededQualSearchWorkerFull, (void*)&tids[i]);
}
}
- for(int i = 1; i <= nthreads; i++)
- threads[i]->join();
+ for(int i = 0; i < nthreads; i++) {
+ threads[i]->join();
+ }
#endif
}
if(refs != NULL) {
@@ -2978,9 +2969,7 @@ static void driver(const char * type,
if(ebwtBw != NULL) {
delete ebwtBw;
}
- if(!quiet) {
- sink->finish(hadoopOut); // end the hits section of the hit file
- }
+ sink->finish(hadoopOut); // end the hits section of the hit file
for(size_t i = 0; i < patsrcs_a.size(); i++) {
assert(patsrcs_a[i] != NULL);
delete patsrcs_a[i];
View
12 hit.cpp
@@ -14,13 +14,11 @@ bool operator< (const Hit& a, const Hit& b) {
* Report a maxed-out read.
*/
void VerboseHitSink::reportMaxed(
- BTString& o,
vector<Hit>& hs,
- PatternSourcePerThread& p,
- bool lock,
- size_t threadId)
+ size_t threadId,
+ PatternSourcePerThread& p)
{
- HitSink::reportMaxed(o, hs, p, threadId);
+ HitSink::reportMaxed(hs, threadId, p);
if(sampleMax_) {
RandomSource rand;
rand.init(p.bufa().seed);
@@ -47,7 +45,7 @@ void VerboseHitSink::reportMaxed(
if(strat == bestStratum) {
if(num == r) {
hs[i].oms = hs[i+1].oms = (uint32_t)(hs.size()/2);
- reportHits(o, hs, i, i+2, threadId);
+ reportHits(NULL, &hs, i, i+2, threadId, 0, 0, true);
break;
}
num++;
@@ -64,7 +62,7 @@ void VerboseHitSink::reportMaxed(
uint32_t r = rand.nextU32() % num;
Hit& h = hs[r];
h.oms = (uint32_t)hs.size();
- reportHit(o, h, false, true);
+ reportHits(&h, NULL, 0, 1, threadId, 0, 0, true);
}
}
}
Oops, something went wrong.

0 comments on commit 7844221

Please sign in to comment.