Integer Tuple Sketch support #10427

Merged
merged 24 commits into apache:master on May 25, 2023

andimiller
Contributor

This adds support for `BYTES` columns containing Tuple Sketches with Integer as the summary type.

The added classes currently support `Sum` as the semigroup, but are generic so others can be added.

Feature breakdown:

1. Add transform functions that can be used to create Integer Tuple Sketches during ingestion, e.g. `toIntegerSumTupleSketch(colA, colbB, 16)`
2. Add codecs that use the DataSketches serialization
3. Add aggregation functions:
  * `DISTINCT_COUNT_TUPLE_SKETCH` returns the estimated number of unique keys, the same as Theta or HLL
  * `DISTINCT_COUNT_RAW_INTEGER_SUM_TUPLE_SKETCH` merges the sketches using `Sum` as the semigroup and returns the raw sketch
  * `SUM_VALUES_INTEGER_SUM_TUPLE_SKETCH` merges the sketches using `Sum` as the semigroup and estimates the sum of the value side
  * `AVG_VALUES_INTEGER_SUM_TUPLE_SKETCH` merges the sketches using `Sum` as the semigroup and estimates the average of the value side
4. Add `ValueAggregator<_, _>`s for use in `StarTree` indexes for all four aggregations above
5. Add `ValueAggregator`s for use in rollups for all four aggregations above
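
For readers unfamiliar with Tuple Sketches, the idea behind the feature list above can be sketched in plain Java. This is a toy, KMV-style model written for illustration only: the class `ToyIntegerSumTupleSketch`, its methods, and the MD5-based hash are all assumptions and are not the DataSketches or Pinot implementation. It shows keys being sampled by hash, an integer summary per key being combined with `Sum`, and the distinct-count estimate being scaled by the sampling rate.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Toy KMV-style tuple sketch. Hypothetical teaching code, not the DataSketches implementation. */
final class ToyIntegerSumTupleSketch {
  private final int k;                  // nominal entries, 2^lgK
  private long theta = Long.MAX_VALUE;  // only hashes strictly below theta are retained
  private final TreeMap<Long, Integer> entries = new TreeMap<>();  // key hash -> summed value

  ToyIntegerSumTupleSketch(int lgK) {
    k = 1 << lgK;
  }

  /** Adds value to the summary stored for key (Sum semigroup). */
  void update(String key, int value) {
    insert(hash(key), value);
  }

  /** Merges other into this sketch, summing the summaries of keys present in both. */
  void merge(ToyIntegerSumTupleSketch other) {
    theta = Math.min(theta, other.theta);
    entries.tailMap(theta, true).clear();  // drop entries at or above the new threshold
    for (Map.Entry<Long, Integer> e : other.entries.entrySet()) {
      insert(e.getKey(), e.getValue());
    }
  }

  private void insert(long hash, int value) {
    if (hash >= theta) {
      return;  // outside the sampled region
    }
    entries.merge(hash, value, Integer::sum);
    if (entries.size() > k) {
      theta = entries.lastKey();  // shrink the sampled region so that k entries remain
      entries.remove(theta);
    }
  }

  int retainedEntries() {
    return entries.size();
  }

  /** Summary currently stored for key, or null if the key is not retained. */
  Integer summaryFor(String key) {
    return entries.get(hash(key));
  }

  /** Estimated number of distinct keys: retained entries scaled by the sampling rate. */
  double distinctEstimate() {
    if (theta == Long.MAX_VALUE) {
      return entries.size();  // nothing has been discarded yet, so the count is exact
    }
    return entries.size() / (theta / (double) Long.MAX_VALUE);
  }

  private static long hash(String key) {
    try {
      byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
      long h = 0;
      for (int i = 0; i < 8; i++) {
        h = (h << 8) | (d[i] & 0xFF);
      }
      return h >>> 1;  // non-negative 63-bit hash
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

With fewer than `k` distinct keys the toy sketch is exact, which makes the `Sum` behaviour easy to see: updating the same key twice, or merging two sketches that share a key, adds the integer summaries together.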
@andimiller
Contributor Author

I could do with some advice on the best place to add tests for the aggregation functions. I've been looking through the existing tests and can't find anywhere suitable.

@andimiller changed the title from "Tuple sketch support" to "Integer Tuple Sketch support" on Mar 15, 2023
@codecov-commenter commented Mar 15, 2023

Codecov Report

Merging #10427 (1b7fe74) into master (00d3133) will decrease coverage by 56.66%.
The diff coverage is 0.00%.

@@              Coverage Diff              @@
##             master   #10427       +/-   ##
=============================================
- Coverage     70.30%   13.64%   -56.66%     
+ Complexity     6494      439     -6055     
=============================================
  Files          2158     2110       -48     
  Lines        116070   113782     -2288     
  Branches      17566    17294      -272     
=============================================
- Hits          81608    15531    -66077     
- Misses        28778    96979    +68201     
+ Partials       5684     1272     -4412     
| Flag | Coverage Δ |
|---|---|
| integration1 | ? |
| integration2 | ? |
| unittests1 | ? |
| unittests2 | 13.64% <0.00%> (-0.04%) ⬇️ |

Flags with carried forward coverage won't be shown.

| Impacted Files | Coverage Δ |
|---|---|
| ...org/apache/pinot/core/common/ObjectSerDeUtils.java | 0.00% <0.00%> (-90.83%) ⬇️ |
| ...he/pinot/core/function/scalar/SketchFunctions.java | 0.00% <0.00%> (-100.00%) ⬇️ |
| ...gregation/function/AggregationFunctionFactory.java | 0.00% <0.00%> (-82.66%) ⬇️ |
| ...AvgValueIntegerTupleSketchAggregationFunction.java | 0.00% <0.00%> (ø) |
| ...nctCountIntegerTupleSketchAggregationFunction.java | 0.00% <0.00%> (ø) |
| ...unction/IntegerTupleSketchAggregationFunction.java | 0.00% <0.00%> (ø) |
| ...umValuesIntegerTupleSketchAggregationFunction.java | 0.00% <0.00%> (ø) |
| ...ssing/aggregator/IntegerTupleSketchAggregator.java | 0.00% <0.00%> (ø) |
| .../processing/aggregator/ValueAggregatorFactory.java | 0.00% <0.00%> (-42.86%) ⬇️ |
| .../aggregator/IntegerTupleSketchValueAggregator.java | 0.00% <0.00%> (ø) |
| ... and 4 more | |

... and 1699 files with indirect coverage changes


@Override
public byte[] serializeAggregatedValue(Sketch<IntegerSummary> value) {
  return CustomSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(value);
Contributor

Just curious: is there a reason why we have two ser/deser utilities (CustomSerDeUtils, ObjectSerDeUtils)? @Jackie-Jiang

Member

@davecromberge left a comment

Looks good @andimiller!

@@ -213,6 +216,8 @@ public static ObjectType getObjectType(Object value) {
      return ObjectType.VarianceTuple;
    } else if (value instanceof PinotFourthMoment) {
      return ObjectType.PinotFourthMoment;
    } else if (value instanceof org.apache.datasketches.tuple.Sketch) {
      return ObjectType.IntegerTupleSketch;
Member

Is this a safe assumption? Is it also necessary to inspect the summary type to verify that it is Integer?

Contributor Author

Right now it is, but adding other types of Tuple Sketch would require wrapper types, because of JVM type erasure.
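
The type-erasure point can be illustrated without any sketch library. In the sketch below, `ErasureDemo`, `IntegerSketchBox`, and `DoubleSketchBox` are hypothetical names invented for this example. It shows that two generic instances with different type arguments share one runtime class, so an `instanceof` check cannot tell them apart, whereas a dedicated wrapper class per summary type can be distinguished.

```java
import java.util.ArrayList;
import java.util.List;

final class ErasureDemo {
  /** Hypothetical wrapper types: one concrete class per summary type survives erasure. */
  static final class IntegerSketchBox {
    final List<Integer> summaries = new ArrayList<>();
  }

  static final class DoubleSketchBox {
    final List<Double> summaries = new ArrayList<>();
  }

  /** Generic type arguments are erased, so both lists have the same runtime class. */
  static boolean sameRuntimeClass() {
    Object ints = new ArrayList<Integer>();
    Object doubles = new ArrayList<Double>();
    return ints.getClass() == doubles.getClass();
  }

  /** Dispatch on the wrapper class, which is still visible at runtime. */
  static String classify(Object value) {
    if (value instanceof IntegerSketchBox) {
      return "IntegerTupleSketch";
    }
    if (value instanceof DoubleSketchBox) {
      return "DoubleTupleSketch";
    }
    if (value instanceof List) {
      return "List of unknown element type";
    }
    throw new IllegalArgumentException("Unsupported type: " + value.getClass().getName());
  }
}
```

As long as only one summary type is registered, checking the raw generic class is unambiguous; the wrapper approach only becomes necessary once a second summary type is added.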

    }
    double estimate = retainedTotal / union.getResult().getRetainedEntries() * union.getResult().getEstimate();
    return Double.valueOf(estimate).longValue();
  }
Member

Does the serde always deserialise bytes to a compact sketch? It could be better to use the base Sketch abstraction for cases where the sketches have been created outside the system and not compacted.

Contributor Author

I can give that a go. I swapped everything to compact sketches because I was having issues with Sketch not being thread-safe.

Contributor

I had the same question as well :).
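
The estimate in the snippet at the top of this thread scales the total of the retained summaries by the ratio of estimated distinct keys to retained entries. A minimal sketch of that arithmetic follows; the class `TupleSketchEstimates`, its parameter types, and the zero-entry guard are assumptions made for this example, since the surrounding code is not shown.

```java
final class TupleSketchEstimates {
  /** Scales the total of the retained summaries up to the estimated population. */
  static double estimateSum(double retainedTotal, int retainedEntries, double distinctEstimate) {
    if (retainedEntries == 0) {
      return 0.0;  // empty sketch: nothing to scale (guard added for this example only)
    }
    return retainedTotal / retainedEntries * distinctEstimate;
  }

  /** Average value per key, estimated from the retained sample alone. */
  static double estimateAvg(double retainedTotal, int retainedEntries) {
    return retainedEntries == 0 ? 0.0 : retainedTotal / retainedEntries;
  }
}
```

For example, 4096 retained entries whose summaries total 20480 average 5 per key; with an estimated 1,000,000 distinct keys, the estimated sum is 5,000,000. The example takes `retainedTotal` as a `double` on purpose: if both operands of the division were integral, Java would truncate the quotient (`7 / 2 * 10` is `30`, while `7.0 / 2 * 10` is `35.0`).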

@@ -96,6 +96,9 @@ public static class Helix {
    // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
    public static final int DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES = 65536;


    public static final int DEFAULT_TUPLE_SKETCH_LGK = 16;
Contributor

Any references that can help explain this value?

Contributor Author

I'll add a comment. It's the same value as the Theta default above, expressed as a base-2 logarithm (2^16 = 65536).
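
The relationship between the two constants is a power of two. A small sketch of the conversion in both directions, using a hypothetical helper class `LgK` that is not part of the PR:

```java
final class LgK {
  /** Nominal entries for a given log-base-2 size, e.g. 16 -> 65536. */
  static int nominalEntries(int lgK) {
    return 1 << lgK;
  }

  /** Log-base-2 size for a power-of-two nominal entries value, e.g. 65536 -> 16. */
  static int lgK(int nominalEntries) {
    if (nominalEntries <= 0 || Integer.bitCount(nominalEntries) != 1) {
      throw new IllegalArgumentException("Not a positive power of two: " + nominalEntries);
    }
    return Integer.numberOfTrailingZeros(nominalEntries);
  }
}
```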

      is.update((String) key, value);
    } else if (key instanceof byte[]) {
      is.update((byte[]) key, value);
    }
Contributor

In case you want to validate or catch invalid types, consider throwing an IllegalStateException or IllegalArgumentException?

Contributor Author

Done. I added it for Theta too and expanded the tests to cover it.
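
The suggested validation amounts to a final `else` branch that rejects unknown key types instead of silently ignoring them. The sketch below is illustrative only: `ToyKeySink`, `RecordingSink`, and `KeyDispatch` are hypothetical stand-ins, and the `Integer` and `Long` branches are assumptions, since only the `String` and `byte[]` branches are visible in the snippet under review.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a sketch that accepts several key types. */
interface ToyKeySink {
  void update(long key, int value);
  void update(String key, int value);
  void update(byte[] key, int value);
}

/** Records which overload was called, only to make the example observable. */
final class RecordingSink implements ToyKeySink {
  final List<String> calls = new ArrayList<>();

  @Override public void update(long key, int value) { calls.add("long"); }
  @Override public void update(String key, int value) { calls.add("String"); }
  @Override public void update(byte[] key, int value) { calls.add("byte[]"); }
}

final class KeyDispatch {
  /** Routes the key to the matching overload and fails loudly on anything else. */
  static void update(ToyKeySink sink, Object key, int value) {
    if (key instanceof Integer) {
      sink.update(((Integer) key).longValue(), value);
    } else if (key instanceof Long) {
      sink.update(((Long) key).longValue(), value);
    } else if (key instanceof String) {
      sink.update((String) key, value);
    } else if (key instanceof byte[]) {
      sink.update((byte[]) key, value);
    } else {
      String type = key == null ? "null" : key.getClass().getName();
      throw new IllegalArgumentException("Unsupported key type: " + type);
    }
  }
}
```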

import org.apache.pinot.spi.data.FieldSpec.DataType;


public class IntegerTupleSketchValueAggregator implements ValueAggregator<byte[], Sketch<IntegerSummary>> {
Contributor

Can the raw type (R) be Sketch instead of byte[] here? Looking at the other sketch implementation (DistinctCountThetaSketchValueAggregator), which has Object as the raw type, I just wanted to check.

Contributor Author

It can be, but Sketch isn't thread-safe, and I swapped this to byte[] while hunting down some thread-safety issues. I will see if I can swap it back now that I've made all the Union usage thread-safe.

Contributor Author

It may need to be Object. This was a good catch; I'm doing more local testing.

Contributor Author

I have tested it more locally. It is fine as byte[] because we only handle aggregated sketches.
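
The conclusion of this thread is that the raw column value is always an already-serialized sketch, so `byte[]` works as the raw type and each raw value is deserialized before being combined. A rough sketch of that pattern follows. `ToyValueAggregator` is a hypothetical, trimmed-down interface loosely modelled on the raw/aggregated type split discussed here, not Pinot's actual `ValueAggregator`, and a plain `long` total stands in for the sketch.

```java
import java.nio.ByteBuffer;

/** Hypothetical, trimmed-down contract: R is the raw column type, A the aggregated type. */
interface ToyValueAggregator<R, A> {
  A getInitialAggregatedValue(R rawValue);
  A applyRawValue(A aggregated, R rawValue);
  byte[] serializeAggregatedValue(A aggregated);
}

/** Stand-in for a sketch aggregator: the "sketch" is a running total stored as 8 bytes. */
final class ToyBytesSumAggregator implements ToyValueAggregator<byte[], Long> {
  @Override
  public Long getInitialAggregatedValue(byte[] rawValue) {
    return decode(rawValue);  // raw values arrive serialized, so decode first
  }

  @Override
  public Long applyRawValue(Long aggregated, byte[] rawValue) {
    return aggregated + decode(rawValue);  // decode, then combine
  }

  @Override
  public byte[] serializeAggregatedValue(Long aggregated) {
    return encode(aggregated);
  }

  static byte[] encode(long value) {
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  static long decode(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong();
  }
}
```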

@@ -299,13 +299,21 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
          return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.KURTOSIS);
        case FOURTHMOMENT:
          return new FourthMomentAggregationFunction(firstArgument, FourthMomentAggregationFunction.Type.MOMENT);
        case DISTINCTCOUNTTUPLESKETCH:
          // mode actually doesn't matter here because we only care about keys, not values
          return new DistinctCountIntegerTupleSketchAggregationFunction(arguments, IntegerSummary.Mode.Sum);
Contributor

Is there a reason why we pass IntegerSummary.Mode.Sum as a parameter? We are already differentiating based on the aggregation implementations: IntegerTupleSketchAggregationFunction vs AvgIntegerTupleSketchAggregationFunction vs SumValuesIntegerTupleSketchAggregationFunction.

Contributor Author

That is the mode for IntegerSummary merging. All of these use Sum.

Contributor

OK, so there can be functions that use other summary modes (min, max, ...) in the future.
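
The mode parameter decides how two summaries for the same key are combined, independently of which final value (count, sum, average) the aggregation function later extracts. A minimal sketch of that separation, using a hypothetical `ToyMode` enum as a stand-in rather than the real `IntegerSummary.Mode`:

```java
import java.util.function.IntBinaryOperator;

/** Hypothetical stand-in for a summary merge mode. */
enum ToyMode {
  SUM(Integer::sum),
  MIN(Math::min),
  MAX(Math::max);

  private final IntBinaryOperator op;

  ToyMode(IntBinaryOperator op) {
    this.op = op;
  }

  /** Combines two summaries that belong to the same key. */
  int merge(int left, int right) {
    return op.applyAsInt(left, right);
  }
}
```

Passing the mode in, rather than hard-coding it, means a future Min or Max variant would only need a different argument at construction time.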

import org.apache.pinot.segment.spi.AggregationFunctionType;


public class SumValuesIntegerTupleSketchAggregationFunction extends IntegerTupleSketchAggregationFunction {
Contributor

Would composition + delegation make the APIs for Sum, Avg, and Distinct clearer than inheritance? That way we know exactly when and how IntegerTupleSketchAggregationFunction is used, and it'll decouple the Integer API from the rest.

Contributor Author

I've followed the way it was implemented for Theta, using the simplest one as the base and inheriting from it.

Contributor

Yes, it makes sense to keep them consistent.

    }
    ArrayList<CompactSketch<IntegerSummary>> merged =
        new ArrayList<>(intermediateResult1.size() + intermediateResult2.size());
    merged.addAll(intermediateResult1);
Contributor

Just curious: don't we want to do a union here for the merge? I'm looking at DistinctCountThetaSketchAggregationFunction for reference.

Contributor Author

This is an optimisation similar to the one used in the Theta version. Merges can be quite expensive, so it's better to delay the merge until we have a lot of sketches to combine, hence List as the intermediate type.
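
The deferred-merge idea can be shown with ordinary collections. In this sketch a `Map<String, Integer>` stands in for a sketch, and `DeferredMerge` is a hypothetical class invented for illustration: intermediate results are merged by cheap list concatenation, and the expensive union runs once when the final result is extracted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DeferredMerge {
  /** Cheap intermediate merge: concatenate the lists and leave the sketches untouched. */
  static List<Map<String, Integer>> merge(List<Map<String, Integer>> left,
      List<Map<String, Integer>> right) {
    List<Map<String, Integer>> merged = new ArrayList<>(left.size() + right.size());
    merged.addAll(left);
    merged.addAll(right);
    return merged;
  }

  /** Expensive step, run once at the end: union everything, summing values per key. */
  static Map<String, Integer> union(List<Map<String, Integer>> sketches) {
    Map<String, Integer> result = new HashMap<>();
    for (Map<String, Integer> sketch : sketches) {
      sketch.forEach((key, value) -> result.merge(key, value, Integer::sum));
    }
    return result;
  }
}
```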

        }
      }
    } catch (Exception e) {
      throw new RuntimeException("Caught exception while merging Tuple Sketches", e);
Contributor

I guess this is groupBy and not merging Tuple Sketches?

      byte[] value = valueArray[i];
      CompactSketch<IntegerSummary> newSketch =
          ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize(value).compact();
      for (int groupKey : groupKeysArray[i]) {
Contributor

This looks exactly the same as aggregateGroupBySV, except that we iterate over the group keys because they can be multi-valued?

Contributor Author

yup
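
The difference confirmed here is only in the shape of the group keys: one key per row in the single-value case, an array of keys per row in the multi-value case. A simplified sketch follows, with `long` totals standing in for sketches; `GroupByDemo` and its method signatures are hypothetical and much reduced from the real aggregation function interface.

```java
final class GroupByDemo {
  /** Single-value group-by: each row belongs to exactly one group. */
  static long[] aggregateGroupBySV(int numGroups, int[] groupKeys, long[] values) {
    long[] totals = new long[numGroups];
    for (int i = 0; i < values.length; i++) {
      totals[groupKeys[i]] += values[i];
    }
    return totals;
  }

  /** Multi-value group-by: each row contributes to every group key it carries. */
  static long[] aggregateGroupByMV(int numGroups, int[][] groupKeysArray, long[] values) {
    long[] totals = new long[numGroups];
    for (int i = 0; i < values.length; i++) {
      for (int groupKey : groupKeysArray[i]) {
        totals[groupKey] += values[i];
      }
    }
    return totals;
  }
}
```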

Contributor

@Jackie-Jiang left a comment

LGTM
Can you please rebase and resolve the conflict, and also respond to the pending comments?

@Jackie-Jiang added the feature and release-notes (Referenced by PRs that need attention when compiling the next release notes) labels on Apr 26, 2023
@swaminathanmanish
Contributor

Thanks for taking care of the comments!

@mayankshriv merged commit ded7e8f into apache:master on May 25, 2023
14 checks passed
aidar-stripe pushed a commit to aidar-stripe/pinot that referenced this pull request Nov 22, 2023
…pache#250)

### Notify
cc stripe-private-oss-forks/pinot-reviewers


### Summary
For upstream pulls, please list all the PRs of interest and risk

Interesting commits:
- apache#10598 : better null handling in transform functions
- apache#10757: table configs has an updated version?
- apache#10766 - ideal state compression is on for findata and rad clusters.
- apache#10643 - new percentile agg function - better error rates than t-digest
- apache#10687 - refactoring + new mutable index spi - will require specific commit to fix our HLL implementation + proper testing in QA
- apache#10784 (@dang contributed): making sure servers wait the full time they need to wait before shutting down
- apache#10785 - allow env var substitution for pinot configs
- apache#10427 - another new agg function - integer sketch support

### Motivation
biweekly upstream pull

### Testing
Specifics to include:
- diff some test table configs, see if anything changes
- HLL pre-agg and big decimal tests

Table config diff testing + Big decimal / HLL testing: https://docs.google.com/document/d/1E8X_ARM_m9VtecRhoscOztZ_cPTZ6aV2YqjEJ4n9PaM/edit?usp=sharing

Upstream pull load testing: https://docs.google.com/document/d/1GmCiHhaFP8HVVaoQ4t_3a2SqF9JE6RtF6on8erzCX3U/edit?usp=sharing

### Rollout/monitoring/revert plan
rollout to prod sandbox, perform load testing, then roll out to a400 clusters and a200 next day

(Squashed by Merge Queue - Original PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/250)
6 participants