Skip to content

Commit

Permalink
Merge pull request #47 from dbt-labs/component-and-group-commands
Browse files Browse the repository at this point in the history
Component and group commands
  • Loading branch information
dave-connors-3 committed Jun 8, 2023
2 parents a23a727 + ad62b14 commit 3bec517
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 147,171 deletions.
6 changes: 0 additions & 6 deletions dbt_meshify/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,3 @@ def docs_generate(self, directory: os.PathLike) -> CatalogArtifact:
"""
args = ["--quiet", "docs", "generate"]
return self.invoke(directory, args)

def run(self, directory: os.PathLike) -> CatalogArtifact:
"""
Excute dbt run. No args permitted -- this should only be used for integration tests to seed the duckdb instance with the necessary models so we can execute a docs generate
"""
return self.invoke(directory, ["run"])
190 changes: 119 additions & 71 deletions dbt_meshify/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,27 @@
help="The dbt selection syntax specifying the resources to exclude in the operation",
)

group_yml_path = click.option(
"--group-yml-path",
type=click.Path(exists=False),
help="An optional path to store the new group YAML definition.",
)

select = click.option(
"--select",
"-s",
default=None,
help="The dbt selection syntax specifying the resources to include in the operation",
)

owner = click.option(
"--owner",
nargs=2,
multiple=True,
type=click.Tuple([str, str]),
help="A tuple of Owner information for the group. For example " "`--owner name example`",
)

selector = click.option(
"--selector",
default=None,
Expand All @@ -43,12 +57,80 @@ def cli():
pass


@cli.command(name="add-contract")
@cli.group()
def operation():
"""
Set of subcommands for performing mesh operations on dbt projects
"""
pass


@cli.command(name="connect")
@click.argument("projects-dir", type=click.Path(exists=True), default=".")
def connect(projects_dir):
"""
!!! info
This command is not yet implemented
Connects multiple dbt projects together by adding all necessary dbt Mesh constructs
"""
holder = DbtProjectHolder()

while True:
path_string = input("Enter the relative path to a dbt project (enter 'done' to finish): ")
if path_string == "done":
break

path = Path(path_string).expanduser().resolve()
project = DbtProject.from_directory(path)
holder.register_project(project)

print(holder.project_map())


@cli.command(name="split")
@exclude
@project_path
@select
@selector
def split():
"""
!!! info
This command is not yet implemented
Splits dbt projects apart by adding all necessary dbt Mesh constructs based on the selection syntax.
"""
path_string = input("Enter the relative path to a dbt project you'd like to split: ")

holder = DbtProjectHolder()

path = Path(path_string).expanduser().resolve()
project = DbtProject.from_directory(path)
holder.register_project(project)

while True:
subproject_name = input("Enter the name for your subproject ('done' to finish): ")
if subproject_name == "done":
break
subproject_selector = input(
f"Enter the selector that represents the subproject {subproject_name}: "
)

subproject: DbtSubProject = project.split(
project_name=subproject_name, select=subproject_selector
)
holder.register_project(subproject)

print(holder.project_map())


@operation.command(name="add-contract")
@exclude
@project_path
@select
@selector
def add_contract(select, exclude, project_path, selector):
def add_contract(select, exclude, project_path, selector, public_only=False):
"""
Adds a contract to all selected models.
"""
Expand All @@ -60,6 +142,8 @@ def add_contract(select, exclude, project_path, selector):
)
)
models = filter(lambda x: x.startswith("model"), resources)
if public_only:
models = filter(lambda x: project.get_manifest_node(x).access == "public", models)
for model_unique_id in models:
model_node = project.get_manifest_node(model_unique_id)
model_catalog = project.get_catalog_entry(model_unique_id)
Expand All @@ -69,14 +153,17 @@ def add_contract(select, exclude, project_path, selector):
meshify_constructor.add_model_contract()


@cli.command(name="add-version")
@operation.command(name="add-version")
@exclude
@project_path
@select
@selector
@click.option("--prerelease", "--pre", default=False, is_flag=True)
@click.option("--defined-in", default=None)
def add_version(select, exclude, project_path, selector, prerelease, defined_in):
"""
Adds/increments model versions for all selected models.
"""
path = Path(project_path).expanduser().resolve()
project = DbtProject.from_directory(path)
resources = list(
Expand All @@ -94,24 +181,14 @@ def add_version(select, exclude, project_path, selector, prerelease, defined_in)
meshify_constructor.add_model_version(prerelease=prerelease, defined_in=defined_in)


@cli.command(name="create-group")
@operation.command(name="create-group")
@exclude
@project_path
@select
@selector
@click.argument("name")
@click.option(
"--owner",
nargs=2,
multiple=True,
type=click.Tuple([str, str]),
help="A tuple of Owner information for the group. For example " "`--owner name example`",
)
@click.option(
"--group-yml-path",
type=click.Path(exists=False),
help="An optional path to store the new group YAML definition.",
)
@owner
@group_yml_path
def create_group(
name,
project_path: os.PathLike,
Expand Down Expand Up @@ -139,70 +216,41 @@ def create_group(
"The provided group-yml-path is not contained within the provided dbt project."
)

owner_dict: Dict[str, Any] = {key: value for key, value in owner}
owner_object: Owner = Owner(**owner_dict)
owner: Owner = Owner(**{key: value for key, value in owner})

grouper = ResourceGrouper(project)
grouper.add_group(
name=name, owner=owner_object, select=select, exclude=exclude, path=group_yml_path
name=name,
owner=owner,
select=select,
exclude=exclude,
selector=selector,
path=group_yml_path,
)


@cli.command(name="connect")
@click.argument("projects-dir", type=click.Path(exists=True), default=".")
def connect(projects_dir):
"""
!!! info
This command is not yet implemented
Connects multiple dbt projects together by adding all necessary dbt Mesh constructs
"""
holder = DbtProjectHolder()

while True:
path_string = input("Enter the relative path to a dbt project (enter 'done' to finish): ")
if path_string == "done":
break

path = Path(path_string).expanduser().resolve()
project = DbtProject.from_directory(path)
holder.register_project(project)

print(holder.project_map())


@cli.command(name="split")
@cli.command(name="group")
@exclude
@project_path
@select
@selector
def split():
@click.argument("name")
@owner
@group_yml_path
@click.pass_context
def group(
ctx,
name,
project_path: os.PathLike,
owner: List[Tuple[str, str]],
group_yml_path: os.PathLike,
select: str,
exclude: Optional[str] = None,
selector: Optional[str] = None,
):
"""
!!! info
This command is not yet implemented
Splits dbt projects apart by adding all necessary dbt Mesh constructs based on the selection syntax.
Creates a new dbt group based on the selection syntax
Detects the edges of the group, makes their access public, and adds contracts to them
"""
path_string = input("Enter the relative path to a dbt project you'd like to split: ")

holder = DbtProjectHolder()

path = Path(path_string).expanduser().resolve()
project = DbtProject.from_directory(path)
holder.register_project(project)

while True:
subproject_name = input("Enter the name for your subproject ('done' to finish): ")
if subproject_name == "done":
break
subproject_selector = input(
f"Enter the selector that represents the subproject {subproject_name}: "
)

subproject: DbtSubProject = project.split(
project_name=subproject_name, select=subproject_selector
)
holder.register_project(subproject)

print(holder.project_map())
ctx.forward(create_group)
ctx.invoke(add_contract, select=f'group:{name}', project_path=project_path, public_only=True)
17 changes: 9 additions & 8 deletions dbt_meshify/storage/yaml_editors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@ def filter_empty_dict_items(dict_to_filter: Dict[str, Any]):
return {k: v for k, v in dict_to_filter.items() if v is not None}


def process_model_yml(model_yml: Dict):
def process_model_yml(model_yml: Dict[str, Any]):
"""Processes the yml contents to be written back to a file"""
model_ordered_dict = OrderedDict.fromkeys(
[
"name",
"description",
"latest_version",
"access",
"group",
"config",
"meta",
"tests",
"columns",
"versions",
]
Expand Down Expand Up @@ -77,11 +79,7 @@ def add_group_and_access_to_model_yml(
models = resources_yml_to_dict(models_yml)
model_yml = models.get(model_name) or {"name": model_name, "columns": [], "config": {}}

model_yml.update({"access": access_type.value})
config = model_yml.get("config", {})
config.update({"group": group.name})
model_yml["config"] = config

model_yml.update({"access": access_type.value, "group": group.name})
models[model_name] = process_model_yml(model_yml)

models_yml["models"] = list(models.values())
Expand All @@ -102,10 +100,11 @@ def add_model_contract_to_yml(
catalog_cols = model_catalog.columns or {} if model_catalog else {}

# add the data type to the yml entry for columns that are in yml
# import pdb; pdb.set_trace()
yml_cols = [
{**yml_col, "data_type": catalog_cols[yml_col["name"]].type.lower()}
for yml_col in yml_cols
if yml_col["name"] in catalog_cols
if yml_col.get("name") in catalog_cols.keys()
]

# append missing columns in the table to the yml entry
Expand All @@ -118,7 +117,9 @@ def add_model_contract_to_yml(
model_yml.update({"columns": yml_cols})
# add contract to the model yml entry
# this part should come from the same service as what we use for the standalone command when we get there
model_yml.update({"config": {"contract": {"enforced": True}}})
model_config = model_yml.get("config", {})
model_config.update({"contract": {"enforced": True}})
model_yml["config"] = model_config
# update the model entry in the full yml file
# if no entries exist, add the model entry
# otherwise, update the existing model entry in place
Expand Down
14 changes: 12 additions & 2 deletions dbt_meshify/utilities/grouper.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def _generate_resource_group(
path: os.PathLike,
select: str,
exclude: Optional[str] = None,
selector: Optional[str] = None,
) -> Tuple[Group, Dict[str, AccessType]]:
"""Generate the ResourceGroup that we want to apply to the project."""

Expand All @@ -75,9 +76,12 @@ def _generate_resource_group(
resource_type=NodeType.Group,
)

nodes = self.project.select_resources(select, exclude, output_key="unique_id")
nodes = self.project.select_resources(
select=select, exclude=exclude, selector=selector, output_key="unique_id"
)

# Check if any of the selected nodes are already in a group of a different name. If so, raise an exception.
nodes = set(filter(lambda x: not x.startswith("source"), nodes))
for node in nodes:
existing_group = self.project.manifest.nodes[node].config.group

Expand All @@ -100,10 +104,13 @@ def add_group(
path: os.PathLike,
select: str,
exclude: Optional[str] = None,
selector: Optional[str] = None,
) -> None:
"""Create a ResourceGroup for a dbt project."""

group, resources = self._generate_resource_group(name, owner, path, select, exclude)
group, resources = self._generate_resource_group(
name, owner, path, select, exclude, selector
)

group_path = Path(group.original_file_path)
try:
Expand All @@ -115,6 +122,9 @@ def add_group(
self.file_manager.write_file(group_path, output_yml)

for resource, access_type in resources.items():
# TODO: revisit this logic other resource types
if not resource.startswith("model"):
continue
model: ModelNode = self.project.models[resource]
if model.patch_path:
path = Path(model.patch_path.split("://")[1])
Expand Down
Loading

0 comments on commit 3bec517

Please sign in to comment.