Fix bug in auto compaction preserveExistingMetrics feature (#12438)
* fix bug

* fix test

* fix IT
maytasm committed Apr 15, 2022
1 parent 0460d45 commit c25a5568275c506115914cc5622a829c89f8e384
Showing 9 changed files with 323 additions and 32 deletions.
@@ -45,6 +45,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+ import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
@@ -121,6 +122,101 @@ public void setup() throws Exception
fullDatasourceName = "wikipedia_index_test_" + UUID.randomUUID() + config.getExtraDatasourceNameSuffix();
}

@Test
public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetricsUsingAggregatorWithDifferentReturnType() throws Exception
{
// added = null, count = 2, sum_added = 62, quantilesDoublesSketch = 2, thetaSketch = 2, HLLSketchBuild = 2
loadData(INDEX_TASK_WITH_ROLLUP_FOR_PRESERVE_METRICS);
// added = 31, count = null, sum_added = null, quantilesDoublesSketch = null, thetaSketch = null, HLLSketchBuild = null
loadData(INDEX_TASK_WITHOUT_ROLLUP_FOR_PRESERVE_METRICS);
try (final Closeable ignored = unloader(fullDatasourceName)) {
final List<String> intervalsBeforeCompaction = coordinator.getSegmentIntervals(fullDatasourceName);
intervalsBeforeCompaction.sort(null);
// 2 segments across 1 day...
verifySegmentsCount(2);
ArrayList<Object> nullList = new ArrayList<Object>();
nullList.add(null);
Map<String, Object> queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)), ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(31))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "count",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(2))), ImmutableMap.of("events", ImmutableList.of(nullList)))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "sum_added",
"%%EXPECTED_COUNT_RESULT%%", 2,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(62))), ImmutableMap.of("events", ImmutableList.of(nullList)))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
queryAndResultFields = ImmutableMap.of(
"%%QUANTILESRESULT%%", 2,
"%%THETARESULT%%", 2.0,
"%%HLLRESULT%%", 2
);
verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);

submitCompactionConfig(
MAX_ROWS_PER_SEGMENT_COMPACTED,
NO_SKIP_OFFSET,
new UserCompactionTaskGranularityConfig(null, null, true),
new UserCompactionTaskDimensionsConfig(DimensionsSpec.getDefaultSchemas(ImmutableList.of("language"))),
null,
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
// FloatSumAggregator's combine method takes in two Floats but returns a Double
new FloatSumAggregatorFactory("sum_added", "added"),
new SketchMergeAggregatorFactory("thetaSketch", "user", 16384, true, false, null),
new HllSketchBuildAggregatorFactory("HLLSketchBuild", "user", 12, TgtHllType.HLL_4.name(), false),
new DoublesSketchAggregatorFactory("quantilesDoublesSketch", "delta", 128, 1000000000L)
},
false
);
// Should now have only 1 row after compaction
// added = null, count = 3, sum_added = 93.0 (the arithmetic is spelled out in the note after this hunk)
forceTriggerAutoCompaction(1);

queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "added",
"%%EXPECTED_COUNT_RESULT%%", 1,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(nullList)))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "count",
"%%EXPECTED_COUNT_RESULT%%", 1,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(3))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
queryAndResultFields = ImmutableMap.of(
"%%FIELD_TO_QUERY%%", "sum_added",
"%%EXPECTED_COUNT_RESULT%%", 1,
"%%EXPECTED_SCAN_RESULT%%", ImmutableList.of(ImmutableMap.of("events", ImmutableList.of(ImmutableList.of(93.0f))))
);
verifyQuery(INDEX_ROLLUP_QUERIES_RESOURCE, queryAndResultFields);
queryAndResultFields = ImmutableMap.of(
"%%QUANTILESRESULT%%", 3,
"%%THETARESULT%%", 3.0,
"%%HLLRESULT%%", 3
);
verifyQuery(INDEX_ROLLUP_SKETCH_QUERIES_RESOURCE, queryAndResultFields);

verifySegmentsCompacted(1, MAX_ROWS_PER_SEGMENT_COMPACTED);
checkCompactionIntervals(intervalsBeforeCompaction);

List<TaskResponseObject> compactTasksBefore = indexer.getCompleteTasksForDataSource(fullDatasourceName);
// Verify that the rolled-up segments do not get compacted again
forceTriggerAutoCompaction(1);
List<TaskResponseObject> compactTasksAfter = indexer.getCompleteTasksForDataSource(fullDatasourceName);
Assert.assertEquals(compactTasksAfter.size(), compactTasksBefore.size());
}
}

@Test
public void testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetrics() throws Exception
{
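
A quick sanity check on the expected values in testAutoCompactionRowWithMetricAndRowWithoutMetricShouldPreserveExistingMetricsUsingAggregatorWithDifferentReturnType above, assuming preserveExistingMetrics combines the pre-aggregated segment with the raw segment during compaction:

    count         = 2 (pre-aggregated rows) + 1 (raw row)      = 3
    sum_added     = 62 (pre-aggregated)     + 31 (raw "added") = 93, asserted as the float 93.0
    sketch counts = 2 + 1                                      = 3 for quantilesDoublesSketch, thetaSketch, and HLLSketchBuild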
@@ -452,13 +452,13 @@ protected Aggregator[] getAggsForRow(int rowOffset)
@Override
public float getMetricFloatValue(int rowOffset, int aggOffset)
{
- return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat);
+ return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getFloat)).floatValue();
}

@Override
public long getMetricLongValue(int rowOffset, int aggOffset)
{
- return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong);
+ return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getLong)).longValue();
}

@Override
@@ -470,7 +470,7 @@ public Object getMetricObjectValue(int rowOffset, int aggOffset)
@Override
protected double getMetricDoubleValue(int rowOffset, int aggOffset)
{
- return getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble);
+ return ((Number) getMetricHelper(getMetricAggs(), concurrentGet(rowOffset), aggOffset, Aggregator::getDouble)).doubleValue();
}

@Override
@@ -544,7 +544,7 @@ public Iterable<Row> iterableWithPostAggregations(
* If the preserveExistingMetrics flag is set, this method combines values from two aggregators as needed: the
* aggregator that aggregates from the input into the output field and the aggregator that combines the already
* aggregated field
*/
- private <T> T getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
+ private <T> Object getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, int aggOffset, Function<Aggregator, T> getMetricTypeFunction)
{
if (preserveExistingMetrics) {
// Since the preserveExistingMetrics flag is set, we will have to check and possibly retrieve the aggregated values
@@ -564,7 +564,7 @@ private <T> T getMetricHelper(AggregatorFactory[] metrics, Aggregator[] aggs, in
AggregatorFactory aggregatorFactory = metrics[aggOffset];
T aggregatedFromSource = getMetricTypeFunction.apply(aggs[aggOffset]);
T aggregatedFromCombined = getMetricTypeFunction.apply(aggs[aggOffset + metrics.length]);
- return (T) aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined);
+ return aggregatorFactory.combine(aggregatedFromSource, aggregatedFromCombined);
}
} else {
// If preserveExistingMetrics flag is not set then we simply get metrics from the list of Aggregator, aggs, using the
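
The two hunks above are the heart of the fix. getMetricHelper used to force-cast the result of AggregatorFactory.combine() back to the generic type T, but combine() may legally return a wider type than the per-row getters expect: as the test comment notes, FloatSumAggregator's combine takes two Floats and returns a Double, so the unchecked (T) cast fails at runtime with a ClassCastException once preserveExistingMetrics makes the combining path run. Below is a minimal standalone sketch of that failure mode and of the fix; CombineCastSketch, helperOld, and helperNew are illustrative stand-ins, not Druid classes.

import java.util.function.Function;

public class CombineCastSketch
{
  // Stand-in for a combine() that accepts two Floats but returns a Double,
  // mirroring the FloatSumAggregator behaviour noted in the test comment above.
  static Object combine(Object lhs, Object rhs)
  {
    return ((Number) lhs).doubleValue() + ((Number) rhs).doubleValue();
  }

  // Old shape of the helper: declares a type parameter T, then force-casts the combined
  // value back to T even though combine() may hand back a wider numeric type.
  @SuppressWarnings("unchecked")
  static <T> T helperOld(Function<float[], T> read, float[] row)
  {
    T aggregatedFromSource = read.apply(row);
    T aggregatedFromCombined = read.apply(row);
    return (T) combine(aggregatedFromSource, aggregatedFromCombined); // really a Double
  }

  // Fixed shape: return Object and let the caller convert through Number, as the
  // getMetricFloatValue/getMetricLongValue/getMetricDoubleValue overrides now do.
  static Object helperNew(Function<float[], Float> read, float[] row)
  {
    return combine(read.apply(row), read.apply(row));
  }

  public static void main(String[] args)
  {
    float[] row = {31.0f};
    Function<float[], Float> readFloat = r -> r[0];

    // Fixed path: explicit numeric conversion, no cast on the erased generic type.
    float fixed = ((Number) helperNew(readFloat, row)).floatValue();
    System.out.println("converted via Number: " + fixed);

    try {
      // Old path: the compiler inserts a checkcast to Float at this call site,
      // which fails because the combined value is really a Double.
      float broken = CombineCastSketch.<Float>helperOld(readFloat, row);
      System.out.println("unexpectedly fine: " + broken);
    }
    catch (ClassCastException e) {
      System.out.println("old path throws: " + e);
    }
  }
}

Returning Object from the helper and converting through Number at each call site, as the changed overrides above do, avoids ever casting to the erased generic type.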
@@ -50,6 +50,7 @@
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.query.aggregation.FilteredAggregatorFactory;
+ import org.apache.druid.query.aggregation.FloatSumAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.SelectorDimFilter;
@@ -799,6 +800,74 @@ public void testSchemaRollupWithRowWithExistingMetricsAndWithoutMetric() throws
}
}

@Test
public void testSchemaRollupWithRowWithExistingMetricsAndWithoutMetricUsingAggregatorWithDifferentReturnType() throws IndexSizeExceededException
{
AggregatorFactory[] aggregatorFactories = new AggregatorFactory[]{
new CountAggregatorFactory("count"),
// FloatSumAggregator's combine method takes in two Floats but returns a Double
new FloatSumAggregatorFactory("sum_of_x", "x")
};
final IncrementalIndex index = indexCreator.createIndex((Object) aggregatorFactories);
index.add(
new MapBasedInputRow(
1481871600000L,
Arrays.asList("name", "host"),
ImmutableMap.of("name", "name1", "host", "host", "x", 2)
)
);
index.add(
new MapBasedInputRow(
1481871600000L,
Arrays.asList("name", "host"),
ImmutableMap.of("name", "name1", "host", "host", "x", 3)
)
);
index.add(
new MapBasedInputRow(
1481871600000L,
Arrays.asList("name", "host"),
ImmutableMap.of("name", "name1", "host", "host", "count", 2, "sum_of_x", 4)
)
);
index.add(
new MapBasedInputRow(
1481871600000L,
Arrays.asList("name", "host"),
ImmutableMap.of("name", "name1", "host", "host", "count", 3, "sum_of_x", 5)
)
);

Assert.assertEquals(index.isRollup() ? 1 : 4, index.size());
Iterator<Row> iterator = index.iterator();
int rowCount = 0;
while (iterator.hasNext()) {
rowCount++;
Row row = iterator.next();
Assert.assertEquals(1481871600000L, row.getTimestampFromEpoch());
if (index.isRollup()) {
// All rows are rolled up into one row (see the arithmetic note after this hunk)
Assert.assertEquals(isPreserveExistingMetrics ? 7 : 4, row.getMetric("count").intValue());
Assert.assertEquals(isPreserveExistingMetrics ? 14 : 5, row.getMetric("sum_of_x").intValue());
} else {
// We still have 4 rows
if (rowCount == 1 || rowCount == 2) {
Assert.assertEquals(1, row.getMetric("count").intValue());
Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue());
} else {
if (isPreserveExistingMetrics) {
Assert.assertEquals(rowCount - 1, row.getMetric("count").intValue());
Assert.assertEquals(1 + rowCount, row.getMetric("sum_of_x").intValue());
} else {
Assert.assertEquals(1, row.getMetric("count").intValue());
// These rows do not have the dim "x", hence the metric is null (useDefaultValueForNull=false) or 0 (useDefaultValueForNull=true)
Assert.assertEquals(NullHandling.sqlCompatible() ? null : 0.0f, row.getMetric("sum_of_x"));
}
}
}
}
}

@Test
public void testSchemaRollupWithRowWithOnlyExistingMetrics() throws IndexSizeExceededException
{
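
The expected values in testSchemaRollupWithRowWithExistingMetricsAndWithoutMetricUsingAggregatorWithDifferentReturnType above follow from the four input rows, assuming they all roll up into a single row:

    with preserveExistingMetrics:    count = 1 + 1 + 2 + 3 = 7         sum_of_x = (2 + 3) + (4 + 5) = 14
    without preserveExistingMetrics: count = 4 (one per input row)     sum_of_x = 2 + 3 = 5 (only the first two rows carry "x")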
@@ -79,13 +79,14 @@

public static ClientCompactionTaskQueryTuningConfig from(
@Nullable UserCompactionTaskQueryTuningConfig userCompactionTaskQueryTuningConfig,
- @Nullable Integer maxRowsPerSegment
+ @Nullable Integer maxRowsPerSegment,
+ @Nullable Boolean preserveExistingMetrics
)
{
if (userCompactionTaskQueryTuningConfig == null) {
return new ClientCompactionTaskQueryTuningConfig(
maxRowsPerSegment,
- new OnheapIncrementalIndex.Spec(true),
+ new OnheapIncrementalIndex.Spec(preserveExistingMetrics),
null,
null,
null,
@@ -107,7 +108,7 @@ public static ClientCompactionTaskQueryTuningConfig from(
} else {
AppendableIndexSpec appendableIndexSpecToUse = userCompactionTaskQueryTuningConfig.getAppendableIndexSpec() != null
? userCompactionTaskQueryTuningConfig.getAppendableIndexSpec()
- : new OnheapIncrementalIndex.Spec(true);
+ : new OnheapIncrementalIndex.Spec(preserveExistingMetrics);
return new ClientCompactionTaskQueryTuningConfig(
maxRowsPerSegment,
appendableIndexSpecToUse,
@@ -452,7 +452,7 @@ private CoordinatorStats doRun(
"coordinator-issued",
segmentsToCompact,
config.getTaskPriority(),
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()),
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), config.getMetricsSpec() != null),
granularitySpec,
dimensionsSpec,
config.getMetricsSpec(),
@@ -338,7 +338,7 @@ private boolean needsCompaction(DataSourceCompactionConfig config, SegmentsToCom
{
Preconditions.checkState(!candidates.isEmpty(), "Empty candidates");
final ClientCompactionTaskQueryTuningConfig tuningConfig =
- ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment());
+ ClientCompactionTaskQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment(), null);
final PartitionsSpec partitionsSpecFromConfig = findPartitionsSpecFromConfig(tuningConfig);
final CompactionState lastCompactionState = candidates.segments.get(0).getLastCompactionState();
if (lastCompactionState == null) {
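
The remaining hunks wire the flag through the coordinator: ClientCompactionTaskQueryTuningConfig.from(...) gains a preserveExistingMetrics parameter that feeds OnheapIncrementalIndex.Spec, the doRun call site that submits coordinator-issued compaction tasks passes config.getMetricsSpec() != null, and the needsCompaction check passes null, presumably so its comparison keeps the previous behaviour. A small standalone sketch of that call-site logic; the class and method names below are stand-ins rather than Druid code, and treating a null Boolean as "off" is an assumption of the sketch.

public class PreserveExistingMetricsWiringSketch
{
  // Stand-in for the new parameter handling: the appendable index spec preserves
  // existing metrics only when the flag is explicitly true.
  static boolean specPreservesMetrics(Boolean preserveExistingMetrics)
  {
    return Boolean.TRUE.equals(preserveExistingMetrics);
  }

  public static void main(String[] args)
  {
    String[] metricsSpec = {"count", "sum_added"}; // stand-in for the config's AggregatorFactory[]
    String[] noMetricsSpec = null;

    // Task-submission path: the flag is derived from whether a metricsSpec is configured.
    System.out.println(specPreservesMetrics(metricsSpec != null));   // true
    System.out.println(specPreservesMetrics(noMetricsSpec != null)); // false

    // needsCompaction path: an explicit null leaves the feature off for the comparison.
    System.out.println(specPreservesMetrics(null));                  // false
  }
}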
