Parse Batch support #5081
@pjain1 could this be moved to 0.11.1? It doesn't seem like a regression fix or something that critically needs to be in 0.11.0. So I'd rather avoid delaying 0.11.0. Although if there is a reason I'm missing please call it out.
0.11.1 is fine
@gianm reason for originally putting it in 0.11.0 was that some customers have been blocked on this feature for quite a while.
@gianm thanks.
I don't understand TeamCity inspection, now it is complaining about a line which is from a commit from 2014.
I restarted it. I've noticed sometimes it does that and I'm not sure why.
@pjain1 could you please merge master into this branch and push it? May help with CI.
- add addAll method, skip max rows in memory check for it
- remove parse method from implementations
- transform transformers
- add string multiplier input row parser
- fix withParseSpec
- fix kafka batch indexing
- fix isPersistRequired
- comments
@gianm rebased with master
@pjain1 thanks! reviewing this now
Generally looks good, just a few comments.
 {
-  return parseMap(buildStringKeyMap(input));
+  return ImmutableList.of(parseMap(buildStringKeyMap(input)));
Throws NPE if parseMap returns a null. Maybe in general it's better to prefer your Utils.nullableListOf() instead of ImmutableList.of() throughout - for example NoopInputRowParser will also throw an NPE if input is null which is not the same behavior as previously.
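The NPE concern above can be illustrated with a minimal sketch. It uses the JDK's List.of() as a stand-in for Guava's ImmutableList.of() (both reject null elements), and the nullableListOf helper below is a hypothetical reimplementation for illustration, not necessarily the code of Utils.nullableListOf() in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class NullableLists
{
  private NullableLists() {}

  // Hypothetical stand-in for the Utils.nullableListOf() helper mentioned above:
  // unlike ImmutableList.of() or List.of(), it accepts null elements.
  @SafeVarargs
  static <T> List<T> nullableListOf(T... elements)
  {
    final List<T> list;
    if (elements == null) {
      // A null array is treated as a single null element.
      list = new ArrayList<>(1);
      list.add(null);
    } else {
      list = new ArrayList<>(Arrays.asList(elements));
    }
    return list;
  }

  public static void main(String[] args)
  {
    String parsed = null; // what a parser may return for a row to be thrown away

    try {
      List.of(parsed); // rejects null, as Guava's ImmutableList.of() does
    }
    catch (NullPointerException e) {
      System.out.println("List.of(null) threw NullPointerException");
    }

    List<String> rows = nullableListOf(parsed);
    System.out.println(rows.size() + " element(s), first is null: " + (rows.get(0) == null));
  }
}
```

With a helper like this, a parser that returns null for a discarded row keeps its previous behavior: the null travels inside the list and no exception is thrown at the wrapping step.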
 * Parse an input into list of {@link InputRow}. List can contains null for rows that should be thrown away,
 * or throws {@code ParseException} if the input is unparseable.
 */
default List<InputRow> parseBatch(T input) |
Consider using @NotNull or adding a comment to the Javadoc that implementations must not return null. Lots of things would break if a null was returned.
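To make the contract in this comment concrete, here is a rough sketch, assuming a simplified BatchParser interface in place of InputRowParser and a generic row type R in place of InputRow (both names are hypothetical). The list itself is never null, while individual elements may be.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified, hypothetical stand-in for InputRowParser<T>; R replaces InputRow.
interface BatchParser<T, R>
{
  // May return null for an input that should be thrown away.
  R parse(T input);

  // Must never return null. The returned list may contain null elements.
  default List<R> parseBatch(T input)
  {
    // Collections.singletonList permits a null element, so a discarded row is safe here.
    return Collections.singletonList(parse(input));
  }
}

final class BatchParserDemo
{
  private BatchParserDemo() {}

  // Caller side: iterate every batch and skip the null rows.
  static <T, R> List<R> parseAll(BatchParser<T, R> parser, List<T> inputs)
  {
    List<R> out = new ArrayList<>();
    for (T input : inputs) {
      // This loop would throw an NPE if parseBatch itself returned null.
      for (R row : parser.parseBatch(input)) {
        if (row != null) {
          out.add(row);
        }
      }
    }
    return out;
  }

  public static void main(String[] args)
  {
    // Toy parser: empty strings are thrown away, others map to their length.
    BatchParser<String, Integer> parser = s -> s.isEmpty() ? null : s.length();
    System.out.println(parseAll(parser, List.of("a", "", "abc")));
  }
}
```

The caller iterates the result without a null check on the list, which is why a null return value would break it.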
 @Override
 public boolean hasMore()
 {
-  return iter.hasNext();
+  return iter.hasNext() || nextIterator.hasNext();
You should check nextIterator.hasNext() first before ConsumerIterator.hasNext() in case their implementation is blocking, for example in old implementations: https://issues.apache.org/jira/browse/KAFKA-520
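The ordering suggested here relies on short-circuit evaluation of ||. A minimal sketch, assuming a hypothetical BatchingReader class that wraps any Iterator as the possibly blocking source (it is not the firehose code from this PR):

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: flattens batches parsed from a source whose hasNext() may block.
final class BatchingReader<S, R>
{
  private final Iterator<S> source;
  private final Function<S, List<R>> parseBatch;
  // Rows left over from the most recently parsed batch.
  private Iterator<R> nextIterator = Collections.emptyIterator();

  BatchingReader(Iterator<S> source, Function<S, List<R>> parseBatch)
  {
    this.source = source;
    this.parseBatch = parseBatch;
  }

  boolean hasMore()
  {
    // Buffered rows are checked first; || short-circuits, so the possibly
    // blocking source.hasNext() is not called while rows are still buffered.
    return nextIterator.hasNext() || source.hasNext();
  }

  R nextRow()
  {
    if (!nextIterator.hasNext()) {
      nextIterator = parseBatch.apply(source.next()).iterator();
    }
    // An empty batch yields null, meaning "no row for this message".
    return nextIterator.hasNext() ? nextIterator.next() : null;
  }
}
```

With the operands in the opposite order, a call to hasMore() could wait on the source even though rows from the previous batch were ready to be returned.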
@@ -92,20 +96,20 @@ protected void map(Object key, Object value, Context context) throws IOException
 }

 @Nullable
This shouldn't be marked as @Nullable anymore. Also, the method can be made private.
@dclim thanks for the review, I have addressed your comments.
👍
@pjain1 @himanshug
@quenlang These are the code changes.
Fixes #4373
- Changes InputRowParser to have a List<InputRow> parseBatch(T input) method, which returns a list of InputRows. Currently it has a default implementation that just wraps the output of the parse(T) method in a list.
- Changes the implementations of InputRowParser to implement parseBatch instead of parse, although they currently just wrap the output of parsing the input in a list.
- Changes the calls to parse in the codebase to parseBatch.
- … parseBatch.
- Changes Appenderator's add method to accept a flag to disable automatic persists, so that callers can decide not to persist while adding a batch of rows and can call persist explicitly later.

Have been running this code on our internal cluster with an InputRowParser that multiplies each row a certain number of times, using KafkaIndexingService to index these rows.
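The batch-add flow described above might look roughly like the following. Everything here is a toy model with hypothetical names (MultiplyingParser, ToyAppender, allowIncrementalPersists, indexBatch); it assumes that add reports back whether a persist is needed, and it does not reproduce the real Appenderator API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical parser that emits each input `times` times, like the test parser described above.
final class MultiplyingParser
{
  private final int times;

  MultiplyingParser(int times)
  {
    this.times = times;
  }

  List<String> parseBatch(String input)
  {
    return Collections.nCopies(times, input);
  }
}

// Toy stand-in for an appenderator: buffers rows in memory and persists when the buffer is full.
final class ToyAppender
{
  private final int maxRowsInMemory;
  private final List<String> inMemory = new ArrayList<>();
  private int persistCount = 0;

  ToyAppender(int maxRowsInMemory)
  {
    this.maxRowsInMemory = maxRowsInMemory;
  }

  // Returns true when the buffer is full and the caller is expected to persist explicitly.
  boolean add(String row, boolean allowIncrementalPersists)
  {
    inMemory.add(row);
    boolean full = inMemory.size() >= maxRowsInMemory;
    if (full && allowIncrementalPersists) {
      persist();
      return false;
    }
    return full;
  }

  void persist()
  {
    if (!inMemory.isEmpty()) {
      persistCount++;
      inMemory.clear();
    }
  }

  int persistCount()
  {
    return persistCount;
  }

  int rowsInMemory()
  {
    return inMemory.size();
  }
}

final class BatchIndexingDemo
{
  private BatchIndexingDemo() {}

  // Adds a whole batch with automatic persists disabled, then persists at most once.
  static void indexBatch(ToyAppender appender, List<String> batch)
  {
    boolean persistRequired = false;
    for (String row : batch) {
      persistRequired |= appender.add(row, false);
    }
    if (persistRequired) {
      appender.persist();
    }
  }

  public static void main(String[] args)
  {
    ToyAppender appender = new ToyAppender(2);
    indexBatch(appender, new MultiplyingParser(5).parseBatch("row"));
    System.out.println("persists: " + appender.persistCount());
  }
}
```

In this model, deferring the persist keeps all rows produced from one input together: the batch is never split across a persist in the middle of the add loop.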