Always return a response from query execution. #6596
Codecov Report
@@ Coverage Diff @@
## master #6596 +/- ##
==========================================
- Coverage 66.44% 65.53% -0.92%
==========================================
Files 1075 1352 +277
Lines 54773 66558 +11785
Branches 8168 9697 +1529
==========================================
+ Hits 36396 43621 +7225
- Misses 15700 19820 +4120
- Partials 2677 3117 +440
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
InstanceRequest instanceRequest = new InstanceRequest();
ServerQueryRequest queryRequest;
byte[] requestBytes = new byte[requestSize];
Why not put all non-declaration statements inside the try block? In the rare event that any of these fail, the code will be able to handle it.
Refactored the code a bit more to do this.
      }
    }, MoreExecutors.directExecutor());
  }

  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    LOGGER.error("Caught exception while fetching instance request", cause);
    _serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 1);
    // Since we do not know the requestId of the original request here, there is no way for Broker to know which query
Can we use ChannelHandlerContext for storing that, or is that a read-only object?
From what I can see in the javadoc, ChannelHandlerContext doesn't provide this functionality.
@amrishlal It seems that we can set a value via ctx.channel().attr(<key>).set(<value>); can you check whether it works?
@mqliang Good find, although we refactored the channelRead0 method to catch all exceptions, so this is no longer needed. The comment is somewhat misleading now and I will update it. Note that an exception can also occur before or while deserializing the message bytes, so we may not have the requestId even if we set an attribute. It is probably better to handle all exceptions in the channelRead0 method itself in any case.
  _serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
} catch (Throwable t) {
  // Ignore since we are already handling a higher level exceptions.
But then we won't be sending a response back (ie line 140 may not execute)?
I think you are referring to the fact that byte[] serializedDataTable = dataTable.toBytes(); can throw an exception, and that would prevent sendResponse from being called in the next line. The problem here is that we need to send byte data in sendResponse. One option may be to hardcode some serialized bytes and send those if an exception occurs?
At the very least, raise a metric.
done.
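To make the two options from this thread concrete, here is a minimal sketch, not the code merged in this PR. It assumes a hypothetical helper that wraps the serialization step, counts failures with a stand-in counter (not a real ServerMeter value), and falls back to a pre-built payload so that something can still be sent. The payload text is invented for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper; names and payload format are assumptions, not Pinot APIs. */
final class SerializeWithFallback {
  /** Stand-in for a server metric that counts serialization failures. */
  static final AtomicLong SERIALIZATION_FAILURES = new AtomicLong();

  /** Pre-built bytes sent when the real response cannot be serialized. */
  static final byte[] FALLBACK =
      "ERROR: response serialization failed".getBytes(StandardCharsets.UTF_8);

  /** Runs the serializer; on any exception, raises the metric and returns the fallback. */
  static byte[] serializeOrFallback(Callable<byte[]> serializer) {
    try {
      return serializer.call();
    } catch (Exception e) {
      SERIALIZATION_FAILURES.incrementAndGet();
      return FALLBACK.clone();
    }
  }
}
```

With this shape the caller always has bytes to hand to the send step, and the counter records how often the fallback path was taken.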
/**
 * Send an exception back to broker as response to the query request.
 */
private void sendResponse(ChannelHandlerContext ctx, long requestId, long queryArrivalTimeMs, DataTable dataTable,
Not sure I understand why we need two separate signatures for this. Can the caller not prepare the DataTable as necessary for the response (e.g., set the exception in the data table if there is one), so that this method is only responsible for sending back the response?
The onSuccess method in the callback gets byte data, so to send that byte data back to server we need a sendResponse function that takes byte data as input. Then there are cases where we detect error conditions and need to create our own byte representation of the error condition. This is where the first sendResponse method is called. The first sendResponse method converts DataTable into byte data and calls the second, more generic sendResponse method. I can rename the first sendResponse method to sendErrorResponse if that clarifies things better.
I would probably refactor this as two separate utility methods:
- Building the response to send. This can have two flavors, one for success and one for failure.
- Another one just for sending the response. If this turns out to be just a function call, then it might even be inlined.
More of a suggestion at this point, not a blocker for the PR.
I renamed the first function to sendErrorResponse for clarity. Further refactoring could organize the code better, but it seems a bit outside the scope of this issue. For example, the success case (and some failure cases) are currently handled inside QueryScheduler via _queryScheduler.submit(queryRequest), and we get the response bytes back through a callback.
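For readers following the refactoring suggestion, the split between building a response and sending it could look roughly like the sketch below. Everything here is hypothetical: the class name, the string-based payload format, and the Consumer used as a stand-in for the channel are assumptions for illustration, not the PR's code.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical sketch of "build the response" vs. "send the response". */
final class ResponseSketch {
  /** Builds the payload for a successful query (invented text format). */
  static byte[] buildSuccess(long requestId, String rows) {
    return ("id=" + requestId + ";rows=" + rows).getBytes(StandardCharsets.UTF_8);
  }

  /** Builds the payload for a failed query, carrying the request id and the cause. */
  static byte[] buildError(long requestId, Throwable cause) {
    String text = "id=" + requestId + ";error="
        + cause.getClass().getSimpleName() + ": " + cause.getMessage();
    return text.getBytes(StandardCharsets.UTF_8);
  }

  /** Sends already-built bytes; knows nothing about success or failure. */
  static void send(Consumer<byte[]> channel, byte[] payload) {
    channel.accept(payload);
  }
}
```

The send step stays trivial, which is why the reviewer notes it might end up inlined.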
Could you please also add a test (that should fail without your fix)?
Added.
@@ -438,6 +439,29 @@ public void testBloomFilterTriggering()
    }, 600_000L, "Failed to generate bloom filter");
  }

  /** Check if server returns error response quickly without timing out Broker. */
  @Test
  public void testServerErrorWithBrokerTimeout()
Can we also add a test to ensure that, if there are multiple requests, the right one gets the exception on the broker side? We discussed earlier in the PR that it was difficult to identify which request threw the exception.
https://github.com/apache/incubator-pinot/pull/6596/files#r579845996
QueryRoutingTest (unit) does some of this, but mocks the server away. I am not seeing a straightforward way of adding an integration test that would involve both server and broker since the data structures that map broker request to server response are deep down. Although, now that all exceptions are being handled within channelRead0 function, this should not be an issue.
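As a rough illustration of why the request id matters when several requests are in flight, the sketch below keeps a map from request id to a pending future and completes only the matching one when a response arrives. This is not Pinot's broker routing code; the class, its methods, and the string payloads are all hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical broker-side bookkeeping keyed by request id. */
final class PendingRequests {
  private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

  /** Registers an in-flight request and returns the future the caller waits on. */
  CompletableFuture<String> register(long requestId) {
    CompletableFuture<String> future = new CompletableFuture<>();
    pending.put(requestId, future);
    return future;
  }

  /**
   * Routes a server response to the request it belongs to. A non-null error fails
   * that request only. Returns false when no pending request has this id.
   */
  boolean onResponse(long requestId, String payload, String error) {
    CompletableFuture<String> future = pending.remove(requestId);
    if (future == null) {
      return false;
    }
    if (error != null) {
      future.completeExceptionally(new RuntimeException(error));
    } else {
      future.complete(payload);
    }
    return true;
  }
}
```

An error response that carries no usable request id cannot be matched this way, which is the difficulty raised earlier in the thread.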
try {
  // put all inside try block to catch all exceptions.
  final int requestSize = msg.readableBytes();
This is the style I also followed of using final for local variables for my first few PRs. Glad to know someone else did it too. However, Pinot coding style doesn't follow this. So you may want to remove it for now.
But something we can discuss independent of this PR. I think it makes sense to use final for constant local variables as well and not just instance variables.
Done
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);

// parse instance request into a query result.
msg.readBytes(requestBytes);
Typo? Should be parse instance request into a query request
Fixed.
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
return;
String hexString = requestBytes != null ? BytesUtils.toHexString(requestBytes) : "";
long reqestId = instanceRequest != null ? instanceRequest.getRequestId() : 0;
Isn't this null check late? I think we should check this after lines 90 and 91 respectively to avoid an NPE.
We won't get an NPE in the normal case because instanceRequest gets instantiated in line 82. If by chance an exception is thrown before that, we need a null check in the exception handler to avoid an NPE. The same goes for requestBytes. Declaration is separate from instantiation, and that forces the null check in the exception handler.
My bad, I didn't notice the instantiation. Makes sense now
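The pattern discussed here (declare outside the try block, assign inside it, and null-check in the handler) can be sketched as follows. This is a simplified stand-in rather than the PR's channelRead0: the "id:query" text format, the class name, and the list that records what would be sent are assumptions made so the example runs on its own.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical handler that always produces a response, even on failure. */
final class AlwaysRespondHandler {
  /** Responses that would be sent to the broker, recorded for inspection. */
  final List<String> sent = new ArrayList<>();

  void handle(ByteBuffer msg) {
    // Declarations only: nothing on these two lines can throw.
    long requestId = 0;
    byte[] requestBytes = null;
    try {
      // Every statement that can fail lives inside the try block.
      requestBytes = new byte[msg.remaining()];
      msg.get(requestBytes);
      String text = new String(requestBytes, StandardCharsets.UTF_8);
      int sep = text.indexOf(':');
      requestId = Long.parseLong(text.substring(0, sep));
      sent.add("OK " + requestId + " " + text.substring(sep + 1));
    } catch (Exception e) {
      // requestBytes is still null if the failure happened before it was assigned.
      int size = requestBytes != null ? requestBytes.length : 0;
      sent.add("ERROR " + requestId + " bytes=" + size + " " + e.getClass().getSimpleName());
    }
  }
}
```

Because assignment happens inside the try block, the handler cannot assume either variable was populated, which is the reason for the null checks in the catch clause.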
String hexString = requestBytes != null ? BytesUtils.toHexString(requestBytes) : "";
long reqestId = instanceRequest != null ? instanceRequest.getRequestId() : 0;
LOGGER.error("Exception while deserializing the instance request: {}", hexString, e);
sendErrorResponse(ctx, reqestId, queryArrivalTimeMs, new DataTableImplV2(), e);
Looks like we are no longer emitting the metric REQUEST_DESERIALIZATION_EXCEPTIONS if we catch an exception during deserialization? I see that sendErrorResponse is doing the following:
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
I think we should continue to emit the request deserialization exception metric, because I believe the above metric will be emitted even if the exception didn't happen during request deserialization.
Added it back.
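The metric behavior described in this exchange can be sketched as below, assuming that the generic error path always bumps one counter and the deserialization path bumps a more specific one first. The class and its map-backed counters are hypothetical stand-ins; only the two metric names come from the thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical counters mirroring the two metrics named in the review. */
final class ErrorMetricsSketch {
  final Map<String, Long> counters = new HashMap<>();

  /** Generic error path: always counts a query execution exception. */
  void onExecutionFailure() {
    counters.merge("QUERY_EXECUTION_EXCEPTIONS", 1L, Long::sum);
  }

  /** Deserialization path: counts the specific metric, then takes the generic path. */
  void onDeserializationFailure() {
    counters.merge("REQUEST_DESERIALIZATION_EXCEPTIONS", 1L, Long::sum);
    onExecutionFailure();
  }
}
```

Under this assumption the generic counter alone cannot tell deserialization failures apart from other failures, which is why the specific metric is worth keeping.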
    }
  }

  @Override
  public void onFailure(Throwable t) {
    LOGGER.error("Caught exception while processing instance request", t);
    _serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
    // Send exception response.
Can you please also update the PR description to call out the metrics that will no longer be emitted after this PR? Looks like UNCAUGHT_EXCEPTIONS and REQUEST_FETCH_EXCEPTIONS are the ones.
Done.
Map<String, String> dataTableMetadata = dataTable.getMetadata();
dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(requestId));

dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
(nit) remove empty line if not intentional
Done.
  return;
}
if (e instanceof TException) {
  // deserialization exception
So this if check of the exception type guarantees that exception is coming from line 92 and not before or after that?
Yes, _deserializer.deserialize(instanceRequest, requestBytes) is the only method that throws TException. Also, any exceptions thrown within:
Futures.addCallback(_queryScheduler.submit(queryRequest),
createCallback(ctx, queryArrivalTimeMs, instanceRequest, queryRequest), MoreExecutors.directExecutor());
will get handled by the callback's onFailure method.
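The division of labor described in this reply can be sketched with the JDK alone. Note the substitutions: CompletableFuture.whenComplete stands in for Guava's Futures.addCallback, and DeserializationException is a hypothetical stand-in for Thrift's TException. Failures raised while deserializing are caught by type, failures inside the submitted work arrive through the callback, and every path records a response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical sketch; JDK futures replace the Guava callback used in the PR. */
final class FailureRouting {
  /** Stand-in for TException: thrown only by deserialize(). */
  static final class DeserializationException extends Exception {
    DeserializationException(String message) {
      super(message);
    }
  }

  /** Responses that would be sent to the broker. */
  final List<String> sent = new ArrayList<>();

  static String deserialize(String raw) throws DeserializationException {
    if (raw == null || raw.isEmpty()) {
      throw new DeserializationException("empty request");
    }
    return raw.trim();
  }

  void handle(String raw, Function<String, CompletableFuture<String>> scheduler) {
    try {
      String query = deserialize(raw);
      // Failures inside the scheduled work surface in the callback, not in the catch below.
      scheduler.apply(query).whenComplete((result, failure) -> {
        if (failure != null) {
          sent.add("onFailure: " + failure.getMessage());
        } else {
          sent.add("onSuccess: " + result);
        }
      });
    } catch (DeserializationException e) {
      // The exception type alone identifies the deserialization step.
      sent.add("deserialization error: " + e.getMessage());
    } catch (Exception e) {
      sent.add("other error: " + e.getMessage());
    }
  }
}
```

The type check is only as reliable as the guarantee that a single call throws that type, which is the point confirmed in the reply above.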
LGTM. Thanks for fixing this.
Description
Server does not return a response for some of the queries that fail. As a result, the broker waits until it times out, which adds unnecessary latency to queries on the Broker side. One example of such a query is:
select playerID from baseballStats where playerID in ("aardsda01")
This PR fixes the issue by always returning a response to the Broker, irrespective of whether the query succeeds or fails. Note that the UNCAUGHT_EXCEPTIONS and REQUEST_FETCH_EXCEPTIONS metrics are no longer emitted.