Skip to content

Commit e9e23bd

Browse files
committed
db: add support for reading blob values
Initialize iterators and compactions with a blob.ValueFetcher that's propagated through to sstable iterators. When an sstbale iterator encounters a blob value handle, it constructs an internal value with the blob.ValueFetcher configured as the ValueFetcher. Close #4394. Informs #112.
1 parent 149dc2b commit e9e23bd

40 files changed

+556
-125
lines changed

compaction.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing"
3030
"github.com/cockroachdb/pebble/objstorage/remote"
3131
"github.com/cockroachdb/pebble/sstable"
32+
"github.com/cockroachdb/pebble/sstable/blob"
3233
"github.com/cockroachdb/pebble/sstable/block"
3334
"github.com/cockroachdb/pebble/vfs"
3435
)
@@ -216,6 +217,9 @@ type compaction struct {
216217
// blob files. This consumes more write bandwidth because all values are
217218
// rewritten. However it restores locality.
218219
getValueSeparation func(JobID, *compaction, sstable.TableFormat) compact.ValueSeparation
220+
// valueFetcher is used to fetch values from blob files. It's propagated
221+
// down the iterator tree through the internal iterator options.
222+
valueFetcher blob.ValueFetcher
219223

220224
// startLevel is the level that is being compacted. Inputs from startLevel
221225
// and outputLevel will be merged to produce a set of outputLevel files.
@@ -3085,16 +3089,19 @@ func (d *DB) compactAndWrite(
30853089
// translate to 3 MiB per compaction.
30863090
c.bufferPool.Init(12)
30873091
defer c.bufferPool.Release()
3092+
env := block.ReadEnv{
3093+
BufferPool: &c.bufferPool,
3094+
Stats: &c.stats,
3095+
IterStats: d.fileCache.SSTStatsCollector().Accumulator(
3096+
uint64(uintptr(unsafe.Pointer(c))),
3097+
categoryCompaction,
3098+
),
3099+
}
3100+
c.valueFetcher.Init(d.fileCache, env)
30883101
iiopts := internalIterOpts{
3089-
compaction: true,
3090-
readEnv: block.ReadEnv{
3091-
BufferPool: &c.bufferPool,
3092-
Stats: &c.stats,
3093-
IterStats: d.fileCache.SSTStatsCollector().Accumulator(
3094-
uint64(uintptr(unsafe.Pointer(c))),
3095-
categoryCompaction,
3096-
),
3097-
},
3102+
compaction: true,
3103+
readEnv: env,
3104+
blobValueFetcher: &c.valueFetcher,
30983105
}
30993106

31003107
pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter, iiopts)

compaction_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -610,7 +610,7 @@ func TestCompaction(t *testing.T) {
610610
return "", "", errors.WithStack(err)
611611
}
612612
defer r.Close()
613-
iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
613+
iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */, sstable.AssertNoBlobHandles)
614614
if err != nil {
615615
return "", "", errors.WithStack(err)
616616
}

db.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1349,6 +1349,9 @@ type internalIterOpts struct {
13491349
compaction bool
13501350
readEnv block.ReadEnv
13511351
boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
1352+
// blobValueFetcher is the base.ValueFetcher to use when constructing
1353+
// internal values to represent values stored externally in blob files.
1354+
blobValueFetcher base.ValueFetcher
13521355
}
13531356

13541357
func finishInitializingInternalIter(
@@ -1401,18 +1404,21 @@ func (i *Iterator) constructPointIter(
14011404
// Already have one.
14021405
return
14031406
}
1407+
readEnv := block.ReadEnv{
1408+
Stats: &i.stats.InternalStats,
1409+
// If the file cache has a sstable stats collector, ask it for an
1410+
// accumulator for this iterator's configured category and QoS. All SSTable
1411+
// iterators created by this Iterator will accumulate their stats to it as
1412+
// they Close during iteration.
1413+
IterStats: i.fc.SSTStatsCollector().Accumulator(
1414+
uint64(uintptr(unsafe.Pointer(i))),
1415+
i.opts.Category,
1416+
),
1417+
}
1418+
i.blobValueFetcher.Init(i.fc, readEnv)
14041419
internalOpts := internalIterOpts{
1405-
readEnv: block.ReadEnv{
1406-
Stats: &i.stats.InternalStats,
1407-
// If the file cache has a sstable stats collector, ask it for an
1408-
// accumulator for this iterator's configured category and QoS. All SSTable
1409-
// iterators created by this Iterator will accumulate their stats to it as
1410-
// they Close during iteration.
1411-
IterStats: i.fc.SSTStatsCollector().Accumulator(
1412-
uint64(uintptr(unsafe.Pointer(i))),
1413-
i.opts.Category,
1414-
),
1415-
},
1420+
readEnv: readEnv,
1421+
blobValueFetcher: &i.blobValueFetcher,
14161422
}
14171423
if i.opts.RangeKeyMasking.Filter != nil {
14181424
internalOpts.boundLimitedFilter = &i.rangeKeyMasking

file_cache.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,11 @@ func (h *fileCacheHandle) newPointIter(
677677
internalOpts.readEnv.IterStats = handle.SSTStatsCollector().Accumulator(uint64(uintptr(unsafe.Pointer(r))), opts.Category)
678678
}
679679
if internalOpts.compaction {
680-
iter, err = cr.NewCompactionIter(transforms, internalOpts.readEnv, &v.readerProvider)
680+
iter, err = cr.NewCompactionIter(transforms, internalOpts.readEnv,
681+
&v.readerProvider, sstable.TableBlobContext{
682+
ValueFetcher: internalOpts.blobValueFetcher,
683+
References: file.BlobReferences,
684+
})
681685
} else {
682686
iter, err = cr.NewPointIter(ctx, sstable.IterOptions{
683687
Lower: opts.GetLowerBound(),
@@ -687,6 +691,10 @@ func (h *fileCacheHandle) newPointIter(
687691
Filterer: filterer,
688692
Env: internalOpts.readEnv,
689693
ReaderProvider: &v.readerProvider,
694+
BlobContext: sstable.TableBlobContext{
695+
ValueFetcher: internalOpts.blobValueFetcher,
696+
References: file.BlobReferences,
697+
},
690698
})
691699
}
692700
if err != nil {

ingest.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ func ingestLoad1(
360360
maybeSetStatsFromProperties(meta.PhysicalMeta(), &r.Properties)
361361

362362
{
363-
iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */)
363+
iter, err := r.NewIter(sstable.NoTransforms, nil /* lower */, nil /* upper */, sstable.AssertNoBlobHandles)
364364
if err != nil {
365365
return nil, keyspan.Span{}, err
366366
}

internal/base/iterator.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,9 @@ type InternalIteratorStats struct {
416416
// ValueBytesFetched is the total byte length of the values (in value
417417
// blocks) that were retrieved.
418418
ValueBytesFetched uint64
419+
420+
// TODO(jackson): Add stats for distinguishing between value-block
421+
// values and blob values.
419422
}
420423
}
421424

internal/base/lazy_value.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,11 @@
44

55
package base
66

7-
import "context"
7+
import (
8+
"context"
9+
10+
"github.com/cockroachdb/errors"
11+
)
812

913
// A value can have user-defined attributes that are a function of the value
1014
// byte slice. For now, we only support "short attributes", which can be
@@ -278,3 +282,26 @@ func (lv *LazyValue) Clone(buf []byte, fetcher *LazyFetcher) (LazyValue, []byte)
278282
lvCopy.ValueOrHandle = buf[bufLen : bufLen+vLen]
279283
return lvCopy, buf
280284
}
285+
286+
// NoBlobFetches is a ValueFetcher that returns an error. It's intended to be
287+
// used in situations where sstables should not encode a blob value, or the
288+
// caller should not fetch the handle's value.
289+
var NoBlobFetches = &errValueFetcher{
290+
Err: errors.AssertionFailedf("unexpected blob value"),
291+
}
292+
293+
// errValueFetcher is a ValueFetcher that returns an error.
294+
type errValueFetcher struct {
295+
Err error
296+
}
297+
298+
// Assert that *errValueFetcher implements base.ValueFetcher.
299+
var _ ValueFetcher = (*errValueFetcher)(nil)
300+
301+
// Fetch implements base.ValueFetcher.
302+
func (e *errValueFetcher) Fetch(
303+
_ context.Context, _ []byte, blobFileNum DiskFileNum, valLen uint32, _ []byte,
304+
) (val []byte, callerOwned bool, err error) {
305+
err = errors.Wrapf(e.Err, "fetching %d-byte value from %s", valLen, blobFileNum)
306+
return nil, false, err
307+
}

internal/blobtest/handles.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ import (
2626
// human-readable string describing a blob handle, synthesizing unspecified
2727
// fields, and tracking the blob handle to support future fetches.
2828
type Values struct {
29-
mostRecentHandle blob.Handle
29+
mostRecentFileNum base.DiskFileNum
30+
mostRecentHandles map[base.DiskFileNum]blob.Handle
3031
// trackedHandles maps from a blob handle to its value. The value may be nil
3132
// if the value was not specified (in which case Fetch will
3233
// deterministically derive a random value from the handle itself.)
@@ -108,6 +109,7 @@ func (bv *Values) IsBlobHandle(input string) bool {
108109
func (bv *Values) Parse(input string) (h blob.Handle, err error) {
109110
if bv.trackedHandles == nil {
110111
bv.trackedHandles = make(map[blob.Handle]string)
112+
bv.mostRecentHandles = make(map[base.DiskFileNum]blob.Handle)
111113
}
112114

113115
defer func() {
@@ -156,13 +158,13 @@ func (bv *Values) Parse(input string) (h blob.Handle, err error) {
156158
}
157159

158160
if !fileNumSet {
159-
h.FileNum = max(bv.mostRecentHandle.FileNum, 1)
161+
h.FileNum = bv.mostRecentFileNum
160162
}
161163
if !blockNumSet {
162-
h.BlockNum = bv.mostRecentHandle.BlockNum
164+
h.BlockNum = bv.mostRecentHandles[h.FileNum].BlockNum
163165
}
164166
if !offsetSet {
165-
h.OffsetInBlock = bv.mostRecentHandle.OffsetInBlock + bv.mostRecentHandle.ValueLen
167+
h.OffsetInBlock = bv.mostRecentHandles[h.FileNum].OffsetInBlock + bv.mostRecentHandles[h.FileNum].ValueLen
166168
}
167169
if !valueLenSet {
168170
if len(value) > 0 {
@@ -171,7 +173,8 @@ func (bv *Values) Parse(input string) (h blob.Handle, err error) {
171173
h.ValueLen = 12
172174
}
173175
}
174-
bv.mostRecentHandle = h
176+
bv.mostRecentFileNum = h.FileNum
177+
bv.mostRecentHandles[h.FileNum] = h
175178
bv.trackedHandles[h] = value
176179
return h, nil
177180
}
@@ -228,7 +231,10 @@ func WriteFiles(
228231
return err
229232
}
230233
writer := blob.NewFileWriter(fileNum, writable, writerOpts)
231-
for _, handle := range handles {
234+
for i, handle := range handles {
235+
if i > 0 && handles[i-1].BlockNum != handle.BlockNum {
236+
writer.FlushForTesting()
237+
}
232238
if value, ok := bv.trackedHandles[handle]; ok {
233239
writer.AddValue([]byte(value))
234240
} else {

internal/compact/spans_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func TestSplitAndEncodeSpan(t *testing.T) {
135135
_, rangeDels, rangeKeys, err := sstable.ReadAll(obj, sstable.ReaderOptions{
136136
Comparer: wo.Comparer,
137137
KeySchemas: sstable.MakeKeySchemas(wo.KeySchema),
138-
})
138+
}, base.NoBlobFetches)
139139
require.NoError(t, err)
140140
require.LessOrEqual(t, len(rangeDels)+len(rangeKeys), 1)
141141
s := "."

internal/manifest/blob_metadata.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cockroachdb/pebble/internal/humanize"
1313
"github.com/cockroachdb/pebble/internal/invariants"
1414
"github.com/cockroachdb/pebble/internal/strparse"
15+
"github.com/cockroachdb/pebble/sstable"
1516
"github.com/cockroachdb/pebble/sstable/blob"
1617
"github.com/cockroachdb/redact"
1718
)
@@ -224,6 +225,9 @@ type BlobReferenceDepth int
224225
// persisted to the manifest.
225226
type BlobReferences []BlobReference
226227

228+
// Assert that BlobReferences implements sstable.BlobReferences.
229+
var _ sstable.BlobReferences = BlobReferences{}
230+
227231
// FileNumByID returns the FileNum for the identified BlobReference.
228232
func (br BlobReferences) FileNumByID(i blob.ReferenceID) base.DiskFileNum {
229233
return br[i].FileNum

0 commit comments

Comments
 (0)