Skip to content

Commit

Permalink
Resolve comments and fix failed unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Dec 25, 2023
1 parent 7553ac1 commit 48decd5
Show file tree
Hide file tree
Showing 3 changed files with 192 additions and 141 deletions.
97 changes: 44 additions & 53 deletions velox/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,44 +38,26 @@ namespace facebook::velox::connector::hive {

namespace {

RowVectorPtr makeRowVector(
std::vector<std::string> childNames,
const std::vector<VectorPtr>& children,
int32_t numRows,
velox::memory::MemoryPool* pool) {
std::vector<std::shared_ptr<const Type>> childTypes;
childTypes.resize(children.size());
for (int i = 0; i < children.size(); i++) {
childTypes[i] = children[i]->type();
}
auto rowType = ROW(std::move(childNames), std::move(childTypes));
return std::make_shared<RowVector>(
pool, rowType, BufferPtr(nullptr), numRows, std::move(children));
}

RowVectorPtr filterPartitionColumns(
RowVectorPtr makeDataInput(
const std::vector<column_index_t>& partitonCols,
const RowVectorPtr& input,
velox::memory::MemoryPool* pool) {
std::vector<std::string> childNames;
const RowTypePtr& dataType) {
std::vector<VectorPtr> childVectors;
auto dataColumns = input->childrenSize() - partitonCols.size();
childNames.reserve(dataColumns);
childVectors.reserve(dataColumns);

auto type = facebook::velox::asRowType(input->type());

childVectors.reserve(dataType->size());
for (uint32_t i = 0; i < input->childrenSize(); i++) {
if (std::find(partitonCols.begin(), partitonCols.end(), i) ==
partitonCols.end()) {
auto child = input->childAt(i);
childNames.push_back(type->nameOf(i));
childVectors.push_back(child);
if (std::find(partitonCols.cbegin(), partitonCols.cend(), i) ==
partitonCols.cend()) {
childVectors.push_back(input->childAt(i));
}
}

return makeRowVector(
std::move(childNames), std::move(childVectors), input->size(), pool);
return std::make_shared<RowVector>(
input->pool(),
dataType,
input->nulls(),
input->size(),
std::move(childVectors),
input->getNullCount());
}

// Returns a subset of column indices corresponding to partition keys.
Expand Down Expand Up @@ -371,6 +353,18 @@ HiveDataSink::HiveDataSink(
"Unsupported commit strategy: {}",
commitStrategyToString(commitStrategy_));

// Get the data input type based on the inputType and the partition index.
std::vector<TypePtr> childTypes;
std::vector<std::string> childNames;
for (auto i = 0; i < inputType_->size(); i++) {
if (std::find(partitionChannels_.cbegin(), partitionChannels_.cend(), i) ==
partitionChannels_.end()) {
childNames.push_back(inputType_->nameOf(i));
childTypes.push_back(inputType_->childAt(i));
}
}
dataType_ = ROW(std::move(childNames), std::move(childTypes));

if (!isBucketed()) {
return;
}
Expand Down Expand Up @@ -419,16 +413,7 @@ void HiveDataSink::appendData(RowVectorPtr input) {
// must be zero.
if (!isBucketed() && partitionIdGenerator_->numPartitions() == 1) {
const auto index = ensureWriter(HiveWriterId{0});
auto dataInput = input;
if (insertTableHandle_->tableStorageFormat() ==
dwio::common::FileFormat::PARQUET) {
dataInput = filterPartitionColumns(
getPartitionChannels(insertTableHandle_),
input,
connectorQueryCtx_->memoryPool());
}

write(index, dataInput);
write(index, input);
return;
}

Expand All @@ -443,22 +428,23 @@ void HiveDataSink::appendData(RowVectorPtr input) {
RowVectorPtr writerInput = partitionSize == input->size()
? input
: exec::wrap(partitionSize, partitionRows_[index], input);
auto dataInput = writerInput;
if (insertTableHandle_->tableStorageFormat() ==
dwio::common::FileFormat::PARQUET) {
dataInput = filterPartitionColumns(
getPartitionChannels(insertTableHandle_),
writerInput,
connectorQueryCtx_->memoryPool());
}
write(index, dataInput);
write(index, writerInput);
}
}

void HiveDataSink::write(size_t index, const VectorPtr& input) {
WRITER_NON_RECLAIMABLE_SECTION_GUARD(index);
writers_[index]->write(input);
writerInfo_[index]->numWrittenRows += input->size();
// Skip the partition columns before writing.
auto dataInput = input;
if (!isBucketed()) {
dataInput = makeDataInput(
partitionChannels_,
std::dynamic_pointer_cast<RowVector>(input),
dataType_);
}

writers_[index]->write(dataInput);
writerInfo_[index]->numWrittenRows += dataInput->size();
}

std::string HiveDataSink::stateString(State state) {
Expand Down Expand Up @@ -654,7 +640,12 @@ uint32_t HiveDataSink::appendWriter(const HiveWriterId& id) {
dwio::common::WriterOptions options;
const auto* connectorSessionProperties =
connectorQueryCtx_->sessionProperties();
options.schema = inputType_;
if (!isBucketed()) {
options.schema = dataType_;
} else {
options.schema = inputType_;
}

options.memoryPool = writerInfo_.back()->writerPool.get();
options.compressionKind = insertTableHandle_->compressionKind();
if (canReclaim()) {
Expand Down
2 changes: 2 additions & 0 deletions velox/connectors/hive/HiveDataSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,8 @@ class HiveDataSink : public DataSink {
void closeInternal();

const RowTypePtr inputType_;
// Type of the data columns written to the file (inputType_ minus the
// partition channels).
RowTypePtr dataType_;
const std::shared_ptr<const HiveInsertTableHandle> insertTableHandle_;
const ConnectorQueryCtx* const connectorQueryCtx_;
const CommitStrategy commitStrategy_;
Expand Down

0 comments on commit 48decd5

Please sign in to comment.