Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Column ordering sync #229

Merged
merged 2 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ sandbox-models:
--metabase-database $$POSTGRES_DB \
--include-schemas "public",other \
--http-header x-dummy-key dummy-value \
--order-fields \
--verbose )
.PHONY: sandbox-models

Expand Down
9 changes: 9 additions & 0 deletions dbtmetabase/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,13 @@ def wrapper(
type=click.STRING,
help="URL for dbt docs hosting, to append model links to table descriptions.",
)
@click.option(
"--order-fields",
envvar="ORDER_FIELDS",
show_envvar=True,
is_flag=True,
help="Preserve column order in dbt project.",
)
def models(
metabase_database: str,
include_databases: Optional[Sequence[str]],
Expand All @@ -295,6 +302,7 @@ def models(
sync_timeout: int,
append_tags: bool,
docs_url: Optional[str],
order_fields: bool,
core: DbtMetabase,
):
core.export_models(
Expand All @@ -306,6 +314,7 @@ def models(
sync_timeout=sync_timeout,
append_tags=append_tags,
docs_url=docs_url,
order_fields=order_fields,
)


Expand Down
101 changes: 93 additions & 8 deletions dbtmetabase/_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def export_models(
sync_timeout: int = DEFAULT_MODELS_SYNC_TIMEOUT,
append_tags: bool = False,
docs_url: Optional[str] = None,
order_fields: bool = False,
):
"""Exports dbt models to Metabase database schema.

Expand All @@ -53,6 +54,7 @@ def export_models(
sync_timeout (int, optional): Number of seconds to wait until Metabase schema matches the dbt project. To skip synchronization, set timeout to 0. Defaults to 30.
append_tags (bool, optional): Append dbt tags to table descriptions. Defaults to False.
docs_url (Optional[str], optional): URL for dbt docs hosting, to append model links to table descriptions. Defaults to None.
order_fields (bool, optional): Preserve column order in dbt project.
"""

ctx = self.__Context()
Expand Down Expand Up @@ -113,14 +115,25 @@ def export_models(
raise MetabaseStateError("Unable to sync models with Metabase")

for model in models:
success &= self.__export_model(ctx, model, append_tags, docs_url)
success &= self.__export_model(
ctx=ctx,
model=model,
append_tags=append_tags,
docs_url=docs_url,
order_fields=order_fields,
)

for update in ctx.updates.values():
if update["kind"] == "table":
self.metabase.update_table(
uid=update["id"],
body=update["body"],
)
elif update["kind"] == "table_field_order":
self.metabase.update_table_field_order(
uid=update["id"],
body=list(update["body"]["values"]),
)
elif update["kind"] == "field":
self.metabase.update_field(
uid=update["id"],
Expand All @@ -129,7 +142,7 @@ def export_models(

_logger.info(
"%s '%s' updated successfully: %s",
update["kind"].capitalize(),
" ".join(update["kind"].split("_")).capitalize(),
update["label"],
", ".join(update.get("body", {})),
)
Expand All @@ -143,6 +156,7 @@ def __export_model(
model: Model,
append_tags: bool,
docs_url: Optional[str],
order_fields: bool,
) -> bool:
"""Exports one dbt model to Metabase database schema."""

Expand Down Expand Up @@ -192,10 +206,79 @@ def __export_model(
_logger.info("Table '%s' is up to date", table_key)

for column in model.columns:
success &= self.__export_column(ctx, schema_name, model_name, column)
success &= self.__export_column(
ctx,
schema_name=schema_name,
model_name=model_name,
column=column,
)

if order_fields:
success &= self.__export_model_column_order(
ctx=ctx,
model=model,
api_table=api_table,
table_key=table_key,
)

return success

def __export_model_column_order(
self,
ctx: __Context,
model: Model,
api_table: Mapping,
table_key: str,
) -> bool:
"""Exports model column order to Metabase field order."""

api_ord = {}
for field in api_table.get("fields", {}).values():
api_ord[field["id"]] = field["name"]

dbt_ord = {}
for column in model.columns:
field_id = ctx.get_field(table_key, column.name.upper()).get("id")
if field_id:
dbt_ord[field_id] = column.name

if (
list(api_ord.keys()) != list(dbt_ord.keys())
or api_table.get("field_order") != "custom"
):
api_keys = set(api_ord.keys())
dbt_keys = set(dbt_ord.keys())

if api_keys == dbt_keys:
ctx.update(
entity={
"kind": "table_field_order",
"id": api_table["id"],
},
change={"values": dbt_ord.keys()},
label=table_key,
)
_logger.info("Table field order '%s' will be updated", table_key)
else:
details = []

api_only = [api_ord[x] for x in api_keys - dbt_keys]
if api_only:
details.append(f"missing in dbt {api_only}")

dbt_only = [dbt_ord[x] for x in dbt_keys - api_keys]
if dbt_only:
details.append(f"missing in Metabase {dbt_only}")

_logger.error(
"Table field order '%s' mismatch: %s",
table_key,
", ".join(details),
)
return False

return True

def __export_column(
self,
ctx: __Context,
Expand All @@ -211,7 +294,7 @@ def __export_column(
column_name = column.name.upper()
column_label = f"{schema_name}.{model_name}.{column_name}"

api_field = ctx.tables.get(table_key, {}).get("fields", {}).get(column_name)
api_field = ctx.get_field(table_key, column_name)
if not api_field:
_logger.error("Field '%s.%s' does not exist", table_key, column_name)
return False
Expand All @@ -234,10 +317,9 @@ def __export_column(
fk_target_field_label = f"{fk_target_table_name}.{fk_target_field_name}"

if fk_target_table_name and fk_target_field_name:
fk_target_field = (
ctx.tables.get(fk_target_table_name, {})
.get("fields", {})
.get(fk_target_field_name)
fk_target_field = ctx.get_field(
table_key=fk_target_table_name,
field_key=fk_target_field_name,
)
if fk_target_field:
fk_target_field_id = fk_target_field.get("id")
Expand Down Expand Up @@ -380,6 +462,9 @@ class __Context:
tables: Mapping[str, MutableMapping] = dc.field(default_factory=dict)
updates: MutableMapping[str, MutableMapping] = dc.field(default_factory=dict)

def get_field(self, table_key: str, field_key: str) -> MutableMapping:
return self.tables.get(table_key, {}).get("fields", {}).get(field_key, {})

def update(self, entity: MutableMapping, change: Mapping, label: str):
entity.update(change)

Expand Down
4 changes: 4 additions & 0 deletions dbtmetabase/metabase.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ def update_table(self, uid: str, body: Mapping) -> Mapping:
"""Posts update to an existing table."""
return dict(self._api("put", f"/api/table/{uid}", json=body))

def update_table_field_order(self, uid: str, body: Sequence) -> Sequence:
"""Posts update to field order of an existing table."""
return list(self._api("put", f"/api/table/{uid}/fields/order", json=body))

def update_field(self, uid: str, body: Mapping) -> Mapping:
"""Posts an update to an existing table field."""
return dict(self._api("put", f"/api/field/{uid}", json=body))
6 changes: 6 additions & 0 deletions sandbox/models/staging/schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ models:
tests:
- unique
- not_null
- name: first_name
- name: last_name

- name: stg_orders
columns:
Expand All @@ -18,6 +20,8 @@ models:
tests:
- accepted_values:
values: ['placed', 'shipped', 'completed', 'return_pending', 'returned']
- name: order_date
- name: customer_id

- name: stg_payments
columns:
Expand All @@ -29,3 +33,5 @@ models:
tests:
- accepted_values:
values: ['credit_card', 'coupon', 'bank_transfer', 'gift_card']
- name: order_id
- name: amount