[FLINK-25876] Implement overwrite in FlinkStoreCommitImpl #16
Conversation
    public byte[] serialize(ManifestCommittable obj) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
        view.writeInt(obj.uuid().length());
writeUtf8?
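The concern is presumably that String.length() counts chars rather than encoded bytes, so the length prefix can disagree with the UTF-8 payload that follows. A minimal sketch of the writeUTF-based alternative (assuming the remaining fields are serialized as in the original, and that the uuid stays far below the 64KB writeUTF limit, which holds for UUID strings):

    public byte[] serialize(ManifestCommittable obj) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
        // DataOutputViewStreamWrapper extends DataOutputStream, so writeUTF is
        // available and writes the length prefix in encoded bytes itself.
        view.writeUTF(obj.uuid());
        // ... serialize the remaining fields as in the original ...
        return out.toByteArray();
    }

The matching deserializer would read the uuid back with readUTF on a DataInputViewStreamWrapper.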
    }

    @Nullable
    private static ManifestFileMeta merge(List<ManifestFileMeta> metas, ManifestFile manifestFile) {
return Optional?
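If the nullable return is replaced with Optional, the change can be as small as wrapping the existing result. In the sketch below, doMerge is a hypothetical stand-in for the current method body, not a real method in the project:

    private static Optional<ManifestFileMeta> merge(
            List<ManifestFileMeta> metas, ManifestFile manifestFile) {
        // doMerge is the existing @Nullable merge logic, unchanged.
        ManifestFileMeta merged = doMerge(metas, manifestFile);
        return Optional.ofNullable(merged);
    }

Call sites then make the "no result" case explicit, e.g. merge(metas, manifestFile).ifPresent(result::add), instead of relying on a @Nullable annotation.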
    }
    changesWithOverwrite.addAll(changes);

    if (tryCommitOnce(changesWithOverwrite, hash, commitKind, latestSnapshotId)) {
It seems that there is no need to check noConflictsOrFail again?
Another job might be overwriting the same partition at the same time, so we still need to check this.
If I understand correctly, both checks are against the same snapshot, so there can't be a new snapshot here.
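For readers following along, the shape of the loop under discussion is roughly the following. This is only an illustration of the retry pattern; tryCommitOnce and noConflictsOrFail come from the snippet above, while latestSnapshotId() as a method and the surrounding structure are assumptions, not the actual FileStoreCommitImpl code:

    while (true) {
        Long latestSnapshotId = latestSnapshotId();
        // Detect concurrent changes (e.g. another job overwriting the same
        // partition) against the snapshot we intend to build on.
        noConflictsOrFail(changesWithOverwrite, latestSnapshotId);
        if (tryCommitOnce(changesWithOverwrite, hash, commitKind, latestSnapshotId)) {
            break; // committed successfully on top of latestSnapshotId
        }
        // The atomic commit failed because another writer created a newer
        // snapshot first; loop, pick up the new snapshot id and re-run the
        // conflict check before retrying.
    }

Whether the conflict check must run a second time then reduces to whether a new snapshot can appear between the check and tryCommitOnce for the same latestSnapshotId, which is the point being made here.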
    List<String> fieldNames = partitionType.getFieldNames();
    RowDataSerializer serializer = new RowDataSerializer(partitionType);
    GenericRowData rowData = new GenericRowData(partitionType.getFieldCount());
    for (Map.Entry<String, String> entry : partition.entrySet()) {
Check whether partition contains the full set of partition keys.
Here we can have a partial partition spec, and we can add tests for it.
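A sketch of how a partial partition spec could be turned into a row, with missing keys left as null. This is illustrative only; the real conversion also has to map the string value to each field's logical type, and the overwrite matching semantics are the project's, not this snippet's:

    List<String> fieldNames = partitionType.getFieldNames();
    GenericRowData rowData = new GenericRowData(partitionType.getFieldCount());
    for (Map.Entry<String, String> entry : partition.entrySet()) {
        int idx = fieldNames.indexOf(entry.getKey());
        if (idx < 0) {
            throw new IllegalArgumentException("Unknown partition key: " + entry.getKey());
        }
        // Only the string-typed case is shown; other field types need a
        // proper conversion from the String value.
        rowData.setField(idx, StringData.fromString(entry.getValue()));
    }
    // Partition keys absent from the map stay null, which is what makes a
    // partial partition spec representable.

A test for the partial case would then pass a map containing only some of the partition keys and assert that the remaining fields of the resulting row are null.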
    this.newFiles = new HashMap<>();
    this.compactBefore = new HashMap<>();
    this.compactAfter = new HashMap<>();
    this(UUID.randomUUID().toString(), new HashMap<>(), new HashMap<>(), new HashMap<>());
This does not work in the latest Flink master...
Let's see if we can restore the previous behavior
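For reference, the pattern in the diff above is a no-argument constructor delegating to the full constructor. A toy illustration of that delegation follows; the map types are simplified placeholders, not ManifestCommittable's real field types (needs java.util.HashMap, java.util.Map and java.util.UUID):

    class Committable {
        private final String uuid;
        private final Map<String, String> newFiles;
        private final Map<String, String> compactBefore;
        private final Map<String, String> compactAfter;

        Committable() {
            // Previously the maps were assigned directly here; the new version
            // delegates with a freshly generated identifier. This is the line
            // flagged above as not working on the latest Flink master.
            this(UUID.randomUUID().toString(), new HashMap<>(), new HashMap<>(), new HashMap<>());
        }

        Committable(String uuid, Map<String, String> newFiles,
                    Map<String, String> compactBefore, Map<String, String> compactAfter) {
            this.uuid = uuid;
            this.newFiles = newFiles;
            this.compactBefore = compactBefore;
            this.compactAfter = compactAfter;
        }
    }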
JingsongLi left a comment
+1
Overwrite is a useful transaction for batch jobs to completely replace a partition's data, e.g. for data correction. Currently FileStoreCommitImpl does not implement this transaction, so we need to implement it.
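For context, a minimal sketch of how a batch job might exercise the overwrite transaction once it is implemented. The table name, schema and DDL details are assumptions for illustration, not taken from this PR:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class OverwritePartitionExample {
        public static void main(String[] args) {
            // INSERT OVERWRITE is a batch operation in Flink SQL.
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inBatchMode().build());

            // Hypothetical partitioned table; connector/catalog options are
            // omitted and depend on how the table store is set up.
            tEnv.executeSql(
                    "CREATE TABLE orders (order_id BIGINT, amount DOUBLE, dt STRING) "
                            + "PARTITIONED BY (dt)");

            // Completely replace partition dt='2022-01-30', e.g. to correct data
            // produced by an earlier bad run. corrected_orders is a hypothetical
            // source table.
            tEnv.executeSql(
                    "INSERT OVERWRITE orders PARTITION (dt = '2022-01-30') "
                            + "SELECT order_id, amount FROM corrected_orders");
        }
    }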