From 9e753ce8e3c12dcf714c8b7a83853e555d3e1b0a Mon Sep 17 00:00:00 2001 From: Salim Achouche Date: Sun, 22 Apr 2018 18:02:35 -0700 Subject: [PATCH] DRILL-6348: Received batches are now owned by the receive operators instead of the parent --- .../drill/exec/ops/FragmentContextImpl.java | 5 ++++- .../physical/impl/MergingReceiverCreator.java | 2 +- .../impl/mergereceiver/MergingRecordBatch.java | 3 +++ .../UnorderedReceiverBatch.java | 3 +++ .../UnorderedReceiverCreator.java | 2 +- .../exec/work/batch/AbstractDataCollector.java | 17 +++++++++++++++++ .../drill/exec/work/batch/DataCollector.java | 14 +++++++++++++- .../drill/exec/work/batch/IncomingBuffers.java | 10 +++++++--- 8 files changed, 49 insertions(+), 7 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java index c9b20705ffb..65792827d2e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java @@ -428,13 +428,16 @@ public boolean isImpersonationEnabled() { public void close() { waitForSendComplete(); + // Close the buffers before closing the operators; this is needed as buffer ownership + // is attached to the receive operators. + suppressingClose(buffers); + // close operator context for (OperatorContextImpl opContext : contexts) { suppressingClose(opContext); } suppressingClose(bufferManager); - suppressingClose(buffers); suppressingClose(allocator); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java index 0ef84b960f6..66a0cc2c254 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java @@ -42,7 +42,7 @@ public MergingRecordBatch getBatch(ExecutorFragmentContext context, IncomingBuffers bufHolder = context.getBuffers(); assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared."; - RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId()); + RawBatchBuffer[] buffers = bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers(); return new MergingRecordBatch(context, receiver, buffers); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 7e5ff2126fe..9087757b0ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -136,6 +136,9 @@ public MergingRecordBatch(final ExchangeFragmentContext context, this.config = config; this.inputCounts = new long[config.getNumSenders()]; this.outputCounts = new long[config.getNumSenders()]; + + // Register this operator's buffer allocator so that incoming buffers are owned by this allocator + context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator()); } @SuppressWarnings("resource") diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 9da8a4b6f13..fcf258e3778 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -86,6 +86,9 @@ public UnorderedReceiverBatch(final ExchangeFragmentContext context, final RawFr this.stats = oContext.getStats(); this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders()); this.config = config; + + // Register this operator's buffer allocator so that incoming buffers are owned by this allocator + context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator()); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java index 01a458890ac..3dcdfc4ef7a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java @@ -37,7 +37,7 @@ public UnorderedReceiverBatch getBatch(ExecutorFragmentContext context, Unordere IncomingBuffers bufHolder = context.getBuffers(); assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared."; - RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId()); + RawBatchBuffer[] buffers = bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers(); assert buffers.length == 1; RawBatchBuffer buffer = buffers[0]; return new UnorderedReceiverBatch(context, buffer, receiver); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java index b6b4183e771..bb3a5a266da 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicIntegerArray; import org.apache.drill.common.AutoCloseables; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitControl.Collector; import org.apache.drill.exec.record.RawFragmentBatch; @@ -37,6 +38,8 @@ public abstract class AbstractDataCollector implements DataCollector { private final int incomingStreams; protected final RawBatchBuffer[] buffers; protected final ArrayWrappedIntIntMap fragmentMap; + /** Allocator which owns incoming batches */ + protected BufferAllocator ownerAllocator; /** * @param parentAccounter @@ -53,6 +56,7 @@ public AbstractDataCollector(AtomicInteger parentAccounter, this.parentAccounter = parentAccounter; this.remainders = new AtomicIntegerArray(incomingStreams); this.oppositeMajorFragmentId = collector.getOppositeMajorFragmentId(); + this.ownerAllocator = context.getAllocator(); // Create fragmentId to index that is within the range [0, incoming.size()-1] // We use this mapping to find objects belonging to the fragment in buffers and remainders arrays. fragmentMap = new ArrayWrappedIntIntMap(); @@ -116,4 +120,17 @@ public void close() throws Exception { AutoCloseables.close(buffers); } + /** {@inheritDoc} */ + @Override + public BufferAllocator getAllocator() { + return this.ownerAllocator; + } + + /** {@inheritDoc} */ + @Override + public void setAllocator(BufferAllocator allocator) { + Preconditions.checkArgument(allocator != null, "buffer allocator cannot be null"); + this.ownerAllocator = allocator; + } + } \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java index 026fc81e1e1..fa746770b18 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java @@ -19,13 +19,25 @@ import java.io.IOException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.RawFragmentBatch; -interface DataCollector extends AutoCloseable { +public interface DataCollector extends AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataCollector.class); public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException ; public int getOppositeMajorFragmentId(); public RawBatchBuffer[] getBuffers(); public int getTotalIncomingFragments(); public void close() throws Exception; + /** + * Enables caller (e.g., receiver) to attach its buffer allocator to this Data Collector in order + * to claim ownership of incoming batches; by default, the fragment allocator owns these batches. + * + * @param allocator operator buffer allocator + */ + void setAllocator(BufferAllocator allocator); + /** + * @return allocator + */ + BufferAllocator getAllocator(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java index 876c8b5b558..2d1b4f2f0c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java @@ -27,6 +27,7 @@ import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.concurrent.AutoCloseableLock; import org.apache.drill.exec.exception.FragmentSetupException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitControl.Collector; import org.apache.drill.exec.proto.BitControl.PlanFragment; @@ -103,8 +104,11 @@ public boolean batchArrived(final IncomingDataBatch incomingBatch) throws Fragme Arrays.toString(collectorMap.values().toArray()))); } + // Use the Data Collector's buffer allocator if set, otherwise the fragment's one + BufferAllocator ownerAllocator = collector.getAllocator(); + synchronized (collector) { - final RawFragmentBatch newRawFragmentBatch = incomingBatch.newRawFragmentBatch(context.getAllocator()); + final RawFragmentBatch newRawFragmentBatch = incomingBatch.newRawFragmentBatch(ownerAllocator); boolean decrementedToZero = collector .batchArrived(incomingBatch.getHeader().getSendingMinorFragmentId(), newRawFragmentBatch); newRawFragmentBatch.release(); @@ -125,8 +129,8 @@ public int getRemainingRequired() { return rem; } - public RawBatchBuffer[] getBuffers(int senderMajorFragmentId) { - return collectorMap.get(senderMajorFragmentId).getBuffers(); + public DataCollector getCollector(int senderMajorFragmentId) { + return collectorMap.get(senderMajorFragmentId); } public boolean isDone() {