
Merge pull request #60571 from yuzefovich/backport20.2-59851
release-20.2: colmem: limit batches of dynamic size by workmem in memory footprint
yuzefovich committed Feb 23, 2021
2 parents 6d941d6 + a987c26 commit d17b05a
Showing 31 changed files with 301 additions and 91 deletions.
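
The core of the change is a new fourth argument to colmem.Allocator.ResetMaybeReallocate: in addition to a minimum capacity, callers now pass the maximum memory footprint a batch may grow to (typically the workmem limit). A minimal standalone sketch of that growth policy, with hypothetical names and a made-up per-row footprint standing in for colmem.EstimateBatchSizeBytes:

package main

import "fmt"

// resetMaybeReallocate sketches the policy this commit threads through the
// operators: batch capacity keeps doubling between calls, but growth stops
// once the estimated footprint reaches maxBatchMemSize (the workmem limit).
func resetMaybeReallocate(oldCapacity, maxCapacity int, footprintPerRow, maxBatchMemSize int64) (newCapacity int, reallocated bool) {
	if oldCapacity >= maxCapacity {
		// Already at the fixed ceiling (coldata.BatchSize() in the real code).
		return oldCapacity, false
	}
	if int64(oldCapacity)*footprintPerRow >= maxBatchMemSize {
		// The footprint already reaches the budget: reuse the old batch.
		return oldCapacity, false
	}
	newCapacity = oldCapacity * 2
	if newCapacity > maxCapacity {
		newCapacity = maxCapacity
	}
	return newCapacity, true
}

func main() {
	capacity := 1
	for i := 0; i < 5; i++ {
		next, grew := resetMaybeReallocate(capacity, 1024, 1<<20 /* 1 MiB per row */, 4<<20 /* 4 MiB budget */)
		fmt.Println(next, grew)
		capacity = next
	}
	// Prints 2 true, 4 true, then 4 false: growth stops near the 4 MiB
	// budget, far below the 1024-row ceiling.
}

With wide rows, capacity stops doubling long before coldata.BatchSize() is reached, which is exactly the memory-footprint limiting the commit title describes.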
6 changes: 5 additions & 1 deletion pkg/sql/colexec/colbuilder/execplan.go
@@ -809,9 +809,13 @@ func NewColOperator(
 			return r, err
 		}
 
+		memoryLimit := execinfra.GetWorkMemLimit(flowCtx.Cfg)
+		if flowCtx.Cfg.TestingKnobs.ForceDiskSpill {
+			memoryLimit = 1
+		}
 		inMemoryHashJoiner := colexec.NewHashJoiner(
 			colmem.NewAllocator(ctx, hashJoinerMemAccount, factory),
-			hashJoinerUnlimitedAllocator, hjSpec, inputs[0], inputs[1],
+			hashJoinerUnlimitedAllocator, hjSpec, inputs[0], inputs[1], memoryLimit,
 		)
 		if args.TestingKnobs.DiskSpillingDisabled {
 			// We will not be creating a disk-backed hash joiner because we're
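
A side note on the ForceDiskSpill knob above: collapsing the budget to a single byte makes any buffering operator exceed its limit immediately, which is how tests force the disk-backed path. A hedged sketch with hypothetical types (not the real execinfra API):

package main

import "fmt"

// serverConfig mirrors only the shape the hunk above relies on.
type serverConfig struct {
	WorkMemBytes   int64 // stand-in for execinfra.GetWorkMemLimit
	ForceDiskSpill bool  // stand-in for TestingKnobs.ForceDiskSpill
}

// effectiveMemoryLimit returns a 1-byte budget under the knob, so any
// buffered data exceeds it and the operator spills to disk.
func effectiveMemoryLimit(cfg serverConfig) int64 {
	if cfg.ForceDiskSpill {
		return 1
	}
	return cfg.WorkMemBytes
}

func main() {
	fmt.Println(effectiveMemoryLimit(serverConfig{WorkMemBytes: 64 << 20}))                       // 67108864
	fmt.Println(effectiveMemoryLimit(serverConfig{WorkMemBytes: 64 << 20, ForceDiskSpill: true})) // 1
}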
12 changes: 8 additions & 4 deletions pkg/sql/colexec/columnarizer.go
@@ -38,6 +38,7 @@ type Columnarizer struct {
 
 	buffered        rowenc.EncDatumRows
 	batch           coldata.Batch
+	maxBatchMemSize int64
 	accumulatedMeta []execinfrapb.ProducerMetadata
 	ctx             context.Context
 	typs            []*types.T
@@ -55,9 +56,10 @@ func NewColumnarizer(
 ) (*Columnarizer, error) {
 	var err error
 	c := &Columnarizer{
-		allocator: allocator,
-		input:     input,
-		ctx:       ctx,
+		allocator:       allocator,
+		input:           input,
+		maxBatchMemSize: execinfra.GetWorkMemLimit(flowCtx.Cfg),
+		ctx:             ctx,
 	}
 	if err = c.ProcessorBase.Init(
 		nil,
@@ -90,7 +92,9 @@ func (c *Columnarizer) Init() {
 // Next is part of the Operator interface.
 func (c *Columnarizer) Next(context.Context) coldata.Batch {
 	var reallocated bool
-	c.batch, reallocated = c.allocator.ResetMaybeReallocate(c.typs, c.batch, 1 /* minCapacity */)
+	c.batch, reallocated = c.allocator.ResetMaybeReallocate(
+		c.typs, c.batch, 1 /* minCapacity */, c.maxBatchMemSize,
+	)
 	if reallocated {
 		oldRows := c.buffered
 		c.buffered = make(rowenc.EncDatumRows, c.batch.Capacity())
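
ResetMaybeReallocate also reports whether a new batch was allocated, and the columnarizer uses that signal to regrow its row buffer to the new capacity. A small sketch of that buffer-sync pattern; Row stands in for rowenc.EncDatumRow, and carrying old rows over with copy is an assumption, since the hunk is truncated before that step:

package main

import "fmt"

// Row stands in for rowenc.EncDatumRow.
type Row []int

// growBuffer regrows the buffered-rows slice to match a reallocated
// batch's capacity (copying the old rows is assumed, not shown above).
func growBuffer(buffered []Row, newCapacity int) []Row {
	oldRows := buffered
	buffered = make([]Row, newCapacity)
	copy(buffered, oldRows)
	return buffered
}

func main() {
	buf := []Row{{1}, {2}}
	buf = growBuffer(buf, 4)
	fmt.Println(len(buf), buf[0], buf[1]) // 4 [1] [2]
}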
5 changes: 4 additions & 1 deletion pkg/sql/colexec/columnarizer_test.go
@@ -81,7 +81,10 @@ func TestColumnarizerDrainsAndClosesInput(t *testing.T) {
 	defer evalCtx.Stop(ctx)
 
 	rb := distsqlutils.NewRowBuffer([]*types.T{types.Int}, nil /* rows */, distsqlutils.RowBufferArgs{})
-	flowCtx := &execinfra.FlowCtx{EvalCtx: &evalCtx}
+	flowCtx := &execinfra.FlowCtx{
+		Cfg:     &execinfra.ServerConfig{Settings: st},
+		EvalCtx: &evalCtx,
+	}
 
 	const errMsg = "artificial error"
 	rb.Push(nil, &execinfrapb.ProducerMetadata{Err: errors.New(errMsg)})
13 changes: 11 additions & 2 deletions pkg/sql/colexec/deselector.go
@@ -12,6 +12,7 @@ package colexec
 
 import (
 	"context"
+	"math"
 
 	"github.com/cockroachdb/cockroach/pkg/col/coldata"
 	"github.com/cockroachdb/cockroach/pkg/sql/colexecbase"
@@ -51,18 +52,26 @@ func (p *deselectorOp) Init() {
 }
 
 func (p *deselectorOp) Next(ctx context.Context) coldata.Batch {
+	// deselectorOp should *not* limit the capacities of the returned batches,
+	// so we don't use a memory limit here. It is up to the wrapped operator to
+	// limit the size of batches based on the memory footprint.
+	const maxBatchMemSize = math.MaxInt64
 	// TODO(yuzefovich): this allocation is only needed in order to appease the
 	// tests of the external sorter with forced disk spilling (if we don't do
 	// this, an OOM error occurs during ResetMaybeReallocate call below at
 	// which point we have already received a batch from the input and it'll
 	// get lost because deselectorOp doesn't support fall-over to the
 	// disk-backed infrastructure).
-	p.output, _ = p.allocator.ResetMaybeReallocate(p.inputTypes, p.output, 1 /* minCapacity */)
+	p.output, _ = p.allocator.ResetMaybeReallocate(
+		p.inputTypes, p.output, 1 /* minCapacity */, maxBatchMemSize,
+	)
 	batch := p.input.Next(ctx)
 	if batch.Selection() == nil {
 		return batch
 	}
-	p.output, _ = p.allocator.ResetMaybeReallocate(p.inputTypes, p.output, batch.Length())
+	p.output, _ = p.allocator.ResetMaybeReallocate(
+		p.inputTypes, p.output, batch.Length(), maxBatchMemSize,
+	)
 	sel := batch.Selection()
 	p.allocator.PerformOperation(p.output.ColVecs(), func() {
 		for i := range p.inputTypes {
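
The math.MaxInt64 cap above makes sense given what deselection does: the output batch must hold every row named by the input's selection vector, so its capacity is dictated by the input batch, not by workmem, and memory limiting is left to the wrapped operator. A toy sketch of the operation itself, with a plain int64 slice standing in for one coldata column vector:

package main

import "fmt"

// deselect densely packs the rows named by a selection vector into a
// fresh output slice, which is what deselectorOp does per column.
func deselect(col []int64, sel []int) []int64 {
	out := make([]int64, len(sel))
	for i, rowIdx := range sel {
		out[i] = col[rowIdx]
	}
	return out
}

func main() {
	col := []int64{10, 20, 30, 40, 50}
	sel := []int{0, 2, 4} // rows that survived earlier filters
	fmt.Println(deselect(col, sel)) // [10 30 50]
}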
2 changes: 1 addition & 1 deletion pkg/sql/colexec/external_hash_joiner.go
@@ -375,7 +375,7 @@ func NewExternalHashJoiner(
 		// limit, so we use the same unlimited allocator for both
 		// buildSideAllocator and outputUnlimitedAllocator arguments.
 		inMemHashJoiner: NewHashJoiner(
-			unlimitedAllocator, unlimitedAllocator, spec, leftJoinerInput, rightJoinerInput,
+			unlimitedAllocator, unlimitedAllocator, spec, leftJoinerInput, rightJoinerInput, memoryLimit,
 		).(*hashJoiner),
 		diskBackedSortMerge: diskBackedSortMerge,
 	}
36 changes: 23 additions & 13 deletions pkg/sql/colexec/external_sort.go
@@ -113,6 +113,7 @@ type externalSorter struct {
 	closerHelper
 
 	unlimitedAllocator *colmem.Allocator
+	totalMemoryLimit   int64
 	state              externalSorterState
 	inputTypes         []*types.T
 	ordering           execinfrapb.Ordering
@@ -190,19 +191,18 @@ func NewExternalSorter(
 	if maxNumberPartitions < externalSorterMinPartitions {
 		maxNumberPartitions = externalSorterMinPartitions
 	}
+	estimatedOutputBatchMemSize := colmem.EstimateBatchSizeBytes(inputTypes, coldata.BatchSize())
 	// Each disk queue will use up to BufferSizeBytes of RAM, so we reduce the
-	// memoryLimit of the partitions to sort in memory by those cache sizes. To be
-	// safe, we also estimate the size of the output batch and subtract that as
-	// well.
-	batchMemSize := colmem.EstimateBatchSizeBytes(inputTypes, coldata.BatchSize())
-	// Reserve a certain amount of memory for the partition caches.
-	memoryLimit -= int64((maxNumberPartitions * diskQueueCfg.BufferSizeBytes) + batchMemSize)
-	if memoryLimit < 1 {
-		// If the memory limit is 0, the input partitioning operator will return a
-		// zero-length batch, so make it at least 1.
-		memoryLimit = 1
+	// memoryLimit of the partitions to sort in memory by those cache sizes. To
+	// be safe, we also estimate the size of the output batch and subtract that
+	// as well.
+	singlePartitionSize := memoryLimit - int64(maxNumberPartitions*diskQueueCfg.BufferSizeBytes+estimatedOutputBatchMemSize)
+	if singlePartitionSize < 1 {
+		// If the memory limit is 0, the input partitioning operator will return
+		// a zero-length batch, so make it at least 1.
+		singlePartitionSize = 1
 	}
-	inputPartitioner := newInputPartitioningOperator(input, standaloneMemAccount, memoryLimit)
+	inputPartitioner := newInputPartitioningOperator(input, standaloneMemAccount, singlePartitionSize)
 	inMemSorter, err := newSorter(
 		unlimitedAllocator, newAllSpooler(unlimitedAllocator, inputPartitioner, inputTypes),
 		inputTypes, ordering.Columns,
@@ -220,13 +220,19 @@
 	es := &externalSorter{
 		OneInputNode:       NewOneInputNode(inMemSorter),
 		unlimitedAllocator: unlimitedAllocator,
+		totalMemoryLimit:   memoryLimit,
 		inMemSorter:        inMemSorter,
 		inMemSorterInput:   inputPartitioner.(*inputPartitioningOperator),
 		partitionerCreator: func() colcontainer.PartitionedQueue {
 			return colcontainer.NewPartitionedDiskQueue(inputTypes, diskQueueCfg, partitionedDiskQueueSemaphore, colcontainer.PartitionerStrategyCloseOnNewPartition, diskAcc)
 		},
-		inputTypes:          inputTypes,
-		ordering:            ordering,
+		inputTypes: inputTypes,
+		ordering:   ordering,
+		// TODO(yuzefovich): maxNumberPartitions should also be semi-dynamically
+		// limited based on the sizes of batches. Consider a scenario when each
+		// input batch is 1 GB in size - if we don't put any limiting in place,
+		// we might try to use 16 partitions at once, which means that during
+		// merging we will be keeping 16 batches (i.e. 16GB of data) in memory.
 		maxNumberPartitions: maxNumberPartitions,
 	}
 	es.fdState.fdSemaphore = fdSemaphore
@@ -406,8 +412,12 @@ func (s *externalSorter) createMergerForPartitions(
 			s.unlimitedAllocator, s.inputTypes, s.partitioner, firstIdx+i,
 		)
 	}
+	// TODO(yuzefovich): we should calculate a more precise memory limit taking
+	// into account how many partitions are currently being merged and the
+	// average batch size in each one of them.
 	return NewOrderedSynchronizer(
 		s.unlimitedAllocator,
+		s.totalMemoryLimit,
 		syncInputs,
 		s.inputTypes,
 		execinfrapb.ConvertToColumnOrdering(s.ordering),
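
To make the budget split in NewExternalSorter concrete, here is the same arithmetic with made-up inputs (the real values come from the workmem setting, DiskQueueCfg.BufferSizeBytes, and colmem.EstimateBatchSizeBytes):

package main

import "fmt"

func main() {
	const (
		memoryLimit                 int64 = 64 << 20 // 64 MiB workmem
		maxNumberPartitions               = 16
		bufferSizeBytes                   = 128 << 10 // 128 KiB per disk queue
		estimatedOutputBatchMemSize       = 1 << 20   // 1 MiB
	)
	// Reserve the disk-queue caches and one output batch up front; the
	// in-memory sorter gets whatever is left.
	singlePartitionSize := memoryLimit - int64(maxNumberPartitions*bufferSizeBytes+estimatedOutputBatchMemSize)
	if singlePartitionSize < 1 {
		singlePartitionSize = 1 // never hand the input partitioner a zero budget
	}
	fmt.Println(singlePartitionSize) // 67108864 - (2097152 + 1048576) = 63963136 (~61 MiB)
}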
11 changes: 10 additions & 1 deletion pkg/sql/colexec/hashjoiner.go
@@ -179,6 +179,10 @@ type hashJoiner struct {
 	// ht holds the hashTable that is populated during the build phase and used
 	// during the probe phase.
 	ht *hashTable
+	// memoryLimit is the total amount of RAM available for the hash joiner.
+	// This limits the output batches (and is also the same limit for the size
+	// of the hash table).
+	memoryLimit int64
 	// output stores the resulting output batch that is constructed and returned
 	// for every input batch during the probe phase.
 	output coldata.Batch
@@ -615,7 +619,9 @@ func (hj *hashJoiner) resetOutput(nResults int) {
 	// batch at a time. If we were to use a limited allocator, we could hit the
 	// limit here, and it would have been very hard to fall back to disk backed
 	// hash joiner because we might have already emitted partial output.
-	hj.output, _ = hj.outputUnlimitedAllocator.ResetMaybeReallocate(hj.outputTypes, hj.output, minCapacity)
+	hj.output, _ = hj.outputUnlimitedAllocator.ResetMaybeReallocate(
+		hj.outputTypes, hj.output, minCapacity, hj.memoryLimit,
+	)
 }
 
 func (hj *hashJoiner) reset(ctx context.Context) {
@@ -713,10 +719,12 @@ func MakeHashJoinerSpec(
 // buildSideAllocator should use a limited memory account and will be used for
 // the build side whereas outputUnlimitedAllocator should use an unlimited
 // memory account and will only be used when populating the output.
+// memoryLimit will limit the size of the batches produced by the hash joiner.
 func NewHashJoiner(
 	buildSideAllocator, outputUnlimitedAllocator *colmem.Allocator,
 	spec HashJoinerSpec,
 	leftSource, rightSource colexecbase.Operator,
+	memoryLimit int64,
 ) colexecbase.Operator {
 	outputTypes := append([]*types.T{}, spec.left.sourceTypes...)
 	if spec.joinType.ShouldIncludeRightColsInOutput() {
@@ -727,6 +735,7 @@
 		buildSideAllocator:       buildSideAllocator,
 		outputUnlimitedAllocator: outputUnlimitedAllocator,
 		spec:                     spec,
+		memoryLimit:              memoryLimit,
 		outputTypes:              outputTypes,
 	}
 	hj.probeState.buildIdx = make([]int, coldata.BatchSize())
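
Because resetOutput deliberately uses an unlimited allocator, the new memoryLimit field is what keeps any single output batch bounded: join results for one probe batch are simply emitted across multiple Next calls. A schematic illustration only, not the actual hashJoiner code, which tracks its position in probe state rather than in a slice:

package main

import "fmt"

// emitInChunks splits one probe batch's join results across several
// output batches whose size is capped by the memory budget.
func emitInChunks(results []int64, batchCapacity int) [][]int64 {
	var batches [][]int64
	for len(results) > 0 {
		n := batchCapacity
		if n > len(results) {
			n = len(results)
		}
		batches = append(batches, results[:n])
		results = results[n:]
	}
	return batches
}

func main() {
	joined := []int64{1, 2, 3, 4, 5, 6, 7}
	for _, b := range emitInChunks(joined, 3) {
		fmt.Println(b) // [1 2 3], then [4 5 6], then [7]
	}
}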
2 changes: 1 addition & 1 deletion pkg/sql/colexec/hashjoiner_test.go
@@ -1077,7 +1077,7 @@ func BenchmarkHashJoiner(b *testing.B) {
 			require.NoError(b, err)
 			hj := NewHashJoiner(
 				testAllocator, testAllocator, hjSpec,
-				leftSource, rightSource,
+				leftSource, rightSource, defaultMemoryLimit,
 			)
 			hj.Init()
 
4 changes: 3 additions & 1 deletion pkg/sql/colexec/mergejoiner_exceptall.eg.go
4 changes: 3 additions & 1 deletion pkg/sql/colexec/mergejoiner_fullouter.eg.go
4 changes: 3 additions & 1 deletion pkg/sql/colexec/mergejoiner_inner.eg.go
4 changes: 3 additions & 1 deletion pkg/sql/colexec/mergejoiner_intersectall.eg.go
4 changes: 3 additions & 1 deletion pkg/sql/colexec/mergejoiner_leftanti.eg.go
4 changes: 3 additions & 1 deletion pkg/sql/colexec/mergejoiner_leftouter.eg.go
4 changes: 3 additions & 1 deletion pkg/sql/colexec/mergejoiner_leftsemi.eg.go
4 changes: 3 additions & 1 deletion pkg/sql/colexec/mergejoiner_rightouter.eg.go

(Generated files; diffs not rendered.)
2 changes: 1 addition & 1 deletion pkg/sql/colexec/mergejoiner_test.go
@@ -1791,7 +1791,7 @@ func TestMergeJoinCrossProduct(t *testing.T) {
 			right: hashJoinerSourceSpec{
 				eqCols: []uint32{0}, sourceTypes: typs,
 			},
-		}, leftHJSource, rightHJSource)
+		}, leftHJSource, rightHJSource, defaultMemoryLimit)
 		hj.Init()
 
 		var mjOutputTuples, hjOutputTuples tuples
4 changes: 3 additions & 1 deletion pkg/sql/colexec/mergejoiner_tmpl.go
@@ -1594,7 +1594,9 @@ func _SOURCE_FINISHED_SWITCH(_JOIN_TYPE joinTypeInfo) { // */}}
 // */}}
 
 func (o *mergeJoin_JOIN_TYPE_STRINGOp) Next(ctx context.Context) coldata.Batch {
-	o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate(o.outputTypes, o.output, 1 /* minCapacity */)
+	o.output, _ = o.unlimitedAllocator.ResetMaybeReallocate(
+		o.outputTypes, o.output, 1 /* minCapacity */, o.memoryLimit,
+	)
 	for {
 		switch o.state {
 		case mjEntry:
8 changes: 7 additions & 1 deletion pkg/sql/colexec/ordered_synchronizer.eg.go

(Generated file; diff not rendered.)

(The remaining changed files were not loaded in this view.)
