Skip to content

Commit

Permalink
Optimize merging a Dense HLL with a Sparse HLL
Browse files Browse the repository at this point in the history
Add dedicated method for merging Dense vs Sparse that just applies
the values of the Sparse HLL instead of walking all the buckets
of the Dense HLL.

Results:

Dense vs Dense:  11.677 ± 0.788  us/op
Dense vs Sparse:  0.320 ± 0.013  us/op
  • Loading branch information
martint committed Dec 2, 2019
1 parent 4c44233 commit 0f59185
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 8 deletions.
@@ -0,0 +1,19 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.stats.cardinality;

/**
 * Callback that receives one {@code (bucket, value)} pair per invocation.
 *
 * <p>Declared as a functional interface so that it can be supplied as a lambda
 * or a method reference (for example {@code denseHll::insert}).
 */
@FunctionalInterface
public interface BucketListener
{
    /**
     * Invoked once for each bucket being reported.
     *
     * @param bucket the index of the bucket
     * @param value the value associated with that bucket
     */
    void visit(int bucket, int value);
}
17 changes: 17 additions & 0 deletions stats/src/main/java/io/airlift/stats/cardinality/DenseHll.java
Expand Up @@ -498,6 +498,23 @@ public DenseHll mergeWith(DenseHll other)
return this;
}

/**
 * Merges a sparse HLL into this dense HLL by inserting every bucket value
 * that {@code other} reports, rather than converting {@code other} to a
 * dense representation first.
 *
 * @param other the sparse HLL to merge in; must have the same index bit length as this instance
 * @return "this" for chaining
 * @throws IllegalArgumentException if the two HLLs have different index bit lengths
 */
public DenseHll mergeWith(SparseHll other)
{
    int otherIndexBitLength = other.getIndexBitLength();

    if (indexBitLength == otherIndexBitLength) {
        // Only the buckets present in the sparse HLL are applied.
        other.eachBucket(this::insert);
        return this;
    }

    String message = String.format(
            "Cannot merge HLLs with different number of buckets: %s vs %s",
            numberOfBuckets(indexBitLength),
            numberOfBuckets(otherIndexBitLength));
    throw new IllegalArgumentException(message);
}

private int updateOverflow(int bucket, int overflowEntry, int delta)
{
if (delta > MAX_DELTA) {
Expand Down
Expand Up @@ -84,6 +84,9 @@ public void mergeWith(HyperLogLog other)
((SparseHll) instance).mergeWith((SparseHll) other.instance);
instance = makeDenseIfNecessary((SparseHll) instance);
}
else if (instance instanceof DenseHll && other.instance instanceof SparseHll) {
((DenseHll) instance).mergeWith((SparseHll) other.instance);
}
else {
DenseHll dense = instance.toDense();
dense.mergeWith(other.instance.toDense());
Expand Down
Expand Up @@ -151,7 +151,12 @@ public void mergeWith(SparseHll other)
/**
 * Builds a new dense HLL with the same index bit length as this instance
 * and inserts every bucket value reported by {@code eachBucket} into it.
 *
 * @return the newly created dense HLL
 */
public DenseHll toDense()
{
    DenseHll dense = new DenseHll(indexBitLength);
    eachBucket((bucket, value) -> dense.insert(bucket, value));
    return dense;
}

public void eachBucket(BucketListener listener)
{
for (int i = 0; i < numberOfEntries; i++) {
int entry = entries[i];

Expand All @@ -170,10 +175,8 @@ public DenseHll toDense()
zeros = bits + decodeBucketValue(entry);
}

result.insert(bucket, zeros + 1); // + 1 because HLL stores leading number of zeros + 1
listener.visit(bucket, zeros + 1);
}

return result;
}

@Override
Expand Down
Expand Up @@ -52,15 +52,21 @@ public DenseHll benchmarkInsert(InsertData data)
}

@Benchmark
public DenseHll benchmarkMerge(MergeData data)
public DenseHll benchmarkMergeWithDense(MergeWithDenseData data)
{
return data.base.mergeWith(data.toMerge);
}

/**
 * Benchmarks merging the sparse HLL held by {@code data} into its dense base.
 *
 * @param data state object supplying the dense base and the sparse HLL to merge
 * @return the value returned by {@code mergeWith}
 */
@Benchmark
public DenseHll benchmarkMergeWithSparse(MergeWithSparseData data)
{
    DenseHll target = data.base;
    SparseHll source = data.toMerge;
    return target.mergeWith(source);
}

@State(Scope.Thread)
public static class InsertData
{
public final DenseHll instance = new DenseHll(11);
public final DenseHll instance = new DenseHll(12);
public final long[] hashes = new long[500];

@Setup(Level.Iteration)
Expand All @@ -73,23 +79,48 @@ public void initialize()
}

@State(Scope.Thread)
public static class MergeData
public static class MergeWithDenseData
{
public DenseHll base;
public DenseHll toMerge;

@Setup(Level.Iteration)
public void initialize()
{
base = new DenseHll(12);
DenseHll hll = new DenseHll(12);
for (int i = 0; i < 1_000_000; i++) {
base.insertHash(ThreadLocalRandom.current().nextLong());
hll.insertHash(ThreadLocalRandom.current().nextLong());
}

toMerge = new DenseHll(12);
for (int i = 0; i < 100; i++) {
toMerge.insertHash(ThreadLocalRandom.current().nextLong());
}

this.base = hll;
}
}

/**
 * Benchmark state for the dense-vs-sparse merge: a dense HLL populated with
 * 1,000,000 random hashes and a sparse HLL populated with 100 random hashes,
 * both created with an index bit length of 12.
 */
@State(Scope.Thread)
public static class MergeWithSparseData
{
    public DenseHll base;
    public SparseHll toMerge;

    /**
     * Rebuilds both HLLs from fresh random hashes and publishes them to the
     * {@code base} and {@code toMerge} fields.
     */
    @Setup(Level.Iteration)
    public void initialize()
    {
        ThreadLocalRandom random = ThreadLocalRandom.current();

        DenseHll dense = new DenseHll(12);
        for (int inserted = 0; inserted < 1_000_000; inserted++) {
            dense.insertHash(random.nextLong());
        }

        SparseHll sparse = new SparseHll(12);
        for (int inserted = 0; inserted < 100; inserted++) {
            sparse.insertHash(random.nextLong());
        }

        this.toMerge = sparse;
        this.base = dense;
    }
}

Expand Down

0 comments on commit 0f59185

Please sign in to comment.