Keep original location in HadoopInputFile if given #2170
rdblue merged 4 commits into apache:master
Conversation
This is a fix for issue apache#2169. In `BaseDataReader#getInputFile`, the `InputFile` is looked up in a map by the path string. The keys in the map are normalized paths. If the path string in the manifest is not normalized, the lookup will fail since the strings are not equal. The fix is to normalize the path string before looking it up in the map.
steveloughran left a comment:
might be best to use path.toURI() as the key, as that has the strictest guarantees of escaping things
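For illustration, a minimal sketch (the namenode address and paths are made up) of the normalization that makes raw-string keys fragile, and of the toUri() form suggested above:

import org.apache.hadoop.fs.Path;

public class PathKeyDemo {
  public static void main(String[] args) {
    // a location string as it might appear, unnormalized, in a manifest
    String original = "hdfs://nn:8020/warehouse//db/file.parquet";
    Path path = new Path(original);

    // the Path constructor collapses the double slash, so the round-tripped
    // string no longer equals the original and a string-keyed map lookup misses
    System.out.println(path.toString());                  // hdfs://nn:8020/warehouse/db/file.parquet
    System.out.println(original.equals(path.toString())); // false

    // toUri() yields the normalized, escaped form suggested as a map key above
    System.out.println(path.toUri());
  }
}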
  protected InputFile getInputFile(FileScanTask task) {
    Preconditions.checkArgument(!task.isDataTask(), "Invalid task type");
-   return inputFiles.get(task.file().path().toString());
+   return getInputFile(task.file().path().toString());
probably best to use path().toURI() for the best guarantee of handling spaces
This isn't a Hadoop Path, it is a CharSequence. That's how we generally avoid issues like encoding.
  InputFile getInputFile(String location) {
-   return inputFiles.get(location);
+   // normalize the path before looking it up in the map
+   Path path = new Path(location);
I don't think using the Hadoop API directly is a good way to solve the problem. It sounds like we need to fix the keys in the map to match the original location from the input split instead.
For @steveloughran's information, the reason the keys in the inputFiles map became normalized paths is that the files for the scan tasks go through encryption and decryption:
Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
.map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
// decrypt with the batch call to avoid multiple RPCs to a key server, if possible
Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
and the keys are the locations of the decrypted files. The call to io.newInputFile goes through either HadoopFileIO or S3FileIO (currently the two implementations of FileIO), and newInputFile normalizes the path. The normalization through HadoopFileIO is exactly what I'm replicating here. I think it will work for an S3 path too.
@rdblue, from the decrypted files, I do not see a natural way to recover the original path string written in the manifest. Instead, can we add a method to the FileIO interface to return the normalized path for a path String, and then HadoopFileIO and S3FileIO will have to implement it?
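A minimal sketch of what that proposed addition might look like for the Hadoop case (the interface and method names here are hypothetical, and as the replies below show, this is not the direction the PR ultimately takes):

import org.apache.hadoop.fs.Path;

// hypothetical shape of the proposal: expose the implementation's normalization
interface LocationNormalizer {
  String normalizedLocation(String location);
}

class HadoopLocationNormalizer implements LocationNormalizer {
  @Override
  public String normalizedLocation(String location) {
    // replicate the normalization that HadoopFileIO's newInputFile applies:
    // parse the string into a Path and serialize it back
    return new Path(location).toString();
  }
}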
Sounds like we will need to fix the interfaces that are modifying the file location and pass the original.
I think that HadoopInputFile just needs to be updated so that fromLocation preserves the original location rather than using path.toString().
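A condensed sketch of that suggestion (the real class has more state and factory methods; the class name here marks it as an illustration, not the actual diff): keep the original string alongside the parsed Path, and return it from location().

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// condensed illustration of the suggested change
class SketchHadoopInputFile {
  private final Path path;
  private final String location; // the original location string, when one was given
  private final Configuration conf;

  static SketchHadoopInputFile fromLocation(CharSequence location, Configuration conf) {
    String original = location.toString();
    // keep the caller's string instead of discarding it after building the Path
    return new SketchHadoopInputFile(new Path(original), original, conf);
  }

  static SketchHadoopInputFile fromPath(Path path, Configuration conf) {
    // no original string is available here, so fall back to the normalized form
    return new SketchHadoopInputFile(path, path.toString(), conf);
  }

  private SketchHadoopInputFile(Path path, String location, Configuration conf) {
    this.path = path;
    this.location = location;
    this.conf = conf;
  }

  String location() {
    return location; // previously: path.toString(), which normalized the string
  }
}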
@rdblue thank you for the suggestion; I have implemented it and updated the PR description accordingly.
According to the javadoc for InputFile#location, this is "The fully-qualified location of the input file as a String." The only concern I have is if we ever call FileIO#newInputFile or HadoopInputFile.fromLocation with a location that is not fully-qualified.
Resolved review threads (outdated):
flink/src/main/java/org/apache/iceberg/flink/source/DataIterator.java
spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
Thanks, @wypoon! I'll merge this when tests are passing.
Force-pushed from 6ee16a8 to 4318d2e.
Merged. Thank you for fixing this, @wypoon! And thanks for the review, @steveloughran!
This is a fix for issue #2169.
In the getInputFile methods of org.apache.iceberg.spark.source.BaseDataReader and org.apache.iceberg.flink.source.DataIterator, the InputFile is looked up in a map by the path string. The keys in the map come from InputFile#location. Currently, when we create a HadoopInputFile using a location string, we do not store the string in a field but construct an org.apache.hadoop.fs.Path from it and store only the Path. HadoopInputFile#location then returns path.toString() for this Path. This causes the string to be normalized. If the path string in the manifest is not normalized, the lookup will fail since the strings are not equal.
The fix is to store the original location string in HadoopInputFile when it is created using a location string, so that we can return this string in HadoopInputFile#location.
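A small usage illustration of the post-fix contract (the namenode address and file path are made up):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopInputFile;

public class LocationRoundTrip {
  public static void main(String[] args) {
    // an unnormalized location string, as it might be recorded in a manifest
    String original = "hdfs://nn:8020/warehouse//db/table/data/file.parquet";
    HadoopInputFile file = HadoopInputFile.fromLocation(original, new Configuration());

    // before this fix, location() returned the normalized Path string
    // ("hdfs://nn:8020/warehouse/db/table/data/file.parquet"), so string-keyed
    // lookups against the manifest path failed; now the original is preserved
    System.out.println(original.equals(file.location())); // true after the fix
  }
}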