expression aggregator #11104

Merged · 17 commits · Apr 23, 2021
Conversation

clintropolis
Member

@clintropolis clintropolis commented Apr 12, 2021

Description

I loved the idea of the JavaScript aggregator - the flexibility of being able to define arbitrary computations across any number of inputs to accumulate values is very powerful. But it turned out to be a bit too powerful, due to the number of exploitative things you can do to the host machine with it, so it broke my heart. Additionally, it was limited to aggregations of double types only, which really cuts down on the possible expressiveness.

In this PR, I have re-imagined the concept of such a flexible aggregator, but sandboxed, using native Druid expressions. Further, this ExpressionLambdaAggregatorFactory really rounds out the role of the Druid expression system in the query engine. With its introduction, Druid native expressions can now be used to perform a "fold" or "reduce" operation on any number of input columns, in addition to the previously possible "map" (ExpressionVirtualColumn), "filter" (ExpressionFilter) and post-transform (ExpressionPostAggregator).

ExpressionLambdaAggregatorFactory also supports all native Druid expression types, including array types, which means that it can be used to build things such as ARRAY_AGG and GROUP_CONCAT and similar functionality.

ExpressionLambdaAggregatorFactory offers near-complete control over AggregatorFactory behavior through expressions.

| property | description | required |
|----------|-------------|----------|
| name | aggregator name | true |
| fields | aggregator input columns | true |
| accumulatorIdentifier | variable which identifies the accumulator value in the fold and combine expressions | false (default `__acc`) |
| initialValue | initial value of the accumulator for the fold expression (and the combine expression, if initialCombineValue is null) | true |
| initialCombineValue | initial value of the accumulator for the combine expression | false (default initialValue) |
| fold | expression to accumulate values from fields. The result of the expression is stored in accumulatorIdentifier and is available to the next computation. | true |
| combine | expression to combine the results of various fold expressions. The partial result is available to the expression under the aggregator name. If not set and fold has a single input column in fields, the fold expression is used. | false (defaults to the fold expression if and only if fold has a single input in fields) |
| compare | comparator expression which can refer only to two input variables, o1 and o2, the outputs of the fold or combine expressions; it must adhere to the Java comparator contract. If not set, this falls back to a comparator appropriate to the output type. | false |
| finalize | finalize expression which can refer only to a single input variable, o, and is used to perform any final transformation of the output of the fold or combine expressions. If not set, the value is not transformed. | false |
| maxSizeBytes | maximum size in bytes that variably sized aggregator output types, such as strings and arrays, are allowed to grow before the aggregation fails | false (default 8192 bytes) |
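The lifecycle implied by the properties above can be sketched in plain Python (purely illustrative; the `expression_aggregate` helper is hypothetical and not part of Druid): each segment folds its rows into a partial value, the partials are combined, and finalize transforms the merged result.

```python
from functools import reduce

def expression_aggregate(segments, initial_value, fold, initial_combine, combine, finalize):
    # "fold": accumulate row values within each segment, starting from initialValue
    partials = [reduce(fold, rows, initial_value) for rows in segments]
    # "combine": merge per-segment partial results, starting from initialCombineValue
    merged = reduce(combine, partials, initial_combine)
    # "finalize": optional final transformation of the output
    return finalize(merged)

# A "sum"-style aggregator over two segments:
result = expression_aggregate(
    segments=[[1, 2, 3], [4, 5]],
    initial_value=0,
    fold=lambda acc, x: acc + x,           # models "__acc + column_a"
    initial_combine=0,
    combine=lambda acc, part: acc + part,  # partial result merged into the accumulator
    finalize=lambda o: o,                  # identity when no finalize is set
)
# result is 15
```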

Examples (some contrived)

"count" aggregator

    {
      "type": "expression",
      "name": "expression_count",
      "fields": [],
      "initialValue": "0",
      "fold": "__acc + 1",
      "combine": "__acc + expression_count"
    }

"sum" aggregator

    {
      "type": "expression",
      "name": "expression_sum",
      "fields": ["column_a"],
      "initialValue": "0",
      "fold": "__acc + column_a"
    }

"array" aggregator, sorted by array length

    {
      "type": "expression",
      "name": "expression_array_agg_distinct",
      "fields": ["column_a"],
      "initialValue": "[]",
      "fold": "array_set_add(__acc, column_a)",
      "combine": "array_set_add_all(__acc, expression_array_agg_distinct)",
      "compare": "if(array_length(o1) > array_length(o2), 1, if (array_length(o1) == array_length(o2), 0, -1))"
    }
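The compare expression in the example above must follow the Java comparator contract (negative, zero, or positive). A hypothetical Python rendering of that length-based comparator, just to make the semantics concrete:

```python
# Models the nested if() comparator from the JSON spec above:
# positive when o1 is longer, zero when equal length, negative otherwise.
def compare(o1, o2):
    if len(o1) > len(o2):
        return 1
    return 0 if len(o1) == len(o2) else -1
```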

"group_concat" aggregator, sorted by array length

    {
      "type": "expression",
      "name": "expression_group_concat_distinct",
      "fields": ["column_a"],
      "initialValue": "[]",
      "fold": "array_set_add(__acc, column_a)",
      "combine": "array_set_add_all(__acc, expression_group_concat_distinct)",
      "compare": "if(array_length(o1) > array_length(o2), 1, if (array_length(o1) == array_length(o2), 0, -1))",
      "finalize": "array_to_string(o, ',')"
    }

decomposed "sum" aggregator, which instead of merging by summing merges the partial results into an array of individual sums, and then sums the array values in the finalizer

    {
      "type": "expression",
      "name": "expression_decomposed_sum",
      "fields": ["column_a"],
      "initialValue": "0.0",
      "initialCombineValue": "<DOUBLE>[]",
      "fold": "__acc + column_a",
      "combine": "array_concat(__acc, expression_decomposed_sum)",
      "finalize": "fold((x, __acc) -> x + __acc, o, 0.0)"
    }
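Tracing the decomposed example above in plain Python (an illustrative simulation, not Druid code) shows how the three stages compose:

```python
from functools import reduce

segments = [[1.0, 2.0], [3.0, 4.0]]

# fold: "__acc + column_a", starting from 0.0 within each segment
partials = [reduce(lambda acc, x: acc + x, rows, 0.0) for rows in segments]

# combine: "array_concat(__acc, ...)", starting from an empty DOUBLE array,
# so each per-segment sum is appended rather than added
merged = reduce(lambda acc, part: acc + [part], partials, [])

# finalize: "fold((x, __acc) -> x + __acc, o, 0.0)" sums the collected partials
result = reduce(lambda acc, x: acc + x, merged, 0.0)
# merged is [3.0, 7.0]; result is 10.0
```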

Variably sized results such as string and array types are controlled with a maximum byte setting. Unlike some other aggregators like string first/last, the expression aggregator will currently fail if the size grows too large instead of silently truncating.

Since array types can get quite large, I've added array_set_add and array_set_add_all, which allow working with array types as sets so that they contain only unique values (this also allows modeling things such as the DISTINCT keyword with SQL functions such as ARRAY_AGG and GROUP_CONCAT).
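A rough model of those set-style functions in Python (an illustrative analogy, not Druid's implementation; sorting here is only to keep the sketch deterministic):

```python
def array_set_add(arr, value):
    # add a single element, keeping only unique values
    return sorted(set(arr) | {value})

def array_set_add_all(arr, other):
    # merge two arrays as a set union
    return sorted(set(arr) | set(other))

# Folding duplicate inputs accumulates only distinct values:
acc = []
for v in ["a", "b", "a", "c"]:
    acc = array_set_add(acc, v)
# acc is now ["a", "b", "c"]

# Combining two partial sets keeps the union distinct:
merged = array_set_add_all(acc, ["b", "d"])
# merged is ["a", "b", "c", "d"]
```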

I have not actually documented the native expression aggregator itself in this PR, because I am unsure whether it is ready for large-scale use, but I would consider adding it to the native querying docs, perhaps with a disclaimer.


Key changed/added classes in this PR
  • ExprEval
  • Parser
  • ExpressionLambdaAggregatorFactory
  • ExpressionLambdaAggregator
  • ExpressionLambdaBufferAggregator

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@lgtm-com

lgtm-com bot commented Apr 13, 2021

This pull request introduces 1 alert when merging 7007ac4 into a6a2758 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

@clintropolis clintropolis removed the WIP label Apr 14, 2021
    @Override
    public byte[] getCacheKey()
    {
      byte[] fieldsBytes = StringUtils.toUtf8WithNullToEmpty(String.join(",", fields));

Contributor

Suggest using CacheKeyBuilder.

    switch (eval.type()) {
      case LONG:
        if (eval.isNumericNull()) {
          buffer.put(offset, NullHandling.IS_NULL_BYTE);

Contributor

It seems reasonable to me to ignore maxSizeBytes for primitive types, but please document it at least in the Javadoc.

    buffer.putInt(offset, stringBytes.length);
    offset += Integer.BYTES;
    for (byte stringByte : stringBytes) {
      buffer.put(offset++, stringByte);

Contributor

Hmm, why not buffer.position(offset); buffer.put(stringBytes, offset, stringBytes.length) and restoring the original position?

Member Author

Oops, I meant to switch to doing what you suggest here; originally I was handling all cases manually, but then started to switch some over to the bulk methods and didn't finish all the way.
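The length-prefixed layout being discussed can be modeled in Python (an illustrative sketch only; the flag byte, offsets, and `write_string` helper are assumptions for the example, not Druid's exact buffer format). Note the bulk slice copy in place of the byte-at-a-time loop, mirroring the review suggestion:

```python
import struct

def write_string(buf, offset, s, max_size_bytes):
    data = s.encode("utf-8")
    needed = 1 + 4 + len(data)  # flag byte + 4-byte length prefix + payload
    if needed > max_size_bytes:
        raise ValueError("aggregation output exceeds maxSizeBytes")
    buf[offset] = 0  # not-null flag
    struct.pack_into(">i", buf, offset + 1, len(data))
    # bulk copy instead of writing one byte per iteration
    buf[offset + 5 : offset + 5 + len(data)] = data
    return offset + needed

buf = bytearray(64)
end = write_string(buf, 0, "druid", 8192)
# end is 10: 1 flag byte + 4 length bytes + 5 payload bytes
```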

    }
    } else {
      checkMaxBytes(eval.type(), 1 + Integer.BYTES, maxSizeBytes);
      buffer.putInt(offset, -1);

Contributor

Suggest adding a static variable for -1 and using it.

      return new String[]{null};
    }

    private static Class convertType(@Nullable Class existing, Class next)

Contributor

Could you please add some javadoc?

@@ -177,14 +177,17 @@ See javadoc of java.lang.Math for detailed explanation for each function.
| array_offset_of(arr,expr) | returns the 0 based index of the first occurrence of expr in the array, or `-1` (or `null` if `druid.generic.useDefaultValueForNull=false`) if no matching elements exist in the array. |
| array_ordinal_of(arr,expr) | returns the 1 based index of the first occurrence of expr in the array, or `-1` (or `null` if `druid.generic.useDefaultValueForNull=false`) if no matching elements exist in the array. |
| array_prepend(expr,arr) | adds expr to arr at the beginning, the resulting array type determined by the type of the array |
| array_append(arr1,expr) | appends expr to arr, the resulting array type determined by the type of the first array |
| array_append(arr,expr) | appends expr to arr, the resulting array type determined by the type of the first array |
Contributor

> the resulting array type determined by the type of the first array

Is this true? Should it mention how the type is determined per https://github.com/apache/druid/pull/11104/files#diff-7badc739fd6eef810cbd31950d52f28267273e129b9371beeea0bc5d125da7f7R267?

Member Author

Ah, it is true. The method you linked is used when converting values from the input binding to expressions. Within expression evaluation the types are already decided, so the first array type dictates the output type (see the case expressions in the array function eval methods).

Contributor

Ah cool, thanks for the explanation 👍

| array_concat(arr1,arr2) | concatenates 2 arrays, the resulting array type determined by the type of the first array |
| array_set_add(arr,expr) | adds expr to arr and converts the array to a new array composed of the unique set of elements. The resulting array type determined by the type of the array |
Contributor

> The resulting array type determined by the type of the array

Similarly, is the type determined by inspecting all elements in the new set?

Member Author

No, for the same reason as #11104 (comment)

    @JsonProperty("combine") @Nullable final String combineExpression,
    @JsonProperty("compare") @Nullable final String compareExpression,
    @JsonProperty("finalize") @Nullable final String finalizeExpression,
    @JsonProperty("maxSizeBytes") @Nullable final Integer maxSizeBytes,

Contributor

HumanReadableBytes?

    public class ExpressionLambdaAggregatorFactory extends AggregatorFactory
    {
      private static final String DEFAULT_ACCUMULATOR_ID = "__acc";
      private static final int DEFAULT_MAX_SIZE_BYTES = 1 << 13;

Contributor

8K by default seems pretty big. Maybe 1K instead?

      private final Supplier<Expr> combineExpression;
      private final Supplier<Expr> compareExpression;
      private final Supplier<Expr> finalizeExpression;
      private final int maxSizeBytes;

Contributor

Please document that it's ignored when combinedValue is a primitive type.

Member Author

I think it might be better to have a precondition that the size is at a minimum the size required to hold a long or double; then we wouldn't really have to ignore it, because it would be illegal.

Contributor

@jihoonson jihoonson left a comment

LGTM. Thanks @clintropolis.

Contributor

@jon-wei jon-wei left a comment

I think the interface for the aggregator makes sense, LGTM.

@jon-wei jon-wei merged commit 57ff1f9 into apache:master Apr 23, 2021
@clintropolis clintropolis deleted the lambda-the-ultimate-aggregator branch April 23, 2021 01:44
3 participants