Skip to content

Commit

Permalink
ARROW-15138: [C++] Make ExecPlan::ToString give some additional infor…
Browse files Browse the repository at this point in the history
…mation

Add indentation to ToString() function in ExecPlan.

For example if we have the following graph of execution plan:
![image](https://user-images.githubusercontent.com/40250321/147159958-79ad4ad4-db3d-435e-bf01-bb5ea1162730.png)

The execution plan string will be:
```
6 node
    5 node
         4 node
             2 node
         3 node
             2 node
             1 node
```

Closes #12031 from ArianaVillegas/ARROW-15138

Authored-by: ArianaVillegas <ariana.villegas@utec.edu.pe>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
  • Loading branch information
ArianaVillegas authored and westonpace committed Jan 6, 2022
1 parent 1e7bfa2 commit e64480d
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 64 deletions.
14 changes: 8 additions & 6 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/exec/exec_plan.h"

#include <mutex>
#include <sstream>
#include <thread>
#include <unordered_map>

#include "arrow/compute/exec.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/util.h"
#include "arrow/compute/exec_internal.h"
Expand Down Expand Up @@ -64,16 +63,18 @@ void AggregatesToString(
std::stringstream* ss, const Schema& input_schema,
const std::vector<internal::Aggregate>& aggs,
const std::vector<int>& target_field_ids,
const std::vector<std::unique_ptr<FunctionOptions>>& owned_options) {
const std::vector<std::unique_ptr<FunctionOptions>>& owned_options, int indent = 0) {
*ss << "aggregates=[" << std::endl;
for (size_t i = 0; i < aggs.size(); i++) {
for (int j = 0; j < indent; ++j) *ss << " ";
*ss << '\t' << aggs[i].function << '('
<< input_schema.field(target_field_ids[i])->name();
if (owned_options[i]) {
*ss << ", " << owned_options[i]->ToString();
}
*ss << ")," << std::endl;
}
for (int j = 0; j < indent; ++j) *ss << " ";
*ss << ']';
}

Expand Down Expand Up @@ -226,7 +227,7 @@ class ScalarAggregateNode : public ExecNode {
Future<> finished() override { return finished_; }

protected:
std::string ToStringExtra() const override {
std::string ToStringExtra(int indent) const override {
std::stringstream ss;
const auto input_schema = inputs_[0]->output_schema();
AggregatesToString(&ss, *input_schema, aggs_, target_field_ids_, owned_options_);
Expand Down Expand Up @@ -539,7 +540,7 @@ class GroupByNode : public ExecNode {
Future<> finished() override { return finished_; }

protected:
std::string ToStringExtra() const override {
std::string ToStringExtra(int indent) const override {
std::stringstream ss;
const auto input_schema = inputs_[0]->output_schema();
ss << "keys=[";
Expand All @@ -548,7 +549,8 @@ class GroupByNode : public ExecNode {
ss << '"' << input_schema->field(key_field_ids_[i])->name() << '"';
}
ss << "], ";
AggregatesToString(&ss, *input_schema, aggs_, agg_src_field_ids_, owned_options_);
AggregatesToString(&ss, *input_schema, aggs_, agg_src_field_ids_, owned_options_,
indent);
return ss.str();
}

Expand Down
86 changes: 57 additions & 29 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,62 @@ struct ExecPlanImpl : public ExecPlan {
return std::move(Impl{nodes_}.sorted);
}

// This function returns a node vector and a vector of integers with the
// number of spaces to add as an indentation. The main difference between
// this function and the TopoSort function is that here we visit the nodes
// in reverse order and we can have repeated nodes if necessary.
// For example, in the following plan:
// s1 --> s3 -
// - -
// - -> s5 --> s6
// - -
// s2 --> s4 -
// Toposort node vector: s1 s2 s3 s4 s5 s6
// OrderedNodes node vector: s6 s5 s3 s1 s4 s2 s1
std::pair<NodeVector, std::vector<int>> OrderedNodes() const {
struct Impl {
const std::vector<std::unique_ptr<ExecNode>>& nodes;
std::unordered_set<ExecNode*> visited;
std::unordered_set<ExecNode*> marked;
NodeVector sorted;
std::vector<int> indents;

explicit Impl(const std::vector<std::unique_ptr<ExecNode>>& nodes) : nodes(nodes) {
visited.reserve(nodes.size());

for (auto it = nodes.rbegin(); it != nodes.rend(); ++it) {
if (visited.count(it->get()) != 0) continue;
Visit(it->get());
}

DCHECK_EQ(visited.size(), nodes.size());
}

void Visit(ExecNode* node, int indent = 0) {
marked.insert(node);
for (auto input : node->inputs()) {
if (marked.count(input) != 0) continue;
Visit(input, indent + 1);
}
marked.erase(node);

indents.push_back(indent);
sorted.push_back(node);
visited.insert(node);
}
};

auto result = Impl{nodes_};
return std::make_pair(result.sorted, result.indents);
}

std::string ToString() const {
std::stringstream ss;
ss << "ExecPlan with " << nodes_.size() << " nodes:" << std::endl;
for (const auto& node : TopoSort()) {
ss << node->ToString() << std::endl;
auto sorted = OrderedNodes();
for (size_t i = sorted.first.size(); i > 0; --i) {
for (int j = 0; j < sorted.second[i - 1]; ++j) ss << " ";
ss << sorted.first[i - 1]->ToString(sorted.second[i - 1]) << std::endl;
}
return ss.str();
}
Expand Down Expand Up @@ -249,7 +300,7 @@ Status ExecNode::Validate() const {
return Status::OK();
}

std::string ExecNode::ToString() const {
std::string ExecNode::ToString(int indent) const {
std::stringstream ss;

auto PrintLabelAndKind = [&](const ExecNode* node) {
Expand All @@ -259,39 +310,16 @@ std::string ExecNode::ToString() const {
PrintLabelAndKind(this);
ss << "{";

if (!inputs_.empty()) {
ss << "inputs=[";
for (size_t i = 0; i < inputs_.size(); i++) {
if (i > 0) ss << ", ";
ss << input_labels_[i] << "=";
PrintLabelAndKind(inputs_[i]);
}
ss << ']';
}

if (!outputs_.empty()) {
if (!inputs_.empty()) {
ss << ", ";
}

ss << "outputs=[";
for (size_t i = 0; i < outputs_.size(); i++) {
if (i > 0) ss << ", ";
PrintLabelAndKind(outputs_[i]);
}
ss << ']';
}

const std::string extra = ToStringExtra();
const std::string extra = ToStringExtra(indent);
if (!extra.empty()) {
ss << ", " << extra;
ss << extra;
}

ss << '}';
return ss.str();
}

std::string ExecNode::ToStringExtra() const { return ""; }
std::string ExecNode::ToStringExtra(int indent = 0) const { return ""; }

bool ExecNode::ErrorIfNotOk(Status status) {
if (status.ok()) return false;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ class ARROW_EXPORT ExecNode {
/// \brief A future which will be marked finished when this node has stopped producing.
virtual Future<> finished() = 0;

std::string ToString() const;
std::string ToString(int indent = 0) const;

protected:
ExecNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels,
Expand All @@ -234,7 +234,7 @@ class ARROW_EXPORT ExecNode {
bool ErrorIfNotOk(Status status);

/// Provide extra info to include in the string representation.
virtual std::string ToStringExtra() const;
virtual std::string ToStringExtra(int indent) const;

ExecPlan* plan_;
std::string label_;
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/compute/exec/filter_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/exec/exec_plan.h"

#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/compute/exec/options.h"
#include "arrow/datum.h"
Expand Down Expand Up @@ -99,7 +98,9 @@ class FilterNode : public MapNode {
}

protected:
std::string ToStringExtra() const override { return "filter=" + filter_.ToString(); }
std::string ToStringExtra(int indent) const override {
return "filter=" + filter_.ToString();
}

private:
Expression filter_;
Expand Down
38 changes: 20 additions & 18 deletions cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -301,11 +301,11 @@ TEST(ExecPlan, ToString) {
{"sink", SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));
EXPECT_EQ(plan->sources()[0]->ToString(), R"(:SourceNode{outputs=[:SinkNode]})");
EXPECT_EQ(plan->sinks()[0]->ToString(), R"(:SinkNode{inputs=[collected=:SourceNode]})");
EXPECT_EQ(plan->sources()[0]->ToString(), R"(:SourceNode{})");
EXPECT_EQ(plan->sinks()[0]->ToString(), R"(:SinkNode{})");
EXPECT_EQ(plan->ToString(), R"(ExecPlan with 2 nodes:
:SourceNode{outputs=[:SinkNode]}
:SinkNode{inputs=[collected=:SourceNode]}
:SinkNode{}
:SourceNode{}
)");

ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());
Expand Down Expand Up @@ -338,17 +338,19 @@ TEST(ExecPlan, ToString) {
})
.AddToPlan(plan.get()));
EXPECT_EQ(plan->ToString(), R"a(ExecPlan with 6 nodes:
custom_source_label:SourceNode{outputs=[:FilterNode]}
:FilterNode{inputs=[target=custom_source_label:SourceNode], outputs=[:ProjectNode], filter=(i32 >= 0)}
:ProjectNode{inputs=[target=:FilterNode], outputs=[:GroupByNode], projection=[bool, multiply(i32, 2)]}
:GroupByNode{inputs=[groupby=:ProjectNode], outputs=[:FilterNode], keys=["bool"], aggregates=[
hash_sum(multiply(i32, 2)),
hash_count(multiply(i32, 2), {mode=NON_NULL}),
]}
:FilterNode{inputs=[target=:GroupByNode], outputs=[custom_sink_label:OrderBySinkNode], filter=(sum(multiply(i32, 2)) > 10)}
custom_sink_label:OrderBySinkNode{inputs=[collected=:FilterNode], by={sort_keys=[FieldRef.Name(sum(multiply(i32, 2))) ASC], null_placement=AtEnd}}
custom_sink_label:OrderBySinkNode{by={sort_keys=[FieldRef.Name(sum(multiply(i32, 2))) ASC], null_placement=AtEnd}}
:FilterNode{filter=(sum(multiply(i32, 2)) > 10)}
:GroupByNode{keys=["bool"], aggregates=[
hash_sum(multiply(i32, 2)),
hash_count(multiply(i32, 2), {mode=NON_NULL}),
]}
:ProjectNode{projection=[bool, multiply(i32, 2)]}
:FilterNode{filter=(i32 >= 0)}
custom_source_label:SourceNode{}
)a");

std::cout << plan->ToString() << '\n';

ASSERT_OK_AND_ASSIGN(plan, ExecPlan::Make());

Declaration union_node{"union", ExecNodeOptions{}};
Expand All @@ -374,13 +376,13 @@ custom_sink_label:OrderBySinkNode{inputs=[collected=:FilterNode], by={sort_keys=
})
.AddToPlan(plan.get()));
EXPECT_EQ(plan->ToString(), R"a(ExecPlan with 5 nodes:
lhs:SourceNode{outputs=[:UnionNode]}
rhs:SourceNode{outputs=[:UnionNode]}
:UnionNode{inputs=[input_0_label=lhs:SourceNode, input_1_label=rhs:SourceNode], outputs=[:ScalarAggregateNode]}
:ScalarAggregateNode{inputs=[target=:UnionNode], outputs=[:SinkNode], aggregates=[
:SinkNode{}
:ScalarAggregateNode{aggregates=[
count(i32, {mode=NON_NULL}),
]}
:SinkNode{inputs=[collected=:ScalarAggregateNode]}
:UnionNode{}
rhs:SourceNode{}
lhs:SourceNode{}
)a");
}

Expand Down
5 changes: 2 additions & 3 deletions cpp/src/arrow/compute/exec/project_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/exec/exec_plan.h"

#include <sstream>

#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/util.h"
Expand Down Expand Up @@ -95,7 +94,7 @@ class ProjectNode : public MapNode {
}

protected:
std::string ToStringExtra() const override {
std::string ToStringExtra(int indent) const override {
std::stringstream ss;
ss << "projection=[";
for (int i = 0; static_cast<size_t>(i) < exprs_.size(); i++) {
Expand Down
5 changes: 2 additions & 3 deletions cpp/src/arrow/compute/exec/sink_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/exec/exec_plan.h"

#include <mutex>

#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/order_by_impl.h"
Expand Down Expand Up @@ -317,7 +316,7 @@ struct OrderBySinkNode final : public SinkNode {
}

protected:
std::string ToStringExtra() const override {
std::string ToStringExtra(int indent) const override {
return std::string("by=") + impl_->ToString();
}

Expand Down

0 comments on commit e64480d

Please sign in to comment.