Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-20.1: kv: don't mix prefix and non-prefix iters when collecting intents #47301

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_get.go
Expand Up @@ -46,7 +46,11 @@ func Get(
reply.Value = val
if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
var intentVals []roachpb.KeyValue
intentVals, err = CollectIntentRows(ctx, reader, cArgs, intents)
// NOTE: MVCCGet uses a Prefix iterator, so we want to use one in
// CollectIntentRows as well so that we're guaranteed to use the same
// cached iterator and observe a consistent snapshot of the engine.
const usePrefixIter = true
intentVals, err = CollectIntentRows(ctx, reader, usePrefixIter, intents)
if err == nil {
switch len(intentVals) {
case 0:
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
Expand Up @@ -76,7 +76,11 @@ func ReverseScan(
}

if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
reply.IntentRows, err = CollectIntentRows(ctx, reader, cArgs, scanRes.Intents)
// NOTE: MVCCScan doesn't use a Prefix iterator, so we don't want to use
// one in CollectIntentRows either so that we're guaranteed to use the
// same cached iterator and observe a consistent snapshot of the engine.
const usePrefixIter = false
reply.IntentRows, err = CollectIntentRows(ctx, reader, usePrefixIter, scanRes.Intents)
if err != nil {
return result.Result{}, err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_scan.go
Expand Up @@ -76,7 +76,11 @@ func Scan(
}

if h.ReadConsistency == roachpb.READ_UNCOMMITTED {
reply.IntentRows, err = CollectIntentRows(ctx, reader, cArgs, scanRes.Intents)
// NOTE: MVCCScan doesn't use a Prefix iterator, so we don't want to use
// one in CollectIntentRows either so that we're guaranteed to use the
// same cached iterator and observe a consistent snapshot of the engine.
const usePrefixIter = false
reply.IntentRows, err = CollectIntentRows(ctx, reader, usePrefixIter, scanRes.Intents)
if err != nil {
return result.Result{}, err
}
Expand Down
82 changes: 67 additions & 15 deletions pkg/kv/kvserver/batcheval/intent.go
Expand Up @@ -17,39 +17,91 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

// CollectIntentRows collects the key-value pairs for each intent provided. It
// also verifies that the ReturnIntents option is allowed.
// CollectIntentRows collects the provisional key-value pairs for each intent
// provided.
//
// TODO(nvanbenschoten): mvccGetInternal should return the intent values directly
// when ReturnIntents is true. Since this will initially only be used for
// RangeLookups and since this is how they currently collect intent values, this
// is ok for now.
// The function accepts a reader and a flag indicating whether a prefix iterator
// should be used when creating an iterator from the reader. This flexibility
// works around a limitation of the Engine.NewReadOnly interface where prefix
// iterators and non-prefix iterators pulled from the same read-only engine are
// not guaranteed to provide a consistent snapshot of the underlying engine.
// This function expects to be able to retrieve the corresponding provisional
// value for each of the provided intents. As such, it is critical that it
// observes the engine in the same state that it was in when the intent keys
// were originally collected. Because of this, callers are tasked with
// indicating whether the intents were originally collected using a prefix
// iterator or not.
//
// TODO(nvanbenschoten): remove the usePrefixIter complexity when we're fully on
// Pebble and can guarantee that all iterators created from a read-only engine
// are consistent.
//
// TODO(nvanbenschoten): mvccGetInternal should return the intent values
// directly when reading at the READ_UNCOMMITTED consistency level. Since this
// is only currently used for range lookups and when watching for a merge (both
// of which are off the hot path), this is ok for now.
func CollectIntentRows(
ctx context.Context, reader storage.Reader, cArgs CommandArgs, intents []roachpb.Intent,
ctx context.Context, reader storage.Reader, usePrefixIter bool, intents []roachpb.Intent,
) ([]roachpb.KeyValue, error) {
if len(intents) == 0 {
return nil, nil
}
res := make([]roachpb.KeyValue, 0, len(intents))
for _, intent := range intents {
for i := range intents {
kv, err := readProvisionalVal(ctx, reader, usePrefixIter, &intents[i])
if err != nil {
switch t := err.(type) {
case *roachpb.WriteIntentError:
log.Fatalf(ctx, "unexpected %T in CollectIntentRows: %+v", t, t)
case *roachpb.ReadWithinUncertaintyIntervalError:
log.Fatalf(ctx, "unexpected %T in CollectIntentRows: %+v", t, t)
}
return nil, err
}
if kv.Value.IsPresent() {
res = append(res, kv)
}
}
return res, nil
}

// readProvisionalVal retrieves the provisional value for the provided intent
// using the reader and the specified access method (i.e. with or without the
// use of a prefix iterator). The function returns an empty KeyValue if the
// intent is found to contain a deletion tombstone as its provisional value.
func readProvisionalVal(
ctx context.Context, reader storage.Reader, usePrefixIter bool, intent *roachpb.Intent,
) (roachpb.KeyValue, error) {
if usePrefixIter {
val, _, err := storage.MVCCGetAsTxn(
ctx, reader, intent.Key, intent.Txn.WriteTimestamp, intent.Txn,
)
if err != nil {
return nil, err
return roachpb.KeyValue{}, err
}
if val == nil {
// Intent is a deletion.
continue
return roachpb.KeyValue{}, nil
}
res = append(res, roachpb.KeyValue{
Key: intent.Key,
Value: *val,
})
return roachpb.KeyValue{Key: intent.Key, Value: *val}, nil
}
return res, nil
res, err := storage.MVCCScanAsTxn(
ctx, reader, intent.Key, intent.Key.Next(), intent.Txn.WriteTimestamp, intent.Txn,
)
if err != nil {
return roachpb.KeyValue{}, err
}
if len(res.KVs) > 1 {
log.Fatalf(ctx, "multiple key-values returned from single-key scan: %+v", res.KVs)
} else if len(res.KVs) == 0 {
// Intent is a deletion.
return roachpb.KeyValue{}, nil
}
return res.KVs[0], nil

}

// acquireUnreplicatedLocksOnKeys adds an unreplicated lock acquisition by the
Expand Down
163 changes: 163 additions & 0 deletions pkg/kv/kvserver/batcheval/intent_test.go
@@ -0,0 +1,163 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package batcheval

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// instrumentedEngine wraps a storage.Engine and allows for various methods in
// the interface to be instrumented for testing purposes. Methods that are not
// explicitly overridden on the wrapper are promoted from the embedded Engine.
type instrumentedEngine struct {
	storage.Engine

	// onNewIterator, when non-nil, is called with the requested iterator
	// options each time NewIterator is invoked on the wrapper, before the call
	// is forwarded to the embedded Engine.
	onNewIterator func(storage.IterOptions)
	// ... can be extended with hooks for other Engine methods as needed ...
}

// NewIterator notifies the onNewIterator hook (if one is installed) about the
// requested iterator options and then delegates iterator construction to the
// wrapped Engine.
func (ie *instrumentedEngine) NewIterator(opts storage.IterOptions) storage.Iterator {
	if hook := ie.onNewIterator; hook != nil {
		hook(opts)
	}
	return ie.Engine.NewIterator(opts)
}

// TestCollectIntentsUsesSameIterator tests that all uses of CollectIntentRows
// (currently only by READ_UNCOMMITTED Gets, Scans, and ReverseScans) use the
// same kind of cached iterator (prefix or non-prefix) for their initial read
// and for the provisional value collection of any intents they find.
//
// Each request type is run against an engine holding a single intent, once
// with a regular provisional value and once with a deletion tombstone. The
// engine is instrumented to count how many prefix and non-prefix iterators the
// request creates, and those counts are compared against the expectations of
// the test case.
func TestCollectIntentsUsesSameIterator(t *testing.T) {
	defer leaktest.AfterTest(t)()

	ctx := context.Background()
	key := roachpb.Key("key")
	ts := hlc.Timestamp{WallTime: 123}
	header := roachpb.Header{
		Timestamp:       ts,
		ReadConsistency: roachpb.READ_UNCOMMITTED,
	}

	testCases := []struct {
		name string
		// run issues the request against the engine and returns the intent
		// key-values that the response carried, if any.
		run               func(*testing.T, storage.ReadWriter) (intents []roachpb.KeyValue, _ error)
		expPrefixIters    int
		expNonPrefixIters int
	}{
		{
			name: "get",
			run: func(t *testing.T, db storage.ReadWriter) ([]roachpb.KeyValue, error) {
				req := &roachpb.GetRequest{
					RequestHeader: roachpb.RequestHeader{Key: key},
				}
				var resp roachpb.GetResponse
				if _, err := Get(ctx, db, CommandArgs{Args: req, Header: header}, &resp); err != nil {
					return nil, err
				}
				if resp.IntentValue == nil {
					return nil, nil
				}
				return []roachpb.KeyValue{{Key: key, Value: *resp.IntentValue}}, nil
			},
			expPrefixIters:    2,
			expNonPrefixIters: 0,
		},
		{
			name: "scan",
			run: func(t *testing.T, db storage.ReadWriter) ([]roachpb.KeyValue, error) {
				req := &roachpb.ScanRequest{
					RequestHeader: roachpb.RequestHeader{Key: key, EndKey: key.Next()},
				}
				var resp roachpb.ScanResponse
				if _, err := Scan(ctx, db, CommandArgs{Args: req, Header: header}, &resp); err != nil {
					return nil, err
				}
				return resp.IntentRows, nil
			},
			expPrefixIters:    0,
			expNonPrefixIters: 2,
		},
		{
			name: "reverse scan",
			run: func(t *testing.T, db storage.ReadWriter) ([]roachpb.KeyValue, error) {
				req := &roachpb.ReverseScanRequest{
					RequestHeader: roachpb.RequestHeader{Key: key, EndKey: key.Next()},
				}
				var resp roachpb.ReverseScanResponse
				if _, err := ReverseScan(ctx, db, CommandArgs{Args: req, Header: header}, &resp); err != nil {
					return nil, err
				}
				return resp.IntentRows, nil
			},
			expPrefixIters:    0,
			expNonPrefixIters: 2,
		},
	}
	for _, c := range testCases {
		t.Run(c.name, func(t *testing.T) {
			// Test with and without deletion intents. If a READ_UNCOMMITTED request
			// encounters an intent whose provisional value is a deletion tombstone,
			// the request should ignore the intent and should not return any
			// corresponding intent row.
			//
			// NOTE: the flag is named isDelete rather than delete so that it does
			// not shadow the builtin delete function.
			testutils.RunTrueAndFalse(t, "deletion intent", func(t *testing.T, isDelete bool) {
				db := &instrumentedEngine{Engine: storage.NewDefaultInMem()}
				defer db.Close()

				// Write an intent.
				val := roachpb.MakeValueFromBytes([]byte("val"))
				txn := roachpb.MakeTransaction("test", key, roachpb.NormalUserPriority, ts, 0)
				var err error
				if isDelete {
					err = storage.MVCCDelete(ctx, db, nil, key, ts, &txn)
				} else {
					err = storage.MVCCPut(ctx, db, nil, key, ts, val, &txn)
				}
				require.NoError(t, err)

				// Instrument iterator creation, count prefix vs. non-prefix iters.
				// The hook is installed only after the intent has been written so
				// that iterators created by the write are not counted.
				var prefixIters, nonPrefixIters int
				db.onNewIterator = func(opts storage.IterOptions) {
					if opts.Prefix {
						prefixIters++
					} else {
						nonPrefixIters++
					}
				}

				intents, err := c.run(t, db)
				require.NoError(t, err)

				// Assert proper intent values.
				if isDelete {
					require.Len(t, intents, 0)
				} else {
					expIntentVal := val
					expIntentVal.Timestamp = ts
					expIntentKeyVal := roachpb.KeyValue{Key: key, Value: expIntentVal}
					require.Len(t, intents, 1)
					require.Equal(t, expIntentKeyVal, intents[0])
				}

				// Assert proper iterator use.
				require.Equal(t, c.expPrefixIters, prefixIters)
				require.Equal(t, c.expNonPrefixIters, nonPrefixIters)
			})
		})
	}
}