
DRILL-6236: batch sizing for hash join #1227

Closed (wants to merge 2 commits)

Conversation

ppadma
Contributor

@ppadma ppadma commented Apr 19, 2018

No description provided.

@ppadma ppadma force-pushed the DRILL-6236 branch 2 times, most recently from ccce105 to aa7a3da Compare April 19, 2018 21:46
Contributor

@Ben-Zvi Ben-Zvi left a comment


In addition to the other comments: we will have some work to do when this code is merged with the Hash-Join spill code (PR coming soon). Some of the merge work would be purely mechanical (e.g., HashJoinProbeTemplate code was moved into HashJoinBatch), but other parts would require some thought. For example, the new spill code re-assigns the left/right incoming batches when reading from the spill files; should the Memory Manager be updated with that data, which was already read via the original left/right?

OUTPUT_BATCH_COUNT,
AVG_OUTPUT_BATCH_BYTES,
AVG_OUTPUT_ROW_BYTES,
OUTPUT_RECORD_COUNT;
Contributor


These metrics are also used by customers; is this information relevant for them? Is it too detailed (e.g., could it be logged instead)?

Contributor Author


It is relevant in the sense that these metrics provide a high-level picture of the amount of data being processed, memory usage, etc. for each operator. This is also helpful when debugging, to figure out what is going on.

Contributor


Putting these metrics inside the operator's Metric class will not work. For joins, these metrics were moved into the JoinBatchMemoryManager.Metric class since they are memory-manager metrics. So when you call updateBatchMemoryManagerStats(), it updates the operator stats using ordinals from the JoinBatchMemoryManager.Metric class, and the ordinal for LEFT_INPUT_BATCH_COUNT will be 0, not 4 (which is required).
I think we should improve our OperatorsMetricRegistry to register multiple Metric classes for an operator.
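To make the ordinal clash concrete, here is a minimal, hypothetical sketch (the enum members are illustrative and do not match Drill's actual Metric definitions): stats are recorded by enum ordinal, so the same metric name resolves to different slots in the two classes.

```java
// Hypothetical sketch of the ordinal clash described above; the enum
// contents are illustrative and do not match Drill's real Metric classes.
public class MetricOrdinalClash {

  // In the operator's own Metric enum, LEFT_INPUT_BATCH_COUNT would come
  // after four earlier operator metrics, so its ordinal is 4.
  enum OperatorMetric {
    NUM_BUCKETS, NUM_ENTRIES, NUM_RESIZING, RESIZING_TIME_MS, // ordinals 0..3
    LEFT_INPUT_BATCH_COUNT                                    // ordinal 4
  }

  // In JoinBatchMemoryManager.Metric it is the first entry, so its ordinal
  // is 0 -- and stats recorded by ordinal would land in the wrong slot.
  enum ManagerMetric {
    LEFT_INPUT_BATCH_COUNT // ordinal 0
  }
}
```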

// skip first batch if count is zero, as it may be an empty schema batch
if (isFurtherProcessingRequired(rightUpstream) && right.getRecordCount() == 0) {
for (final VectorWrapper<?> w : right) {
w.clear();
}
rightUpstream = next(right);
// For build side, use aggregate i.e. average row width across batches
batchMemoryManager.update(RIGHT_INDEX, 0, true);
Contributor


Why is update() being called when the right has zero rows? Shouldn't it be called for every new right incoming batch?

Contributor Author


There is a call to "next" right above the update.

@@ -346,6 +369,7 @@ public void executeBuildPhase() throws SchemaChangeException, ClassTransformatio
}
// Fall through
case OK:
batchMemoryManager.update(LEFT_INDEX);
Contributor


Should it be the RIGHT_INDEX here?

@@ -241,4 +261,41 @@ public int getOutputBatchSize() {
public int getOffsetVectorWidth() {
return UInt4Vector.VALUE_WIDTH;
}

public void allocateVectors(VectorContainer container) {
Contributor


Cleaner implementation: Just call the following method, with outputRowCount as the second parameter.

}

public void allocateVectors(List<ValueVector> valueVectors) {
// Allocate memory for the vectors.
Contributor


Same idea/comment as above; can avoid some duplicate code by calling allocateVectors(valueVectors, outputRecordCount)

if (inputBatchStats[index] == null) {
inputBatchStats[index] = new BatchStats();
}
updateIncomingStats(index);
}

public void setRecordBatchSizer(RecordBatchSizer sizer) {
Contributor


Can instead just call the above method with DEFAULT_INPUT_INDEX as the first parameter.

@@ -147,6 +149,12 @@ public int update(int inputIndex, int outputPosition) {
return getOutputRowCount();
}

public int update(int inputIndex, int outputPosition, boolean useAggregate) {
// by default just return the outputRowCount
return update(inputIndex, outputPosition, false);
Contributor


Is this an infinite recursive call ??
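For what it's worth, here is a minimal sketch of how the overload could avoid calling itself, assuming a hypothetical updateInternal() worker method (the real Drill signatures differ): the three-argument form must delegate to the worker, not back to update().

```java
// Minimal sketch, assuming a hypothetical updateInternal() worker; the
// real Drill method signatures differ.
public class UpdateOverloads {
  private int outputRowCount = 1024; // placeholder result

  // Hypothetical worker that does the actual sizing computation.
  private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
    return outputRowCount;
  }

  // Two-argument form defaults useAggregate to false.
  public int update(int inputIndex, int outputPosition) {
    return update(inputIndex, outputPosition, false);
  }

  // Three-argument form delegates to the worker; calling update(...) here
  // again would recurse forever, as flagged in the review comment above.
  public int update(int inputIndex, int outputPosition, boolean useAggregate) {
    return updateInternal(inputIndex, outputPosition, useAggregate);
  }
}
```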

@ppadma
Contributor Author

ppadma commented Apr 20, 2018

@Ben-Zvi Thanks a lot for the review. Updated the PR with the review comments taken care of. Please take a look.

Regarding spill files, here are my thoughts.
For the build side, I am using aggregate statistics, i.e. the average across all batches. On the probe side, I am using stats for each incoming batch and adjusting the output row count. So we can skip applying sizing for batches spilled from the build side and continue doing what I am doing on the probe side. Once your code is merged in, I will refactor the code as needed.
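A toy sketch of the two sizing strategies described above (class and method names are made up for illustration, not Drill's RecordBatchSizer API): the build side keeps a running average across all batches, while the probe side sizes from each batch individually.

```java
// Toy illustration of aggregate vs. per-batch row-width sizing; names are
// made up and do not correspond to Drill's actual API.
public class RowWidthStrategy {
  private long totalBytes;
  private long totalRows;

  // Build side: average row width across every batch seen so far.
  public int aggregateRowWidth(long batchBytes, long batchRows) {
    totalBytes += batchBytes;
    totalRows += batchRows;
    return totalRows == 0 ? 0 : (int) (totalBytes / totalRows);
  }

  // Probe side: row width of the current batch only.
  public int perBatchRowWidth(long batchBytes, long batchRows) {
    return batchRows == 0 ? 0 : (int) (batchBytes / batchRows);
  }
}
```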

@@ -157,14 +172,20 @@ public int metricId() {
}
}

public class HashJoinMemoryManager extends JoinBatchMemoryManager {
Contributor


Not required; you can directly use JoinBatchMemoryManager.


@ppadma
Contributor Author

ppadma commented Apr 20, 2018

@sohami Thanks for the review. Updated with the review comments addressed. Please take a look.

@@ -560,6 +554,40 @@ public void close() {
super.close();
}

@Override
protected void updateBatchMemoryManagerStats() {
stats.setLongStat(Metric.LEFT_INPUT_BATCH_COUNT, batchMemoryManager.getNumIncomingBatches(LEFT_INDEX));
Contributor


@ppadma - The main motivation for moving the metrics inside JoinBatchMemoryManager was to avoid duplicate definitions in all the binary record batches. I think we should improve OperatorMetricsRegistry rather than just overriding this method.
Or, if you want, you can create a new JIRA to track the metrics-related improvement.

Contributor Author


@sohami I will create a JIRA and address that in a separate PR. For now, I would like to override this method. Is that ok?

Contributor


Should be fine.

@Ben-Zvi
Contributor

Ben-Zvi commented Apr 20, 2018

The subject line of this PR needs updating: the '-' is missing between "DRILL" and "6236" (should be DRILL-6236). Because of this missing '-', the PR is not listed in the JIRA.

@ppadma ppadma changed the title Drill 6236: batch sizing for hash join Drill-6236: batch sizing for hash join Apr 20, 2018
@ppadma
Contributor Author

ppadma commented Apr 20, 2018

@Ben-Zvi My bad. I updated the title, but it has not updated the JIRA. Trying to figure this out.

@Ben-Zvi
Contributor

Ben-Zvi commented Apr 20, 2018

It needs to be "DRILL" in capital letters.

@ppadma ppadma changed the title Drill-6236: batch sizing for hash join DRILL-6236: batch sizing for hash join Apr 20, 2018
@ppadma ppadma force-pushed the DRILL-6236 branch 2 times, most recently from b781d04 to 5b6ba46 Compare April 25, 2018 22:00
@ppadma
Contributor Author

ppadma commented Apr 25, 2018

@Ben-Zvi I manually added the PR link to the JIRA. All code review comments are addressed. Can you look at the latest changes?

@ilooner
Contributor

ilooner commented Apr 26, 2018

@ppadma Please fix the Travis failure.

@ppadma
Contributor Author

ppadma commented May 27, 2018

@Ben-Zvi I merged in the spill-to-disk changes and fixed all issues. Please take a look.

@@ -234,6 +235,7 @@ public HashJoinState getState() {
private int recordsPerPartitionBatchBuild;
private int recordsPerPartitionBatchProbe;
private int outputBatchNumRecords;
private int outputBatchSize;
private Map<String, Long> buildValueSizes;
private Map<String, Long> probeValueSizes;
private Map<String, Long> keySizes;
Contributor


@ppadma I think outputBatchNumRecords, buildValueSizes, probeValueSizes, and keySizes are unused now since we are directly passing in the outputBatchSize. This is great since the calculator has been simplified. Could you also delete these unused parameters?

@ppadma ppadma force-pushed the DRILL-6236 branch 2 times, most recently from 0d388b2 to 76bbc59 Compare May 30, 2018 21:34
@ppadma
Contributor Author

ppadma commented May 30, 2018

@Ben-Zvi I rebased and updated the PR. Please review the latest diffs.

@ppadma ppadma force-pushed the DRILL-6236 branch 2 times, most recently from 45d2897 to 45f3b01 Compare May 30, 2018 22:21
@@ -448,8 +442,7 @@ private void calculateMemoryUsage()
safetyFactor,
reserveHash);

maxOutputBatchSize = computeMaxOutputBatchSize(buildValueSizes, probeValueSizes, keySizes,
outputBatchNumRecords, safetyFactor, fragmentationFactor);
maxOutputBatchSize = (long) (outputBatchSize * fragmentationFactor * safetyFactor);
Contributor


Maybe "outputBatchSize" needs to be cast to (double) to ensure that the whole multiplication is performed in double precision.

Contributor Author


done.
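For reference, a sketch of the cast discussed above (the factor values in the test are made up): the explicit (double) cast keeps the whole product in floating point before truncating back to long, even if the factor types ever change.

```java
// Sketch of the cast discussed above; not the exact Drill code.
public class BatchSizeCast {
  public static long maxOutputBatchSize(int outputBatchSize,
                                        double fragmentationFactor,
                                        double safetyFactor) {
    // The (double) cast guarantees the multiplication happens in double
    // precision before the final truncation to long.
    return (long) ((double) outputBatchSize * fragmentationFactor * safetyFactor);
  }
}
```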

@ppadma ppadma force-pushed the DRILL-6236 branch 2 times, most recently from 7c33cd0 to a5664f2 Compare May 30, 2018 23:32
Contributor

@Ben-Zvi Ben-Zvi left a comment


A few comments...

@@ -262,6 +272,7 @@ private void executeProbePhase() throws SchemaChangeException {
probeBatch.getSchema());
}
case OK:
setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX, outputRecords));
Contributor


This code is called when a new LEFT incoming batch is read. At this point the outgoing batch may be "half full". It looks like this call modifies the "targetOutputRecords" variable; if so, it would not match the allocated size of the outgoing batch. For example, if it is made bigger, the code above would try to add rows to the outgoing batch beyond the original allocation size!

Contributor Author

@ppadma ppadma May 31, 2018


It will not make it bigger. It will look at the remaining memory and adjust the row count based on that.

Here is the relevant code from updateInternal function:
final long remainingMemory = Math.max(configOutputBatchSize - memoryUsed, 0);
// These are number of rows we can fit in remaining memory based on new outgoing row width.
final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
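A self-contained sketch of that adjustment (safeDivide here is a stand-in for RecordBatchSizer.safeDivide, assumed only to guard against a zero divisor): the target row count shrinks toward what still fits in the configured batch size, never below zero.

```java
// Sketch of the remaining-memory adjustment quoted above; safeDivide is a
// stand-in for RecordBatchSizer.safeDivide, assumed to guard a zero divisor.
public class OutputRowAdjust {

  static int safeDivide(long numerator, long denominator) {
    return denominator == 0 ? 0 : (int) (numerator / denominator);
  }

  public static int rowsRemaining(long configOutputBatchSize, long memoryUsed,
                                  int newOutgoingRowWidth) {
    // Never negative: a half-full outgoing batch cannot grow its allocation.
    final long remainingMemory = Math.max(configOutputBatchSize - memoryUsed, 0);
    // Rows that still fit at the new (possibly wider) outgoing row width.
    return safeDivide(remainingMemory, newOutgoingRowWidth);
  }
}
```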

@@ -85,13 +71,50 @@ public int update(int inputIndex, int outputPosition) {
}

@Override
public RecordBatchSizer.ColumnSize getColumnSize(String name) {
Contributor


Why is the overriding method deleted? It is used by Lateral Join and Merge Join. By deleting it, they will use the one from the superclass (RecordBatchMemoryManager).

Contributor Author


Because it is redundant; the superclass does the same thing.

RecordBatchSizer rightSizer = getRecordBatchSizer(RIGHT_INDEX);
public int update(int inputIndex, int outputPosition, boolean useAggregate) {
switch (inputIndex) {
case LEFT_INDEX:
Contributor


A cleanup suggestion: there are too many "update()" methods, and the LEFT never uses the aggregate while the RIGHT always does. So how about instead:

private int foo(RecordBatch batch, int inputIndex, boolean useAggregate) {
     setRecordBatchSizer(inputIndex, new RecordBatchSizer(batch));
     updateIncomingStats(inputIndex);
     return useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocSize();
}

public int updateRight(RecordBatch batch, int outputPosition) {
   rightRowWidth = foo(batch, RIGHT_INDEX, true);
   return updateInternal(outputPosition);
}

public int updateLeft(RecordBatch batch, int outputPosition) {
   leftRowWidth = foo(batch, LEFT_INDEX, false);
   return updateInternal(outputPosition);
}

Contributor Author


I rearranged the code and got rid of "left" and "right". Instead, I am using an array called rowWidth, indexed by the input index, which is better now. Unfortunately, each operator calls update with different parameters, so we have different versions of the same function.

Also, the right side does not always "use aggregate"; it depends on the operator. For example, for merge join we do not use the aggregate; it is batch by batch.

Contributor

@Ben-Zvi Ben-Zvi left a comment


+1


4 participants