Skip to content

Commit

Permalink
Remove partition columns before writing partitioned file (facebookincubator)
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf authored and glutenperfbot committed Jan 15, 2024
1 parent ae9cc1e commit 476dc65
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 62 deletions.
87 changes: 76 additions & 11 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,45 @@ namespace facebook::velox::connector::hive {

namespace {

// Builds the row type containing only the non-partition (data) columns of
// 'inputType', preserving their original column order.
RowTypePtr getDataType(
    const std::vector<column_index_t>& dataCols,
    const RowTypePtr& inputType) {
  std::vector<std::string> names;
  std::vector<TypePtr> types;
  names.reserve(dataCols.size());
  types.reserve(dataCols.size());
  const auto numChildren = inputType->size();
  for (uint32_t channel = 0; channel < numChildren; ++channel) {
    // Keep the column only if its index appears in 'dataCols'.
    const bool isDataColumn =
        std::find(dataCols.begin(), dataCols.end(), channel) != dataCols.end();
    if (isDataColumn) {
      names.push_back(inputType->nameOf(channel));
      types.push_back(inputType->childAt(channel));
    }
  }

  return ROW(std::move(names), std::move(types));
}

// Projects 'input' down to its non-partition columns. Child vectors are
// shared (no data copy); nulls, size and null count carry over unchanged.
RowVectorPtr makeDataInput(
    const std::vector<column_index_t>& dataCols,
    const RowVectorPtr& input) {
  std::vector<VectorPtr> children;
  children.reserve(dataCols.size());
  const auto numChildren = input->childrenSize();
  for (uint32_t channel = 0; channel < numChildren; ++channel) {
    // Keep only children whose index is listed in 'dataCols'.
    if (std::find(dataCols.begin(), dataCols.end(), channel) !=
        dataCols.end()) {
      children.push_back(input->childAt(channel));
    }
  }

  return std::make_shared<RowVector>(
      input->pool(),
      getDataType(dataCols, asRowType(input->type())),
      input->nulls(),
      input->size(),
      std::move(children),
      input->getNullCount());
}

// Returns a subset of column indices corresponding to partition keys.
std::vector<column_index_t> getPartitionChannels(
const std::shared_ptr<const HiveInsertTableHandle>& insertTableHandle) {
Expand All @@ -53,6 +92,23 @@ std::vector<column_index_t> getPartitionChannels(
return channels;
}

// Returns a subset of column indices corresponding to non-partition keys.
std::vector<column_index_t> getDataChannels(
const std::vector<column_index_t>& partitionChannels,
const column_index_t childrenSize) {
std::vector<column_index_t> dataChannels;
dataChannels.reserve(childrenSize - partitionChannels.size());

for (column_index_t i = 0; i < childrenSize; i++) {
if (std::find(partitionChannels.cbegin(), partitionChannels.cend(), i) ==
partitionChannels.cend()) {
dataChannels.push_back(i);
}
}

return dataChannels;
}

std::string makePartitionDirectory(
const std::string& tableDirectory,
const std::optional<std::string>& partitionSubdirectory) {
Expand Down Expand Up @@ -310,6 +366,7 @@ HiveDataSink::HiveDataSink(
hiveConfig_->isFileColumnNamesReadAsLowerCase(
connectorQueryCtx->sessionProperties()))
: nullptr),
dataChannels_(getDataChannels(partitionChannels_, inputType_->size())),
bucketCount_(
insertTableHandle_->bucketProperty() == nullptr
? 0
Expand Down Expand Up @@ -342,13 +399,17 @@ HiveDataSink::HiveDataSink(
sortColumnIndices_.reserve(sortedProperty.size());
sortCompareFlags_.reserve(sortedProperty.size());
for (int i = 0; i < sortedProperty.size(); ++i) {
sortColumnIndices_.push_back(
inputType_->getChildIdx(sortedProperty.at(i)->sortColumn()));
sortCompareFlags_.push_back(
{sortedProperty.at(i)->sortOrder().isNullsFirst(),
sortedProperty.at(i)->sortOrder().isAscending(),
false,
CompareFlags::NullHandlingMode::kNullAsValue});
auto columnIndex =
getDataType(dataChannels_, inputType_)
->getChildIdxIfExists(sortedProperty.at(i)->sortColumn());
if (columnIndex.has_value()) {
sortColumnIndices_.push_back(columnIndex.value());
sortCompareFlags_.push_back(
{sortedProperty.at(i)->sortOrder().isNullsFirst(),
sortedProperty.at(i)->sortOrder().isAscending(),
false,
CompareFlags::NullHandlingMode::kNullAsValue});
}
}
}
}
Expand Down Expand Up @@ -403,8 +464,11 @@ void HiveDataSink::appendData(RowVectorPtr input) {

void HiveDataSink::write(size_t index, const VectorPtr& input) {
WRITER_NON_RECLAIMABLE_SECTION_GUARD(index);
writers_[index]->write(input);
writerInfo_[index]->numWrittenRows += input->size();
auto dataInput =
makeDataInput(dataChannels_, std::dynamic_pointer_cast<RowVector>(input));

writers_[index]->write(dataInput);
writerInfo_[index]->numWrittenRows += dataInput->size();
}

std::string HiveDataSink::stateString(State state) {
Expand Down Expand Up @@ -600,7 +664,8 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
dwio::common::WriterOptions options;
const auto* connectorSessionProperties =
connectorQueryCtx_->sessionProperties();
options.schema = inputType_;
options.schema = getDataType(dataChannels_, inputType_);

options.memoryPool = writerInfo_.back()->writerPool.get();
options.compressionKind = insertTableHandle_->compressionKind();
if (canReclaim()) {
Expand Down Expand Up @@ -648,7 +713,7 @@ HiveDataSink::maybeCreateBucketSortWriter(
auto* sortPool = writerInfo_.back()->sortPool.get();
VELOX_CHECK_NOT_NULL(sortPool);
auto sortBuffer = std::make_unique<exec::SortBuffer>(
inputType_,
getDataType(dataChannels_, inputType_),
sortColumnIndices_,
sortCompareFlags_,
sortPool,
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ class HiveDataSink : public DataSink {
const uint32_t maxOpenWriters_;
const std::vector<column_index_t> partitionChannels_;
const std::unique_ptr<PartitionIdGenerator> partitionIdGenerator_;
const std::vector<column_index_t> dataChannels_;
const int32_t bucketCount_{0};
const std::unique_ptr<core::PartitionFunction> bucketFunction_;
const std::shared_ptr<dwio::common::WriterFactory> writerFactory_;
Expand Down

0 comments on commit 476dc65

Please sign in to comment.