[Feature-WIP](iceberg-writer) Implements iceberg partition transform. #36289
morningman merged 21 commits into apache:master
Conversation
Thank you for your contribution to Apache Doris. Since 2024-03-18, the document has been moved to doris-website.
const std::chrono::time_point<std::chrono::system_clock> PartitionColumnTransformUtils::EPOCH =
        std::chrono::system_clock::from_time_t(0);

std::unique_ptr<PartitionColumnTransform> PartitionColumnTransforms::create(

warning: function 'create' exceeds recommended size/complexity thresholds [readability-function-size]

Additional context:
be/src/vec/sink/writer/iceberg/partition_transformers.cpp:30: 158 lines including whitespace and comments (threshold 80)
Int32* __restrict p_out = out_data.data();

while (p_in < end_in) {
    Int64 long_value = static_cast<Int64>(*p_in);

warning: use auto when initializing with a cast to avoid duplicating the type name [modernize-use-auto]

Suggested change:
- Int64 long_value = static_cast<Int64>(*p_in);
+ auto long_value = static_cast<Int64>(*p_in);
binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(UInt32*)p_in);

int32_t days_from_unix_epoch = value.daynr() - 719528;
Int64 long_value = static_cast<Int64>(days_from_unix_epoch);

warning: use auto when initializing with a cast to avoid duplicating the type name [modernize-use-auto]

Suggested change:
- Int64 long_value = static_cast<Int64>(days_from_unix_epoch);
+ auto long_value = static_cast<Int64>(days_from_unix_epoch);
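The constant 719528 in the snippet above is the number of days from 0000-01-01 to the Unix epoch (1970-01-01) in the proleptic Gregorian calendar, which is why subtracting it from `daynr()` yields days since the epoch. A minimal sketch verifying the constant (in Java with java.time as an illustration, not the Doris code; note that MySQL-style day numbers and proleptic Gregorian counting only agree for modern dates):

```java
import java.time.LocalDate;

public class EpochDays {
    // Days from year 0 to 1970-01-01 in the proleptic Gregorian calendar.
    static final long DAYS_0000_TO_1970 = 719528L;

    // Mirrors daynr() - 719528 from the snippet: convert a "days since
    // year 0" count into days since the Unix epoch.
    static long daysFromUnixEpoch(long daynr) {
        return daynr - DAYS_0000_TO_1970;
    }

    public static void main(String[] args) {
        // LocalDate.toEpochDay() counts days since 1970-01-01, so
        // 0000-01-01 lands exactly -719528 days before the epoch.
        System.out.println(LocalDate.of(0, 1, 1).toEpochDay()); // -719528
        System.out.println(daysFromUnixEpoch(719528L));         // 0
    }
}
```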
namespace doris {
namespace vectorized {

warning: nested namespaces can be concatenated [modernize-concat-nested-namespaces]

Suggested change:
- namespace doris {
- namespace vectorized {
+ namespace doris::vectorized {

be/src/vec/sink/writer/iceberg/partition_transformers.cpp:274:
- } // namespace vectorized
- } // namespace doris
+ } // namespace doris
be/src/util/bit_util.h (outdated)

value >>= 8;
if (value == 0 && value_to_save >= 0) {
    break;
}
if (value == -1 && value_to_save < 0) {
    break;
}

warning: statement should be inside braces [readability-braces-around-statements]
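The loop being flagged trims leading sign-extension bytes from a signed integer: it stops once the remaining high bytes are pure sign extension (all 0x00 for non-negative values, all 0xFF for negative ones). A sketch of the same idea, as an illustration in Java rather than the actual bit_util.h code:

```java
// Illustrative sketch of the byte-trimming loop flagged above: count how
// many low-order bytes are needed before the rest of the value is pure
// sign extension.
public class MinBytes {
    static int minBytes(long valueToSave) {
        long value = valueToSave;
        int bytes = 0;
        while (bytes < 8) {
            bytes++;
            value >>= 8; // arithmetic shift preserves the sign bit
            if (value == 0 && valueToSave >= 0) break;  // rest is 0x00
            if (value == -1 && valueToSave < 0) break;  // rest is 0xFF
        }
        return bytes;
    }

    public static void main(String[] args) {
        System.out.println(minBytes(0));       // 1
        System.out.println(minBytes(0x1234));  // 2
        System.out.println(minBytes(-1));      // 1
    }
}
```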
private:
    static const std::chrono::time_point<std::chrono::system_clock> EPOCH;
    PartitionColumnTransformUtils() = default;

warning: use '= default' to define a trivial default constructor [modernize-use-equals-default]
run buildall

1 similar comment

run buildall

TeamCity be ut coverage result:
import java.util.Objects;

public class SimpleTableInfo {

- Generally, when you override the equals method, you also need to override the hashCode method.
- If you just want to express a database and table, you can refactor this class into DatabaseTableName.

Reply: DatabaseTableName is only an inner class of HMSTransaction. The SimpleTableInfo class here is built up front as a class that can be used globally; it records the basic information of a database/table inside iceberg.
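A minimal sketch of the equals/hashCode pairing the reviewer asks for, assuming hypothetical fields dbName/tbName (the real fields of SimpleTableInfo are not shown in this thread):

```java
import java.util.Objects;

// Hypothetical sketch of SimpleTableInfo with equals and hashCode
// overridden together, as the review suggests. Field names are assumed.
public class SimpleTableInfo {
    private final String dbName;
    private final String tbName;

    public SimpleTableInfo(String dbName, String tbName) {
        this.dbName = dbName;
        this.tbName = tbName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SimpleTableInfo)) {
            return false;
        }
        SimpleTableInfo that = (SimpleTableInfo) o;
        return Objects.equals(dbName, that.dbName)
                && Objects.equals(tbName, that.tbName);
    }

    @Override
    public int hashCode() {
        // Keep hashCode consistent with equals: same fields, same order.
        return Objects.hash(dbName, tbName);
    }
}
```

Keeping the two methods in sync matters because hash-based containers (HashMap, HashSet) look up by hashCode first and only then call equals.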
package org.apache.doris.datasource.statistics;

public class CommonStatistics {

Can this class be merged with HivePartitionStatistics?

CommonStatistics is designed to abstract the information in HivePartitionStatistics in the future. We cannot do it all at once and need to optimize step by step.
public static FileFormat getFileFormat(Table icebergTable) {
    Map<String, String> properties = icebergTable.properties();
    String fileFormatName = properties.getOrDefault(TableProperties.DEFAULT_FILE_FORMAT, "parquet");

Why should we delete WRITE_FORMAT and DEFAULT_FILE_FORMAT here?
String tbName = table.getName();
SimpleTableInfo tableInfo = new SimpleTableInfo(dbName, tbName);
IcebergTransaction transaction = (IcebergTransaction) transactionManager.getTransaction(txnId);
loadedRows = transaction.getUpdateCnt();

Why is the update count not showing now?

OK, I will add the count stat.
public void beginInsert(String dbName, String tbName) {
    Table icebergTable = ops.getCatalog().loadTable(TableIdentifier.of(dbName, tbName));
    transaction = icebergTable.newTransaction();
public void pendingCommit(SimpleTableInfo tableInfo) {

If possible, we should keep the same function names for all table insertions, including hive, iceberg, and the paimon, hudi, etc. that may be supported later.

This can be unified with hive, but I think pendingCommit, preCommit and commit are more suitable names, and hive can also move toward this convention.
catalog.dropTable(TableIdentifier.of(dbName, tableName));
db.setUnInitialized(true);
}

Unnecessary modification.

An extra blank line was added by accident during the modification.
transaction = icebergTable.newTransaction();
public void pendingCommit(SimpleTableInfo tableInfo) {
    this.tableInfo = tableInfo;
    this.transaction = getNativeTable(tableInfo).newTransaction();

The table in the cache cannot be used here, because the table here is not necessarily the same as the cached table.
return FileContent.POSITION_DELETES;
private void updateManifestAfterInsert(TUpdateMode updateMode) {

Table table = getNativeTable(tableInfo);

Transactions are isolated, and cached tables cannot be used.

First, the getNativeTable function gets the table from IcebergUtil, a utility class that is not only used in the current scenario. Second, we also need to get the latest information of the table.
convertToFileContent(data.getFileContent()),
data.isSetReferencedDataFiles() ? Optional.of(data.getReferencedDataFiles()) : Optional.empty()
));
//create start the iceberg transaction

Suggested change:
- //create start the iceberg transaction
+ // create and start the iceberg transaction
List<WriteResult> pendingResults = Lists.newArrayList(writeResult);

if (spec.isPartitioned()) {
    LOG.info("{} {} table partition manifest ...", tableInfo, updateMode);

Change this log to debug level; only leave necessary logs:

if (LOG.isDebugEnabled()) {
    LOG.debug(...);
}
this.partitionValues = convertPartitionValuesForNull(partitionValues);
this.content = content;
this.referencedDataFiles = referencedDataFiles;
private void partitionManifestUp(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) {

Suggested change:
- private void partitionManifestUp(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) {
+ private void partitionManifestUpdate(TUpdateMode updateMode, Table table, List<WriteResult> pendingResults) {
package org.apache.doris.datasource.statistics;

public class CommonStatistics {

Add a comment for this class. What is CommonStatistics and why is it needed? I only see it used in Iceberg.
run buildall

TeamCity be ut coverage result:

run buildall

TeamCity be ut coverage result:
Int64* __restrict p_out = out_data.data();

while (p_in < end_in) {
    Int64 long_value = static_cast<Int64>(*p_in);

warning: use auto when initializing with a cast to avoid duplicating the type name [modernize-use-auto]

Suggested change:
- Int64 long_value = static_cast<Int64>(*p_in);
+ auto long_value = static_cast<Int64>(*p_in);
run buildall

TeamCity be ut coverage result:

run compile

1 similar comment

run compile

run buildall
//2) get the input data from block
if (column_ptr->is_nullable()) {
    const ColumnNullable* nullable_column =

warning: use auto when initializing with a cast to avoid duplicating the type name [modernize-use-auto]

Suggested change:
- const ColumnNullable* nullable_column =
+ const auto* nullable_column =
TeamCity be ut coverage result:

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

force-pushed from 8876b33 to 33fc8fd

run buildall

TPC-H: Total hot run time: 39878 ms
TPC-DS: Total hot run time: 171291 ms
ClickBench: Total hot run time: 30.32 s
…#36289) #31442

Added iceberg operator functionality to support direct writes into the lake by doris:
1. Support insert into data to iceberg by appending hdfs files.
2. Implement iceberg partition routing through partitionTransform:
   2.1) Serialize spec and schema data into json on the fe side, then deserialize on the be side to get the schema and partition information of the iceberg table.
   2.2) Implement Iceberg's Identity, Bucket, Year/Month/Day and other partition strategies through partitionTransform and a template class.
3. Transaction management through IcebergTransaction:
   3.1) After the be side finishes writing files, report CommitData to fe at partition granularity.
   3.2) After receiving CommitData, fe commits metadata to iceberg in IcebergTransaction.

### Future work
- Add unit tests for the partition transform functions.
- Implement the partition transform functions with exchange sink turned on.
- The partition transform functions omit the processing of bigint type.

Co-authored-by: lik40 <lik40@chinatelecom.cn>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
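The Year/Month/Day transforms mentioned in point 2.2 map a date to the number of whole years/months/days since the Unix epoch, so rows sharing a period land in the same partition. A minimal sketch of that mapping in Java with java.time (an illustration only, not the Doris/Iceberg implementation; it assumes dates on or after 1970-01-01, since pre-epoch dates need floor rather than truncating division):

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

// Illustrative sketch of Iceberg-style date partition transforms:
// each transform maps a date to an integer offset from 1970-01-01.
public class DateTransforms {
    private static final LocalDate EPOCH = LocalDate.of(1970, 1, 1);

    static int year(LocalDate d)  { return (int) ChronoUnit.YEARS.between(EPOCH, d); }
    static int month(LocalDate d) { return (int) ChronoUnit.MONTHS.between(EPOCH, d); }
    static int day(LocalDate d)   { return (int) ChronoUnit.DAYS.between(EPOCH, d); }

    public static void main(String[] args) {
        LocalDate d = LocalDate.of(2024, 3, 15);
        System.out.println(year(d));   // 54
        System.out.println(month(d));  // 650
        System.out.println(day(d));    // 19797
    }
}
```

All rows from March 2024 share month value 650, which is what makes the transform usable as a partition key.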
…7692) Proposed changes: cherry-pick iceberg partition transform functionality. #36289 #36889

Co-authored-by: kang <35803862+ghkang98@users.noreply.github.com>
Co-authored-by: lik40 <lik40@chinatelecom.cn>
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Mingyu Chen <morningman@163.com>