Skip to content

Commit

Permalink
Unified compaction. (#1208)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

1. Refactor: implement compaction operator.
2. Refactor: use scheduler to execute compaction.
3. Refactor: fragment plan is a DAG now by connecting multiple plan
tree.
4. Fix: Add delete to compaction todelete list in `SegmentEntry::Commit`
instead of `SegmentEntry::DeleteData`, because it should be atomic with
`SegmentEntry::max_row_ts_` update.

Issue link:#1182

### Type of change
- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Refactoring
- [x] Test cases
  • Loading branch information
small-turtle-1 committed May 16, 2024
1 parent c56ef5b commit 24404a5
Show file tree
Hide file tree
Showing 100 changed files with 4,616 additions and 3,119 deletions.
1 change: 1 addition & 0 deletions benchmark/embedding/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ target_link_libraries(
ann_ivfflat_benchmark
infinity_core
sql_parser
zsv_parser
benchmark_profiler
onnxruntime_mlas
newpfor
Expand Down
31 changes: 31 additions & 0 deletions src/executor/fragment/plan_fragment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,35 @@ SharedPtr<Vector<String>> PlanFragment::ToString() {

SharedPtr<DataTable> PlanFragment::GetResult() { return context_->GetResult(); }

void PlanFragment::AddNext(SharedPtr<PlanFragment> root, PlanFragment *next) {
Vector<PlanFragment *> next_leaves;
next->GetStartFragments(next_leaves);
for (auto &leaf : next_leaves) {
leaf->AddChild(root);
}
}

SizeT PlanFragment::GetStartFragments(Vector<PlanFragment *> &leaf_fragments) {
SizeT all_fragment_n = 0;
HashSet<PlanFragment *> visited;
std::function<void(PlanFragment *)> TraversePlanFragmentGraph = [&](PlanFragment *fragment) {
if (visited.find(fragment) != visited.end()) {
return;
}
visited.insert(fragment);
if (fragment->GetContext()) {
all_fragment_n += fragment->GetContext()->Tasks().size();
}
if (!fragment->HasChild()) {
leaf_fragments.emplace_back(fragment);
return;
}
for (auto &child : fragment->Children()) {
TraversePlanFragmentGraph(child.get());
}
};
TraversePlanFragmentGraph(this);
return all_fragment_n;
}

} // namespace infinity
16 changes: 10 additions & 6 deletions src/executor/fragment/plan_fragment.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ public:

[[nodiscard]] inline PhysicalSink *GetSinkNode() const { return sink_.get(); }

[[nodiscard]] inline PlanFragment *GetParent() const { return parent_; }
[[nodiscard]] inline Vector<PlanFragment *> GetParents() const { return parents_; }

inline void AddChild(UniquePtr<PlanFragment> child_fragment) {
child_fragment->parent_ = this;
inline void AddChild(SharedPtr<PlanFragment> child_fragment) {
child_fragment->parents_.emplace_back(this);
children_.emplace_back(std::move(child_fragment));
}

inline Vector<UniquePtr<PlanFragment>> &Children() { return children_; }
inline Vector<SharedPtr<PlanFragment>> &Children() { return children_; }

inline bool HasChild() { return !children_.empty(); }

Expand All @@ -80,6 +80,10 @@ public:

SharedPtr<DataTable> GetResult();

static void AddNext(SharedPtr<PlanFragment> root, PlanFragment *next);

SizeT GetStartFragments(Vector<PlanFragment *> &leaf_fragments);

private:
u64 fragment_id_{};

Expand All @@ -89,9 +93,9 @@ private:

UniquePtr<PhysicalSource> source_{};

PlanFragment *parent_{};
Vector<PlanFragment *> parents_{};

Vector<UniquePtr<PlanFragment>> children_{};
Vector<SharedPtr<PlanFragment>> children_{};

UniquePtr<FragmentContext> context_{};

Expand Down
105 changes: 97 additions & 8 deletions src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

module;

#include <vector>

module fragment_builder;

import stl;
Expand All @@ -35,14 +37,23 @@ import explain_statement;

namespace infinity {

UniquePtr<PlanFragment> FragmentBuilder::BuildFragment(PhysicalOperator *phys_op) {
auto plan_fragment = MakeUnique<PlanFragment>(GetFragmentId());
plan_fragment->SetSinkNode(query_context_ptr_, SinkType::kResult, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
BuildFragments(phys_op, plan_fragment.get());
if (plan_fragment->GetSourceNode() == nullptr) {
plan_fragment->SetSourceNode(query_context_ptr_, SourceType::kEmpty, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
SharedPtr<PlanFragment> FragmentBuilder::BuildFragment(const Vector<PhysicalOperator *> &phys_ops) {
SharedPtr<PlanFragment> result = nullptr;
for (auto *phys_op : phys_ops) {
auto plan_fragment = MakeUnique<PlanFragment>(GetFragmentId());
plan_fragment->SetSinkNode(query_context_ptr_, SinkType::kResult, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
BuildFragments(phys_op, plan_fragment.get());
if (plan_fragment->GetSourceNode() == nullptr) {
plan_fragment->SetSourceNode(query_context_ptr_, SourceType::kEmpty, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
}
if (result.get() == nullptr) {
result = std::move(plan_fragment);
} else {
PlanFragment::AddNext(result, plan_fragment.get());
result = std::move(plan_fragment);
}
}
return plan_fragment;
return result;
}

void FragmentBuilder::BuildExplain(PhysicalOperator *phys_op, PlanFragment *current_fragment_ptr) {
Expand All @@ -64,7 +75,8 @@ void FragmentBuilder::BuildExplain(PhysicalOperator *phys_op, PlanFragment *curr
case ExplainType::kPipeline: {
// Build explain pipeline fragment
SharedPtr<Vector<SharedPtr<String>>> texts_ptr = MakeShared<Vector<SharedPtr<String>>>();
auto explain_child_fragment = this->BuildFragment(phys_op->left());
Vector<PhysicalOperator *> phys_ops{phys_op->left()};
auto explain_child_fragment = this->BuildFragment(phys_ops);

// Generate explain context of the child fragment
ExplainFragment::Explain(explain_child_fragment.get(), texts_ptr);
Expand Down Expand Up @@ -290,6 +302,83 @@ void FragmentBuilder::BuildFragments(PhysicalOperator *phys_op, PlanFragment *cu
current_fragment_ptr->AddChild(std::move(next_plan_fragment));
return;
}
case PhysicalOperatorType::kCompact: {
if (phys_op->left() != nullptr || phys_op->right() != nullptr) {
UnrecoverableError(fmt::format("Invalid input node of {}", phys_op->GetName()));
}
current_fragment_ptr->AddOperator(phys_op);
current_fragment_ptr->SetFragmentType(FragmentType::kParallelMaterialize);
current_fragment_ptr->SetSourceNode(query_context_ptr_, SourceType::kEmpty, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
return;
}
case PhysicalOperatorType::kCompactIndexPrepare: {
if (phys_op->right() != nullptr) {
UnrecoverableError(fmt::format("Invalid input node of {}", phys_op->GetName()));
}
current_fragment_ptr->AddOperator(phys_op);
current_fragment_ptr->SetFragmentType(FragmentType::kSerialMaterialize);
current_fragment_ptr->SetSourceNode(query_context_ptr_, SourceType::kEmpty, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
if (phys_op->left() != nullptr) {
auto next_plan_fragment = MakeUnique<PlanFragment>(GetFragmentId());
next_plan_fragment->SetSinkNode(query_context_ptr_,
SinkType::kLocalQueue,
phys_op->left()->GetOutputNames(),
phys_op->left()->GetOutputTypes());
BuildFragments(phys_op->left(), next_plan_fragment.get());
current_fragment_ptr->AddChild(std::move(next_plan_fragment));
}
if (phys_op->right() != nullptr) {
UnrecoverableError(fmt::format("Invalid input node of {}", phys_op->GetName()));
}
return;
}
case PhysicalOperatorType::kCompactIndexDo: {
if (phys_op->left() == nullptr || phys_op->right() != nullptr) {
UnrecoverableError(fmt::format("Invalid input node of {}", phys_op->GetName()));
}
current_fragment_ptr->AddOperator(phys_op);
current_fragment_ptr->SetFragmentType(FragmentType::kParallelMaterialize);
current_fragment_ptr->SetSourceNode(query_context_ptr_, SourceType::kLocalQueue, phys_op->GetOutputNames(), phys_op->GetOutputTypes());

auto next_plan_fragment = MakeUnique<PlanFragment>(GetFragmentId());
next_plan_fragment->SetSinkNode(query_context_ptr_,
SinkType::kLocalQueue,
phys_op->left()->GetOutputNames(),
phys_op->left()->GetOutputTypes());
BuildFragments(phys_op->left(), next_plan_fragment.get());

current_fragment_ptr->AddChild(std::move(next_plan_fragment));
return;
}
case PhysicalOperatorType::kCompactFinish: {
if (phys_op->left() == nullptr) {
UnrecoverableError(fmt::format("Invalid input node of {}", phys_op->GetName()));
}
current_fragment_ptr->AddOperator(phys_op);
current_fragment_ptr->SetFragmentType(FragmentType::kSerialMaterialize);
current_fragment_ptr->SetSourceNode(query_context_ptr_, SourceType::kLocalQueue, phys_op->GetOutputNames(), phys_op->GetOutputTypes());
if (phys_op->left() != nullptr) {
auto next_plan_fragment1 = MakeUnique<PlanFragment>(GetFragmentId());
next_plan_fragment1->SetSinkNode(query_context_ptr_,
SinkType::kLocalQueue,
phys_op->left()->GetOutputNames(),
phys_op->left()->GetOutputTypes());
BuildFragments(phys_op->left(), next_plan_fragment1.get());
current_fragment_ptr->AddChild(std::move(next_plan_fragment1));
if (phys_op->right() != nullptr) {
auto next_plan_fragment2 = MakeUnique<PlanFragment>(GetFragmentId());
next_plan_fragment2->SetSinkNode(query_context_ptr_,
SinkType::kLocalQueue,
phys_op->right()->GetOutputNames(),
phys_op->right()->GetOutputTypes());
BuildFragments(phys_op->right(), next_plan_fragment2.get());
current_fragment_ptr->AddChild(std::move(next_plan_fragment2));
}
} else if (phys_op->right() != nullptr) {
UnrecoverableError(fmt::format("Invalid input node of {}", phys_op->GetName()));
}
return;
}
default: {
UnrecoverableError(fmt::format("Invalid operator type: {} in Fragment Builder", phys_op->GetName()));
}
Expand Down
2 changes: 1 addition & 1 deletion src/executor/fragment_builder.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class FragmentBuilder {
public:
explicit FragmentBuilder(QueryContext *query_context_ptr) : query_context_ptr_(query_context_ptr) {}

UniquePtr<PlanFragment> BuildFragment(PhysicalOperator *phys_op);
SharedPtr<PlanFragment> BuildFragment(const Vector<PhysicalOperator *> &physical_plans);

private:
void BuildFragments(PhysicalOperator *phys_op, PlanFragment *current_fragment_ptr);
Expand Down
7 changes: 0 additions & 7 deletions src/executor/operator/physical_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import defer_op;
import config;
import status;
import infinity_exception;
import compact_segments_task;
import variables;

namespace infinity {
Expand Down Expand Up @@ -170,12 +169,6 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
case CommandType::kCheckTable: {
break;
}
case CommandType::kCompactTable: {
auto *txn = query_context->GetTxn();
auto compact_task = CompactSegmentsTask::MakeTaskWithWholeTable(table_entry_, txn); // copy the table ref shared_ptr here
compact_task->Execute();
break;
}
default: {
UnrecoverableError("Invalid command type.");
}
Expand Down
2 changes: 0 additions & 2 deletions src/executor/operator/physical_command.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ public:
inline SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const override { return output_types_; }

public:
TableEntry *table_entry_ = nullptr; // only used for compact command

const SharedPtr<CommandInfo> command_info_{};

const SharedPtr<Vector<String>> output_names_{};
Expand Down

0 comments on commit 24404a5

Please sign in to comment.