Stage files #27
writer.close();
if (tmpPath != null) {
    Files.move(tmpPath, path, REPLACE_EXISTING);
}
Will this also run if there is an unhandled exception? For example, if the socket connection to HDFS is broken?
No, in that case the IOException will be propagated to the upper level, and the offsets file will not be updated. At the next iteration, it will retry.
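The behaviour described in this reply can be sketched roughly as follows. This is only an illustration under assumptions: the class `StagedCloseSketch` and the methods `closeAndPublish` and `tryPublish` are hypothetical names, not code from this PR. If `close()` throws, the move is never reached, the target file is left untouched, and the caller can skip its bookkeeping and retry later.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical illustration; names are not taken from the PR.
class StagedCloseSketch {
    /** Closes the writer and, only if that succeeds, moves the staged file into place. */
    static void closeAndPublish(Writer writer, Path tmpPath, Path path) throws IOException {
        writer.close(); // an IOException here skips the move below
        if (tmpPath != null) {
            Files.move(tmpPath, path, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    /** Stand-in for the "upper level": returns true only if the output was published. */
    static boolean tryPublish(Writer writer, Path tmpPath, Path path) {
        try {
            closeAndPublish(writer, tmpPath, path);
            return true; // safe to record offsets now
        } catch (IOException e) {
            return false; // target untouched; retry on the next iteration
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("stage-sketch");
        Path tmp = dir.resolve("out.csv.tmp");
        Path target = dir.resolve("out.csv");
        Writer writer = Files.newBufferedWriter(tmp);
        writer.write("a,b\n");
        System.out.println("published: " + tryPublish(writer, tmp, target));
    }
}
```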
}

// Response codes for each write record case
public enum WriteStatus {
Maybe name this WriteResponse?
Looks good to me. Thanks.
} else {
    this.tmpPath = Files.createTempFile(tmpDir, path.getFileName().toString(),
            gzip ? ".tmp.gz" : ".tmp");
    outFile = Files.newOutputStream(tmpPath, StandardOpenOption.WRITE);
What happens here if the temp file already exists and is corrupt?
I think using the default with no options would be better. From the Javadoc:
If no options are present then this method works as if the CREATE, TRUNCATE_EXISTING, and WRITE options are present. In other words, it opens the file for writing, creating the file if it doesn't exist, or initially truncating an existing regular-file to a size of 0 if it exists.
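To make the difference concrete, here is a small sketch comparing the two ways of opening an existing file. The class `OpenOptionSketch` and its method names are hypothetical and exist only for this demonstration. With `WRITE` alone, the stream starts at the beginning of the file without truncating it, so old bytes past the end of the new data survive; with no options, the file is truncated first.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class; not part of the PR.
class OpenOptionSketch {
    /** Opens with WRITE only: existing bytes beyond the new data are kept. */
    static String writeOnly(Path file, String data) throws IOException {
        try (OutputStream out = Files.newOutputStream(file, StandardOpenOption.WRITE)) {
            out.write(data.getBytes(StandardCharsets.UTF_8));
        }
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }

    /** Opens with no options: CREATE, TRUNCATE_EXISTING and WRITE are implied. */
    static String withDefaults(Path file, String data) throws IOException {
        try (OutputStream out = Files.newOutputStream(file)) {
            out.write(data.getBytes(StandardCharsets.UTF_8));
        }
        return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("options", ".tmp");
        Files.write(file, "stale-data".getBytes(StandardCharsets.UTF_8));
        System.out.println(writeOnly(file, "new"));    // old tail remains
        Files.write(file, "stale-data".getBytes(StandardCharsets.UTF_8));
        System.out.println(withDefaults(file, "new")); // file truncated first
        Files.delete(file);
    }
}
```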
Good point. I just realised that in addition, I'll need to code some logic for when the input file is corrupt.
Yeah, you can catch ZipException here and delete that file, I guess.
The only exception I have seen when it tries to append to corrupt files is the following:
java.util.zip.ZipException: invalid block type
at java.util.zip.InflaterInputStream.read(InflaterInputStream.java:164)
at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:117)
at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:122)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at org.radarcns.util.RecordConverterFactory.testSortedAndUnique(RecordConverterFactory.java:166)
at org.radarcns.util.RecordConverterFactory.sortUnique(RecordConverterFactory.java:73)
at org.radarcns.util.FileCacheStore.ensureCapacity(FileCacheStore.java:108)
at org.radarcns.util.FileCacheStore.writeRecord(FileCacheStore.java:78)
at org.radarcns.RestructureAvroRecords.writeRecord(RestructureAvroRecords.java:287)
at org.radarcns.RestructureAvroRecords.processFile(RestructureAvroRecords.java:243)
at org.radarcns.RestructureAvroRecords.start(RestructureAvroRecords.java:186)
at org.radarcns.RestructureAvroRecords.main(RestructureAvroRecords.java:93)
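One possible shape for the suggestion above is sketched below. It is not the project's implementation: `CorruptGzipSketch`, `isReadableGzip` and `deleteIfCorrupt` are hypothetical names, and treating `EOFException` (a truncated file) as corruption in addition to `ZipException` is an assumption of this sketch rather than something stated in the thread.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipException;

// Hypothetical helper; not taken from the PR.
class CorruptGzipSketch {
    /** Returns true if the whole file can be decompressed as gzip. */
    static boolean isReadableGzip(Path file) throws IOException {
        byte[] buffer = new byte[8192];
        // Declare the raw stream separately so it is closed even if
        // the GZIPInputStream constructor rejects the header.
        try (InputStream raw = Files.newInputStream(file);
             InputStream in = new GZIPInputStream(raw)) {
            while (in.read(buffer) != -1) {
                // Drain the stream; only decoding errors matter here.
            }
            return true;
        } catch (ZipException | EOFException e) {
            // ZipException: invalid data; EOFException: truncated (assumption).
            return false;
        }
    }

    /** Deletes the file if it exists and is not readable gzip; returns true if deleted. */
    static boolean deleteIfCorrupt(Path file) throws IOException {
        if (Files.exists(file) && !isReadableGzip(file)) {
            Files.delete(file);
            return true;
        }
        return false;
    }
}
```

Reading the whole file costs one extra pass over the data, so in practice it may be preferable to catch the exception at the point where the existing file is already being read.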
Write output data to a temporary staging directory and move it into place once we close the file. Fixes #26. To preserve the full file contents, the existing file, if any, has to be copied into the staging file before writing starts. The new staging behaviour is the default, but it can be switched off if needed with the --no-stage flag.
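The staging flow described here might look roughly like the sketch below, for an uncompressed text file. `StagingSketch`, `appendLine` and the `stage` parameter are hypothetical names chosen for illustration (`stage = false` stands in for the --no-stage case); the PR's actual classes and the gzip path are not reproduced.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

// Hypothetical illustration of copy, append, then move; not the PR's code.
class StagingSketch {
    /** Appends one line to target, going through a staging file when stage is true. */
    static void appendLine(Path target, Path tmpDir, String line, boolean stage)
            throws IOException {
        Path writePath = target;
        if (stage) {
            writePath = Files.createTempFile(tmpDir, target.getFileName().toString(), ".tmp");
            if (Files.exists(target)) {
                // Start from the existing contents so the staged file is complete.
                Files.copy(target, writePath, StandardCopyOption.REPLACE_EXISTING);
            }
        }
        try (BufferedWriter writer = Files.newBufferedWriter(writePath, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
            writer.write(line);
            writer.newLine();
        }
        if (stage) {
            // Publish only after the staged file was written and closed successfully.
            Files.move(writePath, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path outDir = Files.createTempDirectory("out");
        Path tmpDir = Files.createTempDirectory("staging");
        Path target = outDir.resolve("records.csv");
        appendLine(target, tmpDir, "first", true);
        appendLine(target, tmpDir, "second", true);
        System.out.println(Files.readAllLines(target));
    }
}
```

The trade-off is an extra copy of the existing file on every open, in exchange for never leaving a half-written target behind.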