msq path to emit unparseable exceptions#13670
msq path to emit unparseable exceptions#13670TSFenwick wants to merge 9 commits intoapache:masterfrom
Conversation
cryptoe
left a comment
There was a problem hiding this comment.
Left some comments.
Thanks for taking this up.
Please update the description whenever you have time.
...-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/FrameProcessorFactory.java
Outdated
Show resolved
Hide resolved
| return; | ||
| } | ||
| } | ||
| if (CannotParseExternalDataFault.CODE.equals(errorCode)) { |
There was a problem hiding this comment.
This is what I would do:
Rename MSQWarningReportPublisher to ExceptionPublisher.
Create a new CompositeExceptionPublisher.
One implementation can be of a FilteredEmitterExceptionPublisher where in the constructor you take the error code.
It would be cool to avoid This call
:MSQErrorReport msqFault = MSQErrorReport.fromException(workerId, host, stageNumber, e);
There was a problem hiding this comment.
:MSQErrorReport msqFault = MSQErrorReport.fromException(workerId, host, stageNumber, e);
will definitely avoid repeating this call
There was a problem hiding this comment.
How do i go about attaching a debugger in MSQ to see if these reports are being captured/emitted
There was a problem hiding this comment.
I'm not sure i follow
This is what I would do:
Rename MSQWarningReportPublisher to ExceptionPublisher.Create a new CompositeExceptionPublisher.
One implementation can be of a FilteredEmitterExceptionPublisher where in the constructor you take the error code.
Are you saying create another class that implements the interface that i will rename.
The CompositeExceptionPublisher i don't understand?
FilteredEmitterExceptionPublisher i also don't understand
There was a problem hiding this comment.
How do i go about attaching a debugger in MSQ to see if these reports are being captured/emitted
MSQWarningsTest#testThrowExceptionWhenParseExceptionsExceedLimit should be a good one.
CompositeExceptionPublisher will implement the renamed interface ExceptionPublisher something like
class CompositeExceptionPublisher implements ExceptionPublisher{
List<ExceptionPublisher> delegates;
public CompositeExceptionPublisher(List<ExceptionPublisher> delegates){
this.delegates=delegates;
}
void publishException(int stageNumber, Throwable e){
for(ExceptionPublisher delegate:delegates){
delegate.publishException(stage,e);
}
}
void close() throws IOException{
for(ExceptionPublisher delegate:delegates){
delegate.close();
}
}
}Now you can create a new impl FilteredEmitterExceptionPublisher and pass it errorCodes in the constructor.
Then in WorkerImpl#runTask method create a CompositeExceptionPublisher with MSQWarningReportLimiterPublisher and FilteredEmitterExceptionPublisher and pass it along.
Maybe this strategy can work.
There was a problem hiding this comment.
@cryptoe How does this look?
We have the Limiter publisher and we pass in the composite publisher that contains the normal publisher and the emitter publisher
have a couple of todos as well like setting up the correct event to emit, and getting more relevant information
...ons-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestWorkerContext.java
Outdated
Show resolved
Hide resolved
LakshSingla
left a comment
There was a problem hiding this comment.
How do i go about attaching a debugger in MSQ to see if these reports are being captured/emitted
There should be a class MSQWarningsTest which test a few conditions when warnings should be thrown. Also, to test if the metrics are emitted, a few ways would be to either create a mock emitter that captures the calls made to it or spy on the existing noop emmiter and then verifying the arguments.
| import java.io.IOException; | ||
| import java.util.Set; | ||
|
|
||
| public class MSQFilteredEmitterWarningPublisher implements MSQWarningPublisher |
There was a problem hiding this comment.
I think it might be appropriate to have separate classes:
- MSQFilteredPublisher which takes in a delegate and publishes the warning only if it passes through the filter
- MSQEmitterPublisher that emits the warning if
publishExceptionis called
There was a problem hiding this comment.
That makes sense, but this is the only consumer of filtering and emitting right now so does that make sense to do that level of abstraction?
There was a problem hiding this comment.
If this is necessary i'll leave this change to later as it is pretty straightforward
...ry/src/main/java/org/apache/druid/msq/indexing/error/MSQFilteredEmitterWarningPublisher.java
Outdated
Show resolved
Hide resolved
|
|
||
| Bouncer processorBouncer(); | ||
|
|
||
| Emitter emitter(); |
There was a problem hiding this comment.
Can the emitter be nullable? If so, appropriate checks and annotations would be helpful
There was a problem hiding this comment.
We are getting this from TaskToolBox and the emitter there isn't labeled as nullable
core/src/main/java/org/apache/druid/java/util/emitter/service/ServiceLogEvent.java
Show resolved
Hide resolved
| public MSQFilteredEmitterWarningPublisher( | ||
| final String workerId, | ||
| final String controllerTaskId, | ||
| final String taskId, |
...uery/src/main/java/org/apache/druid/msq/indexing/error/MSQWarningReportLimiterPublisher.java
Show resolved
Hide resolved
| Integer.MAX_VALUE, | ||
| 0 | ||
| 0, | ||
| null |
There was a problem hiding this comment.
change nulls to a noop parse exception handler if possible
| * is trully published | ||
| */ | ||
| public class MSQWarningReportLimiterPublisher implements MSQWarningReportPublisher | ||
| public class MSQWarningReportLimiterPublisher implements MSQWarningPublisher |
There was a problem hiding this comment.
change class to MSQWarningLimiterPublisher
server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchool.java
Show resolved
Hide resolved
| @@ -0,0 +1,38 @@ | |||
| package org.apache.druid.indexing.common; | |||
There was a problem hiding this comment.
is this the correct package for this?
There was a problem hiding this comment.
This class seems avoidable. Looks like this is introduced cause of parseExceptionHandler cause you need the taskId, groupId, data source.
TaskID, groupId is part of the Task.java and seems to be repeated here.
What happens if task is operating on 2 data sources ?
So for the above reasons I think we should think of another way.
There was a problem hiding this comment.
Can a task operate on two datasources?
There was a problem hiding this comment.
ok i understand that it can now after chatting with some other people. Would it be clearer if i said TargetDataSource?
There was a problem hiding this comment.
Something like parseExceptionMetadata ?
| 0 | ||
| 0, | ||
| null | ||
| ); |
There was a problem hiding this comment.
missing task metadata
suneet-s
left a comment
There was a problem hiding this comment.
High level design feedback:
- I think a new ErrorLogger and set of associated classes should be added. Similar to RequestLogger, they can be backed by different implementations like logging to a file or emitting somewhere.
- ParseExceptionHandler should accept an ErrorLogger in it's constructor. Any class that instantiates a ParseExceptionHandler should be able to get ErrorLogger from the injector
- Update ParseExceptionHandler#handle to accept a Map<String, Object> metadata in addition to the parseException. This makes it clear that the caller is responsible for passing additional metadata about the exception. If the metadata is consistent, maybe the method should accept a POJO instead of a map.
If this approach works, it would probably be best to split this PR into 2 - one where you introduce the ErrorLogger and another where it is wired up to the ParseExceptionHandler
cryptoe
left a comment
There was a problem hiding this comment.
I agree with suneet on the 1) and 2) points. That would be much cleaner. Regarding the third point since we will not have common metadata like taskId where ever the handle method is called, it may be better to just wire the common dimensions it in the constructor.
| @@ -0,0 +1,38 @@ | |||
| package org.apache.druid.indexing.common; | |||
There was a problem hiding this comment.
Something like parseExceptionMetadata ?
…vent was overeager with module as i followed RequestLogger and QueryableModule for inspiration. this should be scoped down more also cleaned up checkstyle issues
|
@suneet-s i'll go down the path of creating error logger, but i think the name will need to be changed. |
|
This pull request has been marked as stale due to 60 days of inactivity. |
|
This pull request/issue has been closed due to lack of activity. If you think that |
Emit UnparseableExceptions in realtime as a new event.
Description
The goal of this PR is to emit an unparseableEvent in real time that an admin of druid might be interested in storing elsewhere. The new event shouldn't be emitted by default and should be opted into.
Release note
For tips about how to write a good release note, see Release notes.
Key changed/added classes in this PR
ServiceLogEventParseExceptionHandlerMSQCompositeWarningReporterMSQFilteredEmitterReporterThis PR has: