Skip to content

Commit

Permalink
DRILL-6453: Fix deadlock caused by reading from left and right inputs…
Browse files Browse the repository at this point in the history
… in HashJoin simultaneously.

closes #1408
  • Loading branch information
ilooner authored and vdiravka committed Aug 13, 2018
1 parent 93a1c5a commit 6ad0f9f
Show file tree
Hide file tree
Showing 17 changed files with 1,429 additions and 492 deletions.
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.common;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
import org.apache.drill.common.exceptions.UserException;
Expand Down Expand Up @@ -122,6 +123,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
private List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList();
private long partitionInMemorySize;
private long numInMemoryRecords;
private boolean updatedRecordsPerBatch = false;

public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable,
RecordBatch buildBatch, RecordBatch probeBatch,
Expand Down Expand Up @@ -155,6 +157,18 @@ public HashPartition(FragmentContext context, BufferAllocator allocator, Chained
}
}

/**
* Configure a different temporary batch size when spilling probe batches. The size may be changed at most once,
* and only while this partition is processing the probe side.
* @param newRecordsPerBatch The new temporary batch size to use. Must be positive.
* @throws IllegalArgumentException if {@code newRecordsPerBatch} is not positive.
* @throws IllegalStateException if the batch size was already updated, or if the partition is not probing.
*/
public void updateProbeRecordsPerBatch(int newRecordsPerBatch) {
  Preconditions.checkArgument(newRecordsPerBatch > 0);
  Preconditions.checkState(!updatedRecordsPerBatch); // Only allow updating once
  Preconditions.checkState(processingOuter); // We can only update the records per batch when probing.

  recordsPerBatch = newRecordsPerBatch;
  // Remember that the update happened; without this the "only once" check above can never fail.
  updatedRecordsPerBatch = true;
}

/**
* Allocate a new vector container for either right or left record batch
* Add an additional special vector for the hash values
Expand Down
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.join;

import org.apache.drill.exec.record.RecordBatch;

/**
* This class predicts the sizes of batches given an input batch.
*
* <p>Callers first sample the input batch with {@link #updateStats()} and may then read the sampled values through
* {@link #getBatchSize()} and {@link #getNumRecords()}, or extrapolate them with
* {@link #predictBatchSize(int, boolean)}.</p>
*
* <h4>Invariants</h4>
* <ul>
* <li>The {@link BatchSizePredictor} assumes that a {@link RecordBatch} is in a state where it can return a valid record count.</li>
* </ul>
*/
public interface BatchSizePredictor {
/**
* Gets the batch size computed by the most recent call to {@link #updateStats()}.
* @return The batch size computed in the call to {@link #updateStats()}, or 0 if {@link #hadDataLastTime()} is false.
* @throws IllegalStateException if {@link #updateStats()} was never called.
*/
long getBatchSize();

/**
* Gets the number of records observed by the most recent call to {@link #updateStats()}.
* @return The number of records computed in the call to {@link #updateStats()}, or 0 if {@link #hadDataLastTime()} is false.
* @throws IllegalStateException if {@link #updateStats()} was never called.
*/
int getNumRecords();

/**
* Reports whether the input batch held any records the last time it was sampled.
* @return True if the input batch had records in the last call to {@link #updateStats()}. False otherwise.
*/
boolean hadDataLastTime();

/**
* This method can be called multiple times to collect stats about the latest data in the provided record batch. These
* stats are used to predict batch sizes. If the batch currently has no data, no size statistics are collected and
* {@link #hadDataLastTime()} reports false afterwards. This method must be called at least once before
* {@link #predictBatchSize(int, boolean)}.
*/
void updateStats();

/**
* Predicts the size of a batch using the current collected stats.
* @param desiredNumRecords The number of records contained in the batch whose size we want to predict.
* @param reserveHash Whether or not to include a column containing hash values.
* @return The size of the predicted batch.
* @throws IllegalStateException if {@link #hadDataLastTime()} is false or {@link #updateStats()} was not called.
*/
long predictBatchSize(int desiredNumRecords, boolean reserveHash);

/**
* A factory for creating {@link BatchSizePredictor}s.
*/
interface Factory {
/**
* Creates a predictor with a batch whose data needs to be used to predict other batch sizes.
* @param batch The batch whose size needs to be predicted.
* @param fragmentationFactor A constant used to predict value vector doubling.
* @param safetyFactor A constant used to leave padding for unpredictable incoming batches.
* @return A predictor bound to the given batch and factors.
*/
BatchSizePredictor create(RecordBatch batch, double fragmentationFactor, double safetyFactor);
}
}
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.join;

import com.google.common.base.Preconditions;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchSizer;
import org.apache.drill.exec.vector.IntVector;

import java.util.Map;

/**
 * A {@link BatchSizePredictor} that samples a {@link RecordBatch} with a {@link RecordBatchSizer} and
 * extrapolates the sampled size to batches holding a different number of records.
 *
 * <p>The static helpers expose the individual steps of the computation so they can be used without
 * creating a predictor instance.</p>
 */
public class BatchSizePredictorImpl implements BatchSizePredictor {
  /** Largest power of two that still fits in a positive {@code long}. */
  private static final long MAX_POWER_OF_2 = 1L << 62;

  private final RecordBatch batch;
  private final double fragmentationFactor;
  private final double safetyFactor;

  // Values captured by the most recent updateStats() call.
  private long batchSize;
  private int numRecords;
  private boolean updatedStats;
  private boolean hasData;

  /**
   * Creates a predictor bound to the given batch.
   * @param batch The batch to sample. Must not be null.
   * @param fragmentationFactor Factor applied to predicted sizes to account for fragmentation.
   * @param safetyFactor Factor applied to predicted sizes to leave padding.
   * @throws NullPointerException if {@code batch} is null.
   */
  public BatchSizePredictorImpl(final RecordBatch batch,
                                final double fragmentationFactor,
                                final double safetyFactor) {
    this.batch = Preconditions.checkNotNull(batch, "batch");
    this.fragmentationFactor = fragmentationFactor;
    this.safetyFactor = safetyFactor;
  }

  /**
   * Gets the batch size sampled by the last {@link #updateStats()} call.
   * @return The sampled batch size, or 0 if the last sampled batch had no records.
   * @throws IllegalStateException if {@link #updateStats()} was never called.
   */
  @Override
  public long getBatchSize() {
    Preconditions.checkState(updatedStats, "updateStats() must be called before getBatchSize()");
    return hasData ? batchSize : 0;
  }

  /**
   * Gets the record count sampled by the last {@link #updateStats()} call.
   * @return The sampled record count, or 0 if the last sampled batch had no records.
   * @throws IllegalStateException if {@link #updateStats()} was never called.
   */
  @Override
  public int getNumRecords() {
    Preconditions.checkState(updatedStats, "updateStats() must be called before getNumRecords()");
    return hasData ? numRecords : 0;
  }

  /**
   * Reports whether the last sampled batch had records.
   * @return True if the last {@link #updateStats()} call saw at least one record. False otherwise, including
   * when {@link #updateStats()} was never called.
   */
  @Override
  public boolean hadDataLastTime() {
    return hasData;
  }

  /**
   * Samples the bound batch. The record count and the has-data flag are always refreshed; the batch size
   * is only recomputed when the batch holds at least one record.
   */
  @Override
  public void updateStats() {
    final RecordBatchSizer batchSizer = new RecordBatchSizer(batch);
    numRecords = batchSizer.rowCount();
    updatedStats = true;
    hasData = numRecords > 0;

    if (hasData) {
      // Reuse the sizer built above rather than measuring the same batch a second time.
      batchSize = estimateBatchSize(batchSizer, batch.getRecordCount());
    }
  }

  /**
   * Predicts the size of a batch holding {@code desiredNumRecords} records from the last sampled stats.
   * @param desiredNumRecords The number of records in the batch whose size is predicted.
   * @param reserveHash Whether to add room for a column of hash values.
   * @return The predicted batch size.
   * @throws IllegalStateException if {@link #updateStats()} was never called or the last sampled batch had no records.
   */
  @Override
  public long predictBatchSize(int desiredNumRecords, boolean reserveHash) {
    Preconditions.checkState(hasData,
      "Cannot predict a batch size: updateStats() was never called or the last sampled batch had no data");
    // Safety factor can be multiplied at the end since these batches are coming from exchange operators, so no excess value vector doubling
    return computeMaxBatchSize(batchSize,
      numRecords,
      desiredNumRecords,
      fragmentationFactor,
      safetyFactor,
      reserveHash);
  }

  /**
   * Computes the size of a value vector holding {@code numRecords} entries of {@code byteSize} each,
   * rounded up to a power of two.
   * @param numRecords The number of entries.
   * @param byteSize The size of one entry.
   * @return {@code numRecords * byteSize} rounded up to the next power of two.
   * @throws IllegalArgumentException if the product is smaller than 1 or too large to round up.
   */
  public static long computeValueVectorSize(long numRecords, long byteSize) {
    long naiveSize = numRecords * byteSize;
    return roundUpToPowerOf2(naiveSize);
  }

  /**
   * Same as {@link #computeValueVectorSize(long, long)}, but the raw size is first scaled with
   * {@link RecordBatchSizer#multiplyByFactor} using {@code safetyFactor}.
   * @param numRecords The number of entries.
   * @param byteSize The size of one entry.
   * @param safetyFactor The factor applied before rounding.
   * @return The scaled size rounded up to the next power of two.
   * @throws IllegalArgumentException if the scaled size is smaller than 1 or too large to round up.
   */
  public static long computeValueVectorSize(long numRecords, long byteSize, double safetyFactor) {
    long naiveSize = RecordBatchSizer.multiplyByFactor(numRecords * byteSize, safetyFactor);
    return roundUpToPowerOf2(naiveSize);
  }

  /**
   * Rounds a positive number up to the nearest power of two. Exact powers of two are returned unchanged.
   * @param num The number to round. Must be between 1 and 2^62 inclusive.
   * @return The smallest power of two that is greater than or equal to {@code num}.
   * @throws IllegalArgumentException if {@code num} is smaller than 1 or larger than 2^62.
   */
  public static long roundUpToPowerOf2(long num) {
    Preconditions.checkArgument(num >= 1, "num must be >= 1 but was %s", num);
    // Above 2^62 the shift below would overflow and yield Long.MIN_VALUE; fail loudly instead.
    Preconditions.checkArgument(num <= MAX_POWER_OF_2,
      "num must be <= %s to be rounded up to a power of 2 but was %s", MAX_POWER_OF_2, num);
    return num == 1 ? 1 : Long.highestOneBit(num - 1) << 1;
  }

  /**
   * Predicts the size of a batch of {@code desiredNumRecords} records, without a hash value column.
   * @param incomingBatchSize The size of the sampled batch.
   * @param incomingNumRecords The number of records in the sampled batch.
   * @param desiredNumRecords The number of records in the batch whose size is predicted.
   * @param fragmentationFactor The fragmentation factor to apply.
   * @param safetyFactor The safety factor to apply.
   * @return The proportionally scaled size with both factors applied through
   * {@link RecordBatchSizer#multiplyByFactors}.
   */
  public static long computeMaxBatchSizeNoHash(final long incomingBatchSize,
                                               final int incomingNumRecords,
                                               final int desiredNumRecords,
                                               final double fragmentationFactor,
                                               final double safetyFactor) {
    long maxBatchSize = computePartitionBatchSize(incomingBatchSize, incomingNumRecords, desiredNumRecords);
    // Multiply by the fragmentation and safety factors
    return RecordBatchSizer.multiplyByFactors(maxBatchSize, fragmentationFactor, safetyFactor);
  }

  /**
   * Predicts the size of a batch of {@code desiredNumRecords} records, optionally reserving room for a
   * column of hash values.
   * @param incomingBatchSize The size of the sampled batch.
   * @param incomingNumRecords The number of records in the sampled batch.
   * @param desiredNumRecords The number of records in the batch whose size is predicted.
   * @param fragmentationFactor The fragmentation factor to apply.
   * @param safetyFactor The safety factor to apply.
   * @param reserveHash Whether to add room for a column of hash values.
   * @return The predicted size, including the hash column when {@code reserveHash} is true.
   */
  public static long computeMaxBatchSize(final long incomingBatchSize,
                                         final int incomingNumRecords,
                                         final int desiredNumRecords,
                                         final double fragmentationFactor,
                                         final double safetyFactor,
                                         final boolean reserveHash) {
    long size = computeMaxBatchSizeNoHash(incomingBatchSize,
      incomingNumRecords,
      desiredNumRecords,
      fragmentationFactor,
      safetyFactor);

    if (!reserveHash) {
      return size;
    }

    // One int-wide hash value per record; only the fragmentation factor is applied to this column.
    long hashSize = desiredNumRecords * ((long) IntVector.VALUE_WIDTH);
    hashSize = RecordBatchSizer.multiplyByFactors(hashSize, fragmentationFactor);

    return size + hashSize;
  }

  /**
   * Scales a sampled batch size proportionally to a different record count, rounding up.
   * The arithmetic is done in {@code double}; there is no guard against {@code incomingNumRecords == 0},
   * so callers are expected to pass a positive record count.
   * @param incomingBatchSize The size of the sampled batch.
   * @param incomingNumRecords The number of records in the sampled batch.
   * @param desiredNumRecords The number of records in the batch whose size is predicted.
   * @return {@code ceil(incomingBatchSize / incomingNumRecords * desiredNumRecords)}.
   */
  public static long computePartitionBatchSize(final long incomingBatchSize,
                                               final int incomingNumRecords,
                                               final int desiredNumRecords) {
    return (long) Math.ceil((((double) incomingBatchSize) /
      ((double) incomingNumRecords)) *
      ((double) desiredNumRecords));
  }

  /**
   * Estimates the size of the given batch by summing, over every column reported by a
   * {@link RecordBatchSizer}, the power-of-two-rounded size of a vector holding the batch's record count.
   * @param recordBatch The batch to measure.
   * @return The estimated batch size.
   * @throws IllegalArgumentException if a column's computed size is smaller than 1.
   */
  public static long getBatchSizeEstimate(final RecordBatch recordBatch) {
    final RecordBatchSizer sizer = new RecordBatchSizer(recordBatch);
    return estimateBatchSize(sizer, recordBatch.getRecordCount());
  }

  /**
   * Sums the rounded per-column vector sizes reported by an already constructed sizer.
   */
  private static long estimateBatchSize(final RecordBatchSizer sizer, final int recordCount) {
    long size = 0L;

    for (Map.Entry<String, RecordBatchSizer.ColumnSize> column : sizer.columns().entrySet()) {
      size += computeValueVectorSize(recordCount, column.getValue().getStdNetOrNetSizePerEntry());
    }

    return size;
  }

  /**
   * Singleton factory producing {@link BatchSizePredictorImpl} instances.
   */
  public static class Factory implements BatchSizePredictor.Factory {
    public static final Factory INSTANCE = new Factory();

    private Factory() {
    }

    /**
     * Creates a new {@link BatchSizePredictorImpl} for the given batch and factors.
     * @param batch The batch to sample. Must not be null.
     * @param fragmentationFactor The fragmentation factor applied to predicted sizes.
     * @param safetyFactor The safety factor applied to predicted sizes.
     * @return A new predictor bound to {@code batch}.
     */
    @Override
    public BatchSizePredictor create(final RecordBatch batch,
                                     final double fragmentationFactor,
                                     final double safetyFactor) {
      return new BatchSizePredictorImpl(batch, fragmentationFactor, safetyFactor);
    }
  }
}

0 comments on commit 6ad0f9f

Please sign in to comment.