Skip to content
Permalink
Browse files
latest datasketches-java-3.1.0 (#12224)
These changes are to use the latest datasketches-java-3.1.0 and also to restore support for quantile and HLL4 sketches to be able to grow larger than a given buffer in a buffer aggregator and move to heap in rare cases. This was discussed in #11544.

Co-authored-by: AlexanderSaydakov <AlexanderSaydakov@users.noreply.github.com>
  • Loading branch information
AlexanderSaydakov and AlexanderSaydakov committed Mar 2, 2022
1 parent 3f709db commit 50038d9344fb745bfe47c81a59bdc29ae3dcff1d
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 144 deletions.
@@ -23,6 +23,8 @@
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.hll.HllSketch;
import org.apache.datasketches.hll.TgtHllType;
import org.apache.datasketches.memory.DefaultMemoryRequestServer;
import org.apache.datasketches.memory.MemoryRequestServer;
import org.apache.datasketches.memory.WritableMemory;

import java.nio.ByteBuffer;
@@ -31,6 +33,7 @@

public class HllSketchBuildBufferAggregatorHelper
{
private static final MemoryRequestServer MEM_REQ_SERVER = new DefaultMemoryRequestServer();
private final int lgK;
private final int size;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
@@ -123,7 +126,8 @@ public int getLgK()

private WritableMemory getMemory(final ByteBuffer buf)
{
return memCache.computeIfAbsent(buf, b -> WritableMemory.writableWrap(b, ByteOrder.LITTLE_ENDIAN));
return memCache.computeIfAbsent(buf,
b -> WritableMemory.writableWrap(b, ByteOrder.LITTLE_ENDIAN, MEM_REQ_SERVER));
}

private void putSketchIntoCache(final ByteBuffer buf, final int position, final HllSketch sketch)
@@ -57,7 +57,7 @@ public void aggregate(final ByteBuffer buffer, final int position)
}

final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buffer, position);
DoublesSketches.handleMaxStreamLengthLimit(() -> sketch.update(selector.getDouble()));
sketch.update(selector.getDouble());
}

@Nullable
@@ -21,6 +21,8 @@

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.memory.DefaultMemoryRequestServer;
import org.apache.datasketches.memory.MemoryRequestServer;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.CompactDoublesSketch;
import org.apache.datasketches.quantiles.DoublesSketch;
@@ -32,6 +34,7 @@

public class DoublesSketchBuildBufferAggregatorHelper
{
private static final MemoryRequestServer MEM_REQ_SERVER = new DefaultMemoryRequestServer();
private final int size;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
@@ -92,7 +95,8 @@ public UpdateDoublesSketch getSketchAtPosition(final ByteBuffer buf, final int p

private WritableMemory getMemory(final ByteBuffer buffer)
{
return memCache.computeIfAbsent(buffer, buf -> WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN));
return memCache.computeIfAbsent(buffer,
buf -> WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN, MEM_REQ_SERVER));
}

private void putSketch(final ByteBuffer buffer, final int position, final UpdateDoublesSketch sketch)
@@ -55,13 +55,11 @@ public void aggregate(final ByteBuffer buf, final int position, final int startR

final UpdateDoublesSketch sketch = helper.getSketchAtPosition(buf, position);

DoublesSketches.handleMaxStreamLengthLimit(() -> {
for (int i = startRow; i < endRow; i++) {
if (nulls == null || !nulls[i]) {
sketch.update(doubles[i]);
}
for (int i = startRow; i < endRow; i++) {
if (nulls == null || !nulls[i]) {
sketch.update(doubles[i]);
}
});
}
}

@Override
@@ -76,16 +74,14 @@ public void aggregate(
final double[] doubles = selector.getDoubleVector();
final boolean[] nulls = selector.getNullVector();

DoublesSketches.handleMaxStreamLengthLimit(() -> {
for (int i = 0; i < numRows; i++) {
final int idx = rows != null ? rows[i] : i;
for (int i = 0; i < numRows; i++) {
final int idx = rows != null ? rows[i] : i;

if (nulls == null || !nulls[idx]) {
final int position = positions[i] + positionOffset;
helper.getSketchAtPosition(buf, position).update(doubles[idx]);
}
if (nulls == null || !nulls[idx]) {
final int position = positions[i] + positionOffset;
helper.getSketchAtPosition(buf, position).update(doubles[idx]);
}
});
}
}

@Override
@@ -76,12 +76,10 @@ static void updateUnion(ColumnValueSelector selector, DoublesUnion union)
if (object == null) {
return;
}
DoublesSketches.handleMaxStreamLengthLimit(() -> {
if (object instanceof DoublesSketch) {
union.update((DoublesSketch) object);
} else {
union.update(selector.getDouble());
}
});
if (object instanceof DoublesSketch) {
union.update((DoublesSketch) object);
} else {
union.update(selector.getDouble());
}
}
}
@@ -21,6 +21,8 @@

import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import org.apache.datasketches.memory.DefaultMemoryRequestServer;
import org.apache.datasketches.memory.MemoryRequestServer;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.datasketches.quantiles.DoublesUnion;

@@ -30,6 +32,7 @@

public class DoublesSketchMergeBufferAggregatorHelper
{
private static final MemoryRequestServer MEM_REQ_SERVER = new DefaultMemoryRequestServer();
private final int k;
private final int maxIntermediateSize;
private final IdentityHashMap<ByteBuffer, WritableMemory> memCache = new IdentityHashMap<>();
@@ -93,7 +96,8 @@ public DoublesUnion getSketchAtPosition(final ByteBuffer buf, final int position

private WritableMemory getMemory(final ByteBuffer buffer)
{
return memCache.computeIfAbsent(buffer, buf -> WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN));
return memCache.computeIfAbsent(buffer,
buf -> WritableMemory.writableWrap(buf, ByteOrder.LITTLE_ENDIAN, MEM_REQ_SERVER));
}

private void putUnion(final ByteBuffer buffer, final int position, final DoublesUnion union)
@@ -55,14 +55,12 @@ public void aggregate(final ByteBuffer buf, final int position, final int startR

final DoublesUnion union = helper.getSketchAtPosition(buf, position);

DoublesSketches.handleMaxStreamLengthLimit(() -> {
for (int i = startRow; i < endRow; i++) {
final DoublesSketch sketch = (DoublesSketch) vector[i];
if (sketch != null) {
union.update(sketch);
}
for (int i = startRow; i < endRow; i++) {
final DoublesSketch sketch = (DoublesSketch) vector[i];
if (sketch != null) {
union.update(sketch);
}
});
}
}

@Override
@@ -76,17 +74,15 @@ public void aggregate(
{
final Object[] vector = selector.getObjectVector();

DoublesSketches.handleMaxStreamLengthLimit(() -> {
for (int i = 0; i < numRows; i++) {
final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? rows[i] : i];
for (int i = 0; i < numRows; i++) {
final DoublesSketch sketch = (DoublesSketch) vector[rows != null ? rows[i] : i];

if (sketch != null) {
final int position = positions[i] + positionOffset;
final DoublesUnion union = helper.getSketchAtPosition(buf, position);
union.update(sketch);
}
if (sketch != null) {
final int position = positions[i] + positionOffset;
final DoublesUnion union = helper.getSketchAtPosition(buf, position);
union.update(sketch);
}
});
}
}

@Nullable

This file was deleted.

@@ -33,13 +33,10 @@
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -60,9 +57,6 @@ public class DoublesSketchAggregatorTest extends InitializedNullHandlingTest
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();

@Rule
public final ExpectedException expectedException = ExpectedException.none();

public DoublesSketchAggregatorTest(final GroupByQueryConfig config, final String vectorize)
{
this.config = config;
@@ -544,12 +538,9 @@ public void timeSeriesQueryInputAsFloat() throws Exception
}

@Test
public void testFailureWhenMaxStreamLengthHit() throws Exception
public void testSuccessWhenMaxStreamLengthHit() throws Exception
{
if (GroupByStrategySelector.STRATEGY_V1.equals(config.getDefaultStrategy())) {
expectedException.expect(new RecursiveExceptionMatcher(IllegalStateException.class));
expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch");

helper.createIndexAndRunQueryOnSegment(
new File(this.getClass().getClassLoader().getResource("quantiles/doubles_build_data.tsv").getFile()),
String.join(
@@ -633,39 +624,8 @@ public void testFailureWhenMaxStreamLengthHit() throws Exception
"}"
)
);

expectedException.expect(new RecursiveExceptionMatcher(IllegalStateException.class));
expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch");
seq.toList();
}
}

private static class RecursiveExceptionMatcher extends BaseMatcher<Object>
{
private final Class<? extends Throwable> expected;

private RecursiveExceptionMatcher(Class<? extends Throwable> expected)
{
this.expected = expected;
}

@Override
public boolean matches(Object item)
{
if (expected.isInstance(item)) {
return true;
} else if (item instanceof Throwable) {
if (((Throwable) item).getCause() != null) {
return matches(((Throwable) item).getCause());
}
}
return false;
}

@Override
public void describeTo(Description description)
{
description.appendText("a recursive instance of ").appendText(expected.getName());
}
}
}
@@ -828,14 +828,14 @@ public void testGroupByAggregatorDefaultValues() throws Exception
}

@Test
public void testFailWithSmallMaxStreamLength() throws Exception
public void testSuccessWithSmallMaxStreamLength() throws Exception
{
final Map<String, Object> context = new HashMap<>(QUERY_CONTEXT_DEFAULT);
context.put(
DoublesSketchApproxQuantileSqlAggregator.CTX_APPROX_QUANTILE_DS_MAX_STREAM_LENGTH,
1
);
testQueryThrows(
testQuery(
"SELECT\n"
+ "APPROX_QUANTILE_DS(m1, 0.01),\n"
+ "APPROX_QUANTILE_DS(cnt, 0.5)\n"
@@ -856,11 +856,13 @@ public void testFailWithSmallMaxStreamLength() throws Exception
)
.context(context)
.build()
),
expectedException -> {
expectedException.expect(IllegalStateException.class);
expectedException.expectMessage("NullPointerException was thrown while updating Doubles sketch");
}
),
ImmutableList.of(
new Object[]{
1.0,
1.0
}
)
);
}

@@ -242,13 +242,15 @@ public void ingestingSketchesTwoValues() throws Exception
" \"name\": \"intersection\",",
" \"operation\": \"INTERSECT\",",
" \"nominalEntries\": 1024,",
" \"numberOfValues\": 2,",
" \"fields\": [{\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}, {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}]",
" }},",
" {\"type\": \"arrayOfDoublesSketchToEstimate\", \"name\": \"anotb\", \"field\": {",
" \"type\": \"arrayOfDoublesSketchSetOp\",",
" \"name\": \"anotb\",",
" \"operation\": \"NOT\",",
" \"nominalEntries\": 1024,",
" \"numberOfValues\": 2,",
" \"fields\": [{\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}, {\"type\": \"fieldAccess\", \"fieldName\": \"sketch\"}]",
" }},",
" {",
@@ -3731,7 +3731,7 @@ name: DataSketches
license_category: binary
module: java-core
license_name: Apache License version 2.0
version: 3.0.0
version: 3.1.0
libraries:
- org.apache.datasketches: datasketches-java

@@ -84,7 +84,7 @@
<!-- sql/src/main/codegen/config.fmpp is based on a file from calcite-core, and needs to be
updated when upgrading Calcite. Refer to the top-level comments in that file for details. -->
<calcite.version>1.21.0</calcite.version>
<datasketches.version>3.0.0</datasketches.version>
<datasketches.version>3.1.0</datasketches.version>
<datasketches.memory.version>2.0.0</datasketches.memory.version>
<derby.version>10.14.2.0</derby.version>
<dropwizard.metrics.version>4.0.0</dropwizard.metrics.version>

0 comments on commit 50038d9

Please sign in to comment.