Consider null inputRows and parse errors as unparseable during realtime ingestion. #1506
Conversation
```java
        continue;
      }
    }
    catch (Exception e) {
```
should this be `ParseException`?
Sure, I will change this to ParseException, both here and in the RealtimeManager (which has been using Exception). ParseException is probably better since we want to log & continue on formatting problems, but not on things like network glitches.
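The narrowed catch discussed above can be sketched as follows. This is a minimal illustration, not Druid's actual code: `parse` and `ingest` are hypothetical stand-ins, and `java.text.ParseException` stands in for whichever ParseException type the project uses. Only parse failures are swallowed; anything else propagates to the caller.

```java
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;

public class IngestSketch {
    // Log-and-continue only on ParseException; other exceptions bubble up.
    static List<String> ingest(List<String> rawRows) {
        List<String> accepted = new ArrayList<>();
        for (String raw : rawRows) {
            try {
                accepted.add(parse(raw)); // may throw ParseException
            }
            catch (ParseException e) {
                continue;                 // unparseable: skip this row, keep going
            }
            // any RuntimeException from here on propagates to the caller
        }
        return accepted;
    }

    // Toy parser: empty or null input counts as unparseable.
    static String parse(String raw) throws ParseException {
        if (raw == null || raw.isEmpty()) {
            throw new ParseException("unparseable row", 0);
        }
        return raw.trim();
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        rows.add(" a ");
        rows.add("");
        rows.add("b");
        System.out.println(ingest(rows)); // prints [a, b]
    }
}
```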
a simple test to verify the behaviour would be nice.
Force-pushed from 11c0ab2 to 7f02b36.
Agree that a test would be nice; I could do one at some point before the PR is merged (i.e., Soon™). For now, though, does the general approach seem reasonable? Especially in the context of #1350?
Force-pushed from 7f02b36 to 147b39a.
LGTM.
```diff
@@ -252,8 +252,14 @@ public void run()
       try {
         try {
           inputRow = firehose.nextRow();
+
+          if (inputRow == null) {
+            log.debug("thrown away null input row, considering unparseable");
```
Is it possible to give a row # in the debug log?
Like how many inputRows have been read up until this one? Yes, but would that actually be helpful? I would think it's not, and what you'd actually want is the Kafka partition and offset. But there isn't really a way to get that at this point in the code, since it's buried inside the firehose.

That could potentially be solved by having Firehoses return InputRowAndMetadata (with two methods: getRow and getMetadata), where the metadata is something we can toString in this error message. But that's a Firehose interface change, which is tough to do since it's an external API.
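The InputRowAndMetadata idea floated above could look roughly like this. Everything here is hypothetical, not Druid's actual Firehose API: the interface, the `of` factory, and the Kafka-flavored metadata string are illustrative only. The point is that the log line can then carry a source breadcrumb.

```java
public class RowWithMetadataSketch {
    // Hypothetical pairing of a row with opaque, printable source metadata.
    interface InputRowAndMetadata<R, M> {
        R getRow();
        M getMetadata();
    }

    // Minimal immutable implementation for illustration.
    static <R, M> InputRowAndMetadata<R, M> of(final R row, final M meta) {
        return new InputRowAndMetadata<R, M>() {
            public R getRow() { return row; }
            public M getMetadata() { return meta; }
        };
    }

    public static void main(String[] args) {
        InputRowAndMetadata<String, String> entry =
            of(null, "kafka partition=3 offset=1024"); // illustrative metadata
        if (entry.getRow() == null) {
            // The debug line could now say where the bad row came from:
            System.out.println("thrown away null input row at " + entry.getMetadata());
        }
    }
}
```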
That also seems like it would add a lot of overhead for what is arguably a corner case.

Ultimately it would be nice if "where did my failure happen" had some sort of breadcrumb here, but given the nature of ingestion it probably makes more sense to have a validation layer well before this point in the data stream (i.e., not in Druid). Any failures here could then be assumed to be faults in the communication, making this a safeguard against catastrophic behavior on rare events.

Does that sound like a correct statement?
I definitely think validation pre-Druid is the way to go, so I agree with the statement that unparseable events should be rare and probably indicate something wrong with the data pipeline.
👍
```java
            plumber.persist(firehose.commit());
            nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
          }

          continue;
```
Does this change make it possible to try to persist an empty event sequence? If I recall correctly, that throws an error.
Example: a long time passes until the first event, the first event is null, and the plumber tries to flush without having any events.
No, because there's a `continue;` after a null inputRow is found, so the loop will go directly back to `firehose.hasMore()` + `firehose.nextRow()` before trying to persist anything.
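The control flow described above can be reduced to a minimal sketch. The iterator is a toy stand-in for a Firehose, and `persistCount` is a hypothetical helper, not Druid code: it just shows that `continue` jumps back to the next row, so the persist step is never reached for a null row.

```java
import java.util.Arrays;

public class LoopSketch {
    // Counts how many rows would reach the persist step; null rows are
    // skipped by `continue` before the persist line, mirroring the loop
    // under review.
    static int persistCount(Iterable<String> rows) {
        int persisted = 0;
        for (String inputRow : rows) { // stand-in for hasMore()/nextRow()
            if (inputRow == null) {
                continue;              // back to the next row; persist is skipped
            }
            persisted++;               // stand-in for plumber.persist(...)
        }
        return persisted;
    }

    public static void main(String[] args) {
        System.out.println(persistCount(Arrays.asList(null, "row1", null, "row2"))); // prints 2
    }
}
```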
Yep, missed it. Thanks!
Is this covered in a test somewhere?
Force-pushed from 147b39a to 9068bcd with commit message: "…me ingestion. Also, harmonize exception handling between the RealtimeIndexTask and the RealtimeManager. Conditions other than null inputRows and parse errors bubble up in both."
Added a test.
+1, once Travis is green :)
Closing/reopening to rebuild; the build failure seems unrelated.
Consider null inputRows as unparseable during realtime ingestion. Also, harmonize exception handling between the RealtimeIndexTask and the RealtimeManager.
Related to #1350