Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vectorize the cardinality aggregator. #11182

Merged
merged 2 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,21 @@
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.aggregation.NoopAggregator;
import org.apache.druid.query.aggregation.NoopBufferAggregator;
import org.apache.druid.query.aggregation.NoopVectorAggregator;
import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategy;
import org.apache.druid.query.aggregation.cardinality.types.CardinalityAggregatorColumnSelectorStrategyFactory;
import org.apache.druid.query.aggregation.cardinality.vector.CardinalityVectorProcessorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.query.cache.CacheKeyBuilder;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.dimension.DimensionSpec;
import org.apache.druid.segment.ColumnInspector;
import org.apache.druid.segment.ColumnProcessors;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.vector.VectorColumnSelectorFactory;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -146,7 +152,6 @@ public Aggregator factorize(final ColumnSelectorFactory columnFactory)
return new CardinalityAggregator(selectorPluses, byRow);
}


@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
{
Expand All @@ -163,6 +168,32 @@ public BufferAggregator factorizeBuffered(ColumnSelectorFactory columnFactory)
return new CardinalityBufferAggregator(selectorPluses, byRow);
}

@Override
public VectorAggregator factorizeVector(VectorColumnSelectorFactory selectorFactory)
{
  // No input fields means there is nothing to count; share the no-op singleton.
  if (fields.isEmpty()) {
    return NoopVectorAggregator.instance();
  }

  // Build one vectorized processor per input field; the aggregator fans each batch out to all of them.
  return new CardinalityVectorAggregator(
      fields.stream()
            .map(field -> ColumnProcessors.makeVectorProcessor(
                field,
                CardinalityVectorProcessorFactory.INSTANCE,
                selectorFactory
            ))
            .collect(Collectors.toList())
  );
}

@Override
public boolean canVectorize(ColumnInspector columnInspector)
{
  // byRow mode has no vectorized implementation yet.
  if (byRow) {
    return false;
  }

  // Every input field must itself support vectorization.
  for (DimensionSpec field : fields) {
    if (!field.canVectorize()) {
      return false;
    }
  }

  return true;
}

@Override
public Comparator getComparator()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ public void aggregate(ByteBuffer buf, int position)
// Save position, limit and restore later instead of allocating a new ByteBuffer object
final int oldPosition = buf.position();
final int oldLimit = buf.limit();
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
buf.position(position);

try {
buf.limit(position + HyperLogLogCollector.getLatestNumBytesForDenseStorage());
buf.position(position);

final HyperLogLogCollector collector = HyperLogLogCollector.makeCollector(buf);
if (byRow) {
CardinalityAggregator.hashRow(selectorPluses, collector);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.cardinality;

import org.apache.druid.query.aggregation.VectorAggregator;
import org.apache.druid.query.aggregation.cardinality.vector.CardinalityVectorProcessor;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesBufferAggregator;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.List;

/**
 * Vectorized cardinality aggregator for byRow = false mode. Fans each batch of rows out to a list of per-column
 * {@link CardinalityVectorProcessor}, all of which feed the same HyperLogLog state in the aggregation buffer.
 * Buffer layout matches HyperUniques, so init/get are delegated to {@link HyperUniquesBufferAggregator}.
 */
public class CardinalityVectorAggregator implements VectorAggregator
{
  private final List<CardinalityVectorProcessor> processors;

  CardinalityVectorAggregator(List<CardinalityVectorProcessor> processors)
  {
    this.processors = processors;
  }

  @Override
  public void init(ByteBuffer buf, int position)
  {
    // Same buffer layout as HyperUniques; reuse its initialization.
    HyperUniquesBufferAggregator.doInit(buf, position);
  }

  @Override
  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
  {
    // Contiguous-rows variant: every processor hashes its column's values for [startRow, endRow).
    processors.forEach(processor -> processor.aggregate(buf, position, startRow, endRow));
  }

  @Override
  public void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset)
  {
    // Scattered-positions variant: each row may target a different buffer position.
    processors.forEach(processor -> processor.aggregate(buf, numRows, positions, rows, positionOffset));
  }

  @Nullable
  @Override
  public Object get(ByteBuffer buf, int position)
  {
    // Same buffer layout as HyperUniques; reuse its read path.
    return HyperUniquesBufferAggregator.doGet(buf, position);
  }

  @Override
  public void close()
  {
    // No resources held.
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
public class DoubleCardinalityAggregatorColumnSelectorStrategy
implements CardinalityAggregatorColumnSelectorStrategy<BaseDoubleColumnValueSelector>
{
/**
 * Feeds a double into the collector by hashing its IEEE-754 long-bits representation. Shared with the
 * vectorized double processor so scalar and vector paths hash identically.
 */
public static void addDoubleToCollector(final HyperLogLogCollector collector, final double n)
{
  final long bits = Double.doubleToLongBits(n);
  collector.add(CardinalityAggregator.HASH_FUNCTION.hashLong(bits).asBytes());
}

@Override
public void hashRow(BaseDoubleColumnValueSelector selector, Hasher hasher)
{
Expand All @@ -46,7 +51,7 @@ public void hashRow(BaseDoubleColumnValueSelector selector, Hasher hasher)
/**
 * Adds the selector's current double value to the collector. Null values are skipped unless null handling is
 * configured to replace nulls with a default value.
 */
public void hashValues(BaseDoubleColumnValueSelector selector, HyperLogLogCollector collector)
{
  if (NullHandling.replaceWithDefault() || !selector.isNull()) {
    // Delegate to the shared helper so scalar and vectorized paths hash the same way.
    addDoubleToCollector(collector, selector.getDouble());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
public class FloatCardinalityAggregatorColumnSelectorStrategy
implements CardinalityAggregatorColumnSelectorStrategy<BaseFloatColumnValueSelector>
{
/**
 * Feeds a float into the collector by hashing its IEEE-754 int-bits representation. Shared with the
 * vectorized float processor so scalar and vector paths hash identically.
 */
public static void addFloatToCollector(final HyperLogLogCollector collector, final float n)
{
  final int bits = Float.floatToIntBits(n);
  collector.add(CardinalityAggregator.HASH_FUNCTION.hashInt(bits).asBytes());
}

@Override
public void hashRow(BaseFloatColumnValueSelector selector, Hasher hasher)
{
Expand All @@ -46,7 +51,7 @@ public void hashRow(BaseFloatColumnValueSelector selector, Hasher hasher)
/**
 * Adds the selector's current float value to the collector. Null values are skipped unless null handling is
 * configured to replace nulls with a default value.
 */
public void hashValues(BaseFloatColumnValueSelector selector, HyperLogLogCollector collector)
{
  if (NullHandling.replaceWithDefault() || !selector.isNull()) {
    // Delegate to the shared helper so scalar and vectorized paths hash the same way.
    addFloatToCollector(collector, selector.getFloat());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
public class LongCardinalityAggregatorColumnSelectorStrategy
implements CardinalityAggregatorColumnSelectorStrategy<BaseLongColumnValueSelector>
{
/**
 * Feeds a long into the collector by hashing it directly. Shared with the vectorized long processor so
 * scalar and vector paths hash identically.
 */
public static void addLongToCollector(final HyperLogLogCollector collector, final long n)
{
  final byte[] hashed = CardinalityAggregator.HASH_FUNCTION.hashLong(n).asBytes();
  collector.add(hashed);
}

@Override
public void hashRow(BaseLongColumnValueSelector selector, Hasher hasher)
{
Expand All @@ -46,7 +51,7 @@ public void hashRow(BaseLongColumnValueSelector selector, Hasher hasher)
/**
 * Adds the selector's current long value to the collector. Null values are skipped unless null handling is
 * configured to replace nulls with a default value.
 */
public void hashValues(BaseLongColumnValueSelector selector, HyperLogLogCollector collector)
{
  if (NullHandling.replaceWithDefault() || !selector.isNull()) {
    // Delegate to the shared helper so scalar and vectorized paths hash the same way.
    addLongToCollector(collector, selector.getLong());
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,24 @@
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.data.IndexedInts;

import javax.annotation.Nullable;
import java.util.Arrays;

public class StringCardinalityAggregatorColumnSelectorStrategy implements CardinalityAggregatorColumnSelectorStrategy<DimensionSelector>
{
public static final String CARDINALITY_AGG_NULL_STRING = "\u0000";
public static final char CARDINALITY_AGG_SEPARATOR = '\u0001';

/**
 * Feeds a string into the collector. Per the SQL standard, null values are not counted, so they are skipped
 * unless null handling is configured to replace nulls with a default value — in which case null is mapped to
 * a special sentinel string before hashing.
 */
public static void addStringToCollector(final HyperLogLogCollector collector, @Nullable final String s)
{
  if (s == null && !NullHandling.replaceWithDefault()) {
    // SQL standard: nulls do not contribute to the cardinality estimate.
    return;
  }
  collector.add(CardinalityAggregator.HASH_FUNCTION.hashUnencodedChars(nullToSpecial(s)).asBytes());
}

@Override
public void hashRow(DimensionSelector dimSelector, Hasher hasher)
{
Expand Down Expand Up @@ -80,16 +91,11 @@ public void hashValues(DimensionSelector dimSelector, HyperLogLogCollector colle
for (int i = 0, rowSize = row.size(); i < rowSize; i++) {
int index = row.get(i);
final String value = dimSelector.lookupName(index);
// SQL standard spec does not count null values,
// Skip counting null values when we are not replacing null with default value.
// A special value for null in case null handling is configured to use empty string for null.
if (NullHandling.replaceWithDefault() || value != null) {
collector.add(CardinalityAggregator.HASH_FUNCTION.hashUnencodedChars(nullToSpecial(value)).asBytes());
}
addStringToCollector(collector, value);
}
}

private String nullToSpecial(String value)
private static String nullToSpecial(String value)
{
return value == null ? CARDINALITY_AGG_NULL_STRING : value;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.cardinality.vector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;

/**
 * Per-column processor for {@link org.apache.druid.query.aggregation.cardinality.CardinalityVectorAggregator}.
 * Implementations hash a single input column's values into the HyperLogLog state in the aggregation buffer.
 */
public interface CardinalityVectorProcessor
{
  /**
   * Processor for {@link org.apache.druid.query.aggregation.VectorAggregator#aggregate(ByteBuffer, int, int, int)}
   * in byRow = false mode: hashes values for the contiguous row range [startRow, endRow) into the collector at
   * {@code position}.
   */
  void aggregate(ByteBuffer buf, int position, int startRow, int endRow);

  /**
   * Processor for
   * {@link org.apache.druid.query.aggregation.VectorAggregator#aggregate(ByteBuffer, int, int[], int[], int)}
   * in byRow = false mode: hashes values for {@code numRows} rows, each into its own buffer position from
   * {@code positions} (plus {@code positionOffset}); {@code rows}, when non-null, remaps batch indices to rows.
   */
  void aggregate(ByteBuffer buf, int numRows, int[] positions, @Nullable int[] rows, int positionOffset);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query.aggregation.cardinality.vector;

import org.apache.druid.segment.VectorColumnProcessorFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
import org.apache.druid.segment.vector.VectorObjectSelector;
import org.apache.druid.segment.vector.VectorValueSelector;

/**
 * Creates {@link CardinalityVectorProcessor} for each supported column type, on behalf of the vectorized
 * cardinality aggregator. Stateless; use the shared {@link #INSTANCE}.
 */
public class CardinalityVectorProcessorFactory implements VectorColumnProcessorFactory<CardinalityVectorProcessor>
{
  public static final CardinalityVectorProcessorFactory INSTANCE = new CardinalityVectorProcessorFactory();

  @Override
  public CardinalityVectorProcessor makeSingleValueDimensionProcessor(
      ColumnCapabilities capabilities,
      SingleValueDimensionVectorSelector selector
  )
  {
    return new SingleValueStringCardinalityVectorProcessor(selector);
  }

  @Override
  public CardinalityVectorProcessor makeMultiValueDimensionProcessor(
      ColumnCapabilities capabilities,
      MultiValueDimensionVectorSelector selector
  )
  {
    return new MultiValueStringCardinalityVectorProcessor(selector);
  }

  @Override
  public CardinalityVectorProcessor makeFloatProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
  {
    return new FloatCardinalityVectorProcessor(selector);
  }

  @Override
  public CardinalityVectorProcessor makeDoubleProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
  {
    return new DoubleCardinalityVectorProcessor(selector);
  }

  @Override
  public CardinalityVectorProcessor makeLongProcessor(ColumnCapabilities capabilities, VectorValueSelector selector)
  {
    return new LongCardinalityVectorProcessor(selector);
  }

  @Override
  public CardinalityVectorProcessor makeObjectProcessor(ColumnCapabilities capabilities, VectorObjectSelector selector)
  {
    // Object-typed (complex) columns are not hashed; NilCardinalityVectorProcessor contributes nothing.
    return NilCardinalityVectorProcessor.INSTANCE;
  }
}
Loading