Skip to content

Commit

Permalink
[HUDI-6512] Check whether write errors exist before committing compaction…
Browse files Browse the repository at this point in the history
… and logCompaction (#9153)
  • Loading branch information
beyond1920 committed Jul 11, 2023
1 parent 0258a89 commit 6f13283
Showing 1 changed file with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ public void commitCompaction(String compactionInstantTime, HoodieCommitMetadata
protected void completeCompaction(HoodieCommitMetadata metadata, HoodieTable table, String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
handleWriteErrors(writeStats, TableServiceType.COMPACT);
final HoodieInstant compactionInstant = HoodieTimeline.getCompactionInflightInstant(compactionCommitTime);
try {
this.txnManager.beginTransaction(Option.of(compactionInstant), Option.empty());
Expand Down Expand Up @@ -167,6 +168,7 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata,
String logCompactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect log compaction write status and commit compaction");
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
handleWriteErrors(writeStats, TableServiceType.LOG_COMPACT);
final HoodieInstant logCompactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.LOG_COMPACTION_ACTION, logCompactionCommitTime);
try {
this.txnManager.beginTransaction(Option.of(logCompactionInstant), Option.empty());
Expand Down Expand Up @@ -249,14 +251,8 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
HoodieTable table,
String clusteringCommitTime,
Option<HoodieData<WriteStatus>> writeStatuses) {
List<HoodieWriteStat> writeStats = metadata.getPartitionToWriteStats().entrySet().stream().flatMap(e ->
e.getValue().stream()).collect(Collectors.toList());

if (writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum() > 0) {
throw new HoodieClusteringException("Clustering failed to write to files:"
+ writeStats.stream().filter(s -> s.getTotalWriteErrors() > 0L).map(HoodieWriteStat::getFileId).collect(Collectors.joining(",")));
}

List<HoodieWriteStat> writeStats = metadata.getWriteStats();
handleWriteErrors(writeStats, TableServiceType.CLUSTER);
final HoodieInstant clusteringInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringCommitTime);
try {
this.txnManager.beginTransaction(Option.of(clusteringInstant), Option.empty());
Expand Down Expand Up @@ -291,6 +287,13 @@ private void completeClustering(HoodieReplaceCommitMetadata metadata,
LOG.info("Clustering successfully on commit " + clusteringCommitTime);
}

/**
 * Fails the table-service commit if any of the collected write stats reports write errors.
 *
 * @param writeStats       write stats gathered for the table-service action being committed
 * @param tableServiceType the table service (COMPACT, LOG_COMPACT, CLUSTER) used in the error message
 * @throws HoodieClusteringException if at least one file recorded write errors; the message lists
 *                                   the file ids that failed, comma-separated
 */
private void handleWriteErrors(List<HoodieWriteStat> writeStats, TableServiceType tableServiceType) {
  // Sum the per-file error counts; any positive total aborts the commit before it is finalized.
  long totalErrors = writeStats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum();
  if (totalErrors > 0) {
    String failedFileIds = writeStats.stream()
        .filter(stat -> stat.getTotalWriteErrors() > 0L)
        .map(HoodieWriteStat::getFileId)
        .collect(Collectors.joining(","));
    throw new HoodieClusteringException(tableServiceType + " failed to write to files:" + failedFileIds);
  }
}

private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>> clusteringMetadata, String clusteringCommitTime, HoodieTable table) {
if (clusteringMetadata.getWriteStatuses().isEmpty()) {
HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan(
Expand Down

0 comments on commit 6f13283

Please sign in to comment.