Skip to content

Commit

Permalink
refactor druid-bloom-filter aggregators (#7496)
Browse files Browse the repository at this point in the history
* now with 100% more buffer

* there can be only 1

* simplify

* javadoc

* clean up unused test method

* fix exception message

* style

* why does style hate javadocs

* review stuff

* style :(
  • Loading branch information
clintropolis authored and gianm committed Apr 18, 2019
1 parent 99ddce1 commit be65cca
Show file tree
Hide file tree
Showing 21 changed files with 294 additions and 597 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,119 @@
package org.apache.druid.query.aggregation.bloom;

import org.apache.druid.query.aggregation.Aggregator;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.query.filter.BloomKFilter;
import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
import org.apache.druid.segment.BaseNullableColumnValueSelector;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
import java.util.Objects;

public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector> implements Aggregator
/**
* All bloom filter aggregations are done using {@link ByteBuffer}, so this base class implements both {@link Aggregator}
* and {@link BufferAggregator}.
*
* If used as an {@link Aggregator}, the caller MUST pass the 'onHeap' constructor parameter as "true"; otherwise the
* "collector" buffer is never allocated and {@link #aggregate()} will fail with a null pointer exception.
*
* If used as a {@link BufferAggregator}, the "collector" buffer is not needed, so 'onHeap' should be "false".
* Passing "true" in that case only allocates an unused buffer; unlike the {@link Aggregator} case, nothing breaks.
*
* {@link BloomFilterAggregatorFactory} and {@link BloomFilterMergeAggregatorFactory}, which should be the creators of
* all implementations of {@link BaseBloomFilterAggregator} outside of tests, should be sure to set the 'onHeap' value
* to "true" for {@link org.apache.druid.query.aggregation.AggregatorFactory#factorize} and to "false" for
* {@link org.apache.druid.query.aggregation.AggregatorFactory#factorizeBuffered}.
*
* @param <TSelector> type of {@link BaseNullableColumnValueSelector} that feeds this aggregator, likely either values
* to add to a bloom filter, or other bloom filters to merge into this bloom filter.
*/
public abstract class BaseBloomFilterAggregator<TSelector extends BaseNullableColumnValueSelector>
implements BufferAggregator, Aggregator
{
final BloomKFilter collector;
@Nullable
private final ByteBuffer collector;
protected final int maxNumEntries;
protected final TSelector selector;

BaseBloomFilterAggregator(TSelector selector, BloomKFilter collector)
/**
* @param selector selector that feeds values to the aggregator
* @param maxNumEntries maximum number of entries that can be added to a bloom filter before accuracy degrades rapidly
* @param onHeap allocate a ByteBuffer "collector" to use as an {@link Aggregator}
*/
BaseBloomFilterAggregator(TSelector selector, int maxNumEntries, boolean onHeap)
{
// NOTE(review): this signature has no 'collector' parameter, so the next line looks like the pre-refactor
// assignment carried over by the diff view rather than live code -- confirm against the actual file.
this.collector = collector;
this.selector = selector;
this.maxNumEntries = maxNumEntries;
if (onHeap) {
// Heap mode: allocate a buffer of BloomKFilter.computeSizeBytes(maxNumEntries) bytes and serialize a newly
// constructed filter into it. The no-arg aggregate() hands this buffer to aggregate(ByteBuffer, int) at offset 0.
BloomKFilter bloomFilter = new BloomKFilter(maxNumEntries);
this.collector = ByteBuffer.allocate(BloomKFilter.computeSizeBytes(maxNumEntries));
BloomKFilter.serialize(collector, bloomFilter);
} else {
// Buffer mode: no heap collector is kept; init/aggregate/get work on the buffer supplied by the caller.
collector = null;
}
}

/**
 * Hook invoked by {@link #aggregate(ByteBuffer, int)} after it has moved {@code buf}'s position to the requested
 * offset; that caller restores the previous position once this method returns or throws.
 *
 * NOTE(review): presumably implementations take the current value from the selector and add or merge it into the
 * bloom filter stored at {@code buf}'s current position -- confirm against the subclasses.
 *
 * @param buf buffer whose current position is the offset passed to {@link #aggregate(ByteBuffer, int)}
 */
abstract void bufferAdd(ByteBuffer buf);

/**
 * Serializes a newly constructed {@link BloomKFilter}, built for {@code maxNumEntries}, into {@code buf} starting at
 * {@code position}. The write goes through a duplicate, so the position and limit of {@code buf} itself are not
 * touched.
 *
 * @param buf      buffer that holds the aggregator state
 * @param position offset in {@code buf} at which the filter is written
 */
@Override
public void init(ByteBuffer buf, int position)
{
  final ByteBuffer target = buf.duplicate();
  target.position(position);
  BloomKFilter.serialize(target, new BloomKFilter(maxNumEntries));
}

/**
 * Runs {@link #bufferAdd(ByteBuffer)} with {@code buf} temporarily positioned at {@code position}, then puts the
 * buffer's position back to its previous value whether or not the add succeeded.
 *
 * @param buf      buffer that holds the aggregator state
 * @param position offset in {@code buf} to position at before delegating
 */
@Override
public void aggregate(ByteBuffer buf, int position)
{
  final int savedPosition = buf.position();
  // A rejected position leaves the buffer untouched, so nothing needs restoring if this call throws.
  buf.position(position);
  try {
    bufferAdd(buf);
  }
  finally {
    buf.position(savedPosition);
  }
}

/**
 * Copies the {@code BloomKFilter.computeSizeBytes(maxNumEntries)} bytes that start at {@code position} in
 * {@code buf} into a newly allocated heap buffer and returns it rewound to position 0. Reads go through a
 * duplicate, so the position and limit of {@code buf} itself are not touched.
 *
 * @param buf      buffer that holds the aggregator state
 * @param position offset in {@code buf} where the copied region begins
 * @return a new {@link ByteBuffer} holding a copy of that region
 */
@Override
public Object get(ByteBuffer buf, int position)
{
  final ByteBuffer view = buf.duplicate();
  view.position(position);
  final int filterBytes = BloomKFilter.computeSizeBytes(maxNumEntries);
  view.limit(position + filterBytes);

  final ByteBuffer snapshot = ByteBuffer.allocate(filterBytes);
  final ByteBuffer window = view.slice();
  snapshot.put(window);
  snapshot.rewind();
  return snapshot;
}

/**
 * Unsupported for this aggregator.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public float getFloat(ByteBuffer buf, int position)
{
  final String message = "BloomFilterAggregator does not support getFloat()";
  throw new UnsupportedOperationException(message);
}

/**
 * Unsupported for this aggregator.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public long getLong(ByteBuffer buf, int position)
{
  final String message = "BloomFilterAggregator does not support getLong()";
  throw new UnsupportedOperationException(message);
}

/**
 * Unsupported for this aggregator.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public double getDouble(ByteBuffer buf, int position)
{
  final String message = "BloomFilterAggregator does not support getDouble()";
  throw new UnsupportedOperationException(message);
}

/**
 * {@link Aggregator} entry point: aggregates into the "collector" buffer at offset 0 by delegating to
 * {@link #aggregate(ByteBuffer, int)}.
 *
 * @throws NullPointerException if this instance was constructed with 'onHeap' set to false, in which case no
 *                              collector buffer was allocated
 */
@Override
public void aggregate()
{
  // Same exception type as before, but raised here with an explanation instead of as a bare NPE from inside
  // aggregate(ByteBuffer, int).
  aggregate(
      Objects.requireNonNull(
          collector,
          "collector is null; construct with onHeap = true to use this aggregator as an Aggregator"
      ),
      0
  );
}

@Nullable
Expand Down Expand Up @@ -66,4 +165,10 @@ public void close()
{
// nothing to close
}

/**
 * Reports this aggregator's {@code selector} field to the given inspector under the name "selector".
 *
 * @param inspector inspector to report to
 */
@Override
public void inspectRuntimeShape(RuntimeShapeInspector inspector)
{
  inspector.visit("selector", this.selector);
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit be65cca

Please sign in to comment.