From c255cb019adc187155af35ddb896221458e7adf0 Mon Sep 17 00:00:00 2001 From: Rebecca Taft Date: Mon, 26 Nov 2018 15:42:37 -0500 Subject: [PATCH] stats,distsqlrun: reduce memory usage of CREATE STATISTICS Prior to this patch, running CREATE STATISTICS on a large table could cause the memory utilization of the database to grow so large that it crashed the server. This was due to the way garbage collection works in go. As described in this blog post: https://blog.golang.org/go-slices-usage-and-internals, > Re-slicing a slice doesn't make a copy of the underlying array. The > full array will be kept in memory until it is no longer referenced. > Occasionally this can cause the program to hold all the data in memory > when only a small piece of it is needed. This is exactly what was happening with CREATE STATISTICS. The CREATE STATISTICS command performs a full table scan, and randomly samples rows in order to build a histogram for a column. When rows are scanned in the kv layer, they are scanned in batches of ~10,000, creating arrays of encoded datums of length 10,000. Since the EncDatum object passed to the sampler contains a slice referencing that underlying array, the entire scanned batch would remain in memory as long as a single datum from that slice was included in the sample. With random sampling, it is likely that most of the batches have at least one sampled row, meaning almost the entire table will be in memory until the histogram is built. The fix is to copy the EncDatum to the sampler with only the decoded Datum, setting the encoded []bytes slice to nil. Release note (bug fix): Fixed an issue where calling CREATE STATISTICS on a large table could cause the server to crash due to running out of memory. --- pkg/sql/distsqlrun/sample_aggregator.go | 6 ++-- pkg/sql/distsqlrun/sampler.go | 8 +++--- pkg/sql/stats/row_sampling.go | 38 +++++++++++++++++++++---- pkg/sql/stats/row_sampling_test.go | 6 ++-- 4 files changed, 44 insertions(+), 14 deletions(-) diff --git a/pkg/sql/distsqlrun/sample_aggregator.go b/pkg/sql/distsqlrun/sample_aggregator.go index 093f3c9e376b..0ca6c1eb24a6 100644 --- a/pkg/sql/distsqlrun/sample_aggregator.go +++ b/pkg/sql/distsqlrun/sample_aggregator.go @@ -99,7 +99,7 @@ func newSampleAggregator( } } - s.sr.Init(int(spec.SampleSize)) + s.sr.Init(int(spec.SampleSize), input.OutputTypes()[:rankCol]) if err := s.Init( nil, post, []sqlbase.ColumnType{}, flowCtx, processorID, output, nil, /* memMonitor */ @@ -161,7 +161,9 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, _ erro return false, errors.Wrapf(err, "decoding rank column") } // Retain the rows with the top ranks. - s.sr.SampleRow(row[:s.rankCol], uint64(rank)) + if err := s.sr.SampleRow(row[:s.rankCol], uint64(rank)); err != nil { + return false, err + } continue } // This is a sketch row. diff --git a/pkg/sql/distsqlrun/sampler.go b/pkg/sql/distsqlrun/sampler.go index 0f218502fed3..e00b94011bc2 100644 --- a/pkg/sql/distsqlrun/sampler.go +++ b/pkg/sql/distsqlrun/sampler.go @@ -96,7 +96,7 @@ func newSamplerProcessor( } } - s.sr.Init(int(spec.SampleSize)) + s.sr.Init(int(spec.SampleSize), input.OutputTypes()) inTypes := input.OutputTypes() outTypes := make([]sqlbase.ColumnType, 0, len(inTypes)+5) @@ -163,7 +163,6 @@ func (s *samplerProcessor) Run(ctx context.Context, wg *sync.WaitGroup) { func (s *samplerProcessor) mainLoop(ctx context.Context) (earlyExit bool, _ error) { rng, _ := randutil.NewPseudoRand() var da sqlbase.DatumAlloc - var ra sqlbase.EncDatumRowAlloc var buf []byte for { row, meta := s.input.Next() @@ -200,8 +199,9 @@ func (s *samplerProcessor) mainLoop(ctx context.Context) (earlyExit bool, _ erro // Use Int63 so we don't have headaches converting to DInt. rank := uint64(rng.Int63()) - row = ra.CopyRow(row) - s.sr.SampleRow(row, rank) + if err := s.sr.SampleRow(row, rank); err != nil { + return false, err + } } outRow := make(sqlbase.EncDatumRow, len(s.outTypes)) diff --git a/pkg/sql/stats/row_sampling.go b/pkg/sql/stats/row_sampling.go index 245c7772fd13..f6e4b5df6809 100644 --- a/pkg/sql/stats/row_sampling.go +++ b/pkg/sql/stats/row_sampling.go @@ -42,14 +42,18 @@ type SampledRow struct { // requirement is that the capacity of each distributed reservoir must have been // at least as large as this reservoir. type SampleReservoir struct { - samples []SampledRow + samples []SampledRow + colTypes []sqlbase.ColumnType + da sqlbase.DatumAlloc + ra sqlbase.EncDatumRowAlloc } var _ heap.Interface = &SampleReservoir{} // Init initializes a SampleReservoir. -func (sr *SampleReservoir) Init(numSamples int) { +func (sr *SampleReservoir) Init(numSamples int, colTypes []sqlbase.ColumnType) { sr.samples = make([]SampledRow, 0, numSamples) + sr.colTypes = colTypes } // Len is part of heap.Interface. @@ -75,24 +79,46 @@ func (sr *SampleReservoir) Push(x interface{}) { panic("unimplemented") } func (sr *SampleReservoir) Pop() interface{} { panic("unimplemented") } // SampleRow looks at a row and either drops it or adds it to the reservoir. -func (sr *SampleReservoir) SampleRow(row sqlbase.EncDatumRow, rank uint64) { +func (sr *SampleReservoir) SampleRow(row sqlbase.EncDatumRow, rank uint64) error { if len(sr.samples) < cap(sr.samples) { // We haven't accumulated enough rows yet, just append. - sr.samples = append(sr.samples, SampledRow{Row: row, Rank: rank}) + rowCopy := sr.ra.AllocRow(len(row)) + if err := sr.copyRow(rowCopy, row); err != nil { + return err + } + sr.samples = append(sr.samples, SampledRow{Row: rowCopy, Rank: rank}) if len(sr.samples) == cap(sr.samples) { // We just reached the limit; initialize the heap. heap.Init(sr) } - return + return nil } // Replace the max rank if ours is smaller. if rank < sr.samples[0].Rank { - sr.samples[0] = SampledRow{Row: row, Rank: rank} + if err := sr.copyRow(sr.samples[0].Row, row); err != nil { + return err + } + sr.samples[0].Rank = rank heap.Fix(sr, 0) } + return nil } // Get returns the sampled rows. func (sr *SampleReservoir) Get() []SampledRow { return sr.samples } + +func (sr *SampleReservoir) copyRow(dst, src sqlbase.EncDatumRow) error { + for i := range src { + // Copy only the decoded datum to ensure that we remove any reference to + // the encoded bytes. The encoded bytes would have been scanned in a batch + // of ~10000 rows, so we must delete the reference to allow the garbage + // collector to release the memory from the batch. + if err := src[i].EnsureDecoded(&sr.colTypes[i], &sr.da); err != nil { + return err + } + dst[i] = sqlbase.DatumToEncDatum(sr.colTypes[i], src[i].Datum) + } + return nil +} diff --git a/pkg/sql/stats/row_sampling_test.go b/pkg/sql/stats/row_sampling_test.go index 9e2ff8b21966..cb0bea733db5 100644 --- a/pkg/sql/stats/row_sampling_test.go +++ b/pkg/sql/stats/row_sampling_test.go @@ -30,10 +30,12 @@ import ( func runSampleTest(t *testing.T, numSamples int, ranks []int) { typeInt := sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_INT} var sr SampleReservoir - sr.Init(numSamples) + sr.Init(numSamples, []sqlbase.ColumnType{typeInt}) for _, r := range ranks { d := sqlbase.DatumToEncDatum(typeInt, tree.NewDInt(tree.DInt(r))) - sr.SampleRow(sqlbase.EncDatumRow{d}, uint64(r)) + if err := sr.SampleRow(sqlbase.EncDatumRow{d}, uint64(r)); err != nil { + t.Errorf("%v", err) + } } samples := sr.Get() sampledRanks := make([]int, len(samples))