From 9ac773b8ae7c7dd5ee8c866af6a9f6946b396527 Mon Sep 17 00:00:00 2001 From: Sai Valla Date: Thu, 2 Apr 2026 13:50:08 -0700 Subject: [PATCH 1/7] feat: add PercentileTDigest support for MergeAndRollup aggregation Add PercentileTDigestAggregator for minion merge/rollup tasks, enabling TDigest sketch merging with configurable compression factor. - New PercentileTDigestAggregator implementing ValueAggregator - Register PERCENTILETDIGEST and PERCENTILERAWTDIGEST in ValueAggregatorFactory - Add both types to AVAILABLE_CORE_VALUE_AGGREGATORS in MinionConstants - Allow compressionFactor in MergeRollupTaskGenerator validation - Unit tests for the aggregator with default and custom compression - Integration test exercising the full MergeRollupTaskExecutor pipeline with TDigest BYTES columns, multiple dimension groups, cross-segment merging, skewed distributions, and edge cases --- .../pinot/core/common/MinionConstants.java | 2 +- .../PercentileTDigestAggregator.java | 48 ++ .../aggregator/ValueAggregatorFactory.java | 3 + .../PercentileTDigestAggregatorTest.java | 89 +++ .../mergerollup/MergeRollupTaskGenerator.java | 9 +- .../MergeRollupTDigestTaskExecutorTest.java | 543 ++++++++++++++++++ .../MergeRollupTaskGeneratorTest.java | 38 ++ 7 files changed, 728 insertions(+), 4 deletions(-) create mode 100644 pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java create mode 100644 pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java create mode 100644 pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTDigestTaskExecutorTest.java diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 9e059e984175..16e94670f75d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -189,7 +189,7 @@ public static class RealtimeToOfflineSegmentsTask extends MergeTask { DISTINCTCOUNTRAWTHETASKETCH, DISTINCTCOUNTTUPLESKETCH, DISTINCTCOUNTRAWINTEGERSUMTUPLESKETCH, SUMVALUESINTEGERSUMTUPLESKETCH, AVGVALUEINTEGERSUMTUPLESKETCH, DISTINCTCOUNTHLLPLUS, DISTINCTCOUNTRAWHLLPLUS, DISTINCTCOUNTCPCSKETCH, DISTINCTCOUNTRAWCPCSKETCH, DISTINCTCOUNTULL, - DISTINCTCOUNTRAWULL, PERCENTILEKLL, PERCENTILERAWKLL); + DISTINCTCOUNTRAWULL, PERCENTILEKLL, PERCENTILERAWKLL, PERCENTILETDIGEST, PERCENTILERAWTDIGEST); } // Generate segment and push to controller based on batch ingestion configs diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java new file mode 100644 index 000000000000..8672eca901d6 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.aggregator; + +import com.tdunning.math.stats.TDigest; +import java.util.Map; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggregationFunction; +import org.apache.pinot.segment.spi.Constants; + + +public class PercentileTDigestAggregator implements ValueAggregator { + + @Override + public Object aggregate(Object value1, Object value2, Map functionParameters) { + String compressionParam = functionParameters.get(Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY); + + int compression; + if (compressionParam != null) { + compression = Integer.parseInt(compressionParam); + } else { + compression = PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION; + } + + TDigest first = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize((byte[]) value1); + TDigest second = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize((byte[]) value2); + TDigest merged = TDigest.createMergingDigest(compression); + merged.add(first); + merged.add(second); + return ObjectSerDeUtils.TDIGEST_SER_DE.serialize(merged); + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java index d126cad0d536..abc8acc449ca 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregatorFactory.java @@ -64,6 +64,9 @@ public static ValueAggregator getValueAggregator(AggregationFunctionType aggrega case PERCENTILEKLL: case PERCENTILERAWKLL: return new PercentileKLLSketchAggregator(); + case PERCENTILETDIGEST: + case PERCENTILERAWTDIGEST: + return new PercentileTDigestAggregator(); default: throw new IllegalStateException("Unsupported aggregation type: " + aggregationType); } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java new file mode 100644 index 000000000000..6853c9071ad5 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.processing.aggregator; + +import com.tdunning.math.stats.TDigest; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.segment.spi.Constants; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + + +public class PercentileTDigestAggregatorTest { + + private PercentileTDigestAggregator _aggregator; + + @BeforeMethod + public void setUp() { + _aggregator = new PercentileTDigestAggregator(); + } + + @Test + public void testAggregateWithDefaultCompression() { + TDigest first = TDigest.createMergingDigest(100); + for (int i = 0; i < 100; i++) { + first.add(i); + } + TDigest second = TDigest.createMergingDigest(100); + for (int i = 100; i < 200; i++) { + second.add(i); + } + + byte[] value1 = ObjectSerDeUtils.TDIGEST_SER_DE.serialize(first); + byte[] value2 = ObjectSerDeUtils.TDIGEST_SER_DE.serialize(second); + + Map functionParameters = new HashMap<>(); + byte[] result = (byte[]) _aggregator.aggregate(value1, value2, functionParameters); + + TDigest resultDigest = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(result); + assertNotNull(resultDigest); + assertEquals(resultDigest.size(), 200); + assertEquals(resultDigest.quantile(0.5), 99.5, 1.0); + } + + @Test + public void testAggregateWithCustomCompression() { + TDigest first = TDigest.createMergingDigest(100); + for (int i = 0; i < 50; i++) { + first.add(i); + } + TDigest second = TDigest.createMergingDigest(100); + for (int i = 50; i < 100; i++) { + second.add(i); + } + + byte[] value1 = ObjectSerDeUtils.TDIGEST_SER_DE.serialize(first); + byte[] value2 = ObjectSerDeUtils.TDIGEST_SER_DE.serialize(second); + + Map functionParameters = new HashMap<>(); + functionParameters.put(Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY, "200"); + + byte[] result = (byte[]) _aggregator.aggregate(value1, value2, functionParameters); + + TDigest resultDigest = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(result); + assertNotNull(resultDigest); + assertEquals(resultDigest.size(), 100); + assertEquals(resultDigest.quantile(0.5), 49.5, 1.0); + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java index 8748cb133899..b6f6b6dda217 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java @@ -504,7 +504,8 @@ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map allowedFunctionParameterNames = ImmutableSet.of(Constants.CPCSKETCH_LGK_KEY.toLowerCase(), Constants.THETA_TUPLE_SKETCH_SAMPLING_PROBABILITY.toLowerCase(), - Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES.toLowerCase()); + Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES.toLowerCase(), + Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY.toLowerCase()); Map> aggregationFunctionParameters = MergeRollupTaskUtils.getAggregationFunctionParameters(taskConfigs); for (String fieldName : aggregationFunctionParameters.keySet()) { @@ -515,10 +516,12 @@ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map + * Data layout across 4 segments with 3 dimension groups: + * Segment 0: group1=[1..100], group2=[500..599] + * Segment 1: group1=[101..200], group2=[600..699], group3=[1..50] + * Segment 2: group1=[201..300], group3=[51..150] + * Segment 3: group2=[700..799], group3=[151..200] + *

+ * After rollup: + * group1: 300 values [1..300], P50 ~ 150 + * group2: 300 values [500..799], P50 ~ 649 + * group3: 200 values [1..200], P50 ~ 100 + */ +public class MergeRollupTDigestTaskExecutorTest { + private static final File TEMP_DIR = + new File(FileUtils.getTempDirectory(), "MergeRollupTDigestTaskExecutorTest"); + private static final File ORIGINAL_SEGMENT_DIR = new File(TEMP_DIR, "originalSegment"); + private static final String TABLE_NAME = "tdigest_table"; + private static final String DIMENSION_COL = "groupKey"; + private static final String TDIGEST_COL = "tdigest_metric"; + private static final int DEFAULT_COMPRESSION = 100; + + private static final String GROUP_1 = "latency-group1"; + private static final String GROUP_2 = "latency-group2"; + private static final String GROUP_3 = "latency-group3"; + + private TableConfig _tableConfig; + private Schema _schema; + private int _workingDirCounter = 0; + + @BeforeClass + public void setUp() throws Exception { + FileUtils.deleteDirectory(TEMP_DIR); + _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build(); + _schema = new Schema.SchemaBuilder() + .addSingleValueDimension(DIMENSION_COL, FieldSpec.DataType.STRING) + .addMetric(TDIGEST_COL, FieldSpec.DataType.BYTES) + .build(); + + MinionContext minionContext = MinionContext.getInstance(); + //noinspection unchecked + ZkHelixPropertyStore helixPropertyStore = Mockito.mock(ZkHelixPropertyStore.class); + Mockito.when(helixPropertyStore + .get("/CONFIGS/TABLE/" + TABLE_NAME + "_OFFLINE", null, AccessOption.PERSISTENT)) + .thenReturn(TableConfigUtils.toZNRecord(_tableConfig)); + Mockito.when(helixPropertyStore + .get("/SCHEMAS/" + TABLE_NAME, null, AccessOption.PERSISTENT)) + .thenReturn(SchemaUtils.toZNRecord(_schema)); + minionContext.setHelixPropertyStore(helixPropertyStore); + } + + @AfterClass + public void tearDown() throws Exception { + FileUtils.deleteDirectory(TEMP_DIR); + } + + /** + * Verifies that 3 distinct dimension groups across 4 segments are independently merged, + * each with correct count and quantile estimates (P0, P50, P99, P100). + */ + @Test + public void testMultipleDimensionGroupsIndependentMerge() throws Exception { + List segmentDirs = buildSegments(buildStandardSegmentData()); + List results = runExecutor(segmentDirs, null); + + Assert.assertEquals(results.size(), 1); + File mergedSegment = results.get(0).getFile(); + + // 3 distinct dimension keys => 3 rows after rollup + SegmentMetadataImpl metadata = new SegmentMetadataImpl(mergedSegment); + Assert.assertEquals(metadata.getTotalDocs(), 3); + + Map digests = readMergedTDigests(mergedSegment); + Assert.assertEquals(digests.size(), 3); + + // group1: 300 values [1..300] + TDigest digest1 = digests.get(GROUP_1); + Assert.assertNotNull(digest1); + Assert.assertEquals(digest1.size(), 300L); + Assert.assertEquals(digest1.quantile(0.0), 1.0, 1.0); // P0 ~ 1 + Assert.assertEquals(digest1.quantile(0.5), 150, 5.0); // P50 ~ 150.5 + Assert.assertEquals(digest1.quantile(0.99), 297.0, 5.0); // P99 ~ 297 + Assert.assertEquals(digest1.quantile(1.0), 300.0, 1.0); // P100 ~ 300 + + // group2: 300 values [500..799] + TDigest digest2 = digests.get(GROUP_2); + Assert.assertNotNull(digest2); + Assert.assertEquals(digest2.size(), 300L); + Assert.assertEquals(digest2.quantile(0.0), 500.0, 1.0); // P0 ~ 500 + Assert.assertEquals(digest2.quantile(0.5), 649, 5.0); // P50 ~ 649.5 + Assert.assertEquals(digest2.quantile(0.99), 796.0, 5.0); // P99 ~ 796 + Assert.assertEquals(digest2.quantile(1.0), 799.0, 1.0); // P100 ~ 799 + + // group3: 200 values [1..200] + TDigest digest3 = digests.get(GROUP_3); + Assert.assertNotNull(digest3); + Assert.assertEquals(digest3.size(), 200L); + Assert.assertEquals(digest3.quantile(0.0), 1.0, 1.0); // P0 ~ 1 + Assert.assertEquals(digest3.quantile(0.5), 100, 5.0); // P50 ~ 100.5 + Assert.assertEquals(digest3.quantile(0.99), 198.0, 5.0); // P99 ~ 198 + Assert.assertEquals(digest3.quantile(1.0), 200.0, 1.0); // P100 ~ 200 + } + + /** + * Verifies cross-segment merging: the same dimension key appears in multiple segments + * and all values are correctly aggregated. + * + * group1 appears in segments 0, 1, 2; group2 in segments 0, 1, 3; group3 in segments 1, 2, 3. + */ + @Test + public void testCrossSegmentMerging() throws Exception { + List segmentDirs = buildSegments(buildStandardSegmentData()); + List results = runExecutor(segmentDirs, null); + + Assert.assertEquals(results.size(), 1); + Map digests = readMergedTDigests(results.get(0).getFile()); + + // group1 came from 3 segments (100+100+100=300 values) + TDigest digest1 = digests.get(GROUP_1); + Assert.assertEquals(digest1.size(), 300L); + // Verify min and max span the full range + Assert.assertTrue(digest1.quantile(0.0) <= 2.0, "Min should be close to 1"); + Assert.assertTrue(digest1.quantile(1.0) >= 299.0, "Max should be close to 300"); + + // group2 came from 3 segments (100+100+100=300 values) + TDigest digest2 = digests.get(GROUP_2); + Assert.assertEquals(digest2.size(), 300L); + Assert.assertTrue(digest2.quantile(0.0) <= 501.0, "Min should be close to 500"); + Assert.assertTrue(digest2.quantile(1.0) >= 798.0, "Max should be close to 799"); + + // group3 came from 3 segments (50+100+50=200 values) + TDigest digest3 = digests.get(GROUP_3); + Assert.assertEquals(digest3.size(), 200L); + Assert.assertTrue(digest3.quantile(0.0) <= 2.0, "Min should be close to 1"); + Assert.assertTrue(digest3.quantile(1.0) >= 199.0, "Max should be close to 200"); + } + + /** + * Passes a custom compressionFactor via function parameters and verifies merge succeeds + * with correct results. + */ + @Test + public void testCustomCompressionFactor() throws Exception { + List segmentDirs = buildSegments(buildStandardSegmentData()); + + Map extraConfigs = new HashMap<>(); + extraConfigs.put("daily." + MinionConstants.MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + + TDIGEST_COL + ".compressionFactor", "200"); + + List results = runExecutor(segmentDirs, extraConfigs); + + Assert.assertEquals(results.size(), 1); + File mergedSegment = results.get(0).getFile(); + SegmentMetadataImpl metadata = new SegmentMetadataImpl(mergedSegment); + Assert.assertEquals(metadata.getTotalDocs(), 3); + + Map digests = readMergedTDigests(mergedSegment); + + // With higher compression, quantile estimates should still be accurate + TDigest digest1 = digests.get(GROUP_1); + Assert.assertEquals(digest1.size(), 300L); + Assert.assertEquals(digest1.quantile(0.5), 150, 3.0); + + TDigest digest2 = digests.get(GROUP_2); + Assert.assertEquals(digest2.size(), 300L); + Assert.assertEquals(digest2.quantile(0.5), 649, 3.0); + + TDigest digest3 = digests.get(GROUP_3); + Assert.assertEquals(digest3.size(), 200L); + Assert.assertEquals(digest3.quantile(0.5), 100, 3.0); + } + + /** + * Uses realistic latency value ranges (1ms to 10000ms) with varying distributions per group + * to verify quantile accuracy on wider data ranges. + */ + @Test + public void testLargeValueRange() throws Exception { + List> segments = new ArrayList<>(); + + // Segment 0: group1 [1..500], group2 [5000..10000] + List seg0 = new ArrayList<>(); + seg0.add(makeRow(GROUP_1, createTDigest(1, 501))); + seg0.add(makeRow(GROUP_2, createTDigest(5000, 10001))); + segments.add(seg0); + + // Segment 1: group1 [501..1000], group3 spread [1..10000] step=10 + List seg1 = new ArrayList<>(); + seg1.add(makeRow(GROUP_1, createTDigest(501, 1001))); + TDigest group3Digest = TDigest.createMergingDigest(DEFAULT_COMPRESSION); + for (int v = 1; v <= 10000; v += 10) { + group3Digest.add(v); + } + seg1.add(makeRow(GROUP_3, group3Digest)); + segments.add(seg1); + + List segmentDirs = buildSegments(segments); + List results = runExecutor(segmentDirs, null); + + Assert.assertEquals(results.size(), 1); + Map digests = readMergedTDigests(results.get(0).getFile()); + + // group1: 1000 values [1..1000], P50 ~ 500 + TDigest digest1 = digests.get(GROUP_1); + Assert.assertEquals(digest1.size(), 1000L); + Assert.assertEquals(digest1.quantile(0.5), 500, 10.0); + Assert.assertEquals(digest1.quantile(0.0), 1.0, 1.0); + Assert.assertEquals(digest1.quantile(1.0), 1000.0, 1.0); + + // group2: 5001 values [5000..10000], P50 ~ 7500 + TDigest digest2 = digests.get(GROUP_2); + Assert.assertEquals(digest2.size(), 5001L); + Assert.assertEquals(digest2.quantile(0.5), 7500.0, 50.0); + + // group3: 1000 values [1,11,21,...,9991], P50 ~ 5000 + TDigest digest3 = digests.get(GROUP_3); + Assert.assertEquals(digest3.size(), 1000L); + Assert.assertEquals(digest3.quantile(0.5), 5000.0, 100.0); + } + + /** + * Verifies that an empty TDigest (no values added) is handled correctly during merge. + * The merged result for that group should reflect only the non-empty input. + */ + @Test + public void testEmptyTDigestHandling() throws Exception { + List> segments = new ArrayList<>(); + + // Segment 0: group with an empty TDigest + List seg0 = new ArrayList<>(); + TDigest emptyDigest = TDigest.createMergingDigest(DEFAULT_COMPRESSION); + seg0.add(makeRow(GROUP_1, emptyDigest)); + segments.add(seg0); + + // Segment 1: group with real values + List seg1 = new ArrayList<>(); + seg1.add(makeRow(GROUP_1, createTDigest(1, 101))); + segments.add(seg1); + + List segmentDirs = buildSegments(segments); + List results = runExecutor(segmentDirs, null); + + Assert.assertEquals(results.size(), 1); + SegmentMetadataImpl metadata = new SegmentMetadataImpl(results.get(0).getFile()); + Assert.assertEquals(metadata.getTotalDocs(), 1); + + Map digests = readMergedTDigests(results.get(0).getFile()); + TDigest digest1 = digests.get(GROUP_1); + Assert.assertNotNull(digest1); + // The merged digest should contain the 100 values from the non-empty input + Assert.assertEquals(digest1.size(), 100L); + Assert.assertEquals(digest1.quantile(0.5), 50, 2.0); + } + + /** + * Verifies merge works correctly with TDigests that contain exactly 1 value each. + */ + @Test + public void testSingleValueTDigest() throws Exception { + List> segments = new ArrayList<>(); + + // Segment 0: single value 42 + List seg0 = new ArrayList<>(); + TDigest single0 = TDigest.createMergingDigest(DEFAULT_COMPRESSION); + single0.add(42); + seg0.add(makeRow(GROUP_1, single0)); + segments.add(seg0); + + // Segment 1: single value 58 + List seg1 = new ArrayList<>(); + TDigest single1 = TDigest.createMergingDigest(DEFAULT_COMPRESSION); + single1.add(58); + seg1.add(makeRow(GROUP_1, single1)); + segments.add(seg1); + + // Segment 2: single value 100 for a different group + List seg2 = new ArrayList<>(); + TDigest single2 = TDigest.createMergingDigest(DEFAULT_COMPRESSION); + single2.add(100); + seg2.add(makeRow(GROUP_2, single2)); + segments.add(seg2); + + List segmentDirs = buildSegments(segments); + List results = runExecutor(segmentDirs, null); + + Assert.assertEquals(results.size(), 1); + SegmentMetadataImpl metadata = new SegmentMetadataImpl(results.get(0).getFile()); + Assert.assertEquals(metadata.getTotalDocs(), 2); + + Map digests = readMergedTDigests(results.get(0).getFile()); + + // group1: 2 values (42, 58) + TDigest digest1 = digests.get(GROUP_1); + Assert.assertNotNull(digest1); + Assert.assertEquals(digest1.size(), 2L); + Assert.assertEquals(digest1.quantile(0.0), 42.0, 0.1); + Assert.assertEquals(digest1.quantile(1.0), 58.0, 0.1); + Assert.assertEquals(digest1.quantile(0.5), 50.0, 8.1); // median of 42 and 58 + + // group2: 1 value (100) + TDigest digest2 = digests.get(GROUP_2); + Assert.assertNotNull(digest2); + Assert.assertEquals(digest2.size(), 1L); + Assert.assertEquals(digest2.quantile(0.5), 100.0, 0.1); + } + + /** + * Tests skewed distributions: one group has mostly small values, another mostly large. + * Verifies quantiles are correct after merge. + */ + @Test + public void testSkewedDistributions() throws Exception { + List> segments = new ArrayList<>(); + + // Group 1 is heavily skewed low: 900 values in [1..10] and 100 values in [1000..1100] + // Group 2 is heavily skewed high: 100 values in [1..100] and 900 values in [9000..9900] + List seg0 = new ArrayList<>(); + TDigest digest1Low = TDigest.createMergingDigest(DEFAULT_COMPRESSION); + for (int i = 0; i < 900; i++) { + digest1Low.add(1 + (i % 10)); // values 1-10, repeated + } + seg0.add(makeRow(GROUP_1, digest1Low)); + + TDigest digest2Low = TDigest.createMergingDigest(DEFAULT_COMPRESSION); + for (int i = 1; i <= 100; i++) { + digest2Low.add(i); + } + seg0.add(makeRow(GROUP_2, digest2Low)); + segments.add(seg0); + + List seg1 = new ArrayList<>(); + TDigest digest1High = TDigest.createMergingDigest(DEFAULT_COMPRESSION); + for (int i = 1000; i < 1100; i++) { + digest1High.add(i); + } + seg1.add(makeRow(GROUP_1, digest1High)); + + TDigest digest2High = TDigest.createMergingDigest(DEFAULT_COMPRESSION); + for (int i = 0; i < 900; i++) { + digest2High.add(9000 + (i % 900)); + } + seg1.add(makeRow(GROUP_2, digest2High)); + segments.add(seg1); + + List segmentDirs = buildSegments(segments); + List results = runExecutor(segmentDirs, null); + + Assert.assertEquals(results.size(), 1); + Map digests = readMergedTDigests(results.get(0).getFile()); + + // group1: 1000 total, 90% are in [1..10] + TDigest digest1 = digests.get(GROUP_1); + Assert.assertEquals(digest1.size(), 1000L); + // P50 should be in the low range [1..10] since 90% of values are there + Assert.assertTrue(digest1.quantile(0.5) <= 10.0, + "P50 should be in [1..10] since 90% of values are there, got: " + digest1.quantile(0.5)); + // P80 should still be in the low range (900 of 1000 are low) + Assert.assertTrue(digest1.quantile(0.80) <= 10.0, + "P80 should be in [1..10], got: " + digest1.quantile(0.80)); + // P100 should be near 1099 + Assert.assertEquals(digest1.quantile(1.0), 1099.0, 1.0); + + // group2: 1000 total, 90% are in [9000..9899] + TDigest digest2 = digests.get(GROUP_2); + Assert.assertEquals(digest2.size(), 1000L); + // P50 should be in the high range since 90% of values are there + Assert.assertTrue(digest2.quantile(0.5) >= 9000.0, + "P50 should be >= 9000 since 90% of values are there, got: " + digest2.quantile(0.5)); + // P0 should be near 1 + Assert.assertEquals(digest2.quantile(0.0), 1.0, 1.0); + } + + /** + * After rollup, the total doc count must equal the number of distinct dimension keys. + */ + @Test + public void testSegmentDocCountEqualsDistinctKeys() throws Exception { + List segmentDirs = buildSegments(buildStandardSegmentData()); + List results = runExecutor(segmentDirs, null); + + Assert.assertEquals(results.size(), 1); + SegmentMetadataImpl metadata = new SegmentMetadataImpl(results.get(0).getFile()); + // 3 distinct dimension keys: latency-group1, latency-group2, latency-group3 + Assert.assertEquals(metadata.getTotalDocs(), 3, + "Rollup doc count should equal the number of distinct dimension keys"); + } + + private static TDigest createTDigest(final int start, final int end) { + TDigest tDigest = TDigest.createMergingDigest(MergeRollupTDigestTaskExecutorTest.DEFAULT_COMPRESSION); + for (int v = start; v < end; v++) { + tDigest.add(v); + } + return tDigest; + } + + private static GenericRow makeRow(String dimensionValue, TDigest tDigest) { + GenericRow row = new GenericRow(); + row.putValue(DIMENSION_COL, dimensionValue); + row.putValue(TDIGEST_COL, ObjectSerDeUtils.TDIGEST_SER_DE.serialize(tDigest)); + return row; + } + + private List buildSegments(List> segmentRows) throws Exception { + List segmentDirs = new ArrayList<>(); + for (int i = 0; i < segmentRows.size(); i++) { + String segmentName = TABLE_NAME + "_seg" + i + "_" + System.nanoTime(); + RecordReader recordReader = new GenericRowRecordReader(segmentRows.get(i)); + SegmentGeneratorConfig config = new SegmentGeneratorConfig(_tableConfig, _schema); + config.setOutDir(ORIGINAL_SEGMENT_DIR.getPath()); + config.setTableName(TABLE_NAME); + config.setSegmentName(segmentName); + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, recordReader); + driver.build(); + segmentDirs.add(new File(ORIGINAL_SEGMENT_DIR, segmentName)); + } + return segmentDirs; + } + + private List runExecutor(final List segmentDirs, + final Map extraConfigs) throws Exception { + File workingDir = new File(TEMP_DIR, "workingDir_" + (_workingDirCounter++)); + MergeRollupTaskExecutor executor = new MergeRollupTaskExecutor(new MinionConf()); + executor.setMinionEventObserver(MinionTaskTestUtils.getMinionProgressObserver()); + + Map configs = new HashMap<>(); + configs.put(MinionConstants.TABLE_NAME_KEY, TABLE_NAME + "_OFFLINE"); + configs.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, "daily"); + configs.put(MinionConstants.MergeTask.MERGE_TYPE_KEY, "rollup"); + configs.put(TDIGEST_COL + MinionConstants.MergeTask.AGGREGATION_TYPE_KEY_SUFFIX, "percentiletdigest"); + if (extraConfigs != null) { + configs.putAll(extraConfigs); + } + + PinotTaskConfig pinotTaskConfig = new PinotTaskConfig(MinionConstants.MergeRollupTask.TASK_TYPE, configs); + return executor.convert(pinotTaskConfig, segmentDirs, workingDir); + } + + private Map readMergedTDigests(final File mergedSegmentDir) throws Exception { + Map result = new HashMap<>(); + PinotSegmentRecordReader reader = new PinotSegmentRecordReader(); + reader.init(mergedSegmentDir, null, null, true); + while (reader.hasNext()) { + GenericRow row = reader.next(); + String key = (String) row.getValue(DIMENSION_COL); + byte[] bytes = (byte[]) row.getValue(TDIGEST_COL); + TDigest digest = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(bytes); + result.put(key, digest); + } + reader.close(); + return result; + } + + private static List> buildStandardSegmentData() { + List> segments = new ArrayList<>(); + + // Segment 0: group1=[1..100], group2=[500..600] + List seg0 = new ArrayList<>(); + seg0.add(makeRow(GROUP_1, createTDigest(1, 101))); + seg0.add(makeRow(GROUP_2, createTDigest(500, 600))); + segments.add(seg0); + + // Segment 1: group1=[101..200], group2=[600..699], group3=[1..50] + List seg1 = new ArrayList<>(); + seg1.add(makeRow(GROUP_1, createTDigest(101, 201))); + seg1.add(makeRow(GROUP_2, createTDigest(600, 700))); + seg1.add(makeRow(GROUP_3, createTDigest(1, 51))); + segments.add(seg1); + + // Segment 2: group1=[201..300], group3=[51..150] + List seg2 = new ArrayList<>(); + seg2.add(makeRow(GROUP_1, createTDigest(201, 301))); + seg2.add(makeRow(GROUP_3, createTDigest(51, 151))); + segments.add(seg2); + + // Segment 3: group2=[700..799], group3=[151..200] + List seg3 = new ArrayList<>(); + seg3.add(makeRow(GROUP_2, createTDigest(700, 800))); + seg3.add(makeRow(GROUP_3, createTDigest(151, 201))); + segments.add(seg3); + + return segments; + } +} diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java index 2b376219d662..35399629b48c 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java @@ -234,6 +234,44 @@ public void testInvalidLgK() { }); } + @Test + public void testValidCompressionFactor() { + MergeRollupTaskGenerator taskGenerator = new MergeRollupTaskGenerator(); + Schema schema = new Schema(); + schema.addField(new MetricFieldSpec("a", FieldSpec.DataType.BYTES)); + + String mergeLevel = "hourly"; + String prefix = mergeLevel + "." + MinionConstants.MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX; + + Map validConfig = new HashMap<>(); + validConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel); + validConfig.put(prefix + "a.compressionFactor", "200"); + TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.MergeRollupTask.TASK_TYPE, validConfig))) + .build(); + taskGenerator.validateTaskConfigs(offlineTableConfig, schema, validConfig); + } + + @Test + public void testInvalidCompressionFactor() { + MergeRollupTaskGenerator taskGenerator = new MergeRollupTaskGenerator(); + Schema schema = new Schema(); + schema.addField(new MetricFieldSpec("a", FieldSpec.DataType.BYTES)); + + String mergeLevel = "hourly"; + String prefix = mergeLevel + "." + MinionConstants.MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX; + + Map invalidConfig = new HashMap<>(); + invalidConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel); + invalidConfig.put(prefix + "a.compressionFactor", "0"); + TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) + .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.MergeRollupTask.TASK_TYPE, invalidConfig))) + .build(); + assertThrows(IllegalStateException.class, () -> { + taskGenerator.validateTaskConfigs(offlineTableConfig, schema, invalidConfig); + }); + } + /** * Tests for some config checks */ From d556a16982ada1cf409f91c5e137b65ed25dcc68 Mon Sep 17 00:00:00 2001 From: Sai Valla Date: Thu, 2 Apr 2026 20:57:33 -0700 Subject: [PATCH 2/7] fix: compilation errors and add empty byte[] guard to PercentileTDigestAggregator - Fix MergeRollupTDigestTaskExecutorTest: wrong imports for SchemaUtils and TableConfigUtils (should be SchemaSerDeUtils/TableConfigSerDeUtils) - Fix MergeRollupTaskGeneratorTest: missing ImmutableMap import - Add empty byte[] handling to PercentileTDigestAggregator to prevent BufferUnderflowException when BYTES columns have default null value (byte[0]). Follows same pattern as DistinctCountCPCSketchAggregator. - Add unit tests for empty byte[] aggregation cases - Clean up test assertion style: remove unnecessary .0 suffixes --- .../PercentileTDigestAggregator.java | 31 ++++++--- .../PercentileTDigestAggregatorTest.java | 54 ++++++++++++++- .../MergeRollupTDigestTaskExecutorTest.java | 68 +++++++++---------- .../MergeRollupTaskGeneratorTest.java | 1 + 4 files changed, 110 insertions(+), 44 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java index 8672eca901d6..75524eb2b53e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java @@ -29,20 +29,35 @@ public class PercentileTDigestAggregator implements ValueAggregator { @Override public Object aggregate(Object value1, Object value2, Map functionParameters) { - String compressionParam = functionParameters.get(Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY); + byte[] bytes1 = (byte[]) value1; + byte[] bytes2 = (byte[]) value2; - int compression; - if (compressionParam != null) { - compression = Integer.parseInt(compressionParam); - } else { - compression = PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION; + // Empty byte arrays represent the default null value for BYTES columns. + // Deserializing byte[0] would throw BufferUnderflowException, so handle it explicitly. + if (bytes1.length == 0 && bytes2.length == 0) { + int compression = getCompression(functionParameters); + return ObjectSerDeUtils.TDIGEST_SER_DE.serialize(TDigest.createMergingDigest(compression)); + } + if (bytes1.length == 0) { + return bytes2; + } + if (bytes2.length == 0) { + return bytes1; } - TDigest first = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize((byte[]) value1); - TDigest second = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize((byte[]) value2); + int compression = getCompression(functionParameters); + TDigest first = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(bytes1); + TDigest second = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(bytes2); TDigest merged = TDigest.createMergingDigest(compression); merged.add(first); merged.add(second); return ObjectSerDeUtils.TDIGEST_SER_DE.serialize(merged); } + + private int getCompression(Map functionParameters) { + String compressionParam = functionParameters.get(Constants.PERCENTILETDIGEST_COMPRESSION_FACTOR_KEY); + return compressionParam != null + ? Integer.parseInt(compressionParam) + : PercentileTDigestAggregationFunction.DEFAULT_TDIGEST_COMPRESSION; + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java index 6853c9071ad5..5abb3ebf74b4 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java @@ -59,7 +59,7 @@ public void testAggregateWithDefaultCompression() { TDigest resultDigest = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(result); assertNotNull(resultDigest); assertEquals(resultDigest.size(), 200); - assertEquals(resultDigest.quantile(0.5), 99.5, 1.0); + assertEquals(resultDigest.quantile(0.5), 99.5, 1); } @Test @@ -84,6 +84,56 @@ public void testAggregateWithCustomCompression() { TDigest resultDigest = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(result); assertNotNull(resultDigest); assertEquals(resultDigest.size(), 100); - assertEquals(resultDigest.quantile(0.5), 49.5, 1.0); + assertEquals(resultDigest.quantile(0.5), 49.5, 1); + } + + @Test + public void testAggregateWithBothEmptyBytes() { + byte[] empty1 = new byte[0]; + byte[] empty2 = new byte[0]; + + Map functionParameters = new HashMap<>(); + byte[] result = (byte[]) _aggregator.aggregate(empty1, empty2, functionParameters); + + // Should return a valid serialized empty TDigest, not crash + TDigest resultDigest = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(result); + assertNotNull(resultDigest); + assertEquals(resultDigest.size(), 0); + } + + @Test + public void testAggregateWithFirstEmptyBytes() { + TDigest second = TDigest.createMergingDigest(100); + for (int i = 0; i < 50; i++) { + second.add(i); + } + byte[] empty = new byte[0]; + byte[] value2 = ObjectSerDeUtils.TDIGEST_SER_DE.serialize(second); + + Map functionParameters = new HashMap<>(); + byte[] result = (byte[]) _aggregator.aggregate(empty, value2, functionParameters); + + // Should return the non-empty side as-is + assertEquals(result, value2); + TDigest resultDigest = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(result); + assertEquals(resultDigest.size(), 50); + } + + @Test + public void testAggregateWithSecondEmptyBytes() { + TDigest first = TDigest.createMergingDigest(100); + for (int i = 0; i < 50; i++) { + first.add(i); + } + byte[] value1 = ObjectSerDeUtils.TDIGEST_SER_DE.serialize(first); + byte[] empty = new byte[0]; + + Map functionParameters = new HashMap<>(); + byte[] result = (byte[]) _aggregator.aggregate(value1, empty, functionParameters); + + // Should return the non-empty side as-is + assertEquals(result, value1); + TDigest resultDigest = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(result); + assertEquals(resultDigest.size(), 50); } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTDigestTaskExecutorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTDigestTaskExecutorTest.java index 2c3f86e2d7fc..6cf5ad7a1257 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTDigestTaskExecutorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTDigestTaskExecutorTest.java @@ -28,8 +28,8 @@ import org.apache.helix.AccessOption; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; -import org.apache.pinot.common.utils.SchemaUtils; -import org.apache.pinot.common.utils.config.TableConfigUtils; +import org.apache.pinot.common.utils.config.SchemaSerDeUtils; +import org.apache.pinot.common.utils.config.TableConfigSerDeUtils; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.common.ObjectSerDeUtils; import org.apache.pinot.core.minion.PinotTaskConfig; @@ -101,10 +101,10 @@ public void setUp() throws Exception { ZkHelixPropertyStore helixPropertyStore = Mockito.mock(ZkHelixPropertyStore.class); Mockito.when(helixPropertyStore .get("/CONFIGS/TABLE/" + TABLE_NAME + "_OFFLINE", null, AccessOption.PERSISTENT)) - .thenReturn(TableConfigUtils.toZNRecord(_tableConfig)); + .thenReturn(TableConfigSerDeUtils.toZNRecord(_tableConfig)); Mockito.when(helixPropertyStore .get("/SCHEMAS/" + TABLE_NAME, null, AccessOption.PERSISTENT)) - .thenReturn(SchemaUtils.toZNRecord(_schema)); + .thenReturn(SchemaSerDeUtils.toZNRecord(_schema)); minionContext.setHelixPropertyStore(helixPropertyStore); } @@ -136,28 +136,28 @@ public void testMultipleDimensionGroupsIndependentMerge() throws Exception { TDigest digest1 = digests.get(GROUP_1); Assert.assertNotNull(digest1); Assert.assertEquals(digest1.size(), 300L); - Assert.assertEquals(digest1.quantile(0.0), 1.0, 1.0); // P0 ~ 1 - Assert.assertEquals(digest1.quantile(0.5), 150, 5.0); // P50 ~ 150.5 - Assert.assertEquals(digest1.quantile(0.99), 297.0, 5.0); // P99 ~ 297 - Assert.assertEquals(digest1.quantile(1.0), 300.0, 1.0); // P100 ~ 300 + Assert.assertEquals(digest1.quantile(0.0), 1, 1); // P0 ~ 1 + Assert.assertEquals(digest1.quantile(0.5), 150, 5); // P50 ~ 150 + Assert.assertEquals(digest1.quantile(0.99), 297, 5); // P99 ~ 297 + Assert.assertEquals(digest1.quantile(1.0), 300, 1); // P100 ~ 300 // group2: 300 values [500..799] TDigest digest2 = digests.get(GROUP_2); Assert.assertNotNull(digest2); Assert.assertEquals(digest2.size(), 300L); - Assert.assertEquals(digest2.quantile(0.0), 500.0, 1.0); // P0 ~ 500 - Assert.assertEquals(digest2.quantile(0.5), 649, 5.0); // P50 ~ 649.5 - Assert.assertEquals(digest2.quantile(0.99), 796.0, 5.0); // P99 ~ 796 - Assert.assertEquals(digest2.quantile(1.0), 799.0, 1.0); // P100 ~ 799 + Assert.assertEquals(digest2.quantile(0.0), 500, 1); // P0 ~ 500 + Assert.assertEquals(digest2.quantile(0.5), 649, 5); // P50 ~ 649 + Assert.assertEquals(digest2.quantile(0.99), 796, 5); // P99 ~ 796 + Assert.assertEquals(digest2.quantile(1.0), 799, 1); // P100 ~ 799 // group3: 200 values [1..200] TDigest digest3 = digests.get(GROUP_3); Assert.assertNotNull(digest3); Assert.assertEquals(digest3.size(), 200L); - Assert.assertEquals(digest3.quantile(0.0), 1.0, 1.0); // P0 ~ 1 - Assert.assertEquals(digest3.quantile(0.5), 100, 5.0); // P50 ~ 100.5 - Assert.assertEquals(digest3.quantile(0.99), 198.0, 5.0); // P99 ~ 198 - Assert.assertEquals(digest3.quantile(1.0), 200.0, 1.0); // P100 ~ 200 + Assert.assertEquals(digest3.quantile(0.0), 1, 1); // P0 ~ 1 + Assert.assertEquals(digest3.quantile(0.5), 100, 5); // P50 ~ 100 + Assert.assertEquals(digest3.quantile(0.99), 198, 5); // P99 ~ 198 + Assert.assertEquals(digest3.quantile(1.0), 200, 1); // P100 ~ 200 } /** @@ -218,15 +218,15 @@ public void testCustomCompressionFactor() throws Exception { // With higher compression, quantile estimates should still be accurate TDigest digest1 = digests.get(GROUP_1); Assert.assertEquals(digest1.size(), 300L); - Assert.assertEquals(digest1.quantile(0.5), 150, 3.0); + Assert.assertEquals(digest1.quantile(0.5), 150, 3); TDigest digest2 = digests.get(GROUP_2); Assert.assertEquals(digest2.size(), 300L); - Assert.assertEquals(digest2.quantile(0.5), 649, 3.0); + Assert.assertEquals(digest2.quantile(0.5), 649, 3); TDigest digest3 = digests.get(GROUP_3); Assert.assertEquals(digest3.size(), 200L); - Assert.assertEquals(digest3.quantile(0.5), 100, 3.0); + Assert.assertEquals(digest3.quantile(0.5), 100, 3); } /** @@ -262,19 +262,19 @@ public void testLargeValueRange() throws Exception { // group1: 1000 values [1..1000], P50 ~ 500 TDigest digest1 = digests.get(GROUP_1); Assert.assertEquals(digest1.size(), 1000L); - Assert.assertEquals(digest1.quantile(0.5), 500, 10.0); - Assert.assertEquals(digest1.quantile(0.0), 1.0, 1.0); - Assert.assertEquals(digest1.quantile(1.0), 1000.0, 1.0); + Assert.assertEquals(digest1.quantile(0.5), 500, 10); + Assert.assertEquals(digest1.quantile(0.0), 1, 1); + Assert.assertEquals(digest1.quantile(1.0), 1000, 1); // group2: 5001 values [5000..10000], P50 ~ 7500 TDigest digest2 = digests.get(GROUP_2); Assert.assertEquals(digest2.size(), 5001L); - Assert.assertEquals(digest2.quantile(0.5), 7500.0, 50.0); + Assert.assertEquals(digest2.quantile(0.5), 7500, 50); // group3: 1000 values [1,11,21,...,9991], P50 ~ 5000 TDigest digest3 = digests.get(GROUP_3); Assert.assertEquals(digest3.size(), 1000L); - Assert.assertEquals(digest3.quantile(0.5), 5000.0, 100.0); + Assert.assertEquals(digest3.quantile(0.5), 5000, 100); } /** @@ -308,7 +308,7 @@ public void testEmptyTDigestHandling() throws Exception { Assert.assertNotNull(digest1); // The merged digest should contain the 100 values from the non-empty input Assert.assertEquals(digest1.size(), 100L); - Assert.assertEquals(digest1.quantile(0.5), 50, 2.0); + Assert.assertEquals(digest1.quantile(0.5), 50, 2); } /** @@ -352,15 +352,15 @@ public void testSingleValueTDigest() throws Exception { TDigest digest1 = digests.get(GROUP_1); Assert.assertNotNull(digest1); Assert.assertEquals(digest1.size(), 2L); - Assert.assertEquals(digest1.quantile(0.0), 42.0, 0.1); - Assert.assertEquals(digest1.quantile(1.0), 58.0, 0.1); - Assert.assertEquals(digest1.quantile(0.5), 50.0, 8.1); // median of 42 and 58 + Assert.assertEquals(digest1.quantile(0.0), 42, 0.1); + Assert.assertEquals(digest1.quantile(1.0), 58, 0.1); + Assert.assertEquals(digest1.quantile(0.5), 50, 8.1); // median of 42 and 58 // group2: 1 value (100) TDigest digest2 = digests.get(GROUP_2); Assert.assertNotNull(digest2); Assert.assertEquals(digest2.size(), 1L); - Assert.assertEquals(digest2.quantile(0.5), 100.0, 0.1); + Assert.assertEquals(digest2.quantile(0.5), 100, 0.1); } /** @@ -411,22 +411,22 @@ public void testSkewedDistributions() throws Exception { TDigest digest1 = digests.get(GROUP_1); Assert.assertEquals(digest1.size(), 1000L); // P50 should be in the low range [1..10] since 90% of values are there - Assert.assertTrue(digest1.quantile(0.5) <= 10.0, + Assert.assertTrue(digest1.quantile(0.5) <= 10, "P50 should be in [1..10] since 90% of values are there, got: " + digest1.quantile(0.5)); // P80 should still be in the low range (900 of 1000 are low) - Assert.assertTrue(digest1.quantile(0.80) <= 10.0, + Assert.assertTrue(digest1.quantile(0.80) <= 10, "P80 should be in [1..10], got: " + digest1.quantile(0.80)); // P100 should be near 1099 - Assert.assertEquals(digest1.quantile(1.0), 1099.0, 1.0); + Assert.assertEquals(digest1.quantile(1.0), 1099, 1); // group2: 1000 total, 90% are in [9000..9899] TDigest digest2 = digests.get(GROUP_2); Assert.assertEquals(digest2.size(), 1000L); // P50 should be in the high range since 90% of values are there - Assert.assertTrue(digest2.quantile(0.5) >= 9000.0, + Assert.assertTrue(digest2.quantile(0.5) >= 9000, "P50 should be >= 9000 since 90% of values are there, got: " + digest2.quantile(0.5)); // P0 should be near 1 - Assert.assertEquals(digest2.quantile(0.0), 1.0, 1.0); + Assert.assertEquals(digest2.quantile(0.0), 1, 1); } /** diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java index 35399629b48c..c77148101509 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.plugin.minion.tasks.mergerollup; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.util.Arrays; import java.util.Collections; From 3f5793ca0c8632e4fdd1549ddf5865de8e528a4d Mon Sep 17 00:00:00 2001 From: Sai Valla Date: Mon, 13 Apr 2026 15:14:07 -0700 Subject: [PATCH 3/7] Address review comments: add JavaDoc, factory tests, fix comments - Add JavaDoc to PercentileTDigestAggregator documenting raw/non-raw support - Add factory tests verifying both PERCENTILETDIGEST and PERCENTILERAWTDIGEST resolve to the same aggregator - Remove stale "nominal entries" comment in MergeRollupTaskGenerator - Fix off-by-one in test comment: [500..600] -> [500..599] --- .../aggregator/PercentileTDigestAggregator.java | 6 ++++++ .../PercentileTDigestAggregatorTest.java | 17 +++++++++++++++++ .../mergerollup/MergeRollupTaskGenerator.java | 1 - .../MergeRollupTDigestTaskExecutorTest.java | 2 +- 4 files changed, 24 insertions(+), 2 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java index 75524eb2b53e..cf23919e8ed7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java @@ -25,6 +25,12 @@ import org.apache.pinot.segment.spi.Constants; +/** + * Aggregator for merging serialized TDigest sketches during segment processing + * (e.g., MergeAndRollup). Handles both {@code PERCENTILETDIGEST} and + * {@code PERCENTILERAWTDIGEST} aggregation types, as they share the same + * underlying byte representation. + */ public class PercentileTDigestAggregator implements ValueAggregator { @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java index 5abb3ebf74b4..dfed54bb3e02 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java @@ -22,12 +22,15 @@ import java.util.HashMap; import java.util.Map; import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.Constants; +import org.apache.pinot.spi.data.FieldSpec.DataType; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; public class PercentileTDigestAggregatorTest { @@ -136,4 +139,18 @@ public void testAggregateWithSecondEmptyBytes() { TDigest resultDigest = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(result); assertEquals(resultDigest.size(), 50); } + + @Test + public void testFactoryReturnsAggregatorForNonRawType() { + ValueAggregator aggregator = ValueAggregatorFactory.getValueAggregator( + AggregationFunctionType.PERCENTILETDIGEST, DataType.BYTES); + assertTrue(aggregator instanceof PercentileTDigestAggregator); + } + + @Test + public void testFactoryReturnsAggregatorForRawType() { + ValueAggregator aggregator = ValueAggregatorFactory.getValueAggregator( + AggregationFunctionType.PERCENTILERAWTDIGEST, DataType.BYTES); + assertTrue(aggregator instanceof PercentileTDigestAggregator); + } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java index b6f6b6dda217..7170c26b32df 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java @@ -518,7 +518,6 @@ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map readMergedTDigests(final File mergedSegmentDir) thr private static List> buildStandardSegmentData() { List> segments = new ArrayList<>(); - // Segment 0: group1=[1..100], group2=[500..600] + // Segment 0: group1=[1..100], group2=[500..599] List seg0 = new ArrayList<>(); seg0.add(makeRow(GROUP_1, createTDigest(1, 101))); seg0.add(makeRow(GROUP_2, createTDigest(500, 600))); From 7902fa60a3794421d463ab626ac1a6f2c6c7dce3 Mon Sep 17 00:00:00 2001 From: Sai Valla Date: Mon, 13 Apr 2026 19:24:41 -0700 Subject: [PATCH 4/7] Apply suggestion from @justahuman1 --- .../processing/aggregator/PercentileTDigestAggregator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java index cf23919e8ed7..d06834a56bcf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java @@ -28,8 +28,7 @@ /** * Aggregator for merging serialized TDigest sketches during segment processing * (e.g., MergeAndRollup). Handles both {@code PERCENTILETDIGEST} and - * {@code PERCENTILERAWTDIGEST} aggregation types, as they share the same - * underlying byte representation. + * {@code PERCENTILERAWTDIGEST} aggregation types``` */ public class PercentileTDigestAggregator implements ValueAggregator { From b60766b8b157e384f1ba04b4b0b8230c33e1dd5e Mon Sep 17 00:00:00 2001 From: Sai Valla Date: Mon, 13 Apr 2026 19:25:01 -0700 Subject: [PATCH 5/7] Apply suggestion from @justahuman1 --- .../processing/aggregator/PercentileTDigestAggregator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java index d06834a56bcf..0fb0e890703f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java @@ -28,7 +28,7 @@ /** * Aggregator for merging serialized TDigest sketches during segment processing * (e.g., MergeAndRollup). Handles both {@code PERCENTILETDIGEST} and - * {@code PERCENTILERAWTDIGEST} aggregation types``` + * {@code PERCENTILERAWTDIGEST} aggregation types */ public class PercentileTDigestAggregator implements ValueAggregator { From a7cd736164b3ca203bf08bb8f8c9e56e0ef2d457 Mon Sep 17 00:00:00 2001 From: Sai Valla Date: Tue, 14 Apr 2026 10:17:04 -0700 Subject: [PATCH 6/7] fix: replace illegal ImmutableMap import with Map.of --- .../tasks/mergerollup/MergeRollupTaskGeneratorTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java index c77148101509..d012762ad83c 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.plugin.minion.tasks.mergerollup; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import java.util.Arrays; import java.util.Collections; @@ -248,7 +247,7 @@ public void testValidCompressionFactor() { validConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel); validConfig.put(prefix + "a.compressionFactor", "200"); TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) - .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.MergeRollupTask.TASK_TYPE, validConfig))) + .setTaskConfig(new TableTaskConfig(Map.of(MinionConstants.MergeRollupTask.TASK_TYPE, validConfig))) .build(); taskGenerator.validateTaskConfigs(offlineTableConfig, schema, validConfig); } @@ -266,7 +265,7 @@ public void testInvalidCompressionFactor() { invalidConfig.put(MinionConstants.MergeRollupTask.MERGE_LEVEL_KEY, mergeLevel); invalidConfig.put(prefix + "a.compressionFactor", "0"); TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME) - .setTaskConfig(new TableTaskConfig(ImmutableMap.of(MinionConstants.MergeRollupTask.TASK_TYPE, invalidConfig))) + .setTaskConfig(new TableTaskConfig(Map.of(MinionConstants.MergeRollupTask.TASK_TYPE, invalidConfig))) .build(); assertThrows(IllegalStateException.class, () -> { taskGenerator.validateTaskConfigs(offlineTableConfig, schema, invalidConfig); From 99aa269685ff693d38401acfc2c50e2599f7d9d6 Mon Sep 17 00:00:00 2001 From: Sai Valla Date: Fri, 17 Apr 2026 18:43:07 -0700 Subject: [PATCH 7/7] address PR feedback: treat empty bytes as missing, improve error message - Treat empty byte[] as missing value in PercentileTDigestAggregator (pass through instead of synthesizing an empty TDigest); full cross-layer fix to follow in a separate PR - Print allowedFunctionParameterNames in the error message so it stays in sync with the set - Use Map.of instead of Guava ImmutableMap in tests --- .../aggregator/PercentileTDigestAggregator.java | 10 ++-------- .../aggregator/PercentileTDigestAggregatorTest.java | 6 ++---- .../tasks/mergerollup/MergeRollupTaskGenerator.java | 4 ++-- 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java index 0fb0e890703f..78eb5e09b026 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregator.java @@ -37,16 +37,10 @@ public Object aggregate(Object value1, Object value2, Map functi byte[] bytes1 = (byte[]) value1; byte[] bytes2 = (byte[]) value2; - // Empty byte arrays represent the default null value for BYTES columns. - // Deserializing byte[0] would throw BufferUnderflowException, so handle it explicitly. - if (bytes1.length == 0 && bytes2.length == 0) { - int compression = getCompression(functionParameters); - return ObjectSerDeUtils.TDIGEST_SER_DE.serialize(TDigest.createMergingDigest(compression)); - } + // Treat empty byte arrays (default null value for BYTES columns) as missing values if (bytes1.length == 0) { return bytes2; - } - if (bytes2.length == 0) { + } else if (bytes2.length == 0) { return bytes1; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java index dfed54bb3e02..b8cc54a8d775 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/aggregator/PercentileTDigestAggregatorTest.java @@ -98,10 +98,8 @@ public void testAggregateWithBothEmptyBytes() { Map functionParameters = new HashMap<>(); byte[] result = (byte[]) _aggregator.aggregate(empty1, empty2, functionParameters); - // Should return a valid serialized empty TDigest, not crash - TDigest resultDigest = ObjectSerDeUtils.TDIGEST_SER_DE.deserialize(result); - assertNotNull(resultDigest); - assertEquals(resultDigest.size(), 0); + // Both empty — treat as missing, return empty bytes + assertEquals(result.length, 0); } @Test diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java index 7170c26b32df..7208d5144d5f 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java @@ -516,8 +516,8 @@ public void validateTaskConfigs(TableConfig tableConfig, Schema schema, Map