
Remove Dataflow Plan Node Types #1205

Merged: 8 commits merged into main on May 16, 2024

Conversation

@plypaul (Contributor) commented May 11, 2024

Description

The original class hierarchy for the DataflowPlanNodes included types that described the data output by each node. However, those types turned out not to be useful in practice (e.g. BaseOutput covered the majority of use cases), so this PR removes them.
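
Roughly, the change collapses the hierarchy as in the sketch below. This is a simplified illustration rather than the actual diff: DataflowPlanNode, BaseOutput, and WhereConstraintNode are names that appear in this PR, but the class bodies are elided and the exact set of marker types differs in the real code.

# Before: intermediate marker types described the kind of data a node outputs.
class DataflowPlanNode: ...                 # abstract base for all dataflow plan nodes
class BaseOutput(DataflowPlanNode): ...     # marker for "ordinary" output (the common case)

class WhereConstraintNode(BaseOutput): ...  # concrete nodes subclassed one of the markers

# After: the marker layer is removed; concrete nodes derive from the base class directly.
class WhereConstraintNode(DataflowPlanNode): ...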

@tlento (Contributor) left a comment

I love this PR. It is third in the set of cleanup changes I was planning to make after predicate pushdown, but that ordering had more to do with personal preferences than anything else.

I have not really read it because it's, well, hard to read, and I'm trying to get another feature out the door.

The main problem I ran into here is that this PR represents, conceptually, three or four changes in one:

  1. removing useless type marker nodes
  2. removing SinkNodes
  3. simplifying the optimizer return types (which is probably fine to consolidate into the SinkNode removal, and is a separate commit here)
  4. doing the natural consolidation of BaseOutput into DataflowPlanNode.

If this were split along these - or other equally readable - lines, it'd be a "scroll through and make sure nothing looks weird" review. As it is now, it's quite a bit more daunting.

Similarly, if this causes some weirdness, then viewing it via git diff or whatever is going to be a headache since we squash and merge, and it'd be more accessible in every piece of tooling if it were broken into more readily addressable chunks.

Anyway, you can do what you will with this. If you're going to break this up let me know and I won't review it until you're done, otherwise I'll take a pass through when I have more time.

Comment on lines +121 to +184
def visit_source_node(self, node: ReadSqlSourceNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_join_on_entities_node(self, node: JoinOnEntitiesNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_aggregate_measures_node(self, node: AggregateMeasuresNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_compute_metrics_node(self, node: ComputeMetricsNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_order_by_limit_node(self, node: OrderByLimitNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_where_constraint_node(self, node: WhereConstraintNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_filter_elements_node(self, node: FilterElementsNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_combine_aggregated_outputs_node(self, node: CombineAggregatedOutputsNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_constrain_time_range_node(self, node: ConstrainTimeRangeNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_join_over_time_range_node(self, node: JoinOverTimeRangeNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_semi_additive_join_node(self, node: SemiAdditiveJoinNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_metric_time_dimension_transform_node(
self, node: MetricTimeDimensionTransformNode
) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_join_to_time_spine_node(self, node: JoinToTimeSpineNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_min_max_node(self, node: MinMaxNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_add_generated_uuid_column_node(self, node: AddGeneratedUuidColumnNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError

@override
def visit_join_conversion_events_node(self, node: JoinConversionEventsNode) -> ConvertToExecutionPlanResult:
raise NotImplementedError
@tlento (Contributor) commented:

Speaking of things looking weird, big blocks of green inside mechanical change diffs always make an impression. Now, if anybody attempts to convert a dataflow plan to an execution plan using the wrong node type, the runtime will blow up with a NotImplementedError. This seems undesirable. While I was never a fan of the existence of the SinkNodeVisitor interface, its one redeeming feature was preventing this from happening. Indeed, I originally suggested it as an "if you must use a visitor to do this graph-level property access then please at least make it a different type".

Do you have a follow-up planned where you get rid of this one way or another? My planned stack was going to involve replacing the visitor itself with a property on the DataflowPlan, and pausing on removing the execution plan stuff until later just because I don't want to deal with the MFS changes right now, but removing execution plans altogether (so much for our earlier execution plan aspirations...) would be welcome.

@tlento (Contributor) commented:

Consolidating to the one execution plan type we actually use looks like a win to me. Implementing a DagWalker that does not - and in fact absolutely SHOULD NOT - walk the DAG is confusing. Maybe just update that to be a regular class?
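
As a rough, hedged sketch of what that could look like (combining this suggestion with the property idea from the earlier comment; the class name is assumed from context, and checked_sink_node is the accessor discussed later in this PR):

# Sketch: the dataflow-to-execution-plan converter as a regular class. It no longer
# implements the dataflow plan node-visitor interface, so there are no visit_* methods
# that exist only to raise NotImplementedError, and nothing pretends to walk the DAG.
class DataflowToExecutionPlanConverter:  # class name assumed
    def convert(self, dataflow_plan: DataflowPlan) -> ConvertToExecutionPlanResult:
        # Only the sink node matters for building an execution plan, so read it via
        # the plan's accessor instead of visiting every node type.
        sink_node = dataflow_plan.checked_sink_node
        ...  # build and return the execution plan from sink_node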

@plypaul (Contributor Author) commented May 12, 2024

> Anyway, you can do what you will with this. If you're going to break this up let me know and I won't review it until you're done, otherwise I'll take a pass through when I have more time.

Unfortunately, breaking out that change might be a bit tough, so let's go with the assumption that the commits will be as is. If that changes, I'll let you know.

@tlento (Contributor) left a comment

Thanks for splitting out the rename; that made a surprisingly big difference!

At some point we should really reconsider the multiple sink nodes with the runtime errors. I don't think we're likely to need multiple sinks anymore, since all we do is render queries to a single output stream (i.e., the outer SELECT statement), and any forking of that output data stream is probably best handled outside of MetricFlow. Not sure if you're doing that upstack or not, but it's a thing to consider.

-        raise RuntimeError("Can't create a dataflow plan without sink node(s).")
-        self._sink_output_nodes = tuple(sink_output_nodes)
+    def __init__(self, sink_nodes: Sequence[DataflowPlanNode], plan_id: Optional[DagId] = None) -> None:  # noqa: D107
+        assert len(sink_nodes) == 1, f"Exactly 1 sink node is supported. Got: {sink_nodes}"
@tlento (Contributor) commented:

Out of curiosity, are we going to formalize this via the type system?

@plypaul (Contributor Author) replied:

I haven't had a chance to think about how to simplify this, but yeah, that sounds like a good idea.
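
One possible way to formalize it, as a hedged sketch rather than anything from this PR: have the constructor accept exactly one node, so the "exactly 1 sink" rule lives in the signature instead of an assertion (DataflowPlanNode and DagId are the existing MetricFlow types referenced above).

from typing import Optional

class DataflowPlan:
    def __init__(self, sink_node: DataflowPlanNode, plan_id: Optional[DagId] = None) -> None:
        # Taking a single node makes "exactly one sink" part of the type signature;
        # the runtime length assertion on a sequence is no longer needed.
        self._sink_node = sink_node

    @property
    def sink_node(self) -> DataflowPlanNode:
        return self._sink_node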

-    def sink_output_node(self) -> DataflowPlanNode:  # noqa: D102
-        assert len(self._sink_output_nodes) == 1, f"Only 1 sink node supported. Got: {self._sink_output_nodes}"
-        return self._sink_output_nodes[0]
+    def checked_sink_node(self) -> DataflowPlanNode:
@tlento (Contributor) commented:

Can this just be sink_node? We already have the assertion in the initializer.

@plypaul (Contributor Author) replied:

Yeah, updated.

@plypaul plypaul enabled auto-merge (squash) May 16, 2024 02:20
@plypaul plypaul merged commit 8ba3897 into main May 16, 2024
15 checks passed