Support Aggregation functions with multiple arguments. #5259

Merged 1 commit on Apr 17, 2020

mayankshriv
Contributor

The current implementation assumes that all AggregationFunctions take one argument
with the exception of DistinctAggregationFunction. This PR handles changes related
to supporting AggregationFunctions with multiple arguments, as we anticipate new
aggregation functions to be added that take multiple arguments.

  1. Enhanced parser to allow multiple arguments for aggregation functions.
  2. AggregationFunctionFactory provides the right set of arguments when instantiating
    individual aggregation functions.
  3. AggregationFunctions now store their arguments, as opposed to assuming that the right
    BlockValSet is passed to the aggregate() APIs.
  4. The AggregationFunction.aggregate() APIs now take a Map<String, BlockValSet> where the key
    is the argument expression (the column name in the simple case), as opposed to a variable-length
    array, because that interface does not provide a way to associate a BlockValSet with its argument.
  5. Cleanup: Removed env variable to enable/disable Distinct, as there is no need for it to be
    disabled anymore.
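
As a rough illustration of items 3 and 4, the sketch below shows how a function that stores its own arguments can look up its inputs by argument expression. All class names here are hypothetical stand-ins rather than Pinot's actual classes, and the return value replaces the result holder that the real aggregate() APIs write into. The weighted sum function is invented purely to show a second argument.

```java
import java.util.HashMap;
import java.util.Map;

// Demo entry point: builds one block with two columns and runs both functions.
final class MapAggregateDemo {
  public static void main(String[] args) {
    Map<String, SketchBlockValSet> block = new HashMap<>();
    block.put("price", new SketchBlockValSet(new double[] {1, 2, 3}));
    block.put("quantity", new SketchBlockValSet(new double[] {2, 2, 2}));
    System.out.println(new SketchSum("price").aggregate(3, block));
    System.out.println(new SketchWeightedSum("price", "quantity").aggregate(3, block));
  }
}

// Hypothetical stand-in for a BlockValSet: one block of double values.
final class SketchBlockValSet {
  private final double[] _values;

  SketchBlockValSet(double[] values) {
    _values = values;
  }

  double[] getDoubleValues() {
    return _values;
  }
}

// Simplified contract (assumption): returns the result instead of using a result holder.
interface SketchAggregationFunction {
  double aggregate(int length, Map<String, SketchBlockValSet> blockValSetMap);
}

// Single-argument function: stores its argument and uses it as the map key.
final class SketchSum implements SketchAggregationFunction {
  private final String _column;

  SketchSum(String column) {
    _column = column;
  }

  @Override
  public double aggregate(int length, Map<String, SketchBlockValSet> blockValSetMap) {
    double[] values = blockValSetMap.get(_column).getDoubleValues();
    double sum = 0;
    for (int i = 0; i < length; i++) {
      sum += values[i];
    }
    return sum;
  }
}

// Invented two-argument function: sum of value * weight, each fetched by its own key.
final class SketchWeightedSum implements SketchAggregationFunction {
  private final String _valueColumn;
  private final String _weightColumn;

  SketchWeightedSum(String valueColumn, String weightColumn) {
    _valueColumn = valueColumn;
    _weightColumn = weightColumn;
  }

  @Override
  public double aggregate(int length, Map<String, SketchBlockValSet> blockValSetMap) {
    double[] values = blockValSetMap.get(_valueColumn).getDoubleValues();
    double[] weights = blockValSetMap.get(_weightColumn).getDoubleValues();
    double sum = 0;
    for (int i = 0; i < length; i++) {
      sum += values[i] * weights[i];
    }
    return sum;
  }
}
```

With a positional array the second function would have to rely on argument order alone; keying by expression makes the association explicit.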

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Recommend directly passing TransformBlock to the AggregationFunction, where AggregationFunction can get the column needed, instead of fetching the data on caller side and creating the map.

@mayankshriv
Contributor Author

Yeah, I thought about that as well. I feel having a map passes only the relevant content and makes the api more crisp and unit testable, and hence chose to create a Map.

Member

@kishoreg kishoreg left a comment

I don't see any major issues with this PR. There are possible improvements but we can do that in another PR.

* Class to host compiler related constants
*/
public class CompilerConstants {
public static final String AGGREGATION_FUNCTION_ARG_SEPARATOR = ":";
Member

this is only needed for distinct and it should go away after your change, right?

Contributor

The current approach is also based on concatenated arguments using a separator ":". Two other possibilities:

  • Change AggregationInfo thrift object to pass multiple arguments in a much cleaner manner right from the parser stage.
  • Change the way arguments are stored in the map that is already there in AggregationInfo. This doesn't require change to thrift definition. We can adopt a convention where multiple arguments are stored as individual KV pairs in the map that is already there in AggregationInfo. Right now there exists a single entry in this map. Key being "column" and value being column name (argument to aggr function). The value is a concatenated string for aggr functions that can take multiple arguments.

Contributor Author

As @siddharthteotia pointed out, passing multiple args to the aggregation function via AggregationInfo may end up creating a backward incompatible change. For now, I am piggybacking on the concatenation approach to pass multiple args. We can make a backward compatible cleanup separately, unless there is a trivial fix that can be included in this PR.
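
For readers following the thread, here is a minimal sketch of the concatenation approach being discussed: several arguments travel as one string joined with the ":" separator and are split again on the receiving side. The class and method names are hypothetical; only the separator value comes from the CompilerConstants snippet above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper showing the round trip through a single concatenated string.
final class ConcatenatedArgsSketch {
  // Same value as AGGREGATION_FUNCTION_ARG_SEPARATOR in the diff above.
  static final String ARG_SEPARATOR = ":";

  // Producer side: pack all arguments into one string value.
  static String join(List<String> arguments) {
    return String.join(ARG_SEPARATOR, arguments);
  }

  // Consumer side: recover the individual arguments.
  static List<String> split(String argumentsString) {
    return Arrays.asList(argumentsString.split(ARG_SEPARATOR));
  }

  public static void main(String[] args) {
    String packed = join(Arrays.asList("col1", "col2"));
    System.out.println(packed);
    System.out.println(split(packed));
  }
}
```

One limitation of this sketch is that an argument which itself contains ":" would not survive the round trip, which is part of why a structured list of arguments is the cleaner long-term option.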

@@ -121,7 +121,8 @@ private void convertSelectList(PinotQuery pinotQuery, BrokerRequest brokerReques
List<AggregationInfo> aggregationInfoList = null;
for (Expression expression : pinotQuery.getSelectList()) {
ExpressionType type = expression.getType();
if (type == ExpressionType.FUNCTION && expression.getFunctionCall().getOperator().equalsIgnoreCase(SqlKind.AS.toString())) {
if (type == ExpressionType.FUNCTION && expression.getFunctionCall().getOperator()
Member

I didn't see the use of SQLKind here; we don't want to leak this. Can you please move this constant to PinotQuery.Constants or some other place?

Contributor Author

This diff got flagged only due to formatting change. Will file a separate PR for this.

distinctColumnExpr.append(expression);
}

// TODO: Remove all distinct special-casing.
Member

+100

@@ -42,79 +47,87 @@ private AggregationFunctionFactory() {
public static AggregationFunction getAggregationFunction(AggregationInfo aggregationInfo,
@Nullable BrokerRequest brokerRequest) {
String functionName = aggregationInfo.getAggregationType();
String argumentsString = AggregationFunctionUtils.getColumn(aggregationInfo);
Member

is this temporary for Distinct or permanent?

Contributor Author

This is for passing multiple args as one concatenated string in this PR, to avoid backward incompatible change in request.thrift for AggregationInfo. Open to suggestions if there is a trivial fix here that can be done in this PR. Otherwise, will address it separately in a backward compatible way.

}

AggregationInfo aggregationInfo = new AggregationInfo();
aggregationInfo.setAggregationType(_name);
aggregationInfo.putToAggregationParams(COLUMN_KEY_IN_AGGREGATION_INFO, expression);
aggregationInfo.putToAggregationParams(CompilerConstants.COLUMN_KEY_IN_AGGREGATION_INFO, expression);
Member

should we instead fix AggregationInfo to take in multiple arguments instead?

Contributor Author

Yes we should. I didn't want to break backward compatibility in this PR though.

switch (AggregationFunctionType.valueOf(upperCaseFunctionName)) {
case COUNT:
return new CountAggregationFunction();
return new CountAggregationFunction(column);
Member

Should we instead add an init method that passes the arguments[] as is? This might allow us to plug in aggregation functions easily. We can do this in the next PR as well.

Contributor Author

Yeah, next PR.
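
One possible reading of the init suggestion, sketched under assumptions: instead of a switch that calls a different constructor per function, every function has a no-arg constructor and receives its arguments through init, so new functions can be registered by name. The interface, registry, and class names below are all hypothetical and are not part of this PR.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical registry: maps an upper-cased function name to a no-arg constructor.
final class PluginRegistrySketch {
  private final Map<String, Supplier<InitializableAggregationSketch>> _suppliers = new HashMap<>();

  void register(String functionName, Supplier<InitializableAggregationSketch> supplier) {
    _suppliers.put(functionName.toUpperCase(Locale.ROOT), supplier);
  }

  // Instantiates the function and hands it the arguments unchanged.
  InitializableAggregationSketch create(String functionName, List<String> arguments) {
    Supplier<InitializableAggregationSketch> supplier =
        _suppliers.get(functionName.toUpperCase(Locale.ROOT));
    if (supplier == null) {
      throw new IllegalArgumentException("Unknown aggregation function: " + functionName);
    }
    InitializableAggregationSketch function = supplier.get();
    function.init(arguments);
    return function;
  }

  public static void main(String[] args) {
    PluginRegistrySketch registry = new PluginRegistrySketch();
    registry.register("count", CountSketch::new);
    System.out.println(registry.create("COUNT", Arrays.asList("*")).getArguments());
  }
}

// Hypothetical contract: every function accepts its arguments through init().
interface InitializableAggregationSketch {
  void init(List<String> arguments);

  List<String> getArguments();
}

// Minimal function that only records what it was initialized with.
final class CountSketch implements InitializableAggregationSketch {
  private List<String> _arguments;

  @Override
  public void init(List<String> arguments) {
    _arguments = arguments;
  }

  @Override
  public List<String> getArguments() {
    return _arguments;
  }
}
```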

@mayankshriv mayankshriv force-pushed the aggregation-changes branch 2 times, most recently from c43e2c2 to f236728 Compare April 16, 2020 14:05
}

AggregationInfo aggregationInfo = new AggregationInfo();
aggregationInfo.setAggregationType(functionName);
aggregationInfo.putToAggregationParams(FunctionCallAstNode.COLUMN_KEY_IN_AGGREGATION_INFO, columnName);
aggregationInfo.putToAggregationParams(CompilerConstants.COLUMN_KEY_IN_AGGREGATION_INFO, argumentString);
Contributor

Perhaps we should introduce a map that specifies the number of arguments aggregation function expects (if fixed)? We can then throw an error right here for functions that cannot handle multiple arguments.

Contributor Author

AggregationFunctions will do that.
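
For comparison, the reviewer's suggestion could look roughly like the sketch below: a lookup table of fixed argument counts consulted at compile time. The table contents and names are assumptions made for illustration. The PR instead leaves validation to the individual aggregation functions, as the reply above says.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical compile-time check of argument counts for fixed-arity functions.
final class ArgCountValidationSketch {
  // Assumed entries; functions absent from the map are treated as variable-arity.
  private static final Map<String, Integer> EXPECTED_ARG_COUNTS = new HashMap<>();

  static {
    EXPECTED_ARG_COUNTS.put("COUNT", 1);
    EXPECTED_ARG_COUNTS.put("SUM", 1);
  }

  static void validate(String functionName, List<String> arguments) {
    Integer expected = EXPECTED_ARG_COUNTS.get(functionName.toUpperCase(Locale.ROOT));
    if (expected != null && expected != arguments.size()) {
      throw new IllegalArgumentException(functionName + " expects " + expected
          + " argument(s) but got " + arguments.size());
    }
  }

  public static void main(String[] args) {
    validate("sum", Arrays.asList("col1"));
    try {
      validate("sum", Arrays.asList("col1", "col2"));
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```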

import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.spi.utils.EqualityUtils;

import static org.apache.pinot.parsers.CompilerConstants.AGGREGATION_FUNCTION_ARG_SEPARATOR;
Contributor

Suggested change
import static org.apache.pinot.parsers.CompilerConstants.AGGREGATION_FUNCTION_ARG_SEPARATOR;
import org.apache.pinot.parsers.CompilerConstants;

} else {
// handle rest of the aggregate functions -- sum, min, max etc
function.aggregate(length, resultHolder, transformBlock.getBlockValueSet(_expressions[i]));
function.aggregate(length, resultHolder,
Contributor

Wouldn't the code from line 88 to 92 hold here as well? Can we move the call to function.aggregate() below, initializing the map differently (or are we optimizing the map creation here with Collections.singletonMap())?

Contributor Author

I left the optimization for now because we want to clean up the DISTINCT special casing anyways.
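
The trade-off in this exchange can be sketched as follows, assuming a single-argument fast path that uses Collections.singletonMap() and a general path that fills a HashMap. The helper and its fetcher parameter are hypothetical; in the PR the values would come from the transform block.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper that builds the argument map passed to aggregate().
final class BlockValSetMapSketch {
  static <V> Map<String, V> build(String[] expressions, Function<String, V> fetcher) {
    if (expressions.length == 1) {
      // Fast path: an immutable one-entry map, cheaper than a HashMap.
      return Collections.singletonMap(expressions[0], fetcher.apply(expressions[0]));
    }
    // General path: one entry per argument expression.
    Map<String, V> map = new HashMap<>();
    for (String expression : expressions) {
      map.put(expression, fetcher.apply(expression));
    }
    return map;
  }

  public static void main(String[] args) {
    // String::length stands in for fetching a block of values for the expression.
    System.out.println(build(new String[] {"col1"}, String::length));
    System.out.println(build(new String[] {"col1", "column2"}, String::length));
  }
}
```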

@Jackie-Jiang
Contributor

@mayankshriv Passing TransformBlock is still unit testable, where you just need to mock the TransformBlock.
Here what I'm actually proposing is to wrap the column fetch logic inside aggregation function. Only the aggregation function knows how each column should be processed, and in what order, either the column is identifier or constant etc.
Another concern is about the overhead and extra garbage of creating a map. For very high QPS use case, this might cause higher latency.

@mayankshriv
Contributor Author

It enforces that AggregationFunction works on TransformBlocks, which we might change in future, if we have more dynamic query planning. So I am still not convinced on that. Also, this will happen once per block (5k or 10k filtered rows) so there may not be a lot of overhead.

@mayankshriv mayankshriv merged commit 2a31111 into apache:master Apr 17, 2020
@mayankshriv mayankshriv deleted the aggregation-changes branch April 17, 2020 00:49
mayankshriv added a commit to mayankshriv/pinot that referenced this pull request Apr 18, 2020
…td).

This PR is a continuation of apache#5259
to address the issue apache#5261.

1. Added a new field in request.thrift, `aggregationFunctionArgs`, as a list of String
   arguments for the aggregation function.
   - Could not use the existing `aggregationParams` as it is a Map, and functions with
     variable arguments may not provide a name for the arg (to be used as key in Map).
   - Maintain backward compatibility by first checking for the new field, and falling back to
     the existing one if it does not exist.

2. Ensure that all calls to the old AggregationInfo.getAggregationParams() are replaced
   with the backward compatible AggregationFunctionUtils.getAggregationArgs().

3. Since most aggregation functions today have just one argument, added a separate API,
   AggregationFunctionContext.getFirstArgument(), as an optimization.

4. Cleaned up the getColumnName() and getResultColumnName() APIs in the AggregationFunctionContext
   class to not require the column name argument, as this is already stored in the
   AggregationFunction.

5. Modified all tests to use aggregationFunctionArgs instead of aggregationParams.

TODO:
Remove the AggregationFunctionContext class as AggregationFunctions now store their arguments,
and this class no longer provides any additional value.
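
The fallback described in item 1 can be sketched as below. This is an illustration under assumptions, not the code from the follow-up PR: the info class is a hypothetical stand-in for the thrift object, and splitting the legacy value on ":" is assumed from the concatenation approach discussed earlier in this conversation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical reader that prefers the new list field and falls back to the old map.
final class BackwardCompatArgsSketch {
  static final String COLUMN_KEY = "column";
  static final String ARG_SEPARATOR = ":";

  static List<String> getAggregationArgs(LegacyAwareInfoSketch info) {
    List<String> newArgs = info.aggregationFunctionArgs;
    if (newArgs != null && !newArgs.isEmpty()) {
      return newArgs;
    }
    // Old request format: one concatenated string stored under the "column" key.
    String concatenated = info.aggregationParams.get(COLUMN_KEY);
    if (concatenated == null) {
      throw new IllegalArgumentException("No aggregation arguments found");
    }
    return Arrays.asList(concatenated.split(ARG_SEPARATOR));
  }

  public static void main(String[] args) {
    LegacyAwareInfoSketch oldStyle = new LegacyAwareInfoSketch();
    oldStyle.aggregationParams.put(COLUMN_KEY, "col1:col2");
    System.out.println(getAggregationArgs(oldStyle));

    LegacyAwareInfoSketch newStyle = new LegacyAwareInfoSketch();
    newStyle.aggregationFunctionArgs = Arrays.asList("col3");
    System.out.println(getAggregationArgs(newStyle));
  }
}

// Hypothetical stand-in for AggregationInfo with both the old and the new field.
final class LegacyAwareInfoSketch {
  Map<String, String> aggregationParams = new HashMap<>();
  List<String> aggregationFunctionArgs; // null when the sender predates the new field
}
```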
mayankshriv added a commit to mayankshriv/pinot that referenced this pull request Apr 19, 2020
…td).
mayankshriv added a commit that referenced this pull request Apr 19, 2020
…td). (#5275)
mayankshriv added a commit to mayankshriv/pinot that referenced this pull request May 6, 2020
With PR apache#5259 and apache#5275, the broker starts to send aggregation function arguments
in a new field in the thrift class. While a new server prefers the new field and falls back
to the old field, with a new broker and an old server the server is unable to find
values in the old field.

This PR fixes this issue by setting both the old and the new field in the broker. We will need to
change the broker back to stop setting the old field in a future release.
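
A minimal sketch of the fix described above, assuming the old field carries the arguments as one ":"-joined string under the "column" key, as discussed earlier in the conversation. The class names are hypothetical stand-ins for the broker code and the thrift object.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical broker-side builder that populates both fields during the transition.
final class DualFieldBrokerSketch {
  static WireInfoSketch build(List<String> arguments) {
    WireInfoSketch info = new WireInfoSketch();
    // New field: read by servers that know about aggregationFunctionArgs.
    info.aggregationFunctionArgs.addAll(arguments);
    // Old field: still set so that servers without the new field keep working.
    info.aggregationParams.put("column", String.join(":", arguments));
    return info;
  }

  public static void main(String[] args) {
    WireInfoSketch info = build(Arrays.asList("col1", "col2"));
    System.out.println(info.aggregationFunctionArgs);
    System.out.println(info.aggregationParams.get("column"));
  }
}

// Hypothetical stand-in for the thrift object carrying both representations.
final class WireInfoSketch {
  final Map<String, String> aggregationParams = new HashMap<>();
  final List<String> aggregationFunctionArgs = new ArrayList<>();
}
```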
mayankshriv added a commit that referenced this pull request May 6, 2020