
Implement Out-of-Core Hash Join and Re-Work Query Verification #4189


Merged: 160 commits into duckdb:master on Aug 24, 2022

Conversation

lnkuiper (Contributor)

Out-of-Core Hash Join

This PR adds out-of-core functionality to our hash join, allowing it to join more data than fits in memory.
Optimizations to this functionality will be done in future PRs; for now, this adds just the basic functionality.

This functionality has been added to PhysicalHashJoin and is triggered dynamically at runtime, so the optimizer does not have to predict whether the data fits in memory.

For an out-of-core join there are essentially two options: sorting or partitioning. We've opted for partitioning.
In summary, I've changed the following in PhysicalHashJoin:

Sink

Each thread now has its own thread-local HT.

There's now a threshold on how much data the build side accumulates. When the threshold is reached, we swizzle the pointers to variable-size data and unpin the blocks, allowing them to be spilled to disk.
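
To make the idea concrete, here is a minimal sketch of what swizzling does; the row layout, 64-bit pointers, and function names are illustrative assumptions, not DuckDB's actual code:

#include <cstdint>
#include <cstring>

using data_ptr_t = uint8_t *;
using idx_t = uint64_t;

// Swizzling: replace the absolute heap pointer stored in each row with an
// offset relative to the heap block's base, so the block can be unpinned,
// spilled, and later reloaded at a different address.
void SwizzlePointers(data_ptr_t rows, idx_t count, idx_t row_width,
                     idx_t pointer_offset, data_ptr_t heap_base) {
    for (idx_t i = 0; i < count; i++) {
        data_ptr_t slot = rows + i * row_width + pointer_offset;
        data_ptr_t heap_ptr;
        std::memcpy(&heap_ptr, slot, sizeof(heap_ptr));
        uint64_t offset = uint64_t(heap_ptr - heap_base);
        std::memcpy(slot, &offset, sizeof(offset)); // pointer -> offset
    }
}

// Unswizzling reverses the transformation once the block is pinned again
void UnswizzlePointers(data_ptr_t rows, idx_t count, idx_t row_width,
                       idx_t pointer_offset, data_ptr_t new_heap_base) {
    for (idx_t i = 0; i < count; i++) {
        data_ptr_t slot = rows + i * row_width + pointer_offset;
        uint64_t offset;
        std::memcpy(&offset, slot, sizeof(offset));
        data_ptr_t heap_ptr = new_heap_base + offset;
        std::memcpy(slot, &heap_ptr, sizeof(heap_ptr)); // offset -> pointer
    }
}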

Finalize

If we did not accumulate enough data to warrant an external join, the thread-local HTs are merged into the thread-global HT, and the operator does the same old hash join we all know.

If we did accumulate enough data for an external join, we schedule tasks for each thread to radix-partition its thread-local data.
After all threads finish partitioning their data, we build a hash table on some of the partitions; how many fit at once is decided by the memory limit.
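
Because the build and probe sides use the same hash function, the same radix function sends matching rows to matching partitions. A minimal sketch (the bit positions and partition count here are illustrative assumptions, not the values DuckDB uses):

#include <cstdint>
#include <vector>

static constexpr uint64_t RADIX_BITS = 4;
static constexpr uint64_t NUM_PARTITIONS = uint64_t(1) << RADIX_BITS; // 16
static constexpr uint64_t RADIX_SHIFT = 64 - RADIX_BITS;

// The top RADIX_BITS bits of the hash pick the partition
inline uint64_t PartitionOf(uint64_t hash) {
    return hash >> RADIX_SHIFT;
}

// Scatter row indices into per-partition buckets based on their hash
void RadixPartition(const std::vector<uint64_t> &hashes,
                    std::vector<std::vector<uint64_t>> &partitions) {
    partitions.assign(NUM_PARTITIONS, {});
    for (uint64_t i = 0; i < hashes.size(); i++) {
        partitions[PartitionOf(hashes[i])].push_back(i);
    }
}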

Execute

If we're doing an external hash join, the probe looks slightly different. We check the radix of the probe-side hashes to determine whether each row can be probed against the currently loaded partitioned hash table. The rows that can be probed are simply probed against the hash table.

The rows that cannot be probed against the hash table are accumulated. If a threshold is exceeded, we swizzle the pointers in the accumulated probe-side data and unpin the blocks, allowing them to be spilled to disk.
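
A sketch of that probe-side split, again with illustrative names (PartitionOf is the hypothetical radix function from the sketch above, and a contiguous [first, last] range of loaded partitions is a simplification):

#include <cstdint>
#include <vector>

inline uint64_t PartitionOf(uint64_t hash) {
    return hash >> 60; // same hypothetical radix function as above
}

// Split a chunk of probe rows into "probe now" and "keep for later" by
// whether their partition is currently loaded in the hash table
void SplitProbeChunk(const std::vector<uint64_t> &probe_hashes,
                     uint64_t first_loaded, uint64_t last_loaded,
                     std::vector<uint64_t> &probe_now,
                     std::vector<uint64_t> &keep_for_later) {
    for (uint64_t i = 0; i < probe_hashes.size(); i++) {
        const auto partition = PartitionOf(probe_hashes[i]);
        if (partition >= first_loaded && partition <= last_loaded) {
            probe_now.push_back(i);      // probe against the current HT
        } else {
            keep_for_later.push_back(i); // accumulate; swizzle and spill later
        }
    }
}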

GetData

After the initial probe phase, each thread partitions its thread-local probe-side data.
Then, one thread builds the hash table for the next set of partitions, and all threads probe it with the matching probe-side partitions. Rinse and repeat until all partitions are done.
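
Stripped of all parallelism, the overall loop looks roughly like this; a single-threaded sketch with illustrative types, whereas the real operator drives it through a parallel state machine:

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

struct Row {
    int64_t key;
    int64_t payload;
};

void ExternalJoinLoop(const std::vector<std::vector<Row>> &build_partitions,
                      const std::vector<std::vector<Row>> &probe_partitions,
                      size_t partitions_per_round) {
    for (size_t begin = 0; begin < build_partitions.size(); begin += partitions_per_round) {
        const size_t end = std::min(begin + partitions_per_round, build_partitions.size());
        // Build a hash table over the next batch of build partitions
        // (how many fit at once is decided by the memory limit)
        std::unordered_multimap<int64_t, const Row *> ht;
        for (size_t p = begin; p < end; p++) {
            for (const auto &row : build_partitions[p]) {
                ht.emplace(row.key, &row);
            }
        }
        // Probe it with only the matching probe-side partitions
        for (size_t p = begin; p < end; p++) {
            for (const auto &row : probe_partitions[p]) {
                auto range = ht.equal_range(row.key);
                for (auto it = range.first; it != range.second; ++it) {
                    // emit the joined tuple (row, *it->second)
                }
            }
        }
    }
}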

Query Verification Re-Work

This PR also re-works query verification, which happens during testing. I've added a StatementVerifier base class, which has a few derived classes: CopiedStatementVerifier, DeserializedStatementVerifier, ParsedStatementVerifier, PreparedStatementVerifier, and UnoptimizedStatementVerifier.

We performed all of these verifications before, but now each is contained within its own class. I've also added the new ExternalStatementVerifier, which runs a query with force_external turned on and compares the result to running the query with force_external turned off. This saves a ton of time writing tests for operators with out-of-core capabilities, as we can just add PRAGMA verify_external to existing tests.
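
The shape of the abstraction, heavily simplified (the real verifiers operate on parsed statements and a client context; this sketch only shows the comparison idea, with RunQuery standing in for query execution):

#include <functional>
#include <string>

using RunQuery = std::function<std::string(const std::string &query, bool force_external)>;

class StatementVerifier {
public:
    virtual ~StatementVerifier() = default;
    // Returns true if this verifier's run matches the baseline run
    virtual bool Verify(const std::string &query, const RunQuery &run) = 0;
};

// Runs the query with force_external turned on and compares the result
// against a normal run of the same query
class ExternalStatementVerifier : public StatementVerifier {
public:
    bool Verify(const std::string &query, const RunQuery &run) override {
        return run(query, /*force_external=*/true) == run(query, /*force_external=*/false);
    }
};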

lnkuiper (Author):

Just noticed that I refactored the parallel state machine once more after you reviewed it, but the idea is the same.

Also passing CI now, so I think this is good to go

hawkfish (Contributor) left a comment:

I haven't looked at the meat yet, but the RDC work is fresh in my mind right now.

auto &segment = *segments[segment_index];
lstate.current_chunk_state.properties = state.scan_state.properties;
segment.ReadChunk(chunk_index, lstate.current_chunk_state, result, state.scan_state.column_ids);
lstate.current_row_index = row_index;
hawkfish (Contributor):

Does this work when row_index is non-zero? It only seems to write it, not read it.

lnkuiper (Author):

I've copied this code from Mark's CDC PR. I've just split scanning up into acquiring indices and actually reading the data, rather than having them in one function. I needed the more fine-grained control for my parallel state machine in PhysicalHashJoin::GetData.

I think it should work for non-zero row indices, since ColumnDataCollection::NextScanIndex can assign non-zero indices.
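
For context, the claim/read split looks roughly like this (an illustrative sketch of the pattern, not the actual ColumnDataCollection interface):

#include <cstdint>
#include <mutex>

struct SharedScanState {
    std::mutex lock;
    uint64_t next_chunk_index = 0;
    uint64_t chunk_count = 0;
};

// Phase 1: claim the next chunk index under a short critical section
bool NextScanIndex(SharedScanState &state, uint64_t &chunk_index) {
    std::lock_guard<std::mutex> guard(state.lock);
    if (state.next_chunk_index >= state.chunk_count) {
        return false; // nothing left to scan
    }
    chunk_index = state.next_chunk_index++;
    return true;
}

// Phase 2 needs no lock, since exactly one thread owns the claimed index:
//   uint64_t chunk_index;
//   while (NextScanIndex(shared, chunk_index)) {
//       ReadChunk(chunk_index, ...); // read the claimed chunk
//   }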

@@ -166,9 +166,9 @@ void RowDataCollectionScanner::Scan(DataChunk &chunk) {
// Eagerly delete references to blocks that we've passed
if (flush) {
for (idx_t i = 0; i < read_state.block_idx; ++i) {
- rows.blocks[i].block = nullptr;
+ rows.blocks[i]->block = nullptr;
hawkfish (Contributor):

I have moved this to the end to handle the case where the last block is completely read. Right now, the last block will not get flushed.

lnkuiper (Author):

Good idea!

@@ -108,7 +108,7 @@ struct LocalSortState {
//! Sort the data accumulated so far
void Sort(GlobalSortState &global_sort_state, bool reorder_heap);
//! Concatenate the blocks held by a RowDataCollection into a single block
- static RowDataBlock ConcatenateBlocks(RowDataCollection &row_data);
+ unique_ptr<RowDataBlock> ConcatenateBlocks(RowDataCollection &row_data);
hawkfish (Contributor):

Is this no longer static?

lnkuiper (Author):

Good catch, that was by accident

vector<BufferHandle> Build(idx_t added_count, data_ptr_t key_locations[], idx_t entry_sizes[],
const SelectionVector *sel = FlatVector::IncrementalSelectionVector());

void Merge(RowDataCollection &other);
unique_ptr<RowDataCollection> CopyEmpty();
hawkfish (Contributor):

Did this used to be called CloneEmpty? That had a keep_pinned argument.

lnkuiper (Author):

I think you've added that in your branch with the keep_pinned arg. I don't seem to have it!

hawkfish (Contributor):

It was me (Richard Wesley, 2022-07-12 14:26:35 -0700) but I have no memory of doing it - or even why! But in hindsight it is the right API (the clone may want to behave differently).

void GetData(ExecutionContext &context, DataChunk &chunk, GlobalSourceState &gstate,
LocalSourceState &lstate) const override;

//! Becomes a source when it is an external join
hawkfish (Contributor):

The comment does not match the implementation - in fact it seems to contradict it?

lnkuiper (Author):

How so? Right now the control flow of the operator is:

  1. Sink operator interface for build side
  2. Operator interface for probing
  3. Source operator interface for partitioned probing (anything that we couldn't probe in (2.))

It always has to be marked as a source because we need to schedule the source pipeline before execution (the operator can decide at any point during execution to go external and become a source op).

hawkfish (Contributor):

Sorry not sure what I was thinking there - it makes sense now.

lnkuiper (Author):

Fixed #4445, all tests passing now. Can merge when @hannes gets back and @hawkfish gives the OK!

}

unique_ptr<RowDataCollection> RowDataCollection::CloneEmpty(bool keep_pinned_p = false) {
return make_unique<RowDataCollection>(buffer_manager, block_capacity, entry_size, keep_pinned_p);
hawkfish (Contributor):

I don't think this was me originally, but we should keep an eye on this because I'm not clear why it shouldn't just use this->keep_pinned.

lnkuiper (Author):

I have an in-memory block collection and a spillable block collection. The in-memory block collection needs to keep the blocks pinned because the pointers need to stay valid. The spillable block collection does not need to keep them pinned, because it will contain only aligned data/heap blocks with offsets, no pointers.

@@ -57,11 +60,55 @@ JoinHashTable::JoinHashTable(BufferManager &buffer_manager, const vector<JoinCon
idx_t block_capacity = MaxValue<idx_t>(STANDARD_VECTOR_SIZE, (Storage::BLOCK_SIZE / entry_size) + 1);
block_collection = make_unique<RowDataCollection>(buffer_manager, block_capacity, entry_size);
string_heap = make_unique<RowDataCollection>(buffer_manager, (idx_t)Storage::BLOCK_SIZE, 1, true);
swizzled_block_collection = block_collection->CloneEmpty();
hawkfish (Contributor):

I kept getting burned by keep_pinned not transferring. Don't know if that is a problem here.

lnkuiper (Author):

I think the swizzled_block_collection does not need to have the keep_pinned flag, no?

The idea of the swizzled_block_collection is that it is spillable, and does not need to keep the blocks pinned.

hawkfish (Contributor) left a comment:

Sorry two more to go, but hopefully this will help.

auto &info = correlated_mark_join_info;
lock_guard<mutex> mj_lock(info.mj_lock);
has_null = has_null || other.has_null;
if (!correlated_mark_join_info.correlated_types.empty()) {
hawkfish (Contributor):

If it is empty, shouldn't it get copied?

lnkuiper (Author):

The correlated_types field should be the same for this and other, because it gets set on initialization of the HT, so no need to copy the vector holding the types.

Only the aggregated data in correlated_counts needs to be combined (and the has_null flag needs to be correctly set).

}
}

lock_guard<mutex> lock(partition_lock);
hawkfish (Contributor):

Does the other partition_lock need to be held as you are moving blocks out of it? Or does the caller guarantee that?

lnkuiper (Author):

Moving blocks out of the partitions should always be done by a single thread, so no need to hold the lock.

const auto col_offset = layout.GetOffsets()[col_no];
RowOperations::Gather(addresses, sel_vector, vector, sel_vector, found_entries, col_offset, col_no);
}
idx_t left_column_count = result.ColumnCount() - build_types.size();
hawkfish (Contributor):

The method says full outer, but this looks like a right outer?

lnkuiper (Author):

This is essentially a full/outer scan, but since the right side is our build side, we need to do this for right outer joins too. The method already had this name, so I would rather not change it.

@@ -794,4 +888,221 @@ idx_t JoinHashTable::FillWithHTOffsets(data_ptr_t *key_locations, JoinHTScanStat
}
return key_count;
}

void JoinHashTable::SwizzleBlocks() {
RowDataCollectionScanner::AlignHeapBlocks(*swizzled_block_collection, *swizzled_string_heap, *block_collection,
hawkfish (Contributor):

Make sure you have my 2022-08-17 changes in b8de547 here - the swizzling was not right across block boundaries, which I have found is the really dangerous part of all this stuff...

lnkuiper (Author):

Gotcha. I think we fixed the code for AlignHeapBlocks now, especially with your additional efforts to validate the swizzling code.


auto &heap_block = heap_blocks[block_idx];
D_ASSERT(data_block->count == heap_block->count);
auto heap_handle = buffer_manager.Pin(heap_block->block);
hawkfish (Contributor):

It might improve safety here to assert that the heap blocks are pinned?

lnkuiper (Author):

We pin them both here; I don't see why this should be asserted.

@hannes hannes added this to the 0.5.0 milestone Aug 22, 2022
hannes (Member) commented Aug 23, 2022:

Sorry another merge conflict...

lnkuiper (Author):

That's ok, merge conflicts keep me off the streets. I hope I merged it right, waiting for CI

return;
}

// If input data does not have keep_pinned, it should already be aligned
hawkfish (Contributor):

Is that right? I wanted this to work even when the blocks were not aligned, e.g. the OVER() case in PhysicalWindow where both sets of blocks need to spill. My current long-term vision for window is that it will use CDC for parallel hash grouping, then sort the groups, eventually resulting in the current sorted RDCs. Won't that result in unaligned blocks that are swizzled?

lnkuiper (Author):

If keep_pinned is false, then the BufferManager can decide to evict heap blocks, invalidating pointers. Although I may not be 100% sure about how you're using this. We should have a proper discussion about this at some point.

However, I am leaving for vacation this Friday, and the feature freeze is next week. We want this in before 0.5.0, so I've decided to postpone fixing this until after I get back. For now, I've left AlignHeapBlocks untouched and moved my code to JoinHashTable::SwizzleBlocks.

}

// Init local counts of the partition blocks
uint32_t block_counts[CONSTANTS::NUM_PARTITIONS];
hawkfish (Contributor):

vector<uint32_t>(CONSTANTS::NUM_PARTITIONS, 0)? Or are you trying to avoid memory allocation/fragmentation by allocating on the stack? (I'm asking more for my own understanding of some of our code idioms.)

lnkuiper (Author):

It's only used within this scope, so I prefer allocating on the stack.

#endif
}

static inline void NonTemporalWrite(data_ptr_t &data_ptr, const idx_t &row_width, uint32_t &block_count,
hawkfish (Contributor):

Not sure I understand the name of this function?

lnkuiper (Author):

For the partitioning code, we allocate a small temporary buffer that (hopefully) fits in the CPU cache. The temporary buffer can hold up to 8 entries per bin. We write entries into this buffer until we have 8 entries for a bin. These writes are "temporal writes".

When we have 8 entries for a bin in the temporary buffer, we flush it and write it to the actual partitions. This is a "non-temporal write".

This temporary buffer is called a SoftWare Write-Combine Buffer (SWWCB), and should reduce TLB cache pressure during parallel partitioning.

That being said, it's a bunch of unnecessary fancy words, so I've renamed it FlushTempBuf, and changed the comments to more plainly explain what's going on.
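
In plain terms, the pattern looks something like this; a simplified sketch where the sizes and names are illustrative and plain copies stand in for actual non-temporal stores:

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

static constexpr size_t SLOTS_PER_PARTITION = 8;

struct PartitionStage {
    std::vector<uint8_t> buf; // SLOTS_PER_PARTITION * row_width bytes
    size_t count = 0;
};

// Flush a full group of staged rows to the partition's output in one
// contiguous write (the "non-temporal" step)
void FlushTempBuf(PartitionStage &stage, std::vector<uint8_t> &partition_out, size_t row_width) {
    partition_out.insert(partition_out.end(), stage.buf.begin(),
                         stage.buf.begin() + stage.count * row_width);
    stage.count = 0;
}

// Copy one row into its partition's small staging buffer (the "temporal"
// step); flush once the buffer holds SLOTS_PER_PARTITION entries
void PartitionRow(const uint8_t *row, size_t row_width, size_t partition,
                  std::vector<PartitionStage> &stages,
                  std::vector<std::vector<uint8_t>> &outputs) {
    auto &stage = stages[partition];
    if (stage.buf.empty()) {
        stage.buf.resize(SLOTS_PER_PARTITION * row_width);
    }
    std::memcpy(stage.buf.data() + stage.count * row_width, row, row_width);
    if (++stage.count == SLOTS_PER_PARTITION) {
        FlushTempBuf(stage, outputs[partition], row_width);
    }
}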

@hannes hannes merged commit b51257e into duckdb:master Aug 24, 2022