Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/development/extensions-core/datasketches-theta.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ druid.extensions.loadList=["druid-datasketches"]
"name" : <output_name>,
"fieldName" : <metric_name>,
"isInputThetaSketch": false,
"size": 16384
"size": 16384,
"samplingProbability": 1.0
}
```

Expand All @@ -52,6 +53,7 @@ druid.extensions.loadList=["druid-datasketches"]
|fieldName|A String for the name of the aggregator used at ingestion time.|yes|
|isInputThetaSketch|This should only be used at indexing time if your input data contains theta sketch objects. This would be the case if you use the DataSketches library outside of Druid, say with Pig/Hive, to produce the data that you are ingesting into Druid.|no, defaults to false|
|size|Must be a power of 2. Internally, size refers to the maximum number of entries the sketch object will retain. A higher size means higher accuracy but more space to store sketches. Note that after you index with a particular size, Druid will persist the sketch in segments, and you should use a size greater than or equal to that at query time. See the [DataSketches site](https://datasketches.github.io/docs/Theta/ThetaSize.html) for details. In general, we recommend just sticking to the default size.|no, defaults to 16384|
|samplingProbability|Must be between 0 and 1.0. Setting the sampling probability to, say, p = 0.5 for all sketches will automatically throw out 50% of all the data coming in to all the sketches. See the [DataSketches site](https://datasketches.github.io/docs/Theta/ThetaPSampling.html) for details.|no, defaults to 1.0|

### Post Aggregators

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,21 @@ public class SketchAggregator implements Aggregator
{
private final BaseObjectColumnValueSelector selector;
private final int size;
private final float samplingProbability;

@Nullable
private Union union;

public SketchAggregator(BaseObjectColumnValueSelector selector, int size)
public SketchAggregator(BaseObjectColumnValueSelector selector, int size, float samplingProbability)
{
this.selector = selector;
this.size = size;
this.samplingProbability = samplingProbability;
}

private void initUnion()
{
union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION);
union = (Union) SetOperation.builder().setNominalEntries(size).setP(samplingProbability).build(Family.UNION);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,24 @@
public abstract class SketchAggregatorFactory extends AggregatorFactory
{
public static final int DEFAULT_MAX_SKETCH_SIZE = 16384;
public static final float DEFAULT_SAMPLING_PROBABILITY = 1.0F;

protected final String name;
protected final String fieldName;
protected final int size;
protected final float samplingProbability;
private final byte cacheId;

public SketchAggregatorFactory(String name, String fieldName, Integer size, byte cacheId)

public SketchAggregatorFactory(String name, String fieldName, Integer size, byte cacheId, Float samplingProbability)
{
this.name = Preconditions.checkNotNull(name, "Must have a valid, non-null aggregator name");
this.fieldName = Preconditions.checkNotNull(fieldName, "Must have a valid, non-null fieldName");

this.size = size == null ? DEFAULT_MAX_SKETCH_SIZE : size;
Util.checkIfPowerOf2(this.size, "size");
this.samplingProbability = samplingProbability == null ? DEFAULT_SAMPLING_PROBABILITY : samplingProbability;
Util.checkProbability(this.samplingProbability, "sampling probability");

this.cacheId = cacheId;
}
Expand All @@ -66,15 +71,15 @@ public SketchAggregatorFactory(String name, String fieldName, Integer size, byte
public Aggregator factorize(ColumnSelectorFactory metricFactory)
{
BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
return new SketchAggregator(selector, size);
return new SketchAggregator(selector, size, samplingProbability);
}

@SuppressWarnings("unchecked")
@Override
public BufferAggregator factorizeBuffered(ColumnSelectorFactory metricFactory)
{
BaseObjectColumnValueSelector selector = metricFactory.makeColumnValueSelector(fieldName);
return new SketchBufferAggregator(selector, size, getMaxIntermediateSizeWithNulls());
return new SketchBufferAggregator(selector, size, getMaxIntermediateSizeWithNulls(), samplingProbability);
}

@Override
Expand All @@ -92,15 +97,15 @@ public Comparator<Object> getComparator()
@Override
public Object combine(Object lhs, Object rhs)
{
return SketchHolder.combine(lhs, rhs, size);
return SketchHolder.combine(lhs, rhs, size, samplingProbability);
}

@Override
public AggregateCombiner makeAggregateCombiner()
{
return new ObjectAggregateCombiner<SketchHolder>()
{
private final Union union = (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION);
private final Union union = (Union) SetOperation.builder().setNominalEntries(size).setP(samplingProbability).build(Family.UNION);
private final SketchHolder combined = SketchHolder.of(union);

@Override
Expand Down Expand Up @@ -155,6 +160,12 @@ public int getSize()
return size;
}

@JsonProperty
public float getSamplingProbability()
{
return samplingProbability;
}

@Override
public int getMaxIntermediateSize()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,17 @@ public class SketchBufferAggregator implements BufferAggregator
{
private final BaseObjectColumnValueSelector selector;
private final int size;
private final float samplingProbability;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, Int2ObjectMap<Union>> unions = new IdentityHashMap<>();
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();

public SketchBufferAggregator(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize)
public SketchBufferAggregator(BaseObjectColumnValueSelector selector, int size, int maxIntermediateSize, float samplingProbability)
{
this.selector = selector;
this.size = size;
this.maxIntermediateSize = maxIntermediateSize;
this.samplingProbability = samplingProbability;
}

@Override
Expand Down Expand Up @@ -97,7 +99,7 @@ private Union createNewUnion(ByteBuffer buf, int position, boolean isWrapped)
WritableMemory mem = getMemory(buf).writableRegion(position, maxIntermediateSize);
Union union = isWrapped
? (Union) SetOperation.wrap(mem)
: (Union) SetOperation.builder().setNominalEntries(size).build(Family.UNION, mem);
: (Union) SetOperation.builder().setNominalEntries(size).setP(samplingProbability).build(Family.UNION, mem);
Int2ObjectMap<Union> unionMap = unions.get(buf);
if (unionMap == null) {
unionMap = new Int2ObjectOpenHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public SketchEstimateWithErrorBounds getEstimateWithErrorBounds(int errorBoundsS
return result;
}

public static SketchHolder combine(Object o1, Object o2, int nomEntries)
public static SketchHolder combine(Object o1, Object o2, int nomEntries, float samplingProbability)
{
SketchHolder holder1 = (SketchHolder) o1;
SketchHolder holder2 = (SketchHolder) o2;
Expand All @@ -191,7 +191,7 @@ public static SketchHolder combine(Object o1, Object o2, int nomEntries)
holder2.invalidateCache();
return holder2;
} else {
Union union = (Union) SetOperation.builder().setNominalEntries(nomEntries).build(Family.UNION);
Union union = (Union) SetOperation.builder().setNominalEntries(nomEntries).setP(samplingProbability).build(Family.UNION);
holder1.updateUnion(union);
holder2.updateUnion(union);
return SketchHolder.of(union);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ public SketchMergeAggregatorFactory(
@JsonProperty("size") @Nullable Integer size,
@JsonProperty("shouldFinalize") @Nullable Boolean shouldFinalize,
@JsonProperty("isInputThetaSketch") @Nullable Boolean isInputThetaSketch,
@JsonProperty("errorBoundsStdDev") @Nullable Integer errorBoundsStdDev
@JsonProperty("errorBoundsStdDev") @Nullable Integer errorBoundsStdDev,
@JsonProperty("samplingProbability") @Nullable Float samplingProbability
)
{
super(name, fieldName, size, AggregatorUtil.SKETCH_MERGE_CACHE_TYPE_ID);
super(name, fieldName, size, AggregatorUtil.SKETCH_MERGE_CACHE_TYPE_ID, samplingProbability);
this.shouldFinalize = (shouldFinalize == null) ? true : shouldFinalize.booleanValue();
this.isInputThetaSketch = (isInputThetaSketch == null) ? false : isInputThetaSketch.booleanValue();
this.errorBoundsStdDev = errorBoundsStdDev;
Expand All @@ -61,15 +62,16 @@ public List<AggregatorFactory> getRequiredColumns()
size,
shouldFinalize,
isInputThetaSketch,
errorBoundsStdDev
errorBoundsStdDev,
samplingProbability
)
);
}

@Override
public AggregatorFactory getCombiningFactory()
{
return new SketchMergeAggregatorFactory(name, name, size, shouldFinalize, false, errorBoundsStdDev);
return new SketchMergeAggregatorFactory(name, name, size, shouldFinalize, false, errorBoundsStdDev, samplingProbability);
}

@Override
Expand All @@ -84,7 +86,8 @@ public AggregatorFactory getMergingFactory(AggregatorFactory other) throws Aggre
Math.max(size, castedOther.size),
shouldFinalize,
false,
errorBoundsStdDev
errorBoundsStdDev,
samplingProbability
);
} else {
throw new AggregatorFactoryNotMergeableException(this, other);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ public class OldSketchBuildAggregatorFactory extends SketchMergeAggregatorFactor
public OldSketchBuildAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName,
@JsonProperty("size") Integer size
@JsonProperty("size") Integer size,
@JsonProperty("samplingProbability") Float samplingProbability
)
{
super(name, fieldName, size, true, false, null);
super(name, fieldName, size, true, false, null, samplingProbability);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ public OldSketchMergeAggregatorFactory(
@JsonProperty("name") String name,
@JsonProperty("fieldName") String fieldName,
@JsonProperty("size") Integer size,
@JsonProperty("shouldFinalize") Boolean shouldFinalize
@JsonProperty("shouldFinalize") Boolean shouldFinalize,
@JsonProperty("samplingProbability") Float samplingProbability
)
{
super(name, fieldName, size, shouldFinalize, true, null);
super(name, fieldName, size, shouldFinalize, true, null, samplingProbability);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public Aggregation toDruidAggregation(
sketchSize,
null,
null,
null,
null
);
} else {
Expand Down Expand Up @@ -134,6 +135,7 @@ public Aggregation toDruidAggregation(
sketchSize,
null,
null,
null,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private static BufferHashGrouper<Integer> makeGrouper(
AggregatorAdapters.factorizeBuffered(
columnSelectorFactory,
ImmutableList.of(
new SketchMergeAggregatorFactory("sketch", "sketch", 16, false, true, 2),
new SketchMergeAggregatorFactory("sketch", "sketch", 16, false, true, 2, 1.0f),
new CountAggregatorFactory("count")
)
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,27 +280,27 @@ public void testThetaCardinalityOnSimpleColumn() throws Exception
@Test
public void testSketchMergeAggregatorFactorySerde() throws Exception
{
assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null));
assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, false, true, null));
assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, true, false, null));
assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, true, false, 2));
assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null, 1.0f));
assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, false, true, null, 1.0f));
assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, true, false, null, 1.0f));
assertAggregatorFactorySerde(new SketchMergeAggregatorFactory("name", "fieldName", 16, true, false, 2, 1.0f));
}

@Test
public void testSketchMergeFinalization()
{
SketchHolder sketch = SketchHolder.of(Sketches.updateSketchBuilder().setNominalEntries(128).build());

SketchMergeAggregatorFactory agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null);
SketchMergeAggregatorFactory agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, null, null, null, 1.0f);
Assert.assertEquals(0.0, ((Double) agg.finalizeComputation(sketch)).doubleValue(), 0.0001);

agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, true, null, null);
agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, true, null, null, 1.0f);
Assert.assertEquals(0.0, ((Double) agg.finalizeComputation(sketch)).doubleValue(), 0.0001);

agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, false, null, null);
agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, false, null, null, 1.0f);
Assert.assertEquals(sketch, agg.finalizeComputation(sketch));

agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, true, null, 2);
agg = new SketchMergeAggregatorFactory("name", "fieldName", 16, true, null, 2, 1.0f);
SketchEstimateWithErrorBounds est = (SketchEstimateWithErrorBounds) agg.finalizeComputation(sketch);
Assert.assertEquals(0.0, est.getEstimate(), 0.0001);
Assert.assertEquals(0.0, est.getHighBound(), 0.0001);
Expand Down Expand Up @@ -385,23 +385,26 @@ public void testCacheKey()
16,
null,
null,
null
null,
1.0f
);
final SketchMergeAggregatorFactory factory2 = new SketchMergeAggregatorFactory(
"name",
"fieldName",
16,
null,
null,
null
null,
1.0f
);
final SketchMergeAggregatorFactory factory3 = new SketchMergeAggregatorFactory(
"name",
"fieldName",
32,
null,
null,
null
null,
1.0f
);

Assert.assertTrue(Arrays.equals(factory1.getCacheKey(), factory2.getCacheKey()));
Expand Down Expand Up @@ -486,7 +489,7 @@ public void testRelocation()

columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder)));
SketchHolder[] holders = helper.runRelocateVerificationTest(
new SketchMergeAggregatorFactory("sketch", "sketch", 16, false, true, 2),
new SketchMergeAggregatorFactory("sketch", "sketch", 16, false, true, 2, 1.0f),
columnSelectorFactory,
SketchHolder.class
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void test()
{
// not going to iterate over the selector since getting a summary of an empty sketch is sufficient
final TestObjectColumnSelector selector = new TestObjectColumnSelector(new Object[0]);
final Aggregator agg = new SketchAggregator(selector, 4096);
final Aggregator agg = new SketchAggregator(selector, 4096, 1.0f);

final Map<String, Object> fields = new HashMap<>();
fields.put("sketch", agg.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,15 @@ public void testSketchDataIngestAndQuery() throws Exception
@Test
public void testSketchMergeAggregatorFactorySerde() throws Exception
{
assertAggregatorFactorySerde(new OldSketchMergeAggregatorFactory("name", "fieldName", 16, null));
assertAggregatorFactorySerde(new OldSketchMergeAggregatorFactory("name", "fieldName", 16, false));
assertAggregatorFactorySerde(new OldSketchMergeAggregatorFactory("name", "fieldName", 16, true));
assertAggregatorFactorySerde(new OldSketchMergeAggregatorFactory("name", "fieldName", 16, null, 1.0f));
assertAggregatorFactorySerde(new OldSketchMergeAggregatorFactory("name", "fieldName", 16, false, 1.0f));
assertAggregatorFactorySerde(new OldSketchMergeAggregatorFactory("name", "fieldName", 16, true, 1.0f));
}

@Test
public void testSketchBuildAggregatorFactorySerde() throws Exception
{
assertAggregatorFactorySerde(new OldSketchBuildAggregatorFactory("name", "fieldName", 16));
assertAggregatorFactorySerde(new OldSketchBuildAggregatorFactory("name", "fieldName", 16, 1.0f));
}

private void assertAggregatorFactorySerde(AggregatorFactory agg) throws Exception
Expand Down Expand Up @@ -234,7 +234,7 @@ public void testRelocation()

columnSelectorFactory.setRow(new MapBasedRow(0, ImmutableMap.of("sketch", sketchHolder)));
SketchHolder[] holders = helper.runRelocateVerificationTest(
new OldSketchMergeAggregatorFactory("sketch", "sketch", 16, false),
new OldSketchMergeAggregatorFactory("sketch", "sketch", 16, false, 1.0f),
columnSelectorFactory,
SketchHolder.class
);
Expand Down
Loading