refactor druid-bloom-filter aggregators #7496
Conversation
Going to tag this as 'bug' too since it does fix a bug - right now the type mismatch between what the combine logic expects (BloomKFilter) and what the broker receives from the historicals (byte[]) results in a cast exception.
Some suggestions and just 1 thing I think really should change (the try/finally).
```diff
-  final BloomKFilter collector;
+  protected final ByteBuffer collector;
```
Could you annotate this `@Nullable` please?
```java
final int oldPosition = buf.position();
buf.position(position);
bufferAdd(buf);
buf.position(oldPosition);
```
The position adjustment and restore should be wrapped in a try/finally, because we don't want the buffer to be left in an invalid state if an exception is thrown.
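A minimal sketch of the suggested fix, under the assumption that `bufferAdd` mutates the buffer at the given position (the class wrapper and the placeholder body of `bufferAdd` are illustrative, not the actual Druid code):

```java
import java.nio.ByteBuffer;

public class PositionRestoreSketch
{
  // Reposition the buffer, do the work, and restore the position in a
  // finally block so an exception in bufferAdd can't leave the buffer
  // pointing at the aggregator's slot.
  static void aggregate(ByteBuffer buf, int position)
  {
    final int oldPosition = buf.position();
    buf.position(position);
    try {
      bufferAdd(buf);
    }
    finally {
      buf.position(oldPosition);
    }
  }

  // Placeholder mutation standing in for the real per-type add logic.
  static void bufferAdd(ByteBuffer buf)
  {
    buf.put((byte) 1);
  }

  public static void main(String[] args)
  {
    ByteBuffer buf = ByteBuffer.allocate(16);
    buf.position(3);
    aggregate(buf, 8);
    // Caller-visible position is restored despite the repositioning.
    if (buf.position() != 3) {
      throw new AssertionError("position not restored");
    }
    System.out.println("ok");
  }
}
```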
```diff
-  final BloomKFilter collector;
+  protected final ByteBuffer collector;
```
Please mark this `@Nullable`.
```java
ByteBuffer mutationBuffer = buf.duplicate();
mutationBuffer.position(position);
// | k (byte) | numLongs (int) | bitset (long[numLongs]) |
```
I think this comment would make more sense attached to "computeSizeBytes"?
Oops, yes I switched this method to using `computeSizeBytes` and forgot to remove the comment (previously it was calculating the size by hand).
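The by-hand calculation the comment describes follows directly from the serialized layout `| k (byte) | numLongs (int) | bitset (long[numLongs]) |`. A sketch of that arithmetic (the class and method here are illustrative, not Druid's actual `computeSizeBytes`):

```java
public class SerializedFilterLayout
{
  // Serialized BloomKFilter layout, per the comment in the diff:
  // | k (byte) | numLongs (int) | bitset (long[numLongs]) |
  static int computeSizeBytes(int numLongs)
  {
    return Byte.BYTES        // k
           + Integer.BYTES   // numLongs
           + Long.BYTES * numLongs; // bitset words
  }

  public static void main(String[] args)
  {
    // 1 + 4 + 8 * 4 = 37 bytes for a 4-word bitset.
    System.out.println(computeSizeBytes(4));
  }
}
```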
```java
public class BloomFilterAggregateCombiner extends ObjectAggregateCombiner<BloomKFilter>
```
AggregateCombiner is only used at ingestion time AFAIK. Since this aggregator isn't meant to be used at ingestion time, you might as well delete this and make makeAggregateCombiner throw an UnsupportedOperationException.
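The suggestion amounts to something like the sketch below (the wrapper class and the local `AggregateCombiner` interface are stand-ins for the real Druid types, and the exact factory signature may differ):

```java
public class CombinerSketch
{
  // Stand-in for Druid's AggregateCombiner interface.
  interface AggregateCombiner {}

  // AggregateCombiner is only invoked at ingestion time; since this
  // aggregator is query-time only, fail fast instead of providing one.
  static AggregateCombiner makeAggregateCombiner()
  {
    throw new UnsupportedOperationException(
        "bloom filter aggregation is not supported at ingestion time");
  }

  public static void main(String[] args)
  {
    try {
      makeAggregateCombiner();
      throw new AssertionError("expected UnsupportedOperationException");
    }
    catch (UnsupportedOperationException expected) {
      System.out.println("ok");
    }
  }
}
```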
I wasn't certain, so I left it in from the original PR. However, all of my tests pass with it removed, as do test queries against a debugging cluster, so I think you're right and it should be OK to remove.
```diff
@@ -23,20 +23,22 @@
 import org.apache.druid.query.filter.BloomKFilter;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;

 import java.nio.ByteBuffer;

 public final class DoubleBloomFilterAggregator extends BaseBloomFilterAggregator<BaseDoubleColumnValueSelector>
```
I think you could collapse all of these implementations into a single one using a similar technique to CardinalityAggregator. (ColumnSelectorPluses encapsulating a strategy for reading different types)
No need to do it for this PR, IMO, unless you want to. Just wanted to mention the technique.
Heh, originally the bloom filter aggregators were done like that, then refactored in the original PR in this commit. It actually takes more classes to implement it that way (especially compared to the way it is in this PR), and it puts more indirection between the aggregators, which doesn't really seem necessary. See this comment chain in the original PR for reference, and the issue spawned from it, #6909, for more details. I personally find it cleaner this way, especially after the changes in this PR.
Oh. Well, I think the way the CardinalityAggregator does it is cleaner, since it only splits out the logic for how to read from differently-typed inputs, and then composes that logic into a single Aggregator class, instead of using inheritance and a ton of Aggregator subclasses. I just generally find inheritance based structures less easy to follow since the logic seems 'inside out' to me.
But, I guess if reasonable people could differ on this point, it's up to you how you want to structure it. I don't think this inheritance based approach is better than the type-based-input-strategy approach, but it's acceptable.
By the way, one original reason for the CardinalityAggregator-style design choice is that the logic for choosing which implementation to use could be centralized / standardized. The equivalent of this code here: https://github.com/apache/incubator-druid/pull/7496/files#diff-06cafba60d560f4a5bb1551a56b8041dR99 is this code in CardinalityAggregatorFactory:
```java
ColumnSelectorPlus<CardinalityAggregatorColumnSelectorStrategy>[] selectorPluses =
    DimensionHandlerUtils.createColumnSelectorPluses(
        STRATEGY_FACTORY,
        fields,
        columnFactory
    );
```
Which calls out to a standard library function. One other cool thing we can do here is modify the ColumnSelectorStrategyFactory interface to look more like this:
```java
public interface ColumnStrategizer<T>
{
  T makeDimensionStrategy(DimensionSelector selector);
  T makeFloatStrategy(ColumnValueSelector selector);
  T makeDoubleStrategy(ColumnValueSelector selector);
  T makeLongStrategy(ColumnValueSelector selector);
}
```
That way, there's a structure in place to make sure every type has some sort of handling. Or if we used default methods, at least consistent exceptions being thrown when types are missing handling. It isn't like this yet, but it could be, and I think it would be nice.
I don't feel super strongly either way, but I think it is slightly fewer classes the way it is in this PR: just the base agg and the subclasses with the selector-specific handling, instead of the agg, the strategy factory, and a strategy for each selector. Maybe there is another abstraction somewhere between these two approaches, or a way to supply strategies to a common strategy factory so you don't have to write the strategy-factory boilerplate? I'll think about this a bit more and maybe follow up in the future.
For logic as simple as this case, the strategies could probably all be lambdas inlined inside the strategy factory.
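A sketch of the "strategies as lambdas" idea, with illustrative names rather than Druid's actual `ColumnSelectorStrategy` interfaces (the `Supplier`-based selectors and the `Collector` class are stand-ins):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class LambdaStrategySketch
{
  // Stand-in for a BloomKFilter-like collector.
  static class Collector
  {
    final List<Object> values = new ArrayList<>();
    void addLong(long v) { values.add(v); }
    void addDouble(double v) { values.add(v); }
  }

  // A strategy reads its selector and folds the value into the collector.
  interface Strategy
  {
    void aggregate(Collector collector);
  }

  // Each per-type strategy is a one-liner, so the "factory" can just
  // return inlined lambdas instead of one dedicated class per type.
  static Strategy longStrategy(Supplier<Long> selector)
  {
    return collector -> collector.addLong(selector.get());
  }

  static Strategy doubleStrategy(Supplier<Double> selector)
  {
    return collector -> collector.addDouble(selector.get());
  }

  public static void main(String[] args)
  {
    Collector c = new Collector();
    longStrategy(() -> 7L).aggregate(c);
    doubleStrategy(() -> 2.5).aggregate(c);
    if (c.values.size() != 2) {
      throw new AssertionError();
    }
    System.out.println("ok");
  }
}
```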
@clintropolis checkstyle has some words for you:
LGTM after the style issues / CI addressed
Tagged 0.15 since it fixes a bug.
* now with 100% more buffer
* there can be only 1
* simplify
* javadoc
* clean up unused test method
* fix exception message
* style
* why does style hate javadocs
* review stuff
* style :(
This PR refactors the `druid-bloom-filter` extension aggregators to strictly use the `ByteBuffer` representation of `BloomKFilter` and buffer-based methods to manipulate the filters in place, allowing for combined `Aggregator` and `BufferAggregator` implementations and vastly simplifying the code. It also fixes a bug in the `combine` logic of the current implementation, resulting from the mixed use of `BloomKFilter` and `ByteBuffer` (the current implementation can only combine `BloomKFilter`, but the broker will receive `byte[]` from the historicals, resulting in a cast exception).

Originally introduced in #6397, the PR sort of grew organically over its 41-commit lifetime: it started out dealing only with `BloomKFilter` on heap and suffered the overhead of a lot of serde, and ended up mixing usage of `BloomKFilter` and `ByteBuffer` after some methods were added to optimize the `BufferAggregator` implementation, in an attempt to reduce the overhead of serializing and deserializing such potentially large values so often, picking up a bug or two along the way.

This new approach combines the implementations of both types of aggregators and now always operates on `ByteBuffer`. `BloomFilterAggregatorFactory` and `BloomFilterMergeAggregatorFactory` construct the `Aggregator` and `BufferAggregator` implementations with a perhaps not greatly named boolean constructor parameter, `onHeap`. If `onHeap` is `true`, the aggregator allocates an appropriately sized `ByteBuffer` to hold a `BloomKFilter`, allowing usage as an `Aggregator`; if not, it must rely on the `ByteBuffer` passed to its methods during its life as a `BufferAggregator`.

There is probably room for improvement to make this a bit more elegant, maybe by making the aggregator constructors private and adding static methods to more explicitly construct either an `Aggregator` with `onHeap = true` or a `BufferAggregator` with `onHeap = false`? Regardless, this eliminates all of the extra serde and should be a lot simpler to reason about and troubleshoot.

I have tested with top-n, timeseries, and group-by queries on a small test cluster with multiple historicals spun up on my laptop, and things look good so far.
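The static-factory idea floated above could look something like this sketch (class and method names are hypothetical; the real factories would construct full `Aggregator`/`BufferAggregator` implementations rather than this toy):

```java
public class BloomFilterAggSketch
{
  private final boolean onHeap;

  // Constructor is private so callers must pick an explicit factory
  // method instead of passing an unnamed boolean.
  private BloomFilterAggSketch(boolean onHeap)
  {
    this.onHeap = onHeap;
  }

  // Aggregator path: would allocate its own ByteBuffer for the filter.
  static BloomFilterAggSketch forHeap()
  {
    return new BloomFilterAggSketch(true);
  }

  // BufferAggregator path: would rely on the buffer passed to its methods.
  static BloomFilterAggSketch forBuffer()
  {
    return new BloomFilterAggSketch(false);
  }

  boolean isOnHeap()
  {
    return onHeap;
  }

  public static void main(String[] args)
  {
    if (!forHeap().isOnHeap() || forBuffer().isOnHeap()) {
      throw new AssertionError();
    }
    System.out.println("ok");
  }
}
```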