Skip to content
Permalink
Browse files
fix: remove no exist path from merge source paths (#130)
  • Loading branch information
coderzc committed Oct 27, 2021
1 parent 42e9e6b commit af865d913121dbd6d50d8b5c7a0ee9dbe127bb4d
Showing 1 changed file with 14 additions and 9 deletions.
@@ -21,6 +21,8 @@

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -42,16 +44,16 @@ protected HdfsOutputMerger() {

protected void init(Config config) {
try {
Configuration configuration = new Configuration();
String url = config.get(ComputerOptions.OUTPUT_HDFS_URL);
String user = config.get(ComputerOptions.OUTPUT_HDFS_USER);
this.fs = FileSystem.get(new URI(url), configuration, user);

String dir = config.get(ComputerOptions.OUTPUT_HDFS_DIR);
String jobId = config.get(ComputerOptions.JOB_ID);
int partitions = config.get(ComputerOptions.JOB_PARTITIONS_COUNT);
this.sourcePaths = this.paths(dir, jobId, partitions);
this.mergedPath = new Path(new Path(dir, jobId), MERGED_FILE_NAME);

Configuration configuration = new Configuration();
String url = config.get(ComputerOptions.OUTPUT_HDFS_URL);
String user = config.get(ComputerOptions.OUTPUT_HDFS_USER);
this.fs = FileSystem.get(new URI(url), configuration, user);
} catch (Exception e) {
throw new ComputerException("Failed to init hdfs output merger", e);
}
@@ -66,13 +68,16 @@ protected void merge() {
}
}

private Path[] paths(String dir, String jobId, int partitions) {
Path[] paths = new Path[partitions];
private Path[] paths(String dir, String jobId, int partitions) throws
IOException {
List<Path> pathList = new ArrayList<>();
for (int i = 0; i < partitions; i++) {
Path path = HdfsOutput.buildPath(dir, jobId, i);
paths[i] = path;
if (this.fs.exists(path)) {
pathList.add(path);
}
}
return paths;
return pathList.toArray(new Path[0]);
}

protected void close() {

0 comments on commit af865d9

Please sign in to comment.