Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stats,distsqlrun: reduce memory usage of CREATE STATISTICS #32614

Merged
merged 1 commit into from Nov 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/sql/distsqlrun/sample_aggregator.go
Expand Up @@ -99,7 +99,7 @@ func newSampleAggregator(
}
}

s.sr.Init(int(spec.SampleSize))
s.sr.Init(int(spec.SampleSize), input.OutputTypes()[:rankCol])

if err := s.Init(
nil, post, []sqlbase.ColumnType{}, flowCtx, processorID, output, nil, /* memMonitor */
Expand Down Expand Up @@ -161,7 +161,9 @@ func (s *sampleAggregator) mainLoop(ctx context.Context) (earlyExit bool, _ erro
return false, errors.Wrapf(err, "decoding rank column")
}
// Retain the rows with the top ranks.
s.sr.SampleRow(row[:s.rankCol], uint64(rank))
if err := s.sr.SampleRow(row[:s.rankCol], uint64(rank)); err != nil {
return false, err
}
continue
}
// This is a sketch row.
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/distsqlrun/sampler.go
Expand Up @@ -96,7 +96,7 @@ func newSamplerProcessor(
}
}

s.sr.Init(int(spec.SampleSize))
s.sr.Init(int(spec.SampleSize), input.OutputTypes())

inTypes := input.OutputTypes()
outTypes := make([]sqlbase.ColumnType, 0, len(inTypes)+5)
Expand Down Expand Up @@ -163,7 +163,6 @@ func (s *samplerProcessor) Run(ctx context.Context, wg *sync.WaitGroup) {
func (s *samplerProcessor) mainLoop(ctx context.Context) (earlyExit bool, _ error) {
rng, _ := randutil.NewPseudoRand()
var da sqlbase.DatumAlloc
var ra sqlbase.EncDatumRowAlloc
var buf []byte
for {
row, meta := s.input.Next()
Expand Down Expand Up @@ -200,8 +199,9 @@ func (s *samplerProcessor) mainLoop(ctx context.Context) (earlyExit bool, _ erro

// Use Int63 so we don't have headaches converting to DInt.
rank := uint64(rng.Int63())
row = ra.CopyRow(row)
s.sr.SampleRow(row, rank)
if err := s.sr.SampleRow(row, rank); err != nil {
return false, err
}
}

outRow := make(sqlbase.EncDatumRow, len(s.outTypes))
Expand Down
38 changes: 32 additions & 6 deletions pkg/sql/stats/row_sampling.go
Expand Up @@ -42,14 +42,18 @@ type SampledRow struct {
// requirement is that the capacity of each distributed reservoir must have been
// at least as large as this reservoir.
// SampleReservoir holds a fixed-capacity sample of rows. Once full, the
// samples are kept in a heap ordered by rank so that samples[0] is the row
// with the largest rank (the first candidate for eviction).
type SampleReservoir struct {
	samples  []SampledRow
	colTypes []sqlbase.ColumnType

	// Allocators reused when copying sampled rows, to amortize small
	// allocations across samples.
	da sqlbase.DatumAlloc
	ra sqlbase.EncDatumRowAlloc
}

var _ heap.Interface = &SampleReservoir{}

// Init initializes a SampleReservoir.
func (sr *SampleReservoir) Init(numSamples int) {
func (sr *SampleReservoir) Init(numSamples int, colTypes []sqlbase.ColumnType) {
sr.samples = make([]SampledRow, 0, numSamples)
sr.colTypes = colTypes
}

// Len is part of heap.Interface.
Expand All @@ -75,24 +79,46 @@ func (sr *SampleReservoir) Push(x interface{}) { panic("unimplemented") }
// Pop is part of heap.Interface. The reservoir only uses heap.Init and
// heap.Fix, so Pop should never be called.
func (sr *SampleReservoir) Pop() interface{} {
	panic("unimplemented")
}

// SampleRow looks at a row and either drops it or adds it to the reservoir.
func (sr *SampleReservoir) SampleRow(row sqlbase.EncDatumRow, rank uint64) {
func (sr *SampleReservoir) SampleRow(row sqlbase.EncDatumRow, rank uint64) error {
if len(sr.samples) < cap(sr.samples) {
// We haven't accumulated enough rows yet, just append.
sr.samples = append(sr.samples, SampledRow{Row: row, Rank: rank})
rowCopy := sr.ra.AllocRow(len(row))
if err := sr.copyRow(rowCopy, row); err != nil {
return err
}
sr.samples = append(sr.samples, SampledRow{Row: rowCopy, Rank: rank})
if len(sr.samples) == cap(sr.samples) {
// We just reached the limit; initialize the heap.
heap.Init(sr)
}
return
return nil
}
// Replace the max rank if ours is smaller.
if rank < sr.samples[0].Rank {
sr.samples[0] = SampledRow{Row: row, Rank: rank}
if err := sr.copyRow(sr.samples[0].Row, row); err != nil {
return err
}
sr.samples[0].Rank = rank
heap.Fix(sr, 0)
}
return nil
}

// Get returns the sampled rows.
//
// The returned slice aliases the reservoir's internal storage; rows may be
// overwritten in place by subsequent SampleRow calls, so callers should not
// retain it across further sampling.
func (sr *SampleReservoir) Get() []SampledRow {
return sr.samples
}

// copyRow copies the datums of src into dst, forcing each EncDatum to be
// decoded and dropping its encoded representation. dst must have at least
// len(src) entries.
//
// Discarding the encoded bytes matters: they point into a batch of roughly
// 10000 scanned rows, and retaining even one reference would prevent the
// garbage collector from releasing the memory for the whole batch.
func (sr *SampleReservoir) copyRow(dst, src sqlbase.EncDatumRow) error {
	for i := 0; i < len(src); i++ {
		if err := src[i].EnsureDecoded(&sr.colTypes[i], &sr.da); err != nil {
			return err
		}
		dst[i] = sqlbase.DatumToEncDatum(sr.colTypes[i], src[i].Datum)
	}
	return nil
}
6 changes: 4 additions & 2 deletions pkg/sql/stats/row_sampling_test.go
Expand Up @@ -30,10 +30,12 @@ import (
func runSampleTest(t *testing.T, numSamples int, ranks []int) {
typeInt := sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_INT}
var sr SampleReservoir
sr.Init(numSamples)
sr.Init(numSamples, []sqlbase.ColumnType{typeInt})
for _, r := range ranks {
d := sqlbase.DatumToEncDatum(typeInt, tree.NewDInt(tree.DInt(r)))
sr.SampleRow(sqlbase.EncDatumRow{d}, uint64(r))
if err := sr.SampleRow(sqlbase.EncDatumRow{d}, uint64(r)); err != nil {
t.Errorf("%v", err)
}
}
samples := sr.Get()
sampledRanks := make([]int, len(samples))
Expand Down