From 8cd302cc6c7d7d8382e81e687930c925e064d357 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 12 Apr 2019 14:51:34 -0700 Subject: [PATCH 1/3] Adjust BufferAggregator.get() impls to return copies --- .../datasketches/hll/HllSketchBuildBufferAggregator.java | 2 +- .../aggregation/bloom/BaseBloomFilterBufferAggregator.java | 5 ++++- .../org/apache/druid/query/aggregation/BufferAggregator.java | 4 +++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java index 0ec525ead4a5..ce1582182d66 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchBuildBufferAggregator.java @@ -108,7 +108,7 @@ public Object get(final ByteBuffer buf, final int position) final Lock lock = stripedLock.getAt(lockIndex(position)).readLock(); lock.lock(); try { - return sketchCache.get(buf).get(position); + return sketchCache.get(buf).get(position).copy(); } finally { lock.unlock(); diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java index 74def15c0626..2e03d3cf6c96 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java @@ -66,7 +66,10 @@ public Object get(ByteBuffer buf, int position) // | k (byte) | numLongs (int) | bitset (long[numLongs]) | int sizeBytes = 1 + Integer.BYTES + (buf.getInt(position + 1) * Long.BYTES); mutationBuffer.limit(position + sizeBytes); - return mutationBuffer.slice(); + + ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes); + resultCopy.put(mutationBuffer.slice()); + return resultCopy; } @Override diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java index ecd0c11b526a..9ad18eecf126 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java @@ -72,7 +72,9 @@ public interface BufferAggregator extends HotLoopCallee * * Converts the given byte buffer representation into an intermediate aggregate Object * - * Implementations must not change the position, limit or mark of the given buffer + * Implementations must not change the position, limit or mark of the given buffer. + * + * The object returned must not have any references to the given buffer (i.e., make a copy). * * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the aggregate value is stored From 0ab2dcb7c09bd0d2e22af10ca47c8e37f4b667ad Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 12 Apr 2019 16:05:14 -0700 Subject: [PATCH 2/3] Update BufferAggregator docs, more agg fixes --- .../quantiles/DoublesSketchBuildBufferAggregator.java | 2 +- .../aggregation/bloom/BaseBloomFilterBufferAggregator.java | 1 + .../org/apache/druid/query/aggregation/BufferAggregator.java | 3 +++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java index ead9a6aa2807..609a46e01c07 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchBuildBufferAggregator.java @@ -69,7 +69,7 @@ public synchronized void aggregate(final ByteBuffer buffer, final int position) @Override public synchronized Object get(final ByteBuffer buffer, final int position) { - return sketches.get(buffer).get(position); + return sketches.get(buffer).get(position).compact(); } @Override diff --git a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java index 2e03d3cf6c96..ff866f9ffd65 100644 --- a/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java +++ b/extensions-core/druid-bloom-filter/src/main/java/org/apache/druid/query/aggregation/bloom/BaseBloomFilterBufferAggregator.java @@ -69,6 +69,7 @@ public Object get(ByteBuffer buf, int position) ByteBuffer resultCopy = ByteBuffer.allocate(sizeBytes); resultCopy.put(mutationBuffer.slice()); + resultCopy.rewind(); return resultCopy; } diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java index 9ad18eecf126..934f258fb182 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java @@ -76,6 +76,9 @@ public interface BufferAggregator extends HotLoopCallee * * The object returned must not have any references to the given buffer (i.e., make a copy). * + * If the corresponding {@link AggregatorFactory#combine(Object, Object)} method for this aggregator + * expects its inputs to be mutable, then the object returned by this method must be mutable. + * * @param buf byte buffer storing the byte array representation of the aggregate * @param position offset within the byte buffer at which the aggregate value is stored * @return the Object representation of the aggregate From 48f7f3be4645d167492be24b5d966f339d635ada Mon Sep 17 00:00:00 2001 From: jon-wei Date: Fri, 12 Apr 2019 16:09:30 -0700 Subject: [PATCH 3/3] Update BufferAggregator get() doc --- .../apache/druid/query/aggregation/BufferAggregator.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java index 934f258fb182..ed77c912c736 100644 --- a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java +++ b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java @@ -74,7 +74,11 @@ public interface BufferAggregator extends HotLoopCallee * * Implementations must not change the position, limit or mark of the given buffer. * - * The object returned must not have any references to the given buffer (i.e., make a copy). + * + * The object returned must not have any references to the given buffer (i.e., make a copy), since the + * underlying buffer is a shared resource and may be given to another processing thread + * while the objects returned by this aggregator are still in use. + * * * If the corresponding {@link AggregatorFactory#combine(Object, Object)} method for this aggregator * expects its inputs to be mutable, then the object returned by this method must be mutable.