Skip to content

Commit

Permalink
Merge #24589 #26355
Browse files Browse the repository at this point in the history
24589: distsqlrun: don't allocate between fused processors r=jordanlewis a=jordanlewis

distsqlrun: don't allocate between fused processors

Previously, `ProcOutputHelper.ProcessRow` (and, by extension, all
`RowSource.Next` implementations) always allocated a fresh
`EncDatumRow`. This was wasteful - not every processor needs to be able
to hold a reference to the output of `RowSource.Next`.

Now, `ProcessRow` never allocates a fresh `EncDatumRow`, and the
contract of `RowSource.Next` has been changed to say that it's not valid
to hang on to a row returned by `Next` past the next call to `Next`.
Processors that need to hold on to a row from their upstreams have been
modified to make an explicit copy to achieve this safely.

Finally, a new `copyingRowReceiver` is introduced that makes a copy of
every row that is `Push`'d to it. A `copyingRowReceiver` is inserted
before every router, since routers all expect that their inputs will be
immutable. This preserves the safety of sending outputs of
`RowSource.Next`, which aren't safe to hold on to, to routers, which
expect rows that *are* safe to hold on to.

Release note: None

Fixes #22462.
Fixes #24452.

26355: libroach: fix excessive compactions performed by DBCompactRange r=bdarnell,tschottdorf a=petermattis

Fix excessive compactions from `DBCompactRange` due to mishandling of
the first and last ranges to compact. When a non-empty start or end key
is specified, DBCompactRange was previously calling
`rocksdb::DB::CompactRange` with a `null` start/end key resulting in
compacting from the beginning (or to the end) of the entire key space.

See #24029

Co-authored-by: Jordan Lewis <jordanthelewis@gmail.com>
Co-authored-by: Peter Mattis <petermattis@gmail.com>
  • Loading branch information
3 people committed Jun 4, 2018
3 parents e9d2285 + 7238664 + e2f099f commit 5d2feac
Show file tree
Hide file tree
Showing 20 changed files with 245 additions and 69 deletions.
69 changes: 51 additions & 18 deletions c-deps/libroach/db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,48 @@ ScopedStats::~ScopedStats() {
}
}

void BatchSSTablesForCompaction(const std::vector<rocksdb::SstFileMetaData> &sst,
rocksdb::Slice start_key, rocksdb::Slice end_key,
uint64_t target_size, std::vector<rocksdb::Range> *ranges) {
int prev = -1; // index of the last compacted sst
uint64_t size = 0;
for (int i = 0; i < sst.size(); ++i) {
size += sst[i].size;
if (size < target_size && (i + 1) < sst.size()) {
// We haven't reached the target size or the end of the sstables
// to compact.
continue;
}

rocksdb::Slice start;
if (prev == -1) {
// This is the first compaction.
start = start_key;
} else {
// This is a compaction in the middle or end of the requested
// key range. The start key for the compaction is the largest
// key from the previous compacted.
start = rocksdb::Slice(sst[prev].largestkey);
}

rocksdb::Slice end;
if ((i + 1) == sst.size()) {
// This is the last compaction.
end = end_key;
} else {
// This is a compaction at the start or in the middle of the
// requested key range. The end key is the largest key in the
// current sstable.
end = rocksdb::Slice(sst[i].largestkey);
}

ranges->emplace_back(rocksdb::Range(start, end));

prev = i;
size = 0;
}
}

} // namespace cockroach

namespace {
Expand Down Expand Up @@ -333,30 +375,21 @@ DBStatus DBCompactRange(DBEngine* db, DBSlice start, DBSlice end, bool force_bot
return a.smallestkey < b.smallestkey;
});

// Walk over the bottom-most sstables in order and perform
// compactions every 128MB.
rocksdb::Slice last;
rocksdb::Slice* last_ptr = nullptr;
uint64_t size = 0;
// Batch the bottom-most sstables into compactions of ~128MB.
const uint64_t target_size = 128 << 20;
for (int i = 0; i < sst.size(); ++i) {
size += sst[i].size;
if (size < target_size) {
continue;
}
rocksdb::Slice cur(sst[i].largestkey);
rocksdb::Status status = db->rep->CompactRange(options, last_ptr, &cur);
std::vector<rocksdb::Range> ranges;
BatchSSTablesForCompaction(sst, start_key, end_key, target_size, &ranges);

for (auto r : ranges) {
rocksdb::Status status = db->rep->CompactRange(
options,
r.start.empty() ? nullptr : &r.start,
r.limit.empty() ? nullptr : &r.limit);
if (!status.ok()) {
return ToDBStatus(status);
}
last = cur;
last_ptr = &last;
size = 0;
}

if (size > 0) {
return ToDBStatus(db->rep->CompactRange(options, last_ptr, nullptr));
}
return kSuccess;
}

Expand Down
11 changes: 11 additions & 0 deletions c-deps/libroach/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
#include <libroach.h>
#include <memory>
#include <rocksdb/comparator.h>
#include <rocksdb/db.h>
#include <rocksdb/iterator.h>
#include <rocksdb/metadata.h>
#include <rocksdb/status.h>
#include <rocksdb/write_batch.h>

Expand Down Expand Up @@ -81,4 +83,13 @@ class ScopedStats {
uint64_t internal_delete_skipped_count_base_;
};

// BatchSStables batches the supplied sstable metadata into chunks of
// sstables that are target_size. An empty start or end key indicates
// that the a compaction from the beginning (or end) of the key space
// should be provided. The sstable metadata must already be sorted by
// smallest key.
void BatchSSTablesForCompaction(const std::vector<rocksdb::SstFileMetaData> &sst,
rocksdb::Slice start_key, rocksdb::Slice end_key,
uint64_t target_size, std::vector<rocksdb::Range> *ranges);

} // namespace cockroach
68 changes: 68 additions & 0 deletions c-deps/libroach/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,71 @@ TEST(Libroach, DBOpen) {

DBClose(db);
}

TEST(Libroach, BatchSSTablesForCompaction) {
auto toString = [](const std::vector<rocksdb::Range>& ranges) -> std::string {
std::string res;
for (auto r : ranges) {
if (!res.empty()) {
res.append(",");
}
res.append(r.start.data(), r.start.size());
res.append("-");
res.append(r.limit.data(), r.limit.size());
}
return res;
};

auto sst = [](const std::string& smallest, const std::string& largest,
uint64_t size) -> rocksdb::SstFileMetaData {
return rocksdb::SstFileMetaData("", "", size, 0, 0, smallest, largest, 0, 0);
};

struct TestCase {
TestCase(const std::vector<rocksdb::SstFileMetaData>& s,
const std::string& start, const std::string& end,
uint64_t target, const std::string& expected)
: sst(s),
start_key(start),
end_key(end),
target_size(target),
expected_ranges(expected) {
}
std::vector<rocksdb::SstFileMetaData> sst;
std::string start_key;
std::string end_key;
uint64_t target_size;
std::string expected_ranges;
};

std::vector<TestCase> testCases = {
TestCase({ sst("a", "b", 10) },
"", "", 10, "-"),
TestCase({ sst("a", "b", 10) },
"a", "", 10, "a-"),
TestCase({ sst("a", "b", 10) },
"", "b", 10, "-b"),
TestCase({ sst("a", "b", 10) },
"a", "b", 10, "a-b"),
TestCase({ sst("c", "d", 10) },
"a", "b", 10, "a-b"),
TestCase({ sst("a", "b", 10), sst("b", "c", 10) },
"a", "c", 10, "a-b,b-c"),
TestCase({ sst("a", "b", 10), sst("b", "c", 10) },
"a", "c", 100, "a-c"),
TestCase({ sst("a", "b", 10), sst("b", "c", 10) },
"", "c", 10, "-b,b-c"),
TestCase({ sst("a", "b", 10), sst("b", "c", 10) },
"a", "", 10, "a-b,b-"),
TestCase({ sst("a", "b", 10), sst("b", "c", 10), sst("c", "d", 10) },
"a", "d", 10, "a-b,b-c,c-d"),
TestCase({ sst("a", "b", 10), sst("b", "c", 10), sst("c", "d", 10) },
"a", "d", 20, "a-c,c-d"),
};
for (auto c : testCases) {
std::vector<rocksdb::Range> ranges;
BatchSSTablesForCompaction(c.sst, c.start_key, c.end_key, c.target_size, &ranges);
auto result = toString(ranges);
EXPECT_EQ(c.expected_ranges, result);
}
}
9 changes: 5 additions & 4 deletions pkg/sql/distsqlrun/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type aggregatorBase struct {
funcs []*aggregateFuncHolder
outputTypes []sqlbase.ColumnType
datumAlloc sqlbase.DatumAlloc
rowAlloc sqlbase.EncDatumRowAlloc

bucketsAcc mon.BoundAccount

Expand Down Expand Up @@ -408,15 +409,15 @@ func (ag *hashAggregator) accumulateRows() (aggregatorState, sqlbase.EncDatumRow
}

if ag.lastOrdGroupCols == nil {
ag.lastOrdGroupCols = row
ag.lastOrdGroupCols = ag.rowAlloc.CopyRow(row)
} else {
matched, err := ag.matchLastOrdGroupCols(row)
if err != nil {
ag.moveToDraining(err)
return aggStateUnknown, nil, nil
}
if !matched {
ag.lastOrdGroupCols = row
ag.lastOrdGroupCols = ag.rowAlloc.CopyRow(row)
break
}
}
Expand Down Expand Up @@ -467,15 +468,15 @@ func (ag *orderedAggregator) accumulateRows() (aggregatorState, sqlbase.EncDatum
}

if ag.lastOrdGroupCols == nil {
ag.lastOrdGroupCols = row
ag.lastOrdGroupCols = ag.rowAlloc.CopyRow(row)
} else {
matched, err := ag.matchLastOrdGroupCols(row)
if err != nil {
ag.moveToDraining(err)
return aggStateUnknown, nil, nil
}
if !matched {
ag.lastOrdGroupCols = row
ag.lastOrdGroupCols = ag.rowAlloc.CopyRow(row)
break
}
}
Expand Down
21 changes: 19 additions & 2 deletions pkg/sql/distsqlrun/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ type RowReceiver interface {
// ProducerDone() needs to be called (after draining is done, if draining was
// requested).
//
// The sender must not modify the row after calling this function.
// Unless specifically permitted by the underlying implementation, (see
// copyingRowReceiver, for example), the sender must not modify the row
// after calling this function.
//
// After DrainRequested is returned, it is expected that all future calls only
// carry metadata (however that is not enforced and implementations should be
Expand Down Expand Up @@ -113,6 +115,9 @@ type RowSource interface {
// been exhausted - no more records are coming and any further method calls
// will be no-ops.
//
// EncDatumRows returned by Next() are only valid until the next call to
// Next(), although the EncDatums inside them stay valid forever.
//
// A ProducerMetadata record may contain an error. In that case, this
// interface is oblivious about the semantics: implementers may continue
// returning different rows on future calls, or may return an empty record
Expand Down Expand Up @@ -207,7 +212,7 @@ func DrainAndForwardMetadata(ctx context.Context, src RowSource, dst RowReceiver
)
}

switch dst.Push(row, meta) {
switch dst.Push(nil /* row */, meta) {
case ConsumerClosed:
src.ConsumerClosed()
return
Expand Down Expand Up @@ -638,6 +643,18 @@ func (rb *RowBuffer) ConsumerClosed() {
}
}

type copyingRowReceiver struct {
RowReceiver
alloc sqlbase.EncDatumRowAlloc
}

func (r *copyingRowReceiver) Push(row sqlbase.EncDatumRow, meta *ProducerMetadata) ConsumerStatus {
if row != nil {
row = r.alloc.CopyRow(row)
}
return r.RowReceiver.Push(row, meta)
}

// String implements fmt.Stringer.
func (e *Error) String() string {
if err := e.ErrorDetail(); err != nil {
Expand Down
31 changes: 18 additions & 13 deletions pkg/sql/distsqlrun/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,17 @@ import (
type distinct struct {
processorBase

input RowSource
types []sqlbase.ColumnType
lastGroupKey sqlbase.EncDatumRow
arena stringarena.Arena
seen map[string]struct{}
orderedCols []uint32
distinctCols util.FastIntSet
memAcc mon.BoundAccount
datumAlloc sqlbase.DatumAlloc
scratch []byte
input RowSource
types []sqlbase.ColumnType
haveLastGroupKey bool
lastGroupKey sqlbase.EncDatumRow
arena stringarena.Arena
seen map[string]struct{}
orderedCols []uint32
distinctCols util.FastIntSet
memAcc mon.BoundAccount
datumAlloc sqlbase.DatumAlloc
scratch []byte
}

// sortedDistinct is a specialized distinct that can be used when all of the
Expand Down Expand Up @@ -100,6 +101,8 @@ func newDistinct(
}); err != nil {
return nil, err
}
d.lastGroupKey = d.out.rowAlloc.AllocRow(len(d.types))
d.haveLastGroupKey = false

if allSorted {
// We can use the faster sortedDistinct processor.
Expand Down Expand Up @@ -148,7 +151,7 @@ func (d *sortedDistinct) Run(ctx context.Context, wg *sync.WaitGroup) {
}

func (d *distinct) matchLastGroupKey(row sqlbase.EncDatumRow) (bool, error) {
if d.lastGroupKey == nil {
if !d.haveLastGroupKey {
return false, nil
}
for _, colIdx := range d.orderedCols {
Expand Down Expand Up @@ -231,7 +234,8 @@ func (d *distinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
// distinct keys in the 'seen' set will never be seen again. This allows
// us to keep the current arena block and overwrite strings previously
// allocated on it, which implies that UnsafeReset() is safe to call here.
d.lastGroupKey = row
copy(d.lastGroupKey, row)
d.haveLastGroupKey = true
if err := d.arena.UnsafeReset(d.ctx); err != nil {
d.moveToDraining(err)
break
Expand Down Expand Up @@ -281,7 +285,8 @@ func (d *sortedDistinct) Next() (sqlbase.EncDatumRow, *ProducerMetadata) {
continue
}

d.lastGroupKey = row
d.haveLastGroupKey = true
copy(d.lastGroupKey, row)

if outRow := d.processRowHelper(row); outRow != nil {
return outRow, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestDistinct(t *testing.T) {
}
var res sqlbase.EncDatumRows
for {
row := out.NextNoMeta(t)
row := out.NextNoMeta(t).Copy()
if row == nil {
break
}
Expand Down
14 changes: 13 additions & 1 deletion pkg/sql/distsqlrun/flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,17 @@ func (f *Flow) makeProcessor(
f.startables = append(f.startables, r)
}

// No output router or channel is safe to push rows to, unless the row won't
// be modified later by the thing that created it. No processor creates safe
// rows, either. So, we always wrap our outputs in copyingRowReceivers. These
// outputs aren't used at all if they are processors that get fused to their
// upstreams, though, which means that copyingRowReceivers are only used on
// non-fused processors like the output routers.

for i := range outputs {
outputs[i] = &copyingRowReceiver{RowReceiver: outputs[i]}
}

proc, err := newProcessor(ctx, &f.FlowCtx, ps.ProcessorID, &ps.Core, &ps.Post, inputs, outputs)
if err != nil {
return nil, err
Expand All @@ -315,7 +326,8 @@ func (f *Flow) makeProcessor(
// Initialize any routers (the setupRouter case above) and outboxes.
types := proc.OutputTypes()
for _, o := range outputs {
switch o := o.(type) {
copier := o.(*copyingRowReceiver)
switch o := copier.RowReceiver.(type) {
case router:
o.init(&f.FlowCtx, types)
case *outbox:
Expand Down
Loading

0 comments on commit 5d2feac

Please sign in to comment.