Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix min/max(x, n) #8311

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 11 additions & 1 deletion velox/core/Expressions.h
Original file line number Diff line number Diff line change
Expand Up @@ -667,10 +667,20 @@ class TypedExprs {
return dynamic_cast<const ConstantTypedExpr*>(expr.get()) != nullptr;
}

/// Returns 'expr' as ConstantTypedExprPtr or null if not field access
/// Returns 'expr' as ConstantTypedExprPtr or null if not a constant
/// expression.
static ConstantTypedExprPtr asConstant(const TypedExprPtr& expr) {
return std::dynamic_pointer_cast<const ConstantTypedExpr>(expr);
}

/// Returns true if 'expr' is a lambda expression.
static bool isLambda(const TypedExprPtr& expr) {
return dynamic_cast<const LambdaTypedExpr*>(expr.get()) != nullptr;
}

/// Returns 'expr' as LambdaTypedExprPtr or null if not a lambda expression.
static LambdaTypedExprPtr asLambda(const TypedExprPtr& expr) {
return std::dynamic_pointer_cast<const LambdaTypedExpr>(expr);
}
};
} // namespace facebook::velox::core
6 changes: 5 additions & 1 deletion velox/exec/Aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ class Aggregate {
// 'result' and its parts are expected to be singly referenced. If
// other threads or operators hold references that they would use
// after 'result' has been updated by this, effects will be unpredictable.
// This method should not have side effects, i.e., calling this method
// doesn't change the content of the accumulators. This is needed for an
// optimization in Window operator where aggregations for expanding frames are
// computed incrementally.
virtual void
extractValues(char** groups, int32_t numGroups, VectorPtr* result) = 0;

Expand All @@ -216,7 +220,7 @@ class Aggregate {
// @param numGroups Number of groups to extract results from.
// @param result The result vector to store the results in.
//
// See comment on 'result' in extractValues().
// See comment on 'result' and side effects in extractValues().
virtual void
extractAccumulators(char** groups, int32_t numGroups, VectorPtr* result) = 0;

Expand Down
177 changes: 154 additions & 23 deletions velox/functions/lib/aggregates/tests/utils/AggregationTestBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"
#include "velox/exec/tests/utils/TempFilePath.h"
#include "velox/expression/Expr.h"
#include "velox/expression/SignatureBinder.h"

using facebook::velox::exec::Spiller;
Expand Down Expand Up @@ -621,6 +622,35 @@ void AggregationTestBase::testAggregations(
testWithTableScan);
}

namespace {

std::vector<VectorPtr> extractArgColumns(
const core::CallTypedExprPtr& aggregateExpr,
const RowVectorPtr& input,
memory::MemoryPool* pool) {
auto type = input->type()->asRow();
std::vector<VectorPtr> columns;
for (const auto& arg : aggregateExpr->inputs()) {
if (auto field = core::TypedExprs::asFieldAccess(arg)) {
columns.push_back(input->childAt(field->name()));
}
if (core::TypedExprs::isConstant(arg)) {
auto constant = core::TypedExprs::asConstant(arg);
columns.push_back(constant->toConstantVector(pool));
}
if (auto lambda = core::TypedExprs::asLambda(arg)) {
for (const auto& name : lambda->signature()->names()) {
if (auto captureIndex = type.getChildIdxIfExists(name)) {
columns.push_back(input->childAt(captureIndex.value()));
}
}
}
}
return columns;
}

} // namespace

RowVectorPtr AggregationTestBase::validateStreamingInTestAggregations(
const std::function<void(PlanBuilder&)>& makeSource,
const std::vector<std::string>& aggregates,
Expand Down Expand Up @@ -921,6 +951,11 @@ void AggregationTestBase::testAggregations(
assertResults,
config);

if (testIncremental_) {
SCOPED_TRACE("testIncrementalAggregation");
testIncrementalAggregation(makeSource, aggregates, config);
}

if (testWithTableScan) {
SCOPED_TRACE("Test reading input from table scan");
testReadFromFiles(
Expand Down Expand Up @@ -975,6 +1010,112 @@ VectorPtr AggregationTestBase::testStreaming(
config);
}

namespace {

constexpr int kRowSizeOffset = 8;
constexpr int kOffset = kRowSizeOffset + 8;

std::unique_ptr<exec::Aggregate> createAggregateFunction(
const std::string& functionName,
const std::vector<TypePtr>& inputTypes,
HashStringAllocator& allocator,
const std::unordered_map<std::string, std::string>& config) {
auto [intermediateType, finalType] = getResultTypes(functionName, inputTypes);
core::QueryConfig queryConfig({config});
auto func = exec::Aggregate::create(
functionName,
core::AggregationNode::Step::kSingle,
inputTypes,
finalType,
queryConfig);
func->setAllocator(&allocator);
func->setOffsets(kOffset, 0, 1, kRowSizeOffset);

VELOX_CHECK(intermediateType->equivalent(
*func->intermediateType(functionName, inputTypes)));
VELOX_CHECK(finalType->equivalent(*func->resultType()));

return func;
}

} // namespace

void AggregationTestBase::testIncrementalAggregation(
const std::function<void(exec::test::PlanBuilder&)>& makeSource,
const std::vector<std::string>& aggregates,
const std::unordered_map<std::string, std::string>& config) {
PlanBuilder builder(pool());
makeSource(builder);
auto data = AssertQueryBuilder(builder.planNode())
.configs(config)
.copyResults(pool());
auto inputSize = data->size();
if (inputSize == 0) {
return;
}

auto& aggregationNode = static_cast<const core::AggregationNode&>(
*builder.singleAggregation({}, aggregates).planNode());
for (int i = 0; i < aggregationNode.aggregates().size(); ++i) {
const auto& aggregate = aggregationNode.aggregates()[i];
const auto& aggregateExpr = aggregate.call;
const auto& functionName = aggregateExpr->name();
auto input = extractArgColumns(aggregateExpr, data, pool());

HashStringAllocator allocator(pool());
std::vector<core::LambdaTypedExprPtr> lambdas;
for (const auto& arg : aggregate.call->inputs()) {
if (auto lambda =
std::dynamic_pointer_cast<const core::LambdaTypedExpr>(arg)) {
lambdas.push_back(lambda);
}
}
auto queryCtxConfig = config;
auto func = createAggregateFunction(
functionName, aggregate.rawInputTypes, allocator, config);
auto queryCtx = std::make_shared<core::QueryCtx>(
nullptr, core::QueryConfig{queryCtxConfig});

std::shared_ptr<core::ExpressionEvaluator> expressionEvaluator;
if (!lambdas.empty()) {
expressionEvaluator = std::make_shared<exec::SimpleExpressionEvaluator>(
queryCtx.get(), allocator.pool());
func->setLambdaExpressions(lambdas, expressionEvaluator);
}

std::vector<char> group(kOffset + func->accumulatorFixedWidthSize());
std::vector<char*> groups(inputSize, group.data());
std::vector<vector_size_t> indices(1, 0);
func->initializeNewGroups(groups.data(), indices);
func->addSingleGroupRawInput(
group.data(), SelectivityVector(inputSize), input, false);

// Extract intermediate result from the same accumulator twice and expect
// results to be the same.
auto intermediateType =
func->intermediateType(functionName, aggregate.rawInputTypes);
auto intermediateResult1 = BaseVector::create(intermediateType, 1, pool());
auto intermediateResult2 = BaseVector::create(intermediateType, 1, pool());
func->extractAccumulators(groups.data(), 1, &intermediateResult1);
func->extractAccumulators(groups.data(), 1, &intermediateResult2);
velox::test::assertEqualVectors(intermediateResult1, intermediateResult2);

// Extract values from the same accumulator twice and expect results to be
// the same.
auto result1 = BaseVector::create(func->resultType(), 1, pool());
auto result2 = BaseVector::create(func->resultType(), 1, pool());
func->extractValues(groups.data(), 1, &result1);
func->extractValues(groups.data(), 1, &result2);

// Destroy accumulators to avoid memory leak.
if (func->accumulatorUsesExternalMemory()) {
func->destroy(folly::Range(groups.data(), 1));
}

velox::test::assertEqualVectors(result1, result2);
}
}

VectorPtr AggregationTestBase::testStreaming(
const std::string& functionName,
bool testGlobal,
Expand All @@ -983,28 +1124,16 @@ VectorPtr AggregationTestBase::testStreaming(
const std::vector<VectorPtr>& rawInput2,
vector_size_t rawInput2Size,
const std::unordered_map<std::string, std::string>& config) {
constexpr int kRowSizeOffset = 8;
constexpr int kOffset = kRowSizeOffset + 8;
std::vector<TypePtr> rawInputTypes(rawInput1.size());
std::transform(
rawInput1.begin(),
rawInput1.end(),
rawInputTypes.begin(),
[](const VectorPtr& vec) { return vec->type(); });

HashStringAllocator allocator(pool());
std::vector<TypePtr> rawInputTypes;
for (auto& vec : rawInput1) {
rawInputTypes.push_back(vec->type());
}
auto [intermediateType, finalType] =
getResultTypes(functionName, rawInputTypes);
auto createFunction = [&, &finalType = finalType] {
core::QueryConfig queryConfig({config});
auto func = exec::Aggregate::create(
functionName,
core::AggregationNode::Step::kSingle,
rawInputTypes,
finalType,
queryConfig);
func->setAllocator(&allocator);
func->setOffsets(kOffset, 0, 1, kRowSizeOffset);
return func;
};
auto func = createFunction();
auto func =
createAggregateFunction(functionName, rawInputTypes, allocator, config);
int maxRowCount = std::max(rawInput1Size, rawInput2Size);
std::vector<char> group(kOffset + func->accumulatorFixedWidthSize());
std::vector<char*> groups(maxRowCount, group.data());
Expand All @@ -1017,6 +1146,7 @@ VectorPtr AggregationTestBase::testStreaming(
func->addRawInput(
groups.data(), SelectivityVector(rawInput1Size), rawInput1, false);
}
auto intermediateType = func->intermediateType(functionName, rawInputTypes);
auto intermediate = BaseVector::create(intermediateType, 1, pool());
func->extractAccumulators(groups.data(), 1, &intermediate);
// Destroy accumulators to avoid memory leak.
Expand All @@ -1025,7 +1155,8 @@ VectorPtr AggregationTestBase::testStreaming(
}

// Create a new function picking up the intermediate result.
auto func2 = createFunction();
auto func2 =
createAggregateFunction(functionName, rawInputTypes, allocator, config);
func2->initializeNewGroups(groups.data(), indices);
if (testGlobal) {
func2->addSingleGroupIntermediateResults(
Expand All @@ -1042,7 +1173,7 @@ VectorPtr AggregationTestBase::testStreaming(
func2->addRawInput(
groups.data(), SelectivityVector(rawInput2Size), rawInput2, false);
}
auto result = BaseVector::create(finalType, 1, pool());
auto result = BaseVector::create(func2->resultType(), 1, pool());
func2->extractValues(groups.data(), 1, &result);
// Destroy accumulators to avoid memory leak.
if (func2->accumulatorUsesExternalMemory()) {
Expand Down
20 changes: 20 additions & 0 deletions velox/functions/lib/aggregates/tests/utils/AggregationTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,20 @@ class AggregationTestBase : public exec::test::OperatorTestBase {
testStreaming_ = true;
}

void disableTestIncremental() {
testIncremental_ = false;
}

void enableTestIncremental() {
testIncremental_ = true;
}

/// Whether testStreaming should be called in testAggregations.
bool testStreaming_{true};

/// Whether testIncrementalAggregation should be called in testAggregations.
bool testIncremental_{true};

private:
// Test streaming use case where raw inputs are added after intermediate
// results. Return the result of aggregates if successful.
Expand All @@ -265,6 +276,15 @@ class AggregationTestBase : public exec::test::OperatorTestBase {
vector_size_t rawInput2Size,
const std::unordered_map<std::string, std::string>& config = {});

// Test to ensure that when extractValues() or extractAccumulators() is called
// twice on the same accumulator, the extracted results are the same. This
// ensures that extractValues() and extractAccumulators() are free of side
// effects.
void testIncrementalAggregation(
const std::function<void(exec::test::PlanBuilder&)>& makeSource,
const std::vector<std::string>& aggregates,
const std::unordered_map<std::string, std::string>& config = {});

void testAggregationsImpl(
std::function<void(exec::test::PlanBuilder&)> makeSource,
const std::vector<std::string>& groupingKeys,
Expand Down
34 changes: 18 additions & 16 deletions velox/functions/prestosql/aggregates/MinMaxAggregates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,17 +545,17 @@ std::pair<vector_size_t*, vector_size_t*> rawOffsetAndSizes(
template <typename T, typename Compare>
struct MinMaxNAccumulator {
int64_t n{0};
std::priority_queue<T, std::vector<T, StlAllocator<T>>, Compare> topValues;
std::vector<T, StlAllocator<T>> heapValues;

explicit MinMaxNAccumulator(HashStringAllocator* allocator)
: topValues{Compare{}, StlAllocator<T>(allocator)} {}
: heapValues{StlAllocator<T>(allocator)} {}

int64_t getN() const {
return n;
}

size_t size() const {
return topValues.size();
return heapValues.size();
}

void checkAndSetN(DecodedVector& decodedN, vector_size_t row) {
Expand Down Expand Up @@ -584,25 +584,27 @@ struct MinMaxNAccumulator {
}

void compareAndAdd(T value, Compare& comparator) {
if (topValues.size() < n) {
topValues.push(value);
if (heapValues.size() < n) {
heapValues.push_back(value);
std::push_heap(heapValues.begin(), heapValues.end(), comparator);
} else {
const auto& topValue = topValues.top();
const auto& topValue = heapValues.front();
if (comparator(value, topValue)) {
topValues.pop();
topValues.push(value);
std::pop_heap(heapValues.begin(), heapValues.end(), comparator);
heapValues.back() = value;
std::push_heap(heapValues.begin(), heapValues.end(), comparator);
}
}
}

/// Moves all values from 'topValues' into 'rawValues' buffer. The queue of
/// 'topValues' will be empty after this call.
void extractValues(T* rawValues, vector_size_t offset) {
const vector_size_t size = topValues.size();
for (auto i = size - 1; i >= 0; --i) {
rawValues[offset + i] = topValues.top();
topValues.pop();
/// Copy all values from 'topValues' into 'rawValues' buffer. The heap remains
/// unchanged after the call.
void extractValues(T* rawValues, vector_size_t offset, Compare& comparator) {
std::sort_heap(heapValues.begin(), heapValues.end(), comparator);
for (int64_t i = heapValues.size() - 1; i >= 0; --i) {
rawValues[offset + i] = heapValues[i];
}
std::make_heap(heapValues.begin(), heapValues.end(), comparator);
}
};

Expand Down Expand Up @@ -775,7 +777,7 @@ class MinMaxNAggregateBase : public exec::Aggregate {
if (rawNs != nullptr) {
rawNs[i] = accumulator->n;
}
accumulator->extractValues(rawValues, offset);
accumulator->extractValues(rawValues, offset, comparator_);

offset += size;
}
Expand Down