From 78442216fc8c815496f56ee9df73c5116f073320 Mon Sep 17 00:00:00 2001 From: BenLangmead Date: Sat, 15 Apr 2017 10:22:52 -0400 Subject: [PATCH] reorganization of output locks so everything is buffered --- ebwt_search.cpp | 129 +++++----- hit.cpp | 12 +- hit.h | 784 +++++++++++++++++++++++++++----------------------------- sam.cpp | 306 ++++++++-------------- sam.h | 158 ++++-------- 5 files changed, 610 insertions(+), 779 deletions(-) diff --git a/ebwt_search.cpp b/ebwt_search.cpp index aca900b..f418153 100644 --- a/ebwt_search.cpp +++ b/ebwt_search.cpp @@ -1062,16 +1062,16 @@ createPatsrcFactory(PatternComposer& _patsrc, int tid, uint32_t max_buf) { * global params and return a pointer to it. */ static HitSinkPerThreadFactory* -createSinkFactory(HitSink& _sink) { +createSinkFactory(HitSink& _sink, size_t threadId) { HitSinkPerThreadFactory *sink = NULL; if(!strata) { // Unstratified if(!allHits) { // First N good; "good" inherently ignores strata - sink = new NGoodHitSinkPerThreadFactory(_sink, khits, mhits); + sink = new NGoodHitSinkPerThreadFactory(_sink, khits, mhits, defaultMapq, threadId); } else { // All hits, spanning strata - sink = new AllHitSinkPerThreadFactory(_sink, mhits); + sink = new AllHitSinkPerThreadFactory(_sink, mhits, defaultMapq, threadId); } } else { // Stratified @@ -1080,12 +1080,12 @@ createSinkFactory(HitSink& _sink) { assert(stateful); // Buffer best hits, assuming they're arriving in best- // to-worst order - sink = new NBestFirstStratHitSinkPerThreadFactory(_sink, khits, mhits); + sink = new NBestFirstStratHitSinkPerThreadFactory(_sink, khits, mhits, defaultMapq, threadId); } else { assert(stateful); // Buffer best hits, assuming they're arriving in best- // to-worst order - sink = new NBestFirstStratHitSinkPerThreadFactory(_sink, 0xffffffff/2, mhits); + sink = new NBestFirstStratHitSinkPerThreadFactory(_sink, 0xffffffff/2, mhits, defaultMapq, threadId); } } assert(sink != NULL); @@ -1119,9 +1119,8 @@ static void exactSearchWorker(void *vp) { // Per-thread initialization PatternSourcePerThreadFactory *patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch); PatternSourcePerThread *patsrc = patsrcFact->create(); - HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink); + HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid); HitSinkPerThread* sink = sinkFact->create(); - sink->set_thread_id(tid); EbwtSearchParams > params( *sink, // HitSink os, // reference sequences @@ -1207,7 +1206,7 @@ static void exactSearchWorkerStateful(void *vp) { // Global initialization PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch); - HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink); + HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid); ChunkPool *pool = new ChunkPool(chunkSz * 1024, chunkPoolMegabytes * 1024 * 1024, chunkVerbose); UnpairedExactAlignerV1Factory alSEfact( @@ -1325,11 +1324,10 @@ static void exactSearch(PatternComposer& _patsrc, } exactSearch_refs = refs; #ifdef WITH_TBB - //tbb::task_group tbb_grp; - AutoArray threads(nthreads+1); + AutoArray threads(nthreads); #else - AutoArray threads(nthreads+1); - AutoArray tids(nthreads+1); + AutoArray threads(nthreads); + AutoArray tids(nthreads); #endif #ifdef WITH_TBB @@ -1345,34 +1343,32 @@ static void exactSearch(PatternComposer& _patsrc, ts.tv_sec=0; ts.tv_nsec = mil * 1000000L; - for(int i = 1; i <= nthreads; i++) { + for(int i = 0; i < nthreads; i++) { #ifdef WITH_TBB thread_tracking_pair tp; tp.tid = i; tp.done = &all_threads_done; if(stateful) { - //tbb_grp.run(exactSearchWorkerStateful(i)); threads[i] = new std::thread(exactSearchWorkerStateful, (void*) &tp); } else { - //tbb_grp.run(exactSearchWorker(i)); threads[i] = new std::thread(exactSearchWorker, (void*) &tp); } threads[i]->detach(); nanosleep(&ts, (struct timespec *) NULL); } while(all_threads_done < nthreads); - //tbb_grp.wait(); #else tids[i] = i; if(stateful) { - threads[i] = new tthread::thread(exactSearchWorkerStateful, (void*)&tids[i]); + threads[i] = new tthread::thread(exactSearchWorkerStateful, (void*)&tids[i]); } else { - threads[i] = new tthread::thread(exactSearchWorker, (void*)&tids[i]); + threads[i] = new tthread::thread(exactSearchWorker, (void*)&tids[i]); } } - for(int i = 1; i <= nthreads; i++) - threads[i]->join(); + for(int i = 0; i < nthreads; i++) { + threads[i]->join(); + } #endif } if(refs != NULL) delete refs; @@ -1417,7 +1413,7 @@ static void mismatchSearchWorkerFullStateful(void *vp) { // Global initialization PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch); - HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink); + HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid); ChunkPool *pool = new ChunkPool(chunkSz * 1024, chunkPoolMegabytes * 1024 * 1024, chunkVerbose); Unpaired1mmAlignerV1Factory alSEfact( @@ -1509,9 +1505,8 @@ static void mismatchSearchWorkerFull(void *vp){ // Per-thread initialization PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch); PatternSourcePerThread* patsrc = patsrcFact->create(); - HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink); + HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid); HitSinkPerThread* sink = sinkFact->create(); - sink->set_thread_id(tid); EbwtSearchParams > params( *sink, // HitSinkPerThread os, // reference sequences @@ -1625,11 +1620,10 @@ static void mismatchSearchFull(PatternComposer& _patsrc, mismatchSearch_refs = refs; #ifdef WITH_TBB - //tbb::task_group tbb_grp; - AutoArray threads(nthreads+1); + AutoArray threads(nthreads); #else - AutoArray threads(nthreads+1); - AutoArray tids(nthreads+1); + AutoArray threads(nthreads); + AutoArray tids(nthreads); #endif #ifdef WITH_TBB @@ -1637,23 +1631,23 @@ static void mismatchSearchFull(PatternComposer& _patsrc, all_threads_done = 0; #endif - CHUD_START(); - { + CHUD_START(); + { Timer _t(cerr, "Time for 1-mismatch full-index search: ", timing); int mil = 10; struct timespec ts = {0}; ts.tv_sec=0; ts.tv_nsec = mil * 1000000L; - for(int i = 1; i <= nthreads; i++) { + for(int i = 0; i < nthreads; i++) { #ifdef WITH_TBB thread_tracking_pair tp; tp.tid = i; tp.done = &all_threads_done; if(stateful) { - threads[i] = new std::thread(mismatchSearchWorkerFullStateful, (void*) &tp); + threads[i] = new std::thread(mismatchSearchWorkerFullStateful, (void*)&tp); } else { - threads[i] = new std::thread(mismatchSearchWorkerFull, (void*) &tp); + threads[i] = new std::thread(mismatchSearchWorkerFull, (void*)&tp); } threads[i]->detach(); nanosleep(&ts, (struct timespec *) NULL); @@ -1662,16 +1656,17 @@ static void mismatchSearchFull(PatternComposer& _patsrc, #else tids[i] = i; if(stateful) { - threads[i] = new tthread::thread(mismatchSearchWorkerFullStateful, (void*)&tids[i]); + threads[i] = new tthread::thread(mismatchSearchWorkerFullStateful, (void*)&tids[i]); } else { - threads[i] = new tthread::thread(mismatchSearchWorkerFull, (void*)&tids[i]); + threads[i] = new tthread::thread(mismatchSearchWorkerFull, (void*)&tids[i]); } } - for(int i = 1; i <= nthreads; i++) - threads[i]->join(); + for(int i = 0; i < nthreads; i++) { + threads[i]->join(); + } #endif - } + } if(refs != NULL) delete refs; } @@ -1780,7 +1775,7 @@ static void twoOrThreeMismatchSearchWorkerStateful(void *vp) { // Global initialization PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch); - HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink); + HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid); ChunkPool *pool = new ChunkPool(chunkSz * 1024, chunkPoolMegabytes * 1024 * 1024, chunkVerbose); Unpaired23mmAlignerV1Factory alSEfact( @@ -1868,11 +1863,10 @@ static void twoOrThreeMismatchSearchWorkerFull(void *vp) { HitSink& _sink = *twoOrThreeMismatchSearch_sink; vector >& os = *twoOrThreeMismatchSearch_os; bool two = twoOrThreeMismatchSearch_two; - PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch); + PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch); PatternSourcePerThread* patsrc = patsrcFact->create(); - HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink); + HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid); HitSinkPerThread* sink = sinkFact->create(); - sink->set_thread_id(tid); /* Per-thread initialization */ EbwtSearchParams > params( *sink, /* HitSink */ @@ -2036,11 +2030,10 @@ static void twoOrThreeMismatchSearchFull( twoOrThreeMismatchSearch_two = two; #ifdef WITH_TBB - //tbb::task_group tbb_grp; - AutoArray threads(nthreads+1); + AutoArray threads(nthreads); #else - AutoArray threads(nthreads+1); - AutoArray tids(nthreads+1); + AutoArray threads(nthreads); + AutoArray tids(nthreads); #endif #ifdef WITH_TBB @@ -2048,8 +2041,8 @@ static void twoOrThreeMismatchSearchFull( all_threads_done = 0; #endif - CHUD_START(); - { + CHUD_START(); + { Timer _t(cerr, "End-to-end 2/3-mismatch full-index search: ", timing); int mil = 10; @@ -2057,36 +2050,34 @@ static void twoOrThreeMismatchSearchFull( ts.tv_sec=0; ts.tv_nsec = mil * 1000000L; - for(int i = 1; i <= nthreads; i++) { + for(int i = 0; i < nthreads; i++) { #ifdef WITH_TBB thread_tracking_pair tp; tp.tid = i; tp.done = &all_threads_done; if(stateful) { - //tbb_grp.run(twoOrThreeMismatchSearchWorkerStateful(i)); threads[i] = new std::thread(twoOrThreeMismatchSearchWorkerStateful, (void*) &tp); } else { - //tbb_grp.run(twoOrThreeMismatchSearchWorkerFull(i)); threads[i] = new std::thread(twoOrThreeMismatchSearchWorkerFull, (void*) &tp); } threads[i]->detach(); nanosleep(&ts, (struct timespec *) NULL); } while(all_threads_done < nthreads); - //tbb_grp.wait(); #else tids[i] = i; if(stateful) { - threads[i] = new tthread::thread(twoOrThreeMismatchSearchWorkerStateful, (void*)&tids[i]); + threads[i] = new tthread::thread(twoOrThreeMismatchSearchWorkerStateful, (void*)&tids[i]); } else { - threads[i] = new tthread::thread(twoOrThreeMismatchSearchWorkerFull, (void*)&tids[i]); + threads[i] = new tthread::thread(twoOrThreeMismatchSearchWorkerFull, (void*)&tids[i]); } } - for(int i = 1; i <= nthreads; i++) - threads[i]->join(); + for(int i = 0; i < nthreads; i++) { + threads[i]->join(); + } #endif - } + } if(refs != NULL) delete refs; return; } @@ -2118,9 +2109,8 @@ static void seededQualSearchWorkerFull(void *vp) { int qualCutoff = seededQualSearch_qualCutoff; PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch); PatternSourcePerThread* patsrc = patsrcFact->create(); - HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink); + HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid); HitSinkPerThread* sink = sinkFact->create(); - sink->set_thread_id(tid); /* Per-thread initialization */ EbwtSearchParams > params( *sink, /* HitSink */ @@ -2356,7 +2346,7 @@ static void seededQualSearchWorkerFullStateful(void *vp) { // Global initialization PatternSourcePerThreadFactory* patsrcFact = createPatsrcFactory(_patsrc, tid, readsPerBatch); - HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink); + HitSinkPerThreadFactory* sinkFact = createSinkFactory(_sink, tid); ChunkPool *pool = new ChunkPool(chunkSz * 1024, chunkPoolMegabytes * 1024 * 1024, chunkVerbose); AlignerMetrics *metrics = NULL; @@ -2500,10 +2490,10 @@ static void seededQualCutoffSearchFull( #ifdef WITH_TBB //tbb::task_group tbb_grp; - AutoArray threads(nthreads+1); + AutoArray threads(nthreads); #else - AutoArray threads(nthreads+1); - AutoArray tids(nthreads+1); + AutoArray threads(nthreads); + AutoArray tids(nthreads); #endif #ifdef WITH_TBB @@ -2527,7 +2517,7 @@ static void seededQualCutoffSearchFull( ts.tv_sec=0; ts.tv_nsec = mil * 1000000L; - for(int i = 1; i <= nthreads; i++) { + for(int i = 0; i < nthreads; i++) { #ifdef WITH_TBB thread_tracking_pair tp; tp.tid = i; @@ -2544,14 +2534,15 @@ static void seededQualCutoffSearchFull( #else tids[i] = i; if(stateful) { - threads[i] = new tthread::thread(seededQualSearchWorkerFullStateful, (void*)&tids[i]); + threads[i] = new tthread::thread(seededQualSearchWorkerFullStateful, (void*)&tids[i]); } else { - threads[i] = new tthread::thread(seededQualSearchWorkerFull, (void*)&tids[i]); + threads[i] = new tthread::thread(seededQualSearchWorkerFull, (void*)&tids[i]); } } - for(int i = 1; i <= nthreads; i++) - threads[i]->join(); + for(int i = 0; i < nthreads; i++) { + threads[i]->join(); + } #endif } if(refs != NULL) { @@ -2978,9 +2969,7 @@ static void driver(const char * type, if(ebwtBw != NULL) { delete ebwtBw; } - if(!quiet) { - sink->finish(hadoopOut); // end the hits section of the hit file - } + sink->finish(hadoopOut); // end the hits section of the hit file for(size_t i = 0; i < patsrcs_a.size(); i++) { assert(patsrcs_a[i] != NULL); delete patsrcs_a[i]; diff --git a/hit.cpp b/hit.cpp index 76f2345..f0064a8 100644 --- a/hit.cpp +++ b/hit.cpp @@ -14,13 +14,11 @@ bool operator< (const Hit& a, const Hit& b) { * Report a maxed-out read. */ void VerboseHitSink::reportMaxed( - BTString& o, vector& hs, - PatternSourcePerThread& p, - bool lock, - size_t threadId) + size_t threadId, + PatternSourcePerThread& p) { - HitSink::reportMaxed(o, hs, p, threadId); + HitSink::reportMaxed(hs, threadId, p); if(sampleMax_) { RandomSource rand; rand.init(p.bufa().seed); @@ -47,7 +45,7 @@ void VerboseHitSink::reportMaxed( if(strat == bestStratum) { if(num == r) { hs[i].oms = hs[i+1].oms = (uint32_t)(hs.size()/2); - reportHits(o, hs, i, i+2, threadId); + reportHits(NULL, &hs, i, i+2, threadId, 0, 0, true); break; } num++; @@ -64,7 +62,7 @@ void VerboseHitSink::reportMaxed( uint32_t r = rand.nextU32() % num; Hit& h = hs[r]; h.oms = (uint32_t)hs.size(); - reportHit(o, h, false, true); + reportHits(&h, NULL, 0, 1, threadId, 0, 0, true); } } } diff --git a/hit.h b/hit.h index da8afa7..ee6a8d6 100644 --- a/hit.h +++ b/hit.h @@ -142,33 +142,6 @@ class HitCostCompare { /// Sort by text-id then by text-offset bool operator< (const Hit& a, const Hit& b); -#define DECL_HIT_DUMPS \ - const std::string& dumpAl, \ - const std::string& dumpUnal, \ - const std::string& dumpMax - -#define INIT_HIT_DUMPS \ - dumpAlBase_(dumpAl), \ - dumpUnalBase_(dumpUnal), \ - dumpMaxBase_(dumpMax) - -#define DECL_HIT_DUMPS2 \ - DECL_HIT_DUMPS, \ - bool onePairFile, \ - bool sampleMax, \ - std::vector* refnames - -#define PASS_HIT_DUMPS \ - dumpAl, \ - dumpUnal, \ - dumpMax - -#define PASS_HIT_DUMPS2 \ - PASS_HIT_DUMPS, \ - onePairFile, \ - sampleMax, \ - refnames - /** * Encapsulates an object that accepts hits, optionally retains them in * a vector, and does something else with them according to @@ -176,46 +149,43 @@ bool operator< (const Hit& a, const Hit& b); */ class HitSink { public: - explicit HitSink(OutFileBuf* out, - DECL_HIT_DUMPS, - bool onePairFile, - bool sampleMax, - vector* refnames = NULL, - size_t nthreads = 1, - int perThreadBufSize = 512) : + explicit HitSink( + OutFileBuf* out, + const std::string& dumpAl, + const std::string& dumpUnal, + const std::string& dumpMax, + bool onePairFile, + bool sampleMax, + vector* refnames = NULL, + size_t nthreads = 1, + int perThreadBufSize = 512) : _outs(), _deleteOuts(false), _refnames(refnames), _numWrappers(0), _locks(), ts_wrap(NULL), - INIT_HIT_DUMPS, + dumpAlBase_(dumpAl), + dumpUnalBase_(dumpUnal), + dumpMaxBase_(dumpMax), onePairFile_(onePairFile), sampleMax_(sampleMax), - first_(true), - //numAligned_(0llu), - numUnaligned_(0llu), - numMaxed_(0llu), - //numReported_(0llu), - //numReportedPaired_(0llu), quiet_(false), - nthreads_(nthreads), + nthreads_((nthreads > 0) ? nthreads : 1), + ptBufs_(), + ptCounts_(nthreads_), perThreadBufSize_(perThreadBufSize) { - numAligned_=0llu; - numReported_=0llu; - numReportedPaired_=0llu; + size_t nelt = 5 * nthreads_; + ptNumAligned_ = new uint64_t[nelt]; + std::memset(reinterpret_cast(const_cast(ptNumAligned_)), 0, sizeof(uint64_t) * nelt); + ptNumReported_ = ptNumAligned_ + nthreads_; + ptNumReportedPaired_ = ptNumReported_ + nthreads_; + ptNumUnaligned_ = ptNumReportedPaired_ + nthreads_; + ptNumMaxed_ = ptNumUnaligned_ + nthreads_; _outs.push_back(out); - vector::iterator it; - fprintf(stderr,"perThreadBufSize for output is %d\n",perThreadBufSize_); - perThreadBuf = new BTString*[nthreads_+1]; - perThreadCounter = new int[nthreads_+1]; - size_t i = 0; - for(i=0;i<=nthreads_;i++) - { - perThreadBuf[i] = new BTString[perThreadBufSize_]; - perThreadCounter[i] = 0; - } + ptBufs_.resize(nthreads_); + ptCounts_.resize(nthreads_, 0); //had to move this below the array inits, otherwise it get's mangled leading to segfaults on access _locks.push_back(new MUTEX_T); initDumps(); @@ -227,16 +197,21 @@ class HitSink { * is the 0-padded reference index. Someday we may want to include * the name of the reference sequence in the filename somehow. */ - explicit HitSink(size_t numOuts, - DECL_HIT_DUMPS, - bool onePairFile, - bool sampleMax, - vector* refnames = NULL) : + explicit HitSink( + size_t numOuts, + const std::string& dumpAl, + const std::string& dumpUnal, + const std::string& dumpMax, + bool onePairFile, + bool sampleMax, + vector* refnames = NULL) : _outs(), _deleteOuts(true), _refnames(refnames), _locks(), - INIT_HIT_DUMPS, + dumpAlBase_(dumpAl), + dumpUnalBase_(dumpUnal), + dumpMaxBase_(dumpMax), onePairFile_(onePairFile), sampleMax_(sampleMax), quiet_(false), @@ -255,6 +230,8 @@ class HitSink { * Destroy HitSinkobject; */ virtual ~HitSink() { + delete ptNumAligned_; + ptNumAligned_ = NULL; closeOuts(); if(_deleteOuts) { // Delete all non-NULL output streams @@ -278,7 +255,6 @@ class HitSink { * lock or any of the per-stream locks will be contended. */ void addWrapper() { - // TODO: opt for atomic in TBB mode, or possibly all modes ThreadSafe ts(&numWrapper_mutex_m); _numWrappers++; } @@ -295,123 +271,90 @@ class HitSink { /** * Append a single hit to the given output stream. */ - virtual void append(BTString& o, const Hit& h) = 0; + virtual void append(BTString& o, const Hit& h, int mapq, int xms) = 0; /** - * Report a batch of hits; all in the given vector. + * Add a number of alignments to the tally. Tally shouldn't + * include reads that fail to align either because they had 0 + * alignments or because of -m. */ - virtual void reportHits(BTString& o, vector& hs, size_t threadId) { - reportHits(o, hs, 0, hs.size(), threadId); + void tallyAlignments(size_t threadId, size_t numAl, bool paired) { + ptNumAligned_[threadId] += numAl; + if(paired) { + ptNumReportedPaired_[threadId] += numAl; + } else { + ptNumReported_[threadId] += numAl; + } } /** * Report a batch of hits from a vector, perhaps subsetting it. */ virtual void reportHits( - BTString& o, - vector& hs, + const Hit *hptr, + vector *hsptr, size_t start, size_t end, - size_t threadId) + size_t threadId, + int mapq, + int xms, + bool tally) { assert_geq(end, start); - if(end-start == 0) return; - bool paired = hs[start].mate > 0; + assert(nthreads_ > 1 || threadId == 0); + if(end == start) { + return; + } + const Hit& firstHit = (hptr == NULL) ? (*hsptr)[start] : *hptr; + bool paired = firstHit.mate > 0; // Sort reads so that those against the same reference sequence // are consecutive. - if(_outs.size() > 1 && end-start > 2) { - sort(hs.begin() + start, hs.begin() + end); + if(hsptr != NULL && _outs.size() > 1 && end - start > 2) { + sort(hsptr->begin() + start, hsptr->begin() + end); } - if(_outs.size() == 1 && end - start == 1) { - // 1 output stream, 1 alignment - assert(hs[start].repOk()); - append(o, hs[start]); - { - if(nthreads_ > 0) - { - if(perThreadCounter[threadId] >= perThreadBufSize_) - { - int i = 0; - for(i=0; i < perThreadBufSize_; i++) - { - ThreadSafe _ts(_locks[0]); - out(0).writeString(perThreadBuf[threadId][i]); - } - perThreadCounter[threadId] = 0; - } - perThreadBuf[threadId][perThreadCounter[threadId]++] = o; - //TODO: make these atomics to get rid of lock - first_ = false; -#ifdef WITH_TBB - numAligned_.fetch_and_add(1); - if(paired) { - numReportedPaired_.fetch_and_add(1); - } else { - numReported_.fetch_and_add(1); - } -#else - //TODO: this is segfaulting in the TBB aomtic fetch instruction in the lock - ThreadSafe _ts(_locks[0]); - numAligned_++; - if(paired) { - numReportedPaired_++; - } else { - numReported_++; - } -#endif - } - else - { - ThreadSafe _ts(_locks[0]); - first_ = false; + BTString& o = ptBufs_[threadId]; + if(_outs.size() == 1) { + // Per-thread buffering is active + for(size_t i = start; i < end; i++) { + const Hit& h = (hptr == NULL) ? (*hsptr)[i] : *hptr; + assert(h.repOk()); + if(nthreads_ > 1) { + maybeFlush(threadId, 0); + append(o, h, mapq, xms); + ptCounts_[threadId]++; + } else { + append(o, h, mapq, xms); out(0).writeString(o); -#ifdef WITH_TBB - numAligned_.fetch_and_add(1); - if(paired) { - numReportedPaired_.fetch_and_add(1); - } else { - numReported_.fetch_and_add(1); - } -#else - numAligned_++; - if(paired) { - numReportedPaired_++; - } else { - numReported_++; - } -#endif + o.clear(); } } - o.clear(); } else { // multiple output streams or alignments + // Note: in this case we basically don't get the benefit + // from per-thread buffering, becuase it is too + // complicated to provide per-thread, per-output-lock + // buffers. size_t i = start; while(i < end) { - size_t strIdx = refIdxToStreamIdx(hs[i].h.first); + const Hit& h = (hptr == NULL) ? (*hsptr)[i] : *hptr; + size_t strIdx = refIdxToStreamIdx(h.h.first); { + assert(h.repOk()); do { - assert(hs[i].repOk()); - append(o, hs[i]); + append(o, h, mapq, xms); { ThreadSafe _ts(_locks[strIdx]); - out(hs[i].h.first).writeString(o); + out(h.h.first).writeString(o); } o.clear(); i++; - } while(refIdxToStreamIdx(hs[i].h.first) == strIdx && i < end); - } - } - { - ThreadSafe _ts(&main_mutex_m); - first_ = false; - numAligned_++; - if(paired) { - numReportedPaired_ += (end-start); - } else { - numReported_ += (end-start); + } while(refIdxToStreamIdx(h.h.first) == strIdx && i < end); } } } + if(tally) { + tallyAlignments(threadId, end - start, paired); + } } /** @@ -419,81 +362,77 @@ class HitSink { * synchronization is necessary */ void finish(bool hadoopOut) { - // Close output streams - if(nthreads_ > 0) - { - size_t i = 0; - int j = 0; - for(i=0;i<=nthreads_;i++) - { - for(j=0;j 0) { - alPct = 100.0 * (double)numAligned_ / (double)tot; - unalPct = 100.0 * (double)numUnaligned_ / (double)tot; - maxPct = 100.0 * (double)numMaxed_ / (double)tot; + alPct = 100.0 * (double)numAligned / (double)tot; + unalPct = 100.0 * (double)numUnaligned / (double)tot; + maxPct = 100.0 * (double)numMaxed / (double)tot; } cerr << "# reads processed: " << tot << endl; cerr << "# reads with at least one reported alignment: " - << numAligned_ << " (" << fixed << setprecision(2) + << numAligned << " (" << fixed << setprecision(2) << alPct << "%)" << endl; cerr << "# reads that failed to align: " - << numUnaligned_ << " (" << fixed << setprecision(2) + << numUnaligned << " (" << fixed << setprecision(2) << unalPct << "%)" << endl; - if(numMaxed_ > 0) { + if(numMaxed > 0) { if(sampleMax_) { cerr << "# reads with alignments sampled due to -M: " - << numMaxed_ << " (" << fixed << setprecision(2) + << numMaxed << " (" << fixed << setprecision(2) << maxPct << "%)" << endl; } else { cerr << "# reads with alignments suppressed due to -m: " - << numMaxed_ << " (" << fixed << setprecision(2) + << numMaxed << " (" << fixed << setprecision(2) << maxPct << "%)" << endl; } } - if(first_) { - assert_eq(0llu, numReported_); + if(numReported == 0 && numReportedPaired == 0) { cerr << "No alignments" << endl; } - else if(numReportedPaired_ > 0 && numReported_ == 0) { - cerr << "Reported " << (numReportedPaired_ >> 1) + else if(numReportedPaired > 0 && numReported == 0) { + cerr << "Reported " << (numReportedPaired >> 1) << " paired-end alignments to " << _outs.size() << " output stream(s)" << endl; } - else if(numReported_ > 0 && numReportedPaired_ == 0) { - cerr << "Reported " << numReported_ + else if(numReported > 0 && numReportedPaired == 0) { + cerr << "Reported " << numReported << " alignments to " << _outs.size() << " output stream(s)" << endl; } else { - assert_gt(numReported_, 0); - assert_gt(numReportedPaired_, 0); - cerr << "Reported " << (numReportedPaired_ >> 1) - << " paired-end alignments and " << numReported_ + assert_gt(numReported + numReportedPaired, 0); + cerr << "Reported " << (numReportedPaired >> 1) + << " paired-end alignments and " << numReported << " singleton alignments to " << _outs.size() << " output stream(s)" << endl; } if(hadoopOut) { - cerr << "reporter:counter:Bowtie,Reads with reported alignments," << numAligned_ << endl; - cerr << "reporter:counter:Bowtie,Reads with no alignments," << numUnaligned_ << endl; - cerr << "reporter:counter:Bowtie,Reads exceeding -m limit," << numMaxed_ << endl; - cerr << "reporter:counter:Bowtie,Unpaired alignments reported," << numReported_ << endl; - cerr << "reporter:counter:Bowtie,Paired alignments reported," << numReportedPaired_ << endl; + cerr << "reporter:counter:Bowtie,Reads with reported alignments," << numAligned << endl; + cerr << "reporter:counter:Bowtie,Reads with no alignments," << numUnaligned << endl; + cerr << "reporter:counter:Bowtie,Reads exceeding -m limit," << numMaxed << endl; + cerr << "reporter:counter:Bowtie,Unpaired alignments reported," << numReported << endl; + cerr << "reporter:counter:Bowtie,Paired alignments reported," << numReportedPaired << endl; } } } @@ -716,15 +655,11 @@ class HitSink { * want to print a placeholder when output is chained. */ virtual void reportMaxed( - BTString& o, vector& hs, - PatternSourcePerThread& p, size_t threadId, - bool lock = true) + PatternSourcePerThread& p) { - // TODO: opt for atomic in TBB mode, or possibly all modes - ThreadSafe _ts(&main_mutex_m, lock); - numMaxed_++; + ptNumMaxed_[threadId]++; } /** @@ -732,35 +667,46 @@ class HitSink { * want to print a placeholder when output is chained. */ virtual void reportUnaligned( - BTString& o, - PatternSourcePerThread& p, - bool lock = true) + size_t threadId, + PatternSourcePerThread& p) { - // TODO: opt for atomic in TBB mode, or possibly all modes - ThreadSafe _ts(&main_mutex_m, lock); - numUnaligned_++; + ptNumUnaligned_[threadId]++; } protected: - /// Implementation of hit-report - virtual void reportHit( - BTString& o, - const Hit& h, - bool lock = true) - { - // TODO: opt for atomic in TBB mode, or possibly all modes - assert(h.repOk()); + /** + * Flush thread's output buffer and reset both buffer and count. + */ + void flush(size_t threadId, size_t outId) { { - ThreadSafe _ts(&main_mutex_m, lock); - first_ = false; - if(h.mate > 0) numReportedPaired_++; - else numReported_++; - numAligned_++; + ThreadSafe _ts(_locks[0]); // flush + out(outId).writeString(ptBufs_[threadId]); + } + ptCounts_[threadId] = 0; + ptBufs_[threadId].clear(); + } + + /** + * Flush all output buffers. + */ + void flushAll(size_t outId) { + for(int i = 0; i < nthreads_; i++) { + flush(i, outId); } } /** + * If the thread's output buffer is currently full, flush it and + * reset both buffer and count. + */ + void maybeFlush(size_t threadId, size_t outId) { + if(ptCounts_[threadId] >= perThreadBufSize_) { + flush(threadId, outId); + } + } + + /** * Close (and flush) all OutFileBufs. */ void closeOuts() { @@ -784,8 +730,8 @@ class HitSink { // used for output read buffer size_t nthreads_; - BTString** perThreadBuf; - int* perThreadCounter; + std::vector ptBufs_; + std::vector ptCounts_; int perThreadBufSize_; // Output filenames for dumping @@ -904,18 +850,12 @@ class HitSink { bool dumpUnalignFlag_; bool dumpMaxedFlag_; - volatile bool first_; /// true -> first hit hasn't yet been reported - volatile uint64_t numUnaligned_;/// # reads with no alignments - volatile uint64_t numMaxed_; /// # reads with # alignments exceeding -m ceiling -#ifdef WITH_TBB - tbb::atomic numAligned_; /// # reads with >= 1 alignment - tbb::atomic numReported_; /// # single-ended alignments reported - tbb::atomic numReportedPaired_; /// # paired-end alignments reported -#else - volatile uint64_t numAligned_; /// # reads with >= 1 alignment - volatile uint64_t numReported_; /// # single-ended alignments reported - volatile uint64_t numReportedPaired_; /// # paired-end alignments reported -#endif + volatile bool first_; /// true -> first hit hasn't yet been reported + volatile uint64_t *ptNumAligned_; + volatile uint64_t *ptNumReported_; + volatile uint64_t *ptNumReportedPaired_; + volatile uint64_t *ptNumUnaligned_; + volatile uint64_t *ptNumMaxed_; bool quiet_; /// true -> don't print alignment stats at the end }; @@ -926,7 +866,12 @@ class HitSink { */ class HitSinkPerThread { public: - HitSinkPerThread(HitSink& sink, uint32_t max, uint32_t n) : + explicit HitSinkPerThread( + HitSink& sink, + uint32_t max, + uint32_t n, + int defaultMapq, + size_t threadId) : _sink(sink), _bestRemainingStratum(0), _numValidHits(0llu), @@ -935,14 +880,12 @@ class HitSinkPerThread { hitsForThisRead_(), _max(max), _n(n), - threadId(0) + threadId_(threadId) { sink.addWrapper(); assert_gt(_n, 0); } - void set_thread_id(size_t threadId_) { threadId = threadId_; } - virtual ~HitSinkPerThread() { } /// Return the vector of retained hits @@ -969,18 +912,26 @@ class HitSinkPerThread { ret = 0; if(maxed) { // Report that the read maxed-out; useful for chaining output - if(dump) _sink.reportMaxed(obuf_, _bufferedHits, p, threadId); + if(dump) _sink.reportMaxed(_bufferedHits, threadId_, p); _bufferedHits.clear(); } else if(unal) { // Report that the read failed to align; useful for chaining output - if(dump) _sink.reportUnaligned(obuf_, p, threadId); + if(dump) _sink.reportUnaligned(threadId_, p); } else { // Flush buffered hits assert_gt(_bufferedHits.size(), 0); if(_bufferedHits.size() > _n) { _bufferedHits.resize(_n); } - _sink.reportHits(obuf_, _bufferedHits, threadId); + int mapq = defaultMapq_; + int xms = (int)(_bufferedHits.size()); + const bool paired = p.bufa().mate > 0; + if(paired) { + xms /= 2; + } + xms++; + _sink.reportHits(NULL, &_bufferedHits, 0, _bufferedHits.size(), + threadId_, mapq, xms, true); _sink.dumpAlign(p); ret = (uint32_t)_bufferedHits.size(); _bufferedHits.clear(); @@ -1041,7 +992,7 @@ class HitSinkPerThread { * Return true if there are no reportable hits with the given cost * (or worse). */ - virtual bool irrelevantCost(uint16_t cost) { + virtual bool irrelevantCost(uint16_t cost) const { return false; } @@ -1069,7 +1020,7 @@ class HitSinkPerThread { * Return true iff the underlying HitSink dumps unaligned or * maxed-out reads. */ - bool dumpsReads() { + bool dumpsReads() const { return _sink.dumpsReads(); } @@ -1091,7 +1042,7 @@ class HitSinkPerThread { * Return max # hits to report (*2 in paired-end mode because mates * count separately) */ - virtual uint32_t maxHits() { + virtual uint32_t maxHits() const { return _n; } @@ -1112,8 +1063,8 @@ class HitSinkPerThread { uint32_t hitsForThisRead_; /// # hits for this read so far uint32_t _max; /// don't report any hits if there were > _max uint32_t _n; /// report at most _n hits - size_t threadId; - BTString obuf_; /// expandable output buffer + int defaultMapq_; + size_t threadId_; }; /** @@ -1142,10 +1093,12 @@ class NGoodHitSinkPerThread : public HitSinkPerThread { public: NGoodHitSinkPerThread( - HitSink& sink, - uint32_t n, - uint32_t max) : - HitSinkPerThread(sink, max, n) + HitSink& sink, + uint32_t n, + uint32_t max, + int defaultMapq, + size_t threadId) : + HitSinkPerThread(sink, max, n, defaultMapq, threadId) { } virtual bool spanStrata() { @@ -1199,31 +1152,38 @@ class NGoodHitSinkPerThread : public HitSinkPerThread { class NGoodHitSinkPerThreadFactory : public HitSinkPerThreadFactory { public: NGoodHitSinkPerThreadFactory( - HitSink& sink, - uint32_t n, - uint32_t max) : - sink_(sink), - n_(n), - max_(max) - { } + HitSink& sink, + uint32_t n, + uint32_t max, + int defaultMapq, + size_t threadId) : + sink_(sink), + n_(n), + max_(max), + defaultMapq_(defaultMapq), + threadId_(threadId) { } /** * Allocate a new NGoodHitSinkPerThread object on the heap, * using the parameters given in the constructor. */ virtual HitSinkPerThread* create() const { - return new NGoodHitSinkPerThread(sink_, n_, max_); + return new NGoodHitSinkPerThread(sink_, n_, max_, defaultMapq_, threadId_); } + virtual HitSinkPerThread* createMult(uint32_t m) const { uint32_t max = max_ * (max_ == 0xffffffff ? 1 : m); uint32_t n = n_ * (n_ == 0xffffffff ? 1 : m); - return new NGoodHitSinkPerThread(sink_, n, max); + return new NGoodHitSinkPerThread(sink_, n, max, defaultMapq_, threadId_); } private: + HitSink& sink_; uint32_t n_; uint32_t max_; + int defaultMapq_; + size_t threadId_; }; /** @@ -1235,13 +1195,15 @@ class NBestFirstStratHitSinkPerThread : public HitSinkPerThread { public: NBestFirstStratHitSinkPerThread( - HitSink& sink, - uint32_t n, - uint32_t max, - uint32_t mult) : - HitSinkPerThread(sink, max, n), - bestStratum_(999), mult_(mult) - { } + HitSink& sink, + uint32_t n, + uint32_t max, + uint32_t mult, + int defaultMapq, + size_t threadId) : + HitSinkPerThread(sink, max, n, defaultMapq, threadId), + bestStratum_(999), + mult_(mult) { } /** * false -> we do not allow strata to be spanned @@ -1333,31 +1295,37 @@ class NBestFirstStratHitSinkPerThread : public HitSinkPerThread { class NBestFirstStratHitSinkPerThreadFactory : public HitSinkPerThreadFactory { public: NBestFirstStratHitSinkPerThreadFactory( - HitSink& sink, - uint32_t n, - uint32_t max) : - sink_(sink), - n_(n), - max_(max) - { } + HitSink& sink, + uint32_t n, + uint32_t max, + int defaultMapq, + size_t threadId) : + sink_(sink), + n_(n), + max_(max), + defaultMapq_(defaultMapq), + threadId_(threadId) { } /** * Allocate a new NGoodHitSinkPerThread object on the heap, * using the parameters given in the constructor. */ virtual HitSinkPerThread* create() const { - return new NBestFirstStratHitSinkPerThread(sink_, n_, max_, 1); + return new NBestFirstStratHitSinkPerThread(sink_, n_, max_, 1, defaultMapq_, threadId_); } + virtual HitSinkPerThread* createMult(uint32_t m) const { uint32_t max = max_ * (max_ == 0xffffffff ? 1 : m); uint32_t n = n_ * (n_ == 0xffffffff ? 1 : m); - return new NBestFirstStratHitSinkPerThread(sink_, n, max, m); + return new NBestFirstStratHitSinkPerThread(sink_, n, max, m, defaultMapq_, threadId_); } private: HitSink& sink_; uint32_t n_; uint32_t max_; + int defaultMapq_; + size_t threadId_; }; /** @@ -1368,8 +1336,10 @@ class AllHitSinkPerThread : public HitSinkPerThread { public: AllHitSinkPerThread( HitSink& sink, - uint32_t max) : - HitSinkPerThread(sink, max, 0xffffffff) { } + uint32_t max, + int defaultMapq, + size_t threadId) : + HitSinkPerThread(sink, max, 0xffffffff, defaultMapq, threadId) { } virtual bool spanStrata() { return true; // we span strata @@ -1414,27 +1384,32 @@ class AllHitSinkPerThread : public HitSinkPerThread { class AllHitSinkPerThreadFactory : public HitSinkPerThreadFactory { public: AllHitSinkPerThreadFactory( - HitSink& sink, - uint32_t max) : - sink_(sink), - max_(max) - { } + HitSink& sink, + uint32_t max, + int defaultMapq, + size_t threadId) : + sink_(sink), + max_(max), + defaultMapq_(defaultMapq), + threadId_(threadId) { } /** * Allocate a new NGoodHitSinkPerThread object on the heap, * using the parameters given in the constructor. */ virtual HitSinkPerThread* create() const { - return new AllHitSinkPerThread(sink_, max_); + return new AllHitSinkPerThread(sink_, max_, defaultMapq_, threadId_); } virtual HitSinkPerThread* createMult(uint32_t m) const { uint32_t max = max_ * (max_ == 0xffffffff ? 1 : m); - return new AllHitSinkPerThread(sink_, max); + return new AllHitSinkPerThread(sink_, max, defaultMapq_, threadId_); } private: HitSink& sink_; uint32_t max_; + int defaultMapq_; + size_t threadId_; }; /** @@ -1448,11 +1423,24 @@ class ConciseHitSink : public HitSink { /** * Construct a single-stream ConciseHitSink (default) */ - ConciseHitSink(OutFileBuf* out, - int offBase, - DECL_HIT_DUMPS2, - bool reportOpps = false) : - HitSink(out, PASS_HIT_DUMPS2), + ConciseHitSink( + OutFileBuf* out, + int offBase, + const std::string& dumpAl, + const std::string& dumpUnal, + const std::string& dumpMax, + bool onePairFile, + bool sampleMax, + std::vector* refnames, + bool reportOpps = false) : + HitSink( + out, + dumpAl, + dumpUnal, + dumpMax, + onePairFile, + sampleMax, + refnames), _reportOpps(reportOpps), offBase_(offBase) { } @@ -1460,11 +1448,24 @@ class ConciseHitSink : public HitSink { * Construct a multi-stream ConciseHitSink with one stream per * reference string (see --refout) */ - ConciseHitSink(size_t numOuts, - int offBase, - DECL_HIT_DUMPS2, - bool reportOpps = false) : - HitSink(numOuts, PASS_HIT_DUMPS2), + ConciseHitSink( + size_t numOuts, + int offBase, + const std::string& dumpAl, + const std::string& dumpUnal, + const std::string& dumpMax, + bool onePairFile, + bool sampleMax, + std::vector* refnames, + bool reportOpps = false) : + HitSink( + numOuts, + dumpAl, + dumpUnal, + dumpMax, + onePairFile, + sampleMax, + refnames), _reportOpps(reportOpps), offBase_(offBase) { } @@ -1494,29 +1495,12 @@ class ConciseHitSink : public HitSink { /** * Append a verbose, readable hit to the given output stream. */ - void append(BTString& o, const Hit& h) { + virtual void append(BTString& o, const Hit& h, int mapq, int xms) { ConciseHitSink::append(o, h, this->offBase_, this->_reportOpps); } protected: - /** - * Report a concise alignment to the appropriate output stream. - */ - virtual void reportHit( - BTString& o, - const Hit& h, - bool lock = true) - { - append(o, h); - { - ThreadSafe _ts(_locks[refIdxToStreamIdx(h.h.first)], lock); - HitSink::reportHit(o, h, false); - out(h.h.first).writeString(o); - } - o.clear(); - } - private: bool _reportOpps; int offBase_; /// Add this to reference offsets before outputting. @@ -1547,78 +1531,106 @@ inline void printUptoWs(BTString& o, const std::string& str, bool ws) { * pat-name \t [-|+] \t ref-name \t ref-off \t pat \t qual \t #-alt-hits \t mm-list */ class VerboseHitSink : public HitSink { + public: + /** * Construct a single-stream VerboseHitSink (default) */ - VerboseHitSink(OutFileBuf* out, - int offBase, - bool colorSeq, - bool colorQual, - bool printCost, - const Bitset& suppressOuts, - ReferenceMap *rmap, - AnnotationMap *amap, - bool fullRef, - DECL_HIT_DUMPS2, - int partition = 0) : - HitSink(out, PASS_HIT_DUMPS2), - partition_(partition), - offBase_(offBase), - colorSeq_(colorSeq), - colorQual_(colorQual), - cost_(printCost), - suppress_(suppressOuts), - fullRef_(fullRef), - rmap_(rmap), amap_(amap) - { } + VerboseHitSink( + OutFileBuf* out, + int offBase, + bool colorSeq, + bool colorQual, + bool printCost, + const Bitset& suppressOuts, + ReferenceMap *rmap, + AnnotationMap *amap, + bool fullRef, + const std::string& dumpAl, + const std::string& dumpUnal, + const std::string& dumpMax, + bool onePairFile, + bool sampleMax, + std::vector* refnames, + int partition = 0) : + HitSink( + out, + dumpAl, + dumpUnal, + dumpMax, + onePairFile, + sampleMax, + refnames), + partition_(partition), + offBase_(offBase), + colorSeq_(colorSeq), + colorQual_(colorQual), + cost_(printCost), + suppress_(suppressOuts), + fullRef_(fullRef), + rmap_(rmap), amap_(amap) + { } /** * Construct a multi-stream VerboseHitSink with one stream per * reference string (see --refout) */ - VerboseHitSink(size_t numOuts, - int offBase, - bool colorSeq, - bool colorQual, - bool printCost, - const Bitset& suppressOuts, - ReferenceMap *rmap, - AnnotationMap *amap, - bool fullRef, - DECL_HIT_DUMPS2, - int partition = 0) : - HitSink(numOuts, PASS_HIT_DUMPS2), - partition_(partition), - offBase_(offBase), - colorSeq_(colorSeq), - colorQual_(colorQual), - cost_(printCost), - suppress_(64), - fullRef_(fullRef), - rmap_(rmap), - amap_(amap) - { } + VerboseHitSink( + size_t numOuts, + int offBase, + bool colorSeq, + bool colorQual, + bool printCost, + const Bitset& suppressOuts, + ReferenceMap *rmap, + AnnotationMap *amap, + bool fullRef, + const std::string& dumpAl, + const std::string& dumpUnal, + const std::string& dumpMax, + bool onePairFile, + bool sampleMax, + std::vector* refnames, + int partition = 0) : + HitSink( + numOuts, + dumpAl, + dumpUnal, + dumpMax, + onePairFile, + sampleMax, + refnames), + partition_(partition), + offBase_(offBase), + colorSeq_(colorSeq), + colorQual_(colorQual), + cost_(printCost), + suppress_(64), + fullRef_(fullRef), + rmap_(rmap), + amap_(amap) + { } - // In hit.cpp - static void append(BTString& o, - const Hit& h, - const vector* refnames, - ReferenceMap *rmap, - AnnotationMap *amap, - bool fullRef, - int partition, - int offBase, - bool colorSeq, - bool colorQual, - bool cost, - const Bitset& suppress); + static void append( + BTString& o, + const Hit& h, + const vector* refnames, + ReferenceMap *rmap, + AnnotationMap *amap, + bool fullRef, + int partition, + int offBase, + bool colorSeq, + bool colorQual, + bool cost, + const Bitset& suppress); /** * Append a verbose, readable hit to the output stream * corresponding to the hit. */ - virtual void append(BTString& o, const Hit& h) { + virtual void append(BTString& o, const Hit& h, int mapq, int xms) { VerboseHitSink::append(o, h, _refnames, rmap_, amap_, fullRef_, partition_, offBase_, colorSeq_, colorQual_, cost_, @@ -1629,46 +1641,9 @@ class VerboseHitSink : public HitSink { * See hit.cpp */ virtual void reportMaxed( - BTString& o, vector& hs, - PatternSourcePerThread& p, - bool lock = true, - size_t threadId=0); - -protected: - - /** - * Report a verbose, human-readable alignment to the appropriate - * output stream. - */ - virtual void reportHit( - BTString& o, - const Hit& h, - bool lock = true) - { - reportHit(o, h, true, lock); - } - - /** - * Report a verbose, human-readable alignment to the appropriate - * output stream. - */ - virtual void reportHit( - BTString& o, - const Hit& h, - bool count, - bool lock = true) - { - append(o, h); - { - ThreadSafe _ts(_locks[refIdxToStreamIdx(h.h.first)], lock); - if(count) { - HitSink::reportHit(o, h, false); - } - out(h.h.first).writeString(o); - } - o.clear(); - } + size_t threadId, + PatternSourcePerThread& p); private: int partition_; /// partition size, or 0 if partitioning is disabled @@ -1690,7 +1665,8 @@ class VerboseHitSink : public HitSink { class StubHitSink : public HitSink { public: StubHitSink() : HitSink(new OutFileBuf(".tmp"), "", "", "", false, false, NULL) { } - virtual void append(BTString& o, const Hit& h) { } + + virtual void append(BTString& o, const Hit& h, int mapq, int xms) { } }; #endif /*HIT_H_*/ diff --git a/sam.cpp b/sam.cpp index d77834d..853cb58 100644 --- a/sam.cpp +++ b/sam.cpp @@ -17,17 +17,18 @@ using namespace std; /** * Write the SAM header lines. */ -void SAMHitSink::appendHeaders(OutFileBuf& os, - size_t numRefs, - const vector& refnames, - bool color, - bool nosq, - ReferenceMap *rmap, - const TIndexOffU* plen, - bool fullRef, - bool noQnameTrunc, - const char *cmdline, - const char *rgline) +void SAMHitSink::appendHeaders( + OutFileBuf& os, + size_t numRefs, + const vector& refnames, + bool color, + bool nosq, + ReferenceMap *rmap, + const TIndexOffU* plen, + bool fullRef, + bool noQnameTrunc, + const char *cmdline, + const char *rgline) { BTString o; o << "@HD\tVN:1.0\tSO:unsorted\n"; @@ -53,29 +54,105 @@ void SAMHitSink::appendHeaders(OutFileBuf& os, } /** - * Append a SAM output record for an unaligned read. + * Report either an unaligned read or a read that exceeded the -m + * ceiling. We output placeholders for most of the fields in this + * case. */ -void SAMHitSink::appendAligned(BTString& o, - const Hit& h, - int mapq, - int xms, // value for XM:I field - const vector* refnames, - ReferenceMap *rmap, - AnnotationMap *amap, - bool fullRef, - bool noQnameTrunc, - int offBase) +void SAMHitSink::reportUnOrMax( + PatternSourcePerThread& p, + vector* hs, + size_t threadId, + bool un) { + if(un) { + HitSink::reportUnaligned(threadId, p); + } else { + HitSink::reportMaxed(*hs, threadId, p); + } + bool paired = !p.bufb().empty(); + assert(paired || p.bufa().mate == 0); + assert(!paired || p.bufa().mate > 0); + assert(un || hs->size() > 0); + assert(!un || hs == NULL || hs->size() == 0); + size_t hssz = 0; + if(hs != NULL) hssz = hs->size(); + maybeFlush(threadId, 0); + BTString& o = ptBufs_[threadId]; + for(int i = 0; i < (int)seqan::length(p.bufa().name) - (paired ? 2 : 0); i++) { + if(!noQnameTrunc_ && isspace((int)p.bufa().name[i])) break; + o << p.bufa().name[i]; + } + o << '\t' + << (SAM_FLAG_UNMAPPED | (paired ? (SAM_FLAG_PAIRED | SAM_FLAG_FIRST_IN_PAIR | SAM_FLAG_MATE_UNMAPPED) : 0)) << "\t*" + << "\t0\t0\t*\t*\t0\t0\t"; + for(size_t i = 0; i < seqan::length(p.bufa().patFw); i++) { + o << (char)p.bufa().patFw[i]; + } + o << '\t'; + for(size_t i = 0; i < seqan::length(p.bufa().qual); i++) { + o << (char)p.bufa().qual[i]; + } + o << "\tXM:i:" << (paired ? (hssz+1)/2 : hssz); + // Add optional fields reporting the primer base and the downstream color, + // which, if they were present, were clipped when the read was read in + if(p.bufa().color && gReportColorPrimer) { + if(p.bufa().primer != '?') { + o << "\tZP:Z:" << p.bufa().primer; + assert(isprint(p.bufa().primer)); + } + if(p.bufa().trimc != '?') { + o << "\tZp:Z:" << p.bufa().trimc; + assert(isprint(p.bufa().trimc)); + } + } + o << '\n'; + if(paired) { + // truncate final 2 chars + for(int i = 0; i < (int)seqan::length(p.bufb().name)-2; i++) { + o << p.bufb().name[i]; + } + o << '\t' + << (SAM_FLAG_UNMAPPED | (paired ? (SAM_FLAG_PAIRED | SAM_FLAG_SECOND_IN_PAIR | SAM_FLAG_MATE_UNMAPPED) : 0)) << "\t*" + << "\t0\t0\t*\t*\t0\t0\t"; + for(size_t i = 0; i < seqan::length(p.bufb().patFw); i++) { + o << (char)p.bufb().patFw[i]; + } + o << '\t'; + for(size_t i = 0; i < seqan::length(p.bufb().qual); i++) { + o << (char)p.bufb().qual[i]; + } + o << "\tXM:i:" << (hssz+1)/2; + // Add optional fields reporting the primer base and the downstream color, + // which, if they were present, were clipped when the read was read in + if(p.bufb().color && gReportColorPrimer) { + if(p.bufb().primer != '?') { + o << "\tZP:Z:" << p.bufb().primer; + assert(isprint(p.bufb().primer)); + } + if(p.bufb().trimc != '?') { + o << "\tZp:Z:" << p.bufb().trimc; + assert(isprint(p.bufb().trimc)); + } + } + o << '\n'; + } + ptCounts_[threadId]++; +} + +/** + * Append a SAM alignment to the given output stream. + */ +void SAMHitSink::append(BTString& o, const Hit& h, int mapq, int xms) { // QNAME if(h.mate > 0) { // truncate final 2 chars for(int i = 0; i < (int)seqan::length(h.patName)-2; i++) { - if(!noQnameTrunc && isspace((int)h.patName[i])) break; + if(!noQnameTrunc_ && isspace((int)h.patName[i])) break; o << h.patName[i]; } } else { for(int i = 0; i < (int)seqan::length(h.patName); i++) { - if(!noQnameTrunc && isspace((int)h.patName[i])) break; + if(!noQnameTrunc_ && isspace((int)h.patName[i])) break; o << h.patName[i]; } } @@ -91,10 +168,10 @@ void SAMHitSink::appendAligned(BTString& o, if(h.mate > 0 && !h.mfw) flags |= SAM_FLAG_MATE_STRAND; o << flags << "\t"; // RNAME - if(refnames != NULL && rmap != NULL) { - printUptoWs(o, rmap->getName(h.h.first), !fullRef); - } else if(refnames != NULL && h.h.first < refnames->size()) { - printUptoWs(o, (*refnames)[h.h.first], !fullRef); + if(_refnames != NULL && rmap_ != NULL) { + printUptoWs(o, rmap_->getName(h.h.first), !fullRef_); + } else if(_refnames != NULL && h.h.first < _refnames->size()) { + printUptoWs(o, (*_refnames)[h.h.first], !fullRef_); } else { o << h.h.first; } @@ -223,177 +300,16 @@ void SAMHitSink::appendAligned(BTString& o, } /** - * Report a verbose, human-readable alignment to the appropriate - * output stream. - */ -void SAMHitSink::reportSamHit(BTString& o, const Hit& h, int mapq, int xms) { - append(o, h, mapq, xms); - { - ThreadSafe _ts(_locks[refIdxToStreamIdx(h.h.first)]); - if(xms == 0) { - // Otherwise, this is actually a sampled read and belongs in - // the same category as maxed reads - HitSink::reportHit(o, h, false); - } - out(h.h.first).writeString(o); - } - o.clear(); -} - -/** - * Report a batch of hits from a vector, perhaps subsetting it. - */ -void SAMHitSink::reportSamHits( - BTString& o, - vector& hs, - size_t start, - size_t end, - int mapq, - int xms) -{ - assert_geq(end, start); - if(end-start == 0) return; - assert_gt(hs[start].mate, 0); - string buf(4096, (char) 0); - for(size_t i = start; i < end; i++) { - append(o, hs[i], mapq, xms); - } - { - ThreadSafe _ts(_locks[0]); - out(0).writeString(o); - } - o.clear(); - ThreadSafe ts(&main_mutex_m); - first_ = false; - numAligned_++; - numReportedPaired_ += (end-start); -} - -/** - * Report either an unaligned read or a read that exceeded the -m - * ceiling. We output placeholders for most of the fields in this - * case. - */ -void SAMHitSink::reportUnOrMax( - BTString& o, - PatternSourcePerThread& p, - vector* hs, - bool un, - bool lock, - size_t threadId) -{ - if(un) HitSink::reportUnaligned(o, p, threadId); - else HitSink::reportMaxed(o, *hs, p, threadId); - bool paired = !p.bufb().empty(); - assert(paired || p.bufa().mate == 0); - assert(!paired || p.bufa().mate > 0); - assert(un || hs->size() > 0); - assert(!un || hs == NULL || hs->size() == 0); - size_t hssz = 0; - if(hs != NULL) hssz = hs->size(); - if(paired) { - // truncate final 2 chars - for(int i = 0; i < (int)seqan::length(p.bufa().name)-2; i++) { - if(!noQnameTrunc_ && isspace((int)p.bufa().name[i])) break; - o << p.bufa().name[i]; - } - } else { - for(int i = 0; i < (int)seqan::length(p.bufa().name); i++) { - if(!noQnameTrunc_ && isspace((int)p.bufa().name[i])) break; - o << p.bufa().name[i]; - } - } - o << '\t' - << (SAM_FLAG_UNMAPPED | (paired ? (SAM_FLAG_PAIRED | SAM_FLAG_FIRST_IN_PAIR | SAM_FLAG_MATE_UNMAPPED) : 0)) << "\t*" - << "\t0\t0\t*\t*\t0\t0\t"; - for(size_t i = 0; i < seqan::length(p.bufa().patFw); i++) { - o << (char)p.bufa().patFw[i]; - } - o << '\t'; - for(size_t i = 0; i < seqan::length(p.bufa().qual); i++) { - o << (char)p.bufa().qual[i]; - } - o << "\tXM:i:" << (paired ? (hssz+1)/2 : hssz); - // Add optional fields reporting the primer base and the downstream color, - // which, if they were present, were clipped when the read was read in - if(p.bufa().color && gReportColorPrimer) { - if(p.bufa().primer != '?') { - o << "\tZP:Z:" << p.bufa().primer; - assert(isprint(p.bufa().primer)); - } - if(p.bufa().trimc != '?') { - o << "\tZp:Z:" << p.bufa().trimc; - assert(isprint(p.bufa().trimc)); - } - } - o << '\n'; - if(paired) { - // truncate final 2 chars - for(int i = 0; i < (int)seqan::length(p.bufb().name)-2; i++) { - o << p.bufb().name[i]; - } - o << '\t' - << (SAM_FLAG_UNMAPPED | (paired ? (SAM_FLAG_PAIRED | SAM_FLAG_SECOND_IN_PAIR | SAM_FLAG_MATE_UNMAPPED) : 0)) << "\t*" - << "\t0\t0\t*\t*\t0\t0\t"; - for(size_t i = 0; i < seqan::length(p.bufb().patFw); i++) { - o << (char)p.bufb().patFw[i]; - } - o << '\t'; - for(size_t i = 0; i < seqan::length(p.bufb().qual); i++) { - o << (char)p.bufb().qual[i]; - } - o << "\tXM:i:" << (hssz+1)/2; - // Add optional fields reporting the primer base and the downstream color, - // which, if they were present, were clipped when the read was read in - if(p.bufb().color && gReportColorPrimer) { - if(p.bufb().primer != '?') { - o << "\tZP:Z:" << p.bufb().primer; - assert(isprint(p.bufb().primer)); - } - if(p.bufb().trimc != '?') { - o << "\tZp:Z:" << p.bufb().trimc; - assert(isprint(p.bufb().trimc)); - } - } - o << '\n'; - } - { - ThreadSafe _ts(_locks[0], lock); - out(0).writeString(o); - } - o.clear(); -} - -/** - * Append a SAM alignment to the given output stream. - */ -void SAMHitSink::append( - BTString& o, - const Hit& h, - int mapq, - int xms, - const vector* refnames, - ReferenceMap *rmap, - AnnotationMap *amap, - bool fullRef, - bool noQnameTrunc, - int offBase) -{ - appendAligned(o, h, mapq, xms, refnames, rmap, amap, fullRef, noQnameTrunc, offBase); -} - -/** * Report maxed-out read; if sampleMax_ is set, then report 1 alignment * at random. */ void SAMHitSink::reportMaxed( - BTString& o, vector& hs, - PatternSourcePerThread& p, - size_t threadId) + size_t threadId, + PatternSourcePerThread& p) { if(sampleMax_) { - HitSink::reportMaxed(o, hs, p, threadId); + HitSink::reportMaxed(hs, threadId, p); RandomSource rand; rand.init(p.bufa().seed); assert_gt(hs.size(), 0); @@ -418,7 +334,7 @@ void SAMHitSink::reportMaxed( int strat = min(hs[i].stratum, hs[i+1].stratum); if(strat == bestStratum) { if(num == r) { - reportSamHits(o, hs, i, i+2, 0, (int)(hs.size()/2)+1); + reportHits(NULL, &hs, i, i+2, threadId, 0, (int)(hs.size()/2)+1, false); break; } num++; @@ -433,9 +349,9 @@ void SAMHitSink::reportMaxed( } assert_leq(num, hs.size()); uint32_t r = rand.nextU32() % num; - reportSamHit(o, hs[r], /*MAPQ*/0, /*XM:I*/(int)hs.size()+1); + reportHits(&hs[r], NULL, 0, 1, threadId, 0, (int)hs.size()+1, false); } } else { - reportUnOrMax(o, p, &hs, false, threadId); + reportUnOrMax(p, &hs, threadId, false); } } diff --git a/sam.h b/sam.h index 8b9316c..b6f1d45 100644 --- a/sam.h +++ b/sam.h @@ -40,81 +40,67 @@ class SAMHitSink : public HitSink { /** * Construct a single-stream VerboseHitSink (default) */ - SAMHitSink(OutFileBuf* out, - int offBase, - ReferenceMap *rmap, - AnnotationMap *amap, - bool fullRef, - bool noQnameTrunc, - int defaultMapq, - DECL_HIT_DUMPS2, - int nthreads) : - HitSink(out, PASS_HIT_DUMPS2, nthreads), - offBase_(offBase), defaultMapq_(defaultMapq), - rmap_(rmap), amap_(amap), fullRef_(fullRef), - noQnameTrunc_(noQnameTrunc) { } - - /** - * Construct a multi-stream VerboseHitSink with one stream per - * reference string (see --refout) - */ - SAMHitSink(size_t numOuts, - int offBase, - ReferenceMap *rmap, - AnnotationMap *amap, - bool fullRef, - int defaultMapq, - DECL_HIT_DUMPS2) : - HitSink(numOuts, PASS_HIT_DUMPS2), - offBase_(offBase), defaultMapq_(defaultMapq), - rmap_(rmap), amap_(amap), fullRef_(fullRef) { } - - /** - * Append a SAM alignment to the given output stream. - */ - static void append( - BTString& o, - const Hit& h, - int mapq, - int xms, - const vector* refnames, + SAMHitSink( + OutFileBuf* out, + int offBase, ReferenceMap *rmap, AnnotationMap *amap, bool fullRef, bool noQnameTrunc, - int offBase); + int defaultMapq, + const std::string& dumpAl, + const std::string& dumpUnal, + const std::string& dumpMax, + bool onePairFile, + bool sampleMax, + std::vector* refnames, + int nthreads) : + HitSink( + out, + dumpAl, + dumpUnal, + dumpMax, + onePairFile, + sampleMax, + refnames, + nthreads), + offBase_(offBase), defaultMapq_(defaultMapq), + rmap_(rmap), amap_(amap), fullRef_(fullRef), + noQnameTrunc_(noQnameTrunc) { } /** - * Append a SAM alignment for an aligned read to the given output - * stream. + * Construct a multi-stream VerboseHitSink with one stream per + * reference string (see --refout) */ - static void appendAligned( - BTString& o, - const Hit& h, - int mapq, - int xms, - const vector* refnames, + SAMHitSink( + size_t numOuts, + int offBase, ReferenceMap *rmap, AnnotationMap *amap, bool fullRef, - bool noQnameTrunc, - int offBase); + int defaultMapq, + const std::string& dumpAl, + const std::string& dumpUnal, + const std::string& dumpMax, + bool onePairFile, + bool sampleMax, + std::vector* refnames) : + HitSink( + numOuts, + dumpAl, + dumpUnal, + dumpMax, + onePairFile, + sampleMax, + refnames), + offBase_(offBase), defaultMapq_(defaultMapq), + rmap_(rmap), amap_(amap), fullRef_(fullRef) { } /** * Append a verbose, readable hit to the output stream * corresponding to the hit. */ - virtual void append(BTString& o, const Hit& h) { - SAMHitSink::append(o, h, defaultMapq_, 0, _refnames, rmap_, amap_, fullRef_, noQnameTrunc_, offBase_); - } - - /** - * Append a verbose, readable hit to the output stream - * corresponding to the hit. - */ - virtual void append(BTString& o, const Hit& h, int mapq, int xms) { - SAMHitSink::append(o, h, mapq, xms, _refnames, rmap_, amap_, fullRef_, noQnameTrunc_, offBase_); - } + virtual void append(BTString& o, const Hit& h, int mapq, int xms); /** * Write the SAM header lines. @@ -135,64 +121,30 @@ class SAMHitSink : public HitSink { protected: /** - * + * Both */ void reportUnOrMax( - BTString& o, PatternSourcePerThread& p, - vector* hs, - bool un, - bool lock = true, - size_t threadId = 0); - - /** - * Report a verbose, human-readable alignment to the appropriate - * output stream. - */ - virtual void reportHit(BTString& o, const Hit& h) { - reportSamHit(o, h, defaultMapq_, 0); - } - - /** - * Report a SAM alignment with the given mapping quality and XM - * field. - */ - virtual void reportSamHit( - BTString& o, - const Hit& h, - int mapq, - int xms); - - /** - * Report a batch of SAM alignments (e.g. two mates that should be - * printed together) with the given mapping quality and XM field. - */ - virtual void reportSamHits( - BTString& o, - vector& hs, - size_t start, - size_t end, - int mapq, - int xms); + vector* hs, // could be NULL + size_t threadId, + bool un); /** * See sam.cpp */ virtual void reportMaxed( - BTString& o, vector& hs, - PatternSourcePerThread& p, - size_t threadId); + size_t threadId, + PatternSourcePerThread& p); /** * See sam.cpp */ virtual void reportUnaligned( - BTString& o, - PatternSourcePerThread& p, - bool lock = true) + size_t threadId, + PatternSourcePerThread& p) { - reportUnOrMax(o, p, NULL, true, lock); + reportUnOrMax(p, NULL, threadId, true); } private: