From 0ae5b940c27eccda4661230befb7aa8b786e6d5f Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 29 Apr 2021 01:28:33 -0700 Subject: [PATCH 1/2] Vectorize the cardinality aggregator. Does not include a byRow implementation, so if byRow is true then the aggregator still goes through the non-vectorized path. Testing strategy: - New tests that exercise both styles of "aggregate" for supported types. - Some existing tests have also become active (note the deleted "cannotVectorize" lines). --- .../CardinalityAggregatorFactory.java | 33 +- .../CardinalityBufferAggregator.java | 5 +- .../CardinalityVectorAggregator.java | 73 ++++ ...alityAggregatorColumnSelectorStrategy.java | 7 +- ...alityAggregatorColumnSelectorStrategy.java | 7 +- ...alityAggregatorColumnSelectorStrategy.java | 7 +- ...alityAggregatorColumnSelectorStrategy.java | 20 +- .../vector/CardinalityVectorProcessor.java | 41 +++ .../CardinalityVectorProcessorFactory.java | 74 ++++ .../DoubleCardinalityVectorProcessor.java | 94 +++++ .../FloatCardinalityVectorProcessor.java | 94 +++++ .../LongCardinalityVectorProcessor.java | 94 +++++ ...ValueStringCardinalityVectorProcessor.java | 102 ++++++ .../vector/NilCardinalityVectorProcessor.java | 46 +++ ...ValueStringCardinalityVectorProcessor.java | 92 +++++ .../CardinalityVectorAggregatorTest.java | 338 ++++++++++++++++++ .../query/groupby/GroupByQueryRunnerTest.java | 10 +- .../druid/sql/calcite/CalciteQueryTest.java | 26 +- 18 files changed, 1120 insertions(+), 43 deletions(-) create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregator.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessor.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/DoubleCardinalityVectorProcessor.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/FloatCardinalityVectorProcessor.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/LongCardinalityVectorProcessor.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java create mode 100644 processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/SingleValueStringCardinalityVectorProcessor.java create mode 100644 processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregatorTest.java diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java index 4990d5d4b77f..57c5c7e73068 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityAggregatorFactory.java @@ -35,15 +35,21 @@ import org.apache.druid.query.aggregation.BufferAggregator; import org.apache.druid.query.aggregation.NoopAggregator; import org.apache.druid.query.aggregation.NoopBufferAggregator; +import org.apache.druid.query.aggregation.NoopVectorAggregator; +import org.apache.druid.query.aggregation.VectorAggregator; import org.apache.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy; import org.apache.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory; +import org.apache.druid.query.aggregation.cardinality.vector.CardinalityVectorProcessorFactory; import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; import org.apache.druid.query.cache.CacheKeyBuilder; import org.apache.druid.query.dimension.DefaultDimensionSpec; import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.segment.ColumnInspector; +import org.apache.druid.segment.ColumnProcessors; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.DimensionHandlerUtils; import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.vector.VectorColumnSelectorFactory; import javax.annotation.Nullable; import java.nio.ByteBuffer; @@ -146,7 +152,6 @@ public Aggregator factorize(final ColumnSelectorFactory columnFactory) return new CardinalityAggregator(selectorPluses, byRow); } - @Override public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory) { @@ -163,6 +168,32 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory) return new CardinalityBufferAggregator(selectorPluses, byRow); } + @Override + public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory) + { + if (fields.isEmpty()) { + return NoopVectorAggregator.instance(); + } + + return new CardinalityVectorAggregator( + fields.stream().map( + field -> + ColumnProcessors.makeVectorProcessor( + field, + CardinalityVectorProcessorFactory.INSTANCE, + selectorFactory + ) + ).collect(Collectors.toList()) + ); + } + + @Override + public boolean canVectorize(ColumnInspector columnInspector) + { + // !byRow because there is not yet a vector implementation. + return !byRow && fields.stream().allMatch(DimensionSpec::canVectorize); + } + @Override public Comparator getComparator() { diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java index 64d70d47459d..0e3d698de551 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityBufferAggregator.java @@ -55,10 +55,11 @@ public void aggregate(ByteBuffer buf, int position) // Save position, limit and restore later instead of allocating a new ByteBuffer object final int oldPosition = buf.position(); final int oldLimit = buf.limit(); - buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); - buf.position(position); try { + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); if (byRow) { CardinalityAggregator.hashRow(selectorPluses, collector); diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregator.java new file mode 100644 index 000000000000..59d1ecff63d7 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregator.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.cardinality; + +import org.apache.druid.query.aggregation.VectorAggregator; +import org.apache.druid.query.aggregation.cardinality.vector.CardinalityVectorProcessor; +import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesBufferAggregator; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.List; + +public class CardinalityVectorAggregator implements VectorAggregator +{ + private final List processors; + + CardinalityVectorAggregator(List processors) + { + this.processors = processors; + } + + @Override + public void init(ByteBuffer buf, int position) + { + HyperUniquesBufferAggregator.doInit(buf, position); + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + for (final CardinalityVectorProcessor processor : processors) { + processor.aggregate(buf, position, startRow, endRow); + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + for (final CardinalityVectorProcessor processor : processors) { + processor.aggregate(buf, numRows, positions, rows, positionOffset); + } + } + + @Nullable + @Override + public Object get(ByteBuffer buf, int position) + { + return HyperUniquesBufferAggregator.doGet(buf, position); + } + + @Override + public void close() + { + // Nothing to close. + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/DoubleCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/DoubleCardinalityAggregatorColumnSelectorStrategy.java index 14714758ed92..4fc7a3209890 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/DoubleCardinalityAggregatorColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/DoubleCardinalityAggregatorColumnSelectorStrategy.java @@ -34,6 +34,11 @@ public class DoubleCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy { + public static void addDoubleToCollector(final HyperLogLogCollector collector, final double n) + { + collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits(n)).asBytes()); + } + @Override public void hashRow(BaseDoubleColumnValueSelector selector, Hasher hasher) { @@ -46,7 +51,7 @@ public void hashRow(BaseDoubleColumnValueSelector selector, Hasher hasher) public void hashValues(BaseDoubleColumnValueSelector selector, HyperLogLogCollector collector) { if (NullHandling.replaceWithDefault() || !selector.isNull()) { - collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(Double.doubleToLongBits(selector.getDouble())).asBytes()); + addDoubleToCollector(collector, selector.getDouble()); } } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/FloatCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/FloatCardinalityAggregatorColumnSelectorStrategy.java index 59a242f1f49d..2b04e1fe6355 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/FloatCardinalityAggregatorColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/FloatCardinalityAggregatorColumnSelectorStrategy.java @@ -34,6 +34,11 @@ public class FloatCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy { + public static void addFloatToCollector(final HyperLogLogCollector collector, final float n) + { + collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(Float.floatToIntBits(n)).asBytes()); + } + @Override public void hashRow(BaseFloatColumnValueSelector selector, Hasher hasher) { @@ -46,7 +51,7 @@ public void hashRow(BaseFloatColumnValueSelector selector, Hasher hasher) public void hashValues(BaseFloatColumnValueSelector selector, HyperLogLogCollector collector) { if (NullHandling.replaceWithDefault() || !selector.isNull()) { - collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(Float.floatToIntBits(selector.getFloat())).asBytes()); + addFloatToCollector(collector, selector.getFloat()); } } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/LongCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/LongCardinalityAggregatorColumnSelectorStrategy.java index d6ffea54e247..75a0f3f428f9 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/LongCardinalityAggregatorColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/LongCardinalityAggregatorColumnSelectorStrategy.java @@ -34,6 +34,11 @@ public class LongCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy { + public static void addLongToCollector(final HyperLogLogCollector collector, final long n) + { + collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(n).asBytes()); + } + @Override public void hashRow(BaseLongColumnValueSelector selector, Hasher hasher) { @@ -46,7 +51,7 @@ public void hashRow(BaseLongColumnValueSelector selector, Hasher hasher) public void hashValues(BaseLongColumnValueSelector selector, HyperLogLogCollector collector) { if (NullHandling.replaceWithDefault() || !selector.isNull()) { - collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(selector.getLong()).asBytes()); + addLongToCollector(collector, selector.getLong()); } } } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java index ca4c69c8cfae..9d49e30c21ef 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/types/StringCardinalityAggregatorColumnSelectorStrategy.java @@ -26,6 +26,7 @@ import org.apache.druid.segment.DimensionSelector; import org.apache.druid.segment.data.IndexedInts; +import javax.annotation.Nullable; import java.util.Arrays; public class StringCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy @@ -33,6 +34,16 @@ public class StringCardinalityAggregatorColumnSelectorStrategy implements Cardin public static final String CARDINALITY_AGG_NULL_STRING = "\u0000"; public static final char CARDINALITY_AGG_SEPARATOR = '\u0001'; + public static void addStringToCollector(final HyperLogLogCollector collector, @Nullable final String s) + { + // SQL standard spec does not count null values, + // Skip counting null values when we are not replacing null with default value. + // A special value for null in case null handling is configured to use empty string for null. + if (NullHandling.replaceWithDefault() || s != null) { + collector.add(CardinalityAggregator.HASH_FUNCTION.hashUnencodedChars(nullToSpecial(s)).asBytes()); + } + } + @Override public void hashRow(DimensionSelector dimSelector, Hasher hasher) { @@ -80,16 +91,11 @@ public void hashValues(DimensionSelector dimSelector, HyperLogLogCollector colle for (int i = 0, rowSize = row.size(); i < rowSize; i++) { int index = row.get(i); final String value = dimSelector.lookupName(index); - // SQL standard spec does not count null values, - // Skip counting null values when we are not replacing null with default value. - // A special value for null in case null handling is configured to use empty string for null. - if (NullHandling.replaceWithDefault() || value != null) { - collector.add(CardinalityAggregator.HASH_FUNCTION.hashUnencodedChars(nullToSpecial(value)).asBytes()); - } + addStringToCollector(collector, value); } } - private String nullToSpecial(String value) + private static String nullToSpecial(String value) { return value == null ? CARDINALITY_AGG_NULL_STRING : value; } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessor.java new file mode 100644 index 000000000000..9c79d0b9e48d --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.cardinality.vector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +/** + * Processor for {@link org.apache.druid.query.aggregation.cardinality.CardinalityVectorAggregator}. + */ +public interface CardinalityVectorProcessor +{ + /** + * Processor for {@link org.apache.druid.query.aggregation.VectorAggregator#aggregate(ByteBuffer, int, int, int)} + * in byRow = false mode. + */ + void aggregate(ByteBuffer buf, int position, int startRow, int endRow); + + /** + * Processor for {@link org.apache.druid.query.aggregation.VectorAggregator#aggregate(ByteBuffer, int, int, int)} + * in byRow = false mode. + */ + void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset); +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java new file mode 100644 index 000000000000..3d745831b910 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/CardinalityVectorProcessorFactory.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.cardinality.vector; + +import org.apache.druid.segment.VectorColumnProcessorFactory; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.segment.vector.VectorObjectSelector; +import org.apache.druid.segment.vector.VectorValueSelector; + +public class CardinalityVectorProcessorFactory implements VectorColumnProcessorFactory +{ + public static final CardinalityVectorProcessorFactory INSTANCE = new CardinalityVectorProcessorFactory(); + + @Override + public CardinalityVectorProcessor makeSingleValueDimensionProcessor( + ColumnCapabilities capabilities, + SingleValueDimensionVectorSelector selector + ) + { + return new SingleValueStringCardinalityVectorProcessor(selector); + } + + @Override + public CardinalityVectorProcessor makeMultiValueDimensionProcessor( + ColumnCapabilities capabilities, + MultiValueDimensionVectorSelector selector + ) + { + return new MultiValueStringCardinalityVectorProcessor(selector); + } + + @Override + public CardinalityVectorProcessor makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + return new FloatCardinalityVectorProcessor(selector); + } + + @Override + public CardinalityVectorProcessor makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + return new DoubleCardinalityVectorProcessor(selector); + } + + @Override + public CardinalityVectorProcessor makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector) + { + return new LongCardinalityVectorProcessor(selector); + } + + @Override + public CardinalityVectorProcessor makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector) + { + return NilCardinalityVectorProcessor.INSTANCE; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/DoubleCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/DoubleCardinalityVectorProcessor.java new file mode 100644 index 000000000000..ad63f1ac8896 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/DoubleCardinalityVectorProcessor.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.cardinality.vector; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.query.aggregation.cardinality.types.DoubleCardinalityAggregatorColumnSelectorStrategy; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class DoubleCardinalityVectorProcessor implements CardinalityVectorProcessor +{ + private final VectorValueSelector selector; + + public DoubleCardinalityVectorProcessor(final VectorValueSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + + try { + final double[] vector = selector.getDoubleVector(); + final boolean[] nullVector = selector.getNullVector(); + + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + + for (int i = startRow; i < endRow; i++) { + if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) { + DoubleCardinalityAggregatorColumnSelectorStrategy.addDoubleToCollector(collector, vector[i]); + } + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + + try { + final double[] vector = selector.getDoubleVector(); + final boolean[] nullVector = selector.getNullVector(); + + for (int i = 0; i < numRows; i++) { + final int idx = rows != null ? rows[i] : i; + if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) { + final int position = positions[i] + positionOffset; + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + DoubleCardinalityAggregatorColumnSelectorStrategy.addDoubleToCollector(collector, vector[idx]); + } + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/FloatCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/FloatCardinalityVectorProcessor.java new file mode 100644 index 000000000000..1be4dd3d5952 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/FloatCardinalityVectorProcessor.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.cardinality.vector; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.query.aggregation.cardinality.types.FloatCardinalityAggregatorColumnSelectorStrategy; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class FloatCardinalityVectorProcessor implements CardinalityVectorProcessor +{ + private final VectorValueSelector selector; + + public FloatCardinalityVectorProcessor(final VectorValueSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + + try { + final float[] vector = selector.getFloatVector(); + final boolean[] nullVector = selector.getNullVector(); + + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + + for (int i = startRow; i < endRow; i++) { + if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) { + FloatCardinalityAggregatorColumnSelectorStrategy.addFloatToCollector(collector, vector[i]); + } + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + + try { + final float[] vector = selector.getFloatVector(); + final boolean[] nullVector = selector.getNullVector(); + + for (int i = 0; i < numRows; i++) { + final int idx = rows != null ? rows[i] : i; + if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) { + final int position = positions[i] + positionOffset; + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + FloatCardinalityAggregatorColumnSelectorStrategy.addFloatToCollector(collector, vector[idx]); + } + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/LongCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/LongCardinalityVectorProcessor.java new file mode 100644 index 000000000000..f69ab5f8635b --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/LongCardinalityVectorProcessor.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.cardinality.vector; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.query.aggregation.cardinality.types.LongCardinalityAggregatorColumnSelectorStrategy; +import org.apache.druid.segment.vector.VectorValueSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class LongCardinalityVectorProcessor implements CardinalityVectorProcessor +{ + private final VectorValueSelector selector; + + public LongCardinalityVectorProcessor(final VectorValueSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + + try { + final long[] vector = selector.getLongVector(); + final boolean[] nullVector = selector.getNullVector(); + + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + + for (int i = startRow; i < endRow; i++) { + if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[i]) { + LongCardinalityAggregatorColumnSelectorStrategy.addLongToCollector(collector, vector[i]); + } + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + + try { + final long[] vector = selector.getLongVector(); + final boolean[] nullVector = selector.getNullVector(); + + for (int i = 0; i < numRows; i++) { + final int idx = rows != null ? rows[i] : i; + if (NullHandling.replaceWithDefault() || nullVector == null || !nullVector[idx]) { + final int position = positions[i] + positionOffset; + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + LongCardinalityAggregatorColumnSelectorStrategy.addLongToCollector(collector, vector[idx]); + } + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java new file mode 100644 index 000000000000..8b973dc21e85 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.cardinality.vector; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy; +import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class MultiValueStringCardinalityVectorProcessor implements CardinalityVectorProcessor +{ + private final MultiValueDimensionVectorSelector selector; + + public MultiValueStringCardinalityVectorProcessor(final MultiValueDimensionVectorSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + + try { + final IndexedInts[] vector = selector.getRowVector(); + + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + + for (int i = startRow; i < endRow; i++) { + final IndexedInts ids = vector[i]; + final int sz = ids.size(); + + for (int j = 0 ; j < sz; j++) { + final String value = selector.lookupName(ids.get(j)); + StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, value); + } + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + + try { + final IndexedInts[] vector = selector.getRowVector(); + + for (int i = 0; i < numRows; i++) { + final IndexedInts ids = vector[rows != null ? rows[i] : i]; + final int sz = ids.size(); + + for (int j = 0 ; j < sz; j++) { + final String s = selector.lookupName(ids.get(j)); + if (NullHandling.replaceWithDefault() || s != null) { + final int position = positions[i] + positionOffset; + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, s); + } + } + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java new file mode 100644 index 000000000000..42f1e0be50c0 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/NilCardinalityVectorProcessor.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.cardinality.vector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class NilCardinalityVectorProcessor implements CardinalityVectorProcessor +{ + public static final NilCardinalityVectorProcessor INSTANCE = new NilCardinalityVectorProcessor(); + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + // Do nothing. + } + + @Override + public void aggregate( + ByteBuffer buf, + int numRows, + int[] positions, + @Nullable int[] rows, + int positionOffset + ) + { + // Do nothing. + } +} diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/SingleValueStringCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/SingleValueStringCardinalityVectorProcessor.java new file mode 100644 index 000000000000..080ce0495c1a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/SingleValueStringCardinalityVectorProcessor.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.cardinality.vector; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.query.aggregation.cardinality.types.StringCardinalityAggregatorColumnSelectorStrategy; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; + +public class SingleValueStringCardinalityVectorProcessor implements CardinalityVectorProcessor +{ + private final SingleValueDimensionVectorSelector selector; + + public SingleValueStringCardinalityVectorProcessor(final SingleValueDimensionVectorSelector selector) + { + this.selector = selector; + } + + @Override + public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) + { + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + + try { + final int[] vector = selector.getRowVector(); + + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + + for (int i = startRow; i < endRow; i++) { + final String value = selector.lookupName(vector[i]); + StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, value); + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } + } + + @Override + public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset) + { + // Save position, limit and restore later instead of allocating a new ByteBuffer object + final int oldPosition = buf.position(); + final int oldLimit = buf.limit(); + + try { + final int[] vector = selector.getRowVector(); + + for (int i = 0; i < numRows; i++) { + final String s = selector.lookupName(vector[rows != null ? rows[i] : i]); + + if (NullHandling.replaceWithDefault() || s != null) { + final int position = positions[i] + positionOffset; + buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage()); + buf.position(position); + final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf); + StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, s); + } + } + } + finally { + buf.limit(oldLimit); + buf.position(oldPosition); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregatorTest.java b/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregatorTest.java new file mode 100644 index 000000000000..0206d1156b5f --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/aggregation/cardinality/CardinalityVectorAggregatorTest.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.aggregation.cardinality; + +import org.apache.druid.common.config.NullHandling; +import org.apache.druid.hll.HyperLogLogCollector; +import org.apache.druid.query.aggregation.cardinality.vector.DoubleCardinalityVectorProcessor; +import org.apache.druid.query.aggregation.cardinality.vector.FloatCardinalityVectorProcessor; +import org.apache.druid.query.aggregation.cardinality.vector.LongCardinalityVectorProcessor; +import org.apache.druid.query.aggregation.cardinality.vector.MultiValueStringCardinalityVectorProcessor; +import org.apache.druid.query.aggregation.cardinality.vector.SingleValueStringCardinalityVectorProcessor; +import org.apache.druid.segment.IdLookup; +import org.apache.druid.segment.data.ArrayBasedIndexedInts; +import org.apache.druid.segment.data.IndexedInts; +import org.apache.druid.segment.vector.BaseDoubleVectorValueSelector; +import org.apache.druid.segment.vector.BaseFloatVectorValueSelector; +import org.apache.druid.segment.vector.BaseLongVectorValueSelector; +import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector; +import org.apache.druid.segment.vector.NoFilterVectorOffset; +import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.nio.ByteBuffer; +import java.util.Collections; + +public class CardinalityVectorAggregatorTest extends InitializedNullHandlingTest +{ + @Test + public void testAggregateLong() + { + final long[] values = {1, 2, 2, 3, 3, 3, 0}; + final boolean[] nulls = NullHandling.replaceWithDefault() + ? null + : new boolean[]{false, false, false, false, false, false, true}; + + final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator( + Collections.singletonList( + new LongCardinalityVectorProcessor( + new BaseLongVectorValueSelector(new NoFilterVectorOffset(values.length, 0, values.length)) + { + @Override + public long[] getLongVector() + { + return values; + } + + @Nullable + @Override + public boolean[] getNullVector() + { + return nulls; + } + } + ) + ) + ); + + testAggregate(aggregator, values.length, NullHandling.replaceWithDefault() ? 4 : 3); + } + + @Test + public void testAggregateDouble() + { + final double[] values = {1, 2, 2, 3, 3, 3, 0}; + final boolean[] nulls = NullHandling.replaceWithDefault() + ? null + : new boolean[]{false, false, false, false, false, false, true}; + + final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator( + Collections.singletonList( + new DoubleCardinalityVectorProcessor( + new BaseDoubleVectorValueSelector(new NoFilterVectorOffset(values.length, 0, values.length)) + { + @Override + public double[] getDoubleVector() + { + return values; + } + + @Nullable + @Override + public boolean[] getNullVector() + { + return nulls; + } + } + ) + ) + ); + + testAggregate(aggregator, values.length, NullHandling.replaceWithDefault() ? 4 : 3); + } + + @Test + public void testAggregateFloat() + { + final float[] values = {1, 2, 2, 3, 3, 3, 0}; + final boolean[] nulls = NullHandling.replaceWithDefault() + ? null + : new boolean[]{false, false, false, false, false, false, true}; + + final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator( + Collections.singletonList( + new FloatCardinalityVectorProcessor( + new BaseFloatVectorValueSelector(new NoFilterVectorOffset(values.length, 0, values.length)) + { + @Override + public float[] getFloatVector() + { + return values; + } + + @Nullable + @Override + public boolean[] getNullVector() + { + return nulls; + } + } + ) + ) + ); + + testAggregate(aggregator, values.length, NullHandling.replaceWithDefault() ? 4 : 3); + } + + @Test + public void testAggregateSingleValueString() + { + final int[] ids = {1, 2, 2, 3, 3, 3, 0}; + final String[] dict = {null, "abc", "def", "foo"}; + + final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator( + Collections.singletonList( + new SingleValueStringCardinalityVectorProcessor( + new SingleValueDimensionVectorSelector() + { + @Override + public int[] getRowVector() + { + return ids; + } + + @Override + public int getValueCardinality() + { + return dict.length; + } + + @Nullable + @Override + public String lookupName(int id) + { + return dict[id]; + } + + @Override + public boolean nameLookupPossibleInAdvance() + { + return true; + } + + @Nullable + @Override + public IdLookup idLookup() + { + return null; + } + + @Override + public int getMaxVectorSize() + { + return ids.length; + } + + @Override + public int getCurrentVectorSize() + { + return ids.length; + } + } + ) + ) + ); + + testAggregate(aggregator, ids.length, NullHandling.replaceWithDefault() ? 4 : 3); + } + + @Test + public void testAggregateMultiValueString() + { + final IndexedInts[] ids = { + new ArrayBasedIndexedInts(new int[]{1, 2}), + new ArrayBasedIndexedInts(new int[]{2, 3}), + new ArrayBasedIndexedInts(new int[]{3, 3}), + new ArrayBasedIndexedInts(new int[]{0}) + }; + + final String[] dict = {null, "abc", "def", "foo"}; + + final CardinalityVectorAggregator aggregator = new CardinalityVectorAggregator( + Collections.singletonList( + new MultiValueStringCardinalityVectorProcessor( + new MultiValueDimensionVectorSelector() + { + @Override + public IndexedInts[] getRowVector() + { + return ids; + } + + @Override + public int getValueCardinality() + { + return dict.length; + } + + @Nullable + @Override + public String lookupName(int id) + { + return dict[id]; + } + + @Override + public boolean nameLookupPossibleInAdvance() + { + return true; + } + + @Nullable + @Override + public IdLookup idLookup() + { + return null; + } + + @Override + public int getMaxVectorSize() + { + return ids.length; + } + + @Override + public int getCurrentVectorSize() + { + return ids.length; + } + } + ) + ) + ); + + testAggregate(aggregator, ids.length, NullHandling.replaceWithDefault() ? 4 : 3); + } + + private static void testAggregate( + final CardinalityVectorAggregator aggregator, + final int numRows, + final double expectedResult + ) + { + testAggregateStyle1(aggregator, numRows, expectedResult); + testAggregateStyle2(aggregator, numRows, expectedResult); + } + + private static void testAggregateStyle1( + final CardinalityVectorAggregator aggregator, + final int numRows, + final double expectedResult + ) + { + final int position = 1; + final ByteBuffer buf = ByteBuffer.allocate(HyperLogLogCollector.getLatestNumBytesForDenseStorage() + position); + aggregator.init(buf, position); + aggregator.aggregate(buf, position, 0, numRows); + + Assert.assertEquals( + "style1", + expectedResult, + ((HyperLogLogCollector) aggregator.get(buf, position)).estimateCardinality(), + 0.01 + ); + } + + private static void testAggregateStyle2( + final CardinalityVectorAggregator aggregator, + final int numRows, + final double expectedResult + ) + { + final int positionOffset = 1; + + final int aggregatorSize = HyperLogLogCollector.getLatestNumBytesForDenseStorage(); + final ByteBuffer buf = ByteBuffer.allocate(positionOffset + 2 * aggregatorSize); + aggregator.init(buf, positionOffset); + aggregator.init(buf, positionOffset + aggregatorSize); + + final int[] positions = new int[numRows]; + final int[] rows = new int[numRows]; + + for (int i = 0; i < numRows; i++) { + positions[i] = (i % 2) * aggregatorSize; + rows[i] = (i + 1) % numRows; + } + + aggregator.aggregate(buf, numRows, positions, rows, positionOffset); + + Assert.assertEquals( + "style2", + expectedResult, + ((HyperLogLogCollector) aggregator.get(buf, positionOffset)) + .fold((HyperLogLogCollector) aggregator.get(buf, positionOffset + aggregatorSize)) + .estimateCardinality(), + 0.01 + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java index 8d0c8648b0b4..4139c8ca4723 100644 --- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java +++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java @@ -2566,9 +2566,6 @@ public void testGroupByWithUniquesAndPostAggWithSameName() @Test public void testGroupByWithCardinality() { - // Cannot vectorize due to "cardinality" aggregator. - cannotVectorize(); - GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) @@ -2684,7 +2681,7 @@ public void testGroupByWithFirstLast() @Test public void testGroupByWithNoResult() { - // Cannot vectorize due to "cardinality" aggregator. + // Cannot vectorize due to first, last aggregators. cannotVectorize(); GroupByQuery query = makeQueryBuilder() @@ -8684,7 +8681,7 @@ public void testGroupByWithAllFiltersOnNullDimsWithExtractionFns() @Test public void testGroupByCardinalityAggWithExtractionFn() { - // Cannot vectorize due to "cardinality" aggregator. + // Cannot vectorize due to extraction dimension spec. cannotVectorize(); String helloJsFn = "function(str) { return 'hello' }"; @@ -8776,9 +8773,6 @@ public void testGroupByCardinalityAggWithExtractionFn() @Test public void testGroupByCardinalityAggOnFloat() { - // Cannot vectorize due to "cardinality" aggregator. - cannotVectorize(); - GroupByQuery query = makeQueryBuilder() .setDataSource(QueryRunnerTestHelper.DATA_SOURCE) .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 0ce2d2bd901f..cf30419c1ae7 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -3459,9 +3459,6 @@ public void testHavingOnDoubleSum() throws Exception @Test public void testHavingOnApproximateCountDistinct() throws Exception { - // Cannot vectorize due to "cardinality" aggregator. - cannotVectorize(); - testQuery( "SELECT dim2, COUNT(DISTINCT m1) FROM druid.foo GROUP BY dim2 HAVING COUNT(DISTINCT m1) > 1", ImmutableList.of( @@ -6269,9 +6266,6 @@ public void testGroupByWithSortOnPostAggregationNoTopNContext() throws Exception @Test public void testFilteredAggregations() throws Exception { - // Cannot vectorize due to "cardinality" aggregator. - cannotVectorize(); - testQuery( "SELECT " + "SUM(case dim1 when 'abc' then cnt end), " @@ -7657,9 +7651,6 @@ public void testSelectAggregatingWithLimitReducedToZero() throws Exception @Test public void testCountDistinct() throws Exception { - // Cannot vectorize due to "cardinality" aggregator. - cannotVectorize(); - testQuery( "SELECT SUM(cnt), COUNT(distinct dim2), COUNT(distinct unique_dim1) FROM druid.foo", ImmutableList.of( @@ -7692,9 +7683,6 @@ public void testCountDistinct() throws Exception @Test public void testCountDistinctOfCaseWhen() throws Exception { - // Cannot vectorize due to "cardinality" aggregator. - cannotVectorize(); - testQuery( "SELECT\n" + "COUNT(DISTINCT CASE WHEN m1 >= 4 THEN m1 END),\n" @@ -7787,9 +7775,6 @@ public void testApproxCountDistinctWhenHllDisabled() throws Exception { // When HLL is disabled, APPROX_COUNT_DISTINCT is still approximate. - // Cannot vectorize due to "cardinality" aggregator. - cannotVectorize(); - testQuery( PLANNER_CONFIG_NO_HLL, "SELECT APPROX_COUNT_DISTINCT(dim2) FROM druid.foo", @@ -8362,7 +8347,7 @@ public void testMinMaxAvgDailyCountWithLimit() throws Exception @Test public void testAvgDailyCountDistinct() throws Exception { - // Cannot vectorize due to virtual columns. + // Cannot vectorize outer query due to inlined inner query. cannotVectorize(); testQuery( @@ -9034,9 +9019,6 @@ public void testHistogramUsingSubqueryWithSort() throws Exception @Test public void testCountDistinctArithmetic() throws Exception { - // Cannot vectorize due to "cardinality" aggregator. - cannotVectorize(); - testQuery( "SELECT\n" + " SUM(cnt),\n" @@ -9081,7 +9063,7 @@ public void testCountDistinctArithmetic() throws Exception @Test public void testCountDistinctOfSubstring() throws Exception { - // Cannot vectorize due to "cardinality" aggregator. + // Cannot vectorize due to extraction dimension spec. cannotVectorize(); testQuery( @@ -11808,7 +11790,7 @@ public void testSelectOnLookupUsingFullJoinOperator(Map queryCon @Test public void testCountDistinctOfLookup() throws Exception { - // Cannot vectorize due to "cardinality" aggregator. + // Cannot vectorize due to extraction dimension spec. cannotVectorize(); final RegisteredLookupExtractionFn extractionFn = new RegisteredLookupExtractionFn( @@ -15219,7 +15201,7 @@ public void testRequireTimeConditionPositive() throws Exception ) ); - // Cannot vectorize next test due to "cardinality" aggregator. + // Cannot vectorize next test due to extraction dimension spec. cannotVectorize(); // semi-join requires time condition on both left and right query From 21d9e58001af8cbdd6ab0e1aaaea9c77f15a15d4 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 29 Apr 2021 14:57:51 -0700 Subject: [PATCH 2/2] Adjust whitespace. --- .../vector/MultiValueStringCardinalityVectorProcessor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java index 8b973dc21e85..1ccee04ad02d 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/cardinality/vector/MultiValueStringCardinalityVectorProcessor.java @@ -56,7 +56,7 @@ public void aggregate(ByteBuffer buf, int position, int startRow, int endRow) final IndexedInts ids = vector[i]; final int sz = ids.size(); - for (int j = 0 ; j < sz; j++) { + for (int j = 0; j < sz; j++) { final String value = selector.lookupName(ids.get(j)); StringCardinalityAggregatorColumnSelectorStrategy.addStringToCollector(collector, value); } @@ -82,7 +82,7 @@ public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable in final IndexedInts ids = vector[rows != null ? rows[i] : i]; final int sz = ids.size(); - for (int j = 0 ; j < sz; j++) { + for (int j = 0; j < sz; j++) { final String s = selector.lookupName(ids.get(j)); if (NullHandling.replaceWithDefault() || s != null) { final int position = positions[i] + positionOffset;