Skip to content

Commit

Permalink
Skip the partition columns in table write
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Dec 18, 2023
1 parent afaaef3 commit 18bb57d
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 2 deletions.
52 changes: 50 additions & 2 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,46 @@ namespace facebook::velox::connector::hive {

namespace {

// Builds a RowVector from parallel lists of child names and child vectors.
// The row type is derived from the children's own types; the result carries
// no nulls buffer and reports 'numRows' rows.
//
// 'children' is taken by value so callers that pass an rvalue transfer the
// vector straight into the RowVector; the previous const-reference signature
// made the trailing std::move a silent copy (move-on-const has no effect).
RowVectorPtr makeRowVector(
    std::vector<std::string> childNames,
    std::vector<VectorPtr> children,
    int32_t numRows,
    velox::memory::MemoryPool* pool) {
  std::vector<std::shared_ptr<const Type>> childTypes;
  childTypes.reserve(children.size());
  for (const auto& child : children) {
    childTypes.push_back(child->type());
  }
  auto rowType = ROW(std::move(childNames), std::move(childTypes));
  return std::make_shared<RowVector>(
      pool, rowType, BufferPtr(nullptr), numRows, std::move(children));
}

// Returns a RowVector containing only the non-partition-key children of
// 'input', preserving their original order and names. Kept children are
// shared (no data copy); only the wrapping RowVector is new.
//
// @param partitionCols Child indices of the partition key columns in 'input'.
// @param input The row vector to strip partition keys from.
// @param pool Memory pool used to allocate the result vector.
RowVectorPtr filterPartitionColumns(
    const std::vector<column_index_t>& partitionCols,
    const RowVectorPtr& input,
    velox::memory::MemoryPool* pool) {
  std::vector<std::string> childNames;
  std::vector<VectorPtr> childVectors;
  const auto numDataColumns = input->childrenSize() - partitionCols.size();
  childNames.reserve(numDataColumns);
  childVectors.reserve(numDataColumns);

  const auto rowType = facebook::velox::asRowType(input->type());

  for (column_index_t i = 0; i < input->childrenSize(); ++i) {
    // Keep only columns whose index is not a partition key. Linear scan is
    // fine here: partition key lists are expected to be small.
    if (std::find(partitionCols.begin(), partitionCols.end(), i) ==
        partitionCols.end()) {
      childNames.push_back(rowType->nameOf(i));
      childVectors.push_back(input->childAt(i));
    }
  }

  return makeRowVector(
      std::move(childNames), std::move(childVectors), input->size(), pool);
}

// Returns a subset of column indices corresponding to partition keys.
std::vector<column_index_t> getPartitionChannels(
const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
Expand Down Expand Up @@ -379,7 +419,11 @@ void HiveDataSink::appendData(RowVectorPtr input) {
// must be zero.
if (!isBucketed() && partitionIdGenerator_->numPartitions() == 1) {
const auto index = ensureWriter(HiveWriterId{0});
write(index, input);
auto dataInput = filterPartitionColumns(
getPartitionChannels(insertTableHandle_),
input,
connectorQueryCtx_->memoryPool());
write(index, dataInput);
return;
}

Expand All @@ -394,7 +438,11 @@ void HiveDataSink::appendData(RowVectorPtr input) {
RowVectorPtr writerInput = partitionSize == input->size()
? input
: exec::wrap(partitionSize, partitionRows_[index], input);
write(index, writerInput);
auto dataInput = filterPartitionColumns(
getPartitionChannels(insertTableHandle_),
writerInput,
connectorQueryCtx_->memoryPool());
write(index, dataInput);
}
}

Expand Down
43 changes: 43 additions & 0 deletions velox/exec/tests/TableWriteTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include "velox/common/testutil/TestValue.h"
#include "velox/connectors/hive/HiveConfig.h"
#include "velox/connectors/hive/HivePartitionFunction.h"
#include "velox/dwio/common/BufferedInput.h"
#include "velox/dwio/common/WriterFactory.h"
#include "velox/dwio/parquet/reader/ParquetReader.h"
#include "velox/exec/PlanNodeStats.h"
#include "velox/exec/TableWriter.h"
#include "velox/exec/tests/utils/AssertQueryBuilder.h"
Expand Down Expand Up @@ -1985,6 +1987,47 @@ TEST_P(PartitionedWithoutBucketTableWriterTest, fromSinglePartitionToMultiple) {
"SELECT * FROM tmp");
}

// Verifies that partition key columns are excluded from the data files
// written by HiveDataSink: for a table partitioned on 'p0', the written
// file schema should contain only the data columns 'c3' and 'c5'.
TEST_P(PartitionedTableWriterTest, removePartitionColumns) {
SCOPED_TRACE(testParam_.toString());
auto numPartitions = 5;

// Input schema: one partition key (p0) followed by two data columns.
auto rowType = ROW({"p0", "c3", "c5"}, {BIGINT(), REAL(), VARCHAR()});
std::vector<std::string> partitionKeys = {"p0"};

// One row per distinct p0 value, so each partition receives a single row.
RowVectorPtr vector;
vector = makeRowVector(
rowType->names(),
{makeFlatVector<int64_t>(numPartitions, [&](auto row) { return row; }),
makeFlatVector<float>(
numPartitions, [&](auto row) { return row + 33.23; }),
makeFlatVector<StringView>(numPartitions, [&](auto row) {
return StringView::makeInline(fmt::format("bucket_{}", row * 3));
})});

// Write the partitioned table into a temp directory using the fixture's
// bucketing/compression/commit parameters.
auto outputDirectory = TempDirectoryPath::create();
auto plan = createInsertPlan(
PlanBuilder().values({vector}),
rowType,
outputDirectory->path,
partitionKeys,
bucketProperty_,
compressionKind_,
getNumWriters(),
connector::hive::LocationHandle::TableType::kNew,
commitStrategy_);
runQueryWithWriterConfigs(plan);

// Re-open the written output and assert the file-level schema no longer
// contains the partition key column.
// NOTE(review): this always uses a ParquetReader — confirm every
// testParam_ parameterization writes Parquet, otherwise this read fails.
// NOTE(review): LocalReadFile is pointed at outputDirectory->path, a
// directory rather than a single data file — verify this is intended.
dwio::common::ReaderOptions readerOpts{pool()};
auto bufferedInput = std::make_unique<BufferedInput>(
std::make_shared<LocalReadFile>(outputDirectory->path),
readerOpts.getMemoryPool());
auto reader = std::make_unique<facebook::velox::parquet::ParquetReader>(
std::move(bufferedInput), readerOpts);
// Expected on-disk schema: data columns only, partition key 'p0' removed.
auto dataType = ROW({"c3", "c5"}, {REAL(), VARCHAR()});

EXPECT_EQ(reader->rowType()->toString(), dataType->toString());
}

TEST_P(PartitionedTableWriterTest, maxPartitions) {
SCOPED_TRACE(testParam_.toString());
const int32_t maxPartitions = 100;
Expand Down

0 comments on commit 18bb57d

Please sign in to comment.