Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update refs in exposure and semantic model definitions when downstream of a split resource #194

Merged
merged 14 commits into from
Mar 13, 2024
Merged
6 changes: 5 additions & 1 deletion dbt_meshify/utilities/grouper.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,11 @@ def _generate_resource_group(

logger.info(f"Selected {len(nodes)} resources: {nodes}")
# Check if any of the selected nodes are already in a group of a different name. If so, raise an exception.
nodes = set(filter(lambda x: not x.startswith("source"), nodes))
nodes = set(
filter(
lambda x: not x.startswith("source") and not x.startswith("semantic_model"), nodes
)
)
for node in nodes:
existing_group = self.project.manifest.nodes[node].config.group

Expand Down
111 changes: 78 additions & 33 deletions dbt_meshify/utilities/references.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
import re
from pathlib import Path
from typing import List, Optional, Union
from typing import Any, Dict, List, Optional, Union

from dbt.contracts.graph.nodes import CompiledNode
from dbt.contracts.graph.nodes import CompiledNode, Exposure, Resource, SemanticModel
from loguru import logger

from dbt_meshify.change import ChangeSet, EntityType, FileChange, Operation
from dbt_meshify.change import (
ChangeSet,
EntityType,
FileChange,
Operation,
ResourceChange,
)
from dbt_meshify.dbt_projects import DbtProject, DbtSubProject, PathedProject


Expand Down Expand Up @@ -135,32 +141,77 @@ def replace_source_with_ref__python(

return new_code

def update_yml_resource_references(
self,
project_name: str,
upstream_resource_name: str,
resource: Union[Exposure, SemanticModel],
) -> Dict[str, Any]:
new_ref = f"ref('{project_name}', '{upstream_resource_name}')"
if isinstance(resource, SemanticModel):
# we can return early, since semantic models only have one ref and no depends_on
return {"model": new_ref}
refs = resource.refs
ref_to_update = f"ref('{upstream_resource_name}')"
str_refs = []
for ref in refs:
package_clause = f"'{ref.package}', " if ref.package else ""
name_clause = f"'{ref.name}'"
version_clause = f", v={ref.version}" if ref.version else ""
str_ref = f"ref({package_clause}{name_clause}{version_clause})"
if str_ref != ref_to_update:
str_refs.append(str_ref)
str_refs.append(new_ref)
Comment on lines +157 to +164
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I correct in believing that this code fully specifies each resource ref, and then adds the updated upstream source ref? If so, this seems sufficient. Perhaps it would be valuable to create a function that generates ref strings for resources? Either way, ✅

return {"depends_on": str_refs}

def generate_reference_update(
self,
project_name: str,
upstream_node: CompiledNode,
downstream_node: CompiledNode,
downstream_node: Union[Resource, CompiledNode],
downstream_project: PathedProject,
code: str,
) -> FileChange:
) -> Union[FileChange, ResourceChange]:
"""Generate FileChanges that update the references in the downstream_node's code."""

updated_code = self.ref_update_methods[downstream_node.language](
model_name=upstream_node.name,
project_name=project_name,
model_code=code,
)
change: Union[FileChange, ResourceChange]
if isinstance(downstream_node, CompiledNode):
updated_code = self.ref_update_methods[downstream_node.language](
model_name=upstream_node.name,
project_name=project_name,
model_code=code,
)

return FileChange(
operation=Operation.Update,
entity_type=EntityType.Code,
identifier=downstream_node.name,
path=downstream_project.resolve_file_path(downstream_node),
data=updated_code,
)
change = FileChange(
operation=Operation.Update,
entity_type=EntityType.Code,
identifier=downstream_node.name,
path=downstream_project.resolve_file_path(downstream_node),
data=updated_code,
)

else:
if isinstance(downstream_node, Exposure) or isinstance(downstream_node, SemanticModel):
dave-connors-3 marked this conversation as resolved.
Show resolved Hide resolved
is_exposure = isinstance(downstream_node, Exposure)
data = self.update_yml_resource_references(
project_name=project_name,
upstream_resource_name=upstream_node.name,
resource=downstream_node,
)
change = ResourceChange(
operation=Operation.Update,
entity_type=EntityType.Exposure if is_exposure else EntityType.SemanticModel,
identifier=downstream_node.name,
path=downstream_project.resolve_file_path(downstream_node),
data=data,
)

return change
dave-connors-3 marked this conversation as resolved.
Show resolved Hide resolved

def update_child_refs(
self, resource: CompiledNode, current_change_set: Optional[ChangeSet] = None
self,
resource: CompiledNode,
current_change_set: Optional[ChangeSet] = None,
Comment on lines +210 to +212
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hooray for improved formatting!

) -> ChangeSet:
"""Generate a set of FileChanges to update child references"""

Expand All @@ -175,33 +226,29 @@ def update_child_refs(
else:
compare_project = self.project.name

for model in self.project.child_map[resource.unique_id]:
if model in self.project.resources or model.split(".")[1] != compare_project:
continue
model_node = self.project.get_manifest_node(model)
if not model_node:
raise KeyError(f"Resource {model} not found in manifest")

# Don't process Resources missing a language attribute
if not hasattr(model_node, "language") or not isinstance(model_node, CompiledNode):
for child in self.project.child_map[resource.unique_id]:
if child in self.project.resources or child.split(".")[1] != compare_project:
continue
node = self.project.get_manifest_node(child)
if not node:
raise KeyError(f"Resource {child} not found in manifest")

if current_change_set:
previous_change = get_latest_file_change(
changeset=current_change_set,
identifier=model_node.name,
path=self.project.parent_project.resolve_file_path(model_node),
identifier=node.name,
path=self.project.parent_project.resolve_file_path(node),
)
code = (
previous_change.data
if (previous_change and previous_change.data)
else model_node.raw_code
else getattr(node, "raw_code", "")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is such a better way of getting the code. Good thinking!

)

change = self.generate_reference_update(
project_name=self.project.name,
upstream_node=resource,
downstream_node=model_node,
downstream_node=node,
code=code,
downstream_project=self.project.parent_project,
)
Expand Down Expand Up @@ -258,8 +305,6 @@ def update_parent_refs(self, resource: CompiledNode) -> ChangeSet:
if change.data is None:
raise Exception(f"Change has null data despite being provided code. {change}")

code = change.data

change_set.add(change)

return change_set
Expand Down
Loading
Loading