Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IN_SUBQUERY support #6022

Merged
merged 1 commit into from Nov 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -19,7 +19,9 @@
package org.apache.pinot.broker.requesthandler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.util.concurrent.RateLimiter;
import java.net.InetAddress;
Expand Down Expand Up @@ -48,6 +50,7 @@
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.function.AggregationFunctionType;
import org.apache.pinot.common.function.TransformFunctionType;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerQueryPhase;
Expand Down Expand Up @@ -75,6 +78,7 @@
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
import org.apache.pinot.core.query.utils.idset.IdSets;
import org.apache.pinot.core.requesthandler.BrokerRequestOptimizer;
import org.apache.pinot.core.requesthandler.PinotQueryParserFactory;
import org.apache.pinot.core.requesthandler.PinotQueryRequest;
Expand All @@ -89,9 +93,11 @@
import org.slf4j.LoggerFactory;


@SuppressWarnings("UnstableApiUsage")
@ThreadSafe
public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(BaseBrokerRequestHandler.class);
private static final String IN_SUBQUERY = "inSubquery";

protected final PinotConfiguration _config;
protected final RoutingManager _routingManager;
Expand Down Expand Up @@ -160,7 +166,6 @@ private String getDefaultBrokerId() {
}
}

@SuppressWarnings("Duplicates")
@Override
public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity requesterIdentity,
RequestStatistics requestStatistics)
Expand All @@ -186,6 +191,8 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentit
requestStatistics.setErrorCode(QueryException.PQL_PARSING_ERROR_CODE);
return new BrokerResponseNative(QueryException.getException(QueryException.PQL_PARSING_ERROR, e));
}
setOptions(requestId, query, request, brokerRequest);

if (isLiteralOnlyQuery(brokerRequest)) {
LOGGER.debug("Request {} contains only Literal, skipping server query: {}", requestId, query);
try {
Expand All @@ -197,11 +204,27 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentit
e.getMessage());
}
}

// TODO:
// Separate the query rewrite for SQL and PQL and only rewrite one format (PinotQuery for SQL, BrokerRequest for
// PQL) to save the unnecessary overhead.
// Prerequisite:
// Support filter optimizer for SQL. Currently the filter is always picked from the BrokerRequest.

try {
handleSubquery(brokerRequest, request, requesterIdentity, requestStatistics);
} catch (Exception e) {
LOGGER
.info("Caught exception while handling the subquery in request {}: {}, {}", requestId, query, e.getMessage());
requestStatistics.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
}

updateTableName(brokerRequest);
try {
updateColumnNames(brokerRequest);
} catch (Exception e) {
LOGGER.warn("Caught exception while updating Column names in Query {}: {}, {}", requestId, query, e);
LOGGER.warn("Caught exception while updating Column names in Query {}: {}, {}", requestId, query, e.getMessage());
}
if (_defaultHllLog2m > 0) {
handleHyperloglogLog2mOverride(brokerRequest, _defaultHllLog2m);
Expand Down Expand Up @@ -284,9 +307,6 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentit
return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e));
}

// Set extra settings into broker request
setOptions(requestId, query, request, brokerRequest);

// Optimize the query
// TODO: get time column name from schema or table config so that we can apply it for REALTIME only case
// We get timeColumnName from time boundary service currently, which only exists for offline table
Expand Down Expand Up @@ -436,6 +456,138 @@ public BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentit
return brokerResponse;
}

/**
 * Resolves the subqueries embedded in the filter of the given broker request.
 * <p>Both representations of the filter are processed: the PQL filter sub-query map (each filter column is compiled
 * to an expression tree, rewritten, and stored back as a string) and the SQL filter expression of the PinotQuery
 * (rewritten in place). Either representation is skipped when it is absent.
 * <p>Currently only subqueries within the filter are supported.
 *
 * @param brokerRequest broker request whose filter may contain subqueries
 * @param jsonRequest original JSON request, forwarded when executing a subquery
 * @param requesterIdentity identity of the requester, forwarded when executing a subquery
 * @param requestStatistics request statistics, forwarded when executing a subquery
 * @throws Exception if rewriting or executing a subquery fails
 */
private void handleSubquery(BrokerRequest brokerRequest, JsonNode jsonRequest,
    @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
    throws Exception {
  // PQL side: every filter column may itself be a transform expression containing a subquery
  FilterQueryMap pqlFilters = brokerRequest.getFilterSubQueryMap();
  if (pqlFilters != null) {
    for (FilterQuery pqlFilter : pqlFilters.getFilterQueryMap().values()) {
      String columnExpression = pqlFilter.getColumn();
      if (columnExpression == null) {
        continue;
      }
      TransformExpressionTree columnTree = TransformExpressionTree.compileToExpressionTree(columnExpression);
      handleSubquery(columnTree, jsonRequest, requesterIdentity, requestStatistics);
      // Write the (possibly rewritten) expression back as the filter column
      pqlFilter.setColumn(columnTree.toString());
    }
  }

  // SQL side: the filter expression is rewritten in place
  PinotQuery sqlQuery = brokerRequest.getPinotQuery();
  Expression sqlFilter = sqlQuery != null ? sqlQuery.getFilterExpression() : null;
  if (sqlFilter != null) {
    handleSubquery(sqlFilter, jsonRequest, requesterIdentity, requestStatistics);
  }
}

/**
 * Resolves the subqueries embedded in the given PQL expression tree.
 * <p>Non-function nodes are left untouched. A function whose name matches IN_SUBQUERY (underscores and case are
 * ignored) must have exactly two arguments, the second being a literal that holds the subquery. The subquery is
 * executed, the function is renamed to the IN_ID_SET transform function, and the second argument is replaced with
 * a literal holding the serialized IdSet returned by the subquery. Any other function is searched recursively
 * through its arguments.
 *
 * @param expression expression tree to rewrite in place
 * @param jsonRequest original JSON request, forwarded when executing a subquery
 * @param requesterIdentity identity of the requester, forwarded when executing a subquery
 * @param requestStatistics request statistics, forwarded when executing a subquery
 * @throws Exception if the IN_SUBQUERY arguments are malformed or the subquery fails
 */
private void handleSubquery(TransformExpressionTree expression, JsonNode jsonRequest,
    @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
    throws Exception {
  // Only function nodes can be, or contain, an IN_SUBQUERY call
  if (expression.getExpressionType() != TransformExpressionTree.ExpressionType.FUNCTION) {
    return;
  }

  List<TransformExpressionTree> arguments = expression.getChildren();
  String normalizedName = StringUtils.remove(expression.getValue(), '_');
  if (!normalizedName.equalsIgnoreCase(IN_SUBQUERY)) {
    // Some other function: look for subqueries nested in its arguments
    for (TransformExpressionTree argument : arguments) {
      handleSubquery(argument, jsonRequest, requesterIdentity, requestStatistics);
    }
    return;
  }

  Preconditions.checkState(arguments.size() == 2, "IN_SUBQUERY requires 2 arguments: expression, subquery");
  TransformExpressionTree subqueryArgument = arguments.get(1);
  Preconditions.checkState(subqueryArgument.getExpressionType() == TransformExpressionTree.ExpressionType.LITERAL,
      "Second argument of IN_SUBQUERY must be a literal (subquery)");

  String serializedIdSet =
      getSerializedIdSetFromSubquery(subqueryArgument.getValue(), jsonRequest, requesterIdentity,
          requestStatistics);

  // Rewrite IN_SUBQUERY(expr, 'subquery') to IN_ID_SET(expr, 'serializedIdSet')
  expression.setValue(TransformFunctionType.INIDSET.name());
  TransformExpressionTree idSetLiteral =
      new TransformExpressionTree(TransformExpressionTree.ExpressionType.LITERAL, serializedIdSet, null);
  arguments.set(1, idSetLiteral);
}

/**
 * Resolves the subqueries embedded in the given SQL expression.
 * <p>Expressions that are not function calls are left untouched. A function whose operator matches IN_SUBQUERY
 * (underscores and case are ignored) must have exactly two operands, the second being a literal that holds the
 * subquery. The subquery is executed, the operator is renamed to the IN_ID_SET transform function, and the second
 * operand is replaced with a literal holding the serialized IdSet returned by the subquery. Any other function is
 * searched recursively through its operands.
 *
 * @param expression expression to rewrite in place
 * @param jsonRequest original JSON request, forwarded when executing a subquery
 * @param requesterIdentity identity of the requester, forwarded when executing a subquery
 * @param requestStatistics request statistics, forwarded when executing a subquery
 * @throws Exception if the IN_SUBQUERY operands are malformed or the subquery fails
 */
private void handleSubquery(Expression expression, JsonNode jsonRequest,
    @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
    throws Exception {
  Function functionCall = expression.getFunctionCall();
  // Identifiers and literals cannot contain a subquery
  if (functionCall == null) {
    return;
  }

  List<Expression> arguments = functionCall.getOperands();
  String normalizedOperator = StringUtils.remove(functionCall.getOperator(), '_');
  if (!normalizedOperator.equalsIgnoreCase(IN_SUBQUERY)) {
    // Some other function: look for subqueries nested in its operands
    for (Expression argument : arguments) {
      handleSubquery(argument, jsonRequest, requesterIdentity, requestStatistics);
    }
    return;
  }

  Preconditions.checkState(arguments.size() == 2, "IN_SUBQUERY requires 2 arguments: expression, subquery");
  Literal subquery = arguments.get(1).getLiteral();
  Preconditions.checkState(subquery != null, "Second argument of IN_SUBQUERY must be a literal (subquery)");

  String serializedIdSet =
      getSerializedIdSetFromSubquery(subquery.getStringValue(), jsonRequest, requesterIdentity,
          requestStatistics);

  // Rewrite IN_SUBQUERY(expr, 'subquery') to IN_ID_SET(expr, 'serializedIdSet')
  functionCall.setOperator(TransformFunctionType.INIDSET.name());
  Expression idSetLiteral = RequestUtils.getLiteralExpression(serializedIdSet);
  arguments.set(1, idSetLiteral);
}

/**
 * Executes the given subquery and returns its result, a serialized IdSet.
 * <p>The subquery should be an aggregation-only query with one single IdSet aggregation function. It is executed
 * through {@code handleRequest} using a copy of the original JSON request in which only the query text is replaced,
 * so the subquery shares the query options of the outer query.
 * <p>The value is read from the first cell of the result table when the response has a result table, otherwise from
 * the first aggregation result.
 *
 * @param subquery text of the subquery to execute
 * @param jsonRequest original JSON request, copied to build the subquery request
 * @param requesterIdentity identity of the requester, forwarded to the subquery
 * @param requestStatistics request statistics, forwarded to the subquery
 * @return the serialized IdSet returned by the subquery
 * @throws Exception if the subquery response contains exceptions, if it has no String result value, or if the
 *         value cannot be deserialized into an IdSet
 */
private String getSerializedIdSetFromSubquery(String subquery, JsonNode jsonRequest,
    @Nullable RequesterIdentity requesterIdentity, RequestStatistics requestStatistics)
    throws Exception {
  // Make a copy of the query request to construct the subquery request so that they share the query options
  ObjectNode subqueryRequest = jsonRequest.deepCopy();

  // Replace the query text under whichever key (SQL or PQL) the request is using
  if (subqueryRequest.has(Broker.Request.SQL)) {
    subqueryRequest.put(Broker.Request.SQL, subquery);
  } else {
    subqueryRequest.put(Broker.Request.PQL, subquery);
  }

  BrokerResponseNative response =
      (BrokerResponseNative) handleRequest(subqueryRequest, requesterIdentity, requestStatistics);
  if (response.getExceptionsSize() != 0) {
    throw new RuntimeException("Caught exception while executing subquery: " + subquery);
  }

  // Read the first result value defensively: an empty result must produce the descriptive error below rather
  // than an IndexOutOfBoundsException
  Object idSetValue = null;
  if (response.getResultTable() != null) {
    if (!response.getResultTable().getRows().isEmpty()) {
      Object[] firstRow = response.getResultTable().getRows().get(0);
      if (firstRow.length > 0) {
        idSetValue = firstRow[0];
      }
    }
  } else if (response.getAggregationResults() != null) {
    if (!response.getAggregationResults().isEmpty()) {
      idSetValue = response.getAggregationResults().get(0).getValue();
    }
  }
  // A missing or non-String value means the subquery did not return an IdSet; report it explicitly rather than
  // letting a ClassCastException escape
  if (!(idSetValue instanceof String)) {
    throw new RuntimeException("Failed to get serialized IdSet from subquery: " + subquery);
  }
  String serializedIdSet = (String) idSetValue;

  // Validate that the value can be deserialized before embedding it into the rewritten query
  try {
    Preconditions.checkNotNull(IdSets.fromBase64String(serializedIdSet));
  } catch (Exception e) {
    // Keep the original failure as the cause so the root problem is not lost
    throw new RuntimeException(
        "Invalid serialized IdSet: " + serializedIdSet + " returned from the subquery: " + subquery, e);
  }

  return serializedIdSet;
}

/**
* Check if table is in the format of [database_name].[table_name].
*
Expand Down Expand Up @@ -499,11 +651,9 @@ private void setTableName(BrokerRequest brokerRequest, String tableName) {
}

/**
* Set Log2m value for DistinctCountHLL Function
* @param brokerRequest
* @param hllLog2mOverride
* Sets HyperLogLog log2m for DistinctCountHLL functions if not explicitly set.
*/
static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int hllLog2mOverride) {
private static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int hllLog2mOverride) {
if (brokerRequest.getAggregationsInfo() != null) {
for (AggregationInfo aggregationInfo : brokerRequest.getAggregationsInfo()) {
switch (aggregationInfo.getAggregationType().toUpperCase()) {
Expand All @@ -524,6 +674,9 @@ static void handleHyperloglogLog2mOverride(BrokerRequest brokerRequest, int hllL
}
}

/**
* Sets HyperLogLog log2m for DistinctCountHLL functions if not explicitly set.
*/
private static void updateDistinctCountHllExpr(Expression expr, int hllLog2mOverride) {
Function functionCall = expr.getFunctionCall();
if (functionCall == null) {
Expand All @@ -547,11 +700,9 @@ private static void updateDistinctCountHllExpr(Expression expr, int hllLog2mOver
}

/**
* Reset limit for selection query if it exceeds maxQuerySelectionLimit.
* @param brokerRequest
* @param queryLimit
*
* Overrides the LIMIT/TOP of the query if it exceeds the query limit.
*/
@VisibleForTesting
static void handleQueryLimitOverride(BrokerRequest brokerRequest, int queryLimit) {
if (queryLimit > 0) {
// Handle GroupBy for BrokerRequest
Expand All @@ -576,7 +727,7 @@ static void handleQueryLimitOverride(BrokerRequest brokerRequest, int queryLimit
}

/**
* Helper method to rewrite 'DistinctCount' with 'DistinctCountBitmap' for the given broker request.
* Rewrites 'DistinctCount' to 'DistinctCountBitmap' for the given broker request.
*/
private static void handleDistinctCountBitmapOverride(BrokerRequest brokerRequest) {
List<AggregationInfo> aggregationsInfo = brokerRequest.getAggregationsInfo();
Expand Down Expand Up @@ -607,7 +758,7 @@ private static void handleDistinctCountBitmapOverride(BrokerRequest brokerReques
}

/**
* Helper method to rewrite 'DistinctCount' with 'DistinctCountBitmap' for the given expression.
* Rewrites 'DistinctCount' to 'DistinctCountBitmap' for the given expression.
*/
private static void handleDistinctCountBitmapOverride(Expression expression) {
Function function = expression.getFunctionCall();
Expand All @@ -625,10 +776,7 @@ private static void handleDistinctCountBitmapOverride(Expression expression) {
}

/**
* Check if a SQL parsed BrokerRequest is a literal only query.
* @param brokerRequest
* @return true if this query selects only Literals
*
* Returns {@code true} if the given query contains only literals, {@code false} otherwise.
*/
@VisibleForTesting
static boolean isLiteralOnlyQuery(BrokerRequest brokerRequest) {
Expand All @@ -644,12 +792,7 @@ static boolean isLiteralOnlyQuery(BrokerRequest brokerRequest) {
}

/**
* Compute BrokerResponse for literal only query.
*
* @param brokerRequest
* @param compilationStartTimeNs
* @param requestStatistics
* @return BrokerResponse
* Processes the literal only query.
*/
private BrokerResponse processLiteralOnlyBrokerRequest(BrokerRequest brokerRequest, long compilationStartTimeNs,
RequestStatistics requestStatistics)
Expand Down
Expand Up @@ -254,19 +254,36 @@ public void testHardcodedSqlQueries()
testSqlQuery(query, Collections.singletonList(query));

// IN_ID_SET
IdSet idSet = IdSets.create(FieldSpec.DataType.LONG);
idSet.add(19690L);
idSet.add(20355L);
idSet.add(21171L);
// Also include a non-existing id
idSet.add(0L);
String serializedIdSet = idSet.toBase64String();
String inIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 1";
String inQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID IN (19690, 20355, 21171, 0)";
testSqlQuery(inIdSetQuery, Collections.singletonList(inQuery));
String notInIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 0";
String notInQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID NOT IN (19690, 20355, 21171, 0)";
testSqlQuery(notInIdSetQuery, Collections.singletonList(notInQuery));
{
IdSet idSet = IdSets.create(FieldSpec.DataType.LONG);
idSet.add(19690L);
idSet.add(20355L);
idSet.add(21171L);
// Also include a non-existing id
idSet.add(0L);
String serializedIdSet = idSet.toBase64String();
String inIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 1";
String inQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID IN (19690, 20355, 21171, 0)";
testSqlQuery(inIdSetQuery, Collections.singletonList(inQuery));
String notInIdSetQuery = "SELECT COUNT(*) FROM mytable WHERE INIDSET(AirlineID, '" + serializedIdSet + "') = 0";
String notInQuery = "SELECT COUNT(*) FROM mytable WHERE AirlineID NOT IN (19690, 20355, 21171, 0)";
testSqlQuery(notInIdSetQuery, Collections.singletonList(notInQuery));
}

// IN_SUBQUERY
{
String inSubqueryQuery =
"SELECT COUNT(*) FROM mytable WHERE INSUBQUERY(DestAirportID, 'SELECT IDSET(DestAirportID) FROM mytable WHERE DaysSinceEpoch = 16430') = 1";
String inQuery =
"SELECT COUNT(*) FROM mytable WHERE DestAirportID IN (SELECT DestAirportID FROM mytable WHERE DaysSinceEpoch = 16430)";
testSqlQuery(inSubqueryQuery, Collections.singletonList(inQuery));

String notInSubqueryQuery =
"SELECT COUNT(*) FROM mytable WHERE INSUBQUERY(DestAirportID, 'SELECT IDSET(DestAirportID) FROM mytable WHERE DaysSinceEpoch = 16430') = 0";
String notInQuery =
"SELECT COUNT(*) FROM mytable WHERE DestAirportID NOT IN (SELECT DestAirportID FROM mytable WHERE DaysSinceEpoch = 16430)";
testSqlQuery(notInSubqueryQuery, Collections.singletonList(notInQuery));
}
}

/**
Expand Down