Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

colexec: fix an issue with builtin operators and a minor cleanup #43989

Merged
merged 1 commit into from
Jan 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions pkg/sql/colexec/and_or_projection_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ func (o *_OP_LOWERProjOp) Next(ctx context.Context) coldata.Batch {
o.leftFeedOp.batch = batch
batch = o.leftProjOpChain.Next(ctx)

if origLen == 0 {
// Run the right-side projection on the remaining tuples.
o.rightFeedOp.batch = batch
batch = o.rightProjOpChain.Next(ctx)
return batch
}

// Now we need to populate a selection vector on the batch in such a way that
// those tuples that we already know the result of logical operation for do
// not get the projection for the right side.
Expand Down Expand Up @@ -184,10 +191,6 @@ func (o *_OP_LOWERProjOp) Next(ctx context.Context) coldata.Batch {
}
batch.SetLength(origLen)

if origLen == 0 {
return batch
}

rightCol := batch.ColVec(o.rightIdx)
outputCol := batch.ColVec(o.outputIdx)
rightColVals := rightCol.Bool()
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/colexec/builtin_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ func (b *defaultBuiltinFuncOperator) Next(ctx context.Context) coldata.Batch {
}
},
)
// Although we didn't change the length of the batch, it is necessary to set
// the length anyway (this helps maintaining the invariant of flat bytes).
batch.SetLength(n)
return batch
}

Expand Down Expand Up @@ -185,6 +188,9 @@ func (s *substringFunctionOperator) Next(ctx context.Context) coldata.Batch {
}
},
)
// Although we didn't change the length of the batch, it is necessary to set
// the length anyway (this helps maintaining the invariant of flat bytes).
batch.SetLength(n)
return batch
}

Expand Down
135 changes: 75 additions & 60 deletions pkg/sql/colexec/case.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,20 @@ func (c *caseOp) Next(ctx context.Context) coldata.Batch {
copy(c.origSel, sel)
}

outputCol := c.buffer.batch.Batch.ColVec(c.outputIdx)
var batch coldata.Batch
c.allocator.PerformOperation([]coldata.Vec{outputCol}, func() {
// It is possible that we have a typeless zero-length batch, and we cannot
// get a particular columnar vector from it, so we have special casing here
// for non-zero length batches.
// TODO(yuzefovich): remove it once we can short-circuit.
var (
outputCol coldata.Vec
batch coldata.Batch
destVecs []coldata.Vec
)
if origLen > 0 {
outputCol = c.buffer.batch.Batch.ColVec(c.outputIdx)
destVecs = []coldata.Vec{outputCol}
}
c.allocator.PerformOperation(destVecs, func() {
for i := range c.caseOps {
// Run the next case operator chain. It will project its THEN expression
// for all tuples that matched its WHEN expression and that were not
Expand Down Expand Up @@ -146,73 +157,77 @@ func (c *caseOp) Next(ctx context.Context) coldata.Batch {
// current case arm.
var subtractIdx int
var curIdx uint16
inputCol := c.buffer.batch.Batch.ColVec(c.thenIdxs[i])
// Copy the results into the output vector, using the toSubtract selection
// vector to copy only the elements that we actually wrote according to the
// current case arm.
if origLen > 0 {
inputCol := c.buffer.batch.Batch.ColVec(c.thenIdxs[i])
// Copy the results into the output vector, using the toSubtract selection
// vector to copy only the elements that we actually wrote according to the
// current case arm.
outputCol.Copy(
coldata.CopySliceArgs{
SliceArgs: coldata.SliceArgs{
ColType: c.typ,
Src: inputCol,
Sel: toSubtract,
SrcStartIdx: 0,
SrcEndIdx: uint64(len(toSubtract)),
},
SelOnDest: true,
})
if oldSel := c.buffer.batch.Batch.Selection(); oldSel != nil {
// We have an old selection vector, which represents the tuples that
// haven't yet been matched. Remove the ones that just matched from the
// old selection vector.
for i := range oldSel[:c.buffer.batch.Batch.Length()] {
if subtractIdx < len(toSubtract) && toSubtract[subtractIdx] == oldSel[i] {
// The ith element of the old selection vector matched the current one
// in toSubtract. Skip writing this element, removing it from the old
// selection vector.
subtractIdx++
continue
}
oldSel[curIdx] = oldSel[i]
curIdx++
}
} else {
// No selection vector means there have been no matches yet, and we were
// considering the entire batch of tuples for this case arm. Make a new
// selection vector with all of the tuples but the ones that just matched.
c.buffer.batch.Batch.SetSelection(true)
oldSel = c.buffer.batch.Batch.Selection()
for i := uint16(0); i < c.buffer.batch.Batch.Length(); i++ {
if subtractIdx < len(toSubtract) && toSubtract[subtractIdx] == i {
subtractIdx++
continue
}
oldSel[curIdx] = i
curIdx++
}
}
c.buffer.batch.Batch.SetLength(curIdx)

// Now our selection vector is set to exclude all the things that have
// matched so far. Reset the buffer and run the next case arm.
c.buffer.rewind()
}
}
// Finally, run the else operator, which will project into all tuples that
// are remaining in the selection vector (didn't match any case arms). Once
// that's done, restore the original selection vector and return the batch.
batch = c.elseOp.Next(ctx)
if origLen > 0 {
inputCol := c.buffer.batch.Batch.ColVec(c.thenIdxs[len(c.thenIdxs)-1])
outputCol.Copy(
coldata.CopySliceArgs{
SliceArgs: coldata.SliceArgs{
ColType: c.typ,
Src: inputCol,
Sel: toSubtract,
Sel: batch.Selection(),
SrcStartIdx: 0,
SrcEndIdx: uint64(len(toSubtract)),
SrcEndIdx: uint64(batch.Length()),
},
SelOnDest: true,
})
if oldSel := c.buffer.batch.Batch.Selection(); oldSel != nil {
// We have an old selection vector, which represents the tuples that
// haven't yet been matched. Remove the ones that just matched from the
// old selection vector.
for i := range oldSel[:c.buffer.batch.Batch.Length()] {
if subtractIdx < len(toSubtract) && toSubtract[subtractIdx] == oldSel[i] {
// The ith element of the old selection vector matched the current one
// in toSubtract. Skip writing this element, removing it from the old
// selection vector.
subtractIdx++
continue
}
oldSel[curIdx] = oldSel[i]
curIdx++
}
} else {
// No selection vector means there have been no matches yet, and we were
// considering the entire batch of tuples for this case arm. Make a new
// selection vector with all of the tuples but the ones that just matched.
c.buffer.batch.Batch.SetSelection(true)
oldSel = c.buffer.batch.Batch.Selection()
for i := uint16(0); i < c.buffer.batch.Batch.Length(); i++ {
if subtractIdx < len(toSubtract) && toSubtract[subtractIdx] == i {
subtractIdx++
continue
}
oldSel[curIdx] = i
curIdx++
}
}
c.buffer.batch.Batch.SetLength(curIdx)

// Now our selection vector is set to exclude all the things that have
// matched so far. Reset the buffer and run the next case arm.
c.buffer.rewind()
}
// Finally, run the else operator, which will project into all tuples that
// are remaining in the selection vector (didn't match any case arms). Once
// that's done, restore the original selection vector and return the batch.
batch = c.elseOp.Next(ctx)
inputCol := c.buffer.batch.Batch.ColVec(c.thenIdxs[len(c.thenIdxs)-1])
outputCol.Copy(
coldata.CopySliceArgs{
SliceArgs: coldata.SliceArgs{
ColType: c.typ,
Src: inputCol,
Sel: batch.Selection(),
SrcStartIdx: 0,
SrcEndIdx: uint64(batch.Length()),
},
SelOnDest: true,
})
})
batch.SetLength(origLen)
batch.SetSelection(origHasSel)
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/colexec/cfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,8 +857,7 @@ func (rf *cFetcher) nextBatch(ctx context.Context) (coldata.Batch, error) {
return rf.machine.batch, nil

case stateFinished:
rf.machine.batch.SetLength(0)
return rf.machine.batch, nil
return coldata.ZeroBatch, nil
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/colexec/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,22 @@ func NewCountOp(allocator *Allocator, input Operator) Operator {

func (c *countOp) Init() {
c.input.Init()
// Our output is always just one row.
c.internalBatch.SetLength(1)
c.count = 0
c.done = false
}

func (c *countOp) Next(ctx context.Context) coldata.Batch {
c.internalBatch.ResetInternalBatch()
if c.done {
c.internalBatch.SetLength(0)
return c.internalBatch
return coldata.ZeroBatch
}
for {
bat := c.input.Next(ctx)
length := bat.Length()
if length == 0 {
c.done = true
c.internalBatch.ColVec(0).Int64()[0] = c.count
c.internalBatch.SetLength(1)
return c.internalBatch
}
c.count += int64(length)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/deselector.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ func (p *deselectorOp) Next(ctx context.Context) coldata.Batch {
return batch
}

p.output.SetLength(batch.Length())
p.output.ResetInternalBatch()
sel := batch.Selection()
p.allocator.PerformOperation(p.output.ColVecs(), func() {
Expand All @@ -72,5 +71,6 @@ func (p *deselectorOp) Next(ctx context.Context) coldata.Batch {
)
}
})
p.output.SetLength(batch.Length())
return p.output
}
8 changes: 3 additions & 5 deletions pkg/sql/colexec/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,8 @@ func (s *zeroOperator) Init() {

func (s *zeroOperator) Next(ctx context.Context) coldata.Batch {
// TODO(solon): Can we avoid calling Next on the input at all?
next := s.input.Next(ctx)
next.SetLength(0)
return next
s.input.Next(ctx)
return coldata.ZeroBatch
}

type singleTupleNoInputOperator struct {
Expand All @@ -224,8 +223,7 @@ func (s *singleTupleNoInputOperator) Init() {
func (s *singleTupleNoInputOperator) Next(ctx context.Context) coldata.Batch {
s.batch.ResetInternalBatch()
if s.nexted {
s.batch.SetLength(0)
return s.batch
return coldata.ZeroBatch
}
s.nexted = true
s.batch.SetLength(1)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/proj_const_ops_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ func (p _OP_CONST_NAME) Next(ctx context.Context) coldata.Batch {
} else {
_SET_PROJECTION(false)
}
// Although we didn't change the length of the batch, it is necessary to set
// the length anyway (this helps maintaining the invariant of flat bytes).
batch.SetLength(n)
return batch
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/proj_non_const_ops_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func (p _OP_NAME) Next(ctx context.Context) coldata.Batch {
_SET_PROJECTION(false)
}

// Although we didn't change the length of the batch, it is necessary to set
// the length anyway (this helps maintaining the invariant of flat bytes).
batch.SetLength(n)
return batch
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/colexec/sort_chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,9 @@ func (s *chunker) prepareNextChunks(ctx context.Context) chunkerReadingState {
// buffer appends all tuples in range [start,end) from s.batch to already
// buffered tuples.
func (s *chunker) buffer(start uint16, end uint16) {
if start == end {
return
}
s.allocator.PerformOperation(s.bufferedTuples.colVecs, func() {
for i := 0; i < len(s.bufferedTuples.colVecs); i++ {
s.bufferedTuples.colVecs[i].Append(
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/colexec/sorttopk.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,7 @@ func (t *topKSorter) emit() coldata.Batch {
toEmit := t.topK.Length() - t.emitted
if toEmit == 0 {
// We're done.
t.output.SetLength(0)
return t.output
return coldata.ZeroBatch
}
if toEmit > coldata.BatchSize() {
toEmit = coldata.BatchSize()
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/colexec/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,9 @@ func (s *RepeatableBatchSource) Next(context.Context) coldata.Batch {
s.internalBatch.SetSelection(s.sel != nil)
s.batchesReturned++
if s.batchesToReturn != 0 && s.batchesReturned > s.batchesToReturn {
s.internalBatch.SetLength(0)
} else {
s.internalBatch.SetLength(s.batchLen)
return coldata.ZeroBatch
}
s.internalBatch.SetLength(s.batchLen)
if s.sel != nil {
// Since selection vectors are mutable, to make sure that we return the
// batch with the given selection vector, we need to reset
Expand Down
17 changes: 12 additions & 5 deletions pkg/sql/colexec/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ func runTestsWithoutAllNullsInjection(
// output on its second Next call (we need the first call to Next to get a
// reference to a batch to modify), and a second time to modify the batch
// and verify that this does not change the operator output.
// NOTE: this test makes sense only if the operator returns two non-zero
// length batches (if not, we short-circuit the test since the operator
// doesn't have to restore anything on a zero-length batch).
var (
secondBatchHasSelection, secondBatchHasNulls bool
inputTypes []coltypes.T
Expand All @@ -249,6 +252,9 @@ func runTestsWithoutAllNullsInjection(
}
ctx := context.Background()
b := op.Next(ctx)
if b.Length() == 0 {
return
}
if round == 1 {
if secondBatchHasSelection {
b.SetSelection(false)
Expand All @@ -265,6 +271,9 @@ func runTestsWithoutAllNullsInjection(
}
}
b = op.Next(ctx)
if b.Length() == 0 {
return
}
if round == 0 {
secondBatchHasSelection = b.Selection() != nil
secondBatchHasNulls = maybeHasNulls(b)
Expand Down Expand Up @@ -488,8 +497,7 @@ func (s *opTestInput) Init() {
func (s *opTestInput) Next(context.Context) coldata.Batch {
s.batch.ResetInternalBatch()
if len(s.tuples) == 0 {
s.batch.SetLength(0)
return s.batch
return coldata.ZeroBatch
}
batchSize := s.batchSize
if len(s.tuples) < int(batchSize) {
Expand Down Expand Up @@ -700,8 +708,7 @@ func (s *opFixedSelTestInput) Next(context.Context) coldata.Batch {
}
} else {
if s.idx == uint16(len(s.sel)) {
s.batch.SetLength(0)
return s.batch
return coldata.ZeroBatch
}
batchSize = s.batchSize
if uint16(len(s.sel))-s.idx < batchSize {
Expand Down Expand Up @@ -1125,7 +1132,7 @@ func (c *chunkingBatchSource) Init() {

func (c *chunkingBatchSource) Next(context.Context) coldata.Batch {
if c.curIdx >= c.len {
c.batch.SetLength(0)
return coldata.ZeroBatch
}
lastIdx := c.curIdx + uint64(coldata.BatchSize())
if lastIdx > c.len {
Expand Down
Loading