Skip to content

Commit cb342e8

Browse files
committed
Integrate Iceberg data file statistics and add unit tests.
1 parent 2fe7ea3 commit cb342e8

File tree

13 files changed

+1821
-25
lines changed

13 files changed

+1821
-25
lines changed

velox/connectors/hive/HiveDataSink.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ class HiveDataSink : public DataSink {
679679
// Invoked to write 'input' to the specified file writer.
680680
void write(size_t index, RowVectorPtr input);
681681

682-
void closeInternal();
682+
virtual void closeInternal();
683683

684684
// IMPORTANT NOTE: these are passed to writers as raw pointers. HiveDataSink
685685
// owns the lifetime of these objects, and therefore must destroy them last.

velox/connectors/hive/iceberg/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,10 @@
1414

1515
velox_add_library(
1616
velox_hive_iceberg_splitreader
17+
DataFileStatsCollector.cpp
1718
EqualityDeleteFileReader.cpp
1819
FilterUtil.cpp
20+
IcebergColumnHandle.cpp
1921
IcebergDataSink.cpp
2022
IcebergDeleteFile.cpp
2123
IcebergPartitionIdGenerator.cpp
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#include "velox/connectors/hive/iceberg/DataFileStatsCollector.h"
17+
#include "velox/common/base/Exceptions.h"
18+
#include "velox/common/encode/Base64.h"
19+
#include "velox/dwio/parquet/writer/arrow/Metadata.h"
20+
#include "velox/dwio/parquet/writer/arrow/Statistics.h"
21+
22+
namespace facebook::velox::connector::hive::iceberg {
23+
24+
using namespace facebook::velox::parquet;
25+
26+
// Constructs the collector by handing the shared list of per-field stats
// settings to the FileStatsCollector base class. 'settings' is taken by value
// and moved, so the caller's shared_ptr is not copied a second time.
DataFileStatsCollector::DataFileStatsCollector(
27+
std::shared_ptr<
28+
std::vector<std::unique_ptr<dwio::common::DataFileStatsSettings>>>
29+
settings)
30+
: FileStatsCollector(std::move(settings)) {}
31+
32+
void DataFileStatsCollector::collectStats(
33+
const void* metadata,
34+
const std::shared_ptr<dwio::common::DataFileStatistics>& dataFileStats) {
35+
const auto& fileMetadata =
36+
*static_cast<const std::shared_ptr<parquet::arrow::FileMetaData>*>(
37+
metadata);
38+
VELOX_CHECK_NOT_NULL(fileMetadata);
39+
40+
std::unordered_set<int32_t> skipBoundsFields;
41+
std::function<int32_t(IcebergDataFileStatsSettings*)> processFields =
42+
[&skipBoundsFields,
43+
&processFields](IcebergDataFileStatsSettings* field) -> int32_t {
44+
if (field->skipBounds) {
45+
skipBoundsFields.insert(field->fieldId);
46+
}
47+
if (field->children.empty()) {
48+
return 1;
49+
}
50+
int32_t count = 0;
51+
for (const auto& child : field->children) {
52+
count += processFields(child.get());
53+
}
54+
return count;
55+
};
56+
57+
// numFields is not the number of columns in Iceberg table's schema,
58+
// e.g., schema_->size(). It also contains the sub-fields when there are
59+
// nested types in table's schema.
60+
int32_t numFields = 0;
61+
for (const auto& field : *statsSetting_) {
62+
auto* icebergField =
63+
static_cast<IcebergDataFileStatsSettings*>(field.get());
64+
numFields += processFields(icebergField);
65+
}
66+
67+
std::unordered_map<int32_t, std::shared_ptr<arrow::Statistics>>
68+
globalMinStats;
69+
std::unordered_map<int32_t, std::shared_ptr<arrow::Statistics>>
70+
globalMaxStats;
71+
72+
dataFileStats->numRecords = fileMetadata->num_rows();
73+
const auto numRowGroups = fileMetadata->num_row_groups();
74+
for (auto i = 0; i < numRowGroups; ++i) {
75+
const auto rgm = fileMetadata->RowGroup(i);
76+
VELOX_CHECK_EQ(numFields, rgm->num_columns());
77+
dataFileStats->splitOffsets.emplace_back(rgm->file_offset());
78+
79+
for (auto j = 0; j < numFields; ++j) {
80+
const auto columnChunkMetadata = rgm->ColumnChunk(j);
81+
const auto fieldId = columnChunkMetadata->field_id();
82+
const auto numValues = columnChunkMetadata->num_values();
83+
84+
dataFileStats->valueCounts[fieldId] += numValues;
85+
dataFileStats->columnsSizes[fieldId] +=
86+
columnChunkMetadata->total_compressed_size();
87+
88+
const auto columnChunkStats = columnChunkMetadata->statistics();
89+
if (columnChunkStats->nan_count() > 0) {
90+
dataFileStats->nanValueCounts[fieldId] += columnChunkStats->nan_count();
91+
}
92+
dataFileStats->nullValueCounts[fieldId] += columnChunkStats->null_count();
93+
94+
if (columnChunkStats->HasMinMax() &&
95+
!skipBoundsFields.contains(fieldId)) {
96+
if (globalMaxStats.find(fieldId) == globalMaxStats.end()) {
97+
globalMinStats[fieldId] = columnChunkStats;
98+
globalMaxStats[fieldId] = columnChunkStats;
99+
} else {
100+
globalMaxStats[fieldId] = arrow::Statistics::CompareAndGetMax(
101+
globalMaxStats[fieldId], columnChunkStats);
102+
globalMinStats[fieldId] = arrow::Statistics::CompareAndGetMin(
103+
globalMinStats[fieldId], columnChunkStats);
104+
}
105+
}
106+
}
107+
}
108+
109+
for (const auto& [fieldId, minStats] : globalMinStats) {
110+
const auto lowerBound = minStats->MinValue();
111+
dataFileStats->lowerBounds[fieldId] =
112+
encoding::Base64::encode(lowerBound.data(), lowerBound.size());
113+
}
114+
for (const auto& [fieldId, maxStats] : globalMaxStats) {
115+
const auto upperBound = maxStats->MaxValue();
116+
dataFileStats->upperBounds[fieldId] =
117+
encoding::Base64::encode(upperBound.data(), upperBound.size());
118+
}
119+
}
120+
121+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "velox/dwio/common/DataFileStatsCollector.h"
19+
20+
namespace facebook::velox::connector::hive::iceberg {
21+
22+
/// Settings for collecting Iceberg parquet data file statistics.
23+
/// Holds the Iceberg source field id and whether to skip bounds
24+
/// collection for this field. For nested field, it contains child fields.
25+
struct IcebergDataFileStatsSettings
26+
: public dwio::common::DataFileStatsSettings {
27+
int32_t fieldId;
28+
bool skipBounds;
29+
std::vector<std::unique_ptr<IcebergDataFileStatsSettings>> children;
30+
31+
IcebergDataFileStatsSettings(int32_t id, bool skip)
32+
: fieldId(id), skipBounds(skip), children() {}
33+
};
34+
35+
/// FileStatsCollector implementation used by the Iceberg connector.
class DataFileStatsCollector : public dwio::common::FileStatsCollector {
36+
public:
37+
/// @param settings Shared list of per-field stats settings; it is forwarded
/// to the FileStatsCollector base class.
explicit DataFileStatsCollector(
38+
std::shared_ptr<
39+
std::vector<std::unique_ptr<dwio::common::DataFileStatsSettings>>>
40+
settings);
41+
42+
/// Fills 'fileStats' from file-level metadata.
/// @param metadata Type-erased pointer; the implementation casts it to
/// const std::shared_ptr<parquet::arrow::FileMetaData>*.
/// @param fileStats Output statistics object that is populated in place.
void collectStats(
43+
const void* metadata,
44+
const std::shared_ptr<dwio::common::DataFileStatistics>& fileStats)
45+
override;
46+
};
47+
48+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "velox/connectors/hive/iceberg/IcebergColumnHandle.h"
18+
19+
namespace facebook::velox::connector::hive::iceberg {
20+
21+
/// Builds the underlying HiveColumnHandle and stores a copy of 'nestedField'.
///
/// 'dataType', 'hiveType', 'requiredSubfields' and 'columnParseParameters'
/// are by-value sink parameters, so they are moved into the base class
/// rather than copied a second time (for the TypePtr arguments this avoids
/// an extra atomic reference-count increment/decrement pair).
IcebergColumnHandle::IcebergColumnHandle(
    const std::string& name,
    ColumnType columnType,
    TypePtr dataType,
    TypePtr hiveType,
    const IcebergNestedField& nestedField,
    std::vector<common::Subfield> requiredSubfields,
    ColumnParseParameters columnParseParameters)
    : HiveColumnHandle(
          name,
          columnType,
          std::move(dataType),
          std::move(hiveType),
          std::move(requiredSubfields),
          std::move(columnParseParameters)),
      nestedField_(nestedField) {}
37+
38+
// Returns a reference to the nested-field descriptor that was copied into
// this handle at construction; the reference is valid for the handle's
// lifetime.
const IcebergNestedField& IcebergColumnHandle::nestedField() const {
39+
return nestedField_;
40+
}
41+
42+
} // namespace facebook::velox::connector::hive::iceberg
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright (c) Facebook, Inc. and its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
#pragma once
17+
18+
#include "velox/connectors/hive/TableHandle.h"
19+
20+
namespace facebook::velox::connector::hive::iceberg {
21+
22+
/// Node of a field-id tree: the id of one field plus the nodes of its
/// sub-fields (empty for a field without sub-fields).
struct IcebergNestedField {
  /// Field id. Defaults to 0 so that a default-initialized node never holds
  /// an indeterminate value; the struct remains an aggregate, so
  /// 'IcebergNestedField{id, {children...}}' keeps working.
  int32_t id{0};
  /// Child nodes, in declaration order.
  std::vector<IcebergNestedField> children;
};
26+
27+
/// HiveColumnHandle that additionally carries an IcebergNestedField
/// describing the column's field-id tree.
class IcebergColumnHandle : public HiveColumnHandle {
28+
public:
29+
/// All parameters except 'nestedField' are forwarded to the
/// HiveColumnHandle constructor; 'nestedField' is copied into this handle.
IcebergColumnHandle(
30+
const std::string& name,
31+
ColumnType columnType,
32+
TypePtr dataType,
33+
TypePtr hiveType,
34+
const IcebergNestedField& nestedField,
35+
std::vector<common::Subfield> requiredSubfields = {},
36+
ColumnParseParameters columnParseParameters = {});
37+
38+
/// Returns the nested-field descriptor stored in this handle.
const IcebergNestedField& nestedField() const;
39+
40+
private:
41+
// Copy of the descriptor passed to the constructor; immutable afterwards.
const IcebergNestedField nestedField_;
42+
};
43+
44+
} // namespace facebook::velox::connector::hive::iceberg

0 commit comments

Comments
 (0)