Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hash join #73

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions komfydb/common/tuple.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ Tuple::Tuple(Tuple& t1, Tuple&& t2) {
}
}

// Concatenating constructor: the new tuple holds deep copies of every field
// of `t1` followed by deep copies of every field of `t2`. Neither input is
// modified.
Tuple::Tuple(Tuple& t1, Tuple& t2) {
  const int left_size = t1.Size();
  const int right_size = t2.Size();
  fields.resize(left_size + right_size);

  // Left-hand fields occupy slots [0, left_size).
  for (int pos = 0; pos < left_size; ++pos) {
    fields[pos] = t1.fields[pos]->CreateCopy();
  }
  // Right-hand fields follow immediately after.
  for (int pos = 0; pos < right_size; ++pos) {
    fields[left_size + pos] = t2.fields[pos]->CreateCopy();
  }
}

// Number of fields stored in this tuple, narrowed to int.
int Tuple::Size() const {
  return static_cast<int>(fields.size());
}
Expand Down
2 changes: 2 additions & 0 deletions komfydb/common/tuple.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class Tuple {

Tuple(Tuple& t1, Tuple&& t2);

Tuple(Tuple& t1, Tuple& t2);

virtual ~Tuple() = default;

Tuple& operator=(const Tuple& t);
Expand Down
4 changes: 4 additions & 0 deletions komfydb/execution/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ cc_library(
"delete.h",
"filter.h",
"fixed_iterator.h",
"hash_join.h",
"insert.h",
"join.h",
"join_predicate.h",
"limit.h",
"loops_join.h",
"op_iterator.h",
"order_by.h",
"predicate.h",
Expand All @@ -45,10 +47,12 @@ cc_library(
"delete.cc",
"filter.cc",
"fixed_iterator.cc",
"hash_join.cc",
"insert.cc",
"join.cc",
"join_predicate.cc",
"limit.cc",
"loops_join.cc",
"op_iterator.cc",
"order_by.cc",
"predicate.cc",
Expand Down
115 changes: 115 additions & 0 deletions komfydb/execution/hash_join.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"

#include "komfydb/utils/status_macros.h"

#include "komfydb/common/tuple_desc.h"
#include "komfydb/execution/hash_join.h"
#include "komfydb/execution/join_predicate.h"
#include "komfydb/storage/record.h"
#include "komfydb/utils/utility.h"

namespace {

using komfydb::common::Field;
using komfydb::common::JoinVectors;
using komfydb::common::TupleDesc;

} // namespace

namespace komfydb::execution {

HashJoin::HashJoin(std::unique_ptr<OpIterator> l_child,
JoinPredicate join_predicate,
std::unique_ptr<OpIterator> r_child, TupleDesc tuple_desc)
: Join(tuple_desc,
JoinVectors(*l_child->GetFieldsTableAliases(),
*r_child->GetFieldsTableAliases()),
join_predicate, std::move(l_child), std::move(r_child)) {}

// Factory for HashJoin.
//
// Returns InvalidArgumentError unless the predicate's operator is
// Op::EQUALS. Otherwise builds the joined TupleDesc from both children's
// descriptions and hands everything to the private constructor.
absl::StatusOr<std::unique_ptr<HashJoin>> HashJoin::Create(
    std::unique_ptr<OpIterator> l_child, JoinPredicate join_predicate,
    std::unique_ptr<OpIterator> r_child) {
  // TODO: should 'LIKE' also be allowed?
  const bool is_equality = join_predicate.GetOperator().value == Op::EQUALS;
  if (!is_equality) {
    return absl::InvalidArgumentError(
        "Hash join only works with equality conditions");
  }

  TupleDesc joined_desc(*l_child->GetTupleDesc(), *r_child->GetTupleDesc());
  // The constructor is private, so std::make_unique cannot be used here.
  return std::unique_ptr<HashJoin>(new HashJoin(
      std::move(l_child), join_predicate, std::move(r_child), joined_desc));
}

// Opens both children and builds the hash table from the left child.
//
// Every left record is stored in `map` under a one-field Tuple key holding a
// copy of its join field (join_predicate.GetField1idx()). Records with equal
// keys share one bucket, in arrival order.
//
// Returns the first error from opening a child, reading a field or iterating
// the left child; iteration ending is checked through RETURN_IF_NOT_OOR.
absl::Status HashJoin::Open() {
  RETURN_IF_ERROR(l_child->Open());
  RETURN_IF_ERROR(r_child->Open());

  // Start from a clean slate so that opening the operator a second time does
  // not append the build side to the previous table and duplicate results.
  map.clear();
  r_child_next = nullptr;

  ITERATE_RECORDS(l_child, record) {
    Tuple key = Tuple(1);
    ASSIGN_OR_RETURN(Field * key_field,
                     (*record)->GetField(join_predicate.GetField1idx()));
    RETURN_IF_ERROR(key.SetField(0, key_field->CreateCopy()));
    // operator[] creates an empty bucket for a new key, so a single lookup
    // covers both the "first record for this key" and the "append" case.
    map[key].push_back(std::move(*record));
  }
  RETURN_IF_NOT_OOR(record.status());

  // FetchNext compares current_vector against map.end() to tell whether a
  // bucket is being drained. It must be set only after the table is complete,
  // because insertions may invalidate iterators.
  current_vector = map.end();
  return absl::OkStatus();
}

// Rewinds both children and discards all probe-side progress: the pending
// right record, the bucket currently being drained and the buffered output
// record. The hash table itself is kept.
absl::Status HashJoin::Rewind() {
  RETURN_IF_ERROR(l_child->Rewind());
  RETURN_IF_ERROR(r_child->Rewind());

  r_child_next.reset();
  current_vector = map.end();  // "no bucket in progress"
  next_record.reset();
  return absl::OkStatus();
}

void HashJoin::Explain(std::ostream& os, int indent) {
os << Indent(indent) << "-> Hash join: " << join_predicate << "\n";
os << Indent(indent + td_indent) << "TD: " << tuple_desc << "\n";
l_child->Explain(os, indent + child_indent);
r_child->Explain(os, indent + child_indent);
}

void HashJoin::EmitRecord() {
next_record = std::make_unique<Record>(
Record(Tuple(*(*current_match++), *r_child_next), joined_record_id));
if (current_match == current_vector->second.end()) {
current_vector = map.end();
r_child_next = nullptr;
}
}

// Computes the next joined record into `next_record`.
//
// Pulls right-side records one at a time, looks each up in the hash table by
// its join field (join_predicate.GetField2idx()) and emits one joined record
// per matching left record. Right records without a match are skipped.
//
// Returns OkStatus once a record has been emitted, or the first non-OK status
// from the right child or from field access (including the status the right
// child reports when it has no more records).
//
// Implemented as a loop rather than by recursion: the local key Tuple has a
// destructor, so a recursive call is not a tail call and a long run of
// non-matching right records would grow the stack without bound.
absl::Status HashJoin::FetchNext() {
  while (true) {
    if (r_child_next == nullptr) {
      ASSIGN_OR_RETURN(r_child_next, r_child->Next());
    }

    // A bucket is still being drained for the current right record.
    if (current_vector != map.end()) {
      EmitRecord();
      return absl::OkStatus();
    }

    // Probe the table with a one-field key copied from the right record.
    Tuple key = Tuple(1);
    ASSIGN_OR_RETURN(Field * key_field,
                     r_child_next->GetField(join_predicate.GetField2idx()));
    RETURN_IF_ERROR(key.SetField(0, key_field->CreateCopy()));
    current_vector = map.find(key);
    if (current_vector != map.end()) {
      current_match = current_vector->second.begin();
      EmitRecord();
      return absl::OkStatus();
    }

    // No match: drop this right record and try the next one.
    r_child_next = nullptr;
  }
}

} // namespace komfydb::execution
49 changes: 49 additions & 0 deletions komfydb/execution/hash_join.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#ifndef __HASH_JOIN_H__
#define __HASH_JOIN_H__

#include "absl/status/status.h"
#include "absl/status/statusor.h"

#include "komfydb/execution/join.h"
#include "komfydb/execution/join_predicate.h"
#include "komfydb/storage/record_id.h"
#include "komfydb/utils/utility.h"

namespace {

using komfydb::storage::RecordId;

};

namespace komfydb::execution {

using RecordVector = std::vector<std::unique_ptr<Record>>;

// Join operator that matches records by hashing the join field.
//
// Open() reads the whole left child into `map`, keyed by a one-field Tuple
// holding the left join field. FetchNext() then reads the right child record
// by record, looks each one up in `map` and emits one joined record per
// matching left record. Create() only accepts equality predicates.
class HashJoin : public Join {
 public:
  // Builds a HashJoin over the two children, or returns InvalidArgumentError
  // when the predicate's operator is not Op::EQUALS.
  static absl::StatusOr<std::unique_ptr<HashJoin>> Create(
      std::unique_ptr<OpIterator> l_child, JoinPredicate join_predicate,
      std::unique_ptr<OpIterator> r_child);

  // Opens both children and builds the hash table from the left child.
  absl::Status Open() override;

  // Rewinds both children and resets the probe state.
  absl::Status Rewind() override;

  // Writes a textual description of this operator and its children to `os`.
  void Explain(std::ostream& os, int indent = 0) override;

 private:
  HashJoin(std::unique_ptr<OpIterator> l_child, JoinPredicate join_predicate,
           std::unique_ptr<OpIterator> r_child, TupleDesc tuple_desc);

  // Left-side records grouped by their join-field key.
  absl::flat_hash_map<Tuple, RecordVector> map;
  // Bucket matching `r_child_next`; equal to map.end() when no bucket is
  // being drained.
  absl::flat_hash_map<Tuple, RecordVector>::iterator current_vector;
  // Next left record to emit from the bucket at `current_vector`.
  RecordVector::iterator current_match;
  // Right-side record currently being probed; null when a new one is needed.
  std::unique_ptr<Record> r_child_next;

  // Joins `*current_match` with `r_child_next` into `next_record` and
  // advances within the current bucket.
  void EmitRecord();
  absl::Status FetchNext() override;
};

}; // namespace komfydb::execution

#endif // __HASH_JOIN_H__
83 changes: 6 additions & 77 deletions komfydb/execution/join.cc
Original file line number Diff line number Diff line change
@@ -1,43 +1,25 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"

#include "komfydb/utils/status_macros.h"

#include "komfydb/common/tuple_desc.h"
#include "komfydb/execution/join.h"
#include "komfydb/execution/join_predicate.h"
#include "komfydb/storage/record.h"
#include "komfydb/utils/utility.h"

namespace {

using komfydb::common::JoinVectors;
using komfydb::common::TupleDesc;
using komfydb::execution::JoinPredicate;
using komfydb::storage::PageId;
using komfydb::storage::RecordId;

} // namespace

namespace komfydb::execution {

// Join constructor: stores the predicate, takes ownership of both children,
// starts with no buffered left record, and uses RecordId(PageId(0, 0), -1) as
// the record id for joined records.
//
// NOTE(review): two constructor signatures appear back to back below. This
// looks like diff residue (a former signature taking the children first,
// followed by a signature taking the tuple description and table aliases
// first) -- confirm which one is current before compiling or editing.
Join::Join(std::unique_ptr<OpIterator> l_child, JoinPredicate join_predicate,
std::unique_ptr<OpIterator> r_child, TupleDesc tuple_desc)
: OpIterator(tuple_desc, JoinVectors(*l_child->GetFieldsTableAliases(),
*r_child->GetFieldsTableAliases())),
Join::Join(TupleDesc& tuple_desc, std::vector<std::string> fields_table_ids,
JoinPredicate join_predicate, std::unique_ptr<OpIterator> l_child,
std::unique_ptr<OpIterator> r_child)
: OpIterator(tuple_desc, fields_table_ids),
join_predicate(join_predicate),
l_child(std::move(l_child)),
r_child(std::move(r_child)),
l_child_next(nullptr),
joined_record_id(RecordId(PageId(0, 0), -1)) {}

// Factory for Join: derives the joined TupleDesc from both children's
// descriptions and constructs the operator, taking ownership of the children.
absl::StatusOr<std::unique_ptr<Join>> Join::Create(
    std::unique_ptr<OpIterator> l_child, JoinPredicate join_predicate,
    std::unique_ptr<OpIterator> r_child) {
  TupleDesc joined_desc(*l_child->GetTupleDesc(), *r_child->GetTupleDesc());
  Join* raw_join = new Join(std::move(l_child), join_predicate,
                            std::move(r_child), joined_desc);
  return std::unique_ptr<Join>(raw_join);
}

// Returns a copy of the predicate this join was constructed with.
JoinPredicate Join::GetJoinPredicate() {
return join_predicate;
}
Expand All @@ -54,62 +36,9 @@ absl::StatusOr<std::string> Join::GetJoinField2Name() {
return tuple_desc->GetFieldName(field_idx);
}

// Opens the left child, then the right child, stopping at the first error.
absl::Status Join::Open() {
  OpIterator* const children[] = {l_child.get(), r_child.get()};
  for (OpIterator* child : children) {
    RETURN_IF_ERROR(child->Open());
  }
  return absl::OkStatus();
}

void Join::Close() {
l_child->Close();
r_child->Close();
}

// Rewinds both children (left first) and drops the buffered left record and
// the buffered output record. Returns the first rewind error, in which case
// the buffered state is left untouched.
absl::Status Join::Rewind() {
  OpIterator* const children[] = {l_child.get(), r_child.get()};
  for (OpIterator* child : children) {
    RETURN_IF_ERROR(child->Rewind());
  }
  l_child_next.reset();
  next_record.reset();
  return absl::OkStatus();
}

// Computes the next joined record into `next_record` with a nested-loops
// scan: for the current left record, right records are read until one
// satisfies the predicate; when the right child is exhausted it is rewound
// and the next left record is taken.
//
// Returns OkStatus when `next_record` is set (immediately if it already is),
// otherwise the first non-OK status from either child. That includes the
// status the left child's HasNext() reports once it has no more records.
//
// The results of Next() are unwrapped with ASSIGN_OR_RETURN instead of being
// dereferenced blindly, so a failing Next() is reported to the caller rather
// than dereferencing an error-holding StatusOr.
absl::Status Join::FetchNext() {
  if (next_record) {
    return absl::OkStatus();
  }

  while (!next_record) {
    if (!l_child_next) {
      RETURN_IF_ERROR(l_child->HasNext());
      ASSIGN_OR_RETURN(l_child_next, l_child->Next());
    }

    absl::Status status;
    while ((status = r_child->HasNext()).ok()) {
      ASSIGN_OR_RETURN(std::unique_ptr<Record> potential_match,
                       r_child->Next());
      if (join_predicate.Filter(*l_child_next, *potential_match)) {
        next_record = std::make_unique<Record>(
            Record(Tuple(*l_child_next, std::move(*potential_match)),
                   joined_record_id));
        return absl::OkStatus();
      }
    }
    // Anything other than "right child exhausted" is a real error.
    RETURN_IF_NOT_OOR(status);

    // Right side exhausted for this left record: move on to the next one.
    l_child_next = {};
    RETURN_IF_ERROR(r_child->Rewind());
  }

  return absl::OkStatus();
}

void Join::Explain(std::ostream& os, int indent) {
os << Indent(indent) << "-> Nested loops join: " << join_predicate << "\n";
os << Indent(indent + td_indent) << "TD: " << tuple_desc << "\n";
l_child->Explain(os, indent + child_indent);
r_child->Explain(os, indent + child_indent);
}

} // namespace komfydb::execution
}; // namespace komfydb::execution
Loading