Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-5882] Close table metadata and file system view instances in Metadata Table Validator #8106

Merged
merged 1 commit into from Mar 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -443,41 +443,45 @@ public boolean doMetadataTableValidation() {
return true;
}

HoodieMetadataValidationContext metadataTableBasedContext =
new HoodieMetadataValidationContext(engineContext, cfg, metaClient, true);
HoodieMetadataValidationContext fsBasedContext =
new HoodieMetadataValidationContext(engineContext, cfg, metaClient, false);
try (HoodieMetadataValidationContext metadataTableBasedContext =
new HoodieMetadataValidationContext(engineContext, cfg, metaClient, true);
HoodieMetadataValidationContext fsBasedContext =
new HoodieMetadataValidationContext(engineContext, cfg, metaClient, false)) {
Set<String> finalBaseFilesForCleaning = baseFilesForCleaning;
List<Pair<Boolean, String>> result = engineContext.parallelize(allPartitions, allPartitions.size()).map(partitionPath -> {
try {
validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath, finalBaseFilesForCleaning);
LOG.info(String.format("Metadata table validation succeeded for partition %s (partition %s)", partitionPath, taskLabels));
return Pair.of(true, "");
} catch (HoodieValidationException e) {
LOG.error(
String.format("Metadata table validation failed for partition %s due to HoodieValidationException (partition %s)",
partitionPath, taskLabels), e);
if (!cfg.ignoreFailed) {
throw e;
}
return Pair.of(false, e.getMessage() + " for partition: " + partitionPath);
}
}).collectAsList();

Set<String> finalBaseFilesForCleaning = baseFilesForCleaning;
List<Pair<Boolean, String>> result = engineContext.parallelize(allPartitions, allPartitions.size()).map(partitionPath -> {
try {
validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath, finalBaseFilesForCleaning);
LOG.info(String.format("Metadata table validation succeeded for partition %s (partition %s)", partitionPath, taskLabels));
return Pair.of(true, "");
} catch (HoodieValidationException e) {
LOG.error(
String.format("Metadata table validation failed for partition %s due to HoodieValidationException (partition %s)",
partitionPath, taskLabels), e);
if (!cfg.ignoreFailed) {
throw e;
for (Pair<Boolean, String> res : result) {
finalResult &= res.getKey();
if (res.getKey().equals(false)) {
LOG.error("Metadata Validation failed for table: " + cfg.basePath + " with error: " + res.getValue());
}
return Pair.of(false, e.getMessage() + " for partition: " + partitionPath);
}
}).collectAsList();

for (Pair<Boolean, String> res : result) {
finalResult &= res.getKey();
if (res.getKey().equals(false)) {
LOG.error("Metadata Validation failed for table: " + cfg.basePath + " with error: " + res.getValue());
if (finalResult) {
LOG.info(String.format("Metadata table validation succeeded (%s).", taskLabels));
return true;
} else {
LOG.warn(String.format("Metadata table validation failed (%s).", taskLabels));
return false;
}
}

if (finalResult) {
LOG.info(String.format("Metadata table validation succeeded (%s).", taskLabels));
} catch (Exception e) {
LOG.warn("Error closing HoodieMetadataValidationContext, "
+ "ignoring the error as the validation is successful.", e);
return true;
} else {
LOG.warn(String.format("Metadata table validation failed (%s).", taskLabels));
return false;
}
}

Expand Down Expand Up @@ -1018,7 +1022,7 @@ public int compare(HoodieColumnRangeMetadata<Comparable> o1, HoodieColumnRangeMe
* the same information regardless of whether metadata table is enabled, which is
* verified in the {@link HoodieMetadataTableValidator}.
*/
private static class HoodieMetadataValidationContext implements Serializable {
private static class HoodieMetadataValidationContext implements AutoCloseable, Serializable {

private static final Logger LOG = LogManager.getLogger(HoodieMetadataValidationContext.class);

Expand Down Expand Up @@ -1145,5 +1149,11 @@ private Option<BloomFilterData> readBloomFilterFromFile(String partitionPath, St
.setBloomFilter(ByteBuffer.wrap(bloomFilter.serializeToString().getBytes()))
.build());
}

@Override
public void close() throws Exception {
tableMetadata.close();
fileSystemView.close();
}
}
}