Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.druid.query.aggregation.momentsketch.MomentSketchWrapper;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ValueType;

Expand Down Expand Up @@ -110,11 +109,9 @@ public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
ColumnCapabilities cap = metricFactory.getColumnCapabilities(fieldName);
if (cap == null || ValueType.isNumeric(cap.getType())) {
final ColumnValueSelector<Double> selector = metricFactory.makeColumnValueSelector(fieldName);
return new MomentSketchBuildAggregator(selector, k, getCompress());
return new MomentSketchBuildAggregator(metricFactory.makeColumnValueSelector(fieldName), k, getCompress());
} else {
final ColumnValueSelector<MomentSketchWrapper> selector = metricFactory.makeColumnValueSelector(fieldName);
return new MomentSketchMergeAggregator(selector, k, getCompress());
return new MomentSketchMergeAggregator(metricFactory.makeColumnValueSelector(fieldName), k, getCompress());
}
}

Expand All @@ -123,11 +120,9 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
ColumnCapabilities cap = metricFactory.getColumnCapabilities(fieldName);
if (cap == null || ValueType.isNumeric(cap.getType())) {
final ColumnValueSelector<Double> selector = metricFactory.makeColumnValueSelector(fieldName);
return new MomentSketchBuildBufferAggregator(selector, k, getCompress());
return new MomentSketchBuildBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), k, getCompress());
} else {
final ColumnValueSelector<MomentSketchWrapper> selector = metricFactory.makeColumnValueSelector(fieldName);
return new MomentSketchMergeBufferAggregator(selector, k, getCompress());
return new MomentSketchMergeBufferAggregator(metricFactory.makeColumnValueSelector(fieldName), k, getCompress());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public MomentSketchBuildAggregator(
@Override
public void aggregate()
{
if (valueSelector.isNull()) {
return;
}
momentsSketch.add(valueSelector.getDouble());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public synchronized void init(final ByteBuffer buffer, final int position)
@Override
public synchronized void aggregate(final ByteBuffer buffer, final int position)
{
if (selector.isNull()) {
return;
}
ByteBuffer mutationBuffer = buffer.duplicate();
mutationBuffer.position(position);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.momentsketch.aggregator;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.junit.Assert;
import org.junit.Test;

public class MomentSketchAggregatorFactorySerdeTest
{
@Test
public void serializeDeserializeFactoryWithFieldName() throws Exception
{
ObjectMapper objectMapper = new DefaultObjectMapper();
MomentSketchAggregatorFactory factory = new MomentSketchAggregatorFactory(
"name", "fieldName", 128, true
);

MomentSketchAggregatorFactory other = objectMapper.readValue(
objectMapper.writeValueAsString(factory),
MomentSketchAggregatorFactory.class
);

Assert.assertEquals(factory, other);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@
package org.apache.druid.query.aggregation.momentsketch.aggregator;


import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.AggregationTestHelper;
Expand All @@ -47,6 +46,7 @@
@RunWith(Parameterized.class)
public class MomentsSketchAggregatorTest extends InitializedNullHandlingTest
{
private final boolean hasNulls = !NullHandling.replaceWithDefault();
private final AggregationTestHelper helper;

@Rule
Expand All @@ -70,23 +70,6 @@ public static Collection<?> constructorFeeder()
return constructors;
}

// this is to test Json properties and equals
@Test
public void serializeDeserializeFactoryWithFieldName() throws Exception
{
ObjectMapper objectMapper = new DefaultObjectMapper();
MomentSketchAggregatorFactory factory = new MomentSketchAggregatorFactory(
"name", "fieldName", 128, true
);

MomentSketchAggregatorFactory other = objectMapper.readValue(
objectMapper.writeValueAsString(factory),
MomentSketchAggregatorFactory.class
);

Assert.assertEquals(factory, other);
}

@Test
public void buildingSketchesAtIngestionTime() throws Exception
{
Expand All @@ -104,11 +87,14 @@ public void buildingSketchesAtIngestionTime() throws Exception
" \"dimensionExclusions\": [ \"sequenceNumber\"],",
" \"spatialDimensions\": []",
" },",
" \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]",
" \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\", \"valueWithNulls\"]",
" }",
"}"
),
"[{\"type\": \"momentSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"k\": 10, \"compress\": true}]",
"["
+ "{\"type\": \"momentSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"k\": 10, \"compress\": true},"
+ "{\"type\": \"momentSketch\", \"name\": \"sketchWithNulls\", \"fieldName\": \"valueWithNulls\", \"k\": 10, \"compress\": true}"
+ "]",
0,
// minTimestamp
Granularities.NONE,
Expand All @@ -122,12 +108,16 @@ public void buildingSketchesAtIngestionTime() throws Exception
" \"granularity\": \"ALL\",",
" \"dimensions\": [],",
" \"aggregations\": [",
" {\"type\": \"momentSketchMerge\", \"name\": \"sketch\", \"fieldName\": \"sketch\", \"k\": 10, \"compress\": true}",
" {\"type\": \"momentSketchMerge\", \"name\": \"sketch\", \"fieldName\": \"sketch\", \"k\": 10, \"compress\": true},",
" {\"type\": \"momentSketchMerge\", \"name\": \"sketchWithNulls\", \"fieldName\": \"sketchWithNulls\", \"k\": 10, \"compress\": true}",
" ],",
" \"postAggregations\": [",
" {\"type\": \"momentSketchSolveQuantiles\", \"name\": \"quantiles\", \"fractions\": [0, 0.5, 1], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}},",
" {\"type\": \"momentSketchMin\", \"name\": \"min\", \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}},",
" {\"type\": \"momentSketchMax\", \"name\": \"max\", \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}}",
" {\"type\": \"momentSketchMax\", \"name\": \"max\", \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}},",
" {\"type\": \"momentSketchSolveQuantiles\", \"name\": \"quantilesWithNulls\", \"fractions\": [0, 0.5, 1], \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketchWithNulls\"}},",
" {\"type\": \"momentSketchMin\", \"name\": \"minWithNulls\", \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketchWithNulls\"}},",
" {\"type\": \"momentSketchMax\", \"name\": \"maxWithNulls\", \"field\": {\"type\": \"fieldAccess\", \"fieldName\": \"sketchWithNulls\"}}",
" ],",
" \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
"}"
Expand All @@ -136,19 +126,36 @@ public void buildingSketchesAtIngestionTime() throws Exception
List<ResultRow> results = seq.toList();
Assert.assertEquals(1, results.size());
ResultRow row = results.get(0);
double[] quantilesArray = (double[]) row.get(1); // "quantiles"
MomentSketchWrapper sketchObject = (MomentSketchWrapper) row.get(0); // "sketch"
// 400 total products since this is pre-rollup
Assert.assertEquals(400.0, sketchObject.getPowerSums()[0], 1e-10);

MomentSketchWrapper sketchObjectWithNulls = (MomentSketchWrapper) row.get(1); // "sketchWithNulls"
// 23 null values, nulls at ingestion time are not replaced with default values for complex metrics inputs
Assert.assertEquals(377.0, sketchObjectWithNulls.getPowerSums()[0], 1e-10);

double[] quantilesArray = (double[]) row.get(2); // "quantiles"
Assert.assertEquals(0, quantilesArray[0], 0.05);
Assert.assertEquals(.5, quantilesArray[1], 0.05);
Assert.assertEquals(1.0, quantilesArray[2], 0.05);

Double minValue = (Double) row.get(2); // "min"
Double minValue = (Double) row.get(3); // "min"
Assert.assertEquals(0.0011, minValue, 0.0001);

Double maxValue = (Double) row.get(3); // "max"
Double maxValue = (Double) row.get(4); // "max"
Assert.assertEquals(0.9969, maxValue, 0.0001);

MomentSketchWrapper sketchObject = (MomentSketchWrapper) row.get(0); // "sketch"
Assert.assertEquals(400.0, sketchObject.getPowerSums()[0], 1e-10);
double[] quantilesArrayWithNulls = (double[]) row.get(5); // "quantilesWithNulls"
Assert.assertEquals(5.0, quantilesArrayWithNulls[0], 0.05);
Assert.assertEquals(7.57, quantilesArrayWithNulls[1], 0.05);
Assert.assertEquals(10.0, quantilesArrayWithNulls[2], 0.05);

Double minValueWithNulls = (Double) row.get(6); // "minWithNulls"
Assert.assertEquals(5.0164, minValueWithNulls, 0.0001);

Double maxValueWithNulls = (Double) row.get(7); // "maxWithNulls"
Assert.assertEquals(9.9788, maxValueWithNulls, 0.0001);

}

@Test
Expand All @@ -164,11 +171,11 @@ public void buildingSketchesAtQueryTime() throws Exception
" \"format\": \"tsv\",",
" \"timestampSpec\": {\"column\": \"timestamp\", \"format\": \"yyyyMMddHH\"},",
" \"dimensionsSpec\": {",
" \"dimensions\": [ \"product\"],",
" \"dimensions\": [ \"product\", {\"name\":\"valueWithNulls\", \"type\":\"double\"}],",
" \"dimensionExclusions\": [\"sequenceNumber\"],",
" \"spatialDimensions\": []",
" },",
" \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\"]",
" \"columns\": [\"timestamp\", \"sequenceNumber\", \"product\", \"value\", \"valueWithNulls\"]",
" }",
"}"
),
Expand All @@ -184,7 +191,8 @@ public void buildingSketchesAtQueryTime() throws Exception
" \"granularity\": \"ALL\",",
" \"dimensions\": [],",
" \"aggregations\": [",
" {\"type\": \"momentSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"k\": 10}",
" {\"type\": \"momentSketch\", \"name\": \"sketch\", \"fieldName\": \"value\", \"k\": 10},",
" {\"type\": \"momentSketch\", \"name\": \"sketchWithNulls\", \"fieldName\": \"valueWithNulls\", \"k\": 10}",
" ],",
" \"intervals\": [\"2016-01-01T00:00:00.000Z/2016-01-31T00:00:00.000Z\"]",
"}"
Expand All @@ -196,8 +204,14 @@ public void buildingSketchesAtQueryTime() throws Exception
ResultRow row = results.get(0);

MomentSketchWrapper sketchObject = (MomentSketchWrapper) row.get(0); // "sketch"
// 9 total products since we pre-sum the values.
Assert.assertEquals(9.0, sketchObject.getPowerSums()[0], 1e-10);
// 385 total products since roll-up limited by valueWithNulls column
Assert.assertEquals(385.0, sketchObject.getPowerSums()[0], 1e-10);

MomentSketchWrapper sketchObjectWithNulls = (MomentSketchWrapper) row.get(1); // "sketchWithNulls"

// in default mode, all 385 rows have a number value so will be computed, but only 377 rows have actual values in
// sql null mode
Assert.assertEquals(hasNulls ? 377.0 : 385.0, sketchObjectWithNulls.getPowerSums()[0], 1e-10);
}
}

Loading