3
3
#include < fstream>
4
4
#include < cstdint>
5
5
6
+ #include " fastq_common.h"
6
7
// number of samrecords per buffer in each reader
7
8
constexpr size_t kSamRecordBufferSize = 10000 ;
8
-
9
9
#include " input_options.h"
10
10
#include " utilities.h"
11
11
@@ -20,14 +20,11 @@ constexpr size_t kSamRecordBufferSize = 10000;
20
20
#include < unordered_map>
21
21
#include < iostream>
22
22
#include < fstream>
23
- #include < condition_variable>
24
- #include < queue>
25
23
#include < cstdio>
26
24
#include < cstdlib>
27
25
#include < getopt.h>
28
26
#include < vector>
29
27
#include < functional>
30
- #include < mutex>
31
28
#include < stack>
32
29
33
30
// Overview of multithreading:
@@ -42,42 +39,28 @@ constexpr size_t kSamRecordBufferSize = 10000;
42
39
// the record pointer's arena that the record's memory is no longer in use.
43
40
// The arena can then give that pointer to its reader for a new read.
44
41
45
- // A pointer to a valid SamRecord waiting to be written to disk, and the index
46
- // of the g_read_arenas that pointer should be released to after the write.
47
- using PendingWrite = std::pair<SamRecord*, int >;
48
-
49
- constexpr int kWriteQueueShutdown = -1 ;
50
- class WriteQueue
42
+ PendingWrite WriteQueue::dequeueWrite ()
51
43
{
52
- public:
53
- PendingWrite dequeueWrite ()
54
- {
55
- std::unique_lock<std::mutex> lock (mutex_);
56
- cv_.wait (lock, [&] { return !queue_.empty (); });
57
- auto pair = queue_.front ();
58
- queue_.pop ();
59
- return pair;
60
- }
61
- void enqueueWrite (PendingWrite write)
62
- {
63
- mutex_.lock ();
64
- queue_.push (write );
65
- mutex_.unlock ();
66
- cv_.notify_one ();
67
- }
68
- void enqueueShutdownSignal ()
69
- {
70
- mutex_.lock ();
71
- queue_.push (std::make_pair (nullptr , kWriteQueueShutdown ));
72
- mutex_.unlock ();
73
- cv_.notify_one ();
74
- }
75
- private:
76
- std::mutex mutex_;
77
- std::condition_variable cv_;
78
- std::queue<PendingWrite> queue_;
79
- };
80
-
44
+ std::unique_lock<std::mutex> lock (mutex_);
45
+ cv_.wait (lock, [&] { return !queue_.empty (); });
46
+ auto pair = queue_.front ();
47
+ queue_.pop ();
48
+ return pair;
49
+ }
50
+ void WriteQueue::enqueueWrite (PendingWrite write)
51
+ {
52
+ mutex_.lock ();
53
+ queue_.push (write );
54
+ mutex_.unlock ();
55
+ cv_.notify_one ();
56
+ }
57
+ void WriteQueue::enqueueShutdownSignal ()
58
+ {
59
+ mutex_.lock ();
60
+ queue_.push (std::make_pair (nullptr , kShutdown ));
61
+ mutex_.unlock ();
62
+ cv_.notify_one ();
63
+ }
81
64
std::vector<std::unique_ptr<WriteQueue>> g_write_queues;
82
65
83
66
// I wrote this class to stay close to the performance characteristics of the
@@ -123,7 +106,10 @@ class SamRecordArena
123
106
};
124
107
125
108
std::vector<std::unique_ptr<SamRecordArena>> g_read_arenas;
126
-
109
+ void releaseReaderThreadMemory (int reader_thread_index, SamRecord* samRecord)
110
+ {
111
+ g_read_arenas[reader_thread_index]->releaseSamRecordMemory (samRecord);
112
+ }
127
113
128
114
129
115
void writeFastqRecord (ogzstream& r1_out, ogzstream& r2_out, SamRecord* sam)
@@ -149,7 +135,7 @@ void fastqWriterThread(int write_thread_index)
149
135
while (true )
150
136
{
151
137
auto [sam, source_reader_index] = g_write_queues[write_thread_index]->dequeueWrite ();
152
- if (source_reader_index == kWriteQueueShutdown )
138
+ if (source_reader_index == WriteQueue:: kShutdown )
153
139
break ;
154
140
155
141
writeFastqRecord (r1_out, r2_out, sam);
@@ -186,7 +172,7 @@ void bamWriterThread(int write_thread_index, std::string sample_id)
186
172
while (true )
187
173
{
188
174
auto [sam, source_reader_index] = g_write_queues[write_thread_index]->dequeueWrite ();
189
- if (source_reader_index == kWriteQueueShutdown )
175
+ if (source_reader_index == WriteQueue:: kShutdown )
190
176
break ;
191
177
192
178
samOut.WriteRecord (samHeader, *sam);
@@ -289,7 +275,8 @@ void fastQFileReaderThread(
289
275
int reader_thread_index, std::string filenameI1, String filenameR1,
290
276
String filenameR2, const WhiteListData* white_list_data,
291
277
std::function <void (SamRecord*, FastQFile*, FastQFile*, FastQFile*, bool )> sam_record_filler,
292
- std::function <std::string(SamRecord*, FastQFile*, FastQFile*, FastQFile*, bool )> barcode_getter)
278
+ std::function <std::string(SamRecord*, FastQFile*, FastQFile*, FastQFile*, bool )> barcode_getter,
279
+ std::function<void(WriteQueue*, SamRecord*, int )> output_handler)
293
280
{
294
281
// / setting the shortest sequence allowed to be read
295
282
FastQFile fastQFileI1 (4 , 4 );
@@ -348,7 +335,7 @@ void fastQFileReaderThread(
348
335
barcode, samrec, white_list_data, &n_barcode_corrected, &n_barcode_correct,
349
336
&n_barcode_errors, g_write_queues.size ());
350
337
351
- g_write_queues[bam_bucket]-> enqueueWrite ( std::make_pair ( samrec, reader_thread_index) );
338
+ output_handler ( g_write_queues[bam_bucket]. get (), samrec, reader_thread_index);
352
339
353
340
if (total_reads % 10000000 == 0 )
354
341
{
@@ -377,7 +364,8 @@ void mainCommon(
377
364
std::vector<std::string> I1s, std::vector<std::string> R1s, std::vector<std::string> R2s,
378
365
std::string sample_id,
379
366
std::function <void (SamRecord*, FastQFile*, FastQFile*, FastQFile*, bool )> sam_record_filler,
380
- std::function <std::string(SamRecord*, FastQFile*, FastQFile*, FastQFile*, bool )> barcode_getter)
367
+ std::function <std::string(SamRecord*, FastQFile*, FastQFile*, FastQFile*, bool )> barcode_getter,
368
+ std::function<void(WriteQueue*, SamRecord*, int )> output_handler)
381
369
{
382
370
std::cout << " reading whitelist file " << white_list_file << " ..." ;
383
371
// stores barcode correction map and vector of correct barcodes
@@ -410,7 +398,7 @@ void mainCommon(
410
398
411
399
g_read_arenas.push_back (std::make_unique<SamRecordArena>());
412
400
readers.emplace_back (fastQFileReaderThread, i, I1.c_str (), R1s[i].c_str (),
413
- R2s[i].c_str (), &white_list_data, sam_record_filler, barcode_getter);
401
+ R2s[i].c_str (), &white_list_data, sam_record_filler, barcode_getter, output_handler );
414
402
}
415
403
416
404
for (auto & reader : readers)
@@ -422,5 +410,5 @@ void mainCommon(
422
410
write_queue->enqueueShutdownSignal ();
423
411
424
412
for (auto & writer : writers)
425
- writer.join ();
413
+ writer.join ();
426
414
}
0 commit comments