diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index f06924e4700f..aca8d33a3e2f 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" ) @@ -64,9 +63,13 @@ func Validate(steps []Step, kvs *Engine) []error { // by `After` timestamp is sufficient to get us the necessary ordering. This // is because txns cannot be used concurrently, so none of the (Begin,After) // timespans for a given transaction can overlap. + // + // TODO(tbg): if, as we should, we order all operations by the timestamp at + // which they executed (at least for MVCC-aware operations), then we would + // sort here by that timestamp. sort.Slice(steps, func(i, j int) bool { return steps[i].After.Less(steps[j].After) }) for _, s := range steps { - v.processOp(nil /* txnID */, s.Op) + v.processOp(notBuffering, s.Op) } var extraKVs []observedOp @@ -240,8 +243,12 @@ type observedScan struct { func (*observedScan) observedMarker() {} type validator struct { - kvs *Engine - observedOpsByTxn map[string][]observedOp + kvs *Engine + + // Ops for the current atomic unit. This is reset between units, in + // checkAtomic, which then calls processOp (which might recurse owing + // to the existence of txn closures, batches, etc). + curOps []observedOp // NB: The Generator carefully ensures that each value written is unique // globally over a run, so there's a 1:1 relationship between a value that was @@ -308,42 +315,53 @@ func makeValidator(kvs *Engine) (*validator, error) { kvByValue: kvByValue, tombstonesForKey: tombstonesForKey, committedDeletesForKey: make(map[string]int), - observedOpsByTxn: make(map[string][]observedOp), }, nil } // getDeleteForKey looks up a stored tombstone for a given key (if it // exists) from tombstonesForKey, returning the tombstone (i.e. MVCCKey) along // with a `true` boolean value if found, or the empty key and `false` if not. -func (v *validator) getDeleteForKey(key string, txn *roachpb.Transaction) (storage.MVCCKey, bool) { - if txn == nil { +func (v *validator) getDeleteForKey(key string, optOpTS hlc.Timestamp) (storage.MVCCKey, bool) { + if optOpTS.IsEmpty() { panic(errors.AssertionFailedf(`transaction required to look up delete for key: %v`, key)) } - if used, ok := v.tombstonesForKey[key][txn.TxnMeta.WriteTimestamp]; !used && ok { - v.tombstonesForKey[key][txn.TxnMeta.WriteTimestamp] = true - return storage.MVCCKey{Key: []byte(key), Timestamp: txn.TxnMeta.WriteTimestamp}, true + if used, ok := v.tombstonesForKey[key][optOpTS]; !used && ok { + v.tombstonesForKey[key][optOpTS] = true + return storage.MVCCKey{Key: []byte(key), Timestamp: optOpTS}, true } return storage.MVCCKey{}, false } -func (v *validator) processOp(txnID *string, op Operation) { +const ( + notBuffering = false + isBuffering = true +) + +// processOp turns the result of an operation into its observations (which are +// later checked against the MVCC history). The boolean parameter indicates +// whether the operation is its own atomic unit or whether it's happening as +// part of a surrounding transaction or batch (in which case the caller is +// itself processOp, with the operation to handle being the batch or txn). +// Whenever it is `false`, processOp invokes the validator's checkAtomic method +// for the operation. +func (v *validator) processOp(buffering bool, op Operation) { switch t := op.GetValue().(type) { case *GetOperation: v.failIfError(op, t.Result) - if txnID == nil { - v.checkAtomic(`get`, t.Result, nil, op) + if !buffering { + v.checkAtomic(`get`, t.Result, hlc.Timestamp{}, op) } else { read := &observedRead{ Key: t.Key, Value: roachpb.Value{RawBytes: t.Result.Value}, } - v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], read) + v.curOps = append(v.curOps, read) } case *PutOperation: - if txnID == nil { - v.checkAtomic(`put`, t.Result, nil, op) + if !buffering { + v.checkAtomic(`put`, t.Result, hlc.Timestamp{}, op) } else { // Accumulate all the writes for this transaction. kv, ok := v.kvByValue[string(t.Value)] @@ -356,11 +374,11 @@ func (v *validator) processOp(txnID *string, op Operation) { if write.Materialized { write.Timestamp = kv.Key.Timestamp } - v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], write) + v.curOps = append(v.curOps, write) } case *DeleteOperation: - if txnID == nil { - v.checkAtomic(`delete`, t.Result, nil, op) + if !buffering { + v.checkAtomic(`delete`, t.Result, hlc.Timestamp{}, op) } else { // NB: While Put operations can be identified as having materialized // (or not) in the storage engine because the Generator guarantees each @@ -378,11 +396,11 @@ func (v *validator) processOp(txnID *string, op Operation) { Key: t.Key, Value: roachpb.Value{}, } - v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], write) + v.curOps = append(v.curOps, write) } case *DeleteRangeOperation: - if txnID == nil { - v.checkAtomic(`deleteRange`, t.Result, nil, op) + if !buffering { + v.checkAtomic(`deleteRange`, t.Result, hlc.Timestamp{}, op) } else { // For the purposes of validation, DelRange operations decompose into // a specialized scan for keys with non-nil values, followed by @@ -410,17 +428,17 @@ func (v *validator) processOp(txnID *string, op Operation) { } deleteOps[i] = write } - v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], scan) - v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], deleteOps...) + v.curOps = append(v.curOps, scan) + v.curOps = append(v.curOps, deleteOps...) } case *ScanOperation: v.failIfError(op, t.Result) - if txnID == nil { + if !buffering { atomicScanType := `scan` if t.Reverse { atomicScanType = `reverse scan` } - v.checkAtomic(atomicScanType, t.Result, nil, op) + v.checkAtomic(atomicScanType, t.Result, hlc.Timestamp{}, op) } else { scan := &observedScan{ Span: roachpb.Span{ @@ -436,7 +454,7 @@ func (v *validator) processOp(txnID *string, op Operation) { Value: roachpb.Value{RawBytes: kv.Value}, } } - v.observedOpsByTxn[*txnID] = append(v.observedOpsByTxn[*txnID], scan) + v.curOps = append(v.curOps, scan) } case *SplitOperation: v.failIfError(op, t.Result) @@ -525,11 +543,11 @@ func (v *validator) processOp(txnID *string, op Operation) { case *BatchOperation: if !resultIsRetryable(t.Result) { v.failIfError(op, t.Result) - if txnID == nil { - v.checkAtomic(`batch`, t.Result, nil, t.Ops...) + if !buffering { + v.checkAtomic(`batch`, t.Result, hlc.Timestamp{}, t.Ops...) } else { for _, op := range t.Ops { - v.processOp(txnID, op) + v.processOp(buffering, op) } } } @@ -538,24 +556,37 @@ func (v *validator) processOp(txnID *string, op Operation) { if t.CommitInBatch != nil { ops = append(ops, t.CommitInBatch.Ops...) } - v.checkAtomic(`txn`, t.Result, t.Txn, ops...) + var optOpsTimestamp hlc.Timestamp + if t.Result.Err == nil { + if t.Txn == nil { + v.failures = append(v.failures, errors.AssertionFailedf("missing transaction")) + break + } + optOpsTimestamp = t.Txn.WriteTimestamp + } + v.checkAtomic(`txn`, t.Result, optOpsTimestamp, ops...) default: panic(errors.AssertionFailedf(`unknown operation type: %T %v`, t, t)) } } +// checkAtomic verifies a set of operations that should be atomic by trying to find +// a timestamp at which the observed reads and writes of the operations (as executed +// in the order in which they appear in the arguments) match the MVCC history. func (v *validator) checkAtomic( - atomicType string, result Result, optTxn *roachpb.Transaction, ops ...Operation, + atomicType string, result Result, optOpsTimestamp hlc.Timestamp, ops ...Operation, ) { - fakeTxnID := uuid.MakeV4().String() for _, op := range ops { - v.processOp(&fakeTxnID, op) + // NB: we're not really necessarily in a txn, but passing true here means that + // we have an atomic unit, which is also the case if we are called here by a + // non-transactional Put, for example. + v.processOp(isBuffering, op) } - txnObservations := v.observedOpsByTxn[fakeTxnID] - delete(v.observedOpsByTxn, fakeTxnID) + txnObservations := v.curOps + v.curOps = nil if result.Type != ResultType_Error { - v.checkCommittedTxn(`committed `+atomicType, txnObservations, optTxn) + v.checkCommittedTxn(`committed `+atomicType, txnObservations, optOpsTimestamp) } else if resultIsAmbiguous(result) { v.checkAmbiguousTxn(`ambiguous `+atomicType, txnObservations) } else { @@ -564,7 +595,7 @@ func (v *validator) checkAtomic( } func (v *validator) checkCommittedTxn( - atomicType string, txnObservations []observedOp, optTxn *roachpb.Transaction, + atomicType string, txnObservations []observedOp, optOpsTimestamp hlc.Timestamp, ) { // The following works by verifying that there is at least one time at which // it was valid to see all the reads and writes that we saw in this @@ -653,13 +684,13 @@ func (v *validator) checkCommittedTxn( if o.isDelete() { key := string(o.Key) v.committedDeletesForKey[key]++ - if optTxn == nil { + if optOpsTimestamp.IsEmpty() { // In the case that the delete is not in a transaction (or in an // ambiguous transaction), we do not match it to a specific // tombstone as we cannot be certain which tombstone resulted from // this operation; hence, we leave the timestamp empty. o.Materialized = v.committedDeletesForKey[key] <= len(v.tombstonesForKey[key]) - } else if storedDelete, ok := v.getDeleteForKey(key, optTxn); ok { + } else if storedDelete, ok := v.getDeleteForKey(key, optOpsTimestamp); ok { o.Materialized = true o.Timestamp = storedDelete.Timestamp } @@ -779,8 +810,8 @@ func (v *validator) checkCommittedTxn( } switch o := observation.(type) { case *observedWrite: - if optTxn != nil && o.Materialized && optTxn.TxnMeta.WriteTimestamp != o.Timestamp { - failure = fmt.Sprintf(`committed txn mismatched write timestamp %s`, optTxn.TxnMeta.WriteTimestamp) + if optOpsTimestamp.IsSet() && o.Materialized && optOpsTimestamp != o.Timestamp { + failure = fmt.Sprintf(`mismatched write timestamp %s`, optOpsTimestamp) } } } @@ -834,9 +865,9 @@ func (v *validator) checkAmbiguousTxn(atomicType string, txnObservations []obser // TODO(dan): Is it possible to receive an ambiguous read-only txn? Assume // committed for now because the committed case has assertions about reads // but the uncommitted case doesn't and this seems to work. - v.checkCommittedTxn(atomicType, txnObservations, nil) + v.checkCommittedTxn(atomicType, txnObservations, hlc.Timestamp{}) } else if somethingCommitted { - v.checkCommittedTxn(atomicType, txnObservations, nil) + v.checkCommittedTxn(atomicType, txnObservations, hlc.Timestamp{}) } else { v.checkUncommittedTxn(atomicType, txnObservations) } diff --git a/pkg/kv/kvnemesis/validator_test.go b/pkg/kv/kvnemesis/validator_test.go index da549ba68ba4..e483cb52dab7 100644 --- a/pkg/kv/kvnemesis/validator_test.go +++ b/pkg/kv/kvnemesis/validator_test.go @@ -228,11 +228,11 @@ func TestValidate(t *testing.T) { steps: []Step{ step(withResult(del(`a`), nil)), step(withResult(put(`a`, `v1`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withResult(put(`a`, `v2`), nil), withResult(del(`a`), nil), withResult(put(`a`, `v3`), nil), - ), nil)), + ), 3), nil)), step(withResult(del(`a`), nil)), }, kvs: kvs(tombstone(`a`, 1), kv(`a`, 2, `v1`), kv(`a`, 3, `v3`), tombstone(`a`, 4)), @@ -265,9 +265,9 @@ func TestValidate(t *testing.T) { { name: "one transactionally committed put with the correct writes", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withResult(put(`a`, `v1`), nil), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v1`)), expected: nil, @@ -275,9 +275,9 @@ func TestValidate(t *testing.T) { { name: "one transactionally committed delete with the correct writes", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withResult(del(`a`), nil), - ), nil)), + ), 1), nil)), }, kvs: kvs(tombstone(`a`, 1)), expected: nil, @@ -285,10 +285,10 @@ func TestValidate(t *testing.T) { { name: "one transactionally committed put with first write missing", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withResult(put(`a`, `v1`), nil), withResult(put(`b`, `v2`), nil), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`b`, 1, `v2`)), expected: []string{`committed txn missing write: [w]"a":missing->v1 [w]"b":0.000000001,0->v2`}, @@ -307,10 +307,10 @@ func TestValidate(t *testing.T) { { name: "one transactionally committed put with second write missing", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withResult(put(`a`, `v1`), nil), withResult(put(`b`, `v2`), nil), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v1`)), expected: []string{`committed txn missing write: [w]"a":0.000000001,0->v1 [w]"b":missing->v2`}, @@ -329,10 +329,10 @@ func TestValidate(t *testing.T) { { name: "one transactionally committed put with write timestamp disagreement", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withResult(put(`a`, `v1`), nil), withResult(put(`b`, `v2`), nil), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v1`), kv(`b`, 2, `v2`)), expected: []string{ @@ -421,10 +421,10 @@ func TestValidate(t *testing.T) { { name: "two transactionally committed puts of the same key", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withResult(put(`a`, `v1`), nil), withResult(put(`a`, `v2`), nil), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v2`)), expected: nil, @@ -454,10 +454,10 @@ func TestValidate(t *testing.T) { { name: "two transactionally committed writes (delete, put) of the same key", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withResult(del(`a`), nil), withResult(put(`a`, `v2`), nil), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v2`)), expected: nil, @@ -465,10 +465,10 @@ func TestValidate(t *testing.T) { { name: "two transactionally committed puts of the same key with extra write", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withResult(put(`a`, `v1`), nil), withResult(put(`a`, `v2`), nil), - ), nil)), + ), 2), nil)), }, // HACK: These should be the same timestamp. See the TODO in // watcher.processEvents. @@ -805,10 +805,10 @@ func TestValidate(t *testing.T) { step(withResult(put(`a`, `v2`), nil)), step(withResult(put(`b`, `v3`), nil)), step(withResult(put(`b`, `v4`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withReadResult(get(`a`), `v1`), withReadResult(get(`b`), `v3`), - ), nil)), + ), 3), nil)), }, // Reading v1 is valid from 1-3 and v3 is valid from 2-3: overlap 2-3 kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 3, `v2`), kv(`b`, 2, `v3`), kv(`b`, 3, `v4`)), @@ -821,11 +821,11 @@ func TestValidate(t *testing.T) { step(withResult(put(`b`, `v2`), nil)), step(withResult(del(`a`), nil)), step(withResult(del(`b`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withReadResult(get(`a`), ``), withReadResult(get(`b`), `v2`), withReadResult(get(`c`), ``), - ), nil)), + ), 4), nil)), }, // Reading (a, ) is valid from min-1 or 3-max, and (b, v2) is valid from 2-4: overlap 3-4 kvs: kvs(kv(`a`, 1, `v1`), kv(`b`, 2, `v2`), tombstone(`a`, 3), tombstone(`b`, 4)), @@ -838,10 +838,10 @@ func TestValidate(t *testing.T) { step(withResult(put(`a`, `v2`), nil)), step(withResult(put(`b`, `v3`), nil)), step(withResult(put(`b`, `v4`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withReadResult(get(`a`), `v1`), withReadResult(get(`b`), `v3`), - ), nil)), + ), 3), nil)), }, // Reading v1 is valid from 1-2 and v3 is valid from 2-3: no overlap kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 2, `v2`), kv(`b`, 2, `v3`), kv(`b`, 3, `v4`)), @@ -859,11 +859,11 @@ func TestValidate(t *testing.T) { withResult(del(`a`), nil), withResult(del(`b`), nil), ), 3), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withReadResult(get(`a`), ``), withReadResult(get(`b`), `v2`), withReadResult(get(`c`), ``), - ), nil)), + ), 4), nil)), }, // Reading (a, ) is valid from min-1 or 3-max, and (b, v2) is valid from 2-3: no overlap kvs: kvs(kv(`a`, 1, `v1`), kv(`b`, 2, `v2`), tombstone(`a`, 3), tombstone(`b`, 3)), @@ -913,10 +913,10 @@ func TestValidate(t *testing.T) { step(withResult(put(`a`, `v1`), nil)), step(withResult(put(`a`, `v2`), nil)), step(withResult(put(`b`, `v3`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withReadResult(get(`a`), `v1`), withReadResult(get(`b`), ``), - ), nil)), + ), 1), nil)), }, // Reading v1 is valid from 1-2 and v3 is valid from 0-2: overlap 1-2 kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 2, `v2`), kv(`b`, 2, `v3`)), @@ -928,10 +928,10 @@ func TestValidate(t *testing.T) { step(withResult(put(`a`, `v1`), nil)), step(withResult(put(`a`, `v2`), nil)), step(withResult(put(`b`, `v3`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withReadResult(get(`a`), `v1`), withReadResult(get(`b`), ``), - ), nil)), + ), 1), nil)), }, // Reading v1 is valid from 1-2 and v3 is valid from 0-1: no overlap kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 2, `v2`), kv(`b`, 1, `v3`)), @@ -945,10 +945,10 @@ func TestValidate(t *testing.T) { steps: []Step{ step(withResult(put(`a`, `v1`), nil)), step(withResult(put(`a`, `v2`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withReadResult(get(`a`), `v1`), withResult(put(`b`, `v3`), nil), - ), nil)), + ), 2), nil)), }, // Reading v1 is valid from 1-3 and v3 is valid at 2: overlap @2 kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 3, `v2`), kv(`b`, 2, `v3`)), @@ -959,10 +959,10 @@ func TestValidate(t *testing.T) { steps: []Step{ step(withResult(put(`a`, `v1`), nil)), step(withResult(put(`a`, `v2`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withReadResult(get(`a`), `v1`), withResult(put(`b`, `v3`), nil), - ), nil)), + ), 2), nil)), }, // Reading v1 is valid from 1-2 and v3 is valid at 2: no overlap kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 2, `v2`), kv(`b`, 2, `v3`)), @@ -974,11 +974,11 @@ func TestValidate(t *testing.T) { { name: "transaction with read before and after write", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withReadResult(get(`a`), ``), withResult(put(`a`, `v1`), nil), withReadResult(get(`a`), `v1`), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v1`)), expected: nil, @@ -999,11 +999,11 @@ func TestValidate(t *testing.T) { { name: "transaction with incorrect read before write", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withReadResult(get(`a`), `v1`), withResult(put(`a`, `v1`), nil), withReadResult(get(`a`), `v1`), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v1`)), expected: []string{ @@ -1030,11 +1030,11 @@ func TestValidate(t *testing.T) { { name: "transaction with incorrect read after write", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withReadResult(get(`a`), ``), withResult(put(`a`, `v1`), nil), withReadResult(get(`a`), ``), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v1`)), expected: []string{ @@ -1061,13 +1061,13 @@ func TestValidate(t *testing.T) { { name: "two transactionally committed puts of the same key with reads", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withReadResult(get(`a`), ``), withResult(put(`a`, `v1`), nil), withReadResult(get(`a`), `v1`), withResult(put(`a`, `v2`), nil), withReadResult(get(`a`), `v2`), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v2`)), expected: nil, @@ -1122,7 +1122,7 @@ func TestValidate(t *testing.T) { }, kvs: kvs(kv(`a`, 2, `v1`)), expected: []string{ - `committed txn mismatched write timestamp 0.000000001,0: [w]"a":0.000000002,0->v1`, + `mismatched write timestamp 0.000000001,0: [w]"a":0.000000002,0->v1`, }, }, { @@ -1318,10 +1318,10 @@ func TestValidate(t *testing.T) { { name: "one scan after writes and delete returning missing key", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withResult(put(`a`, `v1`), nil), withResult(put(`b`, `v2`), nil), - ), nil)), + ), 1), nil)), step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`), scanKV(`b`, `v2`)), withResult(del(`a`), nil), @@ -1471,10 +1471,10 @@ func TestValidate(t *testing.T) { step(withResult(put(`a`, `v2`), nil)), step(withResult(put(`b`, `v3`), nil)), step(withResult(put(`b`, `v4`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`), scanKV(`a`, `v1`), scanKV(`b`, `v3`)), withScanResult(scan(`b`, `d`), scanKV(`b`, `v3`)), - ), nil)), + ), 2), nil)), }, // Reading v1 is valid from 1-3 and v3 is valid from 2-3: overlap 2-3 kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 3, `v2`), kv(`b`, 2, `v3`), kv(`b`, 3, `v4`)), @@ -1488,10 +1488,10 @@ func TestValidate(t *testing.T) { step(withResult(put(`b`, `v3`), nil)), step(withResult(del(`b`), nil)), step(withResult(put(`b`, `v4`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`), scanKV(`a`, `v1`)), withScanResult(scan(`b`, `d`)), - ), nil)), + ), 2), nil)), }, // Reading v1 is valid from 1-3 and for `b` is valid -1 and 2-4: overlap 2-3 kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 3, `v2`), kv(`b`, 1, `v3`), tombstone(`b`, 2), kv(`b`, 4, `v4`)), @@ -1504,10 +1504,10 @@ func TestValidate(t *testing.T) { step(withResult(put(`a`, `v2`), nil)), step(withResult(put(`b`, `v3`), nil)), step(withResult(put(`b`, `v4`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`), scanKV(`a`, `v1`), scanKV(`b`, `v3`)), withScanResult(scan(`b`, `d`), scanKV(`b`, `v3`)), - ), nil)), + ), 2), nil)), }, // Reading v1 is valid from 1-2 and v3 is valid from 2-3: no overlap kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 2, `v2`), kv(`b`, 2, `v3`), kv(`b`, 3, `v4`)), @@ -1524,10 +1524,10 @@ func TestValidate(t *testing.T) { step(withResult(put(`a`, `v2`), nil)), step(withResult(put(`b`, `v3`), nil)), step(withResult(del(`b`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`), scanKV(`a`, `v1`)), withScanResult(scan(`b`, `d`)), - ), nil)), + ), 3), nil)), }, // Reading v1 is valid from 1-2 and for `b` is valid from -1, 3-: no overlap kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 2, `v2`), kv(`b`, 1, `v3`), tombstone(`b`, 3)), @@ -1543,10 +1543,10 @@ func TestValidate(t *testing.T) { step(withResult(put(`a`, `v1`), nil)), step(withResult(put(`a`, `v2`), nil)), step(withResult(put(`b`, `v3`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`), scanKV(`a`, `v1`)), withScanResult(scan(`b`, `d`)), - ), nil)), + ), 2), nil)), }, // Reading v1 is valid from 1-2 and v3 is valid from 0-2: overlap 1-2 kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 2, `v2`), kv(`b`, 2, `v3`)), @@ -1558,10 +1558,10 @@ func TestValidate(t *testing.T) { step(withResult(put(`a`, `v1`), nil)), step(withResult(put(`a`, `v2`), nil)), step(withResult(put(`b`, `v3`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`), scanKV(`a`, `v1`)), withScanResult(scan(`b`, `d`)), - ), nil)), + ), 1), nil)), }, // Reading v1 is valid from 1-2 and v3 is valid from 0-1: no overlap kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 2, `v2`), kv(`b`, 1, `v3`)), @@ -1576,10 +1576,10 @@ func TestValidate(t *testing.T) { steps: []Step{ step(withResult(put(`a`, `v1`), nil)), step(withResult(put(`a`, `v2`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`), scanKV(`a`, `v1`)), withResult(put(`b`, `v3`), nil), - ), nil)), + ), 2), nil)), }, // Reading v1 is valid from 1-3 and v3 is valid at 2: overlap @2 kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 3, `v2`), kv(`b`, 2, `v3`)), @@ -1590,10 +1590,10 @@ func TestValidate(t *testing.T) { steps: []Step{ step(withResult(put(`a`, `v1`), nil)), step(withResult(put(`a`, `v2`), nil)), - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`), scanKV(`a`, `v1`)), withResult(put(`b`, `v3`), nil), - ), nil)), + ), 2), nil)), }, // Reading v1 is valid from 1-2 and v3 is valid at 2: no overlap kvs: kvs(kv(`a`, 1, `v1`), kv(`a`, 2, `v2`), kv(`b`, 2, `v3`)), @@ -1605,11 +1605,11 @@ func TestValidate(t *testing.T) { { name: "transaction with scan before and after write", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`)), withResult(put(`a`, `v1`), nil), withScanResult(scan(`a`, `c`), scanKV(`a`, `v1`)), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v1`)), expected: nil, @@ -1617,11 +1617,11 @@ func TestValidate(t *testing.T) { { name: "transaction with incorrect scan before write", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`), scanKV(`a`, `v1`)), withResult(put(`a`, `v1`), nil), withScanResult(scan(`a`, `c`), scanKV(`a`, `v1`)), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v1`)), expected: []string{ @@ -1634,11 +1634,11 @@ func TestValidate(t *testing.T) { { name: "transaction with incorrect scan after write", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`)), withResult(put(`a`, `v1`), nil), withScanResult(scan(`a`, `c`)), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v1`)), expected: []string{ @@ -1649,7 +1649,7 @@ func TestValidate(t *testing.T) { { name: "two transactionally committed puts of the same key with scans", steps: []Step{ - step(withResult(closureTxn(ClosureTxnType_Commit, + step(withResult(withTimestamp(closureTxn(ClosureTxnType_Commit, withScanResult(scan(`a`, `c`)), withResult(put(`a`, `v1`), nil), withScanResult(scan(`a`, `c`), scanKV(`a`, `v1`)), @@ -1657,7 +1657,7 @@ func TestValidate(t *testing.T) { withScanResult(scan(`a`, `c`), scanKV(`a`, `v2`)), withResult(put(`b`, `v3`), nil), withScanResult(scan(`a`, `c`), scanKV(`a`, `v2`), scanKV(`b`, `v3`)), - ), nil)), + ), 1), nil)), }, kvs: kvs(kv(`a`, 1, `v2`), kv(`b`, 1, `v3`)), expected: nil, diff --git a/pkg/kv/kvserver/rangefeed/budget.go b/pkg/kv/kvserver/rangefeed/budget.go index 83336f0ba121..e40f72022571 100644 --- a/pkg/kv/kvserver/rangefeed/budget.go +++ b/pkg/kv/kvserver/rangefeed/budget.go @@ -117,7 +117,7 @@ type FeedBudget struct { closed bool } // Maximum amount of memory to use by feed. We use separate limit here to - // avoid creating BytesMontior with a limit per feed. + // avoid creating BytesMonitor with a limit per feed. limit int64 // Channel to notify that memory was returned to the budget. replenishC chan interface{} diff --git a/pkg/kv/kvserver/rangefeed/processor.go b/pkg/kv/kvserver/rangefeed/processor.go index 46aa41b2722b..b733a8a46737 100644 --- a/pkg/kv/kvserver/rangefeed/processor.go +++ b/pkg/kv/kvserver/rangefeed/processor.go @@ -148,20 +148,37 @@ func putPooledEvent(ev *event) { // to be informed of. It is used so that all events can be sent over the same // channel, which is necessary to prevent reordering. type event struct { - ops []enginepb.MVCCLogicalOp - ct hlc.Timestamp - sst []byte - sstSpan roachpb.Span - sstWTS hlc.Timestamp - initRTS bool - syncC chan struct{} - // This setting is used in conjunction with syncC in tests in order to ensure - // that all registrations have fully finished outputting their buffers. This - // has to be done by the processor in order to avoid race conditions with the + // Event variants. Only one set. + ops opsEvent + ct ctEvent + initRTS initRTSEvent + sst *sstEvent + sync *syncEvent + // Budget allocated to process the event. + alloc *SharedBudgetAllocation +} + +type opsEvent []enginepb.MVCCLogicalOp + +type ctEvent struct { + hlc.Timestamp +} + +type initRTSEvent bool + +type sstEvent struct { + data []byte + span roachpb.Span + ts hlc.Timestamp +} + +type syncEvent struct { + c chan struct{} + // This setting is used in conjunction with c in tests in order to ensure that + // all registrations have fully finished outputting their buffers. This has to + // be done by the processor in order to avoid race conditions with the // registry. Should be used only in tests. - testRegCatchupSpan roachpb.Span - // Budget allocated to process the event - allocation *SharedBudgetAllocation + testRegCatchupSpan *roachpb.Span } // spanErr is an error across a key span that will disconnect overlapping @@ -326,7 +343,7 @@ func (p *Processor) run( // Transform and route events. case e := <-p.eventC: p.consumeEvent(ctx, e) - e.allocation.Release(ctx) + e.alloc.Release(ctx) putPooledEvent(e) // Check whether any unresolved intents need a push. @@ -530,7 +547,7 @@ func (p *Processor) ConsumeSSTable( if p == nil { return true } - return p.sendEvent(ctx, event{sst: sst, sstSpan: sstSpan, sstWTS: writeTS}, p.EventChanTimeout) + return p.sendEvent(ctx, event{sst: &sstEvent{sst, sstSpan, writeTS}}, p.EventChanTimeout) } // ForwardClosedTS indicates that the closed timestamp that serves as the basis @@ -546,7 +563,7 @@ func (p *Processor) ForwardClosedTS(ctx context.Context, closedTS hlc.Timestamp) if closedTS.IsEmpty() { return true } - return p.sendEvent(ctx, event{ct: closedTS}, p.EventChanTimeout) + return p.sendEvent(ctx, event{ct: ctEvent{closedTS}}, p.EventChanTimeout) } // sendEvent informs the Processor of a new event. If a timeout is specified, @@ -557,13 +574,13 @@ func (p *Processor) sendEvent(ctx context.Context, e event, timeout time.Duratio // path where we have enough budget and outgoing channel is free. If not, we // try to set up timeout for acquiring budget and then reuse it for inserting // value into channel. - var allocation *SharedBudgetAllocation + var alloc *SharedBudgetAllocation if p.MemBudget != nil { size := calculateDateEventSize(e) if size > 0 { var err error // First we will try non-blocking fast path to allocate memory budget. - allocation, err = p.MemBudget.TryGet(ctx, size) + alloc, err = p.MemBudget.TryGet(ctx, size) if err != nil { // Since we don't have enough budget, we should try to wait for // allocation returns before failing. @@ -576,7 +593,7 @@ func (p *Processor) sendEvent(ctx context.Context, e event, timeout time.Duratio timeout = 0 } p.Metrics.RangeFeedBudgetBlocked.Inc(1) - allocation, err = p.MemBudget.WaitAndGet(ctx, size) + alloc, err = p.MemBudget.WaitAndGet(ctx, size) } if err != nil { p.Metrics.RangeFeedBudgetExhausted.Inc(1) @@ -584,12 +601,12 @@ func (p *Processor) sendEvent(ctx context.Context, e event, timeout time.Duratio return false } defer func() { - allocation.Release(ctx) + alloc.Release(ctx) }() } } ev := getPooledEvent(e) - ev.allocation = allocation + ev.alloc = alloc if timeout == 0 { // Timeout is zero if no timeout was requested or timeout is already set on // the context by budget allocation. Just try to write using context as a @@ -598,7 +615,7 @@ func (p *Processor) sendEvent(ctx context.Context, e event, timeout time.Duratio case p.eventC <- ev: // Reset allocation after successful posting to prevent deferred cleanup // from freeing it. - allocation = nil + alloc = nil case <-p.stoppedC: // Already stopped. Do nothing. case <-ctx.Done(): @@ -612,7 +629,7 @@ func (p *Processor) sendEvent(ctx context.Context, e event, timeout time.Duratio case p.eventC <- ev: // Reset allocation after successful posting to prevent deferred cleanup // from freeing it. - allocation = nil + alloc = nil case <-p.stoppedC: // Already stopped. Do nothing. default: @@ -625,7 +642,7 @@ func (p *Processor) sendEvent(ctx context.Context, e event, timeout time.Duratio case p.eventC <- ev: // Reset allocation after successful posting to prevent deferred cleanup // from freeing it. - allocation = nil + alloc = nil case <-p.stoppedC: // Already stopped. Do nothing. case <-ctx.Done(): @@ -650,7 +667,7 @@ func (p *Processor) setResolvedTSInitialized(ctx context.Context) { // It does so by flushing the event pipeline. func (p *Processor) syncEventC() { syncC := make(chan struct{}) - ev := getPooledEvent(event{syncC: syncC}) + ev := getPooledEvent(event{sync: &syncEvent{c: syncC}}) select { case p.eventC <- ev: select { @@ -666,17 +683,17 @@ func (p *Processor) syncEventC() { func (p *Processor) consumeEvent(ctx context.Context, e *event) { switch { - case len(e.ops) > 0: - p.consumeLogicalOps(ctx, e.ops, e.allocation) - case len(e.sst) > 0: - p.consumeSSTable(ctx, e.sst, e.sstSpan, e.sstWTS, e.allocation) + case e.ops != nil: + p.consumeLogicalOps(ctx, e.ops, e.alloc) case !e.ct.IsEmpty(): - p.forwardClosedTS(ctx, e.ct) - case e.initRTS: + p.forwardClosedTS(ctx, e.ct.Timestamp) + case bool(e.initRTS): p.initResolvedTS(ctx) - case e.syncC != nil: - if e.testRegCatchupSpan.Valid() { - if err := p.reg.waitForCaughtUp(e.testRegCatchupSpan); err != nil { + case e.sst != nil: + p.consumeSSTable(ctx, e.sst.data, e.sst.span, e.sst.ts, e.alloc) + case e.sync != nil: + if e.sync.testRegCatchupSpan != nil { + if err := p.reg.waitForCaughtUp(*e.sync.testRegCatchupSpan); err != nil { log.Errorf( ctx, "error waiting for registries to catch up during test, results might be impacted: %s", @@ -684,25 +701,25 @@ func (p *Processor) consumeEvent(ctx context.Context, e *event) { ) } } - close(e.syncC) + close(e.sync.c) default: panic(fmt.Sprintf("missing event variant: %+v", e)) } } func (p *Processor) consumeLogicalOps( - ctx context.Context, ops []enginepb.MVCCLogicalOp, allocation *SharedBudgetAllocation, + ctx context.Context, ops []enginepb.MVCCLogicalOp, alloc *SharedBudgetAllocation, ) { for _, op := range ops { // Publish RangeFeedValue updates, if necessary. switch t := op.GetValue().(type) { case *enginepb.MVCCWriteValueOp: // Publish the new value directly. - p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue, allocation) + p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue, alloc) case *enginepb.MVCCDeleteRangeOp: // Publish the range deletion directly. - p.publishDeleteRange(ctx, t.StartKey, t.EndKey, t.Timestamp, allocation) + p.publishDeleteRange(ctx, t.StartKey, t.EndKey, t.Timestamp, alloc) case *enginepb.MVCCWriteIntentOp: // No updates to publish. @@ -712,7 +729,7 @@ func (p *Processor) consumeLogicalOps( case *enginepb.MVCCCommitIntentOp: // Publish the newly committed value. - p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue, allocation) + p.publishValue(ctx, t.Key, t.Timestamp, t.Value, t.PrevValue, alloc) case *enginepb.MVCCAbortIntentOp: // No updates to publish. @@ -737,9 +754,9 @@ func (p *Processor) consumeSSTable( sst []byte, sstSpan roachpb.Span, sstWTS hlc.Timestamp, - allocation *SharedBudgetAllocation, + alloc *SharedBudgetAllocation, ) { - p.publishSSTable(ctx, sst, sstSpan, sstWTS, allocation) + p.publishSSTable(ctx, sst, sstSpan, sstWTS, alloc) } func (p *Processor) forwardClosedTS(ctx context.Context, newClosedTS hlc.Timestamp) { @@ -759,7 +776,7 @@ func (p *Processor) publishValue( key roachpb.Key, timestamp hlc.Timestamp, value, prevValue []byte, - allocation *SharedBudgetAllocation, + alloc *SharedBudgetAllocation, ) { if !p.Span.ContainsKey(roachpb.RKey(key)) { log.Fatalf(ctx, "key %v not in Processor's key range %v", key, p.Span) @@ -778,14 +795,14 @@ func (p *Processor) publishValue( }, PrevValue: prevVal, }) - p.reg.PublishToOverlapping(ctx, roachpb.Span{Key: key}, &event, allocation) + p.reg.PublishToOverlapping(ctx, roachpb.Span{Key: key}, &event, alloc) } func (p *Processor) publishDeleteRange( ctx context.Context, startKey, endKey roachpb.Key, timestamp hlc.Timestamp, - allocation *SharedBudgetAllocation, + alloc *SharedBudgetAllocation, ) { span := roachpb.Span{Key: startKey, EndKey: endKey} if !p.Span.ContainsKeyRange(roachpb.RKey(startKey), roachpb.RKey(endKey)) { @@ -797,7 +814,7 @@ func (p *Processor) publishDeleteRange( Span: span, Timestamp: timestamp, }) - p.reg.PublishToOverlapping(ctx, span, &event, allocation) + p.reg.PublishToOverlapping(ctx, span, &event, alloc) } func (p *Processor) publishSSTable( @@ -805,7 +822,7 @@ func (p *Processor) publishSSTable( sst []byte, sstSpan roachpb.Span, sstWTS hlc.Timestamp, - allocation *SharedBudgetAllocation, + alloc *SharedBudgetAllocation, ) { if sstSpan.Equal(roachpb.Span{}) { panic(errors.AssertionFailedf("received SSTable without span")) @@ -819,7 +836,7 @@ func (p *Processor) publishSSTable( Span: sstSpan, WriteTS: sstWTS, }, - }, allocation) + }, alloc) } func (p *Processor) publishCheckpoint(ctx context.Context) { @@ -852,6 +869,8 @@ func calculateDateEventSize(e event) int64 { for _, op := range e.ops { size += int64(op.Size()) } - size += int64(len(e.sst)) + if e.sst != nil { + size += int64(len(e.sst.data)) + } return size } diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index 8cae2f52e28b..44d747f3f95d 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -21,6 +21,7 @@ import ( "sync/atomic" "testing" "time" + "unsafe" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -1052,7 +1053,7 @@ func (p *Processor) syncEventAndRegistrations() { // overlapping the given span to fully process their own internal buffers. func (p *Processor) syncEventAndRegistrationSpan(span roachpb.Span) { syncC := make(chan struct{}) - ev := getPooledEvent(event{syncC: syncC, testRegCatchupSpan: span}) + ev := getPooledEvent(event{sync: &syncEvent{c: syncC, testRegCatchupSpan: &span}}) select { case p.eventC <- ev: select { @@ -1466,3 +1467,11 @@ func BenchmarkProcessorWithBudget(b *testing.B) { require.NoError(b, err.GoError()) } } + +// TestSizeOfEvent tests the size of the event struct. It is fine if this struct +// changes in size, as long as this is done consciously. +func TestSizeOfEvent(t *testing.T) { + var e event + size := int(unsafe.Sizeof(e)) + require.Equal(t, 72, size) +} diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index a945160e95f6..b47256642ae8 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -40,8 +40,8 @@ type Stream interface { // to track memory budgets. event itself could either be shared or not in case // we optimized unused fields in it based on registration options. type sharedEvent struct { - event *roachpb.RangeFeedEvent - allocation *SharedBudgetAllocation + event *roachpb.RangeFeedEvent + alloc *SharedBudgetAllocation } var sharedEventSyncPool = sync.Pool{ @@ -148,17 +148,17 @@ func newRegistration( // If overflowed is already set, events are ignored and not written to the // buffer. func (r *registration) publish( - ctx context.Context, event *roachpb.RangeFeedEvent, allocation *SharedBudgetAllocation, + ctx context.Context, event *roachpb.RangeFeedEvent, alloc *SharedBudgetAllocation, ) { r.validateEvent(event) - e := getPooledSharedEvent(sharedEvent{event: r.maybeStripEvent(event), allocation: allocation}) + e := getPooledSharedEvent(sharedEvent{event: r.maybeStripEvent(event), alloc: alloc}) r.mu.Lock() defer r.mu.Unlock() if r.mu.overflowed { return } - allocation.Use() + alloc.Use() select { case r.buf <- e: r.mu.caughtUp = false @@ -166,7 +166,7 @@ func (r *registration) publish( // Buffer exceeded and we are dropping this event. Registration will need // a catch-up scan. r.mu.overflowed = true - allocation.Release(ctx) + alloc.Release(ctx) } } @@ -322,7 +322,7 @@ func (r *registration) outputLoop(ctx context.Context) error { select { case nextEvent := <-r.buf: err := r.stream.Send(nextEvent.event) - nextEvent.allocation.Release(ctx) + nextEvent.alloc.Release(ctx) putPooledSharedEvent(nextEvent) if err != nil { return err @@ -357,7 +357,7 @@ func (r *registration) drainAllocations(ctx context.Context) { if !ok { return } - e.allocation.Release(ctx) + e.alloc.Release(ctx) putPooledSharedEvent(e) default: return @@ -443,7 +443,7 @@ func (reg *registry) PublishToOverlapping( ctx context.Context, span roachpb.Span, event *roachpb.RangeFeedEvent, - allocation *SharedBudgetAllocation, + alloc *SharedBudgetAllocation, ) { // Determine the earliest starting timestamp that a registration // can have while still needing to hear about this event. @@ -470,7 +470,7 @@ func (reg *registry) PublishToOverlapping( // Don't publish events if they are equal to or less // than the registration's starting timestamp. if r.catchUpTimestamp.Less(minTS) { - r.publish(ctx, event, allocation) + r.publish(ctx, event, alloc) } return false, nil }) diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index c88ec2d41677..507ba98d2c55 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -156,8 +156,8 @@ func TestRegistrationBasic(t *testing.T) { // Registration with no catchup scan specified. noCatchupReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) - noCatchupReg.publish(ctx, ev1, nil /* allocation */) - noCatchupReg.publish(ctx, ev2, nil /* allocation */) + noCatchupReg.publish(ctx, ev1, nil /* alloc */) + noCatchupReg.publish(ctx, ev2, nil /* alloc */) require.Equal(t, len(noCatchupReg.buf), 2) go noCatchupReg.runOutputLoop(context.Background(), 0) require.NoError(t, noCatchupReg.waitForCaughtUp()) @@ -172,8 +172,8 @@ func TestRegistrationBasic(t *testing.T) { makeKV("bc", "val3", 11), makeKV("bd", "val4", 9), }, nil), false) - catchupReg.publish(ctx, ev1, nil /* allocation */) - catchupReg.publish(ctx, ev2, nil /* allocation */) + catchupReg.publish(ctx, ev1, nil /* alloc */) + catchupReg.publish(ctx, ev2, nil /* alloc */) require.Equal(t, len(catchupReg.buf), 2) go catchupReg.runOutputLoop(context.Background(), 0) require.NoError(t, catchupReg.waitForCaughtUp()) @@ -186,8 +186,8 @@ func TestRegistrationBasic(t *testing.T) { // EXIT CONDITIONS // External Disconnect. disconnectReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) - disconnectReg.publish(ctx, ev1, nil /* allocation */) - disconnectReg.publish(ctx, ev2, nil /* allocation */) + disconnectReg.publish(ctx, ev1, nil /* alloc */) + disconnectReg.publish(ctx, ev2, nil /* alloc */) go disconnectReg.runOutputLoop(context.Background(), 0) require.NoError(t, disconnectReg.waitForCaughtUp()) discErr := roachpb.NewError(fmt.Errorf("disconnection error")) @@ -198,8 +198,8 @@ func TestRegistrationBasic(t *testing.T) { // External Disconnect before output loop. disconnectEarlyReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) - disconnectEarlyReg.publish(ctx, ev1, nil /* allocation */) - disconnectEarlyReg.publish(ctx, ev2, nil /* allocation */) + disconnectEarlyReg.publish(ctx, ev1, nil /* alloc */) + disconnectEarlyReg.publish(ctx, ev2, nil /* alloc */) disconnectEarlyReg.disconnect(discErr) go disconnectEarlyReg.runOutputLoop(context.Background(), 0) err = <-disconnectEarlyReg.errC @@ -209,7 +209,7 @@ func TestRegistrationBasic(t *testing.T) { // Overflow. overflowReg := newTestRegistration(spAB, hlc.Timestamp{}, nil, false) for i := 0; i < cap(overflowReg.buf)+3; i++ { - overflowReg.publish(ctx, ev1, nil /* allocation */) + overflowReg.publish(ctx, ev1, nil /* alloc */) } go overflowReg.runOutputLoop(context.Background(), 0) err = <-overflowReg.errC @@ -221,7 +221,7 @@ func TestRegistrationBasic(t *testing.T) { streamErr := fmt.Errorf("stream error") streamErrReg.stream.SetSendErr(streamErr) go streamErrReg.runOutputLoop(context.Background(), 0) - streamErrReg.publish(ctx, ev1, nil /* allocation */) + streamErrReg.publish(ctx, ev1, nil /* alloc */) err = <-streamErrReg.errC require.Equal(t, streamErr.Error(), err.GoError().Error()) @@ -351,7 +351,7 @@ func TestRegistryBasic(t *testing.T) { reg := makeRegistry() require.Equal(t, 0, reg.Len()) - require.NotPanics(t, func() { reg.PublishToOverlapping(ctx, spAB, ev1, nil /* allocation */) }) + require.NotPanics(t, func() { reg.PublishToOverlapping(ctx, spAB, ev1, nil /* alloc */) }) require.NotPanics(t, func() { reg.Disconnect(spAB) }) require.NotPanics(t, func() { reg.DisconnectWithErr(spAB, err1) }) @@ -379,10 +379,10 @@ func TestRegistryBasic(t *testing.T) { require.Equal(t, 4, reg.Len()) // Publish to different spans. - reg.PublishToOverlapping(ctx, spAB, ev1, nil /* allocation */) - reg.PublishToOverlapping(ctx, spBC, ev2, nil /* allocation */) - reg.PublishToOverlapping(ctx, spCD, ev3, nil /* allocation */) - reg.PublishToOverlapping(ctx, spAC, ev4, nil /* allocation */) + reg.PublishToOverlapping(ctx, spAB, ev1, nil /* alloc */) + reg.PublishToOverlapping(ctx, spBC, ev2, nil /* alloc */) + reg.PublishToOverlapping(ctx, spCD, ev3, nil /* alloc */) + reg.PublishToOverlapping(ctx, spAC, ev4, nil /* alloc */) require.NoError(t, reg.waitForCaughtUp(all)) require.Equal(t, []*roachpb.RangeFeedEvent{noPrev(ev1), noPrev(ev4)}, rAB.Events()) require.Equal(t, []*roachpb.RangeFeedEvent{ev2, ev4}, rBC.Events()) @@ -422,10 +422,10 @@ func TestRegistryBasic(t *testing.T) { require.Equal(t, err1.GoError(), rCD.Err().GoError()) // Can still publish to rAB. - reg.PublishToOverlapping(ctx, spAB, ev4, nil /* allocation */) - reg.PublishToOverlapping(ctx, spBC, ev3, nil /* allocation */) - reg.PublishToOverlapping(ctx, spCD, ev2, nil /* allocation */) - reg.PublishToOverlapping(ctx, spAC, ev1, nil /* allocation */) + reg.PublishToOverlapping(ctx, spAB, ev4, nil /* alloc */) + reg.PublishToOverlapping(ctx, spBC, ev3, nil /* alloc */) + reg.PublishToOverlapping(ctx, spCD, ev2, nil /* alloc */) + reg.PublishToOverlapping(ctx, spAC, ev1, nil /* alloc */) require.NoError(t, reg.waitForCaughtUp(all)) require.Equal(t, []*roachpb.RangeFeedEvent{noPrev(ev4), noPrev(ev1)}, rAB.Events()) @@ -487,8 +487,8 @@ func TestRegistryPublishAssertsPopulatedInformation(t *testing.T) { Value: val, PrevValue: val, }) - require.Panics(t, func() { reg.PublishToOverlapping(ctx, spAB, ev, nil /* allocation */) }) - require.Panics(t, func() { reg.PublishToOverlapping(ctx, spCD, ev, nil /* allocation */) }) + require.Panics(t, func() { reg.PublishToOverlapping(ctx, spAB, ev, nil /* alloc */) }) + require.Panics(t, func() { reg.PublishToOverlapping(ctx, spCD, ev, nil /* alloc */) }) require.NoError(t, reg.waitForCaughtUp(all)) // Both registrations require RangeFeedValue events to have a Value. @@ -497,8 +497,8 @@ func TestRegistryPublishAssertsPopulatedInformation(t *testing.T) { Value: noVal, PrevValue: val, }) - require.Panics(t, func() { reg.PublishToOverlapping(ctx, spAB, ev, nil /* allocation */) }) - require.Panics(t, func() { reg.PublishToOverlapping(ctx, spCD, ev, nil /* allocation */) }) + require.Panics(t, func() { reg.PublishToOverlapping(ctx, spAB, ev, nil /* alloc */) }) + require.Panics(t, func() { reg.PublishToOverlapping(ctx, spCD, ev, nil /* alloc */) }) require.NoError(t, reg.waitForCaughtUp(all)) // Neither registrations require RangeFeedValue events to have a PrevValue. @@ -508,8 +508,8 @@ func TestRegistryPublishAssertsPopulatedInformation(t *testing.T) { Value: val, PrevValue: roachpb.Value{}, }) - require.NotPanics(t, func() { reg.PublishToOverlapping(ctx, spAB, ev, nil /* allocation */) }) - require.NotPanics(t, func() { reg.PublishToOverlapping(ctx, spCD, ev, nil /* allocation */) }) + require.NotPanics(t, func() { reg.PublishToOverlapping(ctx, spAB, ev, nil /* alloc */) }) + require.NotPanics(t, func() { reg.PublishToOverlapping(ctx, spCD, ev, nil /* alloc */) }) require.NoError(t, reg.waitForCaughtUp(all)) rNoDiff.disconnect(nil) @@ -531,7 +531,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { ev.MustSetValue(&roachpb.RangeFeedValue{ Value: roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 5}}, }) - reg.PublishToOverlapping(ctx, spAB, ev, nil /* allocation */) + reg.PublishToOverlapping(ctx, spAB, ev, nil /* alloc */) require.NoError(t, reg.waitForCaughtUp(all)) require.Nil(t, r.Events()) @@ -540,7 +540,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { ev.MustSetValue(&roachpb.RangeFeedValue{ Value: roachpb.Value{Timestamp: hlc.Timestamp{WallTime: 10}}, }) - reg.PublishToOverlapping(ctx, spAB, ev, nil /* allocation */) + reg.PublishToOverlapping(ctx, spAB, ev, nil /* alloc */) require.NoError(t, reg.waitForCaughtUp(all)) require.Nil(t, r.Events()) @@ -549,7 +549,7 @@ func TestRegistryPublishBeneathStartTimestamp(t *testing.T) { ev.MustSetValue(&roachpb.RangeFeedCheckpoint{ Span: spAB, ResolvedTS: hlc.Timestamp{WallTime: 5}, }) - reg.PublishToOverlapping(ctx, spAB, ev, nil /* allocation */) + reg.PublishToOverlapping(ctx, spAB, ev, nil /* alloc */) require.NoError(t, reg.waitForCaughtUp(all)) require.Equal(t, []*roachpb.RangeFeedEvent{ev}, r.Events()) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index b1955eb52f2f..170614c4c0ed 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -290,8 +290,8 @@ func (r *Replica) updateRangefeedFilterLocked() bool { return false } -// The size of an event is 112 bytes, so this will result in an allocation on -// the order of ~512KB per RangeFeed. That's probably ok given the number of +// The size of an event is 72 bytes, so this will result in an allocation on +// the order of ~300KB per RangeFeed. That's probably ok given the number of // ranges on a node that we'd like to support with active rangefeeds, but it's // certainly on the upper end of the range. //