forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 1
/
mmap_buffer.go
374 lines (305 loc) · 10 KB
/
mmap_buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
// Copyright (c) 2014 Ashley Jeffs
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package impl
import (
"fmt"
"time"
"github.com/Jeffail/benthos/lib/types"
"github.com/Jeffail/benthos/lib/util/service/log"
"github.com/Jeffail/benthos/lib/util/service/metrics"
)
//------------------------------------------------------------------------------
// MmapBufferConfig holds the config options for a memory-map based buffer. It
// is declared directly on top of MmapCacheConfig, so values convert between
// the two types with a plain type conversion.
type MmapBufferConfig MmapCacheConfig
// NewMmapBufferConfig creates a MmapBufferConfig object with default values.
// The defaults are those returned by NewMmapCacheConfig, converted to the
// buffer config type.
func NewMmapBufferConfig() MmapBufferConfig {
	cacheDefaults := NewMmapCacheConfig()
	return MmapBufferConfig(cacheDefaults)
}
// MmapBuffer is a buffer implemented around rotated memory mapped files.
//
// Messages are appended to the file at writeIndex and consumed from the file
// at readIndex. Every method in this file takes cache.L before touching the
// position fields below.
type MmapBuffer struct {
// config is the configuration the buffer was created with.
config MmapBufferConfig
// cache provides the mapped file blocks, the tracker block, and the lock and
// wait/broadcast calls used to coordinate readers and writers.
cache *MmapCache
// logger receives informational and error messages.
logger log.Modular
// stats receives counters such as "cache.open.error".
stats metrics.Type
// readFrom is the offset within the file at readIndex of the next message to
// be read.
readFrom int
// readIndex is the index of the file currently being read from.
readIndex int
// writtenTo is the offset within the file at writeIndex where the next
// message will be written.
writtenTo int
// writeIndex is the index of the file currently being written to.
writeIndex int
// closed is set by Close; once set, blocked calls return and the tracker is
// no longer read or written.
closed bool
}
// NewMmapBuffer creates a memory-map based buffer.
//
// A cache is built from the supplied config, the previous read/write positions
// are restored from the tracker block, and two background goroutines are
// started to keep the files around the write and read indexes cached.
//
// An error is returned if the cache cannot be created or if the file at the
// starting read index cannot be cached. Failing to cache the starting write
// index is only logged, as writes simply block until it becomes available.
func NewMmapBuffer(config MmapBufferConfig, log log.Modular, stats metrics.Type) (*MmapBuffer, error) {
	cache, err := NewMmapCache(MmapCacheConfig(config), log, stats)
	if err != nil {
		return nil, fmt.Errorf("MMAP Cache: %v", err)
	}

	cache.L.Lock()
	defer cache.L.Unlock()

	// Positions and the closed flag start at their zero values; the positions
	// are then overwritten from the tracker.
	buf := &MmapBuffer{
		config: config,
		cache:  cache,
		logger: log,
		stats:  stats,
	}
	buf.readTracker()

	buf.logger.Infof("Storing messages to file in: %s\n", buf.config.Path)

	// The read file must be available straight away, the write file may
	// arrive later.
	if readErr := cache.EnsureCached(buf.readIndex); readErr != nil {
		return nil, fmt.Errorf("MMAP index read: %v", readErr)
	}
	if writeErr := cache.EnsureCached(buf.writeIndex); writeErr != nil {
		log.Errorf("MMAP index write: %v, benthos will block writes until this is resolved.\n", writeErr)
	}

	go buf.cacheManagerLoop(&buf.writeIndex)
	go buf.cacheManagerLoop(&buf.readIndex)

	return buf, nil
}
//------------------------------------------------------------------------------
// readTracker restores the reader/writer positions from the tracker block.
// The values are stored at offsets 0, 4, 8 and 12 in the order writeIndex,
// writtenTo, readIndex, readFrom. Nothing is read once the buffer is closed.
func (f *MmapBuffer) readTracker() {
	if f.closed {
		return
	}

	tracker := f.cache.GetTracker()
	targets := [4]*int{&f.writeIndex, &f.writtenTo, &f.readIndex, &f.readFrom}
	for slot, dst := range targets {
		*dst = readMessageSize(tracker, slot*4)
	}
}
// writeTracker stores the current reader/writer positions in the tracker
// block. The values are written at offsets 0, 4, 8 and 12 in the order
// writeIndex, writtenTo, readIndex, readFrom. Nothing is written once the
// buffer is closed.
func (f *MmapBuffer) writeTracker() {
	if f.closed {
		return
	}

	tracker := f.cache.GetTracker()
	values := [4]int{f.writeIndex, f.writtenTo, f.readIndex, f.readFrom}
	for slot, v := range values {
		writeMessageSize(tracker, slot*4, v)
	}
}
//------------------------------------------------------------------------------
// cacheManagerLoop keeps the cache populated for the index that indexPtr
// points at. The first successful pass ensures the current index is cached;
// every pass after that ensures the following index (current + 1) is cached.
// The loop runs until the buffer is closed.
func (f *MmapBuffer) cacheManagerLoop(indexPtr *int) {
	primed := false

	// step performs one pass under the cache lock and reports whether the
	// loop should keep running.
	step := func() bool {
		f.cache.L.Lock()
		defer f.cache.L.Unlock()

		if f.closed {
			return false
		}

		target := *indexPtr
		if primed {
			target++
		}

		err := f.cache.EnsureCached(target)
		switch {
		case err != nil:
			// Could not cache the file: report it, then back off with the
			// lock released so other callers are not held up.
			f.logger.Errorf("Failed to cache mmap file for index %v: %v\n", target, err)
			f.stats.Incr("cache.open.error", 1)

			f.cache.L.Unlock()
			time.Sleep(time.Duration(f.config.RetryPeriodMS) * time.Millisecond)
			f.cache.L.Lock()
		case !primed:
			primed = true
		case *indexPtr < target:
			// Only wait if the tracked index has not already caught up with
			// the target while it was being cached; otherwise go straight
			// round again to ensure the new next index.
			f.cache.Wait()
		}
		return true
	}

	for step() {
	}
}
//------------------------------------------------------------------------------
// backlog returns the distance between the write position and the read
// position.
func (f *MmapBuffer) backlog() int {
	// NOTE: For speed, this treats every mmap file as being exactly
	// config.FileSize long.
	filesAhead := f.writeIndex - f.readIndex
	return filesAhead*f.config.FileSize + f.writtenTo - f.readFrom
}
//------------------------------------------------------------------------------
// CloseOnceEmpty blocks until the backlog reaches 0 and then closes the mmap
// buffer.
//
// If the buffer is closed by someone else while a backlog remains, the wait is
// abandoned: once closed, ShiftMessage and PushMessage no longer move the read
// or write positions, so the backlog could never drain and this call would
// otherwise block forever.
func (f *MmapBuffer) CloseOnceEmpty() {
	defer func() {
		f.cache.L.Unlock()
		f.Close()
	}()
	f.cache.L.Lock()

	// Until the backlog is cleared, or the buffer is closed underneath us.
	for f.backlog() > 0 && !f.closed {
		// Wait for a broadcast from our reader (or from Close).
		f.cache.Wait()
	}
}
// Close unblocks any blocked calls and prevents further writing to the block.
//
// It works in two separately locked phases: first the buffer is flagged as
// closed and all waiters are woken, then everything is removed from the cache.
// NOTE(review): the lock is presumably released in between so that woken
// callers get a chance to observe the closed flag first - confirm.
func (f *MmapBuffer) Close() {
	// Phase one: mark closed and wake everyone up.
	func() {
		f.cache.L.Lock()
		defer f.cache.L.Unlock()

		f.closed = true
		f.cache.Broadcast()
	}()

	// Phase two: drop all cached files.
	func() {
		f.cache.L.Lock()
		defer f.cache.L.Unlock()

		f.cache.RemoveAll()
	}()
}
// ShiftMessage moves the read position past the message at the current read
// offset. It returns the resulting backlog count; the error is always nil.
//
// The position is left untouched if the buffer is closed or the file at the
// read index is not currently cached.
func (f *MmapBuffer) ShiftMessage() (int, error) {
	f.cache.L.Lock()
	defer func() {
		// Persist positions and wake waiters before releasing the lock.
		f.writeTracker()
		f.cache.Broadcast()
		f.cache.L.Unlock()
	}()

	if f.closed || !f.cache.IsCached(f.readIndex) {
		return f.backlog(), nil
	}

	// Skip the four byte size header plus the message body.
	size := readMessageSize(f.cache.Get(f.readIndex), f.readFrom)
	f.readFrom += size + 4

	return f.backlog(), nil
}
// NextMessage reads the next message, blocking until there is something to
// read. The read offset is not advanced past the returned message; that is
// done by ShiftMessage.
//
// It returns types.ErrTypeClosed if the buffer is closed before a message is
// available, and types.ErrBlockCorrupted if the recorded message size runs
// past the end of the file block.
func (f *MmapBuffer) NextMessage() (types.Message, error) {
	f.cache.L.Lock()
	defer func() {
		f.writeTracker()
		f.cache.Broadcast()
		f.cache.L.Unlock()
	}()

	// caughtUp reports whether the reader sits at exactly the writer's
	// position, meaning there is nothing to read yet.
	caughtUp := func() bool {
		return f.writeIndex == f.readIndex && f.readFrom == f.writtenTo
	}

	for caughtUp() && !f.closed {
		f.cache.Wait()
	}
	if f.closed {
		return types.Message{}, types.ErrTypeClosed
	}

	offset := f.readFrom
	data := f.cache.Get(f.readIndex)
	size := readMessageSize(data, offset)

	// Messages are laid out back to back. When the writer runs out of room it
	// leaves a zero size marker, which tells the reader to carry on from the
	// start of the next file.
	for size <= 0 {
		// Block until the next file is ready to read.
		for !f.cache.IsCached(f.readIndex+1) && !f.closed {
			f.cache.Wait()
		}
		if f.closed {
			return types.Message{}, types.ErrTypeClosed
		}

		// Optionally discard the file we have finished with. This happens in
		// the background as it has no impact on the reader.
		if f.config.CleanUp {
			go func(finished int) {
				f.cache.L.Lock()
				defer f.cache.L.Unlock()

				f.cache.Remove(finished)
				f.cache.Delete(finished)
			}(f.readIndex)
		}

		// Step onto the start of the next file.
		f.readIndex++
		f.readFrom = 0

		data = f.cache.Get(f.readIndex)
		offset = 0

		f.cache.Broadcast()

		// The new file may not have been written to yet.
		for caughtUp() && !f.closed {
			f.cache.Wait()
		}
		if f.closed {
			return types.Message{}, types.ErrTypeClosed
		}

		size = readMessageSize(data, offset)
	}

	// The body follows the four byte size header.
	bodyStart := offset + 4
	bodyEnd := bodyStart + size
	if bodyEnd > len(data) {
		return types.Message{}, types.ErrBlockCorrupted
	}

	return types.FromBytes(data[bodyStart:bodyEnd])
}
// PushMessage pushes a new message and returns the backlog count.
//
// It returns types.ErrMessageTooLarge if the message plus its four byte size
// header cannot fit in a file of config.FileSize, and types.ErrTypeClosed if
// the buffer is closed while waiting for a file to become available.
func (f *MmapBuffer) PushMessage(msg types.Message) (int, error) {
	f.cache.L.Lock()
	defer func() {
		f.writeTracker()
		f.cache.Broadcast()
		f.cache.L.Unlock()
	}()

	payload := msg.Bytes()
	offset := f.writtenTo

	// Space required: four byte size header followed by the body.
	needed := len(payload) + 4
	if needed > f.config.FileSize {
		return 0, types.ErrMessageTooLarge
	}

	for !f.cache.IsCached(f.writeIndex) && !f.closed {
		f.cache.Wait()
	}
	if f.closed {
		return 0, types.ErrTypeClosed
	}

	data := f.cache.Get(f.writeIndex)

	// When the message does not fit in what is left of the current file we
	// roll over to the next one. A zero size marker is left behind so the
	// reader knows to follow rather than read garbage.
	for needed+offset > len(data) {
		// Zero up to four bytes at the current offset, bounded by the end of
		// the file.
		zeroEnd := offset + 4
		if zeroEnd > len(data) {
			zeroEnd = len(data)
		}
		for i := offset; i < zeroEnd; i++ {
			data[i] = byte(0)
		}

		// Wait until our next file is ready.
		for !f.cache.IsCached(f.writeIndex+1) && !f.closed {
			f.cache.Wait()
		}
		if f.closed {
			return 0, types.ErrTypeClosed
		}

		// If the reader is lagging behind there is no reason to keep the file
		// we just filled in the cache. Drop it in the background so that the
		// writer is not held up.
		if f.readIndex < f.writeIndex-1 {
			go func(filled int) {
				f.cache.L.Lock()
				defer f.cache.L.Unlock()

				f.cache.Remove(filled)
			}(f.writeIndex)
		}

		// Step onto the start of the next file.
		f.writeIndex++
		f.writtenTo = 0

		data = f.cache.Get(f.writeIndex)
		offset = 0

		f.cache.Broadcast()
	}

	writeMessageSize(data, offset, len(payload))
	copy(data[offset+4:], payload)

	// Move writtenTo ahead of the message just written.
	f.writtenTo = offset + needed

	return f.backlog(), nil
}
//------------------------------------------------------------------------------