
Support distributed ANALYZE #2374

Closed
waynexia opened this issue Sep 13, 2023 · 22 comments

@waynexia
Member

What problem does the new feature solve?

EXPLAIN ANALYZE only contains the execution result in the frontend and the gRPC time to each datanode. It works, but could be more detailed.

What does the feature do?

Also show the detailed ANALYZE result from each datanode, i.e., implement a distributed ANALYZE plan.

Implementation challenges

No response

@waynexia waynexia added the C-feature Category Features label Sep 13, 2023
@NiwakaDev
Collaborator

@waynexia

What kind of output do you expect?

For example, something like:

+----------------------------------------------------+
| component | plan_type         | plan               |
+----------------------------------------------------+
| frontend  | Plan with Metrics | RepartitionExec    |
+----------------------------------------------------+
| node1     | ~                 | ~                  |
+----------------------------------------------------+
| node2     | ~                 | ~                  |
+----------------------------------------------------+
| node3     | ~                 | ~                  |
+----------------------------------------------------+

@waynexia
Member Author

Considering the plan might be further distributed into more parts, I'd prefer to use a tuple (stage, node) to distinguish each part. At present we only have two stages: the first is executed on the datanodes and the second in the frontend. For your example, it would become something like the following.

+------------------------------------------------------------------------+
| stage   | node                | plan_type         | plan               |
+------------------------------------------------------------------------+
| stage 2 | node 1 (addr: ...)  | Plan with Metrics | RepartitionExec    |
+------------------------------------------------------------------------+
| stage 1 | node 1 (addr: ...)  | ~                 | ~                  |
+------------------------------------------------------------------------+
| stage 1 | node 2 (addr: ...)  | ~                 | ~                  |
+------------------------------------------------------------------------+
| stage 1 | node 3 (addr: ...)  | ~                 | ~                  |
+------------------------------------------------------------------------+

Some key points:

  • Node numbers are scoped to their stage and start from 1 in each stage.
  • Stages are counted bottom-up; the first one evaluated is stage 1.
  • Maybe we don't need to assign a node number to each node; the addr might be enough.

@NiwakaDev
Collaborator

NiwakaDev commented Oct 27, 2023

@waynexia

I'd prefer to use a tuple (stage, node) to distinguish each part.

I see. By the way, if the verbose option is set, I guess the output would be as follows:

+--------------------------------------------------------------------------------------------
| stage   | node                | plan_type         | plan               | output rows | ...
+--------------------------------------------------------------------------------------------
| stage 2 | node 1 (addr: ...)  | Plan with Metrics | RepartitionExec    | 2           | ...
+--------------------------------------------------------------------------------------------
| stage 1 | node 1 (addr: ...)  | ~                 | ~                  | 0           | ...
+--------------------------------------------------------------------------------------------
| stage 1 | node 2 (addr: ...)  | ~                 | ~                  | 1           | ...
+--------------------------------------------------------------------------------------------
| stage 1 | node 3 (addr: ...)  | ~                 | ~                  | 1           | ...
+--------------------------------------------------------------------------------------------

arrow-datafusion generates values like output rows as part of the plan_type column, but in our use case I guess it is better for the output to be formatted like the example immediately above, in order to avoid duplicating the (stage, node) pair.

@NiwakaDev
Collaborator

@waynexia

If you agree with the above format, I would like to work on this issue.

@waynexia
Member Author

waynexia commented Nov 3, 2023

Ahh, sorry for the delay.

The other parts look good to me, but things like output_rows are execution statistics, which might be missing. A simple workaround is to put all those statistics into a single string, if necessary.
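
A minimal sketch of that workaround, assuming we format DataFusion's MetricsSet directly (the helper name metrics_to_string is made up for illustration):

use datafusion::physical_plan::metrics::MetricsSet;

/// Collapse a plan node's metrics into a single string such as
/// "output_rows=2, elapsed_compute=8ns, ...". Statistics that were never
/// recorded simply don't appear in the string.
fn metrics_to_string(metrics: &MetricsSet) -> String {
    metrics
        .aggregate_by_name()   // merge metrics with the same name across partitions
        .sorted_for_display()  // stable, human-friendly ordering
        .timestamps_removed()  // drop raw start/end timestamps
        .to_string()           // MetricsSet implements Display
}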

@waynexia
Member Author

Hi @NiwakaDev, just a friendly ping. Do you have an initial plan or a rough structure for this? I was wondering if you would like to discuss any undetermined points or questions.

@NiwakaDev
Collaborator

NiwakaDev commented Dec 18, 2023

@waynexia

Do you have an initial plan or a rough structure for this? I was wondering if you would like to discuss any undetermined points or questions.

Here's an initial plan:

  1. Send DistributedAnalyzePlan (a custom logical plan) to each datanode.
  2. Execute DistributedAnalyzeExec on each datanode side like https://github.com/apache/arrow-datafusion/blob/main/datafusion/physical-plan/src/analyze.rs#L143-L247.
  3. Integrate each result on the frontend side like MergeScan.

DistributedAnalyzeExec outputs:
The difference from the normal AnalyzeExec is that there are two types of output:

  1. the data batches from input_stream.next()
  2. the datanode's analyze result, like AnalyzeExec (https://github.com/apache/arrow-datafusion/blob/main/datafusion/physical-plan/src/analyze.rs#L201-L247)

While AnalyzeExec ignores the batches from input_stream.next(), in DistributedAnalyzeExec I guess the frontend needs both outputs to construct the result above (#2374 (comment)).

I haven't yet come up with a solution for how to send both the input_stream.next() batches and the AnalyzeExec-like output from each datanode to the frontend.
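
To make the two kinds of output concrete, here is a rough sketch (the enum name is hypothetical, not an actual type):

use datafusion::arrow::record_batch::RecordBatch;

/// What a datanode-side DistributedAnalyzeExec would have to forward to the
/// frontend, unlike the upstream AnalyzeExec, which drops the data batches
/// and only reports metrics.
enum DistributedAnalyzeOutput {
    /// A data batch obtained by polling the wrapped input stream.
    Data(RecordBatch),
    /// The datanode's rendered plan-with-metrics report, like AnalyzeExec's output.
    Metrics(String),
}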

@waynexia
Member Author

waynexia commented Dec 18, 2023

Thanks for your thoughtful investigation 👍

I have one concern about passing intermediate metrics (those rendered in AnalyzeExec): they are transformed into string literals and become hard to reuse in later phases like aggregate or filter.

Thus I came up with another way: encode and transfer the metrics (specifically, MetricsSet in datafusion) along with the data for each query, and report them to the user on demand, just like how an AnalyzeExec works in a single instance: drop the data and report the per-plan metrics.

For transferring metrics together with data, I've submitted a PR to add the corresponding fields to the proto file: GreptimeTeam/greptime-proto#130 (if we decide to go this way, we can define some general metric types in the proto message instead of a string-string map).

Then we don't need a DistributedAnalyzePlan for the datanodes, only an uppermost, customized AnalyzeExec to extract and render the "distributed" metrics.

@waynexia
Member Author

One thing I haven't figured out is how we handle different metrics from different nodes. In datafusion, those metrics are attached to the plan itself. E.g., a join plan has two children, and each child can keep its own metrics. But here we don't have actual child nodes besides MergeScan. So how can we keep the tree structure for the metrics from them?

If we don't need to distinguish metrics from each node, we can aggregate them into one sub-tree in MergeScan, since they are going to have the same physical plan. But if what we want is the per-phase and per-node analysis above, this doesn't help.

@NiwakaDev
Collaborator

NiwakaDev commented Dec 19, 2023

@waynexia

One thing I haven't figured out is how we handle different metrics from different nodes. In datafusion, those metrics are attached to the plan itself. E.g., a join plan has two children, and each child can keep its own metrics. But here we don't have actual child nodes besides MergeScan. So how can we keep the tree structure for the metrics from them?

Is the issue you described related to the code below? Sorry, I might be wrong because I'm not familiar with datafusion.
https://github.com/apache/arrow-datafusion/blob/main/datafusion/physical-plan/src/display.rs#L243-L253
https://github.com/apache/arrow-datafusion/blob/main/datafusion/physical-plan/src/visitor.rs#L24-L34

Thus I came up with another way: encode and transfer the metrics (specifically, MetricsSet in datafusion) along with the data for each query, and report them to the user on demand, just like how an AnalyzeExec works in a single instance: drop the data and report the per-plan metrics.

If we don't need per-node analysis, I agree with this.

@NiwakaDev
Collaborator

@waynexia

By the way, if we implement your idea, what kind of output do you expect?

@waynexia
Member Author

One thing I haven't figured out is how we handle different metrics from different nodes. In datafusion, those metrics are attached to the plan itself. E.g., a join plan has two children, and each child can keep its own metrics. But here we don't have actual child nodes besides MergeScan. So how can we keep the tree structure for the metrics from them?

Is the issue you described related to the code below? Sorry, I might be wrong because I'm not familiar with datafusion.

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
    vec![]
}

MergeScanExec is a leaf node in the frontend's execution plan tree. It submits part of the query to the datanodes and merges the results from them (hence "merge scan"). Since that part of the plan is not executed in the frontend, MergeScanExec has neither children nor their metrics. This prevents the visitor method you mentioned above from walking into MergeScanExec.

We can retrieve metrics from the datanodes, but for this reason I'm afraid we have to keep and access those "remote metrics" in a different way.
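
To illustrate that "different way", here is one hypothetical shape for those remote metrics, kept beside MergeScanExec rather than on child plan nodes (the names and fields are assumptions, not the actual implementation):

use std::collections::HashMap;

/// Metrics reported back by the datanodes. Since the remote sub-plans are not
/// children of the frontend plan tree, the usual metrics visitor cannot reach
/// them, so they are stored and queried separately.
#[derive(Debug, Default)]
struct RemoteMetrics {
    /// Datanode address -> rendered plan-with-metrics text from that node.
    per_node: HashMap<String, String>,
}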

By the way, if we implement your idea, what kind of output do you expect?

I would like to have two forms. One is distinguished by the tuple (phase, node), where we can retrieve un-aggregated, per-phase and per-node metrics for detailed analysis. The other may look closer to the ordinary ANALYZE, which hides the details of distributed execution and gives a rough, aggregated result.

@waynexia
Member Author

Update: at this stage, it is clear that we have to find a way to pass data and execution metrics together in the same query call. @shuiyisong and I are trying to add a method to SendableRecordBatchStream to provide the corresponding execution metrics. But it's still undetermined how to define, organize and expose those metrics.
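
A rough sketch of that idea as an extension trait (the trait and method names are hypothetical; the real API was still undetermined at this point):

use datafusion::physical_plan::metrics::MetricsSet;

/// Let a record batch stream expose the execution metrics gathered while it
/// produced its data, so the frontend can collect them after the scan finishes.
trait StreamMetrics {
    /// Metrics recorded so far, if the producer kept any.
    fn metrics(&self) -> Option<MetricsSet>;
}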

I haven't yet come up with a solution for how to send both the input_stream.next() batches and the AnalyzeExec-like output from each datanode to the frontend.

We can assume this issue is resolved (if everything works as expected...) and bring this ticket forward 🙌 @NiwakaDev

@NiwakaDev
Collaborator

NiwakaDev commented Dec 24, 2023

@waynexia

If we don't need to distinguish metrics from each node, we can aggregate them into one sub-tree in MergeScan, since they are going to have the same physical plan.

Do we choose to "aggregate them into one child plan in MergeScan", rather than one plan per node?

Something like:

| Plan with Metrics | MergeScanExec: peers=[4398046511104(1024, 0), ], metrics=[output_rows=2, ready_time=180.582083ms, first_consume_time=180.884833ms, finish_time=180.984ms]
|   		    |    ProjectionExec: expr=[name@0 as name], metrics=[output_rows=0, elapsed_compute=8ns]                                                           
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=0, elapsed_compute=2.499µs]                                                
|                   |       FilterExec: value@1 = 10, metrics=[output_rows=0, elapsed_compute=653.341µs]                                                               
|                   |         RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, metrics=[fetch_time=154.749µs, repart_time=1ns, send_time=6.292µs] 
~                 

or

| Plan with Metrics | MergeScanExec: ~, metrics=[output_rows=2, ~, datanode1_metrics=~, datanode2_metrics=~]
~                 

@waynexia
Member Author

waynexia commented Dec 26, 2023

What do you think about distinguishing them with the VERBOSE keyword? E.g.:

EXPLAIN ANALYZE <QUERY> gives the aggregated result:

| Plan with Metrics | MergeScanExec: peers=[4398046511104(1024, 0), ], metrics=[output_rows=2, ready_time=180.582083ms, first_consume_time=180.884833ms, finish_time=180.984ms]
|   		    |    ProjectionExec: expr=[name@0 as name], metrics=[output_rows=0, elapsed_compute=8ns]                                                           
|                   |     CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=0, elapsed_compute=2.499µs]                                                
|                   |       FilterExec: value@1 = 10, metrics=[output_rows=0, elapsed_compute=653.341µs]                                                               
|                   |         RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1, metrics=[fetch_time=154.749µs, repart_time=1ns, send_time=6.292µs] 
~                 

and EXPLAIN ANALYZE VERBOSE <QUERY> gives the un-aggregated result:

+------------------------------------------------------------------------+
| stage   | node                | plan_type         | plan               |
+------------------------------------------------------------------------+
| stage 2 | node 1 (addr: ...)  | Plan with Metrics | RepartitionExec    |
+------------------------------------------------------------------------+
| stage 1 | node 1 (addr: ...)  | ~                 | ~                  |
+------------------------------------------------------------------------+
| stage 1 | node 2 (addr: ...)  | ~                 | ~                  |
+------------------------------------------------------------------------+
| stage 1 | node 3 (addr: ...)  | ~                 | ~                  |
+------------------------------------------------------------------------+

@NiwakaDev
Collaborator

NiwakaDev commented Dec 28, 2023

@waynexia

What do you think about distinguishing them by the VERBOSE word?

Looks good to me, but as you said, I guess we need to think of another approach to implement that.

If we don't need to distinguish metrics from each node, we can aggregate them into one sub-tree in MergeScan, since they are going to have the same physical plan. But if what we want is the per-phase and per-node analysis above, this doesn't help.

Maybe we need to divide it into two logical plans for the two purposes: one is the non-verbose plan, the other is the verbose plan.
As you said, I think we can implement the non-verbose plan first.

@waynexia
Member Author

Yes, I'm afraid the built-in ANALYZE is not aware of this logic. We have to make a new one for the distributed case.

Maybe we need to divide it into two logical plans for the two purposes: one is the non-verbose plan, the other is the verbose plan.
As you said, I think we can implement the non-verbose plan first.

Looks good to me! 👍 I guess we've found answers to all the problems we had before?

@NiwakaDev
Collaborator

Sorry for the late reply.

I guess we've found answers to all the problems we had before?

Yes! I'll review the above PR tonight and tomorrow. Sorry for the late reply again.

@waynexia
Member Author

Don't worry! I hope you had a nice New Year holiday 🎉

@NiwakaDev
Collaborator

NiwakaDev commented Jan 14, 2024

@waynexia
Since we need to have the plan tree of each datanode under MergeScanExec, the JSON format (#3113) of each datanode query needs to be something like:

{
    "name": "ProjectionExec",
    "metrics": {
        "total_num": 0,
        ~
    },
    "children": [
        {
            "name": "CoalesceBatchesExec",
            "metrics": {~},
            "children": [
                ~
            ]
        }
    ]
}

After the JSON format discussion lands, I'll write a rough implementation of the non-verbose plan based on #3113.
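
For illustration only, a possible Rust shape for one node of that JSON tree (field types are assumptions; metric values may well need something richer than integers):

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

/// One node of a datanode's plan tree, carrying its own metrics and its
/// children, ready to be hung under MergeScanExec on the frontend.
#[derive(Debug, Serialize, Deserialize)]
struct PlanNode {
    name: String,
    /// Metric name -> value, e.g. "total_num" -> 0.
    #[serde(default)]
    metrics: HashMap<String, u64>,
    /// Child plan nodes; empty for leaves.
    #[serde(default)]
    children: Vec<PlanNode>,
}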

@waynexia
Member Author

#3113 is merged, and there are some small changes to the plan metrics after that. Now we pass the corresponding physical (execution) plan together with the result RecordBatchStream. This might make it easier to implement this task.

@NiwakaDev
Collaborator

@waynexia
I apologize for the delayed response. I guess we can close this issue. I'll find another DataFusion issue.
