Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRILL-6549: batch sizing for nested loop join #1363

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -37,6 +37,9 @@ public void setupNestedLoopJoin(FragmentContext context, RecordBatch left,
ExpandableHyperContainer rightContainer,
LinkedList<Integer> rightCounts,
NestedLoopJoinBatch outgoing);

void setTargetOutputCount(int targetOutputCount);

// Produce output records taking into account join type
public int outputRecords(JoinRelType joinType);

Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.join;

import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;

Expand All @@ -29,6 +30,7 @@
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
Expand All @@ -50,8 +52,8 @@
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.AllocationHelper;

import org.apache.drill.exec.record.JoinBatchMemoryManager;
import org.apache.drill.exec.record.RecordBatchSizer;
import com.google.common.base.Preconditions;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
Expand All @@ -65,9 +67,6 @@
public class NestedLoopJoinBatch extends AbstractBinaryRecordBatch<NestedLoopJoinPOP> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);

// Maximum number records in the outgoing batch
protected static final int MAX_BATCH_SIZE = 4096;

// Input indexes to correctly update the stats
protected static final int LEFT_INPUT = 0;
protected static final int RIGHT_INPUT = 1;
Expand Down Expand Up @@ -130,6 +129,11 @@ protected NestedLoopJoinBatch(NestedLoopJoinPOP popConfig, FragmentContext conte
super(popConfig, context, left, right);
Preconditions.checkNotNull(left);
Preconditions.checkNotNull(right);

// get the output batch size from config.
int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right, new HashSet<>());
logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
}

/**
Expand Down Expand Up @@ -162,6 +166,9 @@ public IterOutcome innerNext() {
}
// fall through
case OK:
// For right side, use aggregate i.e. average row width across batches
batchMemoryManager.update(RIGHT_INDEX, 0, true);
logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
addBatchToHyperContainer(right);
break;
case OUT_OF_MEMORY:
Expand All @@ -179,7 +186,9 @@ public IterOutcome innerNext() {
}

// allocate space for the outgoing batch
allocateVectors();
batchMemoryManager.allocateVectors(container);

nljWorker.setTargetOutputCount(batchMemoryManager.getOutputRowCount());

// invoke the runtime generated method to emit records in the output batch
outputRecords = nljWorker.outputRecords(popConfig.getJoinType());
Expand All @@ -193,6 +202,10 @@ public IterOutcome innerNext() {
container.setRecordCount(outputRecords);
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}

logger.debug("Number of records emitted: " + outputRecords);

return (outputRecords > 0) ? IterOutcome.OK : IterOutcome.NONE;
Expand Down Expand Up @@ -331,15 +344,6 @@ Logical expression may contain fields from left and right batches. During code g
return context.getImplementationClass(nLJCodeGenerator);
}

/**
* Simple method to allocate space for all the vectors in the container.
*/
private void allocateVectors() {
for (final VectorWrapper<?> vw : container) {
AllocationHelper.allocateNew(vw.getValueVector(), MAX_BATCH_SIZE);
}
}

/**
* Builds the output container's schema. Goes over the left and the right
* batch and adds the corresponding vectors to the output container.
Expand All @@ -352,6 +356,9 @@ protected void buildSchema() throws SchemaChangeException {
return;
}

batchMemoryManager.update(RIGHT_INDEX, 0, true);
logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));

if (leftUpstream != IterOutcome.NONE) {
leftSchema = left.getSchema();
for (final VectorWrapper<?> vw : left) {
Expand Down Expand Up @@ -380,15 +387,16 @@ protected void buildSchema() throws SchemaChangeException {
addBatchToHyperContainer(right);
}

allocateVectors();
nljWorker = setupWorker();

// if left batch is empty, fetch next
if (leftUpstream != IterOutcome.NONE && left.getRecordCount() == 0) {
leftUpstream = next(LEFT_INPUT, left);
}

container.setRecordCount(0);
batchMemoryManager.update(LEFT_INDEX, 0);
logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));

container.buildSchema(BatchSchema.SelectionVectorMode.NONE);

} catch (ClassTransformationException | IOException e) {
Expand All @@ -412,6 +420,26 @@ private void addBatchToHyperContainer(RecordBatch inputBatch) {

@Override
public void close() {
updateBatchMemoryManagerStats();

if (logger.isDebugEnabled()) {
logger.debug("BATCH_STATS, incoming aggregate left: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.LEFT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.LEFT_INDEX));

logger.debug("BATCH_STATS, incoming aggregate right: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumIncomingBatches(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputBatchSize(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getAvgInputRowWidth(JoinBatchMemoryManager.RIGHT_INDEX),
batchMemoryManager.getTotalInputRecords(JoinBatchMemoryManager.RIGHT_INDEX));

logger.debug("BATCH_STATS, outgoing aggregate: batch count : {}, avg bytes : {}, avg row bytes : {}, record count : {}",
batchMemoryManager.getNumOutgoingBatches(), batchMemoryManager.getAvgOutputBatchSize(),
batchMemoryManager.getAvgOutputRowWidth(), batchMemoryManager.getTotalOutputRecords());
}

rightContainer.clear();
rightCounts.clear();
super.close();
Expand Down
Expand Up @@ -29,12 +29,16 @@
import java.util.LinkedList;
import java.util.List;

import static org.apache.drill.exec.record.JoinBatchMemoryManager.LEFT_INDEX;

/*
* Template class that combined with the runtime generated source implements the NestedLoopJoin interface. This
* class contains the main nested loop join logic.
*/
public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {

private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NestedLoopJoinBatch.class);

// Current left input batch being processed
private RecordBatch left = null;

Expand All @@ -50,6 +54,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
// Iteration status tracker
private IterationStatusTracker tracker = new IterationStatusTracker();

private int targetOutputRecords;

/**
* Method initializes necessary state and invokes the doSetup() to set the
* input and output value vector references.
Expand All @@ -69,10 +75,14 @@ public void setupNestedLoopJoin(FragmentContext context,
this.leftRecordCount = left.getRecordCount();
this.rightCounts = rightCounts;
this.outgoing = outgoing;

doSetup(context, rightContainer, left, outgoing);
}

@Override
public void setTargetOutputCount(int targetOutputRecords) {
this.targetOutputRecords = targetOutputRecords;
}

/**
* Main entry point for producing the output records. Thin wrapper around populateOutgoingBatch(), this method
* controls which left batch we are processing and fetches the next left input batch once we exhaust the current one.
Expand All @@ -84,11 +94,11 @@ public int outputRecords(JoinRelType joinType) {
int outputIndex = 0;
while (leftRecordCount != 0) {
outputIndex = populateOutgoingBatch(joinType, outputIndex);
if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
if (outputIndex >= targetOutputRecords) {
break;
}
// reset state and get next left batch
resetAndGetNextLeft();
resetAndGetNextLeft(outputIndex);
}
return outputIndex;
}
Expand Down Expand Up @@ -128,7 +138,7 @@ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {
outputIndex++;
rightRecordMatched = true;

if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
if (outputIndex >= targetOutputRecords) {
nextRightRecordToProcess++;

// no more space left in the batch, stop processing
Expand All @@ -143,7 +153,7 @@ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {
// project records from the left side only, records from right will be null
emitLeft(nextLeftRecordToProcess, outputIndex);
outputIndex++;
if (outputIndex >= NestedLoopJoinBatch.MAX_BATCH_SIZE) {
if (outputIndex >= targetOutputRecords) {
nextLeftRecordToProcess++;

// no more space left in the batch, stop processing
Expand All @@ -165,7 +175,7 @@ private int populateOutgoingBatch(JoinRelType joinType, int outputIndex) {
* Resets some internal state which indicates the next records to process in the left and right batches,
* also fetches the next left input batch.
*/
private void resetAndGetNextLeft() {
private void resetAndGetNextLeft(int outputIndex) {
for (VectorWrapper<?> vw : left) {
vw.getValueVector().clear();
}
Expand All @@ -181,6 +191,8 @@ private void resetAndGetNextLeft() {
leftRecordCount = 0;
break;
case OK:
setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex));
logger.debug("BATCH_STATS, incoming left: {}", outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX));
leftRecordCount = left.getRecordCount();
break;
}
Expand Down