forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 1
/
memory.go
250 lines (207 loc) · 6.71 KB
/
memory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
// Copyright (c) 2014 Ashley Jeffs
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package impl
import (
"sync"
"github.com/Jeffail/benthos/lib/types"
)
//------------------------------------------------------------------------------
// MemoryConfig is config values for a purely memory based ring buffer type.
type MemoryConfig struct {
// Limit is the size in bytes of the block allocated for the buffer. Each
// stored message also consumes a four byte size header from this budget.
Limit int `json:"limit" yaml:"limit"`
}
// NewMemoryConfig returns a MemoryConfig populated with the default buffer
// limit of 500MB.
func NewMemoryConfig() MemoryConfig {
	const defaultLimit = 500 * 1024 * 1024 // 500MB

	var conf MemoryConfig
	conf.Limit = defaultLimit
	return conf
}
// Memory is a purely memory based ring buffer. This buffer blocks when the
// buffer is full.
//
// Messages are stored back to back in block, each one prefixed by a four byte
// size header. All fields are read and written while holding cond.L.
type Memory struct {
// config is the configuration the buffer was created with.
config MemoryConfig
// block is the backing storage, allocated with config.Limit bytes.
block []byte
// readFrom is the index in block of the next message to be read.
readFrom int
// writtenTo is the index in block where the next message will be written.
writtenTo int
// closed is set by Close; once true, blocked calls return instead of waiting.
closed bool
// cond is broadcast whenever the indexes or the closed flag change, and its
// locker guards the fields above.
cond *sync.Cond
}
// NewMemory allocates a memory based ring buffer of config.Limit bytes. The
// read and write indexes start at zero and the buffer starts open.
func NewMemory(config MemoryConfig) *Memory {
	m := &Memory{
		config: config,
		block:  make([]byte, config.Limit),
	}
	// The condition variable's mutex doubles as the lock guarding all state.
	m.cond = sync.NewCond(&sync.Mutex{})
	return m
}
//------------------------------------------------------------------------------
// backlog returns the number of bytes between the read index and the write
// index, accounting for the write index having wrapped around the end of the
// block. The caller is expected to hold the lock.
func (m *Memory) backlog() int {
	pending := m.writtenTo - m.readFrom
	if pending < 0 {
		// The writer has looped back to the start while the reader has not.
		pending += m.config.Limit
	}
	return pending
}
// readMessageSize decodes the four byte, most significant byte first, size
// header of a serialised message starting at index. It returns 0 when fewer
// than four bytes remain in block from index onwards.
func readMessageSize(block []byte, index int) int {
	end := index + 4
	if end > len(block) {
		return 0
	}

	size := 0
	for _, b := range block[index:end] {
		size = size<<8 | int(b)
	}
	return size
}
// writeMessageSize encodes size as a four byte header, most significant byte
// first, into block starting at index. The caller must ensure four bytes are
// available at index.
func writeMessageSize(block []byte, index int, size int) {
	for offset := 0; offset < 4; offset++ {
		shift := uint(8 * (3 - offset))
		block[index+offset] = byte(size >> shift)
	}
}
//------------------------------------------------------------------------------
// CloseOnceEmpty blocks until the backlog reaches 0 and then closes the memory
// buffer. If the buffer is closed by another caller while waiting, it stops
// waiting and returns, since a closed buffer hands out no further messages via
// NextMessage and the backlog may therefore never drain.
func (m *Memory) CloseOnceEmpty() {
	defer func() {
		// Close takes the lock itself, so release ours first.
		m.cond.L.Unlock()
		m.Close()
	}()
	m.cond.L.Lock()

	// Until the backlog is cleared (or the buffer is closed underneath us).
	for m.backlog() > 0 && !m.closed {
		// Wait for a broadcast from our reader.
		m.cond.Wait()
	}
}
// Close marks the buffer as closed and wakes every goroutine waiting on it so
// that blocked calls can observe the flag and return.
func (m *Memory) Close() {
	m.cond.L.Lock()
	defer m.cond.L.Unlock()

	m.closed = true
	m.cond.Broadcast()
}
// ShiftMessage discards the message at the current read position by moving
// the read index to the start of the following message, then wakes any
// waiters. It returns the backlog remaining after the shift; the error is
// always nil.
func (m *Memory) ShiftMessage() (int, error) {
	m.cond.L.Lock()
	// Deferred calls run last-in first-out: broadcast, then unlock.
	defer m.cond.L.Unlock()
	defer m.cond.Broadcast()

	start := m.readFrom
	size := readMessageSize(m.block, start)

	// A zero size header is the writer's marker that it ran out of room and
	// continued from index 0, so the message to drop lives there instead.
	if size <= 0 {
		start = 0
		size = readMessageSize(m.block, start)
	}

	// Skip the four byte header plus the payload.
	m.readFrom = start + 4 + size

	return m.backlog(), nil
}
// NextMessage returns the message at the current read position without
// advancing it, blocking until there is something to read. It returns
// types.ErrTypeClosed once the buffer has been closed, and
// types.ErrBlockCorrupted if the stored size header would run past the end of
// the block.
func (m *Memory) NextMessage() (types.Message, error) {
	m.cond.L.Lock()
	defer m.cond.L.Unlock()

	// awaitData sleeps until the write index differs from at or the buffer is
	// closed, and reports whether the buffer is still open.
	awaitData := func(at int) bool {
		for at == m.writtenTo && !m.closed {
			m.cond.Wait()
		}
		return !m.closed
	}

	pos := m.readFrom
	if !awaitData(pos) {
		return types.Message{}, types.ErrTypeClosed
	}

	size := readMessageSize(m.block, pos)

	// A zero size header is the writer's marker that it ran out of room at the
	// end of the block and continued from index 0, so follow it there.
	if size <= 0 {
		pos = 0
		if !awaitData(pos) {
			return types.Message{}, types.ErrTypeClosed
		}
		size = readMessageSize(m.block, pos)
	}

	// The payload sits directly after the four byte header.
	start := pos + 4
	end := start + size
	if end > m.config.Limit {
		return types.Message{}, types.ErrBlockCorrupted
	}

	return types.FromBytes(m.block[start:end])
}
// PushMessage serialises msg and appends it to the block behind a four byte
// size header, blocking while the reader still occupies the region that would
// be overwritten. It returns the backlog after the write.
//
// It returns types.ErrMessageTooLarge when the message plus header cannot fit
// in the block at all, and types.ErrTypeClosed when the buffer is closed
// before the message could be written.
//
// NOTE(review): when the write wraps to index 0 the writer waits for the
// reader to move beyond the new message's span. If the reader is parked at the
// old write index waiting for data, that wait looks like it can never be
// satisfied for messages larger than roughly half of Limit - confirm.
func (m *Memory) PushMessage(msg types.Message) (int, error) {
	m.cond.L.Lock()
	defer func() {
		m.cond.Broadcast()
		m.cond.L.Unlock()
	}()

	block := msg.Bytes()
	index := m.writtenTo

	// span is the total footprint of the message: header plus payload.
	span := len(block) + 4

	if span > m.config.Limit {
		return 0, types.ErrMessageTooLarge
	}

	// Block while the reader is catching up. The closed check matters here:
	// Close broadcasts once, and without it a writer blocked on a stalled
	// reader would go straight back to sleep and never return.
	for m.readFrom > index && m.readFrom <= index+span && !m.closed {
		m.cond.Wait()
	}
	if m.closed {
		return 0, types.ErrTypeClosed
	}

	// If we can't fit our next message in the remainder of the buffer we will
	// loop back to index 0. In order to prevent the reader from reading garbage
	// we set the next message size to 0, which tells the reader to loop back to
	// index 0.
	if span+index > m.config.Limit {
		// If the reader is currently at 0 then we avoid looping over it.
		for m.readFrom <= span && !m.closed {
			m.cond.Wait()
		}
		if m.closed {
			return 0, types.ErrTypeClosed
		}
		// Fewer than four bytes may remain, so stop at the end of the block.
		for i := index; i < m.config.Limit && i < index+4; i++ {
			m.block[i] = byte(0)
		}
		index = 0
	}

	// Block again if the reader is catching up.
	for m.readFrom > index && m.readFrom <= index+span && !m.closed {
		m.cond.Wait()
	}
	if m.closed {
		return 0, types.ErrTypeClosed
	}

	writeMessageSize(m.block, index, len(block))
	copy(m.block[index+4:], block)

	// Move writtenTo index ahead. If writtenTo becomes m.config.Limit we want
	// it to wrap back to 0
	m.writtenTo = (index + span) % m.config.Limit

	return m.backlog(), nil
}
//------------------------------------------------------------------------------