Implement reliable log file management for Merge on read, which is fault tolerant and allows random block level access on avro file #64
Conversation
 * Also contains logic to roll-over the log file
 */
public class HoodieLogFile {
  public static final String DELTA_EXTENSION = ".avro.delta";
Concepts like delta should not be leaked into the log file?
Sure, deferring to the next MoR commit; we will know where this goes.
 *
 * @see org.apache.avro.file.DataFileReader
 */
public class AvroLogAppender {
This should implement HoodieLogAppender?
I see you have wrapped this in the RollingAvroLogAppender.
Yes, it should implement it; did that.
  }
}
this.writer.appendTo(new AvroFSInput(FileContext.getFileContext(fs.getConf()), path), output);
this.writer.setFlushOnEveryBlock(true);
Needs to come from autoFlush above? (not sure)
setFlushOnEveryBlock is slightly different. The autoFlush config controls whether we write out a block after every call to append(List records). setFlushOnEveryBlock means that whenever a block is written out, it is flushed to disk; we always want that.
Added a comment in the code to clarify.
this.file = file;
}

public Iterator<GenericRecord> readBlock(long startOffset) throws IOException {
What is a block? Does this read till we reach the end of the file? If so, should this method be readRecords(.)?
No. Avro files have blocks which are separated by a sync marker. We write each commit/transaction in a block, and we always read one block at a time. It does not read till the end of the file; the pastSync() check takes care of that. Look at the test cases for more clarity.
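The block-plus-sync-marker idea can be sketched with a toy model. This is not Avro's real container format (which is binary and tracked by DataFileReader), just an illustration of why readBlock(offset) reads exactly one block and stops at the next marker; all names here are made up for the sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of a block-structured log: records are grouped into blocks,
// and a sync marker separates blocks. readBlock(offset) returns only the
// records of the block starting at that offset, not the rest of the file.
class ToyBlockLog {
    static final String SYNC = "<SYNC>";
    private final List<String> lines = new ArrayList<>();

    // Appends one block of records followed by a sync marker, and returns
    // the offset (here, a line index) where the block starts.
    long appendBlock(List<String> records) {
        long startOffset = lines.size();
        lines.addAll(records);
        lines.add(SYNC);
        return startOffset;
    }

    // Reads records from startOffset until the next sync marker.
    Iterator<String> readBlock(long startOffset) {
        List<String> block = new ArrayList<>();
        for (int i = (int) startOffset; i < lines.size(); i++) {
            if (lines.get(i).equals(SYNC)) {
                break; // analogous to the pastSync() boundary check
            }
            block.add(lines.get(i));
        }
        return block.iterator();
    }
}
```

Each appendBlock call plays the role of one commit/transaction; reading back with the returned offset yields only that commit's records.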

public Iterator<GenericRecord> readBlock(long startOffset) throws IOException {
  // We keep track of exact offset for blocks, just seek to it directly
  reader.seek(startOffset);
Do we need safety checks on startOffset bounds?
reader.seek will throw an appropriate exception on a bad offset.
throw new HoodieIOException(
    "Could not read avro records from path " + hoodieLogFile);
}
}).collect(Collectors.toMap(new Function<AvroLogReader, Integer>() {
Given we use a map, how do we ensure we read the versions from lowest to highest?
A queue of log files to read seems like a more natural data structure to me.
Will rework this a little bit in the follow-up commit.
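The queue suggestion could look something like this: sort the log file versions ascending once and consume them in order. A minimal sketch with plain JDK collections; the class and method names are hypothetical, not part of the PR.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Instead of iterating an unordered Map's keys, build a queue of log file
// versions sorted ascending, so readers always consume the oldest first.
class LogFileQueue {
    static Deque<Integer> orderedVersions(Collection<Integer> versions) {
        List<Integer> sorted = new ArrayList<>(versions);
        Collections.sort(sorted); // lowest version first
        return new ArrayDeque<>(sorted);
    }
}
```

Polling the resulting deque then yields versions lowest to highest regardless of the order they were discovered in.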
}, Function.identity()));
}

public Iterator<GenericRecord> readBlocks(Map<Integer, List<Long>> filesToOffsetMap)
Same comment on readBlocks(..): I think it will keep reading till the end, so I am not sure what a block means here.
It won't; same comment as above.

public Iterators(Map<Integer, List<Long>> versionToOffsetMap,
    Map<Integer, AvroLogReader> readers) {
  this.currentVersionIterator = versionToOffsetMap.keySet().iterator();
This does not guarantee order unless this is a TreeMap?
Yes. Will rework this.
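For reference, HashMap's keySet() iteration order is unspecified, while TreeMap iterates keys in ascending order. A minimal demonstration (names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// TreeMap iterates its keySet() in ascending key order, which is what
// version-ordered iteration needs; HashMap makes no ordering guarantee.
class OrderedKeysDemo {
    static List<Integer> sortedVersions(Map<Integer, String> versionMap) {
        // Copying into a TreeMap guarantees ascending traversal
        // regardless of the source map's iteration order.
        return new ArrayList<>(new TreeMap<>(versionMap).keySet());
    }
}
```

So either the versionToOffsetMap itself should be a TreeMap, or its keys should be copied into an ordered structure before iterating.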
});
}

public class Iterators implements Iterator<GenericRecord> {
This class looks OK, but it probably needs more tests and so forth.
Sure, any specific tests (in addition to the existing ones) you have in mind? We can always add more tests later.
Just a test that exercises some happy paths and degenerate paths:
- just one iterator
- 0 iterators
- iterators with 0 elements, 1 element chained here
As long as we have some tests around this class standalone, it will be nice.
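The degenerate cases above can be exercised against a minimal chained iterator. This is a simplified stand-in for the PR's Iterators class (no offset maps or Avro readers), just enough to make the test shapes concrete:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Minimal chained iterator: flattens a list of iterators into one.
// A simplified stand-in for the PR's Iterators class, for sketching only.
class ChainedIterator<T> implements Iterator<T> {
    private final Iterator<Iterator<T>> outer;
    private Iterator<T> current = Collections.emptyIterator();

    ChainedIterator(List<Iterator<T>> iterators) {
        this.outer = iterators.iterator();
    }

    @Override
    public boolean hasNext() {
        // Skip over exhausted (possibly empty) inner iterators.
        while (!current.hasNext() && outer.hasNext()) {
            current = outer.next();
        }
        return current.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }
}
```

The key edge case is hasNext() advancing past empty inner iterators, which covers the "0 iterators" and "iterators with 0 elements" paths above.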
Overall my take is, we probably need to iterate on the abstractions a bit more. The one introduced here, HoodieLogAppender, can be generalized further.
Here is my suggestion. There is a need for abstractions at two levels:
- HoodieLogFile: supports HoodieLogAppender + Reader as well (ideally we would have done the HoodieFile abstraction by now, so this all flows nicely; I can work on that in Feb)
- HoodieLog: a Collection with a pluggable comparator for ordering files. This has the functionality of CompositeReader and RollingAppender.
Then we will have a HoodieAvroLogFile (that can be read and appended to individually) and a HoodieDeltaLog which looks at the overall thing as a single rolling log of delta updates. This way we can add methods like trimLog(.) which can be used for purging old log files etc.
Once this is there, we can move the current commit archived log into a HoodieCommitLogFile and a HoodieCommitArchivedLog impl.
Your abstraction treats this as an OO exercise and makes sense, but there are some practical limitations to consider. We need transfer objects which can be serialized; HoodieLogFile and HoodieFile are those objects. HoodieLogAppender will be task executor specific. Also, I don't think the concept of a rolling log is specific to HoodieDeltaLog (thinking this will be reused in the archived log and in the global index in future). trimLog and compactLog can be methods in RollingAvroLogAppender. We could still have HoodieAvroLogFile and HoodieCommitLogFile extending from RollingAvroLogAppender. Let's discuss this on a separate issue; once the code for MoR is in, the abstraction changes will be more concrete.
Okay, now I see what you mean. We can have "log" abstractions over the reader and writer which unify getting a reader and a writer from a single log interface. Let's add this later. I will create an issue to track it.
Yeah, I think we are saying the same things. Overall for Hoodie as a project, we need a File abstraction, and a log abstraction that builds on top of it and provides the functionality of the RollingLogAppender and the composite log reader... lol, I am thinking how that in itself would be a useful OSS project :)
Overall, go ahead and merge this, once you have the follow-on commit with the few things we are fixing. Created #68 to follow up on other things discussed here.