Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 29 additions & 11 deletions core/src/main/java/org/apache/iceberg/hadoop/HadoopInputFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,31 @@
public class HadoopInputFile implements InputFile {
public static final String[] NO_LOCATION_PREFERENCE = new String[0];

private final String location;
private final FileSystem fs;
private final Path path;
private final Configuration conf;
private FileStatus stat = null;
private Long length = null;

public static HadoopInputFile fromLocation(CharSequence location, Configuration conf) {
Path path = new Path(location.toString());
return fromPath(path, conf);
FileSystem fs = Util.getFs(new Path(location.toString()), conf);
return new HadoopInputFile(fs, location.toString(), conf);
}

public static HadoopInputFile fromLocation(CharSequence location, long length,
Configuration conf) {
Path path = new Path(location.toString());
return fromPath(path, length, conf);
FileSystem fs = Util.getFs(new Path(location.toString()), conf);
return new HadoopInputFile(fs, location.toString(), length, conf);
}

public static HadoopInputFile fromLocation(CharSequence location, FileSystem fs) {
Path path = new Path(location.toString());
return fromPath(path, fs);
return new HadoopInputFile(fs, location.toString(), fs.getConf());
}

public static HadoopInputFile fromLocation(CharSequence location, long length,
FileSystem fs) {
Path path = new Path(location.toString());
return fromPath(path, length, fs);
return new HadoopInputFile(fs, location.toString(), length, fs.getConf());
}

public static HadoopInputFile fromPath(Path path, Configuration conf) {
Expand Down Expand Up @@ -110,23 +109,42 @@ public static HadoopInputFile fromStatus(FileStatus stat, FileSystem fs, Configu
return new HadoopInputFile(fs, stat, conf);
}

private HadoopInputFile(FileSystem fs, String location, Configuration conf) {
this.fs = fs;
this.location = location;
this.path = new Path(location);
this.conf = conf;
}

private HadoopInputFile(FileSystem fs, String location, long length, Configuration conf) {
Preconditions.checkArgument(length >= 0, "Invalid file length: %s", length);
this.fs = fs;
this.location = location;
this.path = new Path(location);
this.conf = conf;
this.length = length;
}

private HadoopInputFile(FileSystem fs, Path path, Configuration conf) {
this.fs = fs;
this.path = path;
this.location = path.toString();
this.conf = conf;
}

private HadoopInputFile(FileSystem fs, Path path, long length, Configuration conf) {
Preconditions.checkArgument(length >= 0, "Invalid file length: %s", length);
this.fs = fs;
this.path = path;
this.location = path.toString();
this.conf = conf;
this.length = length;
}

private HadoopInputFile(FileSystem fs, FileStatus stat, Configuration conf) {
this.fs = fs;
this.path = stat.getPath();
this.location = path.toString();
this.stat = stat;
this.conf = conf;
this.length = stat.getLen();
Expand Down Expand Up @@ -181,8 +199,8 @@ public Path getPath() {
public String[] getBlockLocations(long start, long end) {
List<String> hosts = Lists.newArrayList();
try {
for (BlockLocation location : fs.getFileBlockLocations(path, start, end)) {
Collections.addAll(hosts, location.getHosts());
for (BlockLocation bl : fs.getFileBlockLocations(path, start, end)) {
Collections.addAll(hosts, bl.getHosts());
}

return hosts.toArray(NO_LOCATION_PREFERENCE);
Expand All @@ -194,7 +212,7 @@ public String[] getBlockLocations(long start, long end) {

@Override
public String location() {
return path.toString();
return location;
}

@Override
Expand Down