From 7238664b574e6f8d5e15f9d8023512b9e540b99d Mon Sep 17 00:00:00 2001 From: Jordan Lewis Date: Wed, 4 Apr 2018 09:06:15 -0400 Subject: [PATCH 1/2] distsqlrun: don't allocate between fused processors Previously, `ProcOutputHelper.ProcessRow` (and, by extension, all `RowSource.Next` implementations) always allocated a fresh `EncDatumRow`. This was wasteful - not every processor needs to be able to hold a reference to the output of `RowSource.Next`. Now, `ProcessRow` never allocates a fresh `EncDatumRow`, and the contract of `RowSource.Next` has been changed to say that it's not valid to hang on to a row returned by `Next` past the next call to `Next`. Processors that need to hold on to a row from their upstreams have been modified to make an explicit copy to achieve this safely. Finally, a new `copyingRowReceiver` is introduced that makes a copy of every row that is `Push`'d to it. A `copyingRowReceiver` is inserted before every router, since routers all expect that their inputs will be immutable. This preserves the safety of sending outputs of `RowSource.Next`, which aren't safe to hold on to, to routers, which expect rows that *are* safe to hold on to. Release note: None --- pkg/sql/distsqlrun/aggregator.go | 9 +++--- pkg/sql/distsqlrun/base.go | 21 +++++++++++-- pkg/sql/distsqlrun/distinct.go | 31 +++++++++++-------- pkg/sql/distsqlrun/distinct_test.go | 2 +- pkg/sql/distsqlrun/flow.go | 14 ++++++++- pkg/sql/distsqlrun/hashjoiner.go | 3 ++ pkg/sql/distsqlrun/input_sync.go | 6 +++- .../distsqlrun/interleaved_reader_joiner.go | 4 +-- .../interleaved_reader_joiner_test.go | 2 +- pkg/sql/distsqlrun/joinreader.go | 2 +- pkg/sql/distsqlrun/processors.go | 23 ++++++++------ pkg/sql/distsqlrun/sampler.go | 2 ++ pkg/sql/distsqlrun/sorter.go | 29 ++++++++++------- .../distsqlrun/stream_group_accumulator.go | 4 +++ pkg/sql/distsqlrun/tablereader.go | 1 - pkg/sql/distsqlrun/tablereader_test.go | 2 +- pkg/sql/sqlbase/encoded_datum.go | 11 +++++++ 17 files changed, 115 insertions(+), 51 deletions(-) diff --git a/pkg/sql/distsqlrun/aggregator.go b/pkg/sql/distsqlrun/aggregator.go index 8dc76f619c91..d889f50751d3 100644 --- a/pkg/sql/distsqlrun/aggregator.go +++ b/pkg/sql/distsqlrun/aggregator.go @@ -112,6 +112,7 @@ type aggregatorBase struct { funcs []*aggregateFuncHolder outputTypes []sqlbase.ColumnType datumAlloc sqlbase.DatumAlloc + rowAlloc sqlbase.EncDatumRowAlloc bucketsAcc mon.BoundAccount @@ -408,7 +409,7 @@ func (ag *hashAggregator) accumulateRows() (aggregatorState, sqlbase.EncDatumRow } if ag.lastOrdGroupCols == nil { - ag.lastOrdGroupCols = row + ag.lastOrdGroupCols = ag.rowAlloc.CopyRow(row) } else { matched, err := ag.matchLastOrdGroupCols(row) if err != nil { @@ -416,7 +417,7 @@ func (ag *hashAggregator) accumulateRows() (aggregatorState, sqlbase.EncDatumRow return aggStateUnknown, nil, nil } if !matched { - ag.lastOrdGroupCols = row + ag.lastOrdGroupCols = ag.rowAlloc.CopyRow(row) break } } @@ -467,7 +468,7 @@ func (ag *orderedAggregator) accumulateRows() (aggregatorState, sqlbase.EncDatum } if ag.lastOrdGroupCols == nil { - ag.lastOrdGroupCols = row + ag.lastOrdGroupCols = ag.rowAlloc.CopyRow(row) } else { matched, err := ag.matchLastOrdGroupCols(row) if err != nil { @@ -475,7 +476,7 @@ func (ag *orderedAggregator) accumulateRows() (aggregatorState, sqlbase.EncDatum return aggStateUnknown, nil, nil } if !matched { - ag.lastOrdGroupCols = row + ag.lastOrdGroupCols = ag.rowAlloc.CopyRow(row) break } } diff --git a/pkg/sql/distsqlrun/base.go b/pkg/sql/distsqlrun/base.go index 
b93846890e58..1525e6cea564 100644 --- a/pkg/sql/distsqlrun/base.go +++ b/pkg/sql/distsqlrun/base.go @@ -64,7 +64,9 @@ type RowReceiver interface { // ProducerDone() needs to be called (after draining is done, if draining was // requested). // - // The sender must not modify the row after calling this function. + // Unless specifically permitted by the underlying implementation (see + // copyingRowReceiver, for example), the sender must not modify the row + // after calling this function. // // After DrainRequested is returned, it is expected that all future calls only // carry metadata (however that is not enforced and implementations should be @@ -113,6 +115,9 @@ type RowSource interface { // been exhausted - no more records are coming and any further method calls // will be no-ops. // + // EncDatumRows returned by Next() are only valid until the next call to + // Next(), although the EncDatums inside them stay valid forever. + // // A ProducerMetadata record may contain an error. In that case, this // interface is oblivious about the semantics: implementers may continue // returning different rows on future calls, or may return an empty record @@ -207,7 +212,7 @@ func DrainAndForwardMetadata(ctx context.Context, src RowSource, dst RowReceiver ) } - switch dst.Push(row, meta) { + switch dst.Push(nil /* row */, meta) { case ConsumerClosed: src.ConsumerClosed() return @@ -692,6 +697,18 @@ func (rb *RowBuffer) ConsumerClosed() { } } +type copyingRowReceiver struct { + RowReceiver + alloc sqlbase.EncDatumRowAlloc +} + +func (r *copyingRowReceiver) Push(row sqlbase.EncDatumRow, meta *ProducerMetadata) ConsumerStatus { + if row != nil { + row = r.alloc.CopyRow(row) + } + return r.RowReceiver.Push(row, meta) +} + // String implements fmt.Stringer. func (e *Error) String() string { if err := e.ErrorDetail(); err != nil { diff --git a/pkg/sql/distsqlrun/distinct.go b/pkg/sql/distsqlrun/distinct.go index 9a95ff4bf92a..d126300ae9a9 100644 --- a/pkg/sql/distsqlrun/distinct.go +++ b/pkg/sql/distsqlrun/distinct.go @@ -28,16 +28,17 @@ import ( type distinct struct { processorBase - input RowSource - types []sqlbase.ColumnType - lastGroupKey sqlbase.EncDatumRow - arena stringarena.Arena - seen map[string]struct{} - orderedCols []uint32 - distinctCols util.FastIntSet - memAcc mon.BoundAccount - datumAlloc sqlbase.DatumAlloc - scratch []byte + input RowSource + types []sqlbase.ColumnType + haveLastGroupKey bool + lastGroupKey sqlbase.EncDatumRow + arena stringarena.Arena + seen map[string]struct{} + orderedCols []uint32 + distinctCols util.FastIntSet + memAcc mon.BoundAccount + datumAlloc sqlbase.DatumAlloc + scratch []byte } // sortedDistinct is a specialized distinct that can be used when all of the @@ -100,6 +101,8 @@ func newDistinct( }); err != nil { return nil, err } + d.lastGroupKey = d.out.rowAlloc.AllocRow(len(d.types)) + d.haveLastGroupKey = false if allSorted { // We can use the faster sortedDistinct processor. @@ -148,7 +151,7 @@ func (d *sortedDistinct) Run(ctx context.Context, wg *sync.WaitGroup) { } func (d *distinct) matchLastGroupKey(row sqlbase.EncDatumRow) (bool, error) { - if d.lastGroupKey == nil { + if !d.haveLastGroupKey { return false, nil } for _, colIdx := range d.orderedCols { @@ -231,7 +234,8 @@ func (d *distinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) { // distinct keys in the 'seen' set will never be seen again. This allows // us to keep the current arena block and overwrite strings previously // allocated on it, which implies that UnsafeReset() is safe to call here.
- d.lastGroupKey = row + copy(d.lastGroupKey, row) + d.haveLastGroupKey = true if err := d.arena.UnsafeReset(d.ctx); err != nil { d.moveToDraining(err) break } @@ -281,7 +285,8 @@ func (d *sortedDistinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) { continue } - d.lastGroupKey = row + d.haveLastGroupKey = true + copy(d.lastGroupKey, row) if outRow := d.processRowHelper(row); outRow != nil { return outRow, nil diff --git a/pkg/sql/distsqlrun/distinct_test.go b/pkg/sql/distsqlrun/distinct_test.go index 7180d97c801f..5aef99c6669c 100644 --- a/pkg/sql/distsqlrun/distinct_test.go +++ b/pkg/sql/distsqlrun/distinct_test.go @@ -134,7 +134,7 @@ func TestDistinct(t *testing.T) { } var res sqlbase.EncDatumRows for { - row := out.NextNoMeta(t) + row := out.NextNoMeta(t).Copy() if row == nil { break } diff --git a/pkg/sql/distsqlrun/flow.go b/pkg/sql/distsqlrun/flow.go index 7f7c7b87d4b3..949ee7a36b2c 100644 --- a/pkg/sql/distsqlrun/flow.go +++ b/pkg/sql/distsqlrun/flow.go @@ -307,6 +307,17 @@ func (f *Flow) makeProcessor( f.startables = append(f.startables, r) } + // No output router or channel is safe to push rows to, unless the row won't + // be modified later by the thing that created it. No processor creates safe + // rows, either. So, we always wrap our outputs in copyingRowReceivers. These + // outputs aren't used at all when the consumer is a processor that gets + // fused to its upstream, though, which means that copyingRowReceivers are + // only used on non-fused processors like the output routers. + for i := range outputs { + outputs[i] = &copyingRowReceiver{RowReceiver: outputs[i]} + } + proc, err := newProcessor(ctx, &f.FlowCtx, ps.ProcessorID, &ps.Core, &ps.Post, inputs, outputs) if err != nil { return nil, err } @@ -315,7 +326,8 @@ // Initialize any routers (the setupRouter case above) and outboxes. types := proc.OutputTypes() for _, o := range outputs { - switch o := o.(type) { + copier := o.(*copyingRowReceiver) + switch o := copier.RowReceiver.(type) { case router: o.init(&f.FlowCtx, types) case *outbox: diff --git a/pkg/sql/distsqlrun/hashjoiner.go b/pkg/sql/distsqlrun/hashjoiner.go index 3d4729386879..8bfe9dce0778 100644 --- a/pkg/sql/distsqlrun/hashjoiner.go +++ b/pkg/sql/distsqlrun/hashjoiner.go @@ -215,6 +215,7 @@ func (h *hashJoiner) Run(ctx context.Context, wg *sync.WaitGroup) { bufferPhaseOom := false if pgErr, ok := pgerror.GetPGCause(err); ok && pgErr.Code == pgerror.CodeOutOfMemoryError { + log.VEvent(ctx, 1, "buffer phase ran out of memory") bufferPhaseOom = true } @@ -359,6 +360,7 @@ func (h *hashJoiner) bufferPhase( // choose the right stream and consume it. h.storedSide = rightSide + log.VEvent(ctx, 1, "buffer phase found no short stream") for { if err := h.cancelChecker.Check(); err != nil { return nil, false, err } @@ -702,6 +704,7 @@ func (h *hashJoiner) receiveRow( } if !hasNull { // Normal path. + row = h.out.rowAlloc.CopyRow(row) return row, false, nil } diff --git a/pkg/sql/distsqlrun/input_sync.go b/pkg/sql/distsqlrun/input_sync.go index 94cfe03a3d68..cb7bf2f96e3e 100644 --- a/pkg/sql/distsqlrun/input_sync.go +++ b/pkg/sql/distsqlrun/input_sync.go @@ -84,7 +84,8 @@ type orderedSynchronizer struct { // err can be set by the Less function (used by the heap implementation) err error - alloc sqlbase.DatumAlloc + alloc sqlbase.DatumAlloc + rowAlloc sqlbase.EncDatumRowAlloc // metadata is accumulated from all the sources and is passed on as soon as // possible.
@@ -197,6 +198,9 @@ func (s *orderedSynchronizer) consumeMetadata(src *srcInfo, mode consumeMetadata continue } if mode == stopOnRowOrError { + if row != nil { + row = s.rowAlloc.CopyRow(row) + } src.row = row return nil } diff --git a/pkg/sql/distsqlrun/interleaved_reader_joiner.go b/pkg/sql/distsqlrun/interleaved_reader_joiner.go index 496e49a78a81..988ec006eb67 100644 --- a/pkg/sql/distsqlrun/interleaved_reader_joiner.go +++ b/pkg/sql/distsqlrun/interleaved_reader_joiner.go @@ -314,9 +314,7 @@ func (irj *interleavedReaderJoiner) Run(ctx context.Context, wg *sync.WaitGroup) // A new ancestor row is fetched. We re-assign our reference // to the most recent ancestor row. - // This is safe because tableRow is a newly alloc'd - // row. - irj.ancestorRow = tableRow + irj.ancestorRow = tInfo.post.rowAlloc.CopyRow(tableRow) irj.ancestorJoined = false continue } diff --git a/pkg/sql/distsqlrun/interleaved_reader_joiner_test.go b/pkg/sql/distsqlrun/interleaved_reader_joiner_test.go index 53b3fd484c9e..b5e4fd487091 100644 --- a/pkg/sql/distsqlrun/interleaved_reader_joiner_test.go +++ b/pkg/sql/distsqlrun/interleaved_reader_joiner_test.go @@ -423,7 +423,7 @@ func TestInterleavedReaderJoiner(t *testing.T) { if row == nil { break } - res = append(res, row) + res = append(res, row.Copy()) } if result := res.String(irj.OutputTypes()); result != tc.expected { diff --git a/pkg/sql/distsqlrun/joinreader.go b/pkg/sql/distsqlrun/joinreader.go index dfb55b5fe4a0..d156ed2a55b8 100644 --- a/pkg/sql/distsqlrun/joinreader.go +++ b/pkg/sql/distsqlrun/joinreader.go @@ -339,7 +339,7 @@ func (jr *joinReader) mainLoop(ctx context.Context) error { EndKey: key.PrefixEnd(), } if jr.isLookupJoin() { - rows = append(rows, row) + rows = append(rows, jr.out.rowAlloc.CopyRow(row)) if spanToRowIndices[key.String()] == nil { spans = append(spans, span) } diff --git a/pkg/sql/distsqlrun/processors.go b/pkg/sql/distsqlrun/processors.go index 78fb26049b6b..a5414c29ae5d 100644 --- a/pkg/sql/distsqlrun/processors.go +++ b/pkg/sql/distsqlrun/processors.go @@ -62,12 +62,14 @@ type ProcOutputHelper struct { // outputCols can be set. outputCols []uint32 + outputRow sqlbase.EncDatumRow + // outputTypes is the schema of the rows produced by the processor after // post-processing (i.e. the rows that are pushed through a router). // // If renderExprs is set, these types correspond to the types of those // expressions. - // If outpuCols is set, these types correspond to the types of + // If outputCols is set, these types correspond to the types of // those columns. // If neither is set, this is the internal schema of the processor. outputTypes []sqlbase.ColumnType @@ -134,6 +136,10 @@ func (h *ProcOutputHelper) Init( } else { h.outputTypes = types } + if h.outputCols != nil || h.renderExprs != nil { + // We're rendering or projecting, so allocate an output row. + h.outputRow = h.rowAlloc.AllocRow(len(h.outputTypes)) + } h.offset = post.Offset if post.Limit == 0 || post.Limit >= math.MaxUint64-h.offset { @@ -300,7 +306,8 @@ func (h *ProcOutputHelper) EmitRow( } // ProcessRow sends the invoked row through the post-processing stage and returns -// the post-processed row. +// the post-processed row. Results from ProcessRow aren't safe past the next call +// to ProcessRow. // // The moreRowsOK retval is true if more rows can be processed, false if the // limit has been reached (if there's a limit). 
Upon seeing a false value, the @@ -334,31 +341,27 @@ func (h *ProcOutputHelper) ProcessRow( return nil, true, nil } - var outRow sqlbase.EncDatumRow if h.renderExprs != nil { // Rendering. - outRow = h.rowAlloc.AllocRow(len(h.renderExprs)) for i := range h.renderExprs { datum, err := h.renderExprs[i].eval(row) if err != nil { return nil, false, err } - outRow[i] = sqlbase.DatumToEncDatum(h.outputTypes[i], datum) + h.outputRow[i] = sqlbase.DatumToEncDatum(h.outputTypes[i], datum) } } else if h.outputCols != nil { // Projection. - outRow = h.rowAlloc.AllocRow(len(h.outputCols)) for i, col := range h.outputCols { - outRow[i] = row[col] + h.outputRow[i] = row[col] } } else { // No rendering or projection. - outRow = h.rowAlloc.AllocRow(len(row)) - copy(outRow, row) + return row, h.rowIdx < h.maxRowIdx, nil } // If this row satisfies the limit, the caller is told to drain. - return outRow, h.rowIdx < h.maxRowIdx, nil + return h.outputRow, h.rowIdx < h.maxRowIdx, nil } // Close signals to the output that there will be no more rows. diff --git a/pkg/sql/distsqlrun/sampler.go b/pkg/sql/distsqlrun/sampler.go index 72d061a41517..b176382d1e5b 100644 --- a/pkg/sql/distsqlrun/sampler.go +++ b/pkg/sql/distsqlrun/sampler.go @@ -163,6 +163,7 @@ func (s *samplerProcessor) Run(ctx context.Context, wg *sync.WaitGroup) { func (s *samplerProcessor) mainLoop(ctx context.Context) (earlyExit bool, _ error) { rng, _ := randutil.NewPseudoRand() var da sqlbase.DatumAlloc + var ra sqlbase.EncDatumRowAlloc var buf []byte for { row, meta := s.input.Next() @@ -199,6 +200,7 @@ func (s *samplerProcessor) mainLoop(ctx context.Context) (earlyExit bool, _ erro // Use Int63 so we don't have headaches converting to DInt. rank := uint64(rng.Int63()) + row = ra.CopyRow(row) s.sr.SampleRow(row, rank) } diff --git a/pkg/sql/distsqlrun/sorter.go b/pkg/sql/distsqlrun/sorter.go index c247f4a057d2..f0cc06038a40 100644 --- a/pkg/sql/distsqlrun/sorter.go +++ b/pkg/sql/distsqlrun/sorter.go @@ -519,10 +519,10 @@ type sortChunksProcessor struct { rows memRowContainer rowContainerMon *mon.BytesMonitor alloc sqlbase.DatumAlloc + rowAlloc sqlbase.EncDatumRowAlloc // sortChunksProcessor accumulates rows that are equal on a prefix, until it // encounters a row that is greater. It stores that greater row in nextChunkRow - prefix sqlbase.EncDatumRow nextChunkRow sqlbase.EncDatumRow } @@ -563,10 +563,12 @@ func newSortChunksProcessor( // chunkCompleted is a helper function that determines if the given row shares the same // values for the first matchLen ordering columns with the given prefix. 
-func (s *sortChunksProcessor) chunkCompleted() (bool, error) { +func (s *sortChunksProcessor) chunkCompleted( + nextChunkRow, prefix sqlbase.EncDatumRow, +) (bool, error) { for _, ord := range s.ordering[:s.matchLen] { col := ord.ColIdx - cmp, err := s.nextChunkRow[col].Compare(&s.rows.types[col], &s.alloc, s.rows.evalCtx, &s.prefix[col]) + cmp, err := nextChunkRow[col].Compare(&s.rows.types[col], &s.alloc, s.rows.evalCtx, &prefix[col]) if cmp != 0 || err != nil { return true, err } @@ -585,23 +587,25 @@ func (s *sortChunksProcessor) fill() (bool, error) { var meta *ProducerMetadata - for s.nextChunkRow == nil { - s.nextChunkRow, meta = s.input.Next() + nextChunkRow := s.nextChunkRow + s.nextChunkRow = nil + for nextChunkRow == nil { + nextChunkRow, meta = s.input.Next() if meta != nil { s.trailingMeta = append(s.trailingMeta, *meta) if meta.Err != nil { return false, nil } continue - } else if s.nextChunkRow == nil { + } else if nextChunkRow == nil { return false, nil } break } - s.prefix = s.nextChunkRow + prefix := nextChunkRow // Add the chunk - if err := s.rows.AddRow(ctx, s.nextChunkRow); err != nil { + if err := s.rows.AddRow(ctx, nextChunkRow); err != nil { return false, err } @@ -610,26 +614,27 @@ func (s *sortChunksProcessor) fill() (bool, error) { // We will accumulate rows to form a chunk such that they all share the same values // as prefix for the first s.matchLen ordering columns. for { - s.nextChunkRow, meta = s.input.Next() + nextChunkRow, meta = s.input.Next() if meta != nil { s.trailingMeta = append(s.trailingMeta, *meta) continue } - if s.nextChunkRow == nil { + if nextChunkRow == nil { break } - chunkCompleted, err := s.chunkCompleted() + chunkCompleted, err := s.chunkCompleted(nextChunkRow, prefix) if err != nil { return false, err } if chunkCompleted { + s.nextChunkRow = s.rowAlloc.CopyRow(nextChunkRow) break } - if err := s.rows.AddRow(ctx, s.nextChunkRow); err != nil { + if err := s.rows.AddRow(ctx, nextChunkRow); err != nil { return false, err } } diff --git a/pkg/sql/distsqlrun/stream_group_accumulator.go b/pkg/sql/distsqlrun/stream_group_accumulator.go index cb44c674dad9..9de6a863fe30 100644 --- a/pkg/sql/distsqlrun/stream_group_accumulator.go +++ b/pkg/sql/distsqlrun/stream_group_accumulator.go @@ -40,6 +40,8 @@ type streamGroupAccumulator struct { // accumulator after the current group is returned, so the accumulator can // resume later. 
leftoverRow sqlbase.EncDatumRow + + rowAlloc sqlbase.EncDatumRowAlloc } func makeStreamGroupAccumulator( @@ -82,6 +84,8 @@ func (s *streamGroupAccumulator) nextGroup( return s.curGroup, nil } + row = s.rowAlloc.CopyRow(row) + if len(s.curGroup) == 0 { if s.curGroup == nil { s.curGroup = make([]sqlbase.EncDatumRow, 0, 64) diff --git a/pkg/sql/distsqlrun/tablereader.go b/pkg/sql/distsqlrun/tablereader.go index f1824e57876c..835c9676b90b 100644 --- a/pkg/sql/distsqlrun/tablereader.go +++ b/pkg/sql/distsqlrun/tablereader.go @@ -136,7 +136,6 @@ func (w *rowFetcherWrapper) Next() (sqlbase.EncDatumRow, *ProducerMetadata) { } return row, nil } - func (w rowFetcherWrapper) OutputTypes() []sqlbase.ColumnType { return nil } func (w rowFetcherWrapper) ConsumerDone() {} func (w rowFetcherWrapper) ConsumerClosed() {} diff --git a/pkg/sql/distsqlrun/tablereader_test.go b/pkg/sql/distsqlrun/tablereader_test.go index 83532b487816..2489081d8dda 100644 --- a/pkg/sql/distsqlrun/tablereader_test.go +++ b/pkg/sql/distsqlrun/tablereader_test.go @@ -167,7 +167,7 @@ func TestTableReader(t *testing.T) { if row == nil { break } - res = append(res, row) + res = append(res, row.Copy()) } if result := res.String(tr.OutputTypes()); result != c.expected { t.Errorf("invalid results: %s, expected %s'", result, c.expected) diff --git a/pkg/sql/sqlbase/encoded_datum.go b/pkg/sql/sqlbase/encoded_datum.go index a7c62138dd7b..f2ca6b2e56c0 100644 --- a/pkg/sql/sqlbase/encoded_datum.go +++ b/pkg/sql/sqlbase/encoded_datum.go @@ -334,6 +334,17 @@ func (r EncDatumRow) stringToBuf(types []ColumnType, a *DatumAlloc, b *bytes.Buf b.WriteString("]") } +// Copy makes a copy of this EncDatumRow. Convenient for tests. Use an +// EncDatumRowAlloc in non-test code. +func (r EncDatumRow) Copy() EncDatumRow { + if r == nil { + return nil + } + rCopy := make(EncDatumRow, len(r)) + copy(rCopy, r) + return rCopy +} + func (r EncDatumRow) String(types []ColumnType) string { var b bytes.Buffer r.stringToBuf(types, &DatumAlloc{}, &b) From e2f099f69162983d5dd031ebb4edd2214b523059 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Mon, 28 May 2018 10:19:48 -0400 Subject: [PATCH 2/2] libroach: fix excessive compactions performed by DBCompactRange Fix excessive compactions from `DBCompactRange` due to mishandling of the first and last ranges to compact. When a non-empty start or end key is specified, DBCompactRange was previously calling `rocksdb::DB::CompactRange` with a `null` start/end key, resulting in compacting from the beginning (or to the end) of the entire key space. See #24029 --- c-deps/libroach/db.cc | 69 ++++++++++++++++++++++++++++---------- c-deps/libroach/db.h | 11 ++++++ c-deps/libroach/db_test.cc | 68 +++++++++++++++++++++++++++++++++++++ 3 files changed, 130 insertions(+), 18 deletions(-) diff --git a/c-deps/libroach/db.cc b/c-deps/libroach/db.cc index a929a8c8fcde..7612bad1c516 100644 --- a/c-deps/libroach/db.cc +++ b/c-deps/libroach/db.cc @@ -77,6 +77,48 @@ ScopedStats::~ScopedStats() { } } +void BatchSSTablesForCompaction(const std::vector<rocksdb::SstFileMetaData>& sst, + rocksdb::Slice start_key, rocksdb::Slice end_key, + uint64_t target_size, std::vector<rocksdb::Range>* ranges) { + int prev = -1; // index of the last compacted sst + uint64_t size = 0; + for (int i = 0; i < sst.size(); ++i) { + size += sst[i].size; + if (size < target_size && (i + 1) < sst.size()) { + // We haven't reached the target size or the end of the sstables + // to compact. + continue; + } + + rocksdb::Slice start; + if (prev == -1) { + // This is the first compaction.
start = start_key; + } else { + // This is a compaction in the middle or end of the requested + // key range. The start key for the compaction is the largest + // key from the previously compacted sstable. + start = rocksdb::Slice(sst[prev].largestkey); + } + + rocksdb::Slice end; + if ((i + 1) == sst.size()) { + // This is the last compaction. + end = end_key; + } else { + // This is a compaction at the start or in the middle of the + // requested key range. The end key is the largest key in the + // current sstable. + end = rocksdb::Slice(sst[i].largestkey); + } + + ranges->emplace_back(rocksdb::Range(start, end)); + + prev = i; + size = 0; + } +} + } // namespace cockroach namespace { @@ -333,30 +375,21 @@ DBStatus DBCompactRange(DBEngine* db, DBSlice start, DBSlice end, bool force_bot return a.smallestkey < b.smallestkey; }); - // Walk over the bottom-most sstables in order and perform - // compactions every 128MB. - rocksdb::Slice last; - rocksdb::Slice* last_ptr = nullptr; - uint64_t size = 0; + // Batch the bottom-most sstables into compactions of ~128MB. const uint64_t target_size = 128 << 20; - for (int i = 0; i < sst.size(); ++i) { - size += sst[i].size; - if (size < target_size) { - continue; - } - rocksdb::Slice cur(sst[i].largestkey); - rocksdb::Status status = db->rep->CompactRange(options, last_ptr, &cur); + std::vector<rocksdb::Range> ranges; + BatchSSTablesForCompaction(sst, start_key, end_key, target_size, &ranges); + + for (auto r : ranges) { + rocksdb::Status status = db->rep->CompactRange( + options, + r.start.empty() ? nullptr : &r.start, + r.limit.empty() ? nullptr : &r.limit); if (!status.ok()) { return ToDBStatus(status); } - last = cur; - last_ptr = &last; - size = 0; } - if (size > 0) { - return ToDBStatus(db->rep->CompactRange(options, last_ptr, nullptr)); - } return kSuccess; } diff --git a/c-deps/libroach/db.h b/c-deps/libroach/db.h index 6a9d177276de..d794aba12417 100644 --- a/c-deps/libroach/db.h +++ b/c-deps/libroach/db.h @@ -17,7 +17,9 @@ #include #include #include +#include <rocksdb/metadata.h> #include +#include <vector> #include #include @@ -81,4 +83,13 @@ class ScopedStats { uint64_t internal_delete_skipped_count_base_; }; +// BatchSSTablesForCompaction batches the supplied sstable metadata into +// chunks of sstables of roughly target_size bytes each. An empty start or +// end key indicates that the compaction should extend from the beginning +// (or to the end) of the key space. The sstable metadata must already be +// sorted by smallest key.
+void BatchSSTablesForCompaction(const std::vector<rocksdb::SstFileMetaData>& sst, + rocksdb::Slice start_key, rocksdb::Slice end_key, + uint64_t target_size, std::vector<rocksdb::Range>* ranges); + } // namespace cockroach diff --git a/c-deps/libroach/db_test.cc b/c-deps/libroach/db_test.cc index f5f86ff0c560..282388322978 100644 --- a/c-deps/libroach/db_test.cc +++ b/c-deps/libroach/db_test.cc @@ -43,3 +43,71 @@ TEST(Libroach, DBOpen) { DBClose(db); } + +TEST(Libroach, BatchSSTablesForCompaction) { + auto toString = [](const std::vector<rocksdb::Range>& ranges) -> std::string { + std::string res; + for (auto r : ranges) { + if (!res.empty()) { + res.append(","); + } + res.append(r.start.data(), r.start.size()); + res.append("-"); + res.append(r.limit.data(), r.limit.size()); + } + return res; + }; + + auto sst = [](const std::string& smallest, const std::string& largest, + uint64_t size) -> rocksdb::SstFileMetaData { + return rocksdb::SstFileMetaData("", "", size, 0, 0, smallest, largest, 0, 0); + }; + + struct TestCase { + TestCase(const std::vector<rocksdb::SstFileMetaData>& s, + const std::string& start, const std::string& end, + uint64_t target, const std::string& expected) + : sst(s), + start_key(start), + end_key(end), + target_size(target), + expected_ranges(expected) { + } + std::vector<rocksdb::SstFileMetaData> sst; + std::string start_key; + std::string end_key; + uint64_t target_size; + std::string expected_ranges; + }; + + std::vector<TestCase> testCases = { + TestCase({ sst("a", "b", 10) }, + "", "", 10, "-"), + TestCase({ sst("a", "b", 10) }, + "a", "", 10, "a-"), + TestCase({ sst("a", "b", 10) }, + "", "b", 10, "-b"), + TestCase({ sst("a", "b", 10) }, + "a", "b", 10, "a-b"), + TestCase({ sst("c", "d", 10) }, + "a", "b", 10, "a-b"), + TestCase({ sst("a", "b", 10), sst("b", "c", 10) }, + "a", "c", 10, "a-b,b-c"), + TestCase({ sst("a", "b", 10), sst("b", "c", 10) }, + "a", "c", 100, "a-c"), + TestCase({ sst("a", "b", 10), sst("b", "c", 10) }, + "", "c", 10, "-b,b-c"), + TestCase({ sst("a", "b", 10), sst("b", "c", 10) }, + "a", "", 10, "a-b,b-"), + TestCase({ sst("a", "b", 10), sst("b", "c", 10), sst("c", "d", 10) }, + "a", "d", 10, "a-b,b-c,c-d"), + TestCase({ sst("a", "b", 10), sst("b", "c", 10), sst("c", "d", 10) }, + "a", "d", 20, "a-c,c-d"), + }; + for (auto c : testCases) { + std::vector<rocksdb::Range> ranges; + BatchSSTablesForCompaction(c.sst, c.start_key, c.end_key, c.target_size, &ranges); + auto result = toString(ranges); + EXPECT_EQ(c.expected_ranges, result); + } +}
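Reviewer note on the first patch: the tightened `RowSource.Next` contract means any consumer that buffers rows it has received must copy them first, which is exactly what the aggregator, hash joiner, sorter, and stream group accumulator changes above do via `EncDatumRowAlloc.CopyRow`. The sketch below is a minimal, self-contained Go model of why that copy is needed; it is not code from the patch, and everything in it (the `reusingSource` type, `copyRow`, the `[]int`-backed row) is invented for illustration.

```go
package main

import "fmt"

// EncDatumRow is a toy stand-in for sqlbase.EncDatumRow.
type EncDatumRow []int

// reusingSource models the post-patch RowSource contract: every Next call
// returns the same backing buffer, overwritten in place.
type reusingSource struct {
	buf  EncDatumRow
	rows []EncDatumRow
	idx  int
}

func (s *reusingSource) Next() EncDatumRow {
	if s.idx >= len(s.rows) {
		return nil
	}
	copy(s.buf, s.rows[s.idx]) // the returned row aliases s.buf
	s.idx++
	return s.buf
}

// copyRow stands in for EncDatumRowAlloc.CopyRow: a consumer that retains a
// row past the next Next() call must copy it first.
func copyRow(r EncDatumRow) EncDatumRow {
	c := make(EncDatumRow, len(r))
	copy(c, r)
	return c
}

func main() {
	src := &reusingSource{
		buf:  make(EncDatumRow, 2),
		rows: []EncDatumRow{{1, 2}, {3, 4}},
	}
	var retained []EncDatumRow
	for row := src.Next(); row != nil; row = src.Next() {
		// Without copyRow, both retained entries would alias the same
		// buffer and end up as [3 4] -- the aliasing bug the patch's
		// explicit CopyRow calls guard against.
		retained = append(retained, copyRow(row))
	}
	fmt.Println(retained) // [[1 2] [3 4]]
}
```

This aliasing is also what `copyingRowReceiver` neutralizes at router boundaries, where the contract-violating retention cannot be fixed locally.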
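Reviewer note on the second patch: the subtle part of `BatchSSTablesForCompaction` is the boundary handling, where only the first batch uses the caller's start key and only the last batch uses the caller's end key, so that an empty key correctly means "from the beginning" or "to the end" of the key space. Below is a hedged Go transliteration of the same loop, under the simplifying assumptions that keys are plain strings and the metadata is pre-sorted; `sstMeta`, `keyRange`, and `batchSSTables` are invented names, not APIs from the patch.

```go
package main

import "fmt"

// sstMeta is a pared-down stand-in for rocksdb::SstFileMetaData.
type sstMeta struct {
	smallest, largest string
	size              uint64
}

// keyRange mirrors rocksdb::Range; empty start/limit mean "from the
// beginning" / "to the end" of the key space, as in the patch.
type keyRange struct {
	start, limit string
}

// batchSSTables greedily groups sorted sstables into ~targetSize batches,
// applying the caller's start/end keys only to the first and last batch.
func batchSSTables(sst []sstMeta, startKey, endKey string, targetSize uint64) []keyRange {
	var ranges []keyRange
	prev := -1 // index of the last sstable assigned to a batch
	var size uint64
	for i := range sst {
		size += sst[i].size
		if size < targetSize && i+1 < len(sst) {
			continue // keep growing the current batch
		}
		start := startKey // first batch: honor the requested start key
		if prev != -1 {
			start = sst[prev].largest // later batches: resume after the previous one
		}
		end := endKey // last batch: honor the requested end key
		if i+1 < len(sst) {
			end = sst[i].largest
		}
		ranges = append(ranges, keyRange{start, end})
		prev = i
		size = 0
	}
	return ranges
}

func main() {
	sst := []sstMeta{{"a", "b", 10}, {"b", "c", 10}, {"c", "d", 10}}
	// With a 10-byte target each sstable becomes its own compaction,
	// matching the "a-b,b-c,c-d" case in db_test.cc above.
	fmt.Println(batchSSTables(sst, "a", "d", 10)) // [{a b} {b c} {c d}]
}
```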