Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove partition columns before writing partitioned file #8089

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
91 changes: 79 additions & 12 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,48 @@ namespace facebook::velox::connector::hive {

namespace {

// Returns the row type containing only the non-partition (data) columns.
//
// 'dataCols' lists the input column indices to keep, in ascending order (as
// produced by getNonPartitionChannels); partition-key columns are dropped
// because their values are encoded in the partition directory, not stored in
// the data file.
static RowTypePtr getNonPartitionTypes(
    const std::vector<column_index_t>& dataCols,
    const RowTypePtr& inputType) {
  std::vector<std::string> childNames;
  std::vector<TypePtr> childTypes;
  childNames.reserve(dataCols.size());
  childTypes.reserve(dataCols.size());
  // Index directly by the retained channels instead of running an O(m)
  // std::find over 'dataCols' for every input column: O(m) total rather
  // than O(n * m).
  for (const auto channel : dataCols) {
    childNames.push_back(inputType->nameOf(channel));
    childTypes.push_back(inputType->childAt(channel));
  }

  return ROW(std::move(childNames), std::move(childTypes));
}

// Projects out the partition-key columns from 'input', returning a RowVector
// that holds only the data columns listed in 'dataCols' (ascending input
// column indices). Nulls, size and null count are carried over unchanged.
static RowVectorPtr makeDataInput(
    const std::vector<column_index_t>& dataCols,
    const RowVectorPtr& input) {
  std::vector<VectorPtr> childVectors;
  childVectors.reserve(dataCols.size());
  // Pick the retained children directly; avoids the O(n * m) std::find
  // membership scan over 'dataCols' for every child of 'input'.
  for (const auto channel : dataCols) {
    childVectors.push_back(input->childAt(channel));
  }

  return std::make_shared<RowVector>(
      input->pool(),
      getNonPartitionTypes(dataCols, asRowType(input->type())),
      input->nulls(),
      input->size(),
      std::move(childVectors),
      input->getNullCount());
}

// Returns a subset of column indices corresponding to partition keys.
std::vector<column_index_t> getPartitionChannels(
static std::vector<column_index_t> getPartitionChannels(
const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
std::vector<column_index_t> channels;

Expand All @@ -53,6 +93,23 @@ std::vector<column_index_t> getPartitionChannels(
return channels;
}

// Returns the column indices of non-partition keys.
static std::vector<column_index_t> getNonPartitionChannels(
const std::vector<column_index_t>& partitionChannels,
const column_index_t childrenSize) {
std::vector<column_index_t> dataChannels;
dataChannels.reserve(childrenSize - partitionChannels.size());

for (column_index_t i = 0; i < childrenSize; i++) {
if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
partitionChannels.cend()) {
dataChannels.push_back(i);
}
}

return dataChannels;
}

std::string makePartitionDirectory(
const std::string& tableDirectory,
const std::optional<std::string>& partitionSubdirectory) {
Expand Down Expand Up @@ -310,6 +367,8 @@ HiveDataSink::HiveDataSink(
hiveConfig_->isFileColumnNamesReadAsLowerCase(
connectorQueryCtx->sessionProperties()))
: nullptr),
dataChannels_(
getNonPartitionChannels(partitionChannels_, inputType_->size())),
bucketCount_(
insertTableHandle_->bucketProperty() == nullptr
? 0
Expand Down Expand Up @@ -342,13 +401,17 @@ HiveDataSink::HiveDataSink(
sortColumnIndices_.reserve(sortedProperty.size());
sortCompareFlags_.reserve(sortedProperty.size());
for (int i = 0; i < sortedProperty.size(); ++i) {
sortColumnIndices_.push_back(
inputType_->getChildIdx(sortedProperty.at(i)->sortColumn()));
sortCompareFlags_.push_back(
{sortedProperty.at(i)->sortOrder().isNullsFirst(),
sortedProperty.at(i)->sortOrder().isAscending(),
false,
CompareFlags::NullHandlingMode::kNullAsValue});
auto columnIndex =
getNonPartitionTypes(dataChannels_, inputType_)
->getChildIdxIfExists(sortedProperty.at(i)->sortColumn());
if (columnIndex.has_value()) {
sortColumnIndices_.push_back(columnIndex.value());
sortCompareFlags_.push_back(
{sortedProperty.at(i)->sortOrder().isNullsFirst(),
sortedProperty.at(i)->sortOrder().isAscending(),
false,
CompareFlags::NullHandlingMode::kNullAsValue});
}
}
}
}
Expand Down Expand Up @@ -403,8 +466,11 @@ void HiveDataSink::appendData(RowVectorPtr input) {

void HiveDataSink::write(size_t index, const VectorPtr& input) {
WRITER_NON_RECLAIMABLE_SECTION_GUARD(index);
writers_[index]->write(input);
writerInfo_[index]->numWrittenRows += input->size();
auto dataInput =
makeDataInput(dataChannels_, std::dynamic_pointer_cast<RowVector>(input));

writers_[index]->write(dataInput);
writerInfo_[index]->numWrittenRows += dataInput->size();
}

std::string HiveDataSink::stateString(State state) {
Expand Down Expand Up @@ -600,7 +666,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
dwio::common::WriterOptions options;
const auto* connectorSessionProperties =
connectorQueryCtx_->sessionProperties();
options.schema = inputType_;
options.schema = getNonPartitionTypes(dataChannels_, inputType_);

options.memoryPool = writerInfo_.back()->writerPool.get();
options.compressionKind = insertTableHandle_->compressionKind();
if (canReclaim()) {
Expand Down Expand Up @@ -646,7 +713,7 @@ HiveDataSink::maybeCreateBucketSortWriter(
auto* sortPool = writerInfo_.back()->sortPool.get();
VELOX_CHECK_NOT_NULL(sortPool);
auto sortBuffer = std::make_unique<exec::SortBuffer>(
inputType_,
getNonPartitionTypes(dataChannels_, inputType_),
sortColumnIndices_,
sortCompareFlags_,
sortPool,
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ class HiveDataSink : public DataSink {
const uint32_t maxOpenWriters_;
const std::vector<column_index_t> partitionChannels_;
const std::unique_ptr<PartitionIdGenerator> partitionIdGenerator_;
const std::vector<column_index_t> dataChannels_;
const int32_t bucketCount_{0};
const std::unique_ptr<core::PartitionFunction> bucketFunction_;
const std::shared_ptr<dwio::common::WriterFactory> writerFactory_;
Expand Down