From 7f19b500facffe52bf5611d0ae58bfd4e4d0336f Mon Sep 17 00:00:00 2001 From: Sorabh Hamirwasia Date: Wed, 14 Mar 2018 16:59:29 -0700 Subject: [PATCH] DRILL-6323: Lateral Join - Refactor BatchMemorySize to put outputBatchSize in abstract class. Created a new JoinBatchMemoryManager to be shared across join record batches. Changed merge join to use AbstractBinaryRecordBatch instead of AbstractRecordBatch, and use JoinBatchMemoryManager --- .../exec/ops/OperatorMetricRegistry.java | 6 +- .../impl/flatten/FlattenRecordBatch.java | 14 +- .../physical/impl/join/MergeJoinBatch.java | 184 +++--------------- .../record/AbstractBinaryRecordBatch.java | 88 +++++++-- .../exec/record/AbstractRecordBatch.java | 1 - .../exec/record/JoinBatchMemoryManager.java | 124 ++++++++++++ .../exec/record/RecordBatchMemoryManager.java | 25 ++- .../drill/exec/record/RecordIterator.java | 2 +- 8 files changed, 260 insertions(+), 184 deletions(-) create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java index c703071c55e..d9a5fdcae23 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java @@ -23,13 +23,13 @@ import org.apache.drill.exec.physical.impl.broadcastsender.BroadcastSenderRootExec; import org.apache.drill.exec.physical.impl.flatten.FlattenRecordBatch; import org.apache.drill.exec.physical.impl.join.HashJoinBatch; -import org.apache.drill.exec.physical.impl.join.MergeJoinBatch; import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec; import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch; import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch; -import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; +import org.apache.drill.exec.record.JoinBatchMemoryManager; +import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; /** * Registry of operator metrics. @@ -52,7 +52,7 @@ public class OperatorMetricRegistry { register(CoreOperatorType.EXTERNAL_SORT_VALUE, ExternalSortBatch.Metric.class); register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, ParquetRecordReader.Metric.class); register(CoreOperatorType.FLATTEN_VALUE, FlattenRecordBatch.Metric.class); - register(CoreOperatorType.MERGE_JOIN_VALUE, MergeJoinBatch.Metric.class); + register(CoreOperatorType.MERGE_JOIN_VALUE, JoinBatchMemoryManager.Metric.class); } private static void register(final int operatorType, final Class metricDef) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index aea415bb4e0..bbe9f7683f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -72,8 +72,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch { private boolean hasRemainder = false; private int remainderIndex = 0; private int recordCount; - private int outputBatchSize; - private final FlattenMemoryManager flattenMemoryManager = new FlattenMemoryManager(); + private final FlattenMemoryManager flattenMemoryManager; private final Flattener.Monitor monitor = new Flattener.Monitor() { @Override @@ -118,6 +117,10 @@ public int metricId() { private class FlattenMemoryManager extends RecordBatchMemoryManager { + FlattenMemoryManager(int outputBatchSize) { + super(outputBatchSize); + } + @Override public void update() { // Get sizing information for the batch. @@ -138,11 +141,13 @@ public void update() { // Average rowWidth of single element in the flatten list. // subtract the offset vector size from column data size. final int avgRowWidthSingleFlattenEntry = - RecordBatchSizer.safeDivide(columnSize.getTotalNetSize() - (OFFSET_VECTOR_WIDTH * columnSize.getValueCount()), columnSize.getElementCount()); + RecordBatchSizer.safeDivide(columnSize.getTotalNetSize() - (getOffsetVectorWidth() * columnSize.getValueCount()), + columnSize.getElementCount()); // Average rowWidth of outgoing batch. final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry; + final int outputBatchSize = getOutputBatchSize(); // Number of rows in outgoing batch setOutputRowCount(outputBatchSize, avgOutgoingRowWidth); @@ -165,7 +170,8 @@ public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext super(pop, context, incoming); // get the output batch size from config. - outputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + flattenMemoryManager = new FlattenMemoryManager(configuredBatchSize); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index ab50b227f26..ffcbae3780c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -17,12 +17,13 @@ */ package org.apache.drill.exec.physical.impl.join; -import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; - -import java.io.IOException; -import java.util.List; - +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.sun.codemodel.JClass; +import com.sun.codemodel.JConditional; +import com.sun.codemodel.JExpr; +import com.sun.codemodel.JMod; +import com.sun.codemodel.JVar; import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; @@ -44,34 +45,31 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.impl.common.Comparator; -import org.apache.drill.exec.record.RecordBatchSizer; +import org.apache.drill.exec.record.AbstractBinaryRecordBatch; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.JoinBatchMemoryManager; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordBatchSizer; import org.apache.drill.exec.record.RecordIterator; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.AbstractRecordBatch; -import org.apache.drill.exec.record.RecordBatchMemoryManager; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; -import com.google.common.base.Preconditions; -import com.sun.codemodel.JClass; -import com.sun.codemodel.JConditional; -import com.sun.codemodel.JExpr; -import com.sun.codemodel.JMod; -import com.sun.codemodel.JVar; +import java.io.IOException; +import java.util.List; + +import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; /** * A join operator merges two sorted streams using record iterator. */ -public class MergeJoinBatch extends AbstractRecordBatch { +public class MergeJoinBatch extends AbstractBinaryRecordBatch { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class); @@ -96,8 +94,6 @@ public class MergeJoinBatch extends AbstractRecordBatch { GM("doSetup", "doSetup", null, null), GM("doSetup", "doCompare", null, null)); - private final RecordBatch left; - private final RecordBatch right; private final RecordIterator leftIterator; private final RecordIterator rightIterator; private final JoinStatus status; @@ -105,41 +101,14 @@ public class MergeJoinBatch extends AbstractRecordBatch { private final List comparators; private final JoinRelType joinType; private JoinWorker worker; - private final int outputBatchSize; private static final String LEFT_INPUT = "LEFT INPUT"; private static final String RIGHT_INPUT = "RIGHT INPUT"; - private static final int numInputs = 2; - private static final int LEFT_INDEX = 0; - private static final int RIGHT_INDEX = 1; - - public enum Metric implements MetricDef { - LEFT_INPUT_BATCH_COUNT, - LEFT_AVG_INPUT_BATCH_BYTES, - LEFT_AVG_INPUT_ROW_BYTES, - LEFT_INPUT_RECORD_COUNT, - RIGHT_INPUT_BATCH_COUNT, - RIGHT_AVG_INPUT_BATCH_BYTES, - RIGHT_AVG_INPUT_ROW_BYTES, - RIGHT_INPUT_RECORD_COUNT, - OUTPUT_BATCH_COUNT, - AVG_OUTPUT_BATCH_BYTES, - AVG_OUTPUT_ROW_BYTES, - OUTPUT_RECORD_COUNT; + private class MergeJoinMemoryManager extends JoinBatchMemoryManager { - @Override - public int metricId() { - return ordinal(); - } - } - - private class MergeJoinMemoryManager extends RecordBatchMemoryManager { - private int leftRowWidth; - private int rightRowWidth; - - public MergeJoinMemoryManager() { - super(numInputs); + MergeJoinMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) { + super(outputBatchSize, leftBatch, rightBatch); } /** @@ -152,73 +121,22 @@ public MergeJoinMemoryManager() { */ @Override public void update(int inputIndex) { - switch(inputIndex) { - case LEFT_INDEX: - setRecordBatchSizer(inputIndex, new RecordBatchSizer(left)); - leftRowWidth = getRecordBatchSizer(inputIndex).netRowWidth(); - logger.debug("left incoming batch size : {}", getRecordBatchSizer(inputIndex)); - break; - case RIGHT_INDEX: - setRecordBatchSizer(inputIndex, new RecordBatchSizer(right)); - rightRowWidth = getRecordBatchSizer(inputIndex).netRowWidth(); - logger.debug("right incoming batch size : {}", getRecordBatchSizer(inputIndex)); - default: - break; - } - - updateIncomingStats(inputIndex); - final int newOutgoingRowWidth = leftRowWidth + rightRowWidth; - - // If outgoing row width is 0, just return. This is possible for empty batches or - // when first set of batches come with OK_NEW_SCHEMA and no data. - if (newOutgoingRowWidth == 0) { - return; - } - - // update the value to be used for next batch(es) - setOutputRowCount(outputBatchSize, newOutgoingRowWidth); - - // Adjust for the current batch. - // calculate memory used so far based on previous outgoing row width and how many rows we already processed. - final long memoryUsed = status.getOutPosition() * getOutgoingRowWidth(); - // This is the remaining memory. - final long remainingMemory = Math.max(outputBatchSize - memoryUsed, 0); - // These are number of rows we can fit in remaining memory based on new outgoing row width. - final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth); - - status.setTargetOutputRowCount(adjustOutputRowCount(status.getOutPosition() + numOutputRowsRemaining)); - setOutgoingRowWidth(newOutgoingRowWidth); - - logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output rowCount : {}", - outputBatchSize, getOutgoingRowWidth(), getOutputRowCount()); - } - - @Override - public RecordBatchSizer.ColumnSize getColumnSize(String name) { - RecordBatchSizer leftSizer = getRecordBatchSizer(LEFT_INDEX); - RecordBatchSizer rightSizer = getRecordBatchSizer(RIGHT_INDEX); - - if (leftSizer != null && leftSizer.getColumn(name) != null) { - return leftSizer.getColumn(name); - } - return rightSizer == null ? null : rightSizer.getColumn(name); + status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition())); } } - private final MergeJoinMemoryManager mergeJoinMemoryManager = new MergeJoinMemoryManager(); - protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException { - super(popConfig, context, true); + super(popConfig, context, true, left, right); - outputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + // Instantiate the batch memory manager + final int outputBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + batchMemoryManager = new MergeJoinMemoryManager(outputBatchSize, left, right); if (popConfig.getConditions().size() == 0) { throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions"); } - this.left = left; - this.leftIterator = new RecordIterator(left, this, oContext, 0, false, mergeJoinMemoryManager); - this.right = right; - this.rightIterator = new RecordIterator(right, this, oContext, 1, mergeJoinMemoryManager); + this.leftIterator = new RecordIterator(left, this, oContext, 0, false, batchMemoryManager); + this.rightIterator = new RecordIterator(right, this, oContext, 1, batchMemoryManager); this.joinType = popConfig.getJoinType(); this.status = new JoinStatus(leftIterator, rightIterator, this); this.conditions = popConfig.getConditions(); @@ -242,21 +160,10 @@ public int getRecordCount() { public void buildSchema() { // initialize iterators status.initialize(); - final IterOutcome leftOutcome = status.getLeftStatus(); final IterOutcome rightOutcome = status.getRightStatus(); - if (leftOutcome == IterOutcome.STOP || rightOutcome == IterOutcome.STOP) { - state = BatchState.STOP; - return; - } - - if (leftOutcome == IterOutcome.OUT_OF_MEMORY || rightOutcome == IterOutcome.OUT_OF_MEMORY) { - state = BatchState.OUT_OF_MEMORY; - return; - } - if (leftOutcome == IterOutcome.NONE && rightOutcome == IterOutcome.NONE) { - state = BatchState.DONE; + if (!verifyOutcomeToSetBatchState(leftOutcome, rightOutcome)) { return; } @@ -274,12 +181,12 @@ public IterOutcome innerNext() { case BATCH_RETURNED: allocateBatch(false); status.resetOutputPos(); - status.setTargetOutputRowCount(mergeJoinMemoryManager.getOutputRowCount()); + status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount()); break; case SCHEMA_CHANGED: allocateBatch(true); status.resetOutputPos(); - status.setTargetOutputRowCount(mergeJoinMemoryManager.getOutputRowCount()); + status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount()); break; case NO_MORE_DATA: status.resetOutputPos(); @@ -359,12 +266,12 @@ private void setRecordCountInContainer() { Preconditions.checkArgument(!vw.isHyper()); vw.getValueVector().getMutator().setValueCount(getRecordCount()); } - mergeJoinMemoryManager.updateOutgoingStats(getRecordCount()); + batchMemoryManager.updateOutgoingStats(getRecordCount()); } @Override public void close() { - updateStats(); + updateBatchMemoryManagerStats(); super.close(); leftIterator.close(); rightIterator.close(); @@ -542,9 +449,9 @@ private void allocateBatch(boolean newSchema) { // Allocate memory for the vectors. // This will iteratively allocate memory for all nested columns underneath. - int outputRowCount = mergeJoinMemoryManager.getOutputRowCount(); + int outputRowCount = batchMemoryManager.getOutputRowCount(); for (VectorWrapper w : container) { - RecordBatchSizer.ColumnSize colSize = mergeJoinMemoryManager.getColumnSize(w.getField().getName()); + RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName()); colSize.allocateVector(w.getValueVector(), outputRowCount); } @@ -610,33 +517,4 @@ private LogicalExpression materializeExpression(LogicalExpression expression, It } return materializedExpr; } - - private void updateStats() { - stats.setLongStat(MergeJoinBatch.Metric.LEFT_INPUT_BATCH_COUNT, mergeJoinMemoryManager.getNumIncomingBatches(LEFT_INDEX)); - stats.setLongStat(MergeJoinBatch.Metric.LEFT_AVG_INPUT_BATCH_BYTES, mergeJoinMemoryManager.getAvgInputBatchSize(LEFT_INDEX)); - stats.setLongStat(MergeJoinBatch.Metric.LEFT_AVG_INPUT_ROW_BYTES, mergeJoinMemoryManager.getAvgInputRowWidth(LEFT_INDEX)); - stats.setLongStat(Metric.LEFT_INPUT_RECORD_COUNT, mergeJoinMemoryManager.getTotalInputRecords(LEFT_INDEX)); - - stats.setLongStat(MergeJoinBatch.Metric.RIGHT_INPUT_BATCH_COUNT, mergeJoinMemoryManager.getNumIncomingBatches(RIGHT_INDEX)); - stats.setLongStat(MergeJoinBatch.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, mergeJoinMemoryManager.getAvgInputBatchSize(RIGHT_INDEX)); - stats.setLongStat(MergeJoinBatch.Metric.RIGHT_AVG_INPUT_ROW_BYTES, mergeJoinMemoryManager.getAvgInputRowWidth(RIGHT_INDEX)); - stats.setLongStat(Metric.RIGHT_INPUT_RECORD_COUNT, mergeJoinMemoryManager.getTotalInputRecords(RIGHT_INDEX)); - - stats.setLongStat(MergeJoinBatch.Metric.OUTPUT_BATCH_COUNT, mergeJoinMemoryManager.getNumOutgoingBatches()); - stats.setLongStat(MergeJoinBatch.Metric.AVG_OUTPUT_BATCH_BYTES, mergeJoinMemoryManager.getAvgOutputBatchSize()); - stats.setLongStat(MergeJoinBatch.Metric.AVG_OUTPUT_ROW_BYTES, mergeJoinMemoryManager.getAvgOutputRowWidth()); - stats.setLongStat(MergeJoinBatch.Metric.OUTPUT_RECORD_COUNT, mergeJoinMemoryManager.getTotalOutputRecords()); - - logger.debug("left input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", - mergeJoinMemoryManager.getNumIncomingBatches(LEFT_INDEX), mergeJoinMemoryManager.getAvgInputBatchSize(LEFT_INDEX), - mergeJoinMemoryManager.getAvgInputRowWidth(LEFT_INDEX), mergeJoinMemoryManager.getTotalInputRecords(LEFT_INDEX)); - - logger.debug("right input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", - mergeJoinMemoryManager.getNumIncomingBatches(RIGHT_INDEX), mergeJoinMemoryManager.getAvgInputBatchSize(RIGHT_INDEX), - mergeJoinMemoryManager.getAvgInputRowWidth(RIGHT_INDEX), mergeJoinMemoryManager.getTotalInputRecords(RIGHT_INDEX)); - - logger.debug("output: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", - mergeJoinMemoryManager.getNumOutgoingBatches(), mergeJoinMemoryManager.getAvgOutputBatchSize(), - mergeJoinMemoryManager.getAvgOutputRowWidth(), mergeJoinMemoryManager.getTotalOutputRecords()); - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java index 02b07bb7ab6..70be9b59673 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java @@ -22,6 +22,9 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; public abstract class AbstractBinaryRecordBatch extends AbstractRecordBatch { + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(new Object() {}.getClass().getEnclosingClass()); + protected final RecordBatch left; protected final RecordBatch right; @@ -31,6 +34,9 @@ public abstract class AbstractBinaryRecordBatch exte // state (IterOutcome) of the right input protected IterOutcome rightUpstream = IterOutcome.NONE; + // For now only used by Lateral and Merge Join + protected RecordBatchMemoryManager batchMemoryManager; + protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException { super(popConfig, context, true, context.newOperatorContext(popConfig)); @@ -45,48 +51,94 @@ protected AbstractBinaryRecordBatch(final T popConfig, final FragmentContext con this.right = right; } - /** - * Prefetch first batch from both inputs. - * @return true if caller should continue processing - * false if caller should stop and exit from processing. - */ - protected boolean prefetchFirstBatchFromBothSides() { - // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch - leftUpstream = next(0, left); - - rightUpstream = next(1, right); - - if (leftUpstream == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) { + protected boolean verifyOutcomeToSetBatchState(IterOutcome leftOutcome, IterOutcome rightOutcome) { + if (leftOutcome == IterOutcome.STOP || rightUpstream == IterOutcome.STOP) { state = BatchState.STOP; return false; } - if (leftUpstream == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) { + if (leftOutcome == IterOutcome.OUT_OF_MEMORY || rightUpstream == IterOutcome.OUT_OF_MEMORY) { state = BatchState.OUT_OF_MEMORY; return false; } - if (checkForEarlyFinish()) { + if (checkForEarlyFinish(leftOutcome, rightOutcome)) { state = BatchState.DONE; return false; } // EMIT outcome is not expected as part of first batch from either side - if (leftUpstream == IterOutcome.EMIT || rightUpstream == IterOutcome.EMIT) { + if (leftOutcome == IterOutcome.EMIT || rightOutcome == IterOutcome.EMIT) { state = BatchState.STOP; throw new IllegalStateException("Unexpected IterOutcome.EMIT received either from left or right side in " + "buildSchema phase"); } - return true; } + /** + * Prefetch first batch from both inputs. + * @return true if caller should continue processing + * false if caller should stop and exit from processing. + */ + protected boolean prefetchFirstBatchFromBothSides() { + // Left can get batch with zero or more records with OK_NEW_SCHEMA outcome as first batch + leftUpstream = next(0, left); + rightUpstream = next(1, right); + return verifyOutcomeToSetBatchState(leftUpstream, rightUpstream); + } + /* * Checks for the operator specific early terminal condition. * @return true if the further processing can stop. * false if the further processing is needed. */ - protected boolean checkForEarlyFinish() { - return (leftUpstream == IterOutcome.NONE && rightUpstream == IterOutcome.NONE); + protected boolean checkForEarlyFinish(IterOutcome leftOutcome, IterOutcome rightOutcome) { + return (leftOutcome == IterOutcome.NONE && rightOutcome == IterOutcome.NONE); + } + + protected void updateBatchMemoryManagerStats() { + stats.setLongStat(JoinBatchMemoryManager.Metric.LEFT_INPUT_BATCH_COUNT, + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX)); + stats.setLongStat(JoinBatchMemoryManager.Metric.LEFT_AVG_INPUT_BATCH_BYTES, + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX)); + stats.setLongStat(JoinBatchMemoryManager.Metric.LEFT_AVG_INPUT_ROW_BYTES, + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX)); + stats.setLongStat(JoinBatchMemoryManager.Metric.LEFT_INPUT_RECORD_COUNT, + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + + stats.setLongStat(JoinBatchMemoryManager.Metric.RIGHT_INPUT_BATCH_COUNT, + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX)); + stats.setLongStat(JoinBatchMemoryManager.Metric.RIGHT_AVG_INPUT_BATCH_BYTES, + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX)); + stats.setLongStat(JoinBatchMemoryManager.Metric.RIGHT_AVG_INPUT_ROW_BYTES, + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX)); + stats.setLongStat(JoinBatchMemoryManager.Metric.RIGHT_INPUT_RECORD_COUNT, + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + + stats.setLongStat(JoinBatchMemoryManager.Metric.OUTPUT_BATCH_COUNT, + batchMemoryManager.getNumOutgoingBatches()); + stats.setLongStat(JoinBatchMemoryManager.Metric.AVG_OUTPUT_BATCH_BYTES, + batchMemoryManager.getAvgOutputBatchSize()); + stats.setLongStat(JoinBatchMemoryManager.Metric.AVG_OUTPUT_ROW_BYTES, + batchMemoryManager.getAvgOutputRowWidth()); + stats.setLongStat(JoinBatchMemoryManager.Metric.OUTPUT_RECORD_COUNT, + batchMemoryManager.getTotalOutputRecords()); + + logger.debug("left input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX)); + + logger.debug("right input: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX), + batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX)); + + logger.debug("output: batch count : {}, avg batch bytes : {}, avg row bytes : {}, record count : {}", + batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(), + batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 8bf18567db7..9d383c110ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -42,7 +42,6 @@ public abstract class AbstractRecordBatch implements protected final OperatorContext oContext; protected final OperatorStats stats; protected final boolean unionTypeEnabled; - protected BatchState state; protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java new file mode 100644 index 00000000000..843700085bd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record; + +import org.apache.drill.exec.ops.MetricDef; + +public class JoinBatchMemoryManager extends RecordBatchMemoryManager { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinBatchMemoryManager.class); + + private int leftRowWidth; + + private int rightRowWidth; + + private RecordBatch leftIncoming; + + private RecordBatch rightIncoming; + + private static final int numInputs = 2; + + public static final int LEFT_INDEX = 0; + + public static final int RIGHT_INDEX = 1; + + public JoinBatchMemoryManager(int outputBatchSize, RecordBatch leftBatch, RecordBatch rightBatch) { + super(numInputs, outputBatchSize); + this.leftIncoming = leftBatch; + this.rightIncoming = rightBatch; + } + + @Override + public int update(int inputIndex, int outputPosition) { + switch (inputIndex) { + case LEFT_INDEX: + setRecordBatchSizer(inputIndex, new RecordBatchSizer(leftIncoming)); + leftRowWidth = getRecordBatchSizer(inputIndex).netRowWidth(); + logger.debug("left incoming batch size : {}", getRecordBatchSizer(inputIndex)); + break; + case RIGHT_INDEX: + setRecordBatchSizer(inputIndex, new RecordBatchSizer(rightIncoming)); + rightRowWidth = getRecordBatchSizer(inputIndex).netRowWidth(); + logger.debug("right incoming batch size : {}", getRecordBatchSizer(inputIndex)); + default: + break; + } + + updateIncomingStats(inputIndex); + final int newOutgoingRowWidth = leftRowWidth + rightRowWidth; + + // If outgoing row width is 0, just return. This is possible for empty batches or + // when first set of batches come with OK_NEW_SCHEMA and no data. + if (newOutgoingRowWidth == 0) { + return getOutputRowCount(); + } + + // Adjust for the current batch. + // calculate memory used so far based on previous outgoing row width and how many rows we already processed. + final int previousOutgoingWidth = getOutgoingRowWidth(); + final long memoryUsed = outputPosition * previousOutgoingWidth; + + final int configOutputBatchSize = getOutputBatchSize(); + // This is the remaining memory. + final long remainingMemory = Math.max(configOutputBatchSize - memoryUsed, 0); + + // These are number of rows we can fit in remaining memory based on new outgoing row width. + final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth); + + // update the value to be used for next batch(es) + setOutputRowCount(configOutputBatchSize, newOutgoingRowWidth); + + // set the new row width + setOutgoingRowWidth(newOutgoingRowWidth); + + logger.debug("output batch size : {}, avg outgoing rowWidth : {}, output rowCount : {}", + getOutputBatchSize(), getOutgoingRowWidth(), getOutputRowCount()); + + return adjustOutputRowCount(outputPosition + numOutputRowsRemaining); + } + + @Override + public RecordBatchSizer.ColumnSize getColumnSize(String name) { + RecordBatchSizer leftSizer = getRecordBatchSizer(LEFT_INDEX); + RecordBatchSizer rightSizer = getRecordBatchSizer(RIGHT_INDEX); + + if (leftSizer != null && leftSizer.getColumn(name) != null) { + return leftSizer.getColumn(name); + } + return rightSizer == null ? null : rightSizer.getColumn(name); + } + + public enum Metric implements MetricDef { + LEFT_INPUT_BATCH_COUNT, + LEFT_AVG_INPUT_BATCH_BYTES, + LEFT_AVG_INPUT_ROW_BYTES, + LEFT_INPUT_RECORD_COUNT, + RIGHT_INPUT_BATCH_COUNT, + RIGHT_AVG_INPUT_BATCH_BYTES, + RIGHT_AVG_INPUT_ROW_BYTES, + RIGHT_INPUT_RECORD_COUNT, + OUTPUT_BATCH_COUNT, + AVG_OUTPUT_BATCH_BYTES, + AVG_OUTPUT_ROW_BYTES, + OUTPUT_RECORD_COUNT; + + @Override + public int metricId() { + return ordinal(); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java index c5f31a9125e..2100ae1238f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java @@ -18,15 +18,16 @@ package org.apache.drill.exec.record; import com.google.common.base.Preconditions; +import org.apache.drill.exec.vector.UInt4Vector; import org.apache.drill.exec.vector.ValueVector; public class RecordBatchMemoryManager { - protected static final int OFFSET_VECTOR_WIDTH = 4; protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT; protected static final int MIN_NUM_ROWS = 1; protected static final int DEFAULT_INPUT_INDEX = 0; private int outputRowCount = MAX_NUM_ROWS; private int outgoingRowWidth; + private int outputBatchSize; private RecordBatchSizer[] sizer; private BatchStats[] inputBatchStats; private BatchStats outputBatchStats; @@ -126,20 +127,28 @@ public long getTotalInputRecords(int index) { return inputBatchStats[index] == null ? 0 : inputBatchStats[index].getTotalRecords(); } - public RecordBatchMemoryManager(int numInputs) { + public RecordBatchMemoryManager(int numInputs, int configuredOutputSize) { this.numInputs = numInputs; + this.outputBatchSize = configuredOutputSize; sizer = new RecordBatchSizer[numInputs]; inputBatchStats = new BatchStats[numInputs]; outputBatchStats = new BatchStats(); } - public RecordBatchMemoryManager() { + public RecordBatchMemoryManager(int configuredOutputSize) { + this.outputBatchSize = configuredOutputSize; sizer = new RecordBatchSizer[numInputs]; inputBatchStats = new BatchStats[numInputs]; outputBatchStats = new BatchStats(); } - public void update(int inputIndex) {}; + public int update(int inputIndex, int outputPosition) { + // by default just return the outputRowCount + return getOutputRowCount(); + } + + public void update(int inputIndex) { + } public void update() {}; @@ -224,4 +233,12 @@ public void updateOutgoingStats(int outputRecords) { outputBatchStats.incTotalRecords(outputRecords); outputBatchStats.incSumBatchSizes(outgoingRowWidth * outputRecords); } + + public int getOutputBatchSize() { + return outputBatchSize; + } + + public int getOffsetVectorWidth() { + return UInt4Vector.VALUE_WIDTH; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java index 072a5cb5fcb..c868c7dda4b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java @@ -367,4 +367,4 @@ public void close() { clear(); clearInflightBatches(); } -} \ No newline at end of file +}