Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DRILL-6845: Semi-Hash-Join to skip incoming build duplicates, automatically stop skipping if too few #1606

Open
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

Ben-Zvi
Copy link
Contributor

@Ben-Zvi Ben-Zvi commented Jan 9, 2019

The first two commits here were extracted from the original PR #1522 (DRILL-6735), where the Semi-Hash-Join was implemented in a straightforward way: Read data like a regular hash join (e.g. into partitions, then later build hash-tables), and only during probe time perform at most a single probe match.
The issue with the above implementation is the case of excessive incoming build-side duplicates (more common with synthetic data in benchmarks). In such a case, reading all the data first can blow up the hash join memory (e.g., cause spills) and regress performance.

This PR addresses the problem by creating the hash-tables first, and using them to detect build duplicates early (before copying from incoming into partitions), so those duplicates can be simply ignored/skipped (see the new method insertKeyIntoHashTable()). After all the build side is read (if no spill), there is no need to build the hash tables as they already exist - see the new method buildContainers() .
All this logic is in the first commit. The issue with this logic is that it adds overhead (e.g., hash table doubling), which is a waste when there are very little duplicates. So this issue is addressed by the second commit. (Also note the new option semi_skip_duplicates that can be used to disable this whole feature).

The second commit performs some "runtime statistics" to decide if there are too few duplicates. In such a case, it drops those hash tables and falls back to the simple semi-join work (a la PR #1522). This decision uses a "threshold", which is half the size of all the hash tables (so they won't double), and incoming duplicates are counted. After so many incoming rows are processed, the percentage of duplicates is checked - if under %20 (hard coded), then stop skipping, else continue using the hash tables to eliminate the duplicates.

The third commit extends the memory manager to handle this special "duplicate skipping" mode. With a new class HashJoinSpillControlImpl and interface HashJoinMemoryCalculator.HashJoinSpillControl. The technique used for shouldSpill() is simply ensuring that the available memory is large enough for at least 3 (see below) more batches. That required a change to all the shouldSpill() methods - add the currentVectorContainer parameter.
Most of the code changes in HashPartition were a rename (batch -> vectorContainer) and in HashJoinBatch (added "semi" to some variable names).
As for "running out of memory" while inserting into the hash table (either allocating a new keys batch, or resizing the hash table) -- this is handled by the hash table throwing RetryAfterSpillException, which is caught in the new insertKeyIntoHashTable() which leads to a spill, and a reset of the hash table anyway, and return false (it's a new key - it would be inserted into the new empty hash-table). So this case is much simpler than Hash-Aggr.

The fourth commit adds an option min_batches_in_available_memory instead of the above hard coded "3". Also added a method IntegerValidator that can specify the min/max values.

@Ben-Zvi Ben-Zvi requested a review from ilooner January 9, 2019 03:58
Copy link
Contributor

@ilooner ilooner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Ben-Zvi Most of the changes look good to me. My main feedback is on improving the accuracy of HashJoinSpillControlImpl.shouldSpill. Please take a look at my comments. Thanks.

@@ -154,6 +151,9 @@ void initialize(boolean firstCycle,
long getInMemorySize();
}

interface HashJoinSpillControl {
boolean shouldSpill(VectorContainer currentVectorContainer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The argument currentVectorContainer shouldn't be required. So the signature should be

boolean shouldSpill();

For details about why we don't need the arg see my comment on HashJoinSpillControl

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The currentVectorContainer is used to asses the size of the next batch, which may vary as the incoming are read. If we make a (reasonable) assumption that there would be very little variation (as these are join keys), then that size can be given to the constructor and not passed as an argument to shouldSpill().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this argument (i.e. restored original signature). See below.


@Override
public boolean shouldSpill(VectorContainer currentVectorContainer) {
assert currentVectorContainer.hasRecordCount();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a heuristic that doesn't capture a lot of the memory requirements we need to keep track of like:

  • Reserving enough space for the output batch
  • Reserving space for the incoming probe batch. Note if we don't spill the incoming probe batch is owned by the upstream operator, but if we are processing a spilled partition, the incoming probe batch is owned by the HashJoin operator's memory allocator so we need to account for it.
  • Reserving room for the partial batches of each partition.

So I'm concerned we can run into unexpected OOMs.

However, I think we can account for all this and get a more accurate picture of memory usage. The shouldSpill method needs to to compute the memory to reserve for the three items above and needs to compute the total memory usage of each partition (HashTables + partition batches). Here is a snippet of my proposed implementation of should spill.

    public boolean shouldSpill() {
      // Handle early completion conditions
      if (buildPartitionStatSet.allSpilled()) {
        // All build side partitions are spilled so our memory calculation is complete
        return false;
      }

      long reservedMemory = calculateReservedMemory(
        buildPartitionStatSet.getNumSpilledPartitions(),
        getIncomingProbeBatchReservedSpace(),
        maxOutputBatchSize,
        partitionProbeBatchSize);

      // We are consuming our reserved memory plus the amount of memory for each build side
      // batch and the size of the hashtables
      consumedMemory = reservedMemory + RecordBatchSizer.multiplyByFactor(buildPartitionStatSet.getConsumedMemoryWithHashTable(), fragmentationFactor);
      return consumedMemory > memoryAvailable;
}

    public long getIncomingProbeBatchReservedSpace() {
      Preconditions.checkState(initialized);

      if (firstCycle) {
        return 0;
      } else {
        return probeSizePredictor.getBatchSize();
      }
    }

Dependencies

There are two dependencies for the above code to work.

Probe Size Predictor

This batch size predictor will give us an estimate of the size of an incoming probe batch so that we can reserve space for it. It can be instantiated by using the following snippet.

BatchSizePredictorImpl.Factory.INSTANCE.create(probeBatch, fragmentationFactor, safetyFactor);

PartitionStatSet

The PartitionStatSet is used to get the memory usage of each partition and is used by the other memory calculators. Currently the partitionStatSet can only give you the size of all of the temporary batches in all the partitions, but you will have to enhance it to give the size of all the temporary batches + the hash tables in all the partitions.

I suggest adding a getConsumedMemoryWithHashTable() method to PartitionStatSet. This method can then call a getInMemorySizeWithHashTable() method on each partition. Note you will have to add this method as well.

The getInMemorySizeWithHashTable() can be implemented for a partition like the following:

public getInMemorySizeWithHashTable() {
  getInMemorySize() + hashTable.getActualSize();
}

And that's it. I think if we do this we'll have a really accurate estimate of memory usage and can avoid OOMs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About the three missing issues:

  • Reserving enough space for the output batch: Indeed was missing. I modified HashJoinSpillControlImpl to also take the batchMemoryManager, and then reserve the output batch size during the calculations.
    Note that just like a regular Hash-Join, this size is used while reading all the build side, before prefetchFirstProbeBatch() is called; so this size may not be relying on real probe side data.
  • Reserving space for the incoming probe batch: Not needed - The incoming size is not charged to the Hash-Join's available memory, as the incoming is just read and deallocated. (Except that optimization with num_partitions equal 1, but then there is no spilling).
  • Reserving room for the partial batches of each partition: This in a way is covered by this code, which is called prior to allocating these batches. One point that is not covered is variable length data types, but in this special case we are dealing with join keys only, which tend to be of a reasonable uniform sizes.

So all this new memory spill control code is only needed for special cases. In most reasonable uses of Semi-Join, the build side keys are mostly unique, and the Semi would stop skipping duplicates and fall back to the regular memory calculator. In extreme cases of duplicates (e.g., some benchmarks), then those would be skipped and consume zero extra memory. So the cases left are in the middle, with enough duplicates to skip, but still very many diverged keys.

While performing an aggregation like Hash-Aggr, the case of hash-tables here is much simpler than in Hash-Aggr, as there are no "values" to be matched. So memory need not be computed for the hash-Tables; any operation (put, or doubling) that would OOM, would be caught, then that partition would spill and reset, and then continue. The phase of "creating hash tables" in the regular Hash-Join build does not exist here (as the tables already exist).

Last: I changed that hard-coded %20 into an option: semi_percent_duplicates_to_skip (default 20). So now there are three options (also the spill triggering "num of batches in available mem", and disabling the whole feature). So if any user ever gets an OOM from the skip-duplicates semi, the user can easily tune some option to avoid that OOM.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boaz I still think we need to address the other two cases.

  • "Reserving space for the incoming probe batch: Not needed - The incoming size is not charged to the Hash-Join's available memory" This statement is not completely accurate. If the HashJoin operator has not spilled then this is correct. The upstream operator owns the probe batch and we don't need to account for it. But if we are processing a spilled partition the probe batch is read from the SpilledRecord batch and the HashJoin operator owns the memory for the probe batch. In the code snippet I proposed we would account for the probe batch in this case.

  • "Reserving room for the partial batches of each partition: This in a way is covered by this code, which is called prior to allocating these batches." I don't see how it's accounted for with the rest of the code. You are using a heuristic to save room for 3 partition batches and it is tunable by the user. This approach is problematic since it will not be sufficient if there are a large number of partitions (16 or 32). So the user will have to change the minBatchesInAvailableMemory setting to tune the operator for their data and the number of partitions. If we can easily account for this explicitly and automatically for the user, why not do it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Yes, you are correct. The single probe batch per spilled partition is not accounted for. The cleanest solution is to put this logic into the post Build calculator, and restore the "if" order (on line 1121) that was there initially - first check the postBuild and spill the partition if needed, and second check for semiSkipDuplicates.
    Here again it would be simple to just check if the available memory is large enough (for 3 probe batches, etc).
  • The term "partial" is confusing. Should it be: Need the memory calculator to adjust (down) the number of partitions so that each partition has enough memory to start working ? (unlike the regular case, a minimal hash-table needs to be accounted for here, for each partition).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another commit (3613689) addressing the above issues. The new "spill control" code was rearranged in a way similar to the regular memory calculator:

  • The parameter to shouldSpill() was removed. Instead that information (inner build batch size) is calculated from the batchMemoryManager.
  • A post-build class was added, which performs a similar calculation using the inner probe batch size, plus the number of spilled partitions (added as a parameter to initialize()). This would be called after the build, and spill whole partition(s) is there is not enough memory to hold one inner probe batch per a spilled partition. (The shouldSpill() call now covers "skipping semi" cases as well - restored the order mention in the prior comments).
  • The number of partitions is reduced as needed, in a manner similar to the regular memory calculator - initialize() calls an internal calculateMemoryUsage().

Ran some tests - with the current defaults is is practically impossible to spill (32 partitions, 1024 rows in inner batch, key size - all together much less than 1 meg, and each HJ gets at least 40 meg).


@Override
public boolean shouldSpill(VectorContainer currentVectorContainer) {
assert currentVectorContainer.hasRecordCount();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boaz I still think we need to address the other two cases.

  • "Reserving space for the incoming probe batch: Not needed - The incoming size is not charged to the Hash-Join's available memory" This statement is not completely accurate. If the HashJoin operator has not spilled then this is correct. The upstream operator owns the probe batch and we don't need to account for it. But if we are processing a spilled partition the probe batch is read from the SpilledRecord batch and the HashJoin operator owns the memory for the probe batch. In the code snippet I proposed we would account for the probe batch in this case.

  • "Reserving room for the partial batches of each partition: This in a way is covered by this code, which is called prior to allocating these batches." I don't see how it's accounted for with the rest of the code. You are using a heuristic to save room for 3 partition batches and it is tunable by the user. This approach is problematic since it will not be sufficient if there are a large number of partitions (16 or 32). So the user will have to change the minBatchesInAvailableMemory setting to tune the operator for their data and the number of partitions. If we can easily account for this explicitly and automatically for the user, why not do it?

// in case of a Semi Join skippinging duplicates, use a "spill control" calc
// (may revert back to the buildCalc if the code decides to stop skipping)
currentCalc = new HashJoinSpillControlImpl(allocator, RECORDS_PER_BATCH,
(int) context.getOptions().getOption(ExecConstants.HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need ExecConstants.HASHJOIN_MIN_BATCHES_IN_AVAILABLE_MEMORY_VALIDATOR . Please see my comments on the memory calculator implementation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea was to leave a small "slack" in memory to cover cases like inaccurate estimates (e.g., probe side), or varchar buffer doubling, and that "incoming" batch read from the spill file, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand we want to have a knob to allow for some slack. But there is already the safety factor parameter that does this. Why give the user yet another tuning parameter when one already exists? It makes things more difficult for the user.


@Override
public void setPartitionStatSet(HashJoinMemoryCalculator.PartitionStatSet partitionStatSet) {
// Do nothing
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Save the PartitionStatSet so that you can use it to get hashtable sizes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new idea was to avoid calculating the hash-tables' sizes, by checking the total memory used instead.

* Calculate the number of partitions possible for the given available memory
* start at initialPartitions and adjust down (in powers of 2) as needed
*/
private void calculateMemoryUsage() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already calculated in the HashJoinBatch.partitionNumTuning() method. You should just use the partitionCount computed from there. You can get the number of partitions from the PartitionStatSet

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The calculation is a little different in this special case: Each partition needs a hash table (without a Hash-Join Helper) during the build time; which is not accounted for in the original method. Also the build side should be more "predictable", as it includes keys only.

Copy link
Contributor

@ilooner ilooner Jan 16, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently the number of partitions is determined exclusively by the HashJoinMemoryCalculator.BuildSidePartitioning class. If you add new logic then numPartition should equal the min of the value compute by HashJoinMemoryCalculator.BuildSidePartitioning and any partition value your logic computes.

long batchSize = new RecordBatchSizer(currentVectorContainer).getActualSize() + 4 * recordsPerBatch;
public boolean shouldSpill() {
// Expected new batch size like the current, plus the Hash Values vector (4 bytes per HV)
long batchSize = ( batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX).getRowAllocWidth() + 4 ) * recordsPerBatch;
long reserveForOutgoing = batchMemoryManager.getOutputBatchSize();
long memoryAvailableNow = allocator.getLimit() - allocator.getAllocatedMemory() - reserveForOutgoing;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to account for the probe batch size here when we are processing a spilled partition. If we try to account for it in he PostBuildCalculator it will already be too late, because we will have already prefetched the probe side batch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to account for the partition batches that are being built for each partition here. I don't see any code that reserves space for them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • The incoming probe batch is not charged to this HJ allocator.
  • The internal probe batches needed (one for each spilled partition) are ignored here, but checked later in the "post build" stage (when the number of spilled partitions is known). If not enough memory at that "post build" time, then we spill one (or more) "pristine" (i.e., that never spilled before) partition.
  • The initial allocation of internal build batches (plus a hash table for each) is controlled by reducing the number of partitions as needed to accommodate all these.
  • Indeed the above did not cover the case where the internal probe batch is huge, and even if all the partitions are spilled, there will not be enough memory to hold all these batches. One possible solution here can be to reduce the number of rows per such batch (it's internal anyway). Another solution was made: In calculateMemoryUsage(), take into account the size of the internal probe batch when calculating the number of partitions. (Yes, this is based on schema only, not real data).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, unlike a regular "incoming", when reading back a spilled partition the "incoming" batch should be charged to the Hash-Join's allocator. That gap is covered by the MIN_BATCHES_IN_MEMORY slack (also used in the "num partitions" computation).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • By partition batch I was referring to the small batches that are used to store the records that are appended to each partition. They are stored in the HashPartition.tmpBatchesList list. We need to reserve space for one partition batch for each partition. Otherwise we can run into a scenario where we OOM when appending a row to a partition and allocate a new partition batch to store it. I already have code to account for these in my memory calculator. See HashJoinMemoryCalculatorImpl lines 361 and 373.

  • Also we should not be using MIN_BATCHES_IN_MEMORY. We can account for the probe batches exactly when handling a spilled partition. I have already implemented the logic for this in HashJoinMemoryCalculatorImpl lines 363 - 365, 378 - 380, and 383 - 389. It should just be a matter of copying the code and plugging it in.

long batchSize = new RecordBatchSizer(currentVectorContainer).getActualSize() + 4 * recordsPerBatch;
public boolean shouldSpill() {
// Expected new batch size like the current, plus the Hash Values vector (4 bytes per HV)
long batchSize = ( batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX).getRowAllocWidth() + 4 ) * recordsPerBatch;
long reserveForOutgoing = batchMemoryManager.getOutputBatchSize();
long memoryAvailableNow = allocator.getLimit() - allocator.getAllocatedMemory() - reserveForOutgoing;
boolean needsSpill = minBatchesInAvailableMemory * batchSize > memoryAvailableNow;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see the comments above, this calculation is not clear to me.


@Override
public boolean shouldSpill() {
if ( numPartitionsSpilled == 0 ) { return false; } // no extra memory is needed if all the build side is in memory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code doesn't handle all the cases. The PostBuildCalculator will need to reserve space for the following:

  • Incoming probe batch in the case where we are processing a spilled partiton.
  • The existing in memory partition (Hashtables + batches).
  • Space for a spill batch for the spilled partitions. Ex. if we are processing the probe side for a spilled partition we append the records to a smaller batch for each partition before writing them out to disk.
  • space for the output batch.

The issue with using allocator.getAllocatedMemory() is that it mixes all of these things together, making it difficult to reserve space accurately. Specifically the spill batches for each partition are difficult to account for if they are lumped in with allocator.getAllocatedMemory.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This new method (used only for "duplicate skipping semi join") is simpler code than before, relying on the allocator's "getAllocatedMemory" to get the current memory state, no need to calculate in detail every hash table size, build side batches, other probe side batches, etc.

  • Indeed the "incoming" batch read from the spill is not specifically accounted for. However there's a "slack" (+ min batches in memory) that can cover that small gap.
  • All should be covered by allocator.getAllocatedMemory(), no need to reserve for that.
  • This "post" method cares about each spilled partition having one in-memory internal probe batch (else spill one more). That the numPartitionsSpilled * batchSize that is being checked.
  • Output batch: This is batchMemoryManager.getOutputBatchSize() .

With the new change to use the partitionStatSet to get the number of spilled partitions - how is this number going to be updated - in case the code returns true (i.e. spill one more) and then try again ? Where are those stats being updated ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • "Indeed the incoming batch read from the spill is not specifically accounted for. However there's a "slack" (+ min batches in memory) that can cover that small gap." We should not be introducing another tuning parameter with minBatchesInMemory, we already have the safetyFactor and fragmentationFactor tuning parameters. How about we do something like the following psuedo code:
if (firstCycle) { 
   partitionsMemoryUsage = allocator.getAllocatedMemory() ;
   probeReserveSpace = 0;
} else {
   partitionsMemoryUsage = allocator.getAllocatedMemory() - probeBatch.getSize();
   probeReserveSpace = probeSizePredictor.predictBatchSize(maxNumProbeRecords, false)
}

This would allow us to account for the probe data safely while using the existing safetyFactor and fragementationFactor parameters and would allow us to remove minBatchesInMemory.

  • PartitionStatSet collects all the stats from the HashPartitions directly. So if a HashPartition is spilled the PartitionStatSet will know. I've unit tested this and the other memory calculators use this logic without issue. The idea was to decouple the stats needed for memory calculations from the rest of the HashJoin operator, so that the memory calculators can be unit tested.

@Ben-Zvi
Copy link
Contributor Author

Ben-Zvi commented Jan 15, 2019

Commit added ( 6fb890c ) - The number of spilled partitions is taken from partitionStatSet (not passed via initialize()), and the size of the probe side is also considered when calculating the number of partitions possible (in calculateMemoryUsage() )

@Ben-Zvi Ben-Zvi closed this Jan 15, 2019
@Ben-Zvi
Copy link
Contributor Author

Ben-Zvi commented Jan 16, 2019

Seems that this PR was closed by mistake; re-opening now.
@ilooner - do you have more comments or suggestions ? we are trying to finish and commit this work soon. The spill control may not be perfect, but in most use cases this "duplicate skipping" work will not depend on such fine control.

@Ben-Zvi Ben-Zvi reopened this Jan 16, 2019
@Ben-Zvi Ben-Zvi changed the title Drill 6845: Semi-Hash-Join to skip incoming build duplicates, automatically stop skipping if too few DRILL-6845: Semi-Hash-Join to skip incoming build duplicates, automatically stop skipping if too few Jan 16, 2019
batchMemoryManager.getOutputBatchSize(),
HashTable.DEFAULT_LOAD_FACTOR);

numPartitions = currentCalc.getNumPartitions();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this the numPartitions = min(numPartitions, currentCalc.getNumPartitions())


// Expected new batch size like the current, plus the Hash Values vector (4 bytes per HV)
int buildBatchSize = ( batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX).getRowAllocWidth() + 4 ) * recordsPerBatch;
int hashTableSize = buildBatchSize /* the keys in the HT */ +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please reuse the HashTableSizeCalculator here. It has already been unit tested and has the benefit of using the actual key sizes. So the user will not be required to provide the ExecConstants.MIN_HASH_TABLE_SIZE_KEY. For an example of how to use it see
HashJoinMemoryCalculatorImpl line 766. And for an example of how to compute the key sizes see HashJoinMemoryCalculatorImpl line 264 - 268.

long memoryAvailableNow = allocator.getLimit() - allocator.getAllocatedMemory() - reserveForOutgoing;

// Expected new batch size like the current, plus the Hash Values vector (4 bytes per HV)
int buildBatchSize = ( batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX).getRowAllocWidth() + 4 ) * recordsPerBatch;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I already have a unit tested utility class to do this. Please use BatchSizePredictor. See HashJoinMemoryImpl lines 253 and 360 .

A benefit of using the utility class is that it allows us to write unit tests for the memory calculator independent of the rest of the HashJoinBatch operator.

int hashTableSize = buildBatchSize /* the keys in the HT */ +
4 * (int)context.getOptions().getLong(ExecConstants.MIN_HASH_TABLE_SIZE_KEY) /* the initial hash table buckets */ +
(2 + 2) * recordsPerBatch; /* the hash-values and the links */
int probeBatchSize = ( batchMemoryManager.getRecordBatchSizer(LEFT_INDEX).getRowAllocWidth() + 4 ) * recordsPerBatch;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use BatchSizePredictor here. See HashJoinMemoryImpl lines 253 and 360.

long batchSize = new RecordBatchSizer(currentVectorContainer).getActualSize() + 4 * recordsPerBatch;
public boolean shouldSpill() {
// Expected new batch size like the current, plus the Hash Values vector (4 bytes per HV)
long batchSize = ( batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX).getRowAllocWidth() + 4 ) * recordsPerBatch;
long reserveForOutgoing = batchMemoryManager.getOutputBatchSize();
long memoryAvailableNow = allocator.getLimit() - allocator.getAllocatedMemory() - reserveForOutgoing;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • By partition batch I was referring to the small batches that are used to store the records that are appended to each partition. They are stored in the HashPartition.tmpBatchesList list. We need to reserve space for one partition batch for each partition. Otherwise we can run into a scenario where we OOM when appending a row to a partition and allocate a new partition batch to store it. I already have code to account for these in my memory calculator. See HashJoinMemoryCalculatorImpl lines 361 and 373.

  • Also we should not be using MIN_BATCHES_IN_MEMORY. We can account for the probe batches exactly when handling a spilled partition. I have already implemented the logic for this in HashJoinMemoryCalculatorImpl lines 363 - 365, 378 - 380, and 383 - 389. It should just be a matter of copying the code and plugging it in.


long memoryNeededPerPartition = Integer.max(buildBatchSize + hashTableSize, probeBatchSize);

for ( numPartitions = initialPartitions; numPartitions > 2; numPartitions /= 2 ) { // need at least 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need to account for probe batches. I did this when calculating the number of partitions for in my memory calculator see HashJoinMemoryCalculatorImpl lines 363 - 365, 378 - 380, and 383 - 389

if ( numPartitionsSpilled == 0 ) { return false; } // no extra memory is needed if all the build side is in memory
if ( probeEmpty ) { return false; } // no probe side data
// Expected new batch size like the current, plus the Hash Values vector (4 bytes per HV)
long batchSize = ( batchMemoryManager.getRecordBatchSizer(LEFT_INDEX).getRowAllocWidth() + 4 ) * recordsPerBatch;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use BatchSizePredictorHere


@Override
public boolean shouldSpill() {
if ( numPartitionsSpilled == 0 ) { return false; } // no extra memory is needed if all the build side is in memory
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • "Indeed the incoming batch read from the spill is not specifically accounted for. However there's a "slack" (+ min batches in memory) that can cover that small gap." We should not be introducing another tuning parameter with minBatchesInMemory, we already have the safetyFactor and fragmentationFactor tuning parameters. How about we do something like the following psuedo code:
if (firstCycle) { 
   partitionsMemoryUsage = allocator.getAllocatedMemory() ;
   probeReserveSpace = 0;
} else {
   partitionsMemoryUsage = allocator.getAllocatedMemory() - probeBatch.getSize();
   probeReserveSpace = probeSizePredictor.predictBatchSize(maxNumProbeRecords, false)
}

This would allow us to account for the probe data safely while using the existing safetyFactor and fragementationFactor parameters and would allow us to remove minBatchesInMemory.

  • PartitionStatSet collects all the stats from the HashPartitions directly. So if a HashPartition is spilled the PartitionStatSet will know. I've unit tested this and the other memory calculators use this logic without issue. The idea was to decouple the stats needed for memory calculations from the rest of the HashJoin operator, so that the memory calculators can be unit tested.

@ilooner
Copy link
Contributor

ilooner commented Jan 16, 2019

@Ben-Zvi let's not rush this PR. I agree with you we should not be trying to do things perfectly, that's why I mainly only focus on functional correctness in my reviews and avoid superficial comments about variable names and shortening lines of code. As you've experienced first hand with HashAgg, getting memory calculations right is extremely tricky, and they're even trickier to debug when users hit bugs. Let's make sure the logic is rock solid and unit tested while everything is still fresh in our minds. Doing this now will save us a lot more time in the coming months. Plus I think we are getting pretty close, I don't think there is that much code left to write.

If there is a time crunch and this needs to go into our private branch soon, I have no issues with putting this into the private branch, and continuing the review process in open source. Since the changes are mainly in the memory calculators, the chances of any significant merge conflict are almost zero.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants