Merge #52463

52463: colexec: remove internal cancellation behavior from unordered synchronizer r=yuzefovich a=asubiotto

Previously, the unordered synchronizer would cancel all of its inputs if one of
them encountered an error. The resulting context cancellation errors could race
with the original error and would sometimes overwrite it (according to error
priority) in the DistSQL receiver (the root of a query).

This behavior was incorrect: the original error should be propagated, and the
caller should then call DrainMeta to drain and close the remaining inputs. This
commit removes the internal context cancellation in favor of that behavior.
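
As an illustration of that contract, the caller-side pattern looks roughly like
the sketch below. It reuses names that appear in this change
(colexecerror.CatchVectorizedRuntimeError, Next, DrainMeta), but the loop itself
is a simplified, hypothetical caller rather than code from this commit.

```go
// Hypothetical caller sketch: run Next until it errors or is exhausted, then
// drain. Not code from this commit.
for {
	var batch coldata.Batch
	err := colexecerror.CatchVectorizedRuntimeError(func() { batch = s.Next(ctx) })
	if err == nil && batch.Length() > 0 {
		continue // consume the batch as usual
	}
	// On error (or exhaustion) the synchronizer no longer cancels its inputs.
	// The caller calls DrainMeta instead, which unblocks, drains, and closes
	// the remaining inputs and returns their buffered metadata.
	meta := s.DrainMeta(ctx)
	_ = meta // forwarded, together with err, to the DistSQL receiver
	break
}
```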

Release note (bug fix): unexpected context cancellation errors could sometimes
be returned in the vectorized execution engine. This is now fixed.

Fixes #51647 
Fixes #52057 

Co-authored-by: Alfonso Subiotto Marques <alfonso@cockroachlabs.com>
craig[bot] and asubiotto committed Aug 17, 2020
2 parents 8324ed3 + 3888352 commit edb904b
Showing 4 changed files with 81 additions and 77 deletions.
105 changes: 49 additions & 56 deletions pkg/sql/colexec/parallel_unordered_synchronizer.go
@@ -20,9 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/errors"
)

@@ -91,7 +88,6 @@ type ParallelUnorderedSynchronizer struct {
// allows the ParallelUnorderedSynchronizer to wait only on internal
// goroutines.
internalWaitGroup *sync.WaitGroup
cancelFn context.CancelFunc
batchCh chan *unorderedSynchronizerMsg
errCh chan error

@@ -155,7 +151,12 @@ func NewParallelUnorderedSynchronizer(
nextBatch: make([]func(), len(inputs)),
externalWaitGroup: wg,
internalWaitGroup: &sync.WaitGroup{},
batchCh: make(chan *unorderedSynchronizerMsg, len(inputs)),
// batchCh is a buffered channel in order to offer non-blocking writes to
// input goroutines. During normal operation, this channel will have at most
// len(inputs) messages. However, during DrainMeta, inputs might need to
// push an extra metadata message without blocking, hence the need to double
// the size of this channel.
batchCh: make(chan *unorderedSynchronizerMsg, len(inputs)*2),
// errCh is buffered so that writers do not block. If errCh is full, the
// input goroutines will not push an error and exit immediately, given that
// the Next goroutine will read an error and panic anyway.
@@ -187,19 +188,6 @@ func (s *ParallelUnorderedSynchronizer) setState(state parallelUnorderedSynchron
// Next goroutine. Inputs are asynchronous so that the synchronizer is minimally
// affected by slow inputs.
func (s *ParallelUnorderedSynchronizer) init(ctx context.Context) {
s.setState(parallelUnorderedSynchronizerStateRunning)
var (
cancelFn context.CancelFunc
// internalCancellation is an atomic that will be set to 1 if cancelFn is
// called (i.e. this is an internal cancellation), so that input goroutines
// know not propagate this cancellation.
internalCancellation int32
)
ctx, cancelFn = contextutil.WithCancel(ctx)
s.cancelFn = func() {
atomic.StoreInt32(&internalCancellation, 1)
cancelFn()
}
for i, input := range s.inputs {
s.nextBatch[i] = func(input SynchronizerInput, inputIdx int) func() {
return func() {
@@ -223,21 +211,6 @@ func (s *ParallelUnorderedSynchronizer) init(ctx context.Context) {
s.externalWaitGroup.Done()
}()
sendErr := func(err error) {
ctxCanceled := errors.Is(err, context.Canceled)
grpcCtxCanceled := grpcutil.IsContextCanceled(err)
queryCanceled := errors.Is(err, sqlbase.QueryCanceledError)
// TODO(yuzefovich): should we be swallowing flow and stream
// context cancellation errors regardless whether the
// cancellation is internal? The reasoning is that if another
// operator encounters an error, the context is likely to get
// canceled with this synchronizer seeing the "fallout".
if atomic.LoadInt32(&internalCancellation) == 1 && (ctxCanceled || grpcCtxCanceled || queryCanceled) {
// If the synchronizer performed the internal cancellation,
// we need to swallow context cancellation and "query
// canceled" errors since they could occur as part of
// "graceful" shutdown of synchronizer's inputs.
return
}
select {
// Non-blocking write to errCh, if an error is present the main
// goroutine will use that and cancel all inputs.
@@ -315,6 +288,7 @@ func (s *ParallelUnorderedSynchronizer) Next(ctx context.Context) coldata.Batch
case parallelUnorderedSynchronizerStateDone:
return coldata.ZeroBatch
case parallelUnorderedSynchronizerStateUninitialized:
s.setState(parallelUnorderedSynchronizerStateRunning)
s.init(ctx)
case parallelUnorderedSynchronizerStateRunning:
// Signal the input whose batch we returned in the last call to Next that it
@@ -328,10 +302,9 @@ func (s *ParallelUnorderedSynchronizer) Next(ctx context.Context) coldata.Batch
select {
case err := <-s.errCh:
if err != nil {
// If we got an error from one of our inputs, cancel all inputs and
// propagate this error through a panic.
s.cancelFn()
s.internalWaitGroup.Wait()
// If we got an error from one of our inputs, propagate this error
// through a panic. The caller should then proceed to call DrainMeta,
// which will take care of closing any inputs.
colexecerror.InternalError(err)
}
case msg := <-s.batchCh:
@@ -380,29 +353,47 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta(
s.setState(parallelUnorderedSynchronizerStateDraining)
if prevState == parallelUnorderedSynchronizerStateUninitialized {
s.init(ctx)
} else {
// The inputs were initialized in Next. This means that the inputs will
// either read the new draining state and exit, or have yet to read the
// state, in which case they are either pushing a batch or waiting for a
// signal to get the next batch. These latter inputs are preempted below by
// draining batchCh and sending a signal to the inputs whose batches have
// been read.
// However, there is also a case where a batch was read in Next and that
// input is waiting for preemption, so do that here.
// Note that if this is not the case (i.e. inputs were initialized but the
// synchronizer immediately transitioned to draining) this preemption won't
// hurt.
s.notifyInputToReadNextBatch(s.lastReadInputIdx)
}

// Non-blocking drain of batchCh. This is important mostly because of the
// following edge case: all n inputs have pushed batches to the batchCh, so
// there are currently n messages. Next notifies the last read input to
// retrieve the next batch but encounters an error. There are now n+1 messages
// in batchCh. Notifying all these inputs to read the next batch would result
// in 2n+1 messages on batchCh, which would cause a deadlock since this
// goroutine blocks on the wait group, but an input will block on writing to
// batchCh. This is a best effort, but note that for this scenario to occur,
// there *must* be at least one message in batchCh (the message belonging to
// the input that was notified).
for batchChDrained := false; !batchChDrained; {
select {
case msg := <-s.batchCh:
if msg == nil {
batchChDrained = true
} else if msg.meta != nil {
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
}
default:
batchChDrained = true
}
}

// Unblock any goroutines currently waiting to be told to read the next batch.
// This will force all inputs to observe the new draining state.
for _, ch := range s.readNextBatch {
close(ch)
}

// Wait for all inputs to exit.
s.internalWaitGroup.Wait()

// Drain the batchCh, this reads the metadata that was pushed.
for msg := <-s.batchCh; msg != nil; msg = <-s.batchCh {
if msg.meta == nil {
// This input goroutine pushed a batch to the batchCh and is waiting to be
// told to proceed. Notify it that it is ok to do so.
s.notifyInputToReadNextBatch(msg.inputIdx)
continue
if msg.meta != nil {
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
}
s.bufferedMeta = append(s.bufferedMeta, msg.meta...)
}

// Buffer any errors that may have happened without blocking on the channel.
for exitLoop := false; !exitLoop; {
select {
@@ -412,6 +403,8 @@ func (s *ParallelUnorderedSynchronizer) DrainMeta(
exitLoop = true
}
}

// Done.
s.setState(parallelUnorderedSynchronizerStateDone)
return s.bufferedMeta
}
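
The rewritten DrainMeta above relies on a standard Go pattern: a non-blocking
drain of a buffered channel using a select with a default case, so the draining
goroutine never waits on inputs. The following self-contained sketch shows only
that pattern; the message type and field names are simplified stand-ins, not
the actual colexec types.

```go
package main

import "fmt"

// msg is a simplified stand-in for unorderedSynchronizerMsg.
type msg struct{ meta []string }

func main() {
	batchCh := make(chan *msg, 4)
	batchCh <- &msg{meta: []string{"m1"}}
	batchCh <- &msg{} // a batch-only message with no metadata

	var buffered []string
	// Non-blocking drain: consume whatever is already buffered without ever
	// blocking, so a producer that is stuck elsewhere cannot deadlock with
	// this goroutine.
	for drained := false; !drained; {
		select {
		case m := <-batchCh:
			if m == nil {
				// A nil message means the channel was closed.
				drained = true
			} else if m.meta != nil {
				buffered = append(buffered, m.meta...)
			}
		default:
			// Nothing buffered right now; stop instead of waiting.
			drained = true
		}
	}
	fmt.Println(buffered) // prints [m1]
}
```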
43 changes: 29 additions & 14 deletions pkg/sql/colexec/parallel_unordered_synchronizer_test.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/col/coldatatestutils"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/colmem"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -147,6 +148,7 @@ func TestUnorderedSynchronizerNoLeaksOnError(t *testing.T) {
defer log.Scope(t).Close(t)

const expectedErr = "first input error"
ctx := context.Background()

inputs := make([]SynchronizerInput, 6)
inputs[0].Op = &colexecbase.CallbackOperator{NextCb: func(context.Context) coldata.Batch {
@@ -155,25 +157,38 @@
return nil
}}
for i := 1; i < len(inputs); i++ {
inputs[i].Op = &colexecbase.CallbackOperator{
NextCb: func(ctx context.Context) coldata.Batch {
<-ctx.Done()
colexecerror.InternalError(ctx.Err())
// This code is unreachable, but the compiler cannot infer that.
return nil
},
}
acc := testMemMonitor.MakeBoundAccount()
defer acc.Close(ctx)
func(allocator *colmem.Allocator) {
inputs[i].Op = &colexecbase.CallbackOperator{
NextCb: func(ctx context.Context) coldata.Batch {
// All inputs that do not encounter an error will continue to return
// batches.
b := allocator.NewMemBatchWithMaxCapacity([]*types.T{types.Int})
b.SetLength(1)
return b
},
}
}(
// Make a separate allocator per input, since each input will call Next in
// a different goroutine.
colmem.NewAllocator(ctx, &acc, coldata.StandardColumnFactory),
)
}

var (
ctx = context.Background()
wg sync.WaitGroup
)
var wg sync.WaitGroup
s := NewParallelUnorderedSynchronizer(inputs, &wg)
err := colexecerror.CatchVectorizedRuntimeError(func() { _ = s.Next(ctx) })
for {
if err := colexecerror.CatchVectorizedRuntimeError(func() { _ = s.Next(ctx) }); err != nil {
require.True(t, testutils.IsError(err, expectedErr), err)
break
}
// Loop until we get an error.
}
// The caller must call DrainMeta on error.
require.Zero(t, len(s.DrainMeta(ctx)))
// This is the crux of the test: assert that all inputs have finished.
require.Equal(t, len(inputs), int(atomic.LoadUint32(&s.numFinishedInputs)))
require.True(t, testutils.IsError(err, expectedErr), err)
}

func BenchmarkParallelUnorderedSynchronizer(b *testing.B) {
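
The per-input allocator setup in TestUnorderedSynchronizerNoLeaksOnError is
easier to read outside of the interleaved diff. The sketch below is a
condensed, hypothetical form of the new version that reuses the calls shown
above; each input gets its own memory account and allocator because its Next
callback runs on a separate goroutine.

```go
// Condensed, hypothetical form of the per-input setup above. One allocator per
// input, since each input's Next is called from a different goroutine.
for i := 1; i < len(inputs); i++ {
	acc := testMemMonitor.MakeBoundAccount()
	defer acc.Close(ctx)
	alloc := colmem.NewAllocator(ctx, &acc, coldata.StandardColumnFactory)
	inputs[i].Op = &colexecbase.CallbackOperator{
		NextCb: func(context.Context) coldata.Batch {
			// Inputs that do not hit an error keep returning length-1 batches
			// until the synchronizer is drained.
			b := alloc.NewMemBatchWithMaxCapacity([]*types.T{types.Int})
			b.SetLength(1)
			return b
		},
	}
}
```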
5 changes: 3 additions & 2 deletions pkg/sql/colflow/vectorized_flow_shutdown_test.go
@@ -385,10 +385,11 @@ func TestVectorizedFlowShutdown(t *testing.T) {
require.NotNil(t, meta.Err)
id, err := strconv.Atoi(meta.Err.Error())
require.NoError(t, err)
require.False(t, receivedMetaFromID[id])
receivedMetaFromID[id] = true
}
require.Equal(t, streamID, metaCount, fmt.Sprintf("received metadata from Outbox %+v", receivedMetaFromID))
for id, received := range receivedMetaFromID {
require.True(t, received, "did not receive metadata from Outbox %d", id)
}
case consumerClosed:
materializer.ConsumerClosed()
}
5 changes: 0 additions & 5 deletions pkg/sql/rowexec/processors_test.go
@@ -853,11 +853,6 @@ func TestUncertaintyErrorIsReturned(t *testing.T) {
vectorizeOpt := "off"
if vectorize {
vectorizeOpt = "on"
// Occasionally, the vectorized engine propagates either flow's or
// stream's context canceled error instead of the expected one, so
// we temporarily skip such config.
// TODO(yuzefovich): remove this.
skip.WithIssue(t, 52057)
}
for _, testCase := range testCases {
t.Run(testCase.query, func(t *testing.T) {
