Skip to content

Commit

Permalink
[parser] Add check for multiple join condition between same aliases.
Browse files Browse the repository at this point in the history
  • Loading branch information
korbiniak committed May 16, 2023
1 parent 4a745b2 commit 4acaa7f
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 3 deletions.
5 changes: 2 additions & 3 deletions .bazelrc
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
build --client_env=CC=clang-13 --cxxopt='-std=c++17' --cxxopt='-march=native' --cxxopt='-O2'

build:asan --client_env=CC=g++
build:asan --client_env=CC=cc
build:asan --strip=never
build:asan --copt -fsanitize=address
build:asan --copt -fsanitize=undefined
build:asan --copt -g
build:asan --copt -Og
build:asan --linkopt -fsanitize=address
build:asan --linkopt -fsanitize=undefined

Expand Down
4 changes: 4 additions & 0 deletions komfydb/common/column_ref.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ struct ColumnRef {
friend std::ostream& operator<<(std::ostream& os, const ColumnRef& ref) {
return os << static_cast<std::string>(ref);
}

bool operator==(const ColumnRef& ref) const {
return table == ref.table && column == ref.column;
}
};

const ColumnRef COLUMN_REF_STAR("null", "*");
Expand Down
28 changes: 28 additions & 0 deletions komfydb/execution/logical_plan/logical_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ std::vector<std::string> GetSubplanRoots(
return result;
}

bool IsDuplicateCondition(ColumnRef lref, ColumnRef rref,
std::vector<JoinNode>& joins) {
for (auto& join : joins) {
if (join.lref.table == lref.table && join.rref.table == rref.table)
return true;
if (join.lref.table == rref.table && join.rref.table == lref.table)
return true;
}
return false;
}

}; // namespace

namespace komfydb::execution::logical_plan {
Expand Down Expand Up @@ -82,6 +93,12 @@ absl::Status LogicalPlan::AddScan(int table_id, std::string_view alias) {
absl::Status LogicalPlan::AddJoin(ColumnRef lref, Op op, ColumnRef rref) {
RETURN_IF_ERROR(CheckColumnRef(lref));
RETURN_IF_ERROR(CheckColumnRef(rref));
if (IsDuplicateCondition(lref, rref, joins)) {
return absl::UnimplementedError(
absl::StrCat("Only one condition per join supported, cannot have "
"another condition for ",
std::string(lref), " ", std::string(rref)));
}
joins.push_back(JoinNode(lref, op, rref));
return absl::OkStatus();
}
Expand Down Expand Up @@ -236,12 +253,14 @@ absl::Status LogicalPlan::ProcessFilterNodes(TableStatsMap& table_stats) {
absl::Status LogicalPlan::ProcessJoinNodes(TransactionId tid,
TableStatsMap& table_stats,
bool explain) {
LOG(INFO) << "Processing join nodes.";
optimizer::JoinOptimizer join_optimizer = optimizer::JoinOptimizer(catalog);
RETURN_IF_ERROR(join_optimizer.OrderJoins(joins));

for (auto& join : joins) {
std::unique_ptr<OpIterator> lsubplan, rsubplan;
std::string ltable = GetEquiv(join.lref.table, equiv);
LOG(INFO) << "Equiv table to " << join.lref.table << " is " << ltable;
std::string rtable = "";
lsubplan = std::move(subplans[ltable]);

Expand Down Expand Up @@ -343,12 +362,17 @@ absl::StatusOr<std::unique_ptr<OpIterator>> LogicalPlan::ProcessLimit(

absl::StatusOr<std::unique_ptr<OpIterator>> LogicalPlan::GeneratePhysicalPlan(
TransactionId tid, TableStatsMap& table_stats, bool explain) {
LOG(INFO) << "Generating physical plan.";

equiv.clear();
filter_selectivities.clear();
subplans.clear();

LOG(INFO) << "Processing scan nodes.";
RETURN_IF_ERROR(ProcessScanNodes(tid));
LOG(INFO) << "Processing filter nodes.";
RETURN_IF_ERROR(ProcessFilterNodes(table_stats));
LOG(INFO) << "Processing join nodes.";
RETURN_IF_ERROR(ProcessJoinNodes(tid, table_stats, explain));

std::vector<std::string> subplan_roots = GetSubplanRoots(equiv);
Expand All @@ -358,15 +382,19 @@ absl::StatusOr<std::unique_ptr<OpIterator>> LogicalPlan::GeneratePhysicalPlan(

std::unique_ptr<OpIterator> plan = std::move(subplans[subplan_roots.back()]);

LOG(INFO) << "Processing order by.";
ASSIGN_OR_RETURN(plan, ProcessOrderBy(std::move(plan)));

if (aggregates.size()) {
LOG(INFO) << "Processing aggergate nodes.";
ASSIGN_OR_RETURN(plan, ProcessAggregateNodes(std::move(plan)));
} else {
LOG(INFO) << "Processing select nodes.";
ASSIGN_OR_RETURN(plan, ProcessSelectNodes(std::move(plan)));
}

if (limit >= 0) {
LOG(INFO) << "Processing limit.";
ASSIGN_OR_RETURN(plan, ProcessLimit(std::move(plan)));
}

Expand Down

0 comments on commit 4acaa7f

Please sign in to comment.