Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENG-46252 : Pinot time range filtering support #224

Merged
merged 12 commits into from
Jul 26, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package org.hypertrace.core.query.service;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.util.JsonFormat;
import com.typesafe.config.Config;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.experimental.NonFinal;
import lombok.extern.slf4j.Slf4j;
import org.hypertrace.core.query.service.api.Filter;
import org.hypertrace.core.query.service.api.Operator;

@Slf4j
public class AdditionalHandlerFiltersConfig {
anujgoyal1 marked this conversation as resolved.
Show resolved Hide resolved
private static final String ADDITIONAL_TENANT_FILTERS_CONFIG_KEY = "additionalTenantFilters";
anujgoyal1 marked this conversation as resolved.
Show resolved Hide resolved
private final Map<String, List<Filter>> tenantToAdditionalFiltersMap;

/**
 * Builds the tenant-id to additional-filters lookup from the handler config.
 *
 * <p>When the {@code additionalTenantFilters} key is absent the lookup is empty. Otherwise every
 * entry of that config list is parsed into a {@code TenantFilters} and indexed by its tenant id.
 * The two-argument {@code Collectors.toMap} is used, so two entries with the same tenant id make
 * construction fail with an {@code IllegalStateException}.
 *
 * @param config handler config that may contain the {@code additionalTenantFilters} list
 * @param timeFilterColumnName attribute handed to every entry for building its time-range filters
 */
public AdditionalHandlerFiltersConfig(Config config, Optional<String> timeFilterColumnName) {
  Map<String, List<Filter>> filtersByTenant;
  if (!config.hasPath(ADDITIONAL_TENANT_FILTERS_CONFIG_KEY)) {
    filtersByTenant = Collections.emptyMap();
  } else {
    filtersByTenant =
        config.getConfigList(ADDITIONAL_TENANT_FILTERS_CONFIG_KEY).stream()
            .map(tenantConfig -> new TenantFilters(tenantConfig, timeFilterColumnName))
            .collect(Collectors.toMap(entry -> entry.tenantId, entry -> entry.filters));
  }
  this.tenantToAdditionalFiltersMap = filtersByTenant;
}

public List<Filter> getAdditionalFiltersForTenant(String tenantId) {
anujgoyal1 marked this conversation as resolved.
Show resolved Hide resolved
return this.tenantToAdditionalFiltersMap.getOrDefault(tenantId, Collections.emptyList());
}

/**
 * One entry of the additional tenant filters config: a tenant id together with the filters built
 * from that entry's {@code timeRangeAndFilters} list.
 */
@lombok.Value
@NonFinal
private static class TenantFilters {
  private static final String TENANT_ID_CONFIG_KEY = "tenantId";
  private static final String TIME_RANGE_AND_FILTERS_CONFIG_KEY = "timeRangeAndFilters";

  String tenantId;
  List<Filter> filters;

  /**
   * Parses a single tenant entry.
   *
   * @param config config object holding {@code tenantId} and {@code timeRangeAndFilters}
   * @param startTimeAttributeName attribute passed to each element when it builds its filter
   */
  private TenantFilters(Config config, Optional<String> startTimeAttributeName) {
    this.tenantId = config.getString(TENANT_ID_CONFIG_KEY);
    this.filters =
        config.getConfigList(TIME_RANGE_AND_FILTERS_CONFIG_KEY).stream()
            .map(
                rangeConfig ->
                    new TimeRangeAndFilter(rangeConfig)
                        .buildTimeRangeAndFilters(startTimeAttributeName))
            // Elements that produced no filter are left out of the list.
            .filter(built -> built.isPresent())
            .map(built -> built.get())
            .collect(Collectors.toList());
  }
}

@lombok.Value
@NonFinal
private static class TimeRangeAndFilter {
anujgoyal1 marked this conversation as resolved.
Show resolved Hide resolved
private static final String START_TIME_CONFIG_PATH = "startTimeMillis";
private static final String END_TIME_CONFIG_PATH = "endTimeMillis";
private static final String FILTER_CONFIG_PATH = "filter";
private static final ObjectMapper objectMapper = new ObjectMapper();
Optional<Long> startTimeMillis;
Optional<Long> endTimeMillis;
Optional<Filter> filter;

/**
 * Reads one {@code timeRangeAndFilters} element.
 *
 * <p>The time bounds are kept only when both {@code startTimeMillis} and {@code endTimeMillis}
 * are configured; if either is missing, both are left empty. The {@code filter} string, when
 * present, is deserialized into a query filter.
 *
 * @param config config object for a single time-range-and-filter element
 */
private TimeRangeAndFilter(Config config) {
  boolean hasBothBounds =
      config.hasPath(START_TIME_CONFIG_PATH) && config.hasPath(END_TIME_CONFIG_PATH);
  this.startTimeMillis =
      hasBothBounds ? Optional.of(config.getLong(START_TIME_CONFIG_PATH)) : Optional.empty();
  this.endTimeMillis =
      hasBothBounds ? Optional.of(config.getLong(END_TIME_CONFIG_PATH)) : Optional.empty();
  this.filter =
      config.hasPath(FILTER_CONFIG_PATH)
          ? deserializeFilter(config.getString(FILTER_CONFIG_PATH))
          : Optional.empty();
}

private Optional<Filter> buildTimeRangeAndFilters(Optional<String> timeRangeAttribute) {
Filter.Builder filterBuilder = Filter.newBuilder();
filterBuilder.setOperator(Operator.OR);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why OR? don't we want the filter to be: time > startTime and time < endTime?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (timeRangeAttribute.isPresent()) {
if (this.startTimeMillis.isPresent() && this.endTimeMillis.isPresent()) {
filterBuilder.addChildFilter(
QueryRequestUtil.createLongFilter(
timeRangeAttribute.get(), Operator.GT, this.endTimeMillis.get()));
filterBuilder.addChildFilter(
QueryRequestUtil.createLongFilter(
timeRangeAttribute.get(), Operator.LT, this.startTimeMillis.get()));
}
}

this.filter.ifPresent(filterBuilder::addChildFilter);
if (filterBuilder.getChildFilterCount() == 0) {
return Optional.empty();
}
if (filterBuilder.getChildFilterCount() == 1) {
return Optional.of(filterBuilder.getChildFilter(0));
}

return Optional.of(filterBuilder.build());
}

/**
 * Deserializes a JSON-encoded filter from config into a query request {@link Filter}.
 *
 * <p>The text is first parsed with Jackson, then the re-serialized JSON is merged into a
 * {@code Filter} builder through protobuf's {@code JsonFormat} parser.
 *
 * @param filterJson JSON representation of the filter, as read from config
 * @return the parsed filter, or empty when the text cannot be deserialized
 */
private Optional<Filter> deserializeFilter(String filterJson) {
  try {
    JsonNode jsonNode = objectMapper.readTree(filterJson);

    Filter.Builder filterBuilder = Filter.newBuilder();
    JsonFormat.parser().merge(jsonNode.toString(), filterBuilder);

    return Optional.of(filterBuilder.build());

  } catch (Exception e) {
    // Best effort: a malformed filter is dropped rather than failing handler start-up. Log the
    // offending input and the cause so the misconfiguration can actually be diagnosed.
    log.error(
        "Error deserializing additional filter config to query request filter: {}",
        filterJson,
        e);
    return Optional.empty();
  }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,24 @@ public static Optional<String> getLogicalColumnName(Expression expression) {
}
}

/**
 * Creates a filter comparing a column against a long literal.
 *
 * @param columnName column (attribute id) on the left-hand side
 * @param op comparison operator
 * @param value long value used as the right-hand side literal
 * @return the resulting filter
 */
public static Filter createLongFilter(String columnName, Operator op, long value) {
  Expression literal = createLongLiteralExpression(value);
  return createFilter(columnName, op, literal);
}

/**
 * Creates a filter whose left-hand side is an attribute expression for the given column.
 *
 * @param columnName column (attribute id) on the left-hand side
 * @param op comparison operator
 * @param value right-hand side expression
 * @return the resulting filter
 */
public static Filter createFilter(String columnName, Operator op, Expression value) {
  Expression column = createAttributeExpression(columnName);
  return createFilter(column, op, value);
}

/**
 * Wraps an attribute id in an {@code Expression} holding an {@code AttributeExpression}.
 *
 * @param attributeId id set on the attribute expression
 * @return the built expression
 */
public static Expression createAttributeExpression(String attributeId) {
  AttributeExpression.Builder attribute =
      AttributeExpression.newBuilder().setAttributeId(attributeId);
  return Expression.newBuilder().setAttributeExpression(attribute).build();
}

/**
 * Creates a leaf filter of the form {@code lhs op rhs}.
 *
 * @param columnExpression left-hand side expression
 * @param op operator applied between the two sides
 * @param value right-hand side expression
 * @return the built filter
 */
public static Filter createFilter(Expression columnExpression, Operator op, Expression value) {
  Filter.Builder leaf = Filter.newBuilder();
  leaf.setLhs(columnExpression);
  leaf.setOperator(op);
  leaf.setRhs(value);
  return leaf.build();
}

public static Optional<String> getAlias(Expression expression) {
switch (expression.getValueCase()) {
case COLUMNIDENTIFIER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ private Observable<ResultSetChunk> executeTransformedRequest(
.flatMapObservable(
handler -> {
handler.getTimeFilterColumn().ifPresent(context::setTimeFilterColumn);
return handler.handleRequest(transformedRequest, context);
return handler.handleRequest(
handler.applyAdditionalFilters(transformedRequest, context), context);
})
.lift(chunkRows(context.getResultSetMetadata()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,9 @@ public interface RequestHandler {

/** Handle the request and add rows to the collector. */
Observable<Row> handleRequest(QueryRequest request, ExecutionContext executionContext);

default QueryRequest applyAdditionalFilters(
Copy link
Contributor

@satish-mittal satish-mittal Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is already an existing step of query transformation. Look at QueryTransformation. This step of adding filters should go there.
https://github.com/hypertrace/query-service/blob/main/query-service-impl/src/main/java/org/hypertrace/core/query/service/QueryTransformation.java

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We started with this, then realised we don't get scope information in the query request.
If we apply a filter in the Transformation step, it'll be applied to all the scopes and would work as a global filter, which is the wrong behaviour.
Query Service decides the scope and its handler based on the query. Hence we pushed these filters down to the handler.

QueryRequest queryRequest, ExecutionContext executionContext) {
return queryRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import javax.annotation.Nonnull;
import org.apache.pinot.client.ResultSet;
import org.apache.pinot.client.ResultSetGroup;
import org.hypertrace.core.query.service.AdditionalHandlerFiltersConfig;
import org.hypertrace.core.query.service.ExecutionContext;
import org.hypertrace.core.query.service.QueryCost;
import org.hypertrace.core.query.service.RequestHandler;
Expand Down Expand Up @@ -65,6 +66,7 @@ public class PinotBasedRequestHandler implements RequestHandler {
private Optional<String> startTimeAttributeName;
private QueryRequestToPinotSQLConverter request2PinotSqlConverter;
private final PinotMapConverter pinotMapConverter;
private AdditionalHandlerFiltersConfig additionalHandlerFiltersConfig;
// The implementations of ResultSet are package private and hence there's no way to determine the
// shape of the results
// other than to do string comparison on the simple class names. In order to be able to unit test
Expand Down Expand Up @@ -138,6 +140,9 @@ private void processConfig(Config config) {
if (config.hasPath(SLOW_QUERY_THRESHOLD_MS_CONFIG)) {
this.slowQueryThreshold = config.getInt(SLOW_QUERY_THRESHOLD_MS_CONFIG);
}

this.additionalHandlerFiltersConfig =
new AdditionalHandlerFiltersConfig(config, this.startTimeAttributeName);
LOG.info(
"Using {}ms as the threshold for logging slow queries of handler: {}",
slowQueryThreshold,
Expand All @@ -146,6 +151,22 @@ private void processConfig(Config config) {
initMetrics();
}

/**
 * ANDs the additional filters configured for the request's tenant onto the query request.
 *
 * <p>When the tenant has no additional filters the request is returned untouched. Otherwise the
 * request filter is replaced by an {@code AND} over the original filter (if one was supplied)
 * and every additional filter.
 *
 * @param queryRequest request about to be handled
 * @param executionContext context supplying the tenant id
 * @return the original request, or a copy carrying the combined filter
 */
@Override
public QueryRequest applyAdditionalFilters(
    QueryRequest queryRequest, ExecutionContext executionContext) {
  List<Filter> additionalFilters =
      this.additionalHandlerFiltersConfig.getAdditionalFiltersForTenant(
          executionContext.getTenantId());
  if (additionalFilters.isEmpty()) {
    return queryRequest;
  }

  Filter.Builder filterBuilder = Filter.newBuilder();
  filterBuilder.setOperator(Operator.AND);
  // getFilter() returns the empty default instance when the request has no filter. Adding that
  // as an AND child would put a node with no operands into the filter tree, so only carry over
  // a filter that was really supplied.
  if (queryRequest.hasFilter()
      && !Filter.getDefaultInstance().equals(queryRequest.getFilter())) {
    filterBuilder.addChildFilter(queryRequest.getFilter());
  }
  filterBuilder.addAllChildFilter(additionalFilters);
  return queryRequest.toBuilder().setFilter(filterBuilder.build()).build();
}

/**
* Returns a QueryCost that is an indication of whether the given query can be handled by this
* handler and if so, how costly is it to handle that query.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ void invokesHandlerAndPropagatesResults() {
RequestHandlerSelector mockSelector = mock(RequestHandlerSelector.class);
RequestHandler mockHandler = mock(RequestHandler.class);
Row mockRow = Row.getDefaultInstance();
when(mockHandler.applyAdditionalFilters(eq(originalRequest), any(ExecutionContext.class)))
.thenReturn(originalRequest);
when(mockHandler.handleRequest(eq(originalRequest), any(ExecutionContext.class)))
.thenReturn(Observable.just(mockRow));
when(mockSelector.select(same(originalRequest), any())).thenReturn(Optional.of(mockHandler));
Expand Down
Loading
Loading