-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
open.go
481 lines (442 loc) · 16.5 KB
/
open.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package storage
import (
"cmp"
"context"
"slices"
"time"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/fs"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/pebble/wal"
)
// A ConfigOption may be passed to Open to configure the storage engine.
// Each option is a function that receives the engineConfig being assembled
// and may mutate it. Open applies options in the order given and aborts on
// the first one that returns a non-nil error.
type ConfigOption func(cfg *engineConfig) error
// CombineOptions bundles several options into a single ConfigOption. The
// returned option applies opts in the order given and stops at, and returns,
// the first error encountered; it returns nil if every option succeeds.
func CombineOptions(opts ...ConfigOption) ConfigOption {
	combined := func(cfg *engineConfig) error {
		for i := range opts {
			err := opts[i](cfg)
			if err != nil {
				return err
			}
		}
		return nil
	}
	return combined
}
// MustExist sets the MustExist flag on the engine configuration, so that Open
// errors if the target directory does not contain an initialized store.
var MustExist = ConfigOption(func(cfg *engineConfig) error {
	cfg.MustExist = true
	return nil
})
// DisableAutomaticCompactions opens the engine with Pebble's automatic
// compactions switched off. Used primarily for debugCompactCmd.
var DisableAutomaticCompactions = ConfigOption(func(cfg *engineConfig) error {
	cfg.Opts.DisableAutomaticCompactions = true
	return nil
})
// ForceWriterParallelism configures an engine to be opened with Pebble's
// experimental ForceWriterParallelism option enabled.
//
// NOTE(review): presumably this forces parallelism in the sstable Writer
// regardless of Pebble's own heuristics -- confirm against the Pebble Options
// documentation.
var ForceWriterParallelism ConfigOption = func(cfg *engineConfig) error {
cfg.Opts.Experimental.ForceWriterParallelism = true
return nil
}
// ForTesting configures the engine for use in testing. It registers a
// close-time hook that panics with an assertion failure if Pebble's metrics
// report any missized tombstones. No config options are randomized by the
// code in this block.
var ForTesting = ConfigOption(func(cfg *engineConfig) error {
	assertNoMissizedTombstones := func(p *Pebble) {
		missized := p.db.Metrics().Keys.MissizedTombstonesCount
		if missized > 0 {
			// A missized tombstone is a Pebble DELSIZED tombstone whose
			// encoded size does not match the value it deletes. Such
			// tombstones are written when ClearOptions.ValueSizeKnown=true,
			// so a non-zero count points at the code clearing the key: it
			// may be passing the wrong value length as ValueSize.
			panic(errors.AssertionFailedf("expected to find 0 missized tombstones; found %d", missized))
		}
	}
	cfg.onClose = append(cfg.onClose, assertNoMissizedTombstones)
	return nil
})
// Attributes returns an option that sets the engine's attributes to attrs.
func Attributes(attrs roachpb.Attributes) ConfigOption {
	setAttrs := func(cfg *engineConfig) error {
		cfg.Attrs = attrs
		return nil
	}
	return setAttrs
}
// MaxSize returns an option that records size as the intended maximum store
// size. The value is used for calculating free space and for making
// rebalancing decisions.
func MaxSize(size int64) ConfigOption {
	setMaxSize := func(cfg *engineConfig) error {
		cfg.MaxSize = size
		return nil
	}
	return setMaxSize
}
// BlockSize returns an option that sets both the data block size and the
// index block size of every configured LSM level to size. Primarily intended
// for testing.
func BlockSize(size int) ConfigOption {
	return func(cfg *engineConfig) error {
		for i := range cfg.Opts.Levels {
			// Take the address so the level is updated in place.
			level := &cfg.Opts.Levels[i]
			level.BlockSize = size
			level.IndexBlockSize = size
		}
		return nil
	}
}
// TargetFileSize returns an option that applies the same target file size to
// every configured level of the LSM. Primarily intended for testing.
func TargetFileSize(size int64) ConfigOption {
	setTargetFileSize := func(cfg *engineConfig) error {
		for i := 0; i < len(cfg.Opts.Levels); i++ {
			cfg.Opts.Levels[i].TargetFileSize = size
		}
		return nil
	}
	return setTargetFileSize
}
// MaxWriterConcurrency returns an option that sets the concurrency of the
// sstable Writers. A value of 0 means the Writer does not parallelize; a value
// of 1 or more enables parallelism. At present all values >= 1 behave the
// same.
func MaxWriterConcurrency(concurrency int) ConfigOption {
	setConcurrency := func(cfg *engineConfig) error {
		cfg.Opts.Experimental.MaxWriterConcurrency = concurrency
		return nil
	}
	return setConcurrency
}
// MaxOpenFiles returns an option that caps the number of files the engine
// should keep open at count.
func MaxOpenFiles(count int) ConfigOption {
	setMaxOpenFiles := func(cfg *engineConfig) error {
		cfg.Opts.MaxOpenFiles = count
		return nil
	}
	return setMaxOpenFiles
}
// CacheSize returns an option that requests a block cache of the given size.
// Only the size is recorded here; the option does not construct the cache
// itself.
func CacheSize(size int64) ConfigOption {
	recordCacheSize := func(cfg *engineConfig) error {
		cfg.cacheSize = &size
		return nil
	}
	return recordCacheSize
}
// Caches returns an option that installs the given block cache and table
// cache on the engine. Useful when several stores should share the same
// caches.
func Caches(cache *pebble.Cache, tableCache *pebble.TableCache) ConfigOption {
	setCaches := func(cfg *engineConfig) error {
		cfg.Opts.Cache, cfg.Opts.TableCache = cache, tableCache
		return nil
	}
	return setCaches
}
// BallastSize returns an option that sets how many bytes are reserved by a
// ballast file for manual out-of-disk recovery.
func BallastSize(size int64) ConfigOption {
	setBallastSize := func(cfg *engineConfig) error {
		cfg.BallastSize = size
		return nil
	}
	return setBallastSize
}
// SharedStorage enables use of shared storage (experimental). When a non-nil
// storage is configured, the engine's format major version is raised to at
// least pebble.FormatMinForSharedObjects; a version that is already high
// enough is left as is.
func SharedStorage(sharedStorage cloud.ExternalStorage) ConfigOption {
	return func(cfg *engineConfig) error {
		cfg.SharedStorage = sharedStorage
		if cfg.SharedStorage == nil {
			return nil
		}
		if cfg.Opts.FormatMajorVersion < pebble.FormatMinForSharedObjects {
			cfg.Opts.FormatMajorVersion = pebble.FormatMinForSharedObjects
		}
		return nil
	}
}
// SecondaryCache returns an option that sets the size, in bytes, of the
// secondary cache used to store shared objects.
func SecondaryCache(size int64) ConfigOption {
	setSecondaryCache := func(cfg *engineConfig) error {
		cfg.Opts.Experimental.SecondaryCacheSizeBytes = size
		return nil
	}
	return setSecondaryCache
}
// RemoteStorageFactory enables use of remote storage (experimental) by
// recording accessor as the engine's remote storage factory.
func RemoteStorageFactory(accessor *cloud.EarlyBootExternalStorageAccessor) ConfigOption {
	setFactory := func(cfg *engineConfig) error {
		cfg.RemoteStorageFactory = accessor
		return nil
	}
	return setFactory
}
// MaxConcurrentCompactions returns an option that fixes the maximum number of
// concurrent compactions an Engine will execute at n. Pebble takes the limit
// as a callback, so n is wrapped in a function that always returns it.
func MaxConcurrentCompactions(n int) ConfigOption {
	limit := func() int { return n }
	return func(cfg *engineConfig) error {
		cfg.Opts.MaxConcurrentCompactions = limit
		return nil
	}
}
// LBaseMaxBytes returns an option that sets the maximum number of bytes for
// LBase to v.
func LBaseMaxBytes(v int64) ConfigOption {
	setLBaseMaxBytes := func(cfg *engineConfig) error {
		cfg.Opts.LBaseMaxBytes = v
		return nil
	}
	return setLBaseMaxBytes
}
// noopConfigOption is an option that leaves the configuration untouched and
// always succeeds.
func noopConfigOption(_ *engineConfig) error {
	return nil
}
// errConfigOption returns an option that ignores the configuration and always
// fails with err. It lets a constructor defer reporting an invalid
// combination of settings until the option is applied.
func errConfigOption(err error) func(*engineConfig) error {
	failing := func(_ *engineConfig) error {
		return err
	}
	return failing
}
// makeExternalWALDir initializes a filesystem Env rooted at externalDir.Path
// and returns it as a wal.Dir. The new Env uses the same read/write mode as
// the store's Env and the encryption options supplied with externalDir. A
// hook is appended to engineCfg.onClose that closes the new Env.
//
// It returns an error if the store is encrypted but externalDir carries no
// encryption options, or if initializing the Env fails.
func makeExternalWALDir(engineCfg *engineConfig, externalDir base.ExternalPath) (wal.Dir, error) {
	// An encrypted store requires every WAL failover dir to be encrypted as
	// well, so that the user cannot accidentally leak data unencrypted onto
	// the filesystem.
	storeEncrypted := engineCfg.Env.Encryption != nil
	dirHasEncryption := len(externalDir.EncryptionOptions) != 0
	if storeEncrypted && !dirHasEncryption {
		return wal.Dir{}, errors.Newf("must provide --enterprise-encryption flag for %q, used as WAL failover path for encrypted store %q",
			externalDir.Path, engineCfg.Env.Dir)
	}

	envCfg := fs.EnvConfig{
		RW:                engineCfg.Env.RWMode(),
		EncryptionOptions: externalDir.EncryptionOptions,
	}
	walEnv, err := fs.InitEnv(context.Background(), vfs.Default, externalDir.Path, envCfg)
	if err != nil {
		return wal.Dir{}, err
	}
	engineCfg.onClose = append(engineCfg.onClose, func(*Pebble) { walEnv.Close() })

	dir := wal.Dir{
		FS:      walEnv,
		Dirname: externalDir.Path,
	}
	return dir, nil
}
// WALFailover configures automatic failover of the engine's write-ahead log to
// another volume in the event the WAL becomes blocked on a write that does not
// complete within a reasonable duration.
//
// walCfg carries the requested failover mode plus any explicit current and
// previous failover paths. storeEnvs holds the filesystem Envs of the stores
// being opened: exactly one element selects the single-store rules, any other
// length is treated as a multi-store configuration. In the multi-store case
// the Env of the engine being opened must be one of storeEnvs' elements
// (matched by pointer); otherwise applying the option panics.
//
// Unsupported mode/store-count combinations are reported by returning an
// option that fails when applied, so the error surfaces from Open. An
// unrecognized mode panics immediately.
func WALFailover(walCfg base.WALFailoverConfig, storeEnvs fs.Envs) ConfigOption {
// The set of options available in single-store versus multi-store
// configurations vary. This is in part due to the need to store the multiple
// stores' WALs separately. When WALFailoverExplicitPath is provided, we have
// no stable store identifier available to disambiguate the WALs of multiple
// stores. Note that the store ID is not known when a store is first opened.
if len(storeEnvs) == 1 {
switch walCfg.Mode {
case base.WALFailoverDefault, base.WALFailoverAmongStores:
// With a single store there is no other store to fail over to, so both
// modes leave the configuration untouched.
return noopConfigOption
case base.WALFailoverDisabled:
// Check if the user provided an explicit previous path. If they did, they
// were previously using WALFailoverExplicitPath and are now disabling it.
// We need to add the explicit path to WALRecoveryDirs.
if walCfg.PrevPath.IsSet() {
return func(cfg *engineConfig) error {
walDir, err := makeExternalWALDir(cfg, walCfg.PrevPath)
if err != nil {
return err
}
cfg.Opts.WALRecoveryDirs = append(cfg.Opts.WALRecoveryDirs, walDir)
return nil
}
}
// No PrevPath was provided. The user may be simply expressing their
// intent to not run with WAL failover, regardless of any future default
// values. If WAL failover was previously enabled, Open will error when it
// notices the OPTIONS file encodes a WAL failover secondary that was not
// provided to Options.WALRecoveryDirs.
return noopConfigOption
case base.WALFailoverExplicitPath:
// The user has provided an explicit path to which we should fail over WALs.
return func(cfg *engineConfig) error {
walDir, err := makeExternalWALDir(cfg, walCfg.Path)
if err != nil {
return err
}
cfg.Opts.WALFailover = makePebbleWALFailoverOptsForDir(cfg.Settings, walDir)
// A previous explicit path, if any, is registered as a recovery dir in
// addition to the new secondary.
if walCfg.PrevPath.IsSet() {
walDir, err := makeExternalWALDir(cfg, walCfg.PrevPath)
if err != nil {
return err
}
cfg.Opts.WALRecoveryDirs = append(cfg.Opts.WALRecoveryDirs, walDir)
}
return nil
}
default:
panic("unreachable")
}
}
// Multi-store configuration from here on.
switch walCfg.Mode {
case base.WALFailoverDefault:
// If the user specified no WAL failover setting, we default to disabling WAL
// failover and assume that the previous process did not have WAL failover
// enabled (so there's no need to populate Options.WALRecoveryDirs). If an
// operator had WAL failover enabled and now wants to disable it, they must
// explicitly set --wal-failover=disabled for the next process.
return noopConfigOption
case base.WALFailoverDisabled:
// Check if the user provided an explicit previous path; that's unsupported
// in multi-store configurations.
if walCfg.PrevPath.IsSet() {
return errConfigOption(errors.Newf("storage: cannot use explicit prev_path --wal-failover option with multiple stores"))
}
// No PrevPath was provided, implying that the user previously was using
// WALFailoverAmongStores.
// Leave the switch and continue with the store-pairing logic below. (Go
// cases do not fall through implicitly, so the next case is NOT executed.)
case base.WALFailoverExplicitPath:
// Not supported for multi-store configurations.
return errConfigOption(errors.Newf("storage: cannot use explicit path --wal-failover option with multiple stores"))
case base.WALFailoverAmongStores:
// Leave the switch and continue with the store-pairing logic below; the
// default case is NOT executed.
default:
panic("unreachable")
}
// Either
// 1. mode == WALFailoverAmongStores
// or
// 2. mode == WALFailoverDisabled and the user previously was using
// WALFailoverAmongStores, so we should build the deterministic store pairing
// to determine which WALRecoveryDirs to pass to which engines.
//
// For each store, we need to determine which store is its secondary for the
// purpose of WALs. Even if failover is disabled, it's possible that it wasn't
// when the previous process ran, and the secondary's wal dir may have WALs
// that need to be replayed.
//
// To assign secondaries, we sort by path and dictate that the next store in
// the slice is the secondary. Note that in-memory stores may not have unique
// paths, in which case we fall back to using the ordering of the store flags
// (which falls out of the use of a stable sort).
//
// TODO(jackson): Using the path is a simple way to assign secondaries, but
// it's not resilient to changing between absolute and relative paths,
// introducing symlinks, etc. Since we have the fs.Envs already available, we
// could peek into the data directories, find the most recent OPTIONS file and
// parse out the previous secondary if any. If we had device nos and inodes
// available, we could deterministically sort by those instead.
//
// Sort a copy so the caller's storeEnvs slice is left in its original order.
sortedEnvs := slices.Clone(storeEnvs)
slices.SortStableFunc(sortedEnvs, func(a, b *fs.Env) int {
return cmp.Compare(a.Dir, b.Dir)
})
// indexOfEnv returns the position of e within sortedEnvs, comparing Envs by
// pointer identity. The boolean is false if e is not present.
indexOfEnv := func(e *fs.Env) (int, bool) {
for i := range sortedEnvs {
if sortedEnvs[i] == e {
return i, true
}
}
return 0, false
}
return func(cfg *engineConfig) error {
// Find the Env being opened in the slice of sorted envs.
idx, ok := indexOfEnv(cfg.Env)
if !ok {
panic(errors.AssertionFailedf("storage: opening a store with an unrecognized filesystem Env (dir=%s)", cfg.Env.Dir))
}
// The secondary is the next store in sorted order, wrapping around so the
// last store pairs with the first.
failoverIdx := (idx + 1) % len(sortedEnvs)
secondaryEnv := sortedEnvs[failoverIdx]
// Ref once to ensure the secondary Env isn't closed before this Engine has
// been closed if the secondary's corresponding Engine is closed first.
secondaryEnv.Ref()
cfg.onClose = append(cfg.onClose, func(p *Pebble) {
// Release the reference.
secondaryEnv.Close()
})
secondary := wal.Dir{
FS: secondaryEnv,
// Use auxiliary/wals-among-stores within the other store's directory.
Dirname: secondaryEnv.PathJoin(secondaryEnv.Dir, base.AuxiliaryDir, "wals-among-stores"),
}
if walCfg.Mode == base.WALFailoverAmongStores {
cfg.Opts.WALFailover = makePebbleWALFailoverOptsForDir(cfg.Settings, secondary)
return nil
}
// mode == WALFailoverDisabled: failover stays off, but the secondary's WAL
// directory is still registered as a recovery dir.
cfg.Opts.WALRecoveryDirs = append(cfg.Opts.WALRecoveryDirs, secondary)
return nil
}
}
// makePebbleWALFailoverOptsForDir builds the Pebble WAL failover options that
// use dir as the secondary WAL location. Most failover options are left at
// their defaults; only UnhealthyOperationLatencyThreshold is supplied, as a
// callback that reads the current cluster setting each time it is invoked.
func makePebbleWALFailoverOptsForDir(
	settings *cluster.Settings, dir wal.Dir,
) *pebble.WALFailoverOptions {
	// The second return value indicates whether the WAL manager is allowed to
	// fail over to the secondary.
	unhealthyThreshold := func() (time.Duration, bool) {
		// WAL failover requires 24.1 to be finalized first. Otherwise, we
		// might write WALs to a secondary, downgrade to a previous version's
		// binary and blindly miss WALs.
		//
		// NB: We do not use settings.Version.IsActive because we do not have
		// a guarantee that the cluster version has been initialized.
		activeVersion := settings.Version.ActiveVersionOrEmpty(context.TODO())
		failoverOK := activeVersion.IsActive(clusterversion.V24_1Start)
		threshold := walFailoverUnhealthyOpThreshold.Get(&settings.SV)
		return threshold, failoverOK
	}
	return &pebble.WALFailoverOptions{
		Secondary: dir,
		FailoverOptions: wal.FailoverOptions{
			UnhealthyOperationLatencyThreshold: unhealthyThreshold,
		},
	}
}
// PebbleOptions returns an option that parses pebbleOptions, a string in the
// same format as a Pebble OPTIONS file, into the engine's Pebble options,
// passing parseHooks through to the parser. Any parse error is returned when
// the option is applied. Example input:
//
//	[Options]
//	delete_range_flush_delay=2s
//	flush_split_bytes=4096
func PebbleOptions(pebbleOptions string, parseHooks *pebble.ParseHooks) ConfigOption {
	parseInto := func(cfg *engineConfig) error {
		err := cfg.Opts.Parse(pebbleOptions, parseHooks)
		return err
	}
	return parseInto
}
// If returns opt when enable is true. Otherwise it returns an option that
// does nothing and always succeeds.
func If(enable bool, opt ConfigOption) ConfigOption {
	if !enable {
		return func(cfg *engineConfig) error { return nil }
	}
	return opt
}
// InMemory re-exports fs.InMemory so that it can also be referenced through
// the storage package.
//
// TODO(jackson): Update callers to use fs.InMemory directly.
var InMemory = fs.InMemory
// engineConfig is the configuration that ConfigOptions mutate while Open
// runs.
type engineConfig struct {
// PebbleConfig is embedded so that options can set its fields directly.
// Open passes it to newPebble once every option has been applied.
PebbleConfig
// cacheSize is stored separately so that we can avoid constructing the
// PebbleConfig.Opts.Cache until the call to Open. A Cache is created with
// a ref count of 1, so creating the Cache during execution of
// ConfigOption makes it too easy to leak a cache.
cacheSize *int64
}
// Open constructs a Pebble storage engine that reads and writes through the
// provided fs.Env. Configuration starts from DefaultPebbleOptions and opts
// are then applied in order; the first option that returns an error aborts
// the open and that error is returned.
//
// If successful, the returned Engine takes ownership over the provided
// fs.Env's reference: closing the Engine closes the fs.Env once too. Callers
// that need the Env to outlive the Engine should Ref() it before calling
// Open.
func Open(
	ctx context.Context, env *fs.Env, settings *cluster.Settings, opts ...ConfigOption,
) (*Pebble, error) {
	var cfg engineConfig
	cfg.Dir, cfg.Env = env.Dir, env
	cfg.Settings = settings

	pebbleOpts := DefaultPebbleOptions()
	pebbleOpts.FS = env
	pebbleOpts.ReadOnly = env.IsReadOnly()
	cfg.Opts = pebbleOpts

	for i := range opts {
		if err := opts[i](&cfg); err != nil {
			return nil, err
		}
	}

	// A cache size was requested but no cache was supplied: build the cache
	// now rather than while options run. Open's own reference is dropped on
	// return, whether or not the engine was created.
	// NOTE(review): presumably newPebble takes its own reference to the
	// cache -- confirm.
	if cfg.Opts.Cache == nil && cfg.cacheSize != nil {
		cache := pebble.NewCache(*cfg.cacheSize)
		cfg.Opts.Cache = cache
		defer cache.Unref()
	}

	engine, err := newPebble(ctx, cfg.PebbleConfig)
	if err != nil {
		return nil, err
	}
	return engine, nil
}