Skip to content

Commit 4465e82

Browse files
committed
Fix read iceberg table decimal column error
1 parent 0905153 commit 4465e82

File tree

3 files changed

+50
-6
lines changed

3 files changed

+50
-6
lines changed

velox/connectors/hive/SplitReader.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -165,18 +165,18 @@ class SplitReader {
165165
VectorPtr& output,
166166
const std::vector<BaseVector::CopyRange>& ranges);
167167

168+
void setPartitionValue(
169+
common::ScanSpec* spec,
170+
const std::string& partitionKey,
171+
const std::optional<std::string>& value) const;
172+
168173
private:
169174
/// Different table formats may have different metadata columns.
170175
/// This function will be used to update the scanSpec for these columns.
171-
std::vector<TypePtr> adaptColumns(
176+
virtual std::vector<TypePtr> adaptColumns(
172177
const RowTypePtr& fileType,
173178
const std::shared_ptr<const velox::RowType>& tableSchema) const;
174179

175-
void setPartitionValue(
176-
common::ScanSpec* spec,
177-
const std::string& partitionKey,
178-
const std::optional<std::string>& value) const;
179-
180180
protected:
181181
std::shared_ptr<const HiveConnectorSplit> hiveSplit_;
182182
const std::shared_ptr<const HiveTableHandle> hiveTableHandle_;

velox/connectors/hive/iceberg/IcebergSplitReader.cpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,4 +240,44 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) {
240240
return rowsScanned;
241241
}
242242

243+
std::vector<TypePtr> IcebergSplitReader::adaptColumns(
244+
const RowTypePtr& fileType,
245+
const std::shared_ptr<const velox::RowType>& tableSchema) const {
246+
// Keep track of schema types for columns in file, used by ColumnSelector.
247+
std::vector<TypePtr> columnTypes = fileType->children();
248+
auto& childrenSpecs = scanSpec_->children();
249+
// Iceberg table stores all column's data in data file.
250+
for (size_t i = 0; i < childrenSpecs.size(); ++i) {
251+
auto* childSpec = childrenSpecs[i].get();
252+
const std::string& fieldName = childSpec->fieldName();
253+
auto fileTypeIdx = fileType->getChildIdxIfExists(fieldName);
254+
auto outputTypeIdx = readerOutputType_->getChildIdxIfExists(fieldName);
255+
if (outputTypeIdx.has_value() && fileTypeIdx.has_value()) {
256+
childSpec->setConstantValue(nullptr);
257+
auto& outputType = readerOutputType_->childAt(*outputTypeIdx);
258+
columnTypes[*fileTypeIdx] = outputType;
259+
} else if (!fileTypeIdx.has_value()) {
260+
// Handle columns missing from the data file in two scenarios:
261+
// 1. Schema evolution: Column was added after the data file was written
262+
// and doesn't exist in older data files.
263+
// 2. Partition columns: In Hive-written data files, partition column
264+
// values are stored in partition metadata rather than in the data
265+
// file itself, following Hive's partitioning convention.
266+
if (auto it = hiveSplit_->partitionKeys.find(fieldName);
267+
it != hiveSplit_->partitionKeys.end()) {
268+
setPartitionValue(childSpec, fieldName, it->second);
269+
} else {
270+
childSpec->setConstantValue(BaseVector::createNullConstant(
271+
tableSchema->findChild(fieldName),
272+
1,
273+
connectorQueryCtx_->memoryPool()));
274+
}
275+
}
276+
}
277+
278+
scanSpec_->resetCachedValues(false);
279+
280+
return columnTypes;
281+
}
282+
243283
} // namespace facebook::velox::connector::hive::iceberg

velox/connectors/hive/iceberg/IcebergSplitReader.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@ class IcebergSplitReader : public SplitReader {
5555
std::shared_ptr<const dwio::common::TypeWithId> baseFileSchema();
5656

5757
private:
58+
std::vector<TypePtr> adaptColumns(
59+
const RowTypePtr& fileType,
60+
const std::shared_ptr<const velox::RowType>& tableSchema) const override;
61+
5862
// The read offset to the beginning of the split in number of rows for the
5963
// current batch for the base data file
6064
uint64_t baseReadOffset_;

0 commit comments

Comments
 (0)