-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
inmemory_part.go
155 lines (129 loc) · 3.83 KB
/
inmemory_part.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package logstorage
import (
"path/filepath"
"sort"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
// inmemoryPart is an in-memory part.
//
// Besides the part header, it consists of a set of byte buffers.
// MustStoreToDisk writes each buffer to a separate file in the part directory.
type inmemoryPart struct {
// ph contains partHeader information for the given in-memory part.
ph partHeader
// metaindex is written to the metaindexFilename file by MustStoreToDisk.
metaindex bytesutil.ByteBuffer
// index is written to the indexFilename file by MustStoreToDisk.
index bytesutil.ByteBuffer
// columnsHeader is written to the columnsHeaderFilename file by MustStoreToDisk.
columnsHeader bytesutil.ByteBuffer
// timestamps is written to the timestampsFilename file by MustStoreToDisk.
timestamps bytesutil.ByteBuffer
// fieldValues is written to the fieldValuesFilename file by MustStoreToDisk.
fieldValues bytesutil.ByteBuffer
// fieldBloomFilter is written to the fieldBloomFilename file by MustStoreToDisk.
fieldBloomFilter bytesutil.ByteBuffer
// messageValues is written to the messageValuesFilename file by MustStoreToDisk.
messageValues bytesutil.ByteBuffer
// messageBloomFilter is written to the messageBloomFilename file by MustStoreToDisk.
messageBloomFilter bytesutil.ByteBuffer
}
// reset clears the part header and every buffer of mp, so mp can be re-used.
func (mp *inmemoryPart) reset() {
	// Index-related buffers.
	mp.metaindex.Reset()
	mp.index.Reset()
	mp.columnsHeader.Reset()

	// Data buffers.
	mp.timestamps.Reset()
	mp.fieldValues.Reset()
	mp.messageValues.Reset()

	// Bloom filter buffers.
	mp.fieldBloomFilter.Reset()
	mp.messageBloomFilter.Reset()

	// Part header.
	mp.ph.reset()
}
// mustInitFromRows initializes mp from lr.
//
// mp is reset first. If lr contains no timestamps, mp is left empty.
// Otherwise lr is sorted with sort.Sort and its rows are written to mp
// block by block. A new block is started whenever the stream ID changes
// or the accumulated uncompressed size of the current block reaches
// maxUncompressedBlockSize.
func (mp *inmemoryPart) mustInitFromRows(lr *LogRows) {
	mp.reset()

	if len(lr.timestamps) == 0 {
		return
	}

	sort.Sort(lr)

	bsw := getBlockStreamWriter()
	bsw.MustInitForInmemoryPart(mp)
	pending := getTmpRows()

	allTimestamps := lr.timestamps
	allRows := lr.rows
	allStreamIDs := lr.streamIDs

	// The first row always opens the first block, so seed the current
	// stream ID with it instead of checking for nil on every iteration.
	sidCurr := &allStreamIDs[0]
	var blockSizeBytes uint64

	for i, ts := range allTimestamps {
		sid := &allStreamIDs[i]

		// Flush the rows collected so far before starting a new block.
		if blockSizeBytes >= maxUncompressedBlockSize || !sid.equal(sidCurr) {
			bsw.MustWriteRows(sidCurr, pending.timestamps, pending.rows)
			pending.reset()
			sidCurr = sid
			blockSizeBytes = 0
		}

		rowFields := allRows[i]
		pending.timestamps = append(pending.timestamps, ts)
		pending.rows = append(pending.rows, rowFields)
		blockSizeBytes += uncompressedRowSizeBytes(rowFields)
	}

	// Flush the last block.
	bsw.MustWriteRows(sidCurr, pending.timestamps, pending.rows)
	putTmpRows(pending)

	bsw.Finalize(&mp.ph)
	putBlockStreamWriter(bsw)
}
// MustStoreToDisk stores mp to disk at the given path.
//
// It calls fs.MustMkdirFailIfExist(path), writes every buffer of mp
// to its own file under path with fs.MustWriteSync, writes the part header
// metadata and finally calls fs.MustSyncPath(path).
func (mp *inmemoryPart) MustStoreToDisk(path string) {
	fs.MustMkdirFailIfExist(path)

	// The files are written in the order listed here.
	partFiles := [...]struct {
		name string
		data []byte
	}{
		{metaindexFilename, mp.metaindex.B},
		{indexFilename, mp.index.B},
		{columnsHeaderFilename, mp.columnsHeader.B},
		{timestampsFilename, mp.timestamps.B},
		{fieldValuesFilename, mp.fieldValues.B},
		{fieldBloomFilename, mp.fieldBloomFilter.B},
		{messageValuesFilename, mp.messageValues.B},
		{messageBloomFilename, mp.messageBloomFilter.B},
	}
	for _, pf := range partFiles {
		fs.MustWriteSync(filepath.Join(path, pf.name), pf.data)
	}

	mp.ph.mustWriteMetadata(path)

	fs.MustSyncPath(path)
	// Do not sync parent directory - it must be synced by the caller.
}
// tmpRows is used as a helper for inmemoryPart.mustInitFromRows()
//
// It accumulates the timestamps and rows of the block being built
// until they are passed to the block stream writer.
type tmpRows struct {
// timestamps contains the timestamps collected for the current block.
timestamps []int64
// rows contains the fields for every collected row.
// mustInitFromRows appends to rows and timestamps in lockstep.
rows [][]Field
}
// reset empties trs while keeping the underlying slices for re-use.
func (trs *tmpRows) reset() {
	// Nil out the stored rows, so the retained backing array
	// no longer references the previously appended field slices.
	for idx := range trs.rows {
		trs.rows[idx] = nil
	}
	trs.rows = trs.rows[:0]

	trs.timestamps = trs.timestamps[:0]
}
// getTmpRows returns a tmpRows from tmpRowsPool.
//
// A new tmpRows is allocated if the pool has nothing to offer.
// Return the result to the pool via putTmpRows when it is no longer needed.
func getTmpRows() *tmpRows {
	if pooled := tmpRowsPool.Get(); pooled != nil {
		return pooled.(*tmpRows)
	}
	return &tmpRows{}
}
// putTmpRows resets trs and returns it to tmpRowsPool.
//
// The caller should not use trs afterwards, since a later getTmpRows call
// may hand it out again.
func putTmpRows(trs *tmpRows) {
trs.reset()
tmpRowsPool.Put(trs)
}
// tmpRowsPool holds tmpRows objects for re-use via getTmpRows and putTmpRows.
var tmpRowsPool sync.Pool
// getInmemoryPart returns an inmemoryPart from inmemoryPartPool.
//
// A new inmemoryPart is allocated if the pool has nothing to offer.
// Return the result to the pool via putInmemoryPart when it is no longer needed.
func getInmemoryPart() *inmemoryPart {
	if pooled := inmemoryPartPool.Get(); pooled != nil {
		return pooled.(*inmemoryPart)
	}
	return &inmemoryPart{}
}
// putInmemoryPart resets mp and returns it to inmemoryPartPool.
//
// The caller should not use mp afterwards, since a later getInmemoryPart call
// may hand it out again.
func putInmemoryPart(mp *inmemoryPart) {
mp.reset()
inmemoryPartPool.Put(mp)
}
// inmemoryPartPool holds inmemoryPart objects for re-use via getInmemoryPart and putInmemoryPart.
var inmemoryPartPool sync.Pool