43565: kvnemeses: begin scaffolding for a jepsen-style kv test r=nvanbenschoten/tbg a=danhhz

Package kvnemeses exercises the KV API with random traffic and then
validates that the observed behaviors are consistent with our
guarantees.

A set of Operations is generated to represent usage of the public KV
API. These include both "workload" operations like Gets and Puts as well
as "admin" operations like rebalances. These Operations can be handed to
an Applier, which runs them against the KV API and records the results.

Operations do allow for concurrency (this testing is much less
interesting otherwise), which means that the state of the KV map is not
recoverable from _only_ the input. TODO(dan): We can use RangeFeed to
recover the exact KV history. This plus some Kyle magic can be used to
check our transactional guarantees.

TODO (in later commits)
- Validate the log
- CPut/InitPut/Increment/Delete
- DeleteRange/ClearRange/RevertRange/Scan/ReverseScan
- ChangeReplicas/TransferLease
- ExportRequest
- AddSSTable
- Root and leaf transactions
- GCRequest
- Protected timestamps

Release note: None

44144: colexec: fix multiple starts of the wrapped processors r=yuzefovich a=yuzefovich

**colexec: fix multiple starts of the wrapped processors**

Previously, wrapped processors could be started multiple times if they
were in the input chain of the bufferOp (each of the CASE arms
initializes its input, the bufferOp). This is now fixed by tracking, in
both Columnarizer and bufferOp, whether Init has already been called.

The previous behavior could lead to a crash when rowexec.valuesProcessor
was wrapped, because it sends a "bogus" metadata header on each call to
Start: only a single header is expected, whereas with multiple Inits
there would be multiple headers.
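
A minimal sketch of the guard (type and field names here are illustrative, not the exact colexec ones): Init becomes idempotent by remembering whether it has already run, so a second CASE arm initializing the shared bufferOp is a no-op.

type operator interface {
	Init()
}

type bufferOp struct {
	input       operator
	initialized bool
}

func (b *bufferOp) Init() {
	// Guard against double-initialization when several CASE arms share
	// this op as their input.
	if b.initialized {
		return
	}
	b.initialized = true
	b.input.Init()
}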

Fixes: #44133.

Release note (bug fix): Previously, CockroachDB could crash in special
circumstances when the vectorized execution engine was used (more likely
if the `vectorize=experimental_on` setting was in effect). This has now
been fixed.

**execerror: catch panics coming from sql/execinfra package**

sql/execinfra is definitely a part of the vectorized engine as a whole,
so we should be catching panics coming from it when running vectorized
flows.
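
The mechanism is, roughly, a deferred recover wrapped around the vectorized flow. A minimal sketch of the idea, assuming cockroachdb/errors for error construction (the real execerror implementation also inspects where the panic originated and re-panics for packages it does not own):

// catchVectorizedRuntimeError is a sketch, not the actual execerror API.
func catchVectorizedRuntimeError(f func()) (retErr error) {
	defer func() {
		if p := recover(); p != nil {
			// The real code re-panics when the panic did not originate in a
			// vectorized (or, after this change, sql/execinfra) package.
			retErr = errors.Errorf("vectorized runtime error: %v", p)
		}
	}()
	f()
	return nil
}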

Release note: None

44169: sql/opt/optbuilder: resolve remaining comments from #44015 r=nvanbenschoten a=nvanbenschoten

This commit resolves a few typos that were missed before #44015 was merged.

Release note: None

44172: Include "/cockroach" into PATH in Docker image r=vladdy a=vladdy

This adds "/cockroach" to the environment's PATH in the Docker image so that less typing is required when invoking "cockroach" commands via a running container.
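
With this change, a command such as `docker exec -it <container> cockroach node status` resolves the binary without spelling out the full `/cockroach/cockroach` path.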

Fixes: #44189

Co-authored-by: Daniel Harrison <daniel.harrison@gmail.com>
Co-authored-by: Yahor Yuzefovich <yahor@cockroachlabs.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Vlad Artamonov <742047+vladdy@users.noreply.github.com>
5 people committed Jan 22, 2020
5 parents 66c30db + 9e1dfff + 9417336 + 643371d + be53893 commit c76ad97
Showing 33 changed files with 4,893 additions and 134 deletions.
5 changes: 5 additions & 0 deletions build/deploy/Dockerfile
@@ -16,6 +16,11 @@ COPY cockroach.sh cockroach /cockroach/
# are resolved appropriately when passed as args.
WORKDIR /cockroach/

+# Include the directory in the PATH
+# to make it easier to invoke commands
+# via Docker
+ENV PATH=/cockroach:$PATH

ENV COCKROACH_CHANNEL=official-docker

EXPOSE 26257 8080
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/bench_test.go
@@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
@@ -218,7 +219,7 @@ func createBenchmarkChangefeed(
m: th,
}
rowsFn := kvsToRows(s.LeaseManager().(*sql.LeaseManager), details, buf.Get)
- sf := makeSpanFrontier(spans...)
+ sf := span.MakeFrontier(spans...)
tickFn := emitEntries(
s.ClusterSettings(), details, sf, encoder, sink, rowsFn, TestingKnobs{}, metrics)

@@ -232,7 +233,7 @@
go func() {
defer wg.Done()
err := func() error {
- sf := makeSpanFrontier(spans...)
+ sf := span.MakeFrontier(spans...)
for {
// This is basically the ChangeAggregator processor.
resolvedSpans, err := tickFn(ctx)
5 changes: 3 additions & 2 deletions pkg/ccl/changefeedccl/changefeed.go
@@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
@@ -225,7 +226,7 @@ func kvsToRows(
func emitEntries(
settings *cluster.Settings,
details jobspb.ChangefeedDetails,
- sf *spanFrontier,
+ sf *span.Frontier,
encoder Encoder,
sink Sink,
inputFn func(context.Context) ([]emitEntry, error),
@@ -359,7 +360,7 @@ func emitEntries(
func checkpointResolvedTimestamp(
ctx context.Context,
jobProgressedFn func(context.Context, jobs.HighWaterProgressedFn) error,
- sf *spanFrontier,
+ sf *span.Frontier,
) error {
resolved := sf.Frontier()
var resolvedSpans []jobspb.ResolvedSpan
11 changes: 6 additions & 5 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)
@@ -73,7 +74,7 @@ type timestampLowerBoundOracle interface {
}

type changeAggregatorLowerBoundOracle struct {
- sf *spanFrontier
+ sf *span.Frontier
initialInclusiveLowerBound hlc.Timestamp
}

@@ -164,7 +165,7 @@ func (ca *changeAggregator) Start(ctx context.Context) context.Context {
// This object is used to filter out some previously emitted rows, and
// by the cloudStorageSink to name its output files in lexicographically
// monotonic fashion.
- sf := makeSpanFrontier(spans...)
+ sf := span.MakeFrontier(spans...)
for _, watch := range ca.spec.Watches {
sf.Forward(watch.Span, watch.InitialResolved)
}
@@ -355,7 +356,7 @@ type changeFrontier struct {

// sf contains the current resolved timestamp high-water for the tracked
// span set.
- sf *spanFrontier
+ sf *span.Frontier
// encoder is the Encoder to use for resolved timestamp serialization.
encoder Encoder
// sink is the Sink to write resolved timestamps to. Rows are never written
@@ -407,7 +408,7 @@ func newChangeFrontierProcessor(
spec: spec,
memAcc: memMonitor.MakeBoundAccount(),
input: input,
- sf: makeSpanFrontier(spec.TrackedSpans...),
+ sf: span.MakeFrontier(spec.TrackedSpans...),
}
if err := cf.Init(
cf, &execinfrapb.PostProcessSpec{},
@@ -655,7 +656,7 @@ func (cf *changeFrontier) noteResolvedSpan(d sqlbase.EncDatum) error {
const slowSpanMaxFrequency = 10 * time.Second
if now.Sub(cf.lastSlowSpanLog) > slowSpanMaxFrequency {
cf.lastSlowSpanLog = now
- s := cf.sf.peekFrontierSpan()
+ s := cf.sf.PeekFrontierSpan()
log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind)
}
}
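For orientation, the util/span Frontier API that these diffs swap in follows one pattern throughout. A sketch assembled only from the calls visible above (variable names are illustrative):

sf := span.MakeFrontier(spans...)             // start tracking the given spans
sf.Forward(watch.Span, watch.InitialResolved) // advance one span's resolved timestamp
resolved := sf.Frontier()                     // minimum resolved timestamp across all tracked spans
slowest := sf.PeekFrontierSpan()              // a span still sitting at that minimum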
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/poller.go
@@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
@@ -246,7 +247,7 @@ func (p *poller) rangefeedImplIter(ctx context.Context, i int) error {
// contention pattern and use additional goroutines. it's not clear which
// solution is best without targeted performance testing, so we're choosing
// the faster-to-implement solution for now.
- frontier := makeSpanFrontier(spans...)
+ frontier := span.MakeFrontier(spans...)

rangeFeedStartTS := lastHighwater
for _, span := range p.spans {
21 changes: 11 additions & 10 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/stretchr/testify/require"
)

@@ -81,7 +82,7 @@ func TestCloudStorageSink(t *testing.T) {
t.Run(`golden`, func(t *testing.T) {
t1 := &sqlbase.TableDescriptor{Name: `t1`}
testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
- sf := makeSpanFrontier(testSpan)
+ sf := span.MakeFrontier(testSpan)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
sinkDir := `golden`
s, err := makeCloudStorageSink(
@@ -109,7 +110,7 @@ func TestCloudStorageSink(t *testing.T) {
t2 := &sqlbase.TableDescriptor{Name: `t2`}

testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
- sf := makeSpanFrontier(testSpan)
+ sf := span.MakeFrontier(testSpan)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `single-node`
s, err := makeCloudStorageSink(
@@ -182,7 +183,7 @@ func TestCloudStorageSink(t *testing.T) {
t1 := &sqlbase.TableDescriptor{Name: `t1`}

testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
- sf := makeSpanFrontier(testSpan)
+ sf := span.MakeFrontier(testSpan)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `multi-node`
s1, err := makeCloudStorageSink(
@@ -262,7 +263,7 @@ func TestCloudStorageSink(t *testing.T) {
t.Run(`zombie`, func(t *testing.T) {
t1 := &sqlbase.TableDescriptor{Name: `t1`}
testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
- sf := makeSpanFrontier(testSpan)
+ sf := span.MakeFrontier(testSpan)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `zombie`
s1, err := makeCloudStorageSink(
@@ -303,7 +304,7 @@ func TestCloudStorageSink(t *testing.T) {
t.Run(`bucketing`, func(t *testing.T) {
t1 := &sqlbase.TableDescriptor{Name: `t1`}
testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
- sf := makeSpanFrontier(testSpan)
+ sf := span.MakeFrontier(testSpan)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `bucketing`
const targetMaxFileSize = 6
@@ -330,7 +331,7 @@
"v4\nv5\n",
}, slurpDir(t, dir))

- // Forward the spanFrontier here and trigger an empty flush to update
+ // Forward the SpanFrontier here and trigger an empty flush to update
// the sink's `inclusiveLowerBoundTs`
sf.Forward(testSpan, ts(5))
require.NoError(t, s.Flush(ctx))
@@ -353,7 +354,7 @@
// ts at >= this one before this call starts.
//
// The resolved timestamp file should precede the data files that were
- // started after the spanFrontier was forwarded to ts(5).
+ // started after the SpanFrontier was forwarded to ts(5).
require.NoError(t, s.EmitResolvedTimestamp(ctx, e, ts(5)))
require.Equal(t, []string{
"v1\nv2\nv3\n",
@@ -365,7 +366,7 @@
// Flush then writes the rest. Since we use the time of the EmitRow
// or EmitResolvedTimestamp calls to order files, the resolved timestamp
// file should precede the last couple files since they started buffering
- // after the spanFrontier was forwarded to ts(5).
+ // after the SpanFrontier was forwarded to ts(5).
require.NoError(t, s.Flush(ctx))
require.Equal(t, []string{
"v1\nv2\nv3\n",
@@ -391,7 +392,7 @@
t.Run(`file-ordering`, func(t *testing.T) {
t1 := &sqlbase.TableDescriptor{Name: `t1`}
testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
- sf := makeSpanFrontier(testSpan)
+ sf := span.MakeFrontier(testSpan)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `file-ordering`
s, err := makeCloudStorageSink(
@@ -450,7 +451,7 @@ func TestCloudStorageSink(t *testing.T) {
t.Run(`ordering-among-schema-versions`, func(t *testing.T) {
t1 := &sqlbase.TableDescriptor{Name: `t1`}
testSpan := roachpb.Span{Key: []byte("a"), EndKey: []byte("b")}
- sf := makeSpanFrontier(testSpan)
+ sf := span.MakeFrontier(testSpan)
timestampOracle := &changeAggregatorLowerBoundOracle{sf: sf}
dir := `ordering-among-schema-versions`
var targetMaxFileSize int64 = 10
166 changes: 166 additions & 0 deletions pkg/kv/kvnemeses/applier.go
@@ -0,0 +1,166 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvnemeses

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/internal/client"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
	"github.com/cockroachdb/errors"
)

// Applier executes Steps.
type Applier struct {
	db *client.DB
	mu struct {
		syncutil.Mutex
		txns map[string]*client.Txn
	}
}

// MakeApplier constructs an Applier that executes against the given DB.
func MakeApplier(db *client.DB) *Applier {
	a := &Applier{
		db: db,
	}
	a.mu.txns = make(map[string]*client.Txn)
	return a
}

// Apply executes the given Step and mutates it with the result of execution.
// An error is only returned from Apply if there is an internal coding error
// within Applier; errors from a Step execution are saved in the Step itself.
func (a *Applier) Apply(ctx context.Context, step *Step) (retErr error) {
	step.Before = a.db.Clock().Now()
	defer func() {
		step.After = a.db.Clock().Now()
		if p := recover(); p != nil {
			retErr = errors.Errorf(`panic applying step %s: %v`, step, p)
		}
	}()
	a.applyOp(ctx, &step.Op)
	return nil
}

func (a *Applier) applyOp(ctx context.Context, op *Operation) {
	switch o := op.GetValue().(type) {
	case *GetOperation, *PutOperation, *BatchOperation:
		applyBatchOp(ctx, a.db, op)
	case *SplitOperation:
		err := a.db.AdminSplit(ctx, o.Key, o.Key, hlc.MaxTimestamp)
		o.Result = resultError(ctx, err)
	case *MergeOperation:
		err := a.db.AdminMerge(ctx, o.Key)
		o.Result = resultError(ctx, err)
	case *ClosureTxnOperation:
		txnErr := a.db.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
			for i := range o.Ops {
				op := &o.Ops[i]
				applyBatchOp(ctx, txn, op)
				// The KV API disallows use of a txn after an operation on it errors.
				if r := op.Result(); r.Type == ResultType_Error {
					return errors.DecodeError(ctx, *r.Err)
				}
			}
			switch o.Type {
			case ClosureTxnType_Commit:
				return nil
			case ClosureTxnType_Rollback:
				return errors.New("rollback")
			default:
				panic(errors.AssertionFailedf(`unknown closure txn type: %s`, o.Type))
			}
		})
		o.Result = resultError(ctx, txnErr)
	default:
		panic(errors.AssertionFailedf(`unknown operation type: %T %v`, o, o))
	}
}

// clientI is the subset of client.DB and client.Txn used by applyBatchOp,
// letting the same code run both inside and outside a transaction.
type clientI interface {
	Get(context.Context, interface{}) (client.KeyValue, error)
	Put(context.Context, interface{}, interface{}) error
	Run(context.Context, *client.Batch) error
}

func applyBatchOp(ctx context.Context, db clientI, op *Operation) {
	switch o := op.GetValue().(type) {
	case *GetOperation:
		result, err := db.Get(ctx, o.Key)
		if err != nil {
			o.Result = resultError(ctx, err)
		} else {
			o.Result.Type = ResultType_Value
			if result.Value != nil {
				if value, err := result.Value.GetBytes(); err != nil {
					panic(errors.Wrapf(err, "decoding %x", result.Value.RawBytes))
				} else {
					o.Result.Value = value
				}
			}
		}
	case *PutOperation:
		err := db.Put(ctx, o.Key, o.Value)
		o.Result = resultError(ctx, err)
	case *BatchOperation:
		b := &client.Batch{}
		for i := range o.Ops {
			switch subO := o.Ops[i].GetValue().(type) {
			case *GetOperation:
				b.Get(subO.Key)
			case *PutOperation:
				b.Put(subO.Key, subO.Value)
			default:
				panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
			}
		}
		runErr := db.Run(ctx, b)
		o.Result = resultError(ctx, runErr)
		// After the batch runs, copy each sub-result back onto its Operation.
		for i := range o.Ops {
			switch subO := o.Ops[i].GetValue().(type) {
			case *GetOperation:
				if b.Results[i].Err != nil {
					subO.Result = resultError(ctx, b.Results[i].Err)
				} else {
					subO.Result.Type = ResultType_Value
					result := b.Results[i].Rows[0]
					if result.Value != nil {
						if value, err := result.Value.GetBytes(); err != nil {
							panic(errors.Wrapf(err, "decoding %x", result.Value.RawBytes))
						} else {
							subO.Result.Value = value
						}
					}
				}
			case *PutOperation:
				err := b.Results[i].Err
				subO.Result = resultError(ctx, err)
			default:
				panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, subO, subO))
			}
		}
	default:
		panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, o, o))
	}
}

func resultError(ctx context.Context, err error) Result {
	if err == nil {
		return Result{Type: ResultType_NoError}
	}
	ee := errors.EncodeError(ctx, err)
	return Result{
		Type: ResultType_Error,
		Err:  &ee,
	}
}
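
To tie the pieces together, a minimal sketch of driving this Applier per the contract documented on Apply (the steps slice is assumed to come from a generator, which lands in a later commit):

func runSteps(ctx context.Context, db *client.DB, steps []Step) error {
	a := MakeApplier(db)
	for i := range steps {
		// Apply mutates each Step in place: Before/After timestamps plus the
		// per-Operation Results. A non-nil return means an internal harness
		// bug, not a KV-level error.
		if err := a.Apply(ctx, &steps[i]); err != nil {
			return err
		}
	}
	return nil
}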
