Skip to content

Commit

Permalink
colexec: make unordered distinct streaming-like
Browse files Browse the repository at this point in the history
Previously, when executing an unordered distinct, we would build the
whole hash table and consume the input source entirely before emitting
any output. This is suboptimal when the query has a limit, since
we're likely to reach the limit long before consuming the whole
input source.

This commit makes the unordered distinct more streaming-like - it builds
the hash table one batch at a time, and whenever some distinct tuples
are appended to the hash table, all of them are emitted in the output.

Release note: None
  • Loading branch information
yuzefovich committed Dec 4, 2020
1 parent 5f77dce commit b3a567c
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 101 deletions.
2 changes: 1 addition & 1 deletion pkg/sql/colexec/hashjoiner.go
Expand Up @@ -314,7 +314,7 @@ func (hj *hashJoiner) Next(ctx context.Context) coldata.Batch {
}

func (hj *hashJoiner) build(ctx context.Context) {
hj.ht.build(ctx, hj.inputTwo)
hj.ht.fullBuild(ctx, hj.inputTwo)

// We might have duplicates in the hash table, so we need to set up
// same and visited slices for the prober.
Expand Down
79 changes: 40 additions & 39 deletions pkg/sql/colexec/hashtable.go
Expand Up @@ -325,47 +325,48 @@ func (ht *hashTable) buildFromBufferedTuples(ctx context.Context) {
ht.unlimitedSlicesNumUint64AccountedFor = newUint64Count
}

// build executes the entirety of the hash table build phase using the input
// as the build source. The input is entirely consumed in the process.
func (ht *hashTable) build(ctx context.Context, input colexecbase.Operator) {
switch ht.buildMode {
case hashTableFullBuildMode:
// We're using the hash table with the full build mode in which we will
// fully buffer all tuples from the input first and only then we'll
// build the hash table. Such approach allows us to compute the desired
// number of hash buckets for the target load factor (this is done in
// buildFromBufferedTuples()).
for {
batch := input.Next(ctx)
if batch.Length() == 0 {
break
}
ht.allocator.PerformOperation(ht.vals.ColVecs(), func() {
ht.vals.append(batch, 0 /* startIdx */, batch.Length())
})
}
ht.buildFromBufferedTuples(ctx)

case hashTableDistinctBuildMode:
for {
batch := input.Next(ctx)
if batch.Length() == 0 {
break
}
ht.computeHashAndBuildChains(ctx, batch)
ht.removeDuplicates(batch, ht.keys, ht.probeScratch.first, ht.probeScratch.next, ht.checkProbeForDistinct)
// We only check duplicates when there is at least one buffered
// tuple.
if ht.vals.Length() > 0 {
ht.removeDuplicates(batch, ht.keys, ht.buildScratch.first, ht.buildScratch.next, ht.checkBuildForDistinct)
}
if batch.Length() > 0 {
ht.appendAllDistinct(ctx, batch)
}
// fullBuild runs the complete build phase of the hash table, draining input
// until it returns a zero-length batch. It must only be invoked on a hash
// table that operates in hashTableFullBuildMode; any other build mode is
// reported through colexecerror.InternalError.
func (ht *hashTable) fullBuild(ctx context.Context, input colexecbase.Operator) {
	if mode := ht.buildMode; mode != hashTableFullBuildMode {
		colexecerror.InternalError(errors.AssertionFailedf(
			"hashTable.fullBuild is called in unexpected build mode %d", mode,
		))
	}
	// In the full build mode every input tuple is buffered up front and the
	// hash table is constructed only afterwards. Knowing the total number of
	// tuples lets buildFromBufferedTuples() choose the number of hash buckets
	// for the target load factor.
	for {
		next := input.Next(ctx)
		if next.Length() == 0 {
			// A zero-length batch signals that the input is exhausted.
			break
		}
		appendNext := func() {
			ht.vals.append(next, 0 /* startIdx */, next.Length())
		}
		// The append goes through the allocator together with the vectors it
		// grows.
		ht.allocator.PerformOperation(ht.vals.ColVecs(), appendNext)
	}
	ht.buildFromBufferedTuples(ctx)
}

default:
colexecerror.InternalError(errors.AssertionFailedf("hashTable in unhandled state"))
// distinctBuild appends all distinct tuples from batch to the hash table. Note
// that the hash table is assumed to operate in hashTableDistinctBuildMode; any
// other build mode is reported through colexecerror.InternalError.
//
// The batch is passed to removeDuplicates before anything is appended, and
// only a batch that still has a non-zero length afterwards is appended.
// NOTE(review): this implies removeDuplicates updates batch in place (length
// and/or selection vector) - confirm against its implementation.
func (ht *hashTable) distinctBuild(ctx context.Context, batch coldata.Batch) {
	if ht.buildMode != hashTableDistinctBuildMode {
		colexecerror.InternalError(errors.AssertionFailedf(
			"hashTable.distinctBuild is called in unexpected build mode %d", ht.buildMode,
		))
	}
	ht.computeHashAndBuildChains(ctx, batch)
	// First pass uses the probe scratch space; presumably this removes the
	// duplicates that occur within the batch itself - TODO confirm.
	ht.removeDuplicates(batch, ht.keys, ht.probeScratch.first, ht.probeScratch.next, ht.checkProbeForDistinct)
	// We only check duplicates when there is at least one buffered tuple.
	if ht.vals.Length() > 0 {
		ht.removeDuplicates(batch, ht.keys, ht.buildScratch.first, ht.buildScratch.next, ht.checkBuildForDistinct)
	}
	// Nothing to append if every tuple in the batch turned out to be a
	// duplicate.
	if batch.Length() > 0 {
		ht.appendAllDistinct(ctx, batch)
	}
}

Expand Down
102 changes: 41 additions & 61 deletions pkg/sql/colexec/unordered_distinct.go
Expand Up @@ -51,25 +51,21 @@ func NewUnorderedDistinct(
}
}

// unorderedDistinct performs a DISTINCT operation using a hashTable. Once the
// building of the hashTable is completed, this operator iterates over all of
// the tuples to check whether the tuple is the "head" of a linked list that
// contain all of the tuples that are equal on distinct columns. Only the
// "head" is included into the big selection vector. Once the big selection
// vector is populated, the operator proceeds to returning the batches
// according to a chunk of the selection vector.
// unorderedDistinct performs a DISTINCT operation using a hashTable. It
// populates the hash table in an iterative fashion by appending only the
// distinct tuples from each input batch. Once at least one tuple is appended,
// all of the distinct tuples from the batch are emitted in the output.
type unorderedDistinct struct {
OneInputNode

allocator *colmem.Allocator
ht *hashTable
typs []*types.T
buildFinished bool
allocator *colmem.Allocator
ht *hashTable
typs []*types.T

distinctCount int

output coldata.Batch
outputBatchStart int
output coldata.Batch
// htIdx indicates the number of tuples from ht we have already emitted in
// the output.
htIdx int
}

var _ colexecbase.Operator = &unorderedDistinct{}
Expand All @@ -79,59 +75,43 @@ func (op *unorderedDistinct) Init() {
}

func (op *unorderedDistinct) Next(ctx context.Context) coldata.Batch {
// First, build the hash table and populate the selection vector that
// includes only distinct tuples.
if !op.buildFinished {
op.buildFinished = true
op.ht.build(ctx, op.input)

// We're using the hashTable in distinct mode, so it buffers only distinct
// tuples, as a result, we will be simply returning all buffered tuples.
op.distinctCount = op.ht.vals.Length()
}
if op.outputBatchStart == op.distinctCount {
return coldata.ZeroBatch
}
op.output, _ = op.allocator.ResetMaybeReallocate(op.typs, op.output, op.distinctCount-op.outputBatchStart)

// Create and return the next batch of input to a maximum size equal to the
// capacity of the output batch.
nSelected := 0
batchEnd := op.outputBatchStart + op.output.Capacity()
if batchEnd > op.distinctCount {
batchEnd = op.distinctCount
}
nSelected = batchEnd - op.outputBatchStart

op.allocator.PerformOperation(op.output.ColVecs(), func() {
for colIdx, fromCol := range op.ht.vals.ColVecs() {
toCol := op.output.ColVec(colIdx)
toCol.Copy(
coldata.CopySliceArgs{
SliceArgs: coldata.SliceArgs{
Src: fromCol,
SrcStartIdx: op.outputBatchStart,
SrcEndIdx: batchEnd,
},
},
)
for {
batch := op.input.Next(ctx)
if batch.Length() == 0 {
return coldata.ZeroBatch
}
})

op.outputBatchStart = batchEnd
op.output.SetLength(nSelected)
return op.output
op.ht.distinctBuild(ctx, batch)
if op.ht.vals.Length() > op.htIdx {
// We've just appended some distinct tuples to the hash table, so we
// will emit all of them as the output.
outputLength := op.ht.vals.Length() - op.htIdx
op.output, _ = op.allocator.ResetMaybeReallocate(op.typs, op.output, outputLength)
op.allocator.PerformOperation(op.output.ColVecs(), func() {
for colIdx, fromCol := range op.ht.vals.ColVecs() {
toCol := op.output.ColVec(colIdx)
toCol.Copy(
coldata.CopySliceArgs{
SliceArgs: coldata.SliceArgs{
Src: fromCol,
SrcStartIdx: op.htIdx,
SrcEndIdx: op.htIdx + outputLength,
},
},
)
}
op.output.SetLength(outputLength)
})
op.htIdx += outputLength
return op.output
}
}
}

// reset resets the unorderedDistinct.
func (op *unorderedDistinct) reset(ctx context.Context) {
if r, ok := op.input.(resetter); ok {
r.reset(ctx)
}
op.ht.vals.ResetInternalBatch()
op.ht.vals.SetLength(0)
op.buildFinished = false
op.ht.reset(ctx)
op.distinctCount = 0
op.outputBatchStart = 0
op.htIdx = 0
}

0 comments on commit b3a567c

Please sign in to comment.