No longer resize join HT, instead only construct the join HT after all chunks have been computed
Mytherin committed Jan 2, 2020
1 parent f75cbba commit 9a89bc4
Showing 4 changed files with 80 additions and 96 deletions.
3 changes: 3 additions & 0 deletions src/common/vector_operations/scatter.cpp
@@ -125,6 +125,9 @@ static void scatter_set_all_loop(Vector &source, data_ptr_t dest[], index_t offs
case TypeId::BIGINT:
scatter_set_loop<int64_t, IGNORE_NULL>(source, dest, offset);
break;
case TypeId::HASH:
scatter_set_loop<uint64_t, IGNORE_NULL>(source, dest, offset);
break;
case TypeId::FLOAT:
scatter_set_loop<float, IGNORE_NULL>(source, dest, offset);
break;
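The new case routes TypeId::HASH through the same templated kernel as the other fixed-width types. For context, the scatter kernels copy one column value per row into that row's precomputed destination pointer at a fixed byte offset; a minimal standalone sketch of the idea (simplified — the real scatter_set_loop also handles selection vectors and the IGNORE_NULL/nullmask logic):

#include <cstdint>
#include <cstring>

using data_ptr_t = uint8_t *; // as in DuckDB
using index_t = uint64_t;

// Hypothetical, simplified scatter loop: write source[i] into row i's
// destination pointer at the given byte offset.
template <class T>
static void scatter_set(const T *source, data_ptr_t dest[], index_t count, index_t offset) {
	for (index_t i = 0; i < count; i++) {
		memcpy(dest[i] + offset, &source[i], sizeof(T));
	}
}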
153 changes: 66 additions & 87 deletions src/execution/join_hashtable.cpp
@@ -34,9 +34,9 @@ static void DeserializeChunk(DataChunk &result, data_ptr_t source[], index_t cou
}

JoinHashTable::JoinHashTable(vector<JoinCondition> &conditions, vector<TypeId> build_types, JoinType type,
index_t initial_capacity, bool parallel)
: build_types(build_types), equality_size(0), condition_size(0), build_size(0), entry_size(0), tuple_size(0),
join_type(type), has_null(false), capacity(0), count(0), parallel(parallel) {
index_t initial_capacity, bool parallel)
: build_types(build_types), equality_size(0), condition_size(0), build_size(0), entry_size(0), tuple_size(0),
join_type(type), finalized(false), has_null(false), capacity(0), count(0), parallel(parallel) {
for (auto &condition : conditions) {
assert(condition.left->return_type == condition.right->return_type);
auto type = condition.left->return_type;
@@ -52,7 +52,7 @@ JoinHashTable::JoinHashTable(vector<JoinCondition> &conditions, vector<TypeId> b
predicates.push_back(condition.comparison);
null_values_are_equal.push_back(condition.null_values_are_equal);
assert(!condition.null_values_are_equal ||
(condition.null_values_are_equal && condition.comparison == ExpressionType::COMPARE_EQUAL));
(condition.null_values_are_equal && condition.comparison == ExpressionType::COMPARE_EQUAL));

condition_types.push_back(type);
condition_size += type_size;
@@ -71,8 +71,8 @@ JoinHashTable::JoinHashTable(vector<JoinCondition> &conditions, vector<TypeId> b
}
}
tuple_size = condition_size + build_size;
entry_size = tuple_size + sizeof(void *);
Resize(initial_capacity);
// entry size is the tuple size and the size of the hash/next pointer
entry_size = tuple_size + std::max(sizeof(uint64_t), sizeof(void *));
}
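With the up-front Resize gone, the trailing slot of each entry does double duty: it holds the tuple's hash while chunks are being built, and is overwritten with the bucket-chain pointer once the table is finalized. The layout implied by these sizes (widths illustrative):

// +-------------------+--------------------+-----------------------------+
// | condition columns | build-side columns | hash, later: chain pointer  |
// +-------------------+--------------------+-----------------------------+
//   condition_size       build_size          max(sizeof(uint64_t),
//                                                sizeof(void *))
//
// tuple_size = condition_size + build_size
// entry_size = tuple_size + std::max(sizeof(uint64_t), sizeof(void *))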

void JoinHashTable::ApplyBitmask(Vector &hashes) {
@@ -101,75 +101,6 @@ void JoinHashTable::InsertHashes(Vector &hashes, data_ptr_t key_locations[]) {
});
}

void JoinHashTable::Resize(index_t size) {
if (size <= capacity) {
throw Exception("Cannot downsize a hash table!");
}
capacity = size;

// size needs to be a power of 2
assert((size & (size - 1)) == 0);
bitmask = size - 1;

hashed_pointers = unique_ptr<data_ptr_t[]>(new data_ptr_t[capacity]);
memset(hashed_pointers.get(), 0, capacity * sizeof(data_ptr_t));

if (count > 0) {
// we have entries, need to rehash the pointers
// first reset all chain pointers to the nullptr
// we could do this by actually following the chains in the
// hashed_pointers as well might be more or less efficient depending on
// length of chains?
auto node = head.get();
while (node) {
// scan all the entries in this node
auto entry_pointer = (data_ptr_t)node->data.get() + tuple_size;
for (index_t i = 0; i < node->count; i++) {
// reset chain pointer
auto prev_pointer = (data_ptr_t *)entry_pointer;
*prev_pointer = nullptr;
// move to next entry
entry_pointer += entry_size;
}
node = node->prev.get();
}

// now rehash the entries
DataChunk keys;
keys.Initialize(equality_types);

data_ptr_t key_locations[STANDARD_VECTOR_SIZE];

node = head.get();
while (node) {
// scan all the entries in this node
auto dataptr = node->data.get();
for (index_t i = 0; i < node->count; i++) {
// key is stored at the start
key_locations[i] = dataptr;
// move to next entry
dataptr += entry_size;
}

// reconstruct the keys chunk from the stored entries
// we only reconstruct the keys that are part of the equality
// comparison as these are the ones that are used to compute the
// hash
DeserializeChunk(keys, key_locations, node->count);

// create the hash
StaticVector<uint64_t> hashes;
keys.Hash(hashes);

// re-insert the entries
InsertHashes(hashes, key_locations);

// move to the next node
node = node->prev.get();
}
}
}
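For context on what deleting Resize buys: this path walked every stored node on each doubling, deserialized the equality keys back into a DataChunk, and recomputed their hashes just to rebuild the chains. A rough cost comparison (an editorial reading of the diff, not taken from the commit message):

// old: each Resize resets every chain pointer, then re-deserializes and
//      re-hashes every live tuple; a table growing from the initial 32768
//      entries to n tuples resizes roughly log2(n / 32768) times.
// new: Build hashes each tuple exactly once and serializes the hash into
//      the row; Finalize merely reloads the stored hashes in one pass.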

void JoinHashTable::Hash(DataChunk &keys, Vector &hashes) {
VectorOperations::Hash(keys.data[0], hashes);
for (index_t i = 1; i < equality_types.size(); i++) {
@@ -195,23 +126,31 @@ static index_t CreateNotNullSelVector(DataChunk &keys, sel_t *not_null_sel_vecto
return result_count;
}

uint64_t NextPowerOfTwo(uint64_t v) {
v--;
v |= v >> 1;
v |= v >> 2;
v |= v >> 4;
v |= v >> 8;
v |= v >> 16;
v |= v >> 32;
v++;
return v;
}
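// Worked example of the bit-smear above: for v = 5, v-- gives 0b100; the
// shift-or cascade propagates the top set bit into every lower position,
// giving 0b111; v++ then yields 0b1000 = 8. Existing powers of two map to
// themselves (that is what the initial v-- ensures), and the final
// v |= v >> 32 step is why the argument must be a uint64_t.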

void JoinHashTable::Build(DataChunk &keys, DataChunk &payload) {
assert(!finalized);
assert(keys.size() == payload.size());
if (keys.size() == 0) {
return;
}
// resize at 50% capacity, also need to fit the entire vector
if (parallel) {
parallel_lock.lock();
}
if (count + keys.size() > capacity / 2) {
Resize(capacity * 2);
}
count += keys.size();
// move strings to the string heap
keys.MoveStringsToHeap(string_heap);
payload.MoveStringsToHeap(string_heap);

if (parallel) {
parallel_lock.unlock();
}
@@ -270,34 +209,39 @@ void JoinHashTable::Build(DataChunk &keys, DataChunk &payload) {
// get the locations of where to serialize the keys and payload columns
data_ptr_t key_locations[STANDARD_VECTOR_SIZE];
data_ptr_t tuple_locations[STANDARD_VECTOR_SIZE];
data_ptr_t hash_locations[STANDARD_VECTOR_SIZE];
auto node = make_unique<Node>(entry_size, keys.size());
auto dataptr = node->data.get();
VectorOperations::Exec(keys.data[0], [&](index_t i, index_t k) {
// key is stored at the start
key_locations[i] = dataptr;
// after that the build-side tuple is stored
tuple_locations[i] = dataptr + condition_size;
// the hash is stored at the end, after the key and build
hash_locations[i] = dataptr + tuple_size;
dataptr += entry_size;
});
node->count = keys.size();

// hash the keys and obtain an entry in the list
// note that we only hash the keys used in the equality comparison
vector<TypeId> hash_types = {TypeId::HASH};
DataChunk hash_chunk;
hash_chunk.Initialize(hash_types);
Hash(keys, hash_chunk.data[0]);

// serialize the values to these locations
// we serialize all the condition variables here
SerializeChunk(keys, key_locations);
if (build_size > 0) {
SerializeChunk(payload, tuple_locations);
}

// hash the keys and obtain an entry in the list
// note that we only hash the keys used in the equality comparison
StaticVector<uint64_t> hashes;
Hash(keys, hashes);
SerializeChunk(hash_chunk, hash_locations);

if (parallel) {
// obtain lock
parallel_lock.lock();
}
InsertHashes(hashes, key_locations);
// store the new node as the head
node->prev = move(head);
head = move(node);
@@ -306,7 +250,42 @@ void JoinHashTable::Build(DataChunk &keys, DataChunk &payload) {
}
}

void JoinHashTable::Finalize() {
// the build has finished, now iterate over all the nodes and construct the final hash table
// select a HT that has at least 50% empty space
index_t capacity = NextPowerOfTwo(count * 2);
// size needs to be a power of 2
assert((capacity & (capacity - 1)) == 0);
bitmask = capacity - 1;

// allocate the HT and initialize it with all-zero entries
hashed_pointers = unique_ptr<data_ptr_t[]>(new data_ptr_t[capacity]);
memset(hashed_pointers.get(), 0, capacity * sizeof(data_ptr_t));

Vector hashes(TypeId::HASH, true, false);
auto hash_data = (uint64_t *)hashes.data;
data_ptr_t key_locations[STANDARD_VECTOR_SIZE];
// now construct the actual hash table; scan the nodes
auto node = head.get();
while (node) {
// deserialize the hashes
data_ptr_t dataptr = node->data.get();
for (index_t i = 0; i < node->count; i++) {
hash_data[i] = *((uint64_t *)(dataptr + tuple_size));
key_locations[i] = dataptr;
dataptr += entry_size;
}
hashes.count = node->count;
// now insert into the hash table
InsertHashes(hashes, key_locations);

node = node->prev.get();
}
finalized = true;
}

unique_ptr<ScanStructure> JoinHashTable::Probe(DataChunk &keys) {
assert(finalized);
assert(!keys.sel_vector); // should be flattened before

for (index_t i = 0; i < keys.column_count; i++) {
@@ -628,7 +607,7 @@ void ScanStructure::NextAntiJoin(DataChunk &keys, DataChunk &left, DataChunk &re

namespace duckdb {
void ConstructMarkJoinResult(DataChunk &join_keys, DataChunk &child, DataChunk &result, bool found_match[],
bool right_has_null) {
bool right_has_null) {
// for the initial set of columns we just reference the left side
for (index_t i = 0; i < child.column_count; i++) {
result.data[i].Reference(child.data[i]);
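Taken together: Build serializes each tuple's hash into the row's trailing slot, and Finalize reloads those hashes to construct the bucket chains in a single pass over the stored nodes, so no hash is ever computed twice. A hedged, single-row sketch of the head-insertion that InsertHashes performs (the real code operates on whole vectors and masks the hashes; names here are illustrative):

#include <cstdint>

// Hypothetical scalar version of the chained insert: the row's trailing
// slot, which held the hash during the build phase, is overwritten with the
// previous chain head, and the row becomes the new head of its bucket.
static void insert_chained(uint8_t *directory[], uint64_t bitmask, uint64_t hash,
                           uint8_t *row, uint64_t tuple_size) {
	uint64_t bucket = hash & bitmask;   // power-of-two capacity: mask instead of modulo
	auto next_slot = reinterpret_cast<uint8_t **>(row + tuple_size);
	*next_slot = directory[bucket];     // link to the previous head (possibly nullptr)
	directory[bucket] = row;            // head insertion
}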
13 changes: 7 additions & 6 deletions src/execution/operator/join/physical_hash_join.cpp
@@ -50,12 +50,7 @@ void PhysicalHashJoin::BuildHashTable(ClientContext &context, PhysicalOperatorSt
// build the HT
hash_table->Build(state->join_keys, right_chunk);
}

if (hash_table->size() == 0 &&
(hash_table->join_type == JoinType::INNER || hash_table->join_type == JoinType::SEMI)) {
// empty hash table with INNER or SEMI join means empty result set
return;
}
hash_table->Finalize();
}

void PhysicalHashJoin::ProbeHashTable(ClientContext &context, DataChunk &chunk, PhysicalOperatorState *state_) {
@@ -134,6 +129,12 @@ void PhysicalHashJoin::GetChunkInternal(ClientContext &context, DataChunk &chunk
state->cached_chunk.Initialize(types);
BuildHashTable(context, state_);
state->initialized = true;

if (hash_table->size() == 0 &&
(hash_table->join_type == JoinType::INNER || hash_table->join_type == JoinType::SEMI)) {
// empty hash table with INNER or SEMI join means empty result set
return;
}
}
do {
ProbeHashTable(context, chunk, state);
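The operator-side contract is now strictly two-phase, which is why the empty-table early-out moves: BuildHashTable must always reach Finalize, and the INNER/SEMI shortcut can only be taken once the operator state is initialized. Summarized flow (an editorial paraphrase of the diff, not the exact code):

// 1. BuildHashTable: for every right-child chunk, hash_table->Build(keys, chunk);
//    then hash_table->Finalize() exactly once (Build asserts !finalized).
// 2. GetChunkInternal: after initialization, if the finalized table is empty
//    and the join is INNER or SEMI, return immediately (empty result set);
//    otherwise probe the table chunk by chunk via ProbeHashTable.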
7 changes: 4 additions & 3 deletions src/include/duckdb/execution/join_hashtable.hpp
@@ -101,11 +101,10 @@ class JoinHashTable {
public:
JoinHashTable(vector<JoinCondition> &conditions, vector<TypeId> build_types, JoinType type,
index_t initial_capacity = 32768, bool parallel = false);
//! Resize the HT to the specified size. Must be larger than the current
//! size.
void Resize(index_t size);
//! Add the given data to the HT
void Build(DataChunk &keys, DataChunk &input);
//! Finalize the build of the HT, constructing the actual hash table and making the HT ready for probing.
//! Finalize must be called before any call to Probe; after Finalize has been called, Build must not be called again.
void Finalize();
//! Probe the HT with the given input chunk, resulting in the given result
unique_ptr<ScanStructure> Probe(DataChunk &keys);

@@ -136,6 +135,8 @@ class JoinHashTable {
index_t tuple_size;
//! The join type of the HT
JoinType join_type;
//! Whether or not the HT has been finalized
bool finalized;
//! Whether or not any of the key elements contain NULL
bool has_null;
//! Bitmask for getting relevant bits from the hashes to determine the position
