Skip to content

Commit

Permalink
added abstract Join class
Browse files Browse the repository at this point in the history
  • Loading branch information
kacper776 committed May 24, 2023
1 parent efcd10b commit 89c694a
Show file tree
Hide file tree
Showing 10 changed files with 184 additions and 108 deletions.
10 changes: 4 additions & 6 deletions komfydb/common/tuple.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,13 @@ Tuple::Tuple(Tuple& t1, Tuple&& t2) {
}

Tuple::Tuple(Tuple& t1, Tuple& t2) {
// copies both Tuples, used in hash join
fields.resize(t1.Size() + t2.Size());
Tuple tmp_t1 = t1, tmp_t2 = t2;
int idx = 0;
for (int i = 0; i < tmp_t1.Size(); i++, idx++) {
fields[idx] = std::move(tmp_t1.fields[i]);
for (int i = 0; i < t1.Size(); i++, idx++) {
fields[idx] = t1.fields[i]->CreateCopy();
}
for (int i = 0; i < tmp_t2.Size(); i++, idx++) {
fields[idx] = std::move(tmp_t2.fields[i]);
for (int i = 0; i < t2.Size(); i++, idx++) {
fields[idx] = t2.fields[i]->CreateCopy();
}
}

Expand Down
6 changes: 4 additions & 2 deletions komfydb/execution/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,35 @@ cc_library(
"delete.h",
"filter.h",
"fixed_iterator.h",
"hash_join.h",
"insert.h",
"join.h",
"join_predicate.h",
"limit.h",
"loops_join.h",
"op_iterator.h",
"order_by.h",
"predicate.h",
"project.h",
"seq_scan.h",
"hash_join.h",
],
srcs = [
"aggregate.cc",
"aggregate_tuple.cc",
"delete.cc",
"filter.cc",
"fixed_iterator.cc",
"hash_join.cc",
"insert.cc",
"join.cc",
"join_predicate.cc",
"limit.cc",
"loops_join.cc",
"op_iterator.cc",
"order_by.cc",
"predicate.cc",
"project.cc",
"seq_scan.cc",
"hash_join.cc",
],
deps = [
":op",
Expand Down
7 changes: 5 additions & 2 deletions komfydb/execution/hash_join.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace {

using komfydb::common::Field;
using komfydb::common::JoinVectors;
using komfydb::common::TupleDesc;

} // namespace
Expand All @@ -21,8 +22,10 @@ namespace komfydb::execution {
HashJoin::HashJoin(std::unique_ptr<OpIterator> l_child,
JoinPredicate join_predicate,
std::unique_ptr<OpIterator> r_child, TupleDesc tuple_desc)
: Join(std::move(l_child), join_predicate, std::move(r_child), tuple_desc) {
}
: Join(tuple_desc,
JoinVectors(*l_child->GetFieldsTableAliases(),
*r_child->GetFieldsTableAliases()),
join_predicate, std::move(l_child), std::move(r_child)) {}

absl::StatusOr<std::unique_ptr<HashJoin>> HashJoin::Create(
std::unique_ptr<OpIterator> l_child, JoinPredicate join_predicate,
Expand Down
1 change: 1 addition & 0 deletions komfydb/execution/hash_join.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "komfydb/execution/join.h"
#include "komfydb/execution/join_predicate.h"
#include "komfydb/storage/record_id.h"
#include "komfydb/utils/utility.h"

namespace {

Expand Down
83 changes: 6 additions & 77 deletions komfydb/execution/join.cc
Original file line number Diff line number Diff line change
@@ -1,43 +1,25 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"

#include "komfydb/utils/status_macros.h"

#include "komfydb/common/tuple_desc.h"
#include "komfydb/execution/join.h"
#include "komfydb/execution/join_predicate.h"
#include "komfydb/storage/record.h"
#include "komfydb/utils/utility.h"

namespace {

using komfydb::common::JoinVectors;
using komfydb::common::TupleDesc;
using komfydb::execution::JoinPredicate;
using komfydb::storage::PageId;
using komfydb::storage::RecordId;

} // namespace

namespace komfydb::execution {

Join::Join(std::unique_ptr<OpIterator> l_child, JoinPredicate join_predicate,
std::unique_ptr<OpIterator> r_child, TupleDesc tuple_desc)
: OpIterator(tuple_desc, JoinVectors(*l_child->GetFieldsTableAliases(),
*r_child->GetFieldsTableAliases())),
Join::Join(TupleDesc& tuple_desc, std::vector<std::string> fields_table_ids,
JoinPredicate join_predicate, std::unique_ptr<OpIterator> l_child,
std::unique_ptr<OpIterator> r_child)
: OpIterator(tuple_desc, fields_table_ids),
join_predicate(join_predicate),
l_child(std::move(l_child)),
r_child(std::move(r_child)),
l_child_next(nullptr),
joined_record_id(RecordId(PageId(0, 0), -1)) {}

absl::StatusOr<std::unique_ptr<Join>> Join::Create(
std::unique_ptr<OpIterator> l_child, JoinPredicate join_predicate,
std::unique_ptr<OpIterator> r_child) {
TupleDesc tuple_desc(*l_child->GetTupleDesc(), *r_child->GetTupleDesc());
return std::unique_ptr<Join>(new Join(std::move(l_child), join_predicate,
std::move(r_child), tuple_desc));
}

JoinPredicate Join::GetJoinPredicate() {
return join_predicate;
}
Expand All @@ -54,62 +36,9 @@ absl::StatusOr<std::string> Join::GetJoinField2Name() {
return tuple_desc->GetFieldName(field_idx);
}

absl::Status Join::Open() {
RETURN_IF_ERROR(l_child->Open());
RETURN_IF_ERROR(r_child->Open());
return absl::OkStatus();
}

void Join::Close() {
l_child->Close();
r_child->Close();
}

absl::Status Join::Rewind() {
RETURN_IF_ERROR(l_child->Rewind());
RETURN_IF_ERROR(r_child->Rewind());
l_child_next = {};
next_record = {};
return absl::OkStatus();
}

absl::Status Join::FetchNext() {
if (next_record)
return absl::OkStatus();

absl::Status status;

while (!next_record) {
if (!l_child_next) {
if ((status = l_child->HasNext()).ok()) {
l_child_next = *(l_child->Next());
} else {
return status;
}
}
while ((status = r_child->HasNext()).ok()) {
std::unique_ptr<Record> potential_match = *(r_child->Next());
if (join_predicate.Filter(*l_child_next, *potential_match)) {
next_record = std::make_unique<Record>(
Record(Tuple(*l_child_next, std::move(*potential_match)),
joined_record_id));
return absl::OkStatus();
}
}
RETURN_IF_NOT_OOR(status);
l_child_next = {};
RETURN_IF_ERROR(r_child->Rewind());
continue;
}

return absl::OkStatus();
}

void Join::Explain(std::ostream& os, int indent) {
os << Indent(indent) << "-> Nested loops join: " << join_predicate << "\n";
os << Indent(indent + td_indent) << "TD: " << tuple_desc << "\n";
l_child->Explain(os, indent + child_indent);
r_child->Explain(os, indent + child_indent);
}

} // namespace komfydb::execution
}; // namespace komfydb::execution
26 changes: 9 additions & 17 deletions komfydb/execution/join.h
Original file line number Diff line number Diff line change
@@ -1,55 +1,47 @@
#ifndef __JOIN_H__
#define __JOIN_H__

#include <optional>
#include "absl/status/status.h"
#include "absl/status/statusor.h"

#include "komfydb/execution/join_predicate.h"
#include "komfydb/execution/op_iterator.h"
#include "komfydb/storage/page_id.h"
#include "komfydb/storage/record_id.h"

namespace {

using komfydb::common::TupleDesc;
using komfydb::execution::JoinPredicate;
using komfydb::storage::RecordId;

};
} // namespace

namespace komfydb::execution {

class Join : public OpIterator {
public:
static absl::StatusOr<std::unique_ptr<Join>> Create(
std::unique_ptr<OpIterator> l_child, JoinPredicate join_predicate,
std::unique_ptr<OpIterator> r_child);
Join(TupleDesc& tuple_desc, std::vector<std::string> fields_table_ids,
JoinPredicate join_predicate, std::unique_ptr<OpIterator> l_child,
std::unique_ptr<OpIterator> r_child);

JoinPredicate GetJoinPredicate();

absl::StatusOr<std::string> GetJoinField1Name();

absl::StatusOr<std::string> GetJoinField2Name();

absl::Status Open() override;

void Close() override;

absl::Status Rewind() override;

void Explain(std::ostream& os, int indent = 0) override;
// virtual ~Join() = default;

protected:
Join(std::unique_ptr<OpIterator> l_child, JoinPredicate join_predicate,
std::unique_ptr<OpIterator> r_child, TupleDesc tuple_desc);

absl::Status FetchNext() override;

JoinPredicate join_predicate;
std::unique_ptr<OpIterator> l_child;
std::unique_ptr<OpIterator> r_child;
std::unique_ptr<Record> l_child_next;
const RecordId joined_record_id;
};

}; // namespace komfydb::execution

#endif // __JOIN_H__
#endif // __JOIN_H__
93 changes: 93 additions & 0 deletions komfydb/execution/loops_join.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
#include "absl/status/status.h"
#include "absl/status/statusor.h"

#include "komfydb/utils/status_macros.h"

#include "komfydb/common/tuple_desc.h"
#include "komfydb/execution/join_predicate.h"
#include "komfydb/execution/loops_join.h"
#include "komfydb/storage/record.h"
#include "komfydb/utils/utility.h"

namespace {

using komfydb::common::JoinVectors;
using komfydb::common::TupleDesc;
using komfydb::storage::PageId;
using komfydb::storage::RecordId;

} // namespace

namespace komfydb::execution {

LoopsJoin::LoopsJoin(std::unique_ptr<OpIterator> l_child,
JoinPredicate join_predicate,
std::unique_ptr<OpIterator> r_child, TupleDesc tuple_desc)
: Join(tuple_desc,
JoinVectors(*l_child->GetFieldsTableAliases(),
*r_child->GetFieldsTableAliases()),
join_predicate, std::move(l_child), std::move(r_child)),
l_child_next(nullptr) {}

// Factory for LoopsJoin.
//
// Builds the joined TupleDesc from the two children's descriptors and hands
// everything to the constructor. Ownership of both children is transferred to
// the returned operator.
//
// Returns InvalidArgumentError if either child is null (both are dereferenced
// below to obtain their tuple descriptors).
absl::StatusOr<std::unique_ptr<LoopsJoin>> LoopsJoin::Create(
    std::unique_ptr<OpIterator> l_child, JoinPredicate join_predicate,
    std::unique_ptr<OpIterator> r_child) {
  if (l_child == nullptr || r_child == nullptr) {
    return absl::InvalidArgumentError(
        "LoopsJoin::Create requires non-null left and right children");
  }

  // Must be computed before the children are moved into the constructor.
  TupleDesc tuple_desc(*l_child->GetTupleDesc(), *r_child->GetTupleDesc());

  // NOTE(review): `new` is kept rather than std::make_unique; presumably the
  // constructor is not publicly accessible -- confirm against loops_join.h.
  return std::unique_ptr<LoopsJoin>(new LoopsJoin(
      std::move(l_child), join_predicate, std::move(r_child), tuple_desc));
}

// Opens the left child and then the right child. A failing Open() is handed
// to RETURN_IF_ERROR, so the right child is not opened if the left one fails.
absl::Status LoopsJoin::Open() {
  OpIterator* const children[] = {l_child.get(), r_child.get()};
  for (OpIterator* child : children) {
    RETURN_IF_ERROR(child->Open());
  }
  return absl::OkStatus();
}

// Rewinds the left child and then the right child, then drops the cached
// outer record and any pending output record so iteration restarts from the
// beginning. The cached state is only cleared once both rewinds succeeded.
absl::Status LoopsJoin::Rewind() {
  OpIterator* const children[] = {l_child.get(), r_child.get()};
  for (OpIterator* child : children) {
    RETURN_IF_ERROR(child->Rewind());
  }

  l_child_next = {};
  next_record = {};
  return absl::OkStatus();
}

// Advances the nested-loops join until `next_record` holds the next joined
// record.
//
// For the current outer (left) record, the inner (right) child is scanned for
// a record satisfying `join_predicate`. When the inner child runs out, the
// outer record is dropped, the inner child is rewound and the next outer
// record is fetched.
//
// Returns OkStatus once `next_record` is populated (or already was). Returns
// the left child's HasNext() status when the outer side has no more records,
// and propagates any error reported by either child's Next()/HasNext() or by
// rewinding the inner child.
absl::Status LoopsJoin::FetchNext() {
  if (next_record)
    return absl::OkStatus();

  while (!next_record) {
    if (!l_child_next) {
      absl::Status outer_status = l_child->HasNext();
      if (!outer_status.ok()) {
        return outer_status;
      }
      // Check the result before dereferencing: HasNext() succeeding does not
      // guarantee that Next() cannot fail.
      auto outer = l_child->Next();
      if (!outer.ok()) {
        return outer.status();
      }
      l_child_next = std::move(*outer);
    }

    absl::Status inner_status;
    while ((inner_status = r_child->HasNext()).ok()) {
      auto inner = r_child->Next();
      if (!inner.ok()) {
        return inner.status();
      }
      std::unique_ptr<Record> potential_match = std::move(*inner);
      if (join_predicate.Filter(*l_child_next, *potential_match)) {
        // The outer record is kept for later matches; the inner record is
        // consumed by the joined tuple.
        next_record = std::make_unique<Record>(
            Record(Tuple(*l_child_next, std::move(*potential_match)),
                   joined_record_id));
        return absl::OkStatus();
      }
    }
    // Anything other than the inner child running out is handed to
    // RETURN_IF_NOT_OOR.
    RETURN_IF_NOT_OOR(inner_status);

    // Inner side exhausted for this outer record: move to the next one.
    l_child_next = {};
    RETURN_IF_ERROR(r_child->Rewind());
  }

  return absl::OkStatus();
}

void LoopsJoin::Explain(std::ostream& os, int indent) {
os << Indent(indent) << "-> Nested loops join: " << join_predicate << "\n";
os << Indent(indent + td_indent) << "TD: " << tuple_desc << "\n";
l_child->Explain(os, indent + child_indent);
r_child->Explain(os, indent + child_indent);
}

} // namespace komfydb::execution
Loading

0 comments on commit 89c694a

Please sign in to comment.