Skip to content

Commit

Permalink
Add converter from VirtualTable to ValuesNode (#1437)
Browse files Browse the repository at this point in the history
Summary: Pull Request resolved: #1437

Reviewed By: laithsakka

Differential Revision: D36248512

Pulled By: mbasmanova

fbshipit-source-id: 8193ebe2988445da9d3df7f1911206b1b6d18c9f
  • Loading branch information
ZJie1 authored and facebook-github-bot committed May 9, 2022
1 parent c458309 commit 276237c
Show file tree
Hide file tree
Showing 8 changed files with 473 additions and 88 deletions.
10 changes: 10 additions & 0 deletions velox/substrait/SubstraitToVeloxExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ SubstraitVeloxExprConverter::toVeloxExpr(
return std::make_shared<core::ConstantTypedExpr>(variant(sLit.boolean()));
case ::substrait::Expression_Literal::LiteralTypeCase::kI64:
return std::make_shared<core::ConstantTypedExpr>(variant(sLit.i64()));
return std::make_shared<core::ConstantTypedExpr>(sLit.boolean());
case ::substrait::Expression_Literal::LiteralTypeCase::kI32:
return std::make_shared<core::ConstantTypedExpr>(sLit.i32());
case ::substrait::Expression_Literal::LiteralTypeCase::kNull: {
auto subType = subParser_->parseType(sLit.null());
auto veloxType = toVeloxType(subType->type);

return std::make_shared<core::ConstantTypedExpr>(
veloxType, variant::null(veloxType->kind()));
}
default:
VELOX_NYI(
"Substrait conversion not supported for type case '{}'", typeCase);
Expand Down
235 changes: 180 additions & 55 deletions velox/substrait/SubstraitToVeloxPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,71 @@

#include "velox/substrait/SubstraitToVeloxPlan.h"
#include "velox/substrait/TypeUtils.h"
#include "velox/type/Type.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/FlatVector.h"

namespace facebook::velox::substrait {

std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::AggregateRel& sAgg) {
std::shared_ptr<const core::PlanNode> childNode;
if (sAgg.has_input()) {
childNode = toVeloxPlan(sAgg.input());
namespace {
template <TypeKind KIND>
VectorPtr setVectorFromVariantsByKind(
const std::vector<velox::variant>& value,
memory::MemoryPool* pool) {
using T = typename TypeTraits<KIND>::NativeType;

auto flatVector = std::dynamic_pointer_cast<FlatVector<T>>(
BaseVector::create(CppToType<T>::create(), value.size(), pool));

for (vector_size_t i = 0; i < value.size(); i++) {
if (value[i].isNull()) {
flatVector->setNull(i, true);
} else {
flatVector->set(i, value[i].value<T>());
}
}
return flatVector;
}

template <>
VectorPtr setVectorFromVariantsByKind<TypeKind::VARBINARY>(
const std::vector<velox::variant>& value,
memory::MemoryPool* pool) {
throw std::invalid_argument("Return of VARBINARY data is not supported");
}

template <>
VectorPtr setVectorFromVariantsByKind<TypeKind::VARCHAR>(
const std::vector<velox::variant>& value,
memory::MemoryPool* pool) {
auto flatVector = std::dynamic_pointer_cast<FlatVector<StringView>>(
BaseVector::create(VARCHAR(), value.size(), pool));

for (vector_size_t i = 0; i < value.size(); i++) {
if (value[i].isNull()) {
flatVector->setNull(i, true);
} else {
flatVector->set(i, StringView(value[i].value<Varchar>()));
}
}
return flatVector;
}

VectorPtr setVectorFromVariants(
const TypePtr& type,
const std::vector<velox::variant>& value,
velox::memory::MemoryPool* pool) {
return VELOX_DYNAMIC_SCALAR_TYPE_DISPATCH(
setVectorFromVariantsByKind, type->kind(), value, pool);
}
} // namespace

core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::AggregateRel& aggRel,
memory::MemoryPool* pool) {
core::PlanNodePtr childNode;
if (aggRel.has_input()) {
childNode = toVeloxPlan(aggRel.input(), pool);
} else {
VELOX_FAIL("Child Rel is expected in AggregateRel.");
}
Expand All @@ -32,8 +89,9 @@ std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
auto inputTypes = childNode->outputType();
std::vector<std::shared_ptr<const core::FieldAccessTypedExpr>>
veloxGroupingExprs;
const auto& groupings = sAgg.groupings();

const auto& groupings = aggRel.groupings();
int inputPlanNodeId = planNodeId_ - 1;
// The index of output column.
int outIdx = 0;
for (const auto& grouping : groupings) {
Expand All @@ -53,16 +111,16 @@ std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
core::AggregationNode::Step aggStep;
// Project expressions are used to conduct a pre-projection before
// Aggregation if needed.
std::vector<std::shared_ptr<const core::ITypedExpr>> projectExprs;
std::vector<core::TypedExprPtr> projectExprs;
std::vector<std::string> projectOutNames;
std::vector<std::shared_ptr<const core::CallTypedExpr>> aggExprs;
aggExprs.reserve(sAgg.measures().size());
aggExprs.reserve(aggRel.measures().size());

// Construct Velox Aggregate expressions.
for (const auto& sMea : sAgg.measures()) {
for (const auto& sMea : aggRel.measures()) {
auto aggFunction = sMea.measure();
// Get the params of this Aggregate function.
std::vector<std::shared_ptr<const core::ITypedExpr>> aggParams;
std::vector<core::TypedExprPtr> aggParams;
auto args = aggFunction.args();
aggParams.reserve(args.size());
for (auto arg : args) {
Expand Down Expand Up @@ -170,19 +228,20 @@ std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
}
}

std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::ProjectRel& sProject) {
std::shared_ptr<const core::PlanNode> childNode;
if (sProject.has_input()) {
childNode = toVeloxPlan(sProject.input());
core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::ProjectRel& projectRel,
memory::MemoryPool* pool) {
core::PlanNodePtr childNode;
if (projectRel.has_input()) {
childNode = toVeloxPlan(projectRel.input(), pool);
} else {
VELOX_FAIL("Child Rel is expected in ProjectRel.");
}

// Construct Velox Expressions.
auto projectExprs = sProject.expressions();
auto projectExprs = projectRel.expressions();
std::vector<std::string> projectNames;
std::vector<std::shared_ptr<const core::ITypedExpr>> expressions;
std::vector<core::TypedExprPtr> expressions;
projectNames.reserve(projectExprs.size());
expressions.reserve(projectExprs.size());

Expand All @@ -202,26 +261,28 @@ std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
return projectNode;
}

std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::FilterRel& sFilter) {
std::shared_ptr<const core::PlanNode> childNode;
if (sFilter.has_input()) {
childNode = toVeloxPlan(sFilter.input());
core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::FilterRel& filterRel,
memory::MemoryPool* pool) {
core::PlanNodePtr childNode;
if (filterRel.has_input()) {
childNode = toVeloxPlan(filterRel.input(), pool);
} else {
VELOX_FAIL("Child Rel is expected in FilterRel.");
}

const auto& inputType = childNode->outputType();
const auto& sExpr = sFilter.condition();
const auto& sExpr = filterRel.condition();

return std::make_shared<core::FilterNode>(
nextPlanNodeId(),
exprConverter_->toVeloxExpr(sExpr, inputType),
childNode);
}

std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::ReadRel& sRead,
memory::MemoryPool* pool,
u_int32_t& index,
std::vector<std::string>& paths,
std::vector<u_int64_t>& starts,
Expand Down Expand Up @@ -288,49 +349,113 @@ std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
}
auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));

auto tableScanNode = std::make_shared<core::TableScanNode>(
nextPlanNodeId(), outputType, tableHandle, assignments);
return tableScanNode;
if (sRead.has_virtual_table()) {
return toVeloxPlan(sRead, pool, outputType);

} else {
auto tableScanNode = std::make_shared<core::TableScanNode>(
nextPlanNodeId(), outputType, tableHandle, assignments);
return tableScanNode;
}
}

core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::ReadRel& readRel,
memory::MemoryPool* pool,
const RowTypePtr& type) {
::substrait::ReadRel_VirtualTable readVirtualTable = readRel.virtual_table();
int64_t numVectors = readVirtualTable.values_size();
int64_t numColumns = type->size();
int64_t valueFieldNums =
readVirtualTable.values(numVectors - 1).fields_size();
std::vector<RowVectorPtr> vectors;
vectors.reserve(numVectors);

int64_t batchSize = valueFieldNums / numColumns;

for (int64_t index = 0; index < numVectors; ++index) {
std::vector<VectorPtr> children;
::substrait::Expression_Literal_Struct rowValue =
readRel.virtual_table().values(index);
auto fieldSize = rowValue.fields_size();
VELOX_CHECK_EQ(fieldSize, batchSize * numColumns);

for (int64_t col = 0; col < numColumns; ++col) {
const TypePtr& outputChildType = type->childAt(col);
std::vector<variant> batchChild;
batchChild.reserve(batchSize);
for (int64_t batchId = 0; batchId < batchSize; batchId++) {
// each value in the batch
auto fieldIdx = col * batchSize + batchId;
::substrait::Expression_Literal field = rowValue.fields(fieldIdx);

auto expr = exprConverter_->toVeloxExpr(field);
if (auto constantExpr =
std::dynamic_pointer_cast<const core::ConstantTypedExpr>(
expr)) {
if (!constantExpr->hasValueVector()) {
batchChild.emplace_back(constantExpr->value());
} else {
VELOX_UNSUPPORTED(
"Values node with complex type values is not supported yet");
}
} else {
VELOX_FAIL("Expected constant expression");
}
}
children.emplace_back(
setVectorFromVariants(outputChildType, batchChild, pool));
}

vectors.emplace_back(
std::make_shared<RowVector>(pool, type, nullptr, batchSize, children));
}

return std::make_shared<core::ValuesNode>(nextPlanNodeId(), vectors);
}

std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::Rel& sRel) {
if (sRel.has_aggregate()) {
return toVeloxPlan(sRel.aggregate());
core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::Rel& rel,
memory::MemoryPool* pool) {
if (rel.has_aggregate()) {
return toVeloxPlan(rel.aggregate(), pool);
}
if (sRel.has_project()) {
return toVeloxPlan(sRel.project());
if (rel.has_project()) {
return toVeloxPlan(rel.project(), pool);
}
if (sRel.has_filter()) {
return toVeloxPlan(sRel.filter());
if (rel.has_filter()) {
return toVeloxPlan(rel.filter(), pool);
}
if (sRel.has_read()) {
return toVeloxPlan(sRel.read(), partitionIndex_, paths_, starts_, lengths_);
if (rel.has_read()) {
return toVeloxPlan(
rel.read(), pool, partitionIndex_, paths_, starts_, lengths_);
}
VELOX_NYI("Substrait conversion not supported for Rel.");
}

std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::RelRoot& sRoot) {
core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::RelRoot& root,
memory::MemoryPool* pool) {
// TODO: Use the names as the output names for the whole computing.
const auto& sNames = sRoot.names();
if (sRoot.has_input()) {
const auto& sRel = sRoot.input();
return toVeloxPlan(sRel);
const auto& names = root.names();
if (root.has_input()) {
const auto& rel = root.input();
return toVeloxPlan(rel, pool);
}
VELOX_FAIL("Input is expected in RelRoot.");
}

std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::Plan& sPlan) {
core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::Plan& substraitPlan,
memory::MemoryPool* pool) {
// Construct the function map based on the Substrait representation.
for (const auto& sExtension : sPlan.extensions()) {
if (!sExtension.has_extension_function()) {
for (const auto& extension : substraitPlan.extensions()) {
if (!extension.has_extension_function()) {
continue;
}
const auto& sFmap = sExtension.extension_function();
auto id = sFmap.function_anchor();
auto name = sFmap.name();
const auto& functionMap = extension.extension_function();
auto id = functionMap.function_anchor();
auto name = functionMap.name();
functionMap_[id] = name;
}

Expand All @@ -339,12 +464,12 @@ std::shared_ptr<const core::PlanNode> SubstraitVeloxPlanConverter::toVeloxPlan(
std::make_shared<SubstraitVeloxExprConverter>(subParser_, functionMap_);

// In fact, only one RelRoot or Rel is expected here.
for (const auto& sRel : sPlan.relations()) {
if (sRel.has_root()) {
return toVeloxPlan(sRel.root());
for (const auto& rel : substraitPlan.relations()) {
if (rel.has_root()) {
return toVeloxPlan(rel.root(), pool);
}
if (sRel.has_rel()) {
return toVeloxPlan(sRel.rel());
if (rel.has_rel()) {
return toVeloxPlan(rel.rel(), pool);
}
}
VELOX_FAIL("RelRoot or Rel is expected in Plan.");
Expand Down

0 comments on commit 276237c

Please sign in to comment.