Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always return a response from query execution. #6596

Merged
merged 14 commits into from Feb 25, 2021
Expand Up @@ -306,6 +306,10 @@ private byte[] serializeDataTable(@Nonnull ServerQueryRequest queryRequest, @Non
protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest queryRequest,
ProcessingException error) {
DataTable result = new DataTableImplV2();

Map<String, String> dataTableMetadata = result.getMetadata();
mayankshriv marked this conversation as resolved.
Show resolved Hide resolved
dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(queryRequest.getRequestId()));

result.addException(error);
return Futures.immediateFuture(serializeDataTable(queryRequest, result));
}
Expand Down
Expand Up @@ -25,16 +25,20 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableImplV2;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.slf4j.Logger;
Expand All @@ -60,67 +64,119 @@ public InstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serve
_serverMetrics = serverMetrics;
}

/**
* Always return a response even when query execution throws exception; otherwise, broker
* will keep waiting until timeout.
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
long queryArrivalTimeMs = System.currentTimeMillis();
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
int requestSize = msg.readableBytes();
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
byte[] requestBytes = new byte[requestSize];
msg.readBytes(requestBytes);

InstanceRequest instanceRequest = new InstanceRequest();
long queryArrivalTimeMs = 0;
InstanceRequest instanceRequest = null;
byte[] requestBytes = null;

try {
// all code inside try code, so that we are able to catch all exceptions.
final int requestSize = msg.readableBytes();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the style I also followed of using final for local variables for my first few PRs. Glad to know someone else did it too. However, Pinot coding style doesn't follow this. So you may want to remove it for now.

But something we can discuss independent of this PR. I think it makes sense to use final for constant local variables as well and not just instance variables.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

instanceRequest = new InstanceRequest();
ServerQueryRequest queryRequest;
requestBytes = new byte[requestSize];

queryArrivalTimeMs = System.currentTimeMillis();
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);

// parse instance request into a query result.
msg.readBytes(requestBytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo? Should be parse instance request into a query request

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

_deserializer.deserialize(instanceRequest, requestBytes);
queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
.stopAndRecord();

// Submit query for execution and register callback for execution results.
Futures.addCallback(_queryScheduler.submit(queryRequest),
createCallback(ctx, queryArrivalTimeMs, instanceRequest, queryRequest), MoreExecutors.directExecutor());
} catch (Exception e) {
LOGGER
.error("Caught exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes),
e);
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
return;
String hexString = requestBytes != null ? BytesUtils.toHexString(requestBytes) : "";
long reqestId = instanceRequest != null ? instanceRequest.getRequestId() : 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isnt't this null check late? I think we should check this after lines 90 and 91 respectively to avoid NPE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We won't get a NPE for normal case because instanceRequest gets instantiated in line 82. If by chance an exception is thrown before that, we need a null check in the exception handler to avoid NPE. Same with requestBytes. Declaration is separate from instantiation and that is forcing Null check in exception handler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I didn't notice the instantiation. Makes sense now

LOGGER.error("Exception while deserializing the instance request: {}", hexString, e);
sendResponse(ctx, reqestId, queryArrivalTimeMs, new DataTableImplV2(), e);
}

ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
.stopAndRecord();

// NOTE: executor must be provided as addCallback(future, callback) is removed from newer guava version
Futures.addCallback(_queryScheduler.submit(queryRequest), new FutureCallback<byte[]>() {
}

private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx, long queryArrivalTimeMs,
InstanceRequest instanceRequest, ServerQueryRequest queryRequest) {
return new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] responseBytes) {
// NOTE: response bytes can be null if data table serialization throws exception
if (responseBytes != null) {
long sendResponseStartTimeMs = System.currentTimeMillis();
int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
long sendResponseEndTimeMs = System.currentTimeMillis();
int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, responseBytes.length);
_serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
TimeUnit.MILLISECONDS);

int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
LOGGER.info(
"Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
}
});
// responseBytes contains either query results or exception.
sendResponse(ctx, queryArrivalTimeMs, responseBytes);
} else {
// Send exception response.
sendResponse(ctx, queryRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(),
new Exception("Null query response."));
}
}

@Override
public void onFailure(Throwable t) {
LOGGER.error("Caught exception while processing instance request", t);
_serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
// Send exception response.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please also update the PR description to call out the metrics that will no longer be emitted after this PR. Looks like UNCAUGHT_EXCEPTIONS and REQUEST_FETCH_EXCEPTION are the ones

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

LOGGER.error("Exception while processing instance request", t);
sendResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(), new Exception(t));
}
}, MoreExecutors.directExecutor());
};
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.error("Caught exception while fetching instance request", cause);
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 1);
// Since we do not know the requestId of the original request here, there is no way for Broker to know which query
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use ChannelHandlerContext for storing that, or is that a read-only object?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I can see in the javadoc, ChannelHandlerContext doesn't provide this functionality.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amrishlal It seems that we can set a value via ctx.channel().attr(<key>).set(<value>);, can you take a look if it works?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mqliang Good find, although we refactored the channelRead0 method to catch all exception so this is no longer needed. The comment is somewhat misleading now and I will update it. Note that an exception can occur before or while deserializing the message bytes also, so we may not have requestId in any case even if we set an attribute. It is probably better to handle all exceptions in channelRead0 method itself in any case.

// request this response belongs to. Hence, Broker will continue to wait for the original request until time out.
// To prevent Broker from waiting unnecessarily, try to catch and handle all exceptions in channelRead0 method so
// that this function is never called.
LOGGER.error("Unhandled Exception in " + getClass().getCanonicalName(), cause);
sendResponse(ctx, 0, System.currentTimeMillis(), new DataTableImplV2(), new Exception(cause));
}

/**
* Send an exception back to broker as response to the query request.
*/
private void sendResponse(ChannelHandlerContext ctx, long requestId, long queryArrivalTimeMs, DataTable dataTable,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I understand why we need to separate signatures for this? Can the caller not prepare the DaaTable as necessary for the response, (eg set the exception in the data table if there's one)? And this method is just responsible for sending back the response?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The onSuccess method in the callback gets byte data so to send that byte data back to server we need a sendResponse function that takes byte data as input. Then there are cases where we detect error conditions and need to create our own byte representation of the error condition. This is where the first sendResponse method is called. The first sendResponse method converts DataTable into byte data and calls the second more generic sendResponse method. I can rename the first sendResponse method to sendErrorResponse if that clarifies things better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably refactor this as two separate utility methods:

  • Building the response to send. This can have two flavors, one for success, and one for failure
  • Another one just for sending the response. And if this turns out to be just function call, then it might even be inline.

More of suggestion at this point, not a blocker for the PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed the first function to sendErrorResponse for clarity. Although further refactoring can organize the code better, but the refactoring seems a bit outside the scope of this issue. For example, currently, the success (and some failure cases) inside QuerySchedular _queryScheduler.submit(queryRequest) and we get the response bytes back through a callback.

Exception e) {
try {
Map<String, String> dataTableMetadata = dataTable.getMetadata();
dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(requestId));

dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) remove empty line if not intentional

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

byte[] serializedDataTable = dataTable.toBytes();
sendResponse(ctx, queryArrivalTimeMs, serializedDataTable);

_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
} catch (Throwable t) {
// Ignore since we are already handling a higher level exceptions.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But then we won't be sending a response back (ie line 140 may not execute)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you are referring to the fact that byte[] serializedDataTable = dataTable.toBytes(); can throw an exception and that would prevent sendResponse from being called in the next line. The problem here is that we need to send byte data in sendResponse. One option may be to hardcode some serialized bytes and send that if an exception occurs?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the very least, raise a metric.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

}
}

/**
* Send a response (either query results or exception) back to broker as response to the query request.
*/
private void sendResponse(ChannelHandlerContext ctx, long queryArrivalTimeMs, byte[] serializedDataTable) {
long sendResponseStartTimeMs = System.currentTimeMillis();
int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
ctx.writeAndFlush(Unpooled.wrappedBuffer(serializedDataTable)).addListener(f -> {
long sendResponseEndTimeMs = System.currentTimeMillis();
int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, serializedDataTable.length);
_serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
TimeUnit.MILLISECONDS);

int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
LOGGER.info(
"Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
}
});
}
}
Expand Up @@ -33,6 +33,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.AssertTrue;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.exception.QueryException;
Expand Down Expand Up @@ -438,6 +439,29 @@ public void testBloomFilterTriggering()
}, 600_000L, "Failed to generate bloom filter");
}

/** Check if server returns error response quickly without timing out Broker. */
@Test
public void testServerErrorWithBrokerTimeout()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add a test to ensure that if there are multiple requests, the right one gets the exception on the broker side. We discussed earlier in the PR where it was difficult to identify which request threw the exception?

https://github.com/apache/incubator-pinot/pull/6596/files#r579845996

Copy link
Contributor Author

@amrishlal amrishlal Feb 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryRoutingTest (unit) does some of this, but mocks the server away. I am not seeing a straightforward way of adding an integration test that would involve both server and broker since the data structures that map broker request to server response are deep down. Although, now that all exceptions are being handled within channelRead0 function, this should not be an issue.

throws Exception {
// Set query timeout
long queryTimeout = 5000;
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setQueryConfig(new QueryConfig(queryTimeout));
updateTableConfig(tableConfig);

long startTime = System.currentTimeMillis();
// The query below will fail execution due to use of double quotes around value in IN clause.
JsonNode queryResponse = postSqlQuery("SELECT count(*) FROM mytable WHERE Dest IN (\"DFW\")");
String result = queryResponse.toPrettyString();

assertTrue(System.currentTimeMillis() - startTime < queryTimeout);
assertTrue(queryResponse.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));

// Remove timeout
tableConfig.setQueryConfig(null);
updateTableConfig(tableConfig);
}

@Test
public void testStarTreeTriggering()
throws Exception {
Expand Down