[flink][bug] MergeIntoAction and DeleteAction work abnormally with partial update merge engine#829
| ``` | ||
|
|
||
| {{< hint info >}} | ||
| If you have set `merge-engine` = `partial-update`, it will be disabled temporarily. |
Does this also apply to the aggregation merge engine?
| ``` | ||
|
|
||
| {{< hint info >}} | ||
| If you have set `merge-engine` = `partial-update`, it will be disabled temporarily. |
I don't understand what "it will be disabled temporarily" means.
I want to explain that while the actions are running, the merge engine settings are ignored; once the actions have finished, the merge engine settings take effect again.
But these are just dynamic options; they have no effect on the file system.
| public FileStoreTable copy(Map<String, String> dynamicOptions) { | ||
| // check option is not immutable | ||
| Map<String, String> options = new HashMap<>(tableSchema.options()); | ||
| dynamicOptions.forEach( |
Don't remove this; create another internal method instead.
Do you mean that this internal method can still check for changes to immutable options, except for the merge engine?
I mean create a new method, `internalCopyWithoutCheck`.
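To make the suggestion concrete, here is a minimal sketch of how a checked `copy` could delegate to an unchecked `internalCopyWithoutCheck`. The `SketchTable` class, its `IMMUTABLE_KEYS` set, and the exception message are hypothetical stand-ins for illustration; this is not the actual `FileStoreTable` code from the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/** Hypothetical stand-in for a table whose options can be overridden per job. */
final class SketchTable {
    // Assumption: the set of options that the public copy() refuses to change.
    private static final Set<String> IMMUTABLE_KEYS = Collections.singleton("merge-engine");

    private final Map<String, String> options;

    SketchTable(Map<String, String> options) {
        this.options = Collections.unmodifiableMap(new HashMap<>(options));
    }

    Map<String, String> options() {
        return options;
    }

    /** Public path: rejects any change to an immutable option, then merges. */
    SketchTable copy(Map<String, String> dynamicOptions) {
        dynamicOptions.forEach(
                (key, value) -> {
                    if (IMMUTABLE_KEYS.contains(key)
                            && !Objects.equals(options.get(key), value)) {
                        throw new UnsupportedOperationException(
                                "Changing '" + key + "' is not supported.");
                    }
                });
        return internalCopyWithoutCheck(dynamicOptions);
    }

    /** Internal path: merges the dynamic options without any validation. */
    SketchTable internalCopyWithoutCheck(Map<String, String> dynamicOptions) {
        Map<String, String> merged = new HashMap<>(options);
        merged.putAll(dynamicOptions);
        // A new in-memory instance is returned; nothing is persisted.
        return new SketchTable(merged);
    }
}
```

In this sketch the original table instance is never modified, which mirrors the point made earlier in the thread that dynamic options do not touch the file system.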
| * want these records to be sunk directly. This method is a workaround. Actions that may produce | ||
| * -U/-D records can disable merge engine settings when running. | ||
| */ | ||
| protected void resetMergeEngine() { |
Maybe we should not just reset the merge engine; we need to force compaction too.
| protected void forceSinking() { | ||
| Map<String, String> dynamicOptions = new HashMap<>(); | ||
| dynamicOptions.put( | ||
| CoreOptions.MERGE_ENGINE.key(), CoreOptions.MergeEngine.DEDUPLICATE.toString()); |
Apply this only to non-DEDUPLICATE engines. You also need to set FULL_COMPACTION_DELTA_COMMITS.
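One way to read this comment is sketched below: override the options only when the table uses a non-deduplicate engine, and set the full-compaction option in the same step. The class and method names are hypothetical, the option keys are written as plain strings rather than `CoreOptions` constants, and both the value `"1"` and the deduplicate default are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds the dynamic options discussed in the review. */
final class ForceSinkingOptions {
    // Assumption: option keys spelled out as strings for a self-contained example.
    static final String MERGE_ENGINE = "merge-engine";
    static final String FULL_COMPACTION_DELTA_COMMITS = "full-compaction.delta-commits";
    static final String DEDUPLICATE = "deduplicate";

    static Map<String, String> dynamicOptionsFor(Map<String, String> tableOptions) {
        Map<String, String> dynamicOptions = new HashMap<>();
        // Assumption: a table with no explicit merge engine behaves as deduplicate.
        String engine = tableOptions.getOrDefault(MERGE_ENGINE, DEDUPLICATE);
        if (!DEDUPLICATE.equals(engine)) {
            // Ignore the configured merge engine while the action runs.
            dynamicOptions.put(MERGE_ENGINE, DEDUPLICATE);
            // Assumption: "1" requests a full compaction after every commit.
            dynamicOptions.put(FULL_COMPACTION_DELTA_COMMITS, "1");
        }
        return dynamicOptions;
    }
}
```

For a table that already uses the deduplicate engine, the helper returns an empty map, so nothing is overridden.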
| * want these records to be sunk directly. This method is a workaround. Actions that may produce | ||
| * -U/-D records can call this to disable merge engine settings and force compaction. | ||
| */ | ||
| protected void forceSinking() { |
How about naming it `changeIgnoreMergeEngine`?
| ``` | ||
|
|
||
| {{< hint info >}} | ||
| If you have set `merge-engine` = `partial-update` or `aggregation`, it will be reset by dynamic options |
Remove this? I cannot understand it from the user's side.
Agreed. I don't think it's important for users to know.
| Important table properties setting: | ||
| 1. Only [primary key table]({{< ref "concepts/primary-key-table" >}}) supports this feature. | ||
| 2. The action won't produce UPDATE_BEFORE, so it's not recommended to set 'changelog-producer' = 'input'. | ||
| 3. If you have set `merge-engine` = `partial-update` or `aggregation`, it will be reset by dynamic options |
Remove this? I cannot understand it from the user's side.
Purpose
Currently, the partial update merge engine cannot accept -U/-D records, so actions that may produce -U/-D records yield incorrect results. This PR tries to fix that.
Tests
DeleteActionITCase
MergeIntoActionITCase
API and Format
No.
Documentation
Add a hint to how-to#writing tables.