Merge pull request #67059 from michae2/backport20.2-65491
release-20.2: sql: shrink SampleReservoir capacity on memory exhaustion
michae2 committed Jun 30, 2021
2 parents 3c397d7 + 8f7247c commit 0c0e0d0
Showing 10 changed files with 702 additions and 329 deletions.
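
For orientation, the sketch below illustrates the mechanism this backport changes: a bottom-k ("smallest random rank wins") sample reservoir that, when storing the next row would push it past its memory budget, halves its capacity down to a minimum instead of giving up. It is a minimal standalone sketch of the idea, not the actual stats.SampleReservoir API; the names reservoir, budgetBytes, minCapacity, and rowSize are invented for the example.

```go
// Minimal standalone sketch (not the real stats.SampleReservoir): keep the
// rows with the smallest random ranks, and shrink the reservoir rather than
// fail when the memory budget would be exceeded.
package main

import (
	"fmt"
	"math/rand"
)

type sampledRow struct {
	rank uint64
	row  []int64 // stand-in for an encoded table row
}

type reservoir struct {
	capacity    int   // current maximum number of samples
	minCapacity int   // never shrink below this many samples
	budgetBytes int64 // assumed memory budget for retained rows
	usedBytes   int64
	samples     []sampledRow
	disabled    bool
}

func rowSize(row []int64) int64 { return int64(len(row)) * 8 }

// worst returns the index of the retained sample with the largest rank.
func (r *reservoir) worst() int {
	w := 0
	for i := range r.samples {
		if r.samples[i].rank > r.samples[w].rank {
			w = i
		}
	}
	return w
}

// maybeShrink halves the capacity (never below minCapacity) and evicts the
// worst-ranked samples until the reservoir fits within the new capacity.
func (r *reservoir) maybeShrink() bool {
	if r.capacity/2 < r.minCapacity {
		r.disabled = true // cannot shrink further; stop collecting samples
		return false
	}
	r.capacity /= 2
	for len(r.samples) > r.capacity {
		w := r.worst()
		r.usedBytes -= rowSize(r.samples[w].row)
		r.samples = append(r.samples[:w], r.samples[w+1:]...)
	}
	return true
}

func (r *reservoir) sampleRow(row []int64, rank uint64) {
	if r.disabled {
		return
	}
	// Shrink (possibly more than once) if storing this row would exceed the budget.
	for len(r.samples) < r.capacity && r.usedBytes+rowSize(row) > r.budgetBytes {
		if !r.maybeShrink() {
			return
		}
	}
	if len(r.samples) < r.capacity {
		r.samples = append(r.samples, sampledRow{rank: rank, row: row})
		r.usedBytes += rowSize(row)
		return
	}
	// Reservoir full: keep the row only if it beats the current worst rank.
	if w := r.worst(); len(r.samples) > 0 && rank < r.samples[w].rank {
		r.usedBytes += rowSize(row) - rowSize(r.samples[w].row)
		r.samples[w] = sampledRow{rank: rank, row: row}
	}
}

func main() {
	r := &reservoir{capacity: 1024, minCapacity: 10, budgetBytes: 4 << 10}
	rng := rand.New(rand.NewSource(1))
	for i := 0; i < 100000; i++ {
		r.sampleRow([]int64{int64(i), rng.Int63()}, rng.Uint64())
	}
	fmt.Printf("capacity=%d retained=%d disabled=%v\n", r.capacity, len(r.samples), r.disabled)
}
```
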
8 changes: 4 additions & 4 deletions pkg/sql/distsql_plan_stats.go
@@ -34,7 +34,7 @@ import (
type requestedStat struct {
columns []descpb.ColumnID
histogram bool
histogramMaxBuckets int
histogramMaxBuckets uint32
name string
inverted bool
}
@@ -107,7 +107,7 @@ func (dsp *DistSQLPlanner) createStatsPlan(
spec := execinfrapb.SketchSpec{
SketchType: execinfrapb.SketchType_HLL_PLUS_PLUS_V1,
GenerateHistogram: s.histogram,
HistogramMaxBuckets: uint32(s.histogramMaxBuckets),
HistogramMaxBuckets: s.histogramMaxBuckets,
Columns: make([]uint32, len(s.columns)),
StatName: s.name,
}
@@ -238,9 +238,9 @@ func (dsp *DistSQLPlanner) createPlanForCreateStats(
histogramCollectionEnabled := stats.HistogramClusterMode.Get(&dsp.st.SV)
for i := 0; i < len(reqStats); i++ {
histogram := details.ColumnStats[i].HasHistogram && histogramCollectionEnabled
histogramMaxBuckets := defaultHistogramBuckets
var histogramMaxBuckets uint32 = defaultHistogramBuckets
if details.ColumnStats[i].HistogramMaxBuckets > 0 {
histogramMaxBuckets = int(details.ColumnStats[i].HistogramMaxBuckets)
histogramMaxBuckets = details.ColumnStats[i].HistogramMaxBuckets
}
reqStats[i] = requestedStat{
columns: details.ColumnStats[i].ColumnIDs,
17 changes: 9 additions & 8 deletions pkg/sql/execinfrapb/processors_table_stats.pb.go

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions pkg/sql/execinfrapb/processors_table_stats.proto
@@ -90,9 +90,10 @@ message SketchSpec {
// - a BYTE column of the inverted index key.
//
// There are four row types produced:
// 1. sample rows, using column group #1.
// 1. sample rows, using column group #1 and the numRows column from #2.
// 2. sketch rows, using column group #2.
// 3. inverted sample rows, using column group #3 and the rank column from #1.
// 3. inverted sample rows, using column group #3, the rank column from #1,
// and numRows column from #2.
// 4. inverted sketch rows, using column group #2 and first column from #3.
//
// Rows have NULLs on either all the sampled row columns or on all the
9 changes: 7 additions & 2 deletions pkg/sql/rowexec/processors.go
@@ -226,13 +226,18 @@ func NewProcessor(
if err := checkNumInOut(inputs, outputs, 1, 1); err != nil {
return nil, err
}
return newSamplerProcessor(flowCtx, processorID, core.Sampler, inputs[0], post, outputs[0])
return newSamplerProcessor(
flowCtx, processorID, core.Sampler, defaultMinSampleSize, inputs[0], post, outputs[0],
)
}
if core.SampleAggregator != nil {
if err := checkNumInOut(inputs, outputs, 1, 1); err != nil {
return nil, err
}
return newSampleAggregator(flowCtx, processorID, core.SampleAggregator, inputs[0], post, outputs[0])
return newSampleAggregator(
flowCtx, processorID, core.SampleAggregator, defaultMinSampleSize, inputs[0], post,
outputs[0],
)
}
if core.ReadImport != nil {
if err := checkNumInOut(inputs, outputs, 0, 1); err != nil {
78 changes: 45 additions & 33 deletions pkg/sql/rowexec/sample_aggregator.go
@@ -86,6 +86,7 @@ func newSampleAggregator(
flowCtx *execinfra.FlowCtx,
processorID int32,
spec *execinfrapb.SampleAggregatorSpec,
minSampleSize int,
input execinfra.RowSource,
post *execinfrapb.PostProcessSpec,
output execinfra.RowReceiver,
@@ -144,14 +145,16 @@ }
}
}

s.sr.Init(int(spec.SampleSize), input.OutputTypes()[:rankCol], &s.memAcc, sampleCols)
s.sr.Init(
int(spec.SampleSize), minSampleSize, input.OutputTypes()[:rankCol], &s.memAcc, sampleCols,
)
for i := range spec.InvertedSketches {
var sr stats.SampleReservoir
// The datums are converted to their inverted index bytes and
// sent as a single DBytes column.
var srCols util.FastIntSet
srCols.Add(0)
sr.Init(int(spec.SampleSize), bytesRowType, &s.memAcc, srCols)
sr.Init(int(spec.SampleSize), minSampleSize, bytesRowType, &s.memAcc, srCols)
col := spec.InvertedSketches[i].Columns[0]
s.invSr[col] = &sr
s.invSketch[col] = &sketchInfo{
@@ -284,7 +287,9 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, err er
colIdx := uint32(invColIdx)
if rank, err := row[s.rankCol].GetInt(); err == nil {
// Inverted sample row.
// Retain the rows with the top ranks.
// Shrink capacity to match the child samplerProcessor and then retain
// the row if it had one of the top (smallest) ranks.
s.maybeDecreaseSamples(ctx, s.invSr[colIdx], row)
sampleRow := row[s.invIdxKeyCol : s.invIdxKeyCol+1]
if err := s.sampleRow(ctx, s.invSr[colIdx], sampleRow, uint64(rank)); err != nil {
return false, err
@@ -303,7 +308,9 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, err er
}
if rank, err := row[s.rankCol].GetInt(); err == nil {
// Sample row.
// Retain the rows with the top ranks.
// Shrink capacity to match the child samplerProcessor and then retain the
// row if it had one of the top (smallest) ranks.
s.maybeDecreaseSamples(ctx, &s.sr, row)
if err := s.sampleRow(ctx, &s.sr, row[:s.rankCol], uint64(rank)); err != nil {
return false, err
}
@@ -363,9 +370,27 @@ func (s *sampleAggregator) processSketchRow(
return nil
}

// maybeDecreaseSamples shrinks the capacity of the aggregate reservoir to be <=
// the capacity of the child reservoir. This is done to prevent biasing the
sampling in favor of child samplerProcessors with larger reservoirs.
func (s *sampleAggregator) maybeDecreaseSamples(
ctx context.Context, sr *stats.SampleReservoir, row rowenc.EncDatumRow,
) {
if capacity, err := row[s.numRowsCol].GetInt(); err == nil {
prevCapacity := sr.Cap()
if sr.MaybeResize(ctx, int(capacity)) {
log.Infof(
ctx, "histogram samples reduced from %d to %d to match sampler processor",
prevCapacity, sr.Cap(),
)
}
}
}
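
The numeric sketch below illustrates the bias that maybeDecreaseSamples guards against, assuming two child samplers that each scan the same number of rows while one of them has shrunk its reservoir under memory pressure. (In the diff above, the child's capacity arrives on each sample row via the numRows column, and MaybeResize clamps the aggregate reservoir to it.) This is a standalone illustration, not CockroachDB code; sample, bottomK, and scan are helpers invented for the example.

```go
// Standalone illustration (not CockroachDB code) of why the aggregator must
// not keep more samples than its smallest child reservoir forwarded.
package main

import (
	"fmt"
	"math/rand"
	"sort"
)

type sample struct {
	rank  uint64
	child string
}

// bottomK returns the k samples with the smallest ranks.
func bottomK(s []sample, k int) []sample {
	sort.Slice(s, func(i, j int) bool { return s[i].rank < s[j].rank })
	if len(s) > k {
		s = s[:k]
	}
	return s
}

// scan simulates a child sampler: numRows rows get random ranks, and only the
// capacity smallest ranks are forwarded to the aggregator.
func scan(rng *rand.Rand, child string, numRows, capacity int) []sample {
	rows := make([]sample, numRows)
	for i := range rows {
		rows[i] = sample{rank: rng.Uint64(), child: child}
	}
	return bottomK(rows, capacity)
}

func fractionFromA(merged []sample) float64 {
	a := 0
	for _, s := range merged {
		if s.child == "A" {
			a++
		}
	}
	return float64(a) / float64(len(merged))
}

func main() {
	rng := rand.New(rand.NewSource(42))
	a := scan(rng, "A", 100000, 1000) // full-capacity child
	b := scan(rng, "B", 100000, 100)  // child that shrank under memory pressure

	merged := append(append([]sample{}, a...), b...)
	// Aggregator keeps its original capacity: child A dominates (about 90%).
	fmt.Printf("aggregate capacity 1000: %.0f%% of samples come from A\n",
		100*fractionFromA(bottomK(merged, 1000)))
	// Aggregator shrinks to match child B: roughly unbiased (about 50%).
	fmt.Printf("aggregate capacity  100: %.0f%% of samples come from A\n",
		100*fractionFromA(bottomK(merged, 100)))
}
```
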

func (s *sampleAggregator) sampleRow(
ctx context.Context, sr *stats.SampleReservoir, sampleRow rowenc.EncDatumRow, rank uint64,
) error {
prevCapacity := sr.Cap()
if err := sr.SampleRow(ctx, s.EvalCtx, sampleRow, rank); err != nil {
if code := pgerror.GetPGCode(err); code != pgcode.OutOfMemory {
return err
@@ -375,6 +400,11 @@ func (s *sampleAggregator) sampleRow(
sr.Disable()
log.Info(ctx, "disabling histogram collection due to excessive memory utilization")
telemetry.Inc(sqltelemetry.StatsHistogramOOMCounter)
} else if sr.Cap() != prevCapacity {
log.Infof(
ctx, "histogram samples reduced from %d to %d due to excessive memory utilization",
prevCapacity, sr.Cap(),
)
}
return nil
}
@@ -404,7 +434,7 @@ func (s *sampleAggregator) writeResults(ctx context.Context) error {
h, err := s.generateHistogram(
ctx,
s.EvalCtx,
s.sr.Get(),
&s.sr,
colIdx,
typ,
si.numRows-si.numNulls,
@@ -434,7 +464,7 @@ func (s *sampleAggregator) writeResults(ctx context.Context) error {
h, err := s.generateHistogram(
ctx,
s.EvalCtx,
invSr.Get(),
invSr,
0, /* colIdx */
types.Bytes,
invSketch.numRows-invSketch.numNulls,
@@ -502,41 +532,23 @@ func (s *sampleAggregator) writeResults(ctx context.Context) error {
func (s *sampleAggregator) generateHistogram(
ctx context.Context,
evalCtx *tree.EvalContext,
samples []stats.SampledRow,
sr *stats.SampleReservoir,
colIdx int,
colType *types.T,
numRows int64,
distinctCount int64,
maxBuckets int,
) (stats.HistogramData, error) {
// Account for the memory we'll use copying the samples into values.
if err := s.tempMemAcc.Grow(ctx, sizeOfDatum*int64(len(samples))); err != nil {
prevCapacity := sr.Cap()
values, err := sr.GetNonNullDatums(ctx, &s.tempMemAcc, colIdx)
if err != nil {
return stats.HistogramData{}, err
}
values := make(tree.Datums, 0, len(samples))

var da rowenc.DatumAlloc
for _, sample := range samples {
ed := &sample.Row[colIdx]
// Ignore NULLs (they are counted separately).
if !ed.IsNull() {
beforeSize := ed.Datum.Size()
if err := ed.EnsureDecoded(colType, &da); err != nil {
return stats.HistogramData{}, err
}
afterSize := ed.Datum.Size()

// Perform memory accounting. This memory is not added to the temporary
// account since it won't be released until the sampleAggregator is
// destroyed.
if afterSize > beforeSize {
if err := s.memAcc.Grow(ctx, int64(afterSize-beforeSize)); err != nil {
return stats.HistogramData{}, err
}
}

values = append(values, ed.Datum)
}
if sr.Cap() != prevCapacity {
log.Infof(
ctx, "histogram samples reduced from %d to %d due to excessive memory utilization",
prevCapacity, sr.Cap(),
)
}
return stats.EquiDepthHistogram(evalCtx, colType, values, numRows, distinctCount, maxBuckets)
}
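
generateHistogram ultimately hands the non-NULL sample datums to stats.EquiDepthHistogram. The sketch below shows the basic equi-depth idea on plain integers: buckets holding roughly equal numbers of samples, with counts scaled up from the sample to the table's row count. It is a simplified sketch under those assumptions, not the actual implementation; bucket and equiDepth are names invented for the example.

```go
// Simplified sketch of equi-depth bucketing over sampled values — not the
// actual stats.EquiDepthHistogram implementation (which also estimates
// per-bucket distinct counts, handles repeated boundary values, etc.).
package main

import (
	"fmt"
	"sort"
)

type bucket struct {
	upperBound int64 // inclusive upper bound of the bucket
	numRows    int64 // estimated number of table rows in the bucket
}

// equiDepth splits the samples into at most maxBuckets buckets holding roughly
// the same number of samples, scaling counts from the sample up to totalRows.
func equiDepth(samples []int64, totalRows int64, maxBuckets int) []bucket {
	if len(samples) == 0 || maxBuckets <= 0 {
		return nil
	}
	sort.Slice(samples, func(i, j int) bool { return samples[i] < samples[j] })
	perBucket := (len(samples) + maxBuckets - 1) / maxBuckets
	scale := float64(totalRows) / float64(len(samples))

	var buckets []bucket
	for start := 0; start < len(samples); start += perBucket {
		end := start + perBucket
		if end > len(samples) {
			end = len(samples)
		}
		buckets = append(buckets, bucket{
			upperBound: samples[end-1],
			numRows:    int64(float64(end-start) * scale),
		})
	}
	return buckets
}

func main() {
	samples := []int64{3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5, 8, 9, 7, 9, 3}
	for _, b := range equiDepth(samples, 1600, 4) {
		fmt.Printf("<= %d: ~%d rows\n", b.upperBound, b.numRows)
	}
}
```
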