Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to exposure extraction #204

Merged
merged 1 commit into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,28 @@ dev-sandbox-models:
--metabase-database $$POSTGRES_DB )
.PHONY: dev-sandbox-models

# Regenerates the exposure fixtures for the sample dbt project in two steps:
#   1. Extract exposures from the sandbox Metabase into
#      tests/fixtures/sample_project/models/exposures, one YAML file per
#      Metabase collection (--output-grouping collection).
#   2. Re-run `dbt docs generate` over the sample project so its artifacts
#      pick up the regenerated exposure files.
# NOTE(review): assumes the sandbox stack is already running and that
# sandbox/.env provides POSTGRES_*/MB_* credentials — confirm with
# the sandbox docker-compose setup.
dev-sandbox-exposures:
rm -rf tests/fixtures/sample_project/models/exposures
mkdir -p tests/fixtures/sample_project/models/exposures
( source sandbox/.env && python3 -m dbtmetabase exposures \
--dbt-manifest-path sandbox/target/manifest.json \
--dbt-database $$POSTGRES_DB \
--metabase-url http://localhost:$$MB_PORT \
--metabase-username $$MB_USER \
--metabase-password $$MB_PASSWORD \
--output-path tests/fixtures/sample_project/models/exposures \
--output-grouping collection )

# Regenerate dbt docs for the sample project using the sandbox credentials.
( source sandbox/.env && cd tests/fixtures/sample_project && \
POSTGRES_HOST=localhost \
POSTGRES_PORT=$$POSTGRES_PORT \
POSTGRES_USER=$$POSTGRES_USER \
POSTGRES_PASSWORD=$$POSTGRES_PASSWORD \
POSTGRES_DB=$$POSTGRES_DB \
POSTGRES_SCHEMA=$$POSTGRES_SCHEMA \
dbt docs generate --profiles-dir ../../../sandbox )
.PHONY: dev-sandbox-exposures

# Tears down the sandbox Docker Compose stack.
dev-sandbox-down:
	( cd sandbox && docker-compose down )
# Fix: .PHONY previously declared "dev-sandbox-up" here (copy-paste slip),
# leaving dev-sandbox-down undeclared as phony.
.PHONY: dev-sandbox-down
28 changes: 18 additions & 10 deletions dbtmetabase/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,17 +340,14 @@ def models(
type=click.Path(exists=True, file_okay=False),
default=".",
show_default=True,
help="Output path for generated exposure YAML.",
help="Output path for generated exposure YAML files.",
)
@click.option(
"--output-name",
metavar="NAME",
envvar="OUTPUT_NAME",
"--output-grouping",
envvar="OUTPUT_GROUPING",
show_envvar=True,
type=click.STRING,
default="metabase_exposures.yml",
show_default=True,
help="File name for generated exposure YAML.",
type=click.Choice(["collection", "type"]),
help="Grouping for output YAML files",
)
@click.option(
"--metabase-include-personal-collections",
Expand All @@ -359,6 +356,15 @@ def models(
is_flag=True,
help="Include personal collections when parsing exposures.",
)
@click.option(
"--metabase-collection-includes",
metavar="COLLECTIONS",
envvar="METABASE_COLLECTION_INCLUDES",
show_envvar=True,
type=click.UNPROCESSED,
callback=_comma_separated_list_callback,
help="Metabase collection names to includes.",
)
@click.option(
"--metabase-collection-excludes",
metavar="COLLECTIONS",
Expand All @@ -370,8 +376,9 @@ def models(
)
def exposures(
output_path: str,
output_name: str,
output_grouping: Optional[str],
metabase_include_personal_collections: bool,
metabase_collection_includes: Optional[Iterable],
metabase_collection_excludes: Optional[Iterable],
dbt_reader: DbtReader,
metabase_client: MetabaseClient,
Expand All @@ -380,8 +387,9 @@ def exposures(
metabase_client.extract_exposures(
models=dbt_models,
output_path=output_path,
output_name=output_name,
output_grouping=output_grouping,
include_personal_collections=metabase_include_personal_collections,
collection_includes=metabase_collection_includes,
collection_excludes=metabase_collection_excludes,
)

Expand Down
28 changes: 28 additions & 0 deletions dbtmetabase/_format.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import re
from typing import Optional


def safe_name(text: Optional[str]) -> str:
    """Sanitizes a human-readable "friendly" name to a safe string.

    Every character that is not a word character (letter, digit or
    underscore) is replaced with an underscore, and the result is
    lowercased. For example, "Joe's Collection" becomes "joe_s_collection".

    Args:
        text (Optional[str]): Unsafe text with non-underscore symbols and spaces.

    Returns:
        str: Sanitized lowercase string with underscores.
    """
    raw = text if text is not None else ""
    underscored = re.sub(r"[^\w]", "_", raw)
    return underscored.lower()


def safe_description(text: Optional[str]) -> str:
    """Sanitizes a human-readable long text, such as description.

    Unwraps Jinja expressions ("{{ ... }}" becomes "...") so that dbt does
    not try to resolve them when reading the generated YAML.

    Args:
        text (Optional[str]): Unsafe long text with Jinja syntax.

    Returns:
        str: Sanitized string with Jinja delimiters stripped.
    """
    # Non-greedy quantifier so each "{{ ... }}" expression is unwrapped
    # individually; the previous greedy "(.*)" collapsed everything between
    # the first "{{" and the last "}}" of the text into a single match,
    # mangling descriptions that contain more than one Jinja expression.
    return re.sub(r"{{(.*?)}}", r"\1", text or "")
172 changes: 111 additions & 61 deletions dbtmetabase/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@
import re
import time
from pathlib import Path
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple, Union
from typing import (
Any,
Dict,
Iterable,
List,
Mapping,
MutableMapping,
Optional,
Tuple,
Union,
)

import requests
import yaml
from requests.adapters import HTTPAdapter, Retry

from ._format import safe_description, safe_name
from .dbt import (
METABASE_MODEL_DEFAULT_SCHEMA,
MetabaseColumn,
Expand Down Expand Up @@ -184,7 +195,7 @@ def _export_model(self, model: MetabaseModel) -> bool:
api_display_name = api_table.get("display_name")
if api_display_name != model_display_name and (
model_display_name
or not self._friendly_equal(api_display_name, api_table.get("name"))
or safe_name(api_display_name) != safe_name(api_table.get("name"))
):
body_table["display_name"] = model_display_name

Expand Down Expand Up @@ -321,7 +332,7 @@ def _export_column(
api_display_name = api_field.get("display_name")
if api_display_name != column_display_name and (
column_display_name
or not self._friendly_equal(api_display_name, api_field.get("name"))
or safe_name(api_display_name) != safe_name(api_field.get("name"))
):
body_field["display_name"] = column_display_name

Expand Down Expand Up @@ -399,24 +410,6 @@ def _load_tables(self, database_id: str) -> Mapping[str, MutableMapping]:

return tables

def _friendly_equal(self, a: Optional[str], b: Optional[str]) -> bool:
"""Equality test for parameters susceptible to Metabase "friendly names".

For example, "Some Name" is a friendly name for "some_name".

Args:
a (Optional[str]): Possibly-friendly string.
b (Optional[str]): Possibly-friendly string.

Returns:
bool: True if strings are equal normalization.
"""

def normalize(x: Optional[str]) -> str:
return (x or "").replace(" ", "_").replace("-", "_").lower()

return normalize(a) == normalize(b)


class _ExtractExposuresJob(_MetabaseClientJob):
_RESOURCE_VERSION = 2
Expand All @@ -437,15 +430,23 @@ def __init__(
client: MetabaseClient,
models: List[MetabaseModel],
output_path: str,
output_name: str,
output_grouping: Optional[str],
include_personal_collections: bool,
collection_includes: Optional[Iterable],
collection_excludes: Optional[Iterable],
):
super().__init__(client)

self.model_refs = {model.name.upper(): model.ref for model in models}
self.output_file = Path(output_path).expanduser() / f"{output_name}.yml"
self.output_path = Path(output_path).expanduser()

if output_grouping in (None, "collection", "type"):
self.output_grouping = output_grouping
else:
raise ValueError(f"Unsupported output_grouping: {output_grouping}")

self.include_personal_collections = include_personal_collections
self.collection_includes = collection_includes or []
self.collection_excludes = collection_excludes or []

self.table_names: Mapping = {}
Expand All @@ -467,11 +468,17 @@ def execute(self) -> Mapping:
parsed_exposures = []

for collection in self.client.api("get", "/api/collection"):
# Exclude collections by name or personal collections (unless included)
if collection["name"] in self.collection_excludes or (
collection.get("personal_owner_id")
and not self.include_personal_collections
):
# Inclusion/exclusion criteria check
name_included = (
collection["name"] in self.collection_includes
or not self.collection_includes
)
name_excluded = collection["name"] in self.collection_excludes
personal_included = self.include_personal_collections or not collection.get(
"personal_owner_id"
)
if not name_included or name_excluded or not personal_included:
logging.debug("Skipping collection %s", collection["name"])
continue

# Iter through collection
Expand Down Expand Up @@ -554,45 +561,56 @@ def execute(self) -> Mapping:

exposure_label = exposure_name
# Only letters, numbers and underscores allowed in model names in dbt docs DAG / no duplicate model names
exposure_name = re.sub(r"[^\w]", "_", exposure_name).lower()
exposure_name = safe_name(exposure_name)
enumer = 1
while exposure_name in documented_exposure_names:
exposure_name = f"{exposure_name}_{enumer}"
enumer += 1

# Construct exposure
parsed_exposures.append(
self._build_exposure(
exposure_type=exposure_type,
exposure_id=exposure_id,
name=exposure_name,
label=exposure_label,
header=header or "",
created_at=exposure["created_at"],
creator_name=creator_name or "",
creator_email=creator_email or "",
description=exposure.get("description", ""),
native_query=native_query,
)
{
"id": item["id"],
"type": item["model"],
"collection": collection,
"exposure": self._build_exposure(
exposure_type=exposure_type,
exposure_id=exposure_id,
name=exposure_name,
label=exposure_label,
header=header or "",
created_at=exposure["created_at"],
creator_name=creator_name or "",
creator_email=creator_email or "",
description=exposure.get("description", ""),
native_query=native_query,
),
}
)

documented_exposure_names.append(exposure_name)

# Output dbt YAML
result = {
"version": self._RESOURCE_VERSION,
"exposures": parsed_exposures,
}
with open(self.output_file, "w", encoding="utf-8") as docs:
yaml.dump(
result,
docs,
Dumper=self.DbtDumper,
default_flow_style=False,
allow_unicode=True,
sort_keys=False,
)
return result
for group, exposures in self._group_exposures(parsed_exposures).items():
path = self.output_path.joinpath(*group[:-1]) / f"{group[-1]}.yml"
path.parent.mkdir(parents=True, exist_ok=True)

exposures_unwrapped = map(lambda x: x["exposure"], exposures)
exposures_sorted = sorted(exposures_unwrapped, key=lambda x: x["name"])

with open(path, "w", encoding="utf-8") as f:
yaml.dump(
{
"version": self._RESOURCE_VERSION,
"exposures": exposures_sorted,
},
f,
Dumper=self.DbtDumper,
default_flow_style=False,
allow_unicode=True,
sort_keys=False,
)

return {"exposures": parsed_exposures} # todo: decide on output?

def _extract_card_exposures(
self,
Expand Down Expand Up @@ -751,7 +769,7 @@ def _build_exposure(
return {
"name": name,
"label": label,
"description": description,
"description": safe_description(description),
"type": "analysis" if exposure_type == "card" else "dashboard",
"url": f"{self.client.url}/{exposure_type}/{exposure_id}",
"maturity": "medium",
Expand All @@ -768,6 +786,35 @@ def _build_exposure(
),
}

def _group_exposures(
    self, exposures: Iterable[Mapping]
) -> Mapping[Tuple[str, ...], Iterable[Mapping]]:
    """Group exposures by configured output grouping.

    Each key is a tuple of path segments relative to the output path; the
    final segment becomes the YAML file name (without extension).

    Args:
        exposures (Iterable[Mapping]): Collection of exposures; each mapping
            carries "id", "type", "collection" and "exposure" keys.

    Returns:
        Mapping[Tuple[str, ...], Iterable[Mapping]]: Exposures indexed by configured grouping.
    """

    results: Dict[Tuple[str, ...], List[Mapping]] = {}

    for exposure in exposures:
        # Default grouping: everything into a single "exposures" file.
        group: Tuple[str, ...] = ("exposures",)
        if self.output_grouping == "collection":
            collection = exposure["collection"]
            # Prefer the Metabase collection slug; fall back to a
            # sanitized collection name when no slug is present.
            group = (collection.get("slug") or safe_name(collection["name"]),)
        elif self.output_grouping == "type":
            # One file per exposure: <type>/<id>
            group = (exposure["type"], exposure["id"])

        # setdefault replaces the previous get/append/conditional-insert
        # sequence with a single idiomatic accumulation step.
        results.setdefault(group, []).append(exposure)

    return results


class MetabaseClient:
"""Metabase API client."""
Expand Down Expand Up @@ -894,17 +941,19 @@ def extract_exposures(
self,
models: List[MetabaseModel],
output_path: str = ".",
output_name: str = "metabase_exposures",
output_grouping: Optional[str] = None,
include_personal_collections: bool = True,
collection_includes: Optional[Iterable] = None,
collection_excludes: Optional[Iterable] = None,
) -> Mapping:
"""Extracts exposures in Metabase downstream of dbt models and sources as parsed by dbt reader.

Args:
models (List[MetabaseModel]): List of dbt models.
output_path (str, optional): Path for output YAML. Defaults to ".".
output_name (str, optional): Name for output YAML. Defaults to "metabase_exposures".
output_path (str, optional): Path for output files. Defaults to ".".
output_grouping (Optional[str], optional): Grouping for output YAML files, supported values: "collection" (by collection slug) or "type" (by entity type). Defaults to None.
include_personal_collections (bool, optional): Include personal Metabase collections. Defaults to True.
collection_includes (Optional[Iterable], optional): Include certain Metabase collections. Defaults to None.
collection_excludes (Optional[Iterable], optional): Exclude certain Metabase collections. Defaults to None.

Returns:
Expand All @@ -914,7 +963,8 @@ def extract_exposures(
client=self,
models=models,
output_path=output_path,
output_name=output_name,
output_grouping=output_grouping,
include_personal_collections=include_personal_collections,
collection_includes=collection_includes,
collection_excludes=collection_excludes,
).execute()
Loading
Loading