forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 1
/
mmap_cache.go
286 lines (240 loc) · 7.93 KB
/
mmap_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
// Copyright (c) 2014 Ashley Jeffs
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package single
import (
"errors"
"fmt"
"os"
"path"
"sync"
"github.com/Jeffail/benthos/lib/util/disk"
"github.com/Jeffail/benthos/lib/util/service/log"
"github.com/Jeffail/benthos/lib/util/service/metrics"
mmap "github.com/edsrzf/mmap-go"
)
//------------------------------------------------------------------------------
// MmapCacheConfig holds configuration options for the MmapCache type.
type MmapCacheConfig struct {
	// Path is the directory holding the "tracker" and "mmap_<index>" files.
	Path string `json:"directory" yaml:"directory"`
	// FileSize is the size in bytes that newly created index files are
	// zero-filled to.
	FileSize int `json:"file_size" yaml:"file_size"`
	// RetryPeriodMS is not read by MmapCache itself; presumably a retry
	// interval in milliseconds used by the owning buffer (TODO confirm).
	RetryPeriodMS int `json:"retry_period_ms" yaml:"retry_period_ms"`
	// CleanUp is not read by MmapCache itself; presumably controls whether
	// consumed index files are deleted (TODO confirm with callers).
	CleanUp bool `json:"clean_up" yaml:"clean_up"`
	// ReservedDiskSpace is the number of bytes that must remain free on the
	// target disk in addition to FileSize before a new index file is created.
	ReservedDiskSpace uint64 `json:"reserved_disk_space" yaml:"reserved_disk_space"`
}

// NewMmapCacheConfig creates a new MmapCacheConfig object with default values:
// no directory, 250MiB index files, a 1 second retry period, clean up enabled
// and 100MiB of reserved disk space.
func NewMmapCacheConfig() MmapCacheConfig {
	return MmapCacheConfig{
		Path:              "",
		FileSize:          250 * 1024 * 1024, // 250MiB
		RetryPeriodMS:     1000,              // 1 second
		CleanUp:           true,
		ReservedDiskSpace: 100 * 1024 * 1024, // 100MiB
	}
}
// CachedMmap is a struct containing a cached Mmap file and the file handler.
//
// f is the open file backing the mapping and m is its read/write memory
// mapping. MmapCache.Remove and MmapCache.RemoveAll release both: m is flushed
// and unmapped, then f is closed.
type CachedMmap struct {
f *os.File
m mmap.MMap
}
// MmapCache keeps track of any Mmap files cached in memory and cleans up
// resources as they are unclaimed. This type works similarly to sync.Cond,
// where if you wish to use it you need to lock it.
//
// The embedded *sync.Cond supplies the lock (L) guarding the maps below.
// EnsureCached, Remove and Delete call L.Unlock and later L.Lock themselves,
// so callers must hold L when invoking them.
type MmapCache struct {
config MmapCacheConfig
logger log.Modular
stats metrics.Type
// tracker is the 16-byte memory mapped "tracker" file in config.Path,
// exposed via GetTracker.
tracker CachedMmap
// cache maps an index to its open, memory mapped "mmap_<index>" file.
cache map[int]CachedMmap
// inProgress holds indexes currently being opened by EnsureCached; other
// callers for the same index Wait on the Cond until the entry is removed.
inProgress map[int]struct{}
*sync.Cond
}
// NewMmapCache creates a cache for managing open mmap files rooted at
// config.Path. The tracker file is opened (and created if missing) eagerly, so
// a failure there is reported here and no cache is returned.
func NewMmapCache(config MmapCacheConfig, log log.Modular, stats metrics.Type) (*MmapCache, error) {
	c := &MmapCache{
		config:     config,
		logger:     log,
		stats:      stats,
		cache:      map[int]CachedMmap{},
		inProgress: map[int]struct{}{},
	}
	c.Cond = sync.NewCond(new(sync.Mutex))

	err := c.openTracker()
	if err != nil {
		return nil, err
	}
	return c, nil
}
//------------------------------------------------------------------------------
// ErrWrongTrackerLength means the length of a read tracker was not correct.
var ErrWrongTrackerLength = errors.New("tracker was unexpected length")

// ErrNotEnoughSpace means the target disk lacked the space needed for a new
// file.
var ErrNotEnoughSpace = errors.New("target disk is at capacity")
// openTracker opens the tracker file "<config.Path>/tracker", used for
// recording reader and writer indexes, and memory maps its 16 bytes into
// f.tracker.
//
// A missing tracker file is created zero-filled. An existing file of any
// length other than 16 bytes yields ErrWrongTrackerLength. On failure no file
// handle is left open and f.tracker is not modified. Waiters on the embedded
// sync.Cond are broadcast to when this returns, regardless of outcome.
func (f *MmapCache) openTracker() error {
	const trackerLen = 16

	defer f.Broadcast()

	// Attempt to create the directory tree, ignore errors: any real problem
	// surfaces below when the tracker file itself is created or opened.
	_ = os.MkdirAll(f.config.Path, 0755)

	fPath := path.Join(f.config.Path, "tracker")

	var file *os.File
	fileInfo, err := os.Stat(fPath)
	switch {
	case os.IsNotExist(err):
		// If the tracker file doesn't exist we make a blank one.
		if file, err = os.Create(fPath); err == nil {
			if _, err = file.Write(make([]byte, trackerLen)); err != nil {
				// Best-effort cleanup: don't leak the handle, and don't leave a
				// short file behind that would fail the length check forever.
				file.Close()
				os.Remove(fPath)
			}
		}
	case err == nil && fileInfo.Size() == trackerLen:
		file, err = os.OpenFile(fPath, os.O_RDWR, 0644)
	case err == nil:
		err = ErrWrongTrackerLength
	}
	if err != nil {
		return err
	}

	// Create the memory mapping.
	m, err := mmap.MapRegion(file, trackerLen, mmap.RDWR, 0, 0)
	if err != nil {
		file.Close()
		return err
	}
	f.tracker = CachedMmap{f: file, m: m}
	return nil
}
//------------------------------------------------------------------------------
// GetTracker exposes the tracker file's memory mapping as a byte slice.
// Writes to the returned slice go directly into the shared mapping. The slice
// is nil once RemoveAll has released the tracker.
func (f *MmapCache) GetTracker() []byte {
	return []byte(f.tracker.m)
}
// Get returns the memory mapping of a cached file index as a byte slice. When
// the index is not cached, an empty (non-nil) slice is returned instead.
func (f *MmapCache) Get(index int) []byte {
	entry, found := f.cache[index]
	if !found {
		return []byte{}
	}
	return entry.m
}
// EnsureCached checks that a particular index is cached, and if not then reads
// the index. This call blocks until either the index is successfully cached or
// an error occurs.
//
// The caller must hold f.L. The lock is released while the backing
// "mmap_<index>" file is opened or created, and it is held again when this
// method returns. Concurrent callers for the same index Wait for the in-flight
// attempt instead of opening the file twice.
//
// A missing index file is created zero-filled to config.FileSize bytes, but
// only if config.ReservedDiskSpace bytes would still remain free. Otherwise
// ErrNotEnoughSpace is returned.
func (f *MmapCache) EnsureCached(index int) error {
	// If we are already in the process of caching this index wait until that
	// attempt is finished. Wait releases f.L while blocked.
	for {
		if _, inProgress := f.inProgress[index]; !inProgress {
			break
		}
		f.Wait()
	}

	// The index may have been cached while we waited.
	if f.IsCached(index) {
		return nil
	}

	// Place the index in our inProgress map to indicate we are caching it on
	// this goroutine.
	f.inProgress[index] = struct{}{}

	// Prefix index files with "mmap_".
	fPath := path.Join(f.config.Path, fmt.Sprintf("mmap_%v", index))

	// Unlock our mutex as we are about to perform blocking, thread safe
	// operations.
	f.L.Unlock()
	file, err := f.openIndexFile(fPath)
	f.L.Lock()

	// Whatever the outcome, release our claim and wake any waiters.
	defer func() {
		delete(f.inProgress, index)
		f.Broadcast()
	}()
	if err != nil {
		return err
	}

	// Create the memory mapping.
	m, err := mmap.Map(file, mmap.RDWR, 0)
	if err != nil {
		file.Close()
		// NOTE(review): this also deletes a file that already existed before
		// this call (not only one we just created), discarding its contents.
		// Preserved from the original behaviour; confirm it is intended.
		os.Remove(fPath)
		return err
	}
	f.cache[index] = CachedMmap{f: file, m: m}
	return nil
}

// openIndexFile opens the index file at fPath for reading and writing. If the
// file does not exist, it is created and zero-filled to f.config.FileSize
// bytes. It performs blocking disk I/O and is called without holding f.L. On
// error the returned file is nil and no handle is left open.
func (f *MmapCache) openIndexFile(fPath string) (*os.File, error) {
	_, err := os.Stat(fPath)
	if err == nil {
		return os.OpenFile(fPath, os.O_RDWR, 0644)
	}
	if !os.IsNotExist(err) {
		return nil, err
	}

	// If we lack the space needed (reserved space + file size) then refuse to
	// create the file.
	if uint64(f.config.FileSize)+f.config.ReservedDiskSpace >
		disk.TotalRemaining(f.config.Path) {
		return nil, ErrNotEnoughSpace
	}

	file, err := os.Create(fPath)
	if err != nil {
		return nil, err
	}
	if _, err = file.Write(make([]byte, f.config.FileSize)); err != nil {
		// Close the handle (it used to leak here) and drop the partial file.
		file.Close()
		os.Remove(fPath)
		return nil, err
	}
	return file, nil
}
// IsCached reports whether the memory mapped file for the given index is
// currently held in the cache. It reads the cache map directly and takes no
// lock itself.
func (f *MmapCache) IsCached(index int) bool {
	if _, found := f.cache[index]; found {
		return true
	}
	return false
}
// RemoveAll removes all indexes from the cache as well as the tracker. Each
// mapping is flushed and unmapped and its file closed; the files themselves
// are NOT deleted from disk.
//
// Every resource is released even if an earlier step fails; the first error
// encountered is returned. Handles that are already released (e.g. on a
// repeated call) are skipped rather than reported as errors.
func (f *MmapCache) RemoveAll() error {
	var firstErr error
	record := func(err error) {
		if err != nil && firstErr == nil {
			firstErr = err
		}
	}
	release := func(c CachedMmap) {
		if c.m != nil {
			record(c.m.Flush())
			record(c.m.Unmap())
		}
		if c.f != nil {
			record(c.f.Close())
		}
	}

	for _, c := range f.cache {
		release(c)
	}
	f.cache = map[int]CachedMmap{}

	release(f.tracker)
	f.tracker = CachedMmap{}

	return firstErr
}
// Remove removes the index from our cache, the file is NOT deleted. The
// mapping is flushed and unmapped and the file handle closed. Removing an
// index that is not cached is a no-op returning nil.
//
// The caller must hold f.L. It is released while the (potentially blocking)
// flush runs and is held again on return. All three release steps are always
// attempted; the first error among them is returned.
func (f *MmapCache) Remove(index int) error {
	c, ok := f.cache[index]
	if !ok {
		return nil
	}
	delete(f.cache, index)

	// Now we are flushing the cache, this could block so we unlock
	// temporarily.
	f.L.Unlock()
	defer f.L.Lock()

	// TODO: What happens if we subsequently opened the same map file during
	// this operation?
	flushErr := c.m.Flush()
	unmapErr := c.m.Unmap()
	closeErr := c.f.Close()

	if flushErr != nil {
		return flushErr
	}
	if unmapErr != nil {
		return unmapErr
	}
	return closeErr
}
// Delete removes the backing "mmap_<index>" file for an index from disk. It
// does not touch any cached mapping of that index.
//
// The caller must hold f.L. The lock is released for the duration of the file
// removal, which may block, and is re-acquired before returning.
func (f *MmapCache) Delete(index int) error {
	target := path.Join(f.config.Path, fmt.Sprintf("mmap_%v", index))

	f.L.Unlock()
	err := os.Remove(target)
	f.L.Lock()

	return err
}
//------------------------------------------------------------------------------