Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Dense HLL merge performance #779

Merged
merged 4 commits into from Dec 3, 2019
Merged
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -0,0 +1,19 @@
/*
This conversation was marked as resolved by martint

This comment has been minimized.

Copy link
@sopel39

sopel39 Dec 2, 2019

Contributor

Do you have results from before?

This comment has been minimized.

Copy link
@martint

martint Dec 2, 2019

Author Member

Before, we'd convert the sparse HLL into a dense HLL and then merge it. So the performance is approximately what the first benchmark showed (there's some extra cost in the conversion that we're not measuring):

21.557 ± 1.116 us/op for current implementation,
11.677 ± 0.788 us/op after Dense HLL merging is optimized,
0.320 ± 0.013 us/op after dedicated merge implementation for Dense vs Sparse

* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.airlift.stats.cardinality;

/**
 * Callback invoked once per populated bucket when iterating an HLL's
 * buckets (see {@code SparseHll.eachBucket}).
 *
 * <p>Marked {@link FunctionalInterface} because callers supply it as a
 * method reference (e.g. {@code this::insert} or {@code result::insert}).
 */
@FunctionalInterface
public interface BucketListener
{
    /**
     * @param bucket the bucket index
     * @param value the bucket's value (number of leading zeros + 1)
     */
    void visit(int bucket, int value);
}
@@ -249,16 +249,14 @@ public void insert(int bucket, int value)
}

if (delta > MAX_DELTA) {
int overflow = delta - MAX_DELTA;
byte overflow = (byte) (delta - MAX_DELTA);

if (!setOverflow(bucket, overflow)) {
// grow overflows arrays if necessary
overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT);
overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT);

overflowBuckets[overflows] = bucket;
overflowValues[overflows] = (byte) overflow;
overflows++;
int overflowEntry = findOverflowEntry(bucket);
if (overflowEntry != -1) {
This conversation was marked as resolved by martint

This comment has been minimized.

Copy link
@sopel39

sopel39 Dec 2, 2019

Contributor

Why does this make it faster? It introduces branching.

This comment has been minimized.

Copy link
@martint

martint Dec 2, 2019

Author Member

delta > MAX_DELTA is statistically unlikely, so this branch doesn't hit often. In any case, the old code already had a branch if (!setOverflow(...)). This just moves things around a bit.

setOverflow(overflowEntry, overflow);
}
else {
addOverflow(bucket, overflow);
}

delta = MAX_DELTA;
@@ -272,30 +270,6 @@ public void insert(int bucket, int value)
}
}

/**
 * Returns the overflow amount recorded for the given bucket, or 0 when
 * the bucket has no overflow entry.
 */
private int getOverflow(int bucket)
{
    int entry = 0;
    while (entry < overflows) {
        if (overflowBuckets[entry] == bucket) {
            return overflowValues[entry];
        }
        entry++;
    }
    return 0;
}

/**
 * Overwrites the overflow value for the given bucket, if an overflow
 * entry for that bucket already exists.
 *
 * Returns false if no overflow bucket matching the given bucket id was found
 */
private boolean setOverflow(int bucket, int overflow)
{
    // Linear scan is acceptable: overflow entries are statistically rare.
    for (int i = 0; i < overflows; i++) {
        if (overflowBuckets[i] == bucket) {
            overflowValues[i] = (byte) overflow;
            return true;
        }
    }
    return false;
}

public Slice serialize()
{
int size = estimatedSerializedSize();
@@ -448,42 +422,54 @@ public DenseHll mergeWith(DenseHll other)
numberOfBuckets(other.indexBitLength)));
}

int baseline = Math.max(this.baseline, other.baseline);
int newBaseline = Math.max(this.baseline, other.baseline);
int baselineCount = 0;

int overflows = 0;
int[] overflowBuckets = new int[OVERFLOW_GROW_INCREMENT];
byte[] overflowValues = new byte[OVERFLOW_GROW_INCREMENT];
int bucket = 0;
for (int i = 0; i < deltas.length; i++) {
int newSlot = 0;

int numberOfBuckets = numberOfBuckets(indexBitLength);
for (int i = 0; i < numberOfBuckets; i++) {
int value = Math.max(getValue(i), other.getValue(i));
byte slot1 = deltas[i];
byte slot2 = other.deltas[i];

int delta = value - baseline;
if (delta == 0) {
baselineCount++;
}
else if (delta > MAX_DELTA) {
// grow overflows arrays if necessary
overflowBuckets = Ints.ensureCapacity(overflowBuckets, overflows + 1, OVERFLOW_GROW_INCREMENT);
overflowValues = Bytes.ensureCapacity(overflowValues, overflows + 1, OVERFLOW_GROW_INCREMENT);
for (int shift = 4; shift >= 0; shift -= 4) {
int delta1 = (slot1 >>> shift) & 0b1111;
int delta2 = (slot2 >>> shift) & 0b1111;

overflowBuckets[overflows] = i;
overflowValues[overflows] = (byte) (delta - MAX_DELTA);
int value1 = this.baseline + delta1;
int value2 = other.baseline + delta2;

overflows++;
int overflowEntry = -1;
if (delta1 == MAX_DELTA) {
overflowEntry = findOverflowEntry(bucket);
if (overflowEntry != -1) {
value1 += overflowValues[overflowEntry];
}
}

if (delta2 == MAX_DELTA) {
value2 += other.getOverflow(bucket);
}

int newValue = Math.max(value1, value2);
int newDelta = newValue - newBaseline;

if (newDelta == 0) {
baselineCount++;
}

delta = MAX_DELTA;
newDelta = updateOverflow(bucket, overflowEntry, newDelta);

newSlot <<= 4;
newSlot |= newDelta;
bucket++;
}

setDelta(i, delta);
this.deltas[i] = (byte) newSlot;
}

this.baseline = (byte) baseline;
this.baseline = (byte) newBaseline;
this.baselineCount = baselineCount;
this.overflows = overflows;
this.overflowBuckets = overflowBuckets;
this.overflowValues = overflowValues;

// all baseline values in one of the HLLs lost to the values
// in the other HLL, so we need to adjust the final baseline
@@ -492,6 +478,87 @@ else if (delta > MAX_DELTA) {
return this;
}

/**
 * Folds every populated bucket of the given sparse HLL into this dense HLL.
 * Both HLLs must have been built with the same index bit length.
 *
 * @return "this" for chaining
 * @throws IllegalArgumentException if the bucket counts differ
 */
public DenseHll mergeWith(SparseHll other)
{
    int otherIndexBitLength = other.getIndexBitLength();
    if (otherIndexBitLength != indexBitLength) {
        throw new IllegalArgumentException(String.format(
                "Cannot merge HLLs with different number of buckets: %s vs %s",
                numberOfBuckets(indexBitLength),
                numberOfBuckets(otherIndexBitLength)));
    }

    // Each sparse bucket is inserted individually; insert() handles
    // baseline and overflow maintenance.
    other.eachBucket(this::insert);

    return this;
}

/**
 * Linearly scans the overflow entries for the given bucket.
 *
 * @return the index into the overflow arrays, or -1 when the bucket has
 *         no overflow entry
 */
private int findOverflowEntry(int bucket)
{
    int entry = 0;
    while (entry < overflows) {
        if (overflowBuckets[entry] == bucket) {
            return entry;
        }
        entry++;
    }
    return -1;
}

/**
 * Returns the overflow amount recorded for the given bucket, or 0 when
 * the bucket has no overflow entry (i.e., it never exceeded MAX_DELTA).
 */
private int getOverflow(int bucket)
{
    int result = 0;
    for (int entry = 0; entry < overflows; entry++) {
        if (overflowBuckets[entry] == bucket) {
            result = overflowValues[entry];
            break;
        }
    }
    return result;
}

/**
 * Reconciles a bucket's new delta with the overflow table: records the
 * excess above MAX_DELTA (creating or updating the entry), or removes a
 * stale entry when the delta no longer overflows.
 *
 * @param overflowEntry index of the bucket's existing overflow entry, or -1
 * @return the delta clamped to at most MAX_DELTA, suitable for storing in
 *         the packed deltas array
 */
private int updateOverflow(int bucket, int overflowEntry, int delta)
{
    if (delta <= MAX_DELTA) {
        // No overflow needed; drop any entry left over from a smaller baseline.
        if (overflowEntry != -1) {
            removeOverflow(overflowEntry);
        }
        return delta;
    }

    byte overflow = (byte) (delta - MAX_DELTA);
    if (overflowEntry == -1) {
        addOverflow(bucket, overflow);
    }
    else {
        // update existing overflow
        setOverflow(overflowEntry, overflow);
    }
    return MAX_DELTA;
}

/**
 * Overwrites the value of an existing overflow entry. The entry index is
 * expected to come from {@link #findOverflowEntry(int)}.
 */
private void setOverflow(int overflowEntry, byte overflow)
{
    overflowValues[overflowEntry] = overflow;
}

private void removeOverflow(int overflowEntry)

This comment has been minimized.

Copy link
@findepi

findepi Dec 3, 2019

Member

can we have {get,set,find,remove}Overflow next to each other?

{
// remove existing overflow
overflowBuckets[overflowEntry] = overflowBuckets[overflows - 1];
overflowValues[overflowEntry] = overflowValues[overflows - 1];
overflows--;
}

/**
 * Appends a new overflow entry for the given bucket, growing the parallel
 * bucket/value arrays in OVERFLOW_GROW_INCREMENT steps when full.
 */
private void addOverflow(int bucket, byte overflow)
{
    int newCount = overflows + 1;

    // Grow both parallel arrays together so they stay the same length.
    overflowBuckets = Ints.ensureCapacity(overflowBuckets, newCount, OVERFLOW_GROW_INCREMENT);
    overflowValues = Bytes.ensureCapacity(overflowValues, newCount, OVERFLOW_GROW_INCREMENT);

    overflowBuckets[overflows] = bucket;
    overflowValues[overflows] = overflow;
    overflows = newCount;
}

public static int estimatedInMemorySize(int indexBitLength)
{
// note: we don't take into account overflow entries since their number can vary
@@ -84,6 +84,9 @@ public void mergeWith(HyperLogLog other)
((SparseHll) instance).mergeWith((SparseHll) other.instance);
instance = makeDenseIfNecessary((SparseHll) instance);
}
else if (instance instanceof DenseHll && other.instance instanceof SparseHll) {
This conversation was marked as resolved by martint

This comment has been minimized.

Copy link
@dain

dain Dec 2, 2019

Member

What about the other way around?

This comment has been minimized.

Copy link
@martint

martint Dec 2, 2019

Author Member

In that case, the sparse needs to be converted to dense before the merge can take place. That’s handled by the fall-through case

((DenseHll) instance).mergeWith((SparseHll) other.instance);
}
else {
DenseHll dense = instance.toDense();
dense.mergeWith(other.instance.toDense());
@@ -151,7 +151,12 @@ public void mergeWith(SparseHll other)
/**
 * Converts this sparse HLL into an equivalent dense representation by
 * replaying every populated bucket into a fresh DenseHll.
 */
public DenseHll toDense()
{
    DenseHll dense = new DenseHll(indexBitLength);
    eachBucket((bucket, value) -> dense.insert(bucket, value));
    return dense;
}

public void eachBucket(BucketListener listener)
{
for (int i = 0; i < numberOfEntries; i++) {
int entry = entries[i];

@@ -170,10 +175,8 @@ public DenseHll toDense()
zeros = bits + decodeBucketValue(entry);
}

result.insert(bucket, zeros + 1); // + 1 because HLL stores leading number of zeros + 1
listener.visit(bucket, zeros + 1);
}

return result;
}

@Override
@@ -14,9 +14,11 @@
package io.airlift.stats.cardinality;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
@@ -32,14 +34,18 @@
import java.util.concurrent.TimeUnit;

@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.SECONDS)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.AverageTime)
@Fork(5)
@Warmup(iterations = 5, time = 500, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS)
public class BenchmarkDenseHll
{
private static final int LARGE_CARDINALITY = 1_000_000;
private static final int SMALL_CARDINALITY = 100;

@Benchmark
public DenseHll benchmarkInsert(Data data)
public DenseHll benchmarkInsert(InsertData data)
{
for (long hash : data.hashes) {
data.instance.insertHash(hash);
@@ -48,10 +54,22 @@ public DenseHll benchmarkInsert(Data data)
return data.instance;
}

/**
 * Measures merging a dense HLL into another dense HLL. The result is
 * returned so JMH treats the computation as live.
 */
@Benchmark
public DenseHll benchmarkMergeWithDense(MergeWithDenseData data)
{
    DenseHll merged = data.base.mergeWith(data.toMerge);
    return merged;
}

/**
 * Measures merging a sparse HLL directly into a dense HLL (the dedicated
 * dense-vs-sparse merge path). The result is returned so JMH treats the
 * computation as live.
 */
@Benchmark
public DenseHll benchmarkMergeWithSparse(MergeWithSparseData data)
{
    DenseHll merged = data.base.mergeWith(data.toMerge);
    return merged;
}

@State(Scope.Thread)
public static class Data
public static class InsertData
{
public final DenseHll instance = new DenseHll(11);
public final DenseHll instance = new DenseHll(12);
public final long[] hashes = new long[500];

@Setup(Level.Iteration)
@@ -63,6 +81,51 @@ public void initialize()
}
}

@State(Scope.Thread)
public static class MergeWithDenseData
{
    public DenseHll base;
    public DenseHll toMerge;

    /** Rebuilds both HLLs from fresh random hashes for each iteration. */
    @Setup(Level.Iteration)
    public void initialize()
    {
        ThreadLocalRandom random = ThreadLocalRandom.current();

        base = new DenseHll(12);
        for (int i = 0; i < LARGE_CARDINALITY; i++) {
            base.insertHash(random.nextLong());
        }

        // Keep the merged-in side small so dense/dense and dense/sparse
        // merges can be compared apples-to-apples: sparse only supports
        // small cardinalities.
        toMerge = new DenseHll(12);
        for (int i = 0; i < SMALL_CARDINALITY; i++) {
            toMerge.insertHash(random.nextLong());
        }
    }
}

@State(Scope.Thread)
public static class MergeWithSparseData
{
    public DenseHll base;
    public SparseHll toMerge;

    /** Rebuilds both HLLs from fresh random hashes for each iteration. */
    @Setup(Level.Iteration)
    public void initialize()
    {
        ThreadLocalRandom random = ThreadLocalRandom.current();

        base = new DenseHll(12);
        for (int i = 0; i < LARGE_CARDINALITY; i++) {
            base.insertHash(random.nextLong());
        }

        // Sparse side stays small: sparse representation only supports
        // small cardinalities.
        toMerge = new SparseHll(12);
        for (int i = 0; i < SMALL_CARDINALITY; i++) {
            toMerge.insertHash(random.nextLong());
        }
    }
}

public static void main(String[] args)
throws RunnerException
{
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.