Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Always return a response from query execution. #6596

Merged
merged 14 commits into from Feb 25, 2021
Expand Up @@ -306,6 +306,10 @@ private byte[] serializeDataTable(@Nonnull ServerQueryRequest queryRequest, @Non
protected ListenableFuture<byte[]> immediateErrorResponse(ServerQueryRequest queryRequest,
ProcessingException error) {
DataTable result = new DataTableImplV2();

Map<String, String> dataTableMetadata = result.getMetadata();
mayankshriv marked this conversation as resolved.
Show resolved Hide resolved
dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(queryRequest.getRequestId()));

result.addException(error);
return Futures.immediateFuture(serializeDataTable(queryRequest, result));
}
Expand Down
Expand Up @@ -25,16 +25,20 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.metrics.ServerQueryPhase;
import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.request.InstanceRequest;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.datatable.DataTableImplV2;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.core.query.scheduler.QueryScheduler;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.slf4j.Logger;
Expand All @@ -60,67 +64,120 @@ public InstanceRequestHandler(QueryScheduler queryScheduler, ServerMetrics serve
_serverMetrics = serverMetrics;
}

/**
* Always return a response even when query execution throws exception; otherwise, broker
* will keep waiting until timeout.
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
long queryArrivalTimeMs = System.currentTimeMillis();
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
int requestSize = msg.readableBytes();
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);
byte[] requestBytes = new byte[requestSize];
msg.readBytes(requestBytes);

InstanceRequest instanceRequest = new InstanceRequest();
long queryArrivalTimeMs = 0;
InstanceRequest instanceRequest = null;
byte[] requestBytes = null;

try {
// put all inside try block to catch all exceptions.
final int requestSize = msg.readableBytes();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the style I also followed of using final for local variables for my first few PRs. Glad to know someone else did it too. However, Pinot coding style doesn't follow this. So you may want to remove it for now.

But something we can discuss independent of this PR. I think it makes sense to use final for constant local variables as well and not just instance variables.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

instanceRequest = new InstanceRequest();
ServerQueryRequest queryRequest;
requestBytes = new byte[requestSize];

queryArrivalTimeMs = System.currentTimeMillis();
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERIES, 1);
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_RECEIVED, requestSize);

// parse instance request into a query result.
msg.readBytes(requestBytes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo? Should be parse instance request into a query request

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

_deserializer.deserialize(instanceRequest, requestBytes);
queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
.stopAndRecord();

// Submit query for execution and register callback for execution results.
Futures.addCallback(_queryScheduler.submit(queryRequest),
createCallback(ctx, queryArrivalTimeMs, instanceRequest, queryRequest), MoreExecutors.directExecutor());
} catch (Exception e) {
LOGGER
.error("Caught exception while deserializing the instance request: {}", BytesUtils.toHexString(requestBytes),
e);
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_DESERIALIZATION_EXCEPTIONS, 1);
return;
String hexString = requestBytes != null ? BytesUtils.toHexString(requestBytes) : "";
long reqestId = instanceRequest != null ? instanceRequest.getRequestId() : 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isnt't this null check late? I think we should check this after lines 90 and 91 respectively to avoid NPE

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We won't get a NPE for normal case because instanceRequest gets instantiated in line 82. If by chance an exception is thrown before that, we need a null check in the exception handler to avoid NPE. Same with requestBytes. Declaration is separate from instantiation and that is forcing Null check in exception handler.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, I didn't notice the instantiation. Makes sense now

LOGGER.error("Exception while deserializing the instance request: {}", hexString, e);
sendErrorResponse(ctx, reqestId, queryArrivalTimeMs, new DataTableImplV2(), e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are no longer emitting the metric REQUEST_DESERIALIZATION_EXCEPTIONS if we catch exception during deserialization ?

I see that sendErrorResponse doing the following
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);

I think we should continue to emit the request deserialization exception metric because I believe the above metric will be emitted even if the exception didn't happen during request deserialization

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added it back.

}
}

ServerQueryRequest queryRequest = new ServerQueryRequest(instanceRequest, _serverMetrics, queryArrivalTimeMs);
queryRequest.getTimerContext().startNewPhaseTimer(ServerQueryPhase.REQUEST_DESERIALIZATION, queryArrivalTimeMs)
.stopAndRecord();

// NOTE: executor must be provided as addCallback(future, callback) is removed from newer guava version
Futures.addCallback(_queryScheduler.submit(queryRequest), new FutureCallback<byte[]>() {
private FutureCallback<byte[]> createCallback(ChannelHandlerContext ctx, long queryArrivalTimeMs,
InstanceRequest instanceRequest, ServerQueryRequest queryRequest) {
return new FutureCallback<byte[]>() {
@Override
public void onSuccess(@Nullable byte[] responseBytes) {
// NOTE: response bytes can be null if data table serialization throws exception
if (responseBytes != null) {
long sendResponseStartTimeMs = System.currentTimeMillis();
int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
ctx.writeAndFlush(Unpooled.wrappedBuffer(responseBytes)).addListener(f -> {
long sendResponseEndTimeMs = System.currentTimeMillis();
int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, responseBytes.length);
_serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
TimeUnit.MILLISECONDS);

int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
LOGGER.info(
"Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
}
});
// responseBytes contains either query results or exception.
sendResponse(ctx, queryArrivalTimeMs, responseBytes);
} else {
// Send exception response.
sendErrorResponse(ctx, queryRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(),
new Exception("Null query response."));
}
}

@Override
public void onFailure(Throwable t) {
LOGGER.error("Caught exception while processing instance request", t);
_serverMetrics.addMeteredGlobalValue(ServerMeter.UNCAUGHT_EXCEPTIONS, 1);
// Send exception response.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please also update the PR description to call out the metrics that will no longer be emitted after this PR. Looks like UNCAUGHT_EXCEPTIONS and REQUEST_FETCH_EXCEPTION are the ones

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

LOGGER.error("Exception while processing instance request", t);
sendErrorResponse(ctx, instanceRequest.getRequestId(), queryArrivalTimeMs, new DataTableImplV2(), new Exception(t));
}
}, MoreExecutors.directExecutor());
};
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOGGER.error("Caught exception while fetching instance request", cause);
_serverMetrics.addMeteredGlobalValue(ServerMeter.REQUEST_FETCH_EXCEPTIONS, 1);
// All exceptions should be caught and handled in channelRead0 method. This is a fallback method that
// will only be called if for some remote reason we are unable to handle exceptions in channelRead0.
String message = "Unhandled Exception in " + getClass().getCanonicalName();
LOGGER.error(message, cause);
sendErrorResponse(ctx, 0, System.currentTimeMillis(), new DataTableImplV2(), new Exception(message, cause));
}

/**
* Send an exception back to broker as response to the query request.
*/
private void sendErrorResponse(ChannelHandlerContext ctx, long requestId, long queryArrivalTimeMs, DataTable dataTable,
Exception e) {
try {
Map<String, String> dataTableMetadata = dataTable.getMetadata();
dataTableMetadata.put(DataTable.REQUEST_ID_METADATA_KEY, Long.toString(requestId));

dataTable.addException(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(nit) remove empty line if not intentional

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

byte[] serializedDataTable = dataTable.toBytes();
sendResponse(ctx, queryArrivalTimeMs, serializedDataTable);
} catch (Exception exception) {
LOGGER.error("Exception while sending query processing error to Broker.", exception);
} finally {
// log query processing exception
LOGGER.error("Query processing error: ", e);
_serverMetrics.addMeteredGlobalValue(ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
}
}

/**
* Send a response (either query results or exception) back to broker as response to the query request.
*/
private void sendResponse(ChannelHandlerContext ctx, long queryArrivalTimeMs, byte[] serializedDataTable) {
long sendResponseStartTimeMs = System.currentTimeMillis();
int queryProcessingTimeMs = (int) (sendResponseStartTimeMs - queryArrivalTimeMs);
ctx.writeAndFlush(Unpooled.wrappedBuffer(serializedDataTable)).addListener(f -> {
long sendResponseEndTimeMs = System.currentTimeMillis();
int sendResponseLatencyMs = (int) (sendResponseEndTimeMs - sendResponseStartTimeMs);
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_RESPONSES_SENT, 1);
_serverMetrics.addMeteredGlobalValue(ServerMeter.NETTY_CONNECTION_BYTES_SENT, serializedDataTable.length);
_serverMetrics.addTimedValue(ServerTimer.NETTY_CONNECTION_SEND_RESPONSE_LATENCY, sendResponseLatencyMs,
TimeUnit.MILLISECONDS);

int totalQueryTimeMs = (int) (sendResponseEndTimeMs - queryArrivalTimeMs);
if (totalQueryTimeMs > SLOW_QUERY_LATENCY_THRESHOLD_MS) {
LOGGER.info(
"Slow query: request handler processing time: {}, send response latency: {}, total time to handle request: {}",
queryProcessingTimeMs, sendResponseLatencyMs, totalQueryTimeMs);
}
});
}
}
Expand Up @@ -33,6 +33,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.validation.constraints.AssertTrue;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.exception.QueryException;
Expand Down Expand Up @@ -438,6 +439,29 @@ public void testBloomFilterTriggering()
}, 600_000L, "Failed to generate bloom filter");
}

/** Check if server returns error response quickly without timing out Broker. */
@Test
public void testServerErrorWithBrokerTimeout()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also add a test to ensure that if there are multiple requests, the right one gets the exception on the broker side. We discussed earlier in the PR where it was difficult to identify which request threw the exception?

https://github.com/apache/incubator-pinot/pull/6596/files#r579845996

Copy link
Contributor Author

@amrishlal amrishlal Feb 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryRoutingTest (unit) does some of this, but mocks the server away. I am not seeing a straightforward way of adding an integration test that would involve both server and broker since the data structures that map broker request to server response are deep down. Although, now that all exceptions are being handled within channelRead0 function, this should not be an issue.

throws Exception {
// Set query timeout
long queryTimeout = 5000;
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.setQueryConfig(new QueryConfig(queryTimeout));
updateTableConfig(tableConfig);

long startTime = System.currentTimeMillis();
// The query below will fail execution due to use of double quotes around value in IN clause.
JsonNode queryResponse = postSqlQuery("SELECT count(*) FROM mytable WHERE Dest IN (\"DFW\")");
String result = queryResponse.toPrettyString();

assertTrue(System.currentTimeMillis() - startTime < queryTimeout);
assertTrue(queryResponse.get("exceptions").get(0).get("message").toString().startsWith("\"QueryExecutionError"));

// Remove timeout
tableConfig.setQueryConfig(null);
updateTableConfig(tableConfig);
}

@Test
public void testStarTreeTriggering()
throws Exception {
Expand Down