[HUDI-4913] Fix HoodieSnapshotExporter for writing to a different S3 bucket or FS (#6785)
yihua authored and yuzhaojing committed Sep 29, 2022
1 parent 1c12e84 commit 31ba568
Showing 1 changed file with 38 additions and 26 deletions.
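
The gist of the fix: before this change, the exporter resolved a single Hadoop FileSystem from cfg.sourceBasePath and reused it for the target output path, which breaks when the target lives on a different S3 bucket or a different filesystem scheme. A Hadoop FileSystem instance is bound to the scheme and authority of the path it was resolved from, so the source and target each need their own instance. A minimal sketch of the idea outside Hudi (bucket names and paths are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

public class CrossBucketCopy {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical source and target on two different S3 buckets.
    Path source = new Path("s3a://source-bucket/tables/trips/part-0001.parquet");
    Path target = new Path("s3a://backup-bucket/exports/trips/part-0001.parquet");

    // Each FileSystem is bound to the scheme/authority of the path it was
    // resolved from; the source FS cannot operate on the target bucket.
    FileSystem sourceFs = source.getFileSystem(conf);
    FileSystem targetFs = target.getFileSystem(conf);

    // Hadoop's FileUtil.copy accepts distinct source and destination filesystems.
    FileUtil.copy(sourceFs, source, targetFs, target, /* deleteSource */ false, conf);
  }
}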
@@ -114,16 +114,18 @@ public static class Config implements Serializable {
   }
 
   public void export(JavaSparkContext jsc, Config cfg) throws IOException {
-    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
-    if (outputPathExists(fs, cfg)) {
+    if (outputPathExists(outputFs, cfg)) {
       throw new HoodieSnapshotExporterException("The target output path already exists.");
     }
 
-    final String latestCommitTimestamp = getLatestCommitTimestamp(fs, cfg).<HoodieSnapshotExporterException>orElseThrow(() -> {
-      throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
-    });
+    FileSystem sourceFs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    final String latestCommitTimestamp = getLatestCommitTimestamp(sourceFs, cfg)
+        .<HoodieSnapshotExporterException>orElseThrow(() -> {
+          throw new HoodieSnapshotExporterException("No commits present. Nothing to snapshot.");
+        });
     LOG.info(String.format("Starting to snapshot latest version files which are also no-late-than %s.",
         latestCommitTimestamp));
 
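The first hunk is where the old bug lived: outputPathExists was probing the target path through a FileSystem resolved for the source base path. Hadoop filesystems verify that a Path actually belongs to them, so when the target sits on a different bucket this typically fails with a "Wrong FS" IllegalArgumentException rather than doing anything useful. A hedged illustration of that failure mode (bucket names are invented):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WrongFsDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem sourceFs = new Path("s3a://source-bucket/table").getFileSystem(conf);
    try {
      // What the pre-fix code effectively did: ask the source-bucket FS
      // about a path on another bucket.
      sourceFs.exists(new Path("s3a://backup-bucket/export"));
    } catch (IllegalArgumentException e) {
      // Hadoop's FileSystem.checkPath rejects paths with a foreign authority.
      System.err.println("Wrong FS: " + e.getMessage());
    }
  }
}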
@@ -134,11 +136,11 @@ public void export(JavaSparkContext jsc, Config cfg) throws IOException {
     LOG.info(String.format("The job needs to export %d partitions.", partitions.size()));
 
     if (cfg.outputFormat.equals(OutputFormatValidator.HUDI)) {
-      exportAsHudi(jsc, cfg, partitions, latestCommitTimestamp);
+      exportAsHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
     } else {
-      exportAsNonHudi(jsc, cfg, partitions, latestCommitTimestamp);
+      exportAsNonHudi(jsc, sourceFs, cfg, partitions, latestCommitTimestamp);
     }
-    createSuccessTag(fs, cfg);
+    createSuccessTag(outputFs, cfg);
   }
 
   private boolean outputPathExists(FileSystem fs, Config cfg) throws IOException {
@@ -164,7 +166,8 @@ private void createSuccessTag(FileSystem fs, Config cfg) throws IOException {
     }
   }
 
-  private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) {
+  private void exportAsNonHudi(JavaSparkContext jsc, FileSystem sourceFs,
+                               Config cfg, List<String> partitions, String latestCommitTimestamp) {
     Partitioner defaultPartitioner = dataset -> {
       Dataset<Row> hoodieDroppedDataset = dataset.drop(JavaConversions.asScalaIterator(HoodieRecord.HOODIE_META_COLUMNS.iterator()).toSeq());
       return StringUtils.isNullOrEmpty(cfg.outputPartitionField)
@@ -178,7 +181,7 @@ private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> part
 
     HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     context.setJobStatus(this.getClass().getSimpleName(), "Exporting as non-HUDI dataset: " + cfg.targetOutputPath);
-    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
     Iterator<String> exportingFilePaths = jsc
         .parallelize(partitions, partitions.size())
         .flatMap(partition -> fsView
@@ -193,8 +196,9 @@ private void exportAsNonHudi(JavaSparkContext jsc, Config cfg, List<String> part
         .save(cfg.targetOutputPath);
   }
 
-  private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
-    final BaseFileOnlyView fsView = getBaseFileOnlyView(jsc, cfg);
+  private void exportAsHudi(JavaSparkContext jsc, FileSystem sourceFs,
+                            Config cfg, List<String> partitions, String latestCommitTimestamp) throws IOException {
+    final BaseFileOnlyView fsView = getBaseFileOnlyView(sourceFs, cfg);
 
     final HoodieEngineContext context = new HoodieSparkEngineContext(jsc);
     final SerializableConfiguration serConf = context.getHadoopConf();
@@ -219,20 +223,26 @@ private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partiti
       String partition = tuple._1();
       Path sourceFilePath = new Path(tuple._2());
       Path toPartitionPath = FSUtils.getPartitionPath(cfg.targetOutputPath, partition);
-      FileSystem fs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
+      FileSystem executorSourceFs = FSUtils.getFs(cfg.sourceBasePath, serConf.newCopy());
+      FileSystem executorOutputFs = FSUtils.getFs(cfg.targetOutputPath, serConf.newCopy());
 
-      if (!fs.exists(toPartitionPath)) {
-        fs.mkdirs(toPartitionPath);
+      if (!executorOutputFs.exists(toPartitionPath)) {
+        executorOutputFs.mkdirs(toPartitionPath);
       }
-      FileUtil.copy(fs, sourceFilePath, fs, new Path(toPartitionPath, sourceFilePath.getName()), false,
-          fs.getConf());
+      FileUtil.copy(
+          executorSourceFs,
+          sourceFilePath,
+          executorOutputFs,
+          new Path(toPartitionPath, sourceFilePath.getName()),
+          false,
+          executorOutputFs.getConf());
     }, files.size());
 
     // Also copy the .commit files
     LOG.info(String.format("Copying .commit files which are no-late-than %s.", latestCommitTimestamp));
-    final FileSystem fileSystem = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
+    FileSystem outputFs = FSUtils.getFs(cfg.targetOutputPath, jsc.hadoopConfiguration());
     FileStatus[] commitFilesToCopy =
-        fileSystem.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
+        sourceFs.listStatus(new Path(cfg.sourceBasePath + "/" + HoodieTableMetaClient.METAFOLDER_NAME), (commitFilePath) -> {
           if (commitFilePath.getName().equals(HoodieTableConfig.HOODIE_PROPERTIES_FILE)) {
            return true;
          } else {
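Two details in this hunk are worth calling out. First, the copy runs inside a Spark foreach, and Hadoop FileSystem objects are not serializable, so each executor re-resolves both filesystems from the broadcast SerializableConfiguration (serConf.newCopy()) instead of reusing a driver-side instance. Second, FileUtil.copy takes the source and destination FileSystem separately, which is what makes the cross-bucket copy possible. A self-contained sketch of that per-task pattern (method and variable names are illustrative, not Hudi's):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

/** Illustrative per-task copy; sourceFile/targetDir stand in for cfg fields. */
public final class PerTaskCopy {
  static void copyOne(Configuration conf, String sourceFile, String targetDir) throws Exception {
    // Resolve both filesystems inside the task, not on the driver:
    // FileSystem is not Serializable, and each may point at a different bucket.
    Path src = new Path(sourceFile);
    Path dstDir = new Path(targetDir);
    FileSystem srcFs = src.getFileSystem(conf);
    FileSystem dstFs = dstDir.getFileSystem(conf);

    if (!dstFs.exists(dstDir)) {
      dstFs.mkdirs(dstDir);
    }
    // The six-argument overload copies across filesystems; 'false' keeps the source.
    FileUtil.copy(srcFs, src, dstFs, new Path(dstDir, src.getName()), false, conf);
  }
}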
@@ -244,20 +254,22 @@ private void exportAsHudi(JavaSparkContext jsc, Config cfg, List<String> partiti
     for (FileStatus commitStatus : commitFilesToCopy) {
       Path targetFilePath =
           new Path(cfg.targetOutputPath + "/" + HoodieTableMetaClient.METAFOLDER_NAME + "/" + commitStatus.getPath().getName());
-      if (!fileSystem.exists(targetFilePath.getParent())) {
-        fileSystem.mkdirs(targetFilePath.getParent());
+      if (!outputFs.exists(targetFilePath.getParent())) {
+        outputFs.mkdirs(targetFilePath.getParent());
       }
-      if (fileSystem.exists(targetFilePath)) {
+      if (outputFs.exists(targetFilePath)) {
         LOG.error(
             String.format("The target output commit file (%s targetBasePath) already exists.", targetFilePath));
       }
-      FileUtil.copy(fileSystem, commitStatus.getPath(), fileSystem, targetFilePath, false, fileSystem.getConf());
+      FileUtil.copy(sourceFs, commitStatus.getPath(), outputFs, targetFilePath, false, outputFs.getConf());
     }
   }
 
-  private BaseFileOnlyView getBaseFileOnlyView(JavaSparkContext jsc, Config cfg) {
-    FileSystem fs = FSUtils.getFs(cfg.sourceBasePath, jsc.hadoopConfiguration());
-    HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(cfg.sourceBasePath).build();
+  private BaseFileOnlyView getBaseFileOnlyView(FileSystem sourceFs, Config cfg) {
+    HoodieTableMetaClient tableMetadata = HoodieTableMetaClient.builder()
+        .setConf(sourceFs.getConf())
+        .setBasePath(cfg.sourceBasePath)
+        .build();
     return new HoodieTableFileSystemView(tableMetadata, tableMetadata
         .getActiveTimeline().getWriteTimeline().filterCompletedInstants());
   }
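
Putting it together, the fixed exporter can snapshot a table whose target output sits on a different bucket or filesystem than the source. A hedged end-to-end sketch using the export(jsc, cfg) entry point and the Config fields visible in this diff (paths, app name, and Spark setup are illustrative):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.hudi.utilities.HoodieSnapshotExporter;

public class ExportAcrossBuckets {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("snapshot-export").setMaster("local[2]");
    try (JavaSparkContext jsc = new JavaSparkContext(sparkConf)) {
      HoodieSnapshotExporter.Config cfg = new HoodieSnapshotExporter.Config();
      cfg.sourceBasePath = "s3a://source-bucket/tables/trips";    // Hudi table to snapshot
      cfg.targetOutputPath = "s3a://backup-bucket/exports/trips"; // different bucket: the case this commit fixes
      cfg.outputFormat = "hudi";                                  // validated by OutputFormatValidator
      new HoodieSnapshotExporter().export(jsc, cfg);
    }
  }
}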
