Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement: Sketch value aggregator performance #13020

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@
import java.util.Collections;
import java.util.Random;
import org.apache.datasketches.cpc.CpcSketch;
import org.apache.datasketches.cpc.CpcUnion;
import org.apache.pinot.segment.local.aggregator.DistinctCountCPCSketchValueAggregator;
import org.apache.pinot.segment.local.aggregator.ValueAggregator;
import org.apache.pinot.spi.data.FieldSpec.DataType;

import static org.testng.Assert.assertEquals;


public class DistinctCountCPCSketchStarTreeV2Test extends BaseStarTreeV2Test<Object, CpcSketch> {
public class DistinctCountCPCSketchStarTreeV2Test extends BaseStarTreeV2Test<Object, Object> {

@Override
ValueAggregator<Object, CpcSketch> getValueAggregator() {
ValueAggregator<Object, Object> getValueAggregator() {
return new DistinctCountCPCSketchValueAggregator(Collections.emptyList());
}

Expand All @@ -46,7 +47,22 @@ Object getRandomRawValue(Random random) {
}

@Override
void assertAggregatedValue(CpcSketch starTreeResult, CpcSketch nonStarTreeResult) {
assertEquals((long) starTreeResult.getEstimate(), (long) nonStarTreeResult.getEstimate());
void assertAggregatedValue(Object starTreeResult, Object nonStarTreeResult) {
// Use error at (lgK=12, stddev=2) from:
// https://datasketches.apache.org/docs/CPC/CpcPerformance.html
double delta = (1 << 12) * 0.01;
assertEquals((long) toSketch(starTreeResult).getEstimate(), (long) toSketch(nonStarTreeResult).getEstimate(),
delta);
}

private CpcSketch toSketch(Object value) {
if (value instanceof CpcUnion) {
return ((CpcUnion) value).getResult();
} else if (value instanceof CpcSketch) {
return (CpcSketch) value;
} else {
throw new IllegalStateException(
"Unsupported data type for CPC Sketch aggregation: " + value.getClass().getSimpleName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Random;
import org.apache.datasketches.tuple.Sketch;
import org.apache.datasketches.tuple.Union;
import org.apache.datasketches.tuple.aninteger.IntegerSketch;
import org.apache.datasketches.tuple.aninteger.IntegerSummary;
import org.apache.pinot.core.common.ObjectSerDeUtils;
Expand All @@ -30,11 +31,10 @@
import static org.testng.Assert.assertEquals;


public class DistinctCountIntegerSumTupleSketchStarTreeV2Test
extends BaseStarTreeV2Test<byte[], Sketch<IntegerSummary>> {
public class DistinctCountIntegerSumTupleSketchStarTreeV2Test extends BaseStarTreeV2Test<byte[], Object> {

@Override
ValueAggregator<byte[], Sketch<IntegerSummary>> getValueAggregator() {
ValueAggregator<byte[], Object> getValueAggregator() {
return new IntegerTupleSketchValueAggregator(IntegerSummary.Mode.Sum);
}

Expand All @@ -51,7 +51,22 @@ byte[] getRandomRawValue(Random random) {
}

@Override
void assertAggregatedValue(Sketch<IntegerSummary> starTreeResult, Sketch<IntegerSummary> nonStarTreeResult) {
assertEquals(starTreeResult.getEstimate(), nonStarTreeResult.getEstimate());
void assertAggregatedValue(Object starTreeResult, Object nonStarTreeResult) {
// Use error at (lgK=14, stddev=2) from:
// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
double delta = (1 << 14) * 0.01563;
assertEquals(toSketch(starTreeResult).getEstimate(), toSketch(nonStarTreeResult).getEstimate(), delta);
}

@SuppressWarnings("unchecked")
private Sketch<IntegerSummary> toSketch(Object value) {
if (value instanceof Union) {
return ((Union) value).getResult();
} else if (value instanceof Sketch) {
return ((Sketch) value);
} else {
throw new IllegalStateException(
"Unsupported data type for Integer Tuple Sketch aggregation: " + value.getClass().getSimpleName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@

import java.util.Random;
import org.apache.datasketches.theta.Sketch;
import org.apache.datasketches.theta.Union;
import org.apache.pinot.segment.local.aggregator.DistinctCountThetaSketchValueAggregator;
import org.apache.pinot.segment.local.aggregator.ValueAggregator;
import org.apache.pinot.spi.data.FieldSpec.DataType;

import static org.testng.Assert.assertEquals;


public class DistinctCountThetaSketchStarTreeV2Test extends BaseStarTreeV2Test<Object, Sketch> {
public class DistinctCountThetaSketchStarTreeV2Test extends BaseStarTreeV2Test<Object, Object> {

@Override
ValueAggregator<Object, Sketch> getValueAggregator() {
ValueAggregator<Object, Object> getValueAggregator() {
return new DistinctCountThetaSketchValueAggregator();
}

Expand All @@ -45,7 +46,21 @@ Object getRandomRawValue(Random random) {
}

@Override
void assertAggregatedValue(Sketch starTreeResult, Sketch nonStarTreeResult) {
assertEquals(starTreeResult.getEstimate(), nonStarTreeResult.getEstimate());
void assertAggregatedValue(Object starTreeResult, Object nonStarTreeResult) {
// Use error at (lgK=14, stddev=2) from:
// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
double delta = (1 << 14) * 0.01563;
assertEquals(toSketch(starTreeResult).getEstimate(), toSketch(nonStarTreeResult).getEstimate(), delta);
}

private Sketch toSketch(Object value) {
if (value instanceof Union) {
return ((Union) value).getResult();
} else if (value instanceof Sketch) {
return (Sketch) value;
} else {
throw new IllegalStateException(
"Unsupported data type for Theta Sketch aggregation: " + value.getClass().getSimpleName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@
import org.apache.pinot.spi.utils.CommonConstants;


public class DistinctCountCPCSketchValueAggregator implements ValueAggregator<Object, CpcSketch> {
public class DistinctCountCPCSketchValueAggregator implements ValueAggregator<Object, Object> {
public static final DataType AGGREGATED_VALUE_TYPE = DataType.BYTES;

private final int _lgK;

private int _maxByteSize;

public DistinctCountCPCSketchValueAggregator(List<ExpressionContext> arguments) {
// length 1 means we use the Helix default
if (arguments.size() <= 1) {
Expand All @@ -55,64 +53,61 @@ public DataType getAggregatedValueType() {
}

@Override
public CpcSketch getInitialAggregatedValue(Object rawValue) {
CpcSketch initialValue;
public Object getInitialAggregatedValue(Object rawValue) {
CpcUnion cpcUnion = new CpcUnion(_lgK);
if (rawValue instanceof byte[]) { // Serialized Sketch
byte[] bytes = (byte[]) rawValue;
initialValue = deserializeAggregatedValue(bytes);
_maxByteSize = Math.max(_maxByteSize, bytes.length);
cpcUnion.update(deserializeAggregatedValue(bytes));
} else if (rawValue instanceof byte[][]) { // Multiple Serialized Sketches
byte[][] serializedSketches = (byte[][]) rawValue;
CpcUnion union = new CpcUnion(_lgK);
for (byte[] bytes : serializedSketches) {
union.update(deserializeAggregatedValue(bytes));
cpcUnion.update(deserializeAggregatedValue(bytes));
}
initialValue = union.getResult();
updateMaxByteSize(initialValue);
} else {
initialValue = empty();
addObjectToSketch(rawValue, initialValue);
updateMaxByteSize(initialValue);
CpcSketch pristineSketch = empty();
addObjectToSketch(rawValue, pristineSketch);
cpcUnion.update(pristineSketch);
}
return initialValue;
return cpcUnion;
}

@Override
public CpcSketch applyRawValue(CpcSketch value, Object rawValue) {
public Object applyRawValue(Object aggregatedValue, Object rawValue) {
CpcUnion cpcUnion = extractUnion(aggregatedValue);
if (rawValue instanceof byte[]) {
byte[] bytes = (byte[]) rawValue;
CpcSketch sketch = union(value, deserializeAggregatedValue(bytes));
updateMaxByteSize(sketch);
return sketch;
CpcSketch sketch = deserializeAggregatedValue(bytes);
cpcUnion.update(sketch);
} else {
addObjectToSketch(rawValue, value);
updateMaxByteSize(value);
return value;
CpcSketch pristineSketch = empty();
addObjectToSketch(rawValue, pristineSketch);
cpcUnion.update(pristineSketch);
}
return cpcUnion;
}

@Override
public CpcSketch applyAggregatedValue(CpcSketch value, CpcSketch aggregatedValue) {
CpcSketch result = union(value, aggregatedValue);
updateMaxByteSize(result);
return result;
public Object applyAggregatedValue(Object value, Object aggregatedValue) {
CpcUnion cpcUnion = extractUnion(aggregatedValue);
CpcSketch sketch = extractSketch(value);
cpcUnion.update(sketch);
return cpcUnion;
}

@Override
public CpcSketch cloneAggregatedValue(CpcSketch value) {
public Object cloneAggregatedValue(Object value) {
return deserializeAggregatedValue(serializeAggregatedValue(value));
}

@Override
public int getMaxAggregatedValueByteSize() {
// NOTE: For aggregated metrics, initial aggregated value might have not been generated. Returns the byte size
// based on lgK.
return _maxByteSize > 0 ? _maxByteSize : CpcSketch.getMaxSerializedBytes(_lgK);
return CpcSketch.getMaxSerializedBytes(_lgK);
}

@Override
public byte[] serializeAggregatedValue(CpcSketch value) {
return CustomSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(value);
public byte[] serializeAggregatedValue(Object value) {
CpcSketch sketch = extractSketch(value);
return CustomSerDeUtils.DATA_SKETCH_CPC_SER_DE.serialize(sketch);
}

@Override
Expand Down Expand Up @@ -181,9 +176,32 @@ private void addObjectsToSketch(Object[] rawValues, CpcSketch sketch) {
}
}

private void updateMaxByteSize(CpcSketch sketch) {
if (sketch != null) {
_maxByteSize = Math.max(_maxByteSize, sketch.toByteArray().length);
private CpcUnion extractUnion(Object value) {
if (value == null) {
return new CpcUnion(_lgK);
} else if (value instanceof CpcUnion) {
return (CpcUnion) value;
} else if (value instanceof CpcSketch) {
CpcSketch sketch = (CpcSketch) value;
CpcUnion cpcUnion = new CpcUnion(_lgK);
cpcUnion.update(sketch);
return cpcUnion;
} else {
throw new IllegalStateException(
"Unsupported data type for CPC Sketch aggregation: " + value.getClass().getSimpleName());
}
}

private CpcSketch extractSketch(Object value) {
if (value == null) {
return empty();
} else if (value instanceof CpcUnion) {
return ((CpcUnion) value).getResult();
} else if (value instanceof CpcSketch) {
return (CpcSketch) value;
} else {
throw new IllegalStateException(
"Unsupported data type for CPC Sketch aggregation: " + value.getClass().getSimpleName());
}
}

Expand Down
Loading
Loading