Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding average rate logic in query service #87

Merged
merged 71 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from 67 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
9ca540d
init commit
Sep 23, 2021
8ce1e48
updated tests
Sep 23, 2021
a93b752
removed debug statement
Sep 23, 2021
ae7e640
modified test
Sep 23, 2021
aad7af7
added support for sum and avg_rate
Sep 23, 2021
7f0097c
updated test files
Sep 23, 2021
089cb3c
refactored
Sep 24, 2021
4f2692f
added test
Sep 24, 2021
334b090
refactored
Sep 24, 2021
f0c2675
changed map signature
Sep 24, 2021
d53b439
refactored the code
Sep 24, 2021
216ace8
updated tests
Sep 24, 2021
711325a
restored gradle changes
Sep 24, 2021
33ebefa
refactored convert method
Sep 24, 2021
3bd93d6
refactored
Sep 24, 2021
67f55da
refactored avg_rate functions
Sep 24, 2021
c1ebc65
added comments for avg_rate
Sep 24, 2021
7dd43eb
added AVG_RATE in pinot
Sep 27, 2021
305632f
removed unused code
Sep 27, 2021
9cca2be
removed unused code
Sep 27, 2021
ee60629
modified comment
Sep 28, 2021
246600d
added unit test
Sep 28, 2021
18781c2
added info in execution context
Oct 1, 2021
50203ef
changed function signature to pass executionContext
Oct 1, 2021
c7d51dd
added avg_rate support
Oct 1, 2021
20b4073
merged main to run spotless
Oct 4, 2021
c4cd3f2
refactored
Oct 4, 2021
5b77e64
refactored executioncontext file
Oct 4, 2021
beeed4b
resolved bugs
Oct 4, 2021
c006e70
refactored
Oct 4, 2021
69a500f
resolved comments
Oct 4, 2021
7cfb5a7
refactored
Oct 5, 2021
93cb20b
updated test
Oct 5, 2021
f1c9029
resolved PR comments
Oct 5, 2021
88b66d7
resolved PR comments
Oct 5, 2021
029f188
changed functionToStringForAvgRate function
Oct 11, 2021
f322ed5
resolved pr comments
Oct 11, 2021
2041c72
added tree traversal for filter
Oct 11, 2021
83e1d1e
resolved comments
Oct 11, 2021
3b9180a
read time attribute from config
Oct 11, 2021
a159461
refactored implementation
Oct 11, 2021
4f8ddaf
.
Oct 11, 2021
67b9e9c
removed comments
Oct 11, 2021
e76db76
refactored
Oct 12, 2021
ab7dcfe
renamed method
Oct 12, 2021
b87da0b
refactored
Oct 12, 2021
dcb6e04
added unit test for changed handler interface
Oct 13, 2021
db84e83
added test for pinotFunctionConverter
Oct 13, 2021
81e6b1d
added unit test
Oct 13, 2021
4002459
resolved PR comments
Oct 13, 2021
b735a55
changed tree traversal signature
Oct 13, 2021
909f014
added unit test
Oct 13, 2021
a1e298c
resolved PR comments
Oct 13, 2021
bc0c19c
minor refactorings
Oct 13, 2021
a4fac59
resolved PR comments
Oct 13, 2021
cd3036e
resolved PR comments
Oct 13, 2021
6f9fa28
updated time in unit tests
Oct 13, 2021
4a49ce0
minor fix
Oct 13, 2021
5378145
refactored
Oct 13, 2021
8ba8fae
resolved PR comments
Oct 14, 2021
49a8f9a
resolved comments
Oct 14, 2021
8e9585f
resolved comments
Oct 14, 2021
a5438ec
refactored test files
Oct 14, 2021
b9e8f1f
refactoring
Oct 14, 2021
f182691
minor refactorings
Oct 14, 2021
31c1633
resolved PR comments
Oct 14, 2021
7052c4a
resolved PR comments
Oct 14, 2021
8d6152d
nit changes
Oct 14, 2021
9c3606e
refactored integration tests
Oct 14, 2021
12f8c3e
resolved comments
Oct 14, 2021
21acc1f
resolved PR comments
Oct 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
package org.hypertrace.core.query.service;

import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.hypertrace.core.query.service.api.ColumnIdentifier;
import org.hypertrace.core.query.service.api.ColumnMetadata;
import org.hypertrace.core.query.service.api.Expression;
import org.hypertrace.core.query.service.api.Expression.ValueCase;
import org.hypertrace.core.query.service.api.Filter;
import org.hypertrace.core.query.service.api.Function;
import org.hypertrace.core.query.service.api.Operator;
import org.hypertrace.core.query.service.api.OrderByExpression;
import org.hypertrace.core.query.service.api.QueryRequest;
import org.hypertrace.core.query.service.api.ResultSetMetadata;
Expand All @@ -27,6 +36,10 @@ public class ExecutionContext {
private Set<String> referencedColumns;
private final LinkedHashSet<String> selectedColumns;
private ResultSetMetadata resultSetMetadata;
// default value null
private String timeFilterColumn = null;
private Supplier<Optional<Duration>> timeRangeDurationSupplier;
sarthak77 marked this conversation as resolved.
Show resolved Hide resolved

// Contains all selections to be made in the DB: selections on group by, single columns and
// aggregations in that order.
// There should be a one-to-one mapping between this and the columnMetadataSet in
Expand All @@ -35,14 +48,41 @@ public class ExecutionContext {
// while the selectedColumns
// is a set of column names.
private final LinkedHashSet<Expression> allSelections;
private final Optional<Duration> timeSeriesPeriod;
private final Filter queryRequestFilter;

public ExecutionContext(String tenantId, QueryRequest request) {
this.tenantId = tenantId;
this.selectedColumns = new LinkedHashSet<>();
this.allSelections = new LinkedHashSet<>();
this.timeSeriesPeriod = calculateTimeSeriesPeriod(request);
this.queryRequestFilter = request.getFilter();
timeRangeDurationSupplier =
Suppliers.memoize(
() -> findTimeRangeDuration(this.queryRequestFilter, this.timeFilterColumn));
analyze(request);
}

private Optional<Duration> calculateTimeSeriesPeriod(QueryRequest request) {
if (request.getGroupByCount() > 0) {
for (Expression expression : request.getGroupByList()) {
if (expression.getValueCase() == ValueCase.FUNCTION
&& expression.getFunction().getFunctionName().equals("dateTimeConvert")) {
String timeSeriesPeriod =
expression
.getFunction()
.getArgumentsList()
.get(3)
.getLiteral()
.getValue()
.getString();
return Optional.of(parseDuration(timeSeriesPeriod));
}
}
}
return Optional.empty();
}

private void analyze(QueryRequest request) {
List<String> filterColumns = new ArrayList<>();
LinkedList<Filter> filterQueue = new LinkedList<>();
Expand Down Expand Up @@ -154,6 +194,58 @@ private void extractColumns(List<String> columns, Expression expression) {
}
}

private Duration parseDuration(String timeSeriesPeriod) {
String[] splitPeriodString = timeSeriesPeriod.split(":");
long amount = Long.parseLong(splitPeriodString[0]);
ChronoUnit unit = TimeUnit.valueOf(splitPeriodString[1]).toChronoUnit();
return Duration.of(amount, unit);
}

private Optional<Duration> findTimeRangeDuration(Filter filter, String timeFilterColumn) {

// time filter will always be present with AND operator
if (filter.getOperator() != Operator.AND) {
return Optional.empty();
}

Optional<Long> timeRangeStart =
filter.getChildFilterList().stream()
.filter(
childFilter ->
this.isMatchingFilter(
childFilter, timeFilterColumn, List.of(Operator.GE, Operator.GT)))
.map(matchingFilter -> matchingFilter.getRhs().getLiteral().getValue().getLong())
.findFirst();

Optional<Long> timeRangeEnd =
filter.getChildFilterList().stream()
.filter(
childFilter ->
this.isMatchingFilter(
childFilter, timeFilterColumn, List.of(Operator.LT, Operator.LE)))
.map(matchingFilter -> matchingFilter.getRhs().getLiteral().getValue().getLong())
.findFirst();

if (timeRangeStart.isPresent() && timeRangeEnd.isPresent()) {
return Optional.of(Duration.ofMillis(timeRangeEnd.get() - timeRangeStart.get()));
}

return filter.getChildFilterList().stream()
.map(childFilter -> this.findTimeRangeDuration(childFilter, timeFilterColumn))
.flatMap(Optional::stream)
.findFirst();
}

private boolean isMatchingFilter(Filter filter, String column, Collection<Operator> operators) {
return column.equals(filter.getLhs().getColumnIdentifier().getColumnName())
&& (operators.stream()
.anyMatch(operator -> Objects.equals(operator, filter.getOperator())));
}

public void setTimeFilterColumn(String timeFilterColumn) {
this.timeFilterColumn = timeFilterColumn;
}

public String getTenantId() {
return this.tenantId;
}
Expand All @@ -173,4 +265,12 @@ public LinkedHashSet<String> getSelectedColumns() {
public LinkedHashSet<Expression> getAllSelections() {
return this.allSelections;
}

public Optional<Duration> getTimeSeriesPeriod() {
return this.timeSeriesPeriod;
}

public Optional<Duration> getTimeRangeDuration() {
return timeRangeDurationSupplier.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public interface QueryFunctionConstants {
String QUERY_FUNCTION_MIN = "MIN";
String QUERY_FUNCTION_MAX = "MAX";
String QUERY_FUNCTION_COUNT = "COUNT";
String QUERY_FUNCTION_AVG_RATE = "AVG_RATE";
String QUERY_FUNCTION_PERCENTILE = "PERCENTILE";
String QUERY_FUNCTION_DISTINCTCOUNT = "DISTINCTCOUNT";
String QUERY_FUNCTION_CONCAT = "CONCAT";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,11 @@ private Observable<ResultSetChunk> executeTransformedRequest(
Status.FAILED_PRECONDITION
.withDescription("No handler available matching request")
.asException()))
.flatMapObservable(handler -> handler.handleRequest(transformedRequest, context))
.flatMapObservable(
handler -> {
handler.getTimeFilterColumn().ifPresent(context::setTimeFilterColumn);
return handler.handleRequest(transformedRequest, context);
})
.lift(chunkRows(context.getResultSetMetadata()));
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.hypertrace.core.query.service;

import io.reactivex.rxjava3.core.Observable;
import java.util.Optional;
import org.hypertrace.core.query.service.api.QueryRequest;
import org.hypertrace.core.query.service.api.Row;

Expand All @@ -20,6 +21,8 @@ public interface RequestHandler {

String getName();

Optional<String> getTimeFilterColumn();

QueryCost canHandle(QueryRequest request, ExecutionContext context);

/** Handle the request and add rows to the collector. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public String getName() {
return name;
}

@Override
public Optional<String> getTimeFilterColumn() {
return this.startTimeAttributeName;
}

private void processConfig(Config config) {

if (!config.hasPath(TENANT_COLUMN_NAME_CONFIG_KEY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.hypertrace.core.query.service.api.SortOrder;
import org.hypertrace.core.query.service.api.Value;
import org.hypertrace.core.query.service.api.ValueType;
import org.hypertrace.core.query.service.pinot.Params.Builder;
import org.hypertrace.core.query.service.pinot.converters.DestinationColumnValueConverter;
import org.hypertrace.core.query.service.pinot.converters.PinotFunctionConverter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -63,7 +64,7 @@ Entry<String, Params> toSQL(
// how it is created.
for (Expression expr : allSelections) {
pqlBuilder.append(delim);
pqlBuilder.append(convertExpression2String(expr, paramsBuilder));
pqlBuilder.append(convertExpression2String(expr, paramsBuilder, executionContext));
delim = ", ";
}

Expand All @@ -75,7 +76,8 @@ Entry<String, Params> toSQL(

if (request.hasFilter()) {
pqlBuilder.append(" AND ");
String filterClause = convertFilter2String(request.getFilter(), paramsBuilder);
String filterClause =
convertFilter2String(request.getFilter(), paramsBuilder, executionContext);
pqlBuilder.append(filterClause);
}

Expand All @@ -84,7 +86,8 @@ Entry<String, Params> toSQL(
delim = "";
for (Expression groupByExpression : request.getGroupByList()) {
pqlBuilder.append(delim);
pqlBuilder.append(convertExpression2String(groupByExpression, paramsBuilder));
pqlBuilder.append(
convertExpression2String(groupByExpression, paramsBuilder, executionContext));
delim = ", ";
}
}
Expand All @@ -93,7 +96,9 @@ Entry<String, Params> toSQL(
delim = "";
for (OrderByExpression orderByExpression : request.getOrderByList()) {
pqlBuilder.append(delim);
String orderBy = convertExpression2String(orderByExpression.getExpression(), paramsBuilder);
String orderBy =
convertExpression2String(
orderByExpression.getExpression(), paramsBuilder, executionContext);
pqlBuilder.append(orderBy);
if (SortOrder.DESC.equals(orderByExpression.getOrder())) {
pqlBuilder.append(" desc ");
Expand All @@ -119,15 +124,16 @@ Entry<String, Params> toSQL(
return new SimpleEntry<>(pqlBuilder.toString(), paramsBuilder.build());
}

private String convertFilter2String(Filter filter, Params.Builder paramsBuilder) {
private String convertFilter2String(
Filter filter, Builder paramsBuilder, ExecutionContext executionContext) {
StringBuilder builder = new StringBuilder();
String operator = convertOperator2String(filter.getOperator());
if (filter.getChildFilterCount() > 0) {
String delim = "";
builder.append("( ");
for (Filter childFilter : filter.getChildFilterList()) {
builder.append(delim);
builder.append(convertFilter2String(childFilter, paramsBuilder));
builder.append(convertFilter2String(childFilter, paramsBuilder, executionContext));
builder.append(" ");
delim = operator + " ";
}
Expand All @@ -140,9 +146,10 @@ private String convertFilter2String(Filter filter, Params.Builder paramsBuilder)
handleValueConversionForLiteralExpression(filter.getLhs(), filter.getRhs());
builder.append(operator);
builder.append("(");
builder.append(convertExpression2String(filter.getLhs(), paramsBuilder));
builder.append(
convertExpression2String(filter.getLhs(), paramsBuilder, executionContext));
builder.append(",");
builder.append(convertExpression2String(rhs, paramsBuilder));
builder.append(convertExpression2String(rhs, paramsBuilder, executionContext));
builder.append(")");
break;
case CONTAINS_KEY:
Expand Down Expand Up @@ -175,11 +182,12 @@ private String convertFilter2String(Filter filter, Params.Builder paramsBuilder)
break;
default:
rhs = handleValueConversionForLiteralExpression(filter.getLhs(), filter.getRhs());
builder.append(convertExpression2String(filter.getLhs(), paramsBuilder));
builder.append(
convertExpression2String(filter.getLhs(), paramsBuilder, executionContext));
builder.append(" ");
builder.append(operator);
builder.append(" ");
builder.append(convertExpression2String(rhs, paramsBuilder));
builder.append(convertExpression2String(rhs, paramsBuilder, executionContext));
}
}
return builder.toString();
Expand Down Expand Up @@ -251,7 +259,8 @@ private String convertOperator2String(Operator operator) {
}
}

private String convertExpression2String(Expression expression, Params.Builder paramsBuilder) {
private String convertExpression2String(
Expression expression, Builder paramsBuilder, ExecutionContext executionContext) {
switch (expression.getValueCase()) {
case COLUMNIDENTIFIER:
String logicalColumnName = expression.getColumnIdentifier().getColumnName();
Expand All @@ -262,11 +271,13 @@ private String convertExpression2String(Expression expression, Params.Builder pa
return convertLiteralToString(expression.getLiteral(), paramsBuilder);
case FUNCTION:
return this.functionConverter.convert(
executionContext,
expression.getFunction(),
argExpression -> convertExpression2String(argExpression, paramsBuilder));
argExpression ->
convertExpression2String(argExpression, paramsBuilder, executionContext));
case ORDERBY:
OrderByExpression orderBy = expression.getOrderBy();
return convertExpression2String(orderBy.getExpression(), paramsBuilder);
return convertExpression2String(orderBy.getExpression(), paramsBuilder, executionContext);
case VALUE_NOT_SET:
break;
}
Expand Down
Loading