/
bucket_options.go
161 lines (136 loc) · 4.26 KB
/
bucket_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// _ _
// __ _____ __ ___ ___ __ _| |_ ___
// \ \ /\ / / _ \/ _` \ \ / / |/ _` | __/ _ \
// \ V V / __/ (_| |\ V /| | (_| | || __/
// \_/\_/ \___|\__,_| \_/ |_|\__,_|\__\___|
//
// Copyright © 2016 - 2023 Weaviate B.V. All rights reserved.
//
// CONTACT: hello@weaviate.io
//
package lsmkv
import (
"time"
"github.com/pkg/errors"
)
// BucketOption is a function that configures a Bucket. It returns a non-nil
// error when the option cannot be applied to the given bucket.
type BucketOption func(b *Bucket) error
// WithStrategy returns a BucketOption that stores strategy on the bucket.
// The option fails with an error if strategy is not one of StrategyReplace,
// StrategyMapCollection, StrategySetCollection or StrategyRoaringSet; in that
// case the bucket is left unmodified.
func WithStrategy(strategy string) BucketOption {
	return func(bucket *Bucket) error {
		recognized := strategy == StrategyReplace ||
			strategy == StrategyMapCollection ||
			strategy == StrategySetCollection ||
			strategy == StrategyRoaringSet
		if !recognized {
			return errors.Errorf("unrecognized strategy %q", strategy)
		}

		bucket.strategy = strategy
		return nil
	}
}
// WithMemtableThreshold returns a BucketOption that sets the bucket's
// memtableThreshold field to threshold. The option never returns an error.
func WithMemtableThreshold(threshold uint64) BucketOption {
	apply := func(bucket *Bucket) error {
		bucket.memtableThreshold = threshold
		return nil
	}
	return apply
}
// WithWalThreshold returns a BucketOption that sets the bucket's
// walThreshold field to threshold. The option never returns an error.
func WithWalThreshold(threshold uint64) BucketOption {
	apply := func(bucket *Bucket) error {
		bucket.walThreshold = threshold
		return nil
	}
	return apply
}
// WithIdleThreshold returns a BucketOption that sets the bucket's
// flushAfterIdle duration to threshold. The option never returns an error.
func WithIdleThreshold(threshold time.Duration) BucketOption {
	apply := func(bucket *Bucket) error {
		bucket.flushAfterIdle = threshold
		return nil
	}
	return apply
}
// WithSecondaryIndices returns a BucketOption that sets the bucket's
// secondaryIndices field to count. The option never returns an error.
func WithSecondaryIndices(count uint16) BucketOption {
	apply := func(bucket *Bucket) error {
		bucket.secondaryIndices = count
		return nil
	}
	return apply
}
// WithLegacyMapSorting returns a BucketOption that sets the bucket's
// legacyMapSortingBeforeCompaction flag to true. The option never returns an
// error.
func WithLegacyMapSorting() BucketOption {
	apply := func(bucket *Bucket) error {
		bucket.legacyMapSortingBeforeCompaction = true
		return nil
	}
	return apply
}
// WithPread returns a BucketOption that sets the bucket's mmapContents flag
// to the negation of with: passing true disables mmapContents, passing false
// enables it. The option never returns an error.
func WithPread(with bool) BucketOption {
	apply := func(bucket *Bucket) error {
		useMmap := !with
		bucket.mmapContents = useMmap
		return nil
	}
	return apply
}
// WithDynamicMemtableSizing returns a BucketOption that installs a memtable
// size advisor on the bucket (memtableResizer).
//
// initialMB and maxMB are given in mebibytes and converted to bytes; the
// advisor's step size is fixed at 10 MiB. minActiveSeconds and
// maxActiveSeconds are converted to time.Duration values in seconds. The
// option never returns an error.
func WithDynamicMemtableSizing(
	initialMB, maxMB, minActiveSeconds, maxActiveSeconds int,
) BucketOption {
	const bytesPerMB = 1024 * 1024

	return func(bucket *Bucket) error {
		var advisorCfg memtableSizeAdvisorCfg
		advisorCfg.initial = initialMB * bytesPerMB
		advisorCfg.stepSize = 10 * bytesPerMB
		advisorCfg.maxSize = maxMB * bytesPerMB
		advisorCfg.minDuration = time.Duration(minActiveSeconds) * time.Second
		advisorCfg.maxDuration = time.Duration(maxActiveSeconds) * time.Second

		bucket.memtableResizer = newMemtableSizeAdvisor(advisorCfg)
		return nil
	}
}
// secondaryIndexKeys is a positional list of secondary keys, each stored as a
// byte slice. SecondaryKeyOption functions write into it by index.
type secondaryIndexKeys [][]byte
// SecondaryKeyOption is a function that operates on a secondaryIndexKeys
// slice and returns a non-nil error when it cannot be applied.
type SecondaryKeyOption func(s secondaryIndexKeys) error
// WithSecondaryKey returns a SecondaryKeyOption that stores key at position
// pos of the secondary index key slice it is applied to.
//
// The option returns an error, and leaves the slice untouched, when pos is
// outside the valid index range [0, len(s)). This includes pos == len(s) and
// negative positions, both of which would otherwise cause an out-of-range
// panic on the assignment below.
func WithSecondaryKey(pos int, key []byte) SecondaryKeyOption {
	return func(s secondaryIndexKeys) error {
		// Valid indices are 0..len(s)-1; reject everything else up front
		// instead of panicking on s[pos].
		if pos < 0 || pos >= len(s) {
			return errors.Errorf("set secondary index %d on an index of length %d",
				pos, len(s))
		}

		s[pos] = key
		return nil
	}
}
// WithMonitorCount returns a BucketOption that sets the bucket's monitorCount
// flag to true. The option returns an error, without modifying the bucket,
// if the bucket's strategy at the time the option is applied is not
// StrategyReplace.
func WithMonitorCount() BucketOption {
	return func(bucket *Bucket) error {
		if bucket.strategy == StrategyReplace {
			bucket.monitorCount = true
			return nil
		}

		return errors.Errorf("count monitoring only supported on 'replace' buckets")
	}
}
// WithKeepTombstones returns a BucketOption that sets the bucket's
// keepTombstones flag to the given value. The option never returns an error.
func WithKeepTombstones(keepTombstones bool) BucketOption {
	apply := func(bucket *Bucket) error {
		bucket.keepTombstones = keepTombstones
		return nil
	}
	return apply
}
// WithUseBloomFilter returns a BucketOption that sets the bucket's
// useBloomFilter flag to the given value. The option never returns an error.
func WithUseBloomFilter(useBloomFilter bool) BucketOption {
	apply := func(bucket *Bucket) error {
		bucket.useBloomFilter = useBloomFilter
		return nil
	}
	return apply
}
// WithCalcCountNetAdditions returns a BucketOption that sets the bucket's
// calcCountNetAdditions flag to the given value. The option never returns an
// error.
func WithCalcCountNetAdditions(calcCountNetAdditions bool) BucketOption {
	apply := func(bucket *Bucket) error {
		bucket.calcCountNetAdditions = calcCountNetAdditions
		return nil
	}
	return apply
}
/*
Background for this option:
We use the LSM store in two places:
Our existing key/value and inverted buckets
As part of the new brute-force based index (to be built this week).
Brute-force index
This is a simple disk-index where we use a cursor to iterate over all objects. This is what we need the force-compaction for. The experimentation so far has shown that the cursor is much more performant on a single segment than it is on multiple segments. This is because a single segment is essentially just one contiguous chunk of data on disk that we read through, whereas multiple segments (in an unpredictable order) lead to many tiny reads, which is inefficient.
Existing uses of the LSM store
For existing uses, e.g. the object store, we don’t want to force-compact. This is because they can grow massive. For example, you could have a 100GB segment, and then a new write leads to a new segment that is just a few bytes. If we force-compacted those two, we would rewrite 100GB every time the user sends a few bytes to Weaviate. In this case, the existing tiered compaction strategy makes more sense.
Configurability of buckets
*/
// WithForceCompation returns a BucketOption that sets the bucket's
// forceCompaction flag to opt. The option never returns an error.
//
// NOTE(review): the exported name is spelled "Compation"; it is kept as-is
// so existing callers continue to compile.
func WithForceCompation(opt bool) BucketOption {
	apply := func(bucket *Bucket) error {
		bucket.forceCompaction = opt
		return nil
	}
	return apply
}