Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
87877: kvnemesis: simplify and document validation logic r=erikgrinaker a=tbg

It took me a while to fully understand `processOp` and `checkAtomic`.
This commit simplifies them. It does so in a few ways:

- remove a map that only ever had one entry.
- avoid the need to use fake transaction IDs, and in particular clarify
  that nothing is really about txn IDs, it's about collecting atomic
  units (which might originate in a batch or a txn)
- thread the optional execution timestamp directly, as opposed to
  indirecting through an optional `*Transaction` proto.

The last point deserves a few more words. At its core, `kvnemesis` wants
to figure out valid execution timestamps by relying on unique values
coming in over the rangefeed. But deletion tombstones carry no value
and thus aren't unique. There then needs to be some way to match up a
deletion tombstone with an operation that might have written it. This
requires knowledge of the timestamp at which the operation executed, and
kvnemesis was, at least for `ClosureTxnOperation`s, using its knowledge
of the commit timestamp for that purpose.

We can actually get that timestamp for all operations, though, and we
should switch `kvnemesis` to sort operations by their execution
timestamp, and then verify that the observed MVCC history is congruent
with that execution order[^1].

This commit doesn't quite do that but it
sets the stage by abstracting away from the txn commit timestamp.

This is related to cockroachdb#69642 in that this is the issue that prompted this
refactor.

[^1]: which happens to have been something also envisioned by the original author: https://github.com/cockroachdb/cockroach/blob/7cde315da539fe3d790f546a1ddde6cc882fca6b/pkg/kv/kvnemesis/validator.go#L43-L46

Release note: None

88308: kv/rangefeed: reduce size of event struct from 200 bytes to 72 bytes r=nvanbenschoten a=nvanbenschoten

This commit restructures the event struct and reduces its size from 200 bytes to 72 bytes. This is accomplished primarily by pushing large, infrequently used struct fields into pointers. This is mostly just a drive-by cleanup found while working on cockroachdb#77724.

Release justification: None. Don't merge yet.

Release note: None.

Co-authored-by: Tobias Grieger <tobias.b.grieger@gmail.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
3 people committed Sep 22, 2022
3 parents 7278468 + 97bec49 + 11ab1ec commit f08a1b0
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 203 deletions.
119 changes: 75 additions & 44 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)
Expand Down Expand Up @@ -64,9 +63,13 @@ func Validate(steps []Step, kvs *Engine) []error {
// by `After` timestamp is sufficient to get us the necessary ordering. This
// is because txns cannot be used concurrently, so none of the (Begin,After)
// timespans for a given transaction can overlap.
//
// TODO(tbg): if, as we should, we order all operations by the timestamp at
// which they executed (at least for MVCC-aware operations), then we would
// sort here by that timestamp.
sort.Slice(steps, func(i, j int) bool { return steps[i].After.Less(steps[j].After) })
for _, s := range steps {
v.processOp(nil /* txnID */, s.Op)
v.processOp(notBuffering, s.Op)
}

var extraKVs []observedOp
Expand Down Expand Up @@ -240,8 +243,12 @@ type observedScan struct {
func (*observedScan) observedMarker() {}

type validator struct {
kvs *Engine
observedOpsByTxn map[string][]observedOp
kvs *Engine

// Ops for the current atomic unit. This is reset between units, in
// checkAtomic, which then calls processOp (which might recurse owing
// to the existence of txn closures, batches, etc).
curOps []observedOp

// NB: The Generator carefully ensures that each value written is unique
// globally over a run, so there's a 1:1 relationship between a value that was
Expand Down Expand Up @@ -308,42 +315,53 @@ func makeValidator(kvs *Engine) (*validator, error) {
kvByValue: kvByValue,
tombstonesForKey: tombstonesForKey,
committedDeletesForKey: make(map[string]int),
observedOpsByTxn: make(map[string][]observedOp),
}, nil
}

// getDeleteForKey looks up a stored tombstone for a given key (if it
// exists) from tombstonesForKey, returning the tombstone (i.e. MVCCKey) along
// with a `true` boolean value if found, or the empty key and `false` if not.
func (v *validator) getDeleteForKey(key string, txn *roachpb.Transaction) (storage.MVCCKey, bool) {
if txn == nil {
func (v *validator) getDeleteForKey(key string, optOpTS hlc.Timestamp) (storage.MVCCKey, bool) {
if optOpTS.IsEmpty() {
panic(errors.AssertionFailedf(`transaction required to look up delete for key: %v`, key))
}

if used, ok := v.tombstonesForKey[key][txn.TxnMeta.WriteTimestamp]; !used && ok {
v.tombstonesForKey[key][txn.TxnMeta.WriteTimestamp] = true
return storage.MVCCKey{Key: []byte(key), Timestamp: txn.TxnMeta.WriteTimestamp}, true
if used, ok := v.tombstonesForKey[key][optOpTS]; !used && ok {
v.tombstonesForKey[key][optOpTS] = true
return storage.MVCCKey{Key: []byte(key), Timestamp: optOpTS}, true
}

return storage.MVCCKey{}, false
}

func (v *validator) processOp(txnID *string, op Operation) {
// notBuffering and isBuffering name the boolean argument of processOp.
// notBuffering means the operation forms its own atomic unit, so processOp
// invokes checkAtomic for it; isBuffering means the operation executes as
// part of a surrounding atomic unit (a txn closure or batch), so its
// observations are accumulated into v.curOps by the enclosing checkAtomic
// call instead.
const (
	notBuffering = false
	isBuffering  = true
)

// processOp turns the result of an operation into its observations (which are
// later checked against the MVCC history). The boolean parameter indicates
// whether the operation is its own atomic unit or whether it's happening as
// part of a surrounding transaction or batch (in which case the caller is
// itself processOp, with the operation to handle being the batch or txn).
// Whenever it is `false`, processOp invokes the validator's checkAtomic method
// for the operation.
func (v *validator) processOp(buffering bool, op Operation) {
switch t := op.GetValue().(type) {
case *GetOperation:
v.failIfError(op, t.Result)
if txnID == nil {
v.checkAtomic(`get`, t.Result, nil, op)
if !buffering {
v.checkAtomic(`get`, t.Result, hlc.Timestamp{}, op)
} else {
read := &observedRead{
Key: t.Key,
Value: roachpb.Value{RawBytes: t.Result.Value},
}
v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], read)
v.curOps = append(v.curOps, read)
}
case *PutOperation:
if txnID == nil {
v.checkAtomic(`put`, t.Result, nil, op)
if !buffering {
v.checkAtomic(`put`, t.Result, hlc.Timestamp{}, op)
} else {
// Accumulate all the writes for this transaction.
kv, ok := v.kvByValue[string(t.Value)]
Expand All @@ -356,11 +374,11 @@ func (v *validator) processOp(txnID *string, op Operation) {
if write.Materialized {
write.Timestamp = kv.Key.Timestamp
}
v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], write)
v.curOps = append(v.curOps, write)
}
case *DeleteOperation:
if txnID == nil {
v.checkAtomic(`delete`, t.Result, nil, op)
if !buffering {
v.checkAtomic(`delete`, t.Result, hlc.Timestamp{}, op)
} else {
// NB: While Put operations can be identified as having materialized
// (or not) in the storage engine because the Generator guarantees each
Expand All @@ -378,11 +396,11 @@ func (v *validator) processOp(txnID *string, op Operation) {
Key: t.Key,
Value: roachpb.Value{},
}
v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], write)
v.curOps = append(v.curOps, write)
}
case *DeleteRangeOperation:
if txnID == nil {
v.checkAtomic(`deleteRange`, t.Result, nil, op)
if !buffering {
v.checkAtomic(`deleteRange`, t.Result, hlc.Timestamp{}, op)
} else {
// For the purposes of validation, DelRange operations decompose into
// a specialized scan for keys with non-nil values, followed by
Expand Down Expand Up @@ -410,17 +428,17 @@ func (v *validator) processOp(txnID *string, op Operation) {
}
deleteOps[i] = write
}
v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], scan)
v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], deleteOps...)
v.curOps = append(v.curOps, scan)
v.curOps = append(v.curOps, deleteOps...)
}
case *ScanOperation:
v.failIfError(op, t.Result)
if txnID == nil {
if !buffering {
atomicScanType := `scan`
if t.Reverse {
atomicScanType = `reverse scan`
}
v.checkAtomic(atomicScanType, t.Result, nil, op)
v.checkAtomic(atomicScanType, t.Result, hlc.Timestamp{}, op)
} else {
scan := &observedScan{
Span: roachpb.Span{
Expand All @@ -436,7 +454,7 @@ func (v *validator) processOp(txnID *string, op Operation) {
Value: roachpb.Value{RawBytes: kv.Value},
}
}
v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], scan)
v.curOps = append(v.curOps, scan)
}
case *SplitOperation:
v.failIfError(op, t.Result)
Expand Down Expand Up @@ -525,11 +543,11 @@ func (v *validator) processOp(txnID *string, op Operation) {
case *BatchOperation:
if !resultIsRetryable(t.Result) {
v.failIfError(op, t.Result)
if txnID == nil {
v.checkAtomic(`batch`, t.Result, nil, t.Ops...)
if !buffering {
v.checkAtomic(`batch`, t.Result, hlc.Timestamp{}, t.Ops...)
} else {
for _, op := range t.Ops {
v.processOp(txnID, op)
v.processOp(buffering, op)
}
}
}
Expand All @@ -538,24 +556,37 @@ func (v *validator) processOp(txnID *string, op Operation) {
if t.CommitInBatch != nil {
ops = append(ops, t.CommitInBatch.Ops...)
}
v.checkAtomic(`txn`, t.Result, t.Txn, ops...)
var optOpsTimestamp hlc.Timestamp
if t.Result.Err == nil {
if t.Txn == nil {
v.failures = append(v.failures, errors.AssertionFailedf("missing transaction"))
break
}
optOpsTimestamp = t.Txn.WriteTimestamp
}
v.checkAtomic(`txn`, t.Result, optOpsTimestamp, ops...)
default:
panic(errors.AssertionFailedf(`unknown operation type: %T %v`, t, t))
}
}

// checkAtomic verifies a set of operations that should be atomic by trying to find
// a timestamp at which the observed reads and writes of the operations (as executed
// in the order in which they appear in the arguments) match the MVCC history.
func (v *validator) checkAtomic(
atomicType string, result Result, optTxn *roachpb.Transaction, ops ...Operation,
atomicType string, result Result, optOpsTimestamp hlc.Timestamp, ops ...Operation,
) {
fakeTxnID := uuid.MakeV4().String()
for _, op := range ops {
v.processOp(&fakeTxnID, op)
// NB: we're not really necessarily in a txn, but passing true here means that
// we have an atomic unit, which is also the case if we are called here by a
// non-transactional Put, for example.
v.processOp(isBuffering, op)
}
txnObservations := v.observedOpsByTxn[fakeTxnID]
delete(v.observedOpsByTxn, fakeTxnID)
txnObservations := v.curOps
v.curOps = nil

if result.Type != ResultType_Error {
v.checkCommittedTxn(`committed `+atomicType, txnObservations, optTxn)
v.checkCommittedTxn(`committed `+atomicType, txnObservations, optOpsTimestamp)
} else if resultIsAmbiguous(result) {
v.checkAmbiguousTxn(`ambiguous `+atomicType, txnObservations)
} else {
Expand All @@ -564,7 +595,7 @@ func (v *validator) checkAtomic(
}

func (v *validator) checkCommittedTxn(
atomicType string, txnObservations []observedOp, optTxn *roachpb.Transaction,
atomicType string, txnObservations []observedOp, optOpsTimestamp hlc.Timestamp,
) {
// The following works by verifying that there is at least one time at which
// it was valid to see all the reads and writes that we saw in this
Expand Down Expand Up @@ -653,13 +684,13 @@ func (v *validator) checkCommittedTxn(
if o.isDelete() {
key := string(o.Key)
v.committedDeletesForKey[key]++
if optTxn == nil {
if optOpsTimestamp.IsEmpty() {
// In the case that the delete is not in a transaction (or in an
// ambiguous transaction), we do not match it to a specific
// tombstone as we cannot be certain which tombstone resulted from
// this operation; hence, we leave the timestamp empty.
o.Materialized = v.committedDeletesForKey[key] <= len(v.tombstonesForKey[key])
} else if storedDelete, ok := v.getDeleteForKey(key, optTxn); ok {
} else if storedDelete, ok := v.getDeleteForKey(key, optOpsTimestamp); ok {
o.Materialized = true
o.Timestamp = storedDelete.Timestamp
}
Expand Down Expand Up @@ -779,8 +810,8 @@ func (v *validator) checkCommittedTxn(
}
switch o := observation.(type) {
case *observedWrite:
if optTxn != nil && o.Materialized && optTxn.TxnMeta.WriteTimestamp != o.Timestamp {
failure = fmt.Sprintf(`committed txn mismatched write timestamp %s`, optTxn.TxnMeta.WriteTimestamp)
if optOpsTimestamp.IsSet() && o.Materialized && optOpsTimestamp != o.Timestamp {
failure = fmt.Sprintf(`mismatched write timestamp %s`, optOpsTimestamp)
}
}
}
Expand Down Expand Up @@ -834,9 +865,9 @@ func (v *validator) checkAmbiguousTxn(atomicType string, txnObservations []obser
// TODO(dan): Is it possible to receive an ambiguous read-only txn? Assume
// committed for now because the committed case has assertions about reads
// but the uncommitted case doesn't and this seems to work.
v.checkCommittedTxn(atomicType, txnObservations, nil)
v.checkCommittedTxn(atomicType, txnObservations, hlc.Timestamp{})
} else if somethingCommitted {
v.checkCommittedTxn(atomicType, txnObservations, nil)
v.checkCommittedTxn(atomicType, txnObservations, hlc.Timestamp{})
} else {
v.checkUncommittedTxn(atomicType, txnObservations)
}
Expand Down
Loading

0 comments on commit f08a1b0

Please sign in to comment.