Skip to content

Commit

Permalink
Merge pull request #41854 from petermattis/pmattis/pebble-time-bound
Browse files Browse the repository at this point in the history
storage/engine: add pebbleIterator support for time-bound iterators
  • Loading branch information
petermattis committed Oct 23, 2019
2 parents 15e4ff3 + 1fd5533 commit 183a9e1
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 255 deletions.
73 changes: 5 additions & 68 deletions pkg/storage/bulk/sst_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@
package bulk

import (
"bytes"
"io"

"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/sstable"
"github.com/pkg/errors"
Expand All @@ -30,75 +28,14 @@ type SSTWriter struct {
scratch []byte
}

// timeboundPropCollector is a table property collector that tracks the
// smallest and largest encoded MVCC timestamps it has been handed. Its
// behavior matches TimeBoundTblPropCollector in table_props.cc.
type timeboundPropCollector struct {
	// min holds the smallest encoded timestamp seen so far; it is empty until
	// a key carrying a timestamp has been added.
	min []byte
	// max holds the largest encoded timestamp seen so far; it is empty until
	// a key carrying a timestamp has been added.
	max []byte
}

var _ pebble.TablePropertyCollector = &timeboundPropCollector{}

// Add folds the timestamp portion of key's user key into the running min/max
// bounds. The user key is split with enginepb.SplitMVCCKey; if the split
// fails an error is returned. Keys whose timestamp portion is empty leave the
// bounds untouched. Timestamps are compared bytewise in their encoded form.
// The value argument is not inspected.
func (t *timeboundPropCollector) Add(key pebble.InternalKey, value []byte) error {
	_, encodedTS, ok := enginepb.SplitMVCCKey(key.UserKey)
	if !ok {
		return errors.Errorf("failed to split MVCC key")
	}
	if len(encodedTS) == 0 {
		// No timestamp on this key: nothing to record.
		return nil
	}
	// An empty bound means "unset", so the first timestamp seen always wins.
	// The existing backing array is reused to avoid reallocating per key.
	if len(t.min) == 0 || bytes.Compare(encodedTS, t.min) < 0 {
		t.min = append(t.min[:0], encodedTS...)
	}
	if len(t.max) == 0 || bytes.Compare(encodedTS, t.max) > 0 {
		t.max = append(t.max[:0], encodedTS...)
	}
	return nil
}

// Finish stores the collected bounds in userProps under the keys
// "crdb.ts.min" and "crdb.ts.max" (as the raw encoded bytes converted to
// strings). It always returns nil.
func (t *timeboundPropCollector) Finish(userProps map[string]string) error {
	lower, upper := string(t.min), string(t.max)
	userProps["crdb.ts.min"] = lower
	userProps["crdb.ts.max"] = upper
	return nil
}

// Name returns the identifier under which this collector is reported.
// NOTE(review): presumably this must stay equal to the name used by the
// RocksDB collector in table_props.cc — confirm before changing.
func (t *timeboundPropCollector) Name() string {
	const collectorName = "TimeBoundTblPropCollectorFactory"
	return collectorName
}

// dummyDeleteRangeCollector is a stub collector that just identifies itself.
// This stub can be installed so that SSTs claim to have the same props as those
// written by the Rocks writer, using the collector in table_props.cc. However
// since bulk-ingestion SSTs never contain deletions (range or otherwise), there
// is no actual implementation needed here: it records no properties, and its
// Add method only rejects keys whose kind is not a plain set.
type dummyDeleteRangeCollector struct{}

var _ pebble.TablePropertyCollector = &dummyDeleteRangeCollector{}

// Add accepts only keys of kind pebble.InternalKeyKindSet and returns an
// error naming the offending kind for anything else. Nothing is recorded for
// accepted keys; the value argument is not inspected.
func (dummyDeleteRangeCollector) Add(key pebble.InternalKey, value []byte) error {
	if kind := key.Kind(); kind != pebble.InternalKeyKindSet {
		return errors.Errorf("unsupported key kind %v", kind)
	}
	return nil
}

// Finish is a no-op: this stub collector writes nothing into userProps and
// always returns nil.
func (dummyDeleteRangeCollector) Finish(userProps map[string]string) error {
return nil
}

// Name returns the identifier under which this stub collector is reported.
// NOTE(review): presumably this must stay equal to the name used by the
// RocksDB delete-range collector in table_props.cc — confirm before changing.
func (dummyDeleteRangeCollector) Name() string {
	const collectorName = "DeleteRangeTblPropCollectorFactory"
	return collectorName
}

// MakeSSTWriter creates a new SSTWriter.
func MakeSSTWriter() SSTWriter {
opts := sstable.WriterOptions{
BlockSize: 32 * 1024,
TableFormat: pebble.TableFormatLevelDB,
Comparer: engine.MVCCComparer,
MergerName: "nullptr",
TablePropertyCollectors: []func() pebble.TablePropertyCollector{
func() pebble.TablePropertyCollector { return &timeboundPropCollector{} },
func() pebble.TablePropertyCollector { return &dummyDeleteRangeCollector{} },
},
BlockSize: 32 * 1024,
TableFormat: pebble.TableFormatLevelDB,
Comparer: engine.MVCCComparer,
MergerName: "nullptr",
TablePropertyCollectors: engine.PebbleTablePropertyCollectors,
}
f := &memFile{}
sst := sstable.NewWriter(f, opts)
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/engine/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"encoding/binary"

"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -237,6 +238,11 @@ func encodeKeyToBuf(buf []byte, key MVCCKey, keyLen int) {
buf[len(buf)-1] = byte(timestampLength)
}

// encodeTimestamp returns the timestamp portion that enginepb.SplitMVCCKey
// extracts from the encoding of an MVCCKey that carries only ts (no user
// key). The "ok" result of the split is deliberately discarded, since the
// input is produced by EncodeKey right here rather than supplied by a caller.
func encodeTimestamp(ts hlc.Timestamp) []byte {
	encodedKey := EncodeKey(MVCCKey{Timestamp: ts})
	_, encodedTS, _ := enginepb.SplitMVCCKey(encodedKey)
	return encodedTS
}

// DecodeMVCCKey decodes an engine.MVCCKey from its serialized representation. This
// decoding must match engine/db.cc:DecodeKey().
func DecodeMVCCKey(encodedKey []byte) (MVCCKey, error) {
Expand Down
163 changes: 163 additions & 0 deletions pkg/storage/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math/rand"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -478,6 +479,168 @@ func TestEngineMerge(t *testing.T) {
}, t)
}

// TestEngineTimeBound exercises time-bound iterators (iterators created with
// MinTimestampHint/MaxTimestampHint) against every engine implementation in
// mvccEngineImpls. It writes four keys ("00".."03") whose timestamps span
// WallTime 1 through 3, flushes the engine, and then checks, for a series of
// hint ranges, both how many keys the iterator yields and how many SSTs the
// iterator stats report as touched. Finally it verifies that a regular
// iterator created afterwards from the same batch still sees every key.
func TestEngineTimeBound(t *testing.T) {
defer leaktest.AfterTest(t)()

for _, engineImpl := range mvccEngineImpls {
t.Run(engineImpl.name, func(t *testing.T) {
engine := engineImpl.create()
defer engine.Close()

// minTimestamp and maxTimestamp are the smallest and largest timestamps
// written below; the test cases build their hints relative to them.
var minTimestamp = hlc.Timestamp{WallTime: 1, Logical: 0}
var maxTimestamp = hlc.Timestamp{WallTime: 3, Logical: 0}
times := []hlc.Timestamp{
{WallTime: 2, Logical: 0},
minTimestamp,
maxTimestamp,
{WallTime: 2, Logical: 0},
}

// Write one key per timestamp; the key and value are both the
// zero-padded index ("00", "01", ...).
for i, time := range times {
s := fmt.Sprintf("%02d", i)
key := MVCCKey{Key: roachpb.Key(s), Timestamp: time}
if err := engine.Put(key, []byte(s)); err != nil {
t.Fatal(err)
}
}
// NOTE(review): presumably the flush is what moves the writes into an
// SST so that time-bound filtering has something to act on — confirm.
if err := engine.Flush(); err != nil {
t.Fatal(err)
}

batch := engine.NewBatch()
defer batch.Close()

// check iterates tbi from the start, counts the keys it yields, and
// fails the test unless the count equals keys and the iterator's
// TimeBoundNumSSTs stat equals ssts. It closes tbi when done.
check := func(t *testing.T, tbi Iterator, keys, ssts int) {
defer tbi.Close()
tbi.Seek(NilKey)

var count int
for ; ; tbi.Next() {
ok, err := tbi.Valid()
if err != nil {
t.Fatal(err)
}
if !ok {
break
}
count++
}

// Make sure the iterator saw exactly the expected number of keys.
if keys != count {
t.Fatalf("saw %d values in time bounded iterator, but expected %d", count, keys)
}
stats := tbi.Stats()
if a := stats.TimeBoundNumSSTs; a != ssts {
t.Fatalf("touched %d SSTs, expected %d", a, ssts)
}
}

// Note that every iterator below is created eagerly, while this slice is
// being built; each one is closed by check when its case runs.
testCases := []struct {
iter Iterator
keys, ssts int
}{
// Completely to the right, not touching.
{
iter: batch.NewIterator(IterOptions{
MinTimestampHint: maxTimestamp.Next(),
MaxTimestampHint: maxTimestamp.Next().Next(),
UpperBound: roachpb.KeyMax,
WithStats: true,
}),
keys: 0,
ssts: 0,
},
// Completely to the left, not touching.
{
iter: batch.NewIterator(IterOptions{
MinTimestampHint: minTimestamp.Prev().Prev(),
MaxTimestampHint: minTimestamp.Prev(),
UpperBound: roachpb.KeyMax,
WithStats: true,
}),
keys: 0,
ssts: 0,
},
// Touching on the right.
{
iter: batch.NewIterator(IterOptions{
MinTimestampHint: maxTimestamp,
MaxTimestampHint: maxTimestamp,
UpperBound: roachpb.KeyMax,
WithStats: true,
}),
keys: len(times),
ssts: 1,
},
// Touching on the left.
{
iter: batch.NewIterator(IterOptions{
MinTimestampHint: minTimestamp,
MaxTimestampHint: minTimestamp,
UpperBound: roachpb.KeyMax,
WithStats: true,
}),
keys: len(times),
ssts: 1,
},
// Copy of last case, but confirm that we don't get SST stats if we don't
// ask for them.
{
iter: batch.NewIterator(IterOptions{
MinTimestampHint: minTimestamp,
MaxTimestampHint: minTimestamp,
UpperBound: roachpb.KeyMax,
WithStats: false,
}),
keys: len(times),
ssts: 0,
},
// Copy of last case, but confirm that upper bound is respected.
{
iter: batch.NewIterator(IterOptions{
MinTimestampHint: minTimestamp,
MaxTimestampHint: minTimestamp,
UpperBound: []byte("02"),
WithStats: false,
}),
keys: 2,
ssts: 0,
},
}

// Subtests run synchronously (no t.Parallel), so capturing the loop
// variable in the closure is safe here.
for _, test := range testCases {
t.Run("", func(t *testing.T) {
check(t, test.iter, test.keys, test.ssts)
})
}

// Make a regular iterator. Before #21721, this would accidentally pick up the
// time bounded iterator instead.
iter := batch.NewIterator(IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()
iter.Seek(NilKey)

var count int
for ; ; iter.Next() {
ok, err := iter.Valid()
if err != nil {
t.Fatal(err)
}
if !ok {
break
}
count++
}

// Make sure the iterator sees the writes (i.e. it's not the time bounded iterator).
if expCount := len(times); expCount != count {
t.Fatalf("saw %d values in regular iterator, but expected %d", count, expCount)
}
})
}
}

func TestFlushWithSSTables(t *testing.T) {
defer leaktest.AfterTest(t)()
runWithAllEngines(func(engine Engine, t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/engine/enginepb/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func SplitMVCCKey(mvccKey []byte) (key []byte, ts []byte, ok bool) {
}

// DecodeKey decodes a key/timestamp from its serialized representation. This
// decoding must match engine/db.cc:DecodeKey().
// decoding must match libroach/encoding.cc:DecodeKey().
func DecodeKey(encodedKey []byte) (key []byte, timestamp hlc.Timestamp, _ error) {
key, ts, ok := SplitMVCCKey(encodedKey)
if !ok {
Expand Down
63 changes: 63 additions & 0 deletions pkg/storage/engine/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package engine

import (
"bytes"
"context"
"fmt"
"io/ioutil"
Expand Down Expand Up @@ -85,6 +86,67 @@ var MVCCMerger = &pebble.Merger{
},
}

// pebbleTimeBoundPropCollector is a table property collector that tracks the
// smallest and largest encoded MVCC timestamps it has been handed. Its
// behavior matches TimeBoundTblPropCollector in table_props.cc.
type pebbleTimeBoundPropCollector struct {
	// min holds the smallest encoded timestamp seen so far; it is empty until
	// a key carrying a timestamp has been added.
	min []byte
	// max holds the largest encoded timestamp seen so far; it is empty until
	// a key carrying a timestamp has been added.
	max []byte
}

// Add folds the timestamp portion of key's user key into the running min/max
// bounds. The user key is split with enginepb.SplitMVCCKey; if the split
// fails an error is returned. Keys whose timestamp portion is empty leave the
// bounds untouched. Timestamps are compared bytewise in their encoded form.
// The value argument is not inspected.
func (t *pebbleTimeBoundPropCollector) Add(key pebble.InternalKey, value []byte) error {
	_, encodedTS, ok := enginepb.SplitMVCCKey(key.UserKey)
	switch {
	case !ok:
		return errors.Errorf("failed to split MVCC key")
	case len(encodedTS) == 0:
		// No timestamp on this key: nothing to record.
		return nil
	}
	// An empty bound means "unset", so the first timestamp seen always wins.
	// The existing backing array is reused to avoid reallocating per key.
	if len(t.min) == 0 || bytes.Compare(encodedTS, t.min) < 0 {
		t.min = append(t.min[:0], encodedTS...)
	}
	if len(t.max) == 0 || bytes.Compare(encodedTS, t.max) > 0 {
		t.max = append(t.max[:0], encodedTS...)
	}
	return nil
}

// Finish stores the collected bounds in userProps under the keys
// "crdb.ts.min" and "crdb.ts.max" (as the raw encoded bytes converted to
// strings). It always returns nil.
func (t *pebbleTimeBoundPropCollector) Finish(userProps map[string]string) error {
	lower, upper := string(t.min), string(t.max)
	userProps["crdb.ts.min"] = lower
	userProps["crdb.ts.max"] = upper
	return nil
}

// Name returns the identifier under which this collector is reported.
func (t *pebbleTimeBoundPropCollector) Name() string {
	// This constant needs to match the one used by the RocksDB version of this
	// table property collector. DO NOT CHANGE.
	const collectorName = "TimeBoundTblPropCollectorFactory"
	return collectorName
}

// pebbleDeleteRangeCollector is intended to mark an sstable for compaction
// when it contains a range tombstone. It is currently a placeholder: it only
// identifies itself by name, and its Add and Finish methods record nothing
// (see the TODO in Add).
type pebbleDeleteRangeCollector struct{}

// Add currently ignores both key and value and always returns nil; range
// tombstone tracking is not implemented yet.
func (pebbleDeleteRangeCollector) Add(key pebble.InternalKey, value []byte) error {
// TODO(peter): track whether a range tombstone is present. Need to extend
// the TablePropertyCollector interface.
return nil
}

// Finish is a no-op: nothing is written into userProps and nil is always
// returned.
func (pebbleDeleteRangeCollector) Finish(userProps map[string]string) error {
return nil
}

// Name returns the identifier under which this collector is reported.
func (pebbleDeleteRangeCollector) Name() string {
	// This constant needs to match the one used by the RocksDB version of this
	// table property collector. DO NOT CHANGE.
	const collectorName = "DeleteRangeTblPropCollectorFactory"
	return collectorName
}

// PebbleTablePropertyCollectors is the list of Pebble TablePropertyCollectors.
// Each entry is a constructor that returns a fresh collector instance: first
// the time-bound collector, then the delete-range collector.
var PebbleTablePropertyCollectors = []func() pebble.TablePropertyCollector{
func() pebble.TablePropertyCollector { return &pebbleTimeBoundPropCollector{} },
func() pebble.TablePropertyCollector { return &pebbleDeleteRangeCollector{} },
}

// Pebble is a wrapper around a Pebble database instance.
type Pebble struct {
db *pebble.DB
Expand All @@ -104,6 +166,7 @@ var _ WithSSTables = &Pebble{}
func NewPebble(path string, cfg *pebble.Options) (*Pebble, error) {
cfg.Comparer = MVCCComparer
cfg.Merger = MVCCMerger
cfg.TablePropertyCollectors = PebbleTablePropertyCollectors

// pebble.Open also calls EnsureDefaults, but only after doing a clone. Call
// EnsureDefaults beforehand so we have a matching cfg here for when we save
Expand Down
Loading

0 comments on commit 183a9e1

Please sign in to comment.