Skip to content

Commit

Permalink
Optimize row_constructor Presto function (#1921)
Browse files Browse the repository at this point in the history
Summary:
The row_constructor function used to copy data even when no copy was necessary.
Fix this by using `context->moveOrCopyResult`.

Also, optimize RowVector::copy to copy one column at a time instead of one cell
(value) at a time.

Added a benchmark.

Before:

```
============================================================================
[...]unctions/prestosql/benchmarks/Row.cpp     relative  time/iter   iters/s
============================================================================
noCopy                                                    791.06us     1.26K
copyMostlyFlat                                              4.28ms    233.54
copyMostlyConst                                            24.67ms     40.53
```

After:

```
============================================================================
[...]unctions/prestosql/benchmarks/Row.cpp     relative  time/iter   iters/s
============================================================================
noCopy                                                    760.95us     1.31K
copyMostlyFlat                                              1.69ms    592.85
copyMostlyConst                                             1.50ms    664.71
```

Pull Request resolved: #1921

Reviewed By: kagamiori

Differential Revision: D37624916

Pulled By: mbasmanova

fbshipit-source-id: 868dbcc8ac8811fa14cb42b8966567130110eb52
  • Loading branch information
mbasmanova authored and facebook-github-bot committed Jul 6, 2022
1 parent 262fd65 commit e4ce5a6
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 71 deletions.
7 changes: 1 addition & 6 deletions velox/functions/prestosql/RowFunction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,7 @@ class RowFunction : public exec::VectorFunction {
rows.size(),
std::move(argsCopy),
0 /*nullCount*/);
if (*result) {
BaseVector::ensureWritable(rows, outputType, context->pool(), result);
(*result)->copy(row.get(), rows, nullptr);
} else {
*result = std::move(row);
}
context->moveOrCopyResult(row, rows, *result);
}

bool isDefaultNullBehavior() const override {
Expand Down
4 changes: 4 additions & 0 deletions velox/functions/prestosql/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,7 @@ target_link_libraries(velox_functions_benchmarks_string_writer_no_nulls
add_executable(velox_functions_prestosql_benchmarks_zip ZipBenchmark.cpp)
target_link_libraries(velox_functions_prestosql_benchmarks_zip
${BENCHMARK_DEPENDENCIES})

# Benchmark executable for the row_constructor function, built from Row.cpp.
add_executable(velox_functions_prestosql_benchmarks_row Row.cpp)
target_link_libraries(velox_functions_prestosql_benchmarks_row
${BENCHMARK_DEPENDENCIES})
76 changes: 76 additions & 0 deletions velox/functions/prestosql/benchmarks/Row.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) Facebook, Inc. and its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <folly/Benchmark.h>
#include <folly/init/Init.h>
#include "velox/functions/Macros.h"
#include "velox/functions/Registerer.h"
#include "velox/functions/lib/benchmarks/FunctionBenchmarkBase.h"
#include "velox/functions/prestosql/registration/RegistrationFunctions.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;

namespace {

class RowFunctionBenchmark : public functions::test::FunctionBenchmarkBase {
public:
RowFunctionBenchmark() : FunctionBenchmarkBase() {
functions::prestosql::registerAllScalarFunctions();
}

void run(const std::string& expr) {
folly::BenchmarkSuspender suspender;
vector_size_t size = 1'000;

auto rowVector = vectorMaker_.rowVector({
vectorMaker_.flatVector<int64_t>(size, [](auto row) { return row; }),
vectorMaker_.flatVector<double>(
size, [](auto row) { return row * 0.1; }),
});

auto exprSet = compileExpression(expr, rowVector->type());
suspender.dismiss();

int cnt = 0;
for (auto i = 0; i < 100; i++) {
cnt += evaluate(exprSet, rowVector)->size();
}
folly::doNotOptimizeAway(cnt);
}
};

BENCHMARK(noCopy) {
RowFunctionBenchmark benchmark;
benchmark.run("row_constructor(c0, c1)");
}

BENCHMARK(copyMostlyFlat) {
RowFunctionBenchmark benchmark;
benchmark.run(
"if(c0 > 100, row_constructor(c0, c1), row_constructor(1, 0.1))");
}

BENCHMARK(copyMostlyConst) {
RowFunctionBenchmark benchmark;
benchmark.run(
"if(c0 < 100, row_constructor(c0, c1), row_constructor(1, 0.1))");
}

} // namespace

/// Entry point of the row_constructor benchmark binary.
///
/// @param argc Number of command-line arguments.
/// @param argv Command-line arguments; forwarded to folly so that benchmark
/// flags (for example --bm_regex) are parsed instead of silently ignored.
/// @return Always 0.
int main(int argc, char** argv) {
  // Initialize folly (including command-line flag parsing) before running any
  // benchmark; the guard stays alive for the duration of the run.
  folly::Init init{&argc, &argv};
  folly::runBenchmarks();
  return 0;
}
157 changes: 92 additions & 65 deletions velox/vector/ComplexVector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,76 +138,103 @@ void RowVector::copy(
vector_size_t targetIndex,
vector_size_t sourceIndex,
vector_size_t count) {
auto sourceValue = source->wrappedVector();
if (sourceValue->isConstantEncoding()) {
// A null constant does not have a value vector, so wrappedVector
// returns the constant.
VELOX_CHECK(sourceValue->isNullAt(0));
for (auto i = 0; i < count; ++i) {
setNull(targetIndex + i, true);
}
return;
}
if (childrenSize_ == 0) {
return;
SelectivityVector rows(targetIndex + count);
rows.setValidRange(0, targetIndex, false);
rows.updateBounds();

BufferPtr indices;
vector_size_t* toSourceRow = nullptr;
if (sourceIndex != targetIndex) {
indices =
AlignedBuffer::allocate<vector_size_t>(targetIndex + count, pool_);
toSourceRow = indices->asMutable<vector_size_t>();
std::iota(
toSourceRow + targetIndex,
toSourceRow + targetIndex + count,
sourceIndex);
}
VELOX_CHECK_EQ(sourceValue->encoding(), VectorEncoding::Simple::ROW);
auto sourceAsRow = sourceValue->asUnchecked<RowVector>();
VELOX_CHECK(children_.size() && children_[0]);
VELOX_DCHECK(BaseVector::length_ >= targetIndex + count);
vector_size_t childSize = this->childSize();
auto rowType = type()->as<TypeKind::ROW>();
SelectivityVector allRows;
for (int32_t i = 0; i < children_.size(); ++i) {
auto& child = children_[i];
if (child->isConstantEncoding()) {
if (!allRows.size()) {
// Initialize 'allRows' on first use.
allRows.resize(childSize);
allRows.clearAll();
}
BaseVector::ensureWritable(allRows, rowType.childAt(i), pool(), &child);
} else {
// Non-constants will become writable at their original size.
BaseVector::ensureWritable(
SelectivityVector::empty(), rowType.childAt(i), pool(), &child);

copy(source, rows, toSourceRow);
}

void RowVector::copy(
const BaseVector* source,
const SelectivityVector& rows,
const vector_size_t* toSourceRow) {
for (auto i = 0; i < children_.size(); ++i) {
BaseVector::ensureWritable(
rows, type()->asRow().childAt(i), pool(), &children_[i]);
}

// Copy non-null values.
SelectivityVector nonNullRows = rows;

SelectivityVector allRows(source->size());
DecodedVector decodedSource(*source, allRows);
if (decodedSource.isIdentityMapping()) {
if (source->mayHaveNulls()) {
auto rawNulls = source->rawNulls();
rows.applyToSelected([&](auto row) {
auto idx = toSourceRow ? toSourceRow[row] : row;
if (bits::isBitNull(rawNulls, idx)) {
nonNullRows.setValid(row, false);
}
});
nonNullRows.updateBounds();
}
if (childSize < targetIndex + count) {
child->resize(targetIndex + count);

auto rowSource = source->loadedVector()->as<RowVector>();
for (auto i = 0; i < childrenSize_; ++i) {
children_[i]->copy(
rowSource->childAt(i)->loadedVector(), nonNullRows, toSourceRow);
}
}
// Shortcut for insert of non-null at end of children.
if (!source->mayHaveNulls() && targetIndex == childSize) {
if (sourceAsRow == source) {
appendToChildren(sourceAsRow, sourceIndex, count, targetIndex);
} else {
for (int32_t i = 0; i < count; ++i) {
appendToChildren(
sourceAsRow,
source->wrappedIndex(sourceIndex + i),
1,
childSize + i);
}
} else {
auto nullIndices = decodedSource.nullIndices();
auto nulls = decodedSource.nulls();

if (nulls) {
rows.applyToSelected([&](auto row) {
auto idx = toSourceRow ? toSourceRow[row] : row;
idx = nullIndices ? nullIndices[idx] : idx;
if (bits::isBitNull(nulls, idx)) {
nonNullRows.setValid(row, false);
}
});
nonNullRows.updateBounds();
}

// Copy baseSource[indices[toSource[row]]] into row.
auto indices = decodedSource.indices();
BufferPtr mappedIndices;
vector_size_t* rawMappedIndices = nullptr;
if (toSourceRow) {
mappedIndices =
AlignedBuffer::allocate<vector_size_t>(rows.size(), pool_);
rawMappedIndices = mappedIndices->asMutable<vector_size_t>();
nonNullRows.applyToSelected(
[&](auto row) { rawMappedIndices[row] = indices[toSourceRow[row]]; });
}

auto baseSource = decodedSource.base()->as<RowVector>();
for (auto i = 0; i < childrenSize_; ++i) {
children_[i]->copy(
baseSource->childAt(i)->loadedVector(),
nonNullRows,
rawMappedIndices ? rawMappedIndices : indices);
}
return;
}
auto setNotNulls = mayHaveNulls() || source->mayHaveNulls();
for (int32_t i = 0; i < count; ++i) {
auto childIndex = targetIndex + i;
if (source->isNullAt(sourceIndex + i)) {
setNull(childIndex, true);
} else {
if (setNotNulls) {
setNull(childIndex, false);
}
vector_size_t wrappedIndex = source->wrappedIndex(sourceIndex + i);
for (int32_t j = 0; j < children_.size(); ++j) {
childAt(j)->copy(
sourceAsRow->childAt(j)->loadedVector(),
childIndex,
wrappedIndex,
1);
}

if (nulls_) {
nonNullRows.clearNulls(nulls_);
}

// Copy nulls.
if (source->mayHaveNulls()) {
SelectivityVector nullRows = rows;
nullRows.deselect(nonNullRows);
if (nullRows.hasSelections()) {
ensureNulls();
nullRows.setNulls(nulls_);
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions velox/vector/ComplexVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ class RowVector : public BaseVector {
vector_size_t sourceIndex,
vector_size_t count) override;

/// Copies values from 'source' into this vector for every row selected in
/// 'rows'. When 'toSourceRow' is non-null, the value for target row 'row' is
/// read from source row toSourceRow[row]; otherwise it is read from the same
/// row number in 'source'.
void copy(
const BaseVector* source,
const SelectivityVector& rows,
const vector_size_t* toSourceRow) override;

void move(vector_size_t source, vector_size_t target) override;

uint64_t retainedSize() const override {
Expand Down
7 changes: 7 additions & 0 deletions velox/vector/SelectivityVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ class SelectivityVector {
}
}

/// Marks the active (selected) rows of this SelectivityVector as null in
/// 'nulls'. Only the [begin_, end_) range of this vector is processed.
///
/// @param nulls Nulls buffer to update; must not be a null pointer.
void setNulls(BufferPtr& nulls) const {
  VELOX_CHECK_NOT_NULL(nulls);
  auto* rawNulls = nulls->asMutable<uint64_t>();
  const auto* selectedBits = bits_.data();
  bits::andWithNegatedBits(rawNulls, selectedBits, begin_, end_);
}

/**
* Merges the valid vector of another SelectivityVector by or'ing
* them together. This is used to support memoization where a state
Expand Down

0 comments on commit e4ce5a6

Please sign in to comment.