/
outq.h
129 lines (129 loc) · 2.54 KB
/
outq.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#ifndef OUTQ_H_
#define OUTQ_H_
#include "assert_helpers.h"
#include "ds.h"
#include "sstring.h"
#include "read.h"
#include "threading.h"
#include "mem_ids.h"
#include <vector>
class OutputQueue
{
static const size_t NFLUSH_THRESH = 8;
public:
OutputQueue(
OutFileBuf &obuf,
bool reorder,
size_t nthreads,
bool threadSafe,
int perThreadBufSize,
TReadId rdid = 0) : obuf_(obuf),
cur_(rdid),
nfinished_(0),
nflushed_(0),
lines_(RES_CAT),
started_(RES_CAT),
finished_(RES_CAT),
reorder_(reorder),
threadSafe_(threadSafe),
mutex_m(),
nthreads_(nthreads),
perThreadBuf(NULL),
perThreadCounter(NULL),
perThreadBufSize_(perThreadBufSize)
{
nstarted_ = 0;
assert(nthreads_ <= 2 || threadSafe);
if (!reorder)
{
perThreadBuf = new BTString *[nthreads_];
perThreadCounter = new int[nthreads_];
size_t i = 0;
for (i = 0; i < nthreads_; i++)
{
perThreadBuf[i] = new BTString[perThreadBufSize_];
perThreadCounter[i] = 0;
}
}
}
~OutputQueue()
{
if (perThreadBuf != NULL)
{
for (size_t i = 0; i < nthreads_; i++)
{
delete[] perThreadBuf[i];
}
delete[] perThreadBuf;
delete[] perThreadCounter;
}
}
void beginRead(TReadId rdid, size_t threadId);
void finishRead(const BTString &rec, TReadId rdid, size_t threadId);
size_t size() const
{
return lines_.size();
}
TReadId numFlushed() const
{
return nflushed_;
}
TReadId numStarted() const
{
return nstarted_;
}
TReadId numFinished() const
{
return nfinished_;
}
void flush(bool force = false, bool getLock = true);
protected:
OutFileBuf &obuf_;
TReadId cur_;
#ifdef WITH_TBB
std::atomic<TReadId> nstarted_;
#else
TReadId nstarted_;
#endif
TReadId nfinished_;
TReadId nflushed_;
EList<BTString> lines_;
EList<bool> started_;
EList<bool> finished_;
bool reorder_;
bool threadSafe_;
MUTEX_T mutex_m;
size_t nthreads_;
BTString **perThreadBuf;
int *perThreadCounter;
int perThreadBufSize_;
private:
void flushImpl(bool force);
void beginReadImpl(TReadId rdid, size_t threadId);
void finishReadImpl(const BTString &rec, TReadId rdid, size_t threadId);
};
class OutputQueueMark
{
public:
OutputQueueMark(
OutputQueue &q,
const BTString &rec,
TReadId rdid,
size_t threadId) : q_(q),
rec_(rec),
rdid_(rdid),
threadId_(threadId)
{
q_.beginRead(rdid, threadId);
}
~OutputQueueMark()
{
q_.finishRead(rec_, rdid_, threadId_);
}
protected:
OutputQueue &q_;
const BTString &rec_;
TReadId rdid_;
size_t threadId_;
};
#endif