Skip to content

Commit

Permalink
hash join finally working
Browse files Browse the repository at this point in the history
  • Loading branch information
kacper776 committed Apr 16, 2023
1 parent 7d99839 commit 6d933a2
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 25 deletions.
13 changes: 13 additions & 0 deletions komfydb/common/tuple.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ Tuple::Tuple(Tuple& t1, Tuple&& t2) {
}
}

Tuple::Tuple(Tuple& t1, Tuple& t2) {
// copies both Tuples, used in hash join
fields.resize(t1.Size() + t2.Size());
Tuple tmp_t1 = t1, tmp_t2 = t2;
int idx = 0;
for (int i = 0; i < tmp_t1.Size(); i++, idx++) {
fields[idx] = std::move(tmp_t1.fields[i]);
}
for (int i = 0; i < tmp_t2.Size(); i++, idx++) {
fields[idx] = std::move(tmp_t2.fields[i]);
}
}

int Tuple::Size() const {
return fields.size();
}
Expand Down
2 changes: 2 additions & 0 deletions komfydb/common/tuple.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class Tuple {

Tuple(Tuple& t1, Tuple&& t2);

Tuple(Tuple& t1, Tuple& t2);

virtual ~Tuple() = default;

Tuple& operator=(const Tuple& t);
Expand Down
2 changes: 2 additions & 0 deletions komfydb/execution/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ cc_library(
"insert.h",
"fixed_iterator.h",
"delete.h",
"hash_join.h",
],
srcs = [
"op_iterator.cc",
Expand All @@ -52,6 +53,7 @@ cc_library(
"insert.cc",
"fixed_iterator.cc",
"delete.cc",
"hash_join.cc",
],
deps = [
":op",
Expand Down
43 changes: 22 additions & 21 deletions komfydb/execution/hash_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ absl::Status HashJoin::Open() {
Tuple key = Tuple(1);
ASSIGN_OR_RETURN(Field * key_field,
(*record)->GetField(join_predicate.GetField1idx()));
key.SetField(0, std::make_unique<Field>(*key_field));
RETURN_IF_ERROR(key.SetField(0, key_field->CreateCopy()));
if (map.contains(key)) {
map[key].push_back(std::move(*record));
} else {
map.insert(
{key, std::vector<std::unique_ptr<Record>>({std::move(*record)})});
RecordVector value;
value.push_back(std::move(*record));
map.insert({key, std::move(value)});
}
}
RETURN_IF_NOT_OOR(record.status());
Expand All @@ -65,7 +66,7 @@ absl::Status HashJoin::Rewind() {
RETURN_IF_ERROR(l_child->Rewind());
RETURN_IF_ERROR(r_child->Rewind());
r_child_next = nullptr;
current_vector = nullptr;
current_vector = map.end();
next_record = nullptr;
return absl::OkStatus();
}
Expand All @@ -77,34 +78,34 @@ void HashJoin::Explain(std::ostream& os, int indent) {
r_child->Explain(os, indent + child_indent);
}

void HashJoin::EmitRecord() {
next_record = std::make_unique<Record>(
Record(Tuple(*(*current_match++), *r_child_next), joined_record_id));
if (current_match == current_vector->second.end()) {
current_vector = map.end();
r_child_next = nullptr;
}
}

absl::Status HashJoin::FetchNext() {
if (r_child_next == nullptr) {
ASSIGN_OR_RETURN(r_child_next, r_child->Next());
}
if (current_vector) {
next_record = std::make_unique<Record>(
Record(Tuple(*(*current_match++), std::move(*r_child_next)),
joined_record_id));
if (current_match == current_vector->end()) {
current_vector = nullptr;
}
if (current_vector != map.end()) {
EmitRecord();
return absl::OkStatus();
}
Tuple key = Tuple(1);
ASSIGN_OR_RETURN(Field * key_field,
r_child_next->GetField(join_predicate.GetField2idx()));
key.SetField(0, std::make_unique<Field>(*key_field));
if (map.contains(key)) {
current_vector = &map.at(key);
current_match = current_vector->begin();
next_record = std::make_unique<Record>(
Record(Tuple(*(*current_match++), std::move(*r_child_next)),
joined_record_id));
if (current_match == current_vector->end()) {
current_vector = nullptr;
}
RETURN_IF_ERROR(key.SetField(0, key_field->CreateCopy()));
current_vector = map.find(key);
if (current_vector != map.end()) {
current_match = current_vector->second.begin();
EmitRecord();
return absl::OkStatus();
}
r_child_next = nullptr;
return FetchNext();
}

Expand Down
13 changes: 9 additions & 4 deletions komfydb/execution/hash_join.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,15 @@
#include "komfydb/storage/record_id.h"

namespace {

using komfydb::storage::RecordId;
}

};

namespace komfydb::execution {

using RecordVector = std::vector<std::unique_ptr<Record>>;

class HashJoin : public Join {
public:
static absl::StatusOr<std::unique_ptr<HashJoin>> Create(
Expand All @@ -30,11 +34,12 @@ class HashJoin : public Join {
HashJoin(std::unique_ptr<OpIterator> l_child, JoinPredicate join_predicate,
std::unique_ptr<OpIterator> r_child, TupleDesc tuple_desc);

absl::flat_hash_map<Tuple, std::vector<std::unique_ptr<Record>>> map;
std::vector<std::unique_ptr<Record>>* current_vector;
std::vector<std::unique_ptr<Record>>::iterator current_match;
absl::flat_hash_map<Tuple, RecordVector> map;
absl::flat_hash_map<Tuple, RecordVector>::iterator current_vector;
RecordVector::iterator current_match;
std::unique_ptr<Record> r_child_next;

void EmitRecord();
absl::Status FetchNext() override;
};

Expand Down

0 comments on commit 6d933a2

Please sign in to comment.