From 2c5276fd36268d2d200c61e86ce9eee434fa3349 Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Fri, 8 May 2026 13:44:21 -0700 Subject: [PATCH 1/7] Support full mat --- .../internal/cube_materializations.py | 181 +++++++++++++- .../tests/api/materializations_test.py | 233 +++++++----------- 2 files changed, 255 insertions(+), 159 deletions(-) diff --git a/datajunction-server/datajunction_server/internal/cube_materializations.py b/datajunction-server/datajunction_server/internal/cube_materializations.py index 5bb4894bf..7cc1208ff 100644 --- a/datajunction-server/datajunction_server/internal/cube_materializations.py +++ b/datajunction-server/datajunction_server/internal/cube_materializations.py @@ -1,11 +1,17 @@ """Helper functions related to cube materializations.""" import itertools +from types import SimpleNamespace from sqlalchemy.ext.asyncio import AsyncSession from datajunction_server.sql.parsing.backends.antlr4 import parse -from datajunction_server.internal.sql import get_measures_query +from datajunction_server.construction.build_v3.builder import build_measures_sql +from datajunction_server.construction.build_v3.types import ( + BuildContext, + DecomposedMetricInfo, + GrainGroupSQL, +) from datajunction_server.database.node import Column, NodeRevision from datajunction_server.errors import DJInvalidInputException from datajunction_server.models.column import SemanticType @@ -17,9 +23,13 @@ MeasuresMaterialization, UpsertCubeMaterialization, ) +from datajunction_server.models.dialect import Dialect +from datajunction_server.models.materialization import MaterializationStrategy from datajunction_server.models.node_type import NodeNameVersion from datajunction_server.models.partition import Granularity +from datajunction_server.models.query import ColumnMetadata from datajunction_server.sql.parsing import ast +from datajunction_server.utils import SEPARATOR def generate_partition_filter_sql( @@ -152,6 +162,144 @@ def _extract_expression(metric_query: str) -> str: ) +def _v3_col_to_model_column(col) -> ColumnMetadata: + """ + Convert a v3 ``ColumnMetadata`` (dataclass keyed on ``semantic_name``) into + the persisted ``models.query.ColumnMetadata`` (BaseModel keyed on + ``semantic_entity``) shape that ``MeasuresMaterialization.from_measures_query`` + consumes. v3's ``"metric"`` / ``"metric_component"`` / ``"metric_input"`` + semantic types collapse to v2's ``"measure"`` so the downstream + ``SemanticType.DIMENSION`` / ``"measure"`` branching keeps working. + """ + semantic_entity = col.semantic_name + column_name = None + node_name = None + if semantic_entity and SEPARATOR in semantic_entity: + column_name = semantic_entity.rsplit(SEPARATOR, 1)[-1] + node_name = semantic_entity.rsplit(SEPARATOR, 1)[0] + v3_type = col.semantic_type + semantic_type = ( + "measure" + if v3_type in ("metric", "metric_component", "metric_input") + else v3_type + ) + return ColumnMetadata( + name=col.name, + type=col.type, + column=column_name, + node=node_name, + semantic_entity=semantic_entity, + semantic_type=semantic_type, + ) + + +async def _v3_grain_group_to_measures_query( + session: AsyncSession, + gg: GrainGroupSQL, + ctx: BuildContext, + decomposed_metrics: dict[str, DecomposedMetricInfo], +): + """ + Adapt a v3 ``GrainGroupSQL`` into the v2 measures-query shape so the + rest of ``build_cube_materialization`` (and + ``MeasuresMaterialization.from_measures_query``) can stay unchanged. + Async because we may need to refresh expired ORM attributes on the parent + fact / source nodes that v3 left lazily-loaded. + """ + from datajunction_server.models.node_type import NodeType # noqa: PLC0415 + from datajunction_server.utils import refresh_if_needed # noqa: PLC0415 + + parent_node = ctx.nodes.get(gg.parent_name) + if not parent_node or not parent_node.current: # pragma: no cover + raise DJInvalidInputException( + f"Parent node `{gg.parent_name}` not found in build context", + ) + rev = parent_node.current + # v3's ``ctx.nodes`` may carry NodeRevisions whose scalar attrs got + # expired by an earlier ``session.commit()`` in the calling endpoint. + # Pull what we need before reading. + await refresh_if_needed(session, rev, ["name", "version", "display_name"]) + + # v3 doesn't expose the per-grain-group source set on the public + # ``GrainGroupSQL`` (it's tracked internally for scan-estimation only). + # Walk ``ctx.parent_map`` from this grain group's parent fact and + # collect any ``SOURCE`` ancestors so the cube workflow gets the same + # ``catalog.schema.table`` upstream list v2 produced. + upstream_tables: list[str] = [] + seen_sources: set[str] = set() + visited: set[str] = set() + + async def _walk(node_name: str) -> None: + if node_name in visited: + return + visited.add(node_name) + node = ctx.nodes.get(node_name) + if not node or not node.current: + return + if node.type == NodeType.SOURCE: + srev = node.current + await refresh_if_needed( + session, + srev, + ["catalog", "schema_", "table"], + ) + if ( + node.name not in seen_sources + and srev.catalog + and srev.schema_ + and srev.table + ): + seen_sources.add(node.name) + upstream_tables.append( + f"{srev.catalog.name}.{srev.schema_}.{srev.table}", + ) + return + for parent_name in ctx.parent_map.get(node_name, []): + await _walk(parent_name) + + await _walk(gg.parent_name) + # Dimension columns are joined in via dimension links rather than parent + # edges, so their source tables don't appear under ``gg.parent_name``'s + # ``parent_map``. Walk each dimension column's owning node too so the + # upstream-table list reflects everything the materialization SQL reads. + for col in gg.columns: + if col.semantic_type != "dimension": + continue + sem = col.semantic_name + if not sem or SEPARATOR not in sem: + continue + # Strip optional ``[role]`` suffix to get the bare ``node.column``. + bare_sem = sem.split("[", 1)[0] + dim_node_name = bare_sem.rsplit(SEPARATOR, 1)[0] + await _walk(dim_node_name) + + metrics: dict[str, tuple[list, str]] = {} + for metric_name in gg.metrics: + info = decomposed_metrics.get(metric_name) + if info is None: # pragma: no cover + continue + # v2's "combiner" is the full derived SELECT statement (e.g. ``SELECT + # SUM(...) FROM ...``); ``_extract_expression`` and the V1 cube config's + # ``derived_expression`` field both rely on that shape. v3 exposes the + # full query AST separately on ``derived_ast``; stringify it so the + # downstream parser-based extraction stays happy. + metrics[metric_name] = (info.components, str(info.derived_ast)) + + return SimpleNamespace( + node=NodeNameVersion( + name=rev.name, + version=rev.version, + display_name=rev.display_name, + ), + grain=list(gg.grain), + columns=[_v3_col_to_model_column(c) for c in gg.columns], + metrics=metrics, + sql=gg.sql, + spark_conf=None, + upstream_tables=upstream_tables, + ) + + async def build_cube_materialization( session: AsyncSession, current_revision: NodeRevision, @@ -162,21 +310,34 @@ async def build_cube_materialization( """ temporal_partitions = current_revision.temporal_partition_columns() temporal_partition = temporal_partitions[0] - measures_queries = await get_measures_query( - session=session, - metrics=current_revision.cube_node_metrics, - dimensions=current_revision.cube_node_dimensions, - filters=(current_revision.cube_filters or []) - + [ + incremental = upsert_input.strategy == MaterializationStrategy.INCREMENTAL_TIME + extra_filters = ( + [ generate_partition_filter_sql( temporal_partition, upsert_input.lookback_window, # type: ignore ), - ], - include_all_columns=False, + ] + if incremental + else [] + ) + result = await build_measures_sql( + session=session, + metrics=current_revision.cube_node_metrics, + dimensions=current_revision.cube_node_dimensions, + filters=(current_revision.cube_filters or []) + extra_filters, + dialect=Dialect.SPARK, use_materialized=True, - preagg_requested=True, ) + measures_queries = [ + await _v3_grain_group_to_measures_query( + session, + gg, + result.ctx, + result.decomposed_metrics, + ) + for gg in result.grain_groups + ] query_grains = { k: [q.node.name for q in queries] for k, queries in itertools.groupby( diff --git a/datajunction-server/tests/api/materializations_test.py b/datajunction-server/tests/api/materializations_test.py index a18c90182..f8ffc3046 100644 --- a/datajunction-server/tests/api/materializations_test.py +++ b/datajunction-server/tests/api/materializations_test.py @@ -17,7 +17,6 @@ NodeNameVersion, ) from datajunction_server.models.partition import Granularity, PartitionBackfill -from datajunction_server.models.query import ColumnMetadata from datajunction_server.service_clients import QueryServiceClient from datajunction_server.sql.parsing.backends.antlr4 import parse @@ -818,19 +817,19 @@ async def test_druid_cube_incremental( actual_node = mat.measures_materializations[0].node assert actual_node.name == "default.repair_orders_fact" assert actual_node.display_name == "Repair Orders Fact" - assert mat.measures_materializations[0].grain == [ - "default_DOT_repair_orders_fact_DOT_order_date", - "default_DOT_hard_hat_DOT_state", - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_municipality_dim_DOT_local_region", + # v3 emits clean SQL aliases (``order_date``) instead of v2's munged + # ``default_DOT_repair_orders_fact_DOT_order_date`` form, and sorts grain + # columns deterministically. + expected_dim_grain = ["company_name", "local_region", "order_date", "state"] + expected_dimensions_in_order = [ + "order_date", + "state", + "company_name", + "local_region", ] - assert mat.measures_materializations[0].dimensions == [ - "default_DOT_repair_orders_fact_DOT_order_date", - "default_DOT_hard_hat_DOT_state", - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_municipality_dim_DOT_local_region", - ] - assert mat.measures_materializations[0].measures == [ + assert mat.measures_materializations[0].grain == expected_dim_grain + assert mat.measures_materializations[0].dimensions == expected_dimensions_in_order + expected_components = [ MetricComponent( name="repair_order_id_count_bd241964", expression="repair_order_id", @@ -846,64 +845,34 @@ async def test_druid_cube_incremental( rule=AggregationRule(type=Aggregability.FULL, level=None), ), ] - assert mat.measures_materializations[0].columns == [ - ColumnMetadata( - name="default_DOT_repair_orders_fact_DOT_order_date", - type="timestamp", - column="order_date", - node="default.repair_orders_fact", - semantic_entity="default.repair_orders_fact.order_date", - semantic_type="dimension", - ), - ColumnMetadata( - name="default_DOT_hard_hat_DOT_state", - type="string", - column="state", - node="default.hard_hat", - semantic_entity="default.hard_hat.state", - semantic_type="dimension", - ), - ColumnMetadata( - name="default_DOT_dispatcher_DOT_company_name", - type="string", - column="company_name", - node="default.dispatcher", - semantic_entity="default.dispatcher.company_name", - semantic_type="dimension", - ), - ColumnMetadata( - name="default_DOT_municipality_dim_DOT_local_region", - type="string", - column="local_region", - node="default.municipality_dim", - semantic_entity="default.municipality_dim.local_region", - semantic_type="dimension", - ), - ColumnMetadata( - name="repair_order_id_count_bd241964", - type="bigint", - column="repair_order_id_count_bd241964", - node="default.repair_orders_fact", - semantic_entity="default.repair_orders_fact.repair_order_id_count_bd241964", - semantic_type="measure", - ), - ColumnMetadata( - name="total_repair_cost_sum_67874507", - type="double", - column="total_repair_cost_sum_67874507", - node="default.repair_orders_fact", - semantic_entity="default.repair_orders_fact.total_repair_cost_sum_67874507", - semantic_type="measure", - ), - ] + assert sorted( + mat.measures_materializations[0].measures, + key=lambda m: m.name, + ) == sorted(expected_components, key=lambda m: m.name) + # v3 emits ``metric_node:component_name`` as the semantic identifier for + # measure columns; the v2 path stitched ``parent_fact.component_name`` + # instead. Both are usable downstream — assert structurally. + columns_by_name = {c.name: c for c in mat.measures_materializations[0].columns} + assert set(columns_by_name) == { + "order_date", + "state", + "company_name", + "local_region", + "repair_order_id_count_bd241964", + "total_repair_cost_sum_67874507", + } assert ( - mat.measures_materializations[0].timestamp_column - == "default_DOT_repair_orders_fact_DOT_order_date" + columns_by_name["order_date"].semantic_entity + == "default.repair_orders_fact.order_date" ) + assert columns_by_name["order_date"].semantic_type == "dimension" + assert columns_by_name["repair_order_id_count_bd241964"].semantic_type == "measure" + assert mat.measures_materializations[0].timestamp_column == "order_date" assert mat.measures_materializations[0].timestamp_format == "yyyyMMdd" assert mat.measures_materializations[0].granularity == Granularity.DAY assert mat.measures_materializations[0].spark_conf is None - assert mat.measures_materializations[0].upstream_tables == [ + # Order varies (set traversal); compare as a set. + assert set(mat.measures_materializations[0].upstream_tables) == { "default.roads.repair_orders", "default.roads.repair_order_details", "default.roads.hard_hats", @@ -911,7 +880,7 @@ async def test_druid_cube_incremental( "default.roads.municipality", "default.roads.municipality_municipality_type", "default.roads.municipality_type", - ] + } assert mat.measures_materializations[0].output_table_name.startswith( "default_repair_orders_fact", ) @@ -921,93 +890,59 @@ async def test_druid_cube_incremental( assert combiner_node.version == "v1.0" assert combiner_node.display_name == "Repairs Cube Default Incremental" assert mat.combiners[0].query is None - assert mat.combiners[0].columns == [ - ColumnMetadata( - name="default_DOT_repair_orders_fact_DOT_order_date", - type="timestamp", - column="order_date", - node="default.repair_orders_fact", - semantic_entity="default.repair_orders_fact.order_date", - semantic_type="dimension", - ), - ColumnMetadata( - name="default_DOT_hard_hat_DOT_state", - type="string", - column="state", - node="default.hard_hat", - semantic_entity="default.hard_hat.state", - semantic_type="dimension", - ), - ColumnMetadata( - name="default_DOT_dispatcher_DOT_company_name", - type="string", - column="company_name", - node="default.dispatcher", - semantic_entity="default.dispatcher.company_name", - semantic_type="dimension", - ), - ColumnMetadata( - name="default_DOT_municipality_dim_DOT_local_region", - type="string", - column="local_region", - node="default.municipality_dim", - semantic_entity="default.municipality_dim.local_region", - semantic_type="dimension", - ), - ColumnMetadata( - name="repair_order_id_count_bd241964", - type="bigint", - column="repair_order_id_count_bd241964", - node="default.repair_orders_fact", - semantic_entity="default.repair_orders_fact.repair_order_id_count_bd241964", - semantic_type="measure", - ), - ColumnMetadata( - name="total_repair_cost_sum_67874507", - type="double", - column="total_repair_cost_sum_67874507", - node="default.repair_orders_fact", - semantic_entity="default.repair_orders_fact.total_repair_cost_sum_67874507", - semantic_type="measure", - ), - ] - assert mat.combiners[0].grain == [ - "default_DOT_repair_orders_fact_DOT_order_date", - "default_DOT_hard_hat_DOT_state", - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_municipality_dim_DOT_local_region", - ] - assert mat.combiners[0].dimensions == [ - "default_DOT_repair_orders_fact_DOT_order_date", - "default_DOT_hard_hat_DOT_state", - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_municipality_dim_DOT_local_region", - ] - assert mat.combiners[0].measures == [ - MetricComponent( - name="repair_order_id_count_bd241964", - expression="repair_order_id", - aggregation="COUNT", - merge="SUM", - rule=AggregationRule(type=Aggregability.FULL, level=None), - ), - MetricComponent( - name="total_repair_cost_sum_67874507", - expression="total_repair_cost", - aggregation="SUM", - merge="SUM", - rule=AggregationRule(type=Aggregability.FULL, level=None), - ), - ] - assert ( - mat.combiners[0].timestamp_column - == "default_DOT_repair_orders_fact_DOT_order_date" - ) + combiner_cols_by_name = {c.name: c for c in mat.combiners[0].columns} + assert set(combiner_cols_by_name) == set(columns_by_name) + assert mat.combiners[0].grain == expected_dim_grain + assert mat.combiners[0].dimensions == expected_dimensions_in_order + assert sorted(mat.combiners[0].measures, key=lambda m: m.name) == sorted( + expected_components, + key=lambda m: m.name, + ) + assert mat.combiners[0].timestamp_column == "order_date" assert mat.combiners[0].timestamp_format == "yyyyMMdd" assert mat.combiners[0].granularity == Granularity.DAY assert mat.combiners[0].upstream_tables[0].startswith("default_repair_orders_fact") +@pytest.mark.asyncio +async def test_druid_cube_full( + client_with_repairs_cube: AsyncClient, + module__query_service_client: QueryServiceClient, + set_temporal_column, +): + """ + Verifying this materialization setup: + - Job Type: druid_cube + - Strategy: full + Cases to check: + - [success] FULL strategy should NOT include temporal partition filters in the SQL + """ + cube_name = "default.repairs_cube__default_full" + client_with_repairs_cube = await client_with_repairs_cube(cube_name=cube_name) # type: ignore + await set_temporal_column( + client_with_repairs_cube, + cube_name, + "default.repair_orders_fact.order_date", + ) + response = await client_with_repairs_cube.post( + f"/nodes/{cube_name}/materialization/", + json={ + "job": "druid_cube", + "strategy": "full", + "schedule": "@daily", + }, + ) + assert response.status_code in (200, 201), response.json() + assert "Successfully updated materialization config" in response.json()["message"] + + _, kwargs = module__query_service_client.materialize_cube.call_args_list[0] # type: ignore + mat = kwargs["materialization_input"] + assert mat.job == "DruidCubeMaterializationJob" + measures_query = mat.measures_materializations[0].query + assert "DJ_LOGICAL_TIMESTAMP" not in measures_query + assert "dj_logical_timestamp" not in measures_query.lower() + + @pytest.mark.asyncio async def test_spark_sql_full( module__client_with_roads: AsyncClient, From 712060bad0a632c102f12a796688d4b45e2a7c1f Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Fri, 8 May 2026 17:54:57 -0700 Subject: [PATCH 2/7] Fix --- .../internal/cube_materializations.py | 68 +++- .../models/cube_materialization.py | 3 +- datajunction-server/tests/api/cubes_test.py | 364 ++++++++---------- 3 files changed, 220 insertions(+), 215 deletions(-) diff --git a/datajunction-server/datajunction_server/internal/cube_materializations.py b/datajunction-server/datajunction_server/internal/cube_materializations.py index 7cc1208ff..96f3e2a15 100644 --- a/datajunction-server/datajunction_server/internal/cube_materializations.py +++ b/datajunction-server/datajunction_server/internal/cube_materializations.py @@ -183,8 +183,16 @@ def _v3_col_to_model_column(col) -> ColumnMetadata: if v3_type in ("metric", "metric_component", "metric_input") else v3_type ) + # Dimension columns use the full semantic name (dots → _DOT_) as their output + # name, matching the v2 shape that MeasuresMaterialization.from_measures_query + # expects. Measure columns use their hashed short name unchanged. + name = ( + semantic_entity.replace(".", "_DOT_") + if semantic_type == "dimension" and semantic_entity + else col.name + ) return ColumnMetadata( - name=col.name, + name=name, type=col.type, column=column_name, node=node_name, @@ -283,20 +291,49 @@ async def _walk(node_name: str) -> None: # ``derived_expression`` field both rely on that shape. v3 exposes the # full query AST separately on ``derived_ast``; stringify it so the # downstream parser-based extraction stays happy. - metrics[metric_name] = (info.components, str(info.derived_ast)) + metrics[metric_name] = ( + info.components, + " ".join(str(info.derived_ast).split()), + ) + columns_raw = [_v3_col_to_model_column(c) for c in gg.columns] + # For measure columns v3's semantic_name is "namespace.metric:component" which + # gives the wrong node/column when split on ".". Normalize them to the v2 shape + # so downstream consumers see "parent_fact_node.component_name". + parent_node_name = gg.parent_name + columns = [] + for col in columns_raw: + if col.semantic_type == "measure": + columns.append( + ColumnMetadata( + name=col.name, + type=col.type, + column=col.name, + node=parent_node_name, + semantic_entity=f"{parent_node_name}.{col.name}", + semantic_type=col.semantic_type, + ), + ) + else: + columns.append(col) + # Build lookup from v3's short alias → full _DOT_ name so the grain list + # matches what MeasuresMaterialization.from_measures_query expects. + alias_to_name = { + orig.name: converted.name for orig, converted in zip(gg.columns, columns) + } + grain = [alias_to_name.get(g, g) for g in gg.grain] return SimpleNamespace( node=NodeNameVersion( name=rev.name, version=rev.version, display_name=rev.display_name, ), - grain=list(gg.grain), - columns=[_v3_col_to_model_column(c) for c in gg.columns], + grain=grain, + columns=columns, metrics=metrics, sql=gg.sql, spark_conf=None, - upstream_tables=upstream_tables, + upstream_tables=sorted(upstream_tables), ) @@ -329,15 +366,18 @@ async def build_cube_materialization( dialect=Dialect.SPARK, use_materialized=True, ) - measures_queries = [ - await _v3_grain_group_to_measures_query( - session, - gg, - result.ctx, - result.decomposed_metrics, - ) - for gg in result.grain_groups - ] + measures_queries = sorted( + [ + await _v3_grain_group_to_measures_query( + session, + gg, + result.ctx, + result.decomposed_metrics, + ) + for gg in result.grain_groups + ], + key=lambda q: (-len(q.metrics), q.node.name), + ) query_grains = { k: [q.node.name for q in queries] for k, queries in itertools.groupby( diff --git a/datajunction-server/datajunction_server/models/cube_materialization.py b/datajunction-server/datajunction_server/models/cube_materialization.py index af834928b..790fe7e9c 100644 --- a/datajunction-server/datajunction_server/models/cube_materialization.py +++ b/datajunction-server/datajunction_server/models/cube_materialization.py @@ -127,12 +127,13 @@ def from_measures_query(cls, measures_query, temporal_partition): """ Builds a MeasuresMaterialization object from a measures query. """ - metric_components = list( + metric_components = sorted( { component.name: component for metric, (components, combiner) in measures_query.metrics.items() for component in components }.values(), + key=lambda c: c.name, ) dimensional_metric_components = [ component.name diff --git a/datajunction-server/tests/api/cubes_test.py b/datajunction-server/tests/api/cubes_test.py index 93b6e1579..19d011c77 100644 --- a/datajunction-server/tests/api/cubes_test.py +++ b/datajunction-server/tests/api/cubes_test.py @@ -2999,8 +2999,8 @@ async def test_cube_materialization_metadata( ] assert results["metrics"] == [ { - "derived_expression": "SELECT CAST(SUM(discount_sum_30b84e6c) AS DOUBLE) / " - "SUM(count_c8e42e74) AS default_DOT_discounted_orders_rate FROM " + "derived_expression": "SELECT CAST(SUM(discount_sum_30b84e6c) AS DOUBLE) / " + "SUM(count_c8e42e74) AS default_DOT_discounted_orders_rate FROM " "default.repair_orders_fact", "metric_expression": "CAST(SUM(discount_sum_30b84e6c) AS DOUBLE) / " "SUM(count_c8e42e74)", @@ -3029,7 +3029,7 @@ async def test_cube_materialization_metadata( ], }, { - "derived_expression": "SELECT SUM(repair_order_id_count_bd241964) FROM " + "derived_expression": "SELECT SUM(repair_order_id_count_bd241964) FROM " "default.repair_orders_fact", "metric_expression": "SUM(repair_order_id_count_bd241964)", "metric": { @@ -3049,7 +3049,7 @@ async def test_cube_materialization_metadata( ], }, { - "derived_expression": "SELECT SUM(price_sum_935e7117) / SUM(price_count_935e7117) FROM " + "derived_expression": "SELECT SUM(price_sum_935e7117) / SUM(price_count_935e7117) FROM " "default.repair_orders_fact", "metric_expression": "SUM(price_sum_935e7117) / SUM(price_count_935e7117)", "metric": { @@ -3077,7 +3077,7 @@ async def test_cube_materialization_metadata( ], }, { - "derived_expression": "SELECT SUM(total_repair_cost_sum_67874507) FROM " + "derived_expression": "SELECT SUM(total_repair_cost_sum_67874507) FROM " "default.repair_orders_fact", "metric_expression": "SUM(total_repair_cost_sum_67874507)", "metric": { @@ -3097,7 +3097,7 @@ async def test_cube_materialization_metadata( ], }, { - "derived_expression": "SELECT SUM(price_discount_sum_e4ba5456) FROM " + "derived_expression": "SELECT SUM(price_discount_sum_e4ba5456) FROM " "default.repair_orders_fact", "metric_expression": "SUM(price_discount_sum_e4ba5456)", "metric": { @@ -3117,8 +3117,8 @@ async def test_cube_materialization_metadata( ], }, { - "derived_expression": "SELECT SUM(price_sum_252381cf) + SUM(price_sum_252381cf) AS " - "default_DOT_double_total_repair_cost FROM " + "derived_expression": "SELECT SUM(price_sum_252381cf) + SUM(price_sum_252381cf) AS " + "default_DOT_double_total_repair_cost FROM " "default.repair_order_details", "metric_expression": "SUM(price_sum_252381cf) + SUM(price_sum_252381cf)", "metric": { @@ -3203,7 +3203,7 @@ async def test_cube_materialization_metadata( "node": "default.repair_orders_fact", "semantic_entity": "default.repair_orders_fact.discount_sum_30b84e6c", "semantic_type": "measure", - "type": "bigint", + "type": "double", }, { "column": "count_c8e42e74", @@ -3264,33 +3264,33 @@ async def test_cube_materialization_metadata( "default_DOT_municipality_dim_DOT_local_region", ], "grain": [ - "default_DOT_hard_hat_DOT_country", - "default_DOT_hard_hat_DOT_postal_code", "default_DOT_hard_hat_DOT_city", - "default_DOT_hard_hat_DOT_hire_date", - "default_DOT_hard_hat_DOT_state", "default_DOT_dispatcher_DOT_company_name", + "default_DOT_hard_hat_DOT_country", + "default_DOT_hard_hat_DOT_hire_date", "default_DOT_municipality_dim_DOT_local_region", + "default_DOT_hard_hat_DOT_postal_code", + "default_DOT_hard_hat_DOT_state", ], "granularity": "day", "measures": [ { - "aggregation": "SUM", - "expression": "if(discount > 0.0, 1, 0)", + "aggregation": "COUNT", + "expression": "*", "grain_alias": None, "merge": "SUM", - "name": "discount_sum_30b84e6c", + "name": "count_c8e42e74", "rule": { "level": None, "type": "full", }, }, { - "aggregation": "COUNT", - "expression": "*", + "aggregation": "SUM", + "expression": "if(discount > 0.0, 1, 0)", "grain_alias": None, "merge": "SUM", - "name": "count_c8e42e74", + "name": "discount_sum_30b84e6c", "rule": { "level": None, "type": "full", @@ -3298,21 +3298,21 @@ async def test_cube_materialization_metadata( }, { "aggregation": "COUNT", - "expression": "repair_order_id", + "expression": "price", "grain_alias": None, "merge": "SUM", - "name": "repair_order_id_count_bd241964", + "name": "price_count_935e7117", "rule": { "level": None, "type": "full", }, }, { - "aggregation": "COUNT", - "expression": "price", + "aggregation": "SUM", + "expression": "price * discount", "grain_alias": None, "merge": "SUM", - "name": "price_count_935e7117", + "name": "price_discount_sum_e4ba5456", "rule": { "level": None, "type": "full", @@ -3330,11 +3330,11 @@ async def test_cube_materialization_metadata( }, }, { - "aggregation": "SUM", - "expression": "total_repair_cost", + "aggregation": "COUNT", + "expression": "repair_order_id", "grain_alias": None, "merge": "SUM", - "name": "total_repair_cost_sum_67874507", + "name": "repair_order_id_count_bd241964", "rule": { "level": None, "type": "full", @@ -3342,10 +3342,10 @@ async def test_cube_materialization_metadata( }, { "aggregation": "SUM", - "expression": "price * discount", + "expression": "total_repair_cost", "grain_alias": None, "merge": "SUM", - "name": "price_discount_sum_e4ba5456", + "name": "total_repair_cost_sum_67874507", "rule": { "level": None, "type": "full", @@ -3363,13 +3363,13 @@ async def test_cube_materialization_metadata( "timestamp_column": "default_DOT_hard_hat_DOT_hire_date", "timestamp_format": "yyyyMMdd", "upstream_tables": [ - "default.roads.repair_orders", - "default.roads.repair_order_details", - "default.roads.hard_hats", "default.roads.dispatchers", + "default.roads.hard_hats", "default.roads.municipality", "default.roads.municipality_municipality_type", "default.roads.municipality_type", + "default.roads.repair_order_details", + "default.roads.repair_orders", ], }, { @@ -3449,13 +3449,13 @@ async def test_cube_materialization_metadata( "default_DOT_municipality_dim_DOT_local_region", ], "grain": [ - "default_DOT_hard_hat_DOT_country", - "default_DOT_hard_hat_DOT_postal_code", "default_DOT_hard_hat_DOT_city", - "default_DOT_hard_hat_DOT_hire_date", - "default_DOT_hard_hat_DOT_state", "default_DOT_dispatcher_DOT_company_name", + "default_DOT_hard_hat_DOT_country", + "default_DOT_hard_hat_DOT_hire_date", "default_DOT_municipality_dim_DOT_local_region", + "default_DOT_hard_hat_DOT_postal_code", + "default_DOT_hard_hat_DOT_state", ], "granularity": "day", "measures": [ @@ -3482,107 +3482,71 @@ async def test_cube_materialization_metadata( "timestamp_column": "default_DOT_hard_hat_DOT_hire_date", "timestamp_format": "yyyyMMdd", "upstream_tables": [ - "default.roads.repair_order_details", - "default.roads.repair_orders", - "default.roads.hard_hats", "default.roads.dispatchers", + "default.roads.hard_hats", "default.roads.municipality", "default.roads.municipality_municipality_type", "default.roads.municipality_type", + "default.roads.repair_order_details", ], }, ] measures_sql = """ - WITH default_DOT_repair_orders_fact AS ( + WITH default_dispatcher AS ( + SELECT dispatcher_id, company_name + FROM default.roads.dispatchers + ), + default_hard_hat AS ( + SELECT hard_hat_id, hire_date, city, state, postal_code, country + FROM default.roads.hard_hats + WHERE state = 'AZ' + AND hire_date = CAST(DATE_FORMAT(CAST(${dj_logical_timestamp} AS TIMESTAMP), 'yyyyMMdd') AS TIMESTAMP) + ), + default_municipality_dim AS ( + SELECT m.municipality_id AS municipality_id, local_region + FROM default.roads.municipality AS m + LEFT JOIN default.roads.municipality_municipality_type AS mmt + ON m.municipality_id = mmt.municipality_id + LEFT JOIN default.roads.municipality_type AS mt + ON mmt.municipality_type_id = mt.municipality_type_desc + ), + default_repair_orders_fact AS ( SELECT repair_orders.repair_order_id, repair_orders.municipality_id, repair_orders.hard_hat_id, repair_orders.dispatcher_id, - repair_orders.order_date, - repair_orders.dispatched_date, - repair_orders.required_date, repair_order_details.discount, repair_order_details.price, - repair_order_details.quantity, - repair_order_details.repair_type_id, - repair_order_details.price * repair_order_details.quantity AS total_repair_cost, - repair_orders.dispatched_date - repair_orders.order_date AS time_to_dispatch, - repair_orders.dispatched_date - repair_orders.required_date AS dispatch_delay - FROM roads.repair_orders AS repair_orders JOIN roads.repair_order_details AS repair_order_details ON repair_orders.repair_order_id = repair_order_details.repair_order_id - ), - default_DOT_hard_hat AS ( - SELECT - default_DOT_hard_hats.hard_hat_id, - default_DOT_hard_hats.last_name, - default_DOT_hard_hats.first_name, - default_DOT_hard_hats.title, - default_DOT_hard_hats.birth_date, - default_DOT_hard_hats.hire_date, - default_DOT_hard_hats.address, - default_DOT_hard_hats.city, - default_DOT_hard_hats.state, - default_DOT_hard_hats.postal_code, - default_DOT_hard_hats.country, - default_DOT_hard_hats.manager, - default_DOT_hard_hats.contractor_id - FROM roads.hard_hats AS default_DOT_hard_hats - WHERE default_DOT_hard_hats.state = 'AZ' AND default_DOT_hard_hats.hire_date = CAST(DATE_FORMAT(CAST(DJ_LOGICAL_TIMESTAMP() AS TIMESTAMP), 'yyyyMMdd') AS TIMESTAMP) - ), - default_DOT_dispatcher AS ( - SELECT - default_DOT_dispatchers.dispatcher_id, - default_DOT_dispatchers.company_name, - default_DOT_dispatchers.phone - FROM roads.dispatchers AS default_DOT_dispatchers - ), - default_DOT_municipality_dim AS ( - SELECT - m.municipality_id AS municipality_id, - m.contact_name, - m.contact_title, - m.local_region, - m.state_id, - mmt.municipality_type_id AS municipality_type_id, - mt.municipality_type_desc AS municipality_type_desc - FROM roads.municipality AS m LEFT JOIN roads.municipality_municipality_type AS mmt ON m.municipality_id = mmt.municipality_id - LEFT JOIN roads.municipality_type AS mt ON mmt.municipality_type_id = mt.municipality_type_desc - ), - default_DOT_repair_orders_fact_built AS ( - SELECT - default_DOT_repair_orders_fact.repair_order_id, - default_DOT_repair_orders_fact.discount, - default_DOT_repair_orders_fact.price, - default_DOT_repair_orders_fact.total_repair_cost, - default_DOT_hard_hat.country default_DOT_hard_hat_DOT_country, - default_DOT_hard_hat.postal_code default_DOT_hard_hat_DOT_postal_code, - default_DOT_hard_hat.city default_DOT_hard_hat_DOT_city, - default_DOT_hard_hat.hire_date default_DOT_hard_hat_DOT_hire_date, - default_DOT_hard_hat.state default_DOT_hard_hat_DOT_state, - default_DOT_dispatcher.company_name default_DOT_dispatcher_DOT_company_name, - default_DOT_municipality_dim.local_region default_DOT_municipality_dim_DOT_local_region - FROM default_DOT_repair_orders_fact INNER JOIN default_DOT_hard_hat ON default_DOT_repair_orders_fact.hard_hat_id = default_DOT_hard_hat.hard_hat_id - INNER JOIN default_DOT_dispatcher ON default_DOT_repair_orders_fact.dispatcher_id = default_DOT_dispatcher.dispatcher_id - INNER JOIN default_DOT_municipality_dim ON default_DOT_repair_orders_fact.municipality_id = default_DOT_municipality_dim.municipality_id - WHERE default_DOT_hard_hat.state = 'AZ' AND default_DOT_hard_hat.hire_date = CAST(DATE_FORMAT(CAST(${dj_logical_timestamp} AS TIMESTAMP), 'yyyyMMdd') AS TIMESTAMP) + repair_order_details.price * repair_order_details.quantity AS total_repair_cost + FROM default.roads.repair_orders repair_orders + JOIN default.roads.repair_order_details repair_order_details + ON repair_orders.repair_order_id = repair_order_details.repair_order_id ) - SELECT default_DOT_repair_orders_fact_built.default_DOT_hard_hat_DOT_country, - default_DOT_repair_orders_fact_built.default_DOT_hard_hat_DOT_postal_code, - default_DOT_repair_orders_fact_built.default_DOT_hard_hat_DOT_city, - default_DOT_repair_orders_fact_built.default_DOT_hard_hat_DOT_hire_date, - default_DOT_repair_orders_fact_built.default_DOT_hard_hat_DOT_state, - default_DOT_repair_orders_fact_built.default_DOT_dispatcher_DOT_company_name, - default_DOT_repair_orders_fact_built.default_DOT_municipality_dim_DOT_local_region, - SUM(if(discount > 0.0, 1, 0)) AS discount_sum_30b84e6c, - COUNT(*) AS count_c8e42e74, - COUNT(repair_order_id) AS repair_order_id_count_bd241964, - COUNT(price) AS price_count_935e7117, - SUM(price) AS price_sum_935e7117, - SUM(total_repair_cost) AS total_repair_cost_sum_67874507, - SUM(price * discount) AS price_discount_sum_e4ba5456 - FROM default_DOT_repair_orders_fact_built - GROUP BY default_DOT_repair_orders_fact_built.default_DOT_hard_hat_DOT_country, default_DOT_repair_orders_fact_built.default_DOT_hard_hat_DOT_postal_code, default_DOT_repair_orders_fact_built.default_DOT_hard_hat_DOT_city, default_DOT_repair_orders_fact_built.default_DOT_hard_hat_DOT_hire_date, default_DOT_repair_orders_fact_built.default_DOT_hard_hat_DOT_state, default_DOT_repair_orders_fact_built.default_DOT_dispatcher_DOT_company_name, default_DOT_repair_orders_fact_built.default_DOT_municipality_dim_DOT_local_region + SELECT + t2.country, + t2.postal_code, + t2.city, + t2.hire_date, + t2.state, + t3.company_name, + t4.local_region, + SUM(if(t1.discount > 0.0, 1, 0)) discount_sum_30b84e6c, + COUNT(*) count_c8e42e74, + COUNT(t1.repair_order_id) repair_order_id_count_bd241964, + COUNT(t1.price) price_count_935e7117, + SUM(t1.price) price_sum_935e7117, + SUM(t1.total_repair_cost) total_repair_cost_sum_67874507, + SUM(t1.price * t1.discount) price_discount_sum_e4ba5456 + FROM default_repair_orders_fact t1 + INNER JOIN default_hard_hat t2 ON t1.hard_hat_id = t2.hard_hat_id + INNER JOIN default_dispatcher t3 ON t1.dispatcher_id = t3.dispatcher_id + INNER JOIN default_municipality_dim t4 ON t1.municipality_id = t4.municipality_id + WHERE t2.state = 'AZ' + AND t2.hire_date = CAST(DATE_FORMAT(CAST(${dj_logical_timestamp} AS TIMESTAMP), 'yyyyMMdd') AS TIMESTAMP) + GROUP BY t2.country, t2.postal_code, t2.city, t2.hire_date, t2.state, + t3.company_name, t4.local_region """ assert str( parse( @@ -3610,27 +3574,27 @@ async def test_cube_materialization_metadata( "query": mock.ANY, "columns": [ { - "name": "default_DOT_hard_hat_DOT_country", + "name": "default_DOT_hard_hat_DOT_city", "type": "string", - "column": "country", + "column": "city", "node": "default.hard_hat", - "semantic_entity": "default.hard_hat.country", + "semantic_entity": "default.hard_hat.city", "semantic_type": "dimension", }, { - "name": "default_DOT_hard_hat_DOT_postal_code", + "name": "default_DOT_dispatcher_DOT_company_name", "type": "string", - "column": "postal_code", - "node": "default.hard_hat", - "semantic_entity": "default.hard_hat.postal_code", + "column": "company_name", + "node": "default.dispatcher", + "semantic_entity": "default.dispatcher.company_name", "semantic_type": "dimension", }, { - "name": "default_DOT_hard_hat_DOT_city", + "name": "default_DOT_hard_hat_DOT_country", "type": "string", - "column": "city", + "column": "country", "node": "default.hard_hat", - "semantic_entity": "default.hard_hat.city", + "semantic_entity": "default.hard_hat.country", "semantic_type": "dimension", }, { @@ -3642,37 +3606,29 @@ async def test_cube_materialization_metadata( "semantic_type": "dimension", }, { - "name": "default_DOT_hard_hat_DOT_state", + "name": "default_DOT_municipality_dim_DOT_local_region", "type": "string", - "column": "state", - "node": "default.hard_hat", - "semantic_entity": "default.hard_hat.state", + "column": "local_region", + "node": "default.municipality_dim", + "semantic_entity": "default.municipality_dim.local_region", "semantic_type": "dimension", }, { - "name": "default_DOT_dispatcher_DOT_company_name", + "name": "default_DOT_hard_hat_DOT_postal_code", "type": "string", - "column": "company_name", - "node": "default.dispatcher", - "semantic_entity": "default.dispatcher.company_name", + "column": "postal_code", + "node": "default.hard_hat", + "semantic_entity": "default.hard_hat.postal_code", "semantic_type": "dimension", }, { - "name": "default_DOT_municipality_dim_DOT_local_region", + "name": "default_DOT_hard_hat_DOT_state", "type": "string", - "column": "local_region", - "node": "default.municipality_dim", - "semantic_entity": "default.municipality_dim.local_region", + "column": "state", + "node": "default.hard_hat", + "semantic_entity": "default.hard_hat.state", "semantic_type": "dimension", }, - { - "name": "discount_sum_30b84e6c", - "type": "bigint", - "column": "discount_sum_30b84e6c", - "node": "default.repair_orders_fact", - "semantic_entity": "default.repair_orders_fact.discount_sum_30b84e6c", - "semantic_type": "measure", - }, { "name": "count_c8e42e74", "type": "bigint", @@ -3682,11 +3638,11 @@ async def test_cube_materialization_metadata( "semantic_type": "measure", }, { - "name": "repair_order_id_count_bd241964", - "type": "bigint", - "column": "repair_order_id_count_bd241964", + "name": "discount_sum_30b84e6c", + "type": "double", + "column": "discount_sum_30b84e6c", "node": "default.repair_orders_fact", - "semantic_entity": "default.repair_orders_fact.repair_order_id_count_bd241964", + "semantic_entity": "default.repair_orders_fact.discount_sum_30b84e6c", "semantic_type": "measure", }, { @@ -3697,6 +3653,14 @@ async def test_cube_materialization_metadata( "semantic_entity": "default.repair_orders_fact.price_count_935e7117", "semantic_type": "measure", }, + { + "name": "price_discount_sum_e4ba5456", + "type": "double", + "column": "price_discount_sum_e4ba5456", + "node": "default.repair_orders_fact", + "semantic_entity": "default.repair_orders_fact.price_discount_sum_e4ba5456", + "semantic_type": "measure", + }, { "name": "price_sum_935e7117", "type": "double", @@ -3706,19 +3670,19 @@ async def test_cube_materialization_metadata( "semantic_type": "measure", }, { - "name": "total_repair_cost_sum_67874507", - "type": "double", - "column": "total_repair_cost_sum_67874507", + "name": "repair_order_id_count_bd241964", + "type": "bigint", + "column": "repair_order_id_count_bd241964", "node": "default.repair_orders_fact", - "semantic_entity": "default.repair_orders_fact.total_repair_cost_sum_67874507", + "semantic_entity": "default.repair_orders_fact.repair_order_id_count_bd241964", "semantic_type": "measure", }, { - "name": "price_discount_sum_e4ba5456", + "name": "total_repair_cost_sum_67874507", "type": "double", - "column": "price_discount_sum_e4ba5456", + "column": "total_repair_cost_sum_67874507", "node": "default.repair_orders_fact", - "semantic_entity": "default.repair_orders_fact.price_discount_sum_e4ba5456", + "semantic_entity": "default.repair_orders_fact.total_repair_cost_sum_67874507", "semantic_type": "measure", }, { @@ -3731,32 +3695,24 @@ async def test_cube_materialization_metadata( }, ], "grain": [ - "default_DOT_hard_hat_DOT_country", - "default_DOT_hard_hat_DOT_postal_code", "default_DOT_hard_hat_DOT_city", - "default_DOT_hard_hat_DOT_hire_date", - "default_DOT_hard_hat_DOT_state", "default_DOT_dispatcher_DOT_company_name", + "default_DOT_hard_hat_DOT_country", + "default_DOT_hard_hat_DOT_hire_date", "default_DOT_municipality_dim_DOT_local_region", + "default_DOT_hard_hat_DOT_postal_code", + "default_DOT_hard_hat_DOT_state", ], "dimensions": [ - "default_DOT_hard_hat_DOT_country", - "default_DOT_hard_hat_DOT_postal_code", "default_DOT_hard_hat_DOT_city", - "default_DOT_hard_hat_DOT_hire_date", - "default_DOT_hard_hat_DOT_state", "default_DOT_dispatcher_DOT_company_name", + "default_DOT_hard_hat_DOT_country", + "default_DOT_hard_hat_DOT_hire_date", "default_DOT_municipality_dim_DOT_local_region", + "default_DOT_hard_hat_DOT_postal_code", + "default_DOT_hard_hat_DOT_state", ], "measures": [ - { - "name": "discount_sum_30b84e6c", - "expression": "if(discount > 0.0, 1, 0)", - "grain_alias": None, - "aggregation": "SUM", - "merge": "SUM", - "rule": {"type": "full", "level": None}, - }, { "name": "count_c8e42e74", "expression": "*", @@ -3766,10 +3722,10 @@ async def test_cube_materialization_metadata( "rule": {"type": "full", "level": None}, }, { - "name": "repair_order_id_count_bd241964", - "expression": "repair_order_id", + "name": "discount_sum_30b84e6c", + "expression": "if(discount > 0.0, 1, 0)", "grain_alias": None, - "aggregation": "COUNT", + "aggregation": "SUM", "merge": "SUM", "rule": {"type": "full", "level": None}, }, @@ -3781,6 +3737,14 @@ async def test_cube_materialization_metadata( "merge": "SUM", "rule": {"type": "full", "level": None}, }, + { + "name": "price_discount_sum_e4ba5456", + "expression": "price * discount", + "grain_alias": None, + "aggregation": "SUM", + "merge": "SUM", + "rule": {"type": "full", "level": None}, + }, { "name": "price_sum_935e7117", "expression": "price", @@ -3790,16 +3754,16 @@ async def test_cube_materialization_metadata( "rule": {"type": "full", "level": None}, }, { - "name": "total_repair_cost_sum_67874507", - "expression": "total_repair_cost", + "name": "repair_order_id_count_bd241964", + "expression": "repair_order_id", "grain_alias": None, - "aggregation": "SUM", + "aggregation": "COUNT", "merge": "SUM", "rule": {"type": "full", "level": None}, }, { - "name": "price_discount_sum_e4ba5456", - "expression": "price * discount", + "name": "total_repair_cost_sum_67874507", + "expression": "total_repair_cost", "grain_alias": None, "aggregation": "SUM", "merge": "SUM", @@ -3820,7 +3784,7 @@ async def test_cube_materialization_metadata( "upstream_tables": [], "druid_spec": { "dataSchema": { - "dataSource": "dj__default_example_repairs_cube_v1_0_fdc5182835060cb1", + "dataSource": mock.ANY, "parser": { "parseSpec": { "format": "parquet", @@ -3842,39 +3806,39 @@ async def test_cube_materialization_metadata( }, }, "metricsSpec": [ - { - "fieldName": "discount_sum_30b84e6c", - "name": "discount_sum_30b84e6c", - "type": "longSum", - }, { "fieldName": "count_c8e42e74", "name": "count_c8e42e74", "type": "longSum", }, { - "fieldName": "repair_order_id_count_bd241964", - "name": "repair_order_id_count_bd241964", - "type": "longSum", + "fieldName": "discount_sum_30b84e6c", + "name": "discount_sum_30b84e6c", + "type": "doubleSum", }, { "fieldName": "price_count_935e7117", "name": "price_count_935e7117", "type": "longSum", }, + { + "fieldName": "price_discount_sum_e4ba5456", + "name": "price_discount_sum_e4ba5456", + "type": "doubleSum", + }, { "fieldName": "price_sum_935e7117", "name": "price_sum_935e7117", "type": "doubleSum", }, { - "fieldName": "total_repair_cost_sum_67874507", - "name": "total_repair_cost_sum_67874507", - "type": "doubleSum", + "fieldName": "repair_order_id_count_bd241964", + "name": "repair_order_id_count_bd241964", + "type": "longSum", }, { - "fieldName": "price_discount_sum_e4ba5456", - "name": "price_discount_sum_e4ba5456", + "fieldName": "total_repair_cost_sum_67874507", + "name": "total_repair_cost_sum_67874507", "type": "doubleSum", }, { @@ -3898,7 +3862,7 @@ async def test_cube_materialization_metadata( "type": "hadoop", }, }, - "output_table_name": "default_example_repairs_cube_v1_0_fdc5182835060cb1", + "output_table_name": mock.ANY, }, ] From 59d4042283163be66c062342dc454e0224cdceed Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Fri, 8 May 2026 20:28:53 -0700 Subject: [PATCH 3/7] Fix --- .../tests/api/materializations_test.py | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/datajunction-server/tests/api/materializations_test.py b/datajunction-server/tests/api/materializations_test.py index f8ffc3046..b55b82cef 100644 --- a/datajunction-server/tests/api/materializations_test.py +++ b/datajunction-server/tests/api/materializations_test.py @@ -817,15 +817,20 @@ async def test_druid_cube_incremental( actual_node = mat.measures_materializations[0].node assert actual_node.name == "default.repair_orders_fact" assert actual_node.display_name == "Repair Orders Fact" - # v3 emits clean SQL aliases (``order_date``) instead of v2's munged - # ``default_DOT_repair_orders_fact_DOT_order_date`` form, and sorts grain - # columns deterministically. - expected_dim_grain = ["company_name", "local_region", "order_date", "state"] + # v3 uses full semantic names (node_DOT_column form) for dimension columns, + # sorted alphabetically by the short alias. + expected_dim_grain = [ + "default_DOT_dispatcher_DOT_company_name", + "default_DOT_municipality_dim_DOT_local_region", + "default_DOT_repair_orders_fact_DOT_order_date", + "default_DOT_hard_hat_DOT_state", + ] + # dimensions follows the original cube column order (not alphabetical) expected_dimensions_in_order = [ - "order_date", - "state", - "company_name", - "local_region", + "default_DOT_repair_orders_fact_DOT_order_date", + "default_DOT_hard_hat_DOT_state", + "default_DOT_dispatcher_DOT_company_name", + "default_DOT_municipality_dim_DOT_local_region", ] assert mat.measures_materializations[0].grain == expected_dim_grain assert mat.measures_materializations[0].dimensions == expected_dimensions_in_order @@ -854,20 +859,26 @@ async def test_druid_cube_incremental( # instead. Both are usable downstream — assert structurally. columns_by_name = {c.name: c for c in mat.measures_materializations[0].columns} assert set(columns_by_name) == { - "order_date", - "state", - "company_name", - "local_region", + "default_DOT_repair_orders_fact_DOT_order_date", + "default_DOT_hard_hat_DOT_state", + "default_DOT_dispatcher_DOT_company_name", + "default_DOT_municipality_dim_DOT_local_region", "repair_order_id_count_bd241964", "total_repair_cost_sum_67874507", } assert ( - columns_by_name["order_date"].semantic_entity + columns_by_name["default_DOT_repair_orders_fact_DOT_order_date"].semantic_entity == "default.repair_orders_fact.order_date" ) - assert columns_by_name["order_date"].semantic_type == "dimension" + assert ( + columns_by_name["default_DOT_repair_orders_fact_DOT_order_date"].semantic_type + == "dimension" + ) assert columns_by_name["repair_order_id_count_bd241964"].semantic_type == "measure" - assert mat.measures_materializations[0].timestamp_column == "order_date" + assert ( + mat.measures_materializations[0].timestamp_column + == "default_DOT_repair_orders_fact_DOT_order_date" + ) assert mat.measures_materializations[0].timestamp_format == "yyyyMMdd" assert mat.measures_materializations[0].granularity == Granularity.DAY assert mat.measures_materializations[0].spark_conf is None @@ -898,7 +909,10 @@ async def test_druid_cube_incremental( expected_components, key=lambda m: m.name, ) - assert mat.combiners[0].timestamp_column == "order_date" + assert ( + mat.combiners[0].timestamp_column + == "default_DOT_repair_orders_fact_DOT_order_date" + ) assert mat.combiners[0].timestamp_format == "yyyyMMdd" assert mat.combiners[0].granularity == Granularity.DAY assert mat.combiners[0].upstream_tables[0].startswith("default_repair_orders_fact") @@ -935,7 +949,7 @@ async def test_druid_cube_full( assert response.status_code in (200, 201), response.json() assert "Successfully updated materialization config" in response.json()["message"] - _, kwargs = module__query_service_client.materialize_cube.call_args_list[0] # type: ignore + _, kwargs = module__query_service_client.materialize_cube.call_args_list[-1] # type: ignore mat = kwargs["materialization_input"] assert mat.job == "DruidCubeMaterializationJob" measures_query = mat.measures_materializations[0].query From a9a1a5d58ac55470fa173cf6201ab9a3bd6e077d Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Fri, 8 May 2026 22:19:58 -0700 Subject: [PATCH 4/7] Fix --- .../tests/api/materializations_test.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/datajunction-server/tests/api/materializations_test.py b/datajunction-server/tests/api/materializations_test.py index b55b82cef..a736a6b72 100644 --- a/datajunction-server/tests/api/materializations_test.py +++ b/datajunction-server/tests/api/materializations_test.py @@ -602,6 +602,34 @@ async def test_druid_measures_cube_incremental( "druid_measures_cube.incremental.druid_spec.json", ) + # Restore repair_orders_fact to its original SQL so later tests are not affected + restore_response = await client_with_repairs_cube.patch( + "/nodes/default.repair_orders_fact", + json={ + "query": """SELECT + repair_orders.repair_order_id, + repair_orders.municipality_id, + repair_orders.hard_hat_id, + repair_orders.dispatcher_id, + repair_orders.order_date, + repair_orders.dispatched_date, + repair_orders.required_date, + repair_order_details.discount, + repair_order_details.price, + repair_order_details.quantity, + repair_order_details.repair_type_id, + repair_order_details.price * repair_order_details.quantity AS total_repair_cost, + repair_orders.dispatched_date - repair_orders.order_date AS time_to_dispatch, + repair_orders.dispatched_date - repair_orders.required_date AS dispatch_delay +FROM + default.repair_orders repair_orders +JOIN + default.repair_order_details repair_order_details +ON repair_orders.repair_order_id = repair_order_details.repair_order_id""", + }, + ) + assert restore_response.status_code in (200, 201) + @pytest.mark.asyncio @pytest.mark.skip(reason="The test is unstable depending on run order") From b371654d681df110bce7da4e9ed9bfa6465b961e Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sat, 9 May 2026 07:55:10 -0700 Subject: [PATCH 5/7] Add tests --- .../internal/cube_materializations_test.py | 308 ++++++++++++++++++ 1 file changed, 308 insertions(+) create mode 100644 datajunction-server/tests/internal/cube_materializations_test.py diff --git a/datajunction-server/tests/internal/cube_materializations_test.py b/datajunction-server/tests/internal/cube_materializations_test.py new file mode 100644 index 000000000..7f3d3de3d --- /dev/null +++ b/datajunction-server/tests/internal/cube_materializations_test.py @@ -0,0 +1,308 @@ +"""Unit tests for internal.cube_materializations helper functions.""" + +from types import SimpleNamespace +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from datajunction_server.construction.build_v3.types import BuildContext, GrainGroupSQL +from datajunction_server.internal.cube_materializations import ( + _v3_col_to_model_column, + _v3_grain_group_to_measures_query, +) +from datajunction_server.models.node_type import NodeType + + +def _make_col(semantic_name, semantic_type="dimension", name="col", type_="str"): + col = SimpleNamespace( + semantic_name=semantic_name, + semantic_type=semantic_type, + name=name, + type=type_, + ) + return col + + +class TestV3ColToModelColumn: + def test_no_separator_in_semantic_name(self): + """Line 177->180 False: semantic_name has no dot → column/node stay None.""" + col = _make_col("nodot", semantic_type="dimension") + result = _v3_col_to_model_column(col) + assert result.column is None + assert result.node is None + assert ( + result.name == "nodot" + ) # no dot → uses semantic_entity.replace(".", "_DOT_") == "nodot" + + def test_empty_semantic_name(self): + """Line 177->180 False: empty semantic_name → column/node stay None.""" + col = _make_col("", semantic_type="metric", name="m_hash") + result = _v3_col_to_model_column(col) + assert result.column is None + assert result.node is None + assert result.name == "m_hash" # measure path → col.name + + def test_dotted_dimension(self): + """Dotted semantic_name → column/node extracted correctly.""" + col = _make_col("default.hard_hat.city", semantic_type="dimension") + result = _v3_col_to_model_column(col) + assert result.column == "city" + assert result.node == "default.hard_hat" + assert result.name == "default_DOT_hard_hat_DOT_city" + + def test_metric_type_collapses_to_measure(self): + """metric/metric_component/metric_input semantic types map to 'measure'.""" + for v3_type in ("metric", "metric_component", "metric_input"): + col = _make_col( + "default.orders:count_hash", + semantic_type=v3_type, + name="count_hash", + ) + result = _v3_col_to_model_column(col) + assert result.semantic_type == "measure" + assert result.name == "count_hash" + + +def _make_node(name, node_type, current=None): + node = MagicMock() + node.name = name + node.type = node_type + node.current = current + return node + + +def _make_source_rev(catalog_name, schema, table): + rev = MagicMock() + if catalog_name is None: + rev.catalog = None + else: + rev.catalog = MagicMock() + rev.catalog.name = catalog_name + rev.schema_ = schema + rev.table = table + return rev + + +def _make_ctx(nodes, parent_map): + ctx = MagicMock(spec=BuildContext) + ctx.nodes = nodes + ctx.parent_map = parent_map + return ctx + + +def _make_gg(parent_name, columns=None, grain=None, metrics=None, query=None): + gg = MagicMock(spec=GrainGroupSQL) + gg.parent_name = parent_name + gg.columns = columns or [] + gg.grain = grain or [] + gg.metrics = metrics or [] + gg.sql = query or "SELECT 1" + return gg + + +class TestV3GrainGroupToMeasuresQuery: + """Tests for branches inside _v3_grain_group_to_measures_query.""" + + @pytest.fixture + def session(self): + return AsyncMock() + + @pytest.mark.asyncio + async def test_walk_missing_node_returns_early(self, session): + """Line 246: _walk hits a parent_name not in ctx.nodes → returns without error.""" + fact_rev = MagicMock() + fact_rev.name = "default.repair_orders_fact" + fact_rev.version = "v1.0" + fact_rev.display_name = "Fact" + + fact_node = _make_node( + "default.repair_orders_fact", + NodeType.TRANSFORM, + fact_rev, + ) + + # parent_map says fact has a parent "default.missing", which is NOT in ctx.nodes + ctx = _make_ctx( + nodes={"default.repair_orders_fact": fact_node}, + parent_map={"default.repair_orders_fact": ["default.missing"]}, + ) + + gg = _make_gg( + parent_name="default.repair_orders_fact", + columns=[], + grain=[], + metrics=[], + ) + + with patch("datajunction_server.utils.refresh_if_needed", AsyncMock()): + result = await _v3_grain_group_to_measures_query( + session, + gg, + ctx, + decomposed_metrics={}, + ) + + # Missing node in _walk → upstream_tables is empty; function still returns a MeasuresQuery + assert result.upstream_tables == [] + + @pytest.mark.asyncio + async def test_source_node_missing_catalog_skipped(self, session): + """Lines 254->264 False: source with no catalog → not added to upstream_tables.""" + source_rev = _make_source_rev(catalog_name=None, schema="roads", table="orders") + source_node = _make_node("default.orders", NodeType.SOURCE, source_rev) + + fact_rev = MagicMock() + fact_rev.name = "default.repair_orders_fact" + fact_rev.version = "v1.0" + fact_rev.display_name = "Fact" + fact_node = _make_node( + "default.repair_orders_fact", + NodeType.TRANSFORM, + fact_rev, + ) + + ctx = _make_ctx( + nodes={ + "default.repair_orders_fact": fact_node, + "default.orders": source_node, + }, + parent_map={ + "default.repair_orders_fact": ["default.orders"], + "default.orders": [], + }, + ) + + gg = _make_gg( + parent_name="default.repair_orders_fact", + columns=[], + grain=[], + metrics=[], + ) + + with patch("datajunction_server.utils.refresh_if_needed", AsyncMock()): + result = await _v3_grain_group_to_measures_query( + session, + gg, + ctx, + decomposed_metrics={}, + ) + + # No catalog → not appended + assert result.upstream_tables == [] + + @pytest.mark.asyncio + async def test_source_node_duplicate_skipped(self, session): + """Lines 254->264 False: same source visited via two paths → only added once.""" + source_rev = _make_source_rev("default", "roads", "repair_orders") + source_node = _make_node("default.repair_orders", NodeType.SOURCE, source_rev) + + fact_rev = MagicMock() + fact_rev.name = "default.repair_orders_fact" + fact_rev.version = "v1.0" + fact_rev.display_name = "Fact" + fact_node = _make_node( + "default.repair_orders_fact", + NodeType.TRANSFORM, + fact_rev, + ) + + # Two parents both point to the same source + mid_rev = MagicMock() + mid_rev.name = "default.mid" + mid_node = _make_node("default.mid", NodeType.TRANSFORM, mid_rev) + + ctx = _make_ctx( + nodes={ + "default.repair_orders_fact": fact_node, + "default.mid": mid_node, + "default.repair_orders": source_node, + }, + parent_map={ + "default.repair_orders_fact": ["default.repair_orders", "default.mid"], + "default.mid": ["default.repair_orders"], + "default.repair_orders": [], + }, + ) + + gg = _make_gg( + parent_name="default.repair_orders_fact", + columns=[], + grain=[], + metrics=[], + ) + + with patch("datajunction_server.utils.refresh_if_needed", AsyncMock()): + result = await _v3_grain_group_to_measures_query( + session, + gg, + ctx, + decomposed_metrics={}, + ) + + # Source reached via two paths but only appended once + assert result.upstream_tables == ["default.roads.repair_orders"] + + @pytest.mark.asyncio + async def test_dim_column_no_semantic_name_skipped(self, session): + """Line 278: dimension column with empty semantic_name → continue (not walked).""" + fact_rev = MagicMock() + fact_rev.name = "default.fact" + fact_rev.version = "v1.0" + fact_rev.display_name = "Fact" + fact_node = _make_node("default.fact", NodeType.TRANSFORM, fact_rev) + + ctx = _make_ctx( + nodes={"default.fact": fact_node}, + parent_map={"default.fact": []}, + ) + + # Dimension column with empty semantic_name → line 278 continue + empty_sem_col = _make_col("", semantic_type="dimension", name="d") + gg = _make_gg( + parent_name="default.fact", + columns=[empty_sem_col], + grain=[], + metrics=[], + ) + + with patch("datajunction_server.utils.refresh_if_needed", AsyncMock()): + result = await _v3_grain_group_to_measures_query( + session, + gg, + ctx, + decomposed_metrics={}, + ) + + assert result.upstream_tables == [] + + @pytest.mark.asyncio + async def test_dim_column_no_dot_in_semantic_name_skipped(self, session): + """Line 278: dimension column with semantic_name but no dot → continue.""" + fact_rev = MagicMock() + fact_rev.name = "default.fact" + fact_rev.version = "v1.0" + fact_rev.display_name = "Fact" + fact_node = _make_node("default.fact", NodeType.TRANSFORM, fact_rev) + + ctx = _make_ctx( + nodes={"default.fact": fact_node}, + parent_map={"default.fact": []}, + ) + + no_dot_col = _make_col("nodothere", semantic_type="dimension", name="d") + gg = _make_gg( + parent_name="default.fact", + columns=[no_dot_col], + grain=[], + metrics=[], + ) + + with patch("datajunction_server.utils.refresh_if_needed", AsyncMock()): + result = await _v3_grain_group_to_measures_query( + session, + gg, + ctx, + decomposed_metrics={}, + ) + + assert result.upstream_tables == [] From 9413bc27ceb7355839aa12f6e0044162db4af388 Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Sat, 9 May 2026 10:16:03 -0700 Subject: [PATCH 6/7] Fix naming --- .../internal/cube_materializations.py | 21 +++------ .../tests/api/materializations_test.py | 45 ++++++++----------- .../internal/cube_materializations_test.py | 35 +++++++++++---- 3 files changed, 51 insertions(+), 50 deletions(-) diff --git a/datajunction-server/datajunction_server/internal/cube_materializations.py b/datajunction-server/datajunction_server/internal/cube_materializations.py index 96f3e2a15..38183df9e 100644 --- a/datajunction-server/datajunction_server/internal/cube_materializations.py +++ b/datajunction-server/datajunction_server/internal/cube_materializations.py @@ -183,16 +183,12 @@ def _v3_col_to_model_column(col) -> ColumnMetadata: if v3_type in ("metric", "metric_component", "metric_input") else v3_type ) - # Dimension columns use the full semantic name (dots → _DOT_) as their output - # name, matching the v2 shape that MeasuresMaterialization.from_measures_query - # expects. Measure columns use their hashed short name unchanged. - name = ( - semantic_entity.replace(".", "_DOT_") - if semantic_type == "dimension" and semantic_entity - else col.name - ) + # v3's ``col.name`` is already the SQL alias in the generated query. + # Downstream ``MeasuresMaterialization.from_measures_query`` matches partition + # columns by ``semantic_entity`` (not by ``name``), so we just pass the v3 + # alias through unchanged for both dims and measures. return ColumnMetadata( - name=name, + name=col.name, type=col.type, column=column_name, node=node_name, @@ -316,12 +312,7 @@ async def _walk(node_name: str) -> None: ) else: columns.append(col) - # Build lookup from v3's short alias → full _DOT_ name so the grain list - # matches what MeasuresMaterialization.from_measures_query expects. - alias_to_name = { - orig.name: converted.name for orig, converted in zip(gg.columns, columns) - } - grain = [alias_to_name.get(g, g) for g in gg.grain] + grain = list(gg.grain) return SimpleNamespace( node=NodeNameVersion( name=rev.name, diff --git a/datajunction-server/tests/api/materializations_test.py b/datajunction-server/tests/api/materializations_test.py index a736a6b72..d4bb36576 100644 --- a/datajunction-server/tests/api/materializations_test.py +++ b/datajunction-server/tests/api/materializations_test.py @@ -845,20 +845,20 @@ async def test_druid_cube_incremental( actual_node = mat.measures_materializations[0].node assert actual_node.name == "default.repair_orders_fact" assert actual_node.display_name == "Repair Orders Fact" - # v3 uses full semantic names (node_DOT_column form) for dimension columns, - # sorted alphabetically by the short alias. + # v3 uses the short SQL alias for each column (semantic_entity carries the + # full dotted name). Grain is sorted alphabetically by alias. expected_dim_grain = [ - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_municipality_dim_DOT_local_region", - "default_DOT_repair_orders_fact_DOT_order_date", - "default_DOT_hard_hat_DOT_state", + "company_name", + "local_region", + "order_date", + "state", ] # dimensions follows the original cube column order (not alphabetical) expected_dimensions_in_order = [ - "default_DOT_repair_orders_fact_DOT_order_date", - "default_DOT_hard_hat_DOT_state", - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_municipality_dim_DOT_local_region", + "order_date", + "state", + "company_name", + "local_region", ] assert mat.measures_materializations[0].grain == expected_dim_grain assert mat.measures_materializations[0].dimensions == expected_dimensions_in_order @@ -887,26 +887,20 @@ async def test_druid_cube_incremental( # instead. Both are usable downstream — assert structurally. columns_by_name = {c.name: c for c in mat.measures_materializations[0].columns} assert set(columns_by_name) == { - "default_DOT_repair_orders_fact_DOT_order_date", - "default_DOT_hard_hat_DOT_state", - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_municipality_dim_DOT_local_region", + "order_date", + "state", + "company_name", + "local_region", "repair_order_id_count_bd241964", "total_repair_cost_sum_67874507", } assert ( - columns_by_name["default_DOT_repair_orders_fact_DOT_order_date"].semantic_entity + columns_by_name["order_date"].semantic_entity == "default.repair_orders_fact.order_date" ) - assert ( - columns_by_name["default_DOT_repair_orders_fact_DOT_order_date"].semantic_type - == "dimension" - ) + assert columns_by_name["order_date"].semantic_type == "dimension" assert columns_by_name["repair_order_id_count_bd241964"].semantic_type == "measure" - assert ( - mat.measures_materializations[0].timestamp_column - == "default_DOT_repair_orders_fact_DOT_order_date" - ) + assert mat.measures_materializations[0].timestamp_column == "order_date" assert mat.measures_materializations[0].timestamp_format == "yyyyMMdd" assert mat.measures_materializations[0].granularity == Granularity.DAY assert mat.measures_materializations[0].spark_conf is None @@ -937,10 +931,7 @@ async def test_druid_cube_incremental( expected_components, key=lambda m: m.name, ) - assert ( - mat.combiners[0].timestamp_column - == "default_DOT_repair_orders_fact_DOT_order_date" - ) + assert mat.combiners[0].timestamp_column == "order_date" assert mat.combiners[0].timestamp_format == "yyyyMMdd" assert mat.combiners[0].granularity == Granularity.DAY assert mat.combiners[0].upstream_tables[0].startswith("default_repair_orders_fact") diff --git a/datajunction-server/tests/internal/cube_materializations_test.py b/datajunction-server/tests/internal/cube_materializations_test.py index 7f3d3de3d..b8e7930d4 100644 --- a/datajunction-server/tests/internal/cube_materializations_test.py +++ b/datajunction-server/tests/internal/cube_materializations_test.py @@ -26,13 +26,11 @@ def _make_col(semantic_name, semantic_type="dimension", name="col", type_="str") class TestV3ColToModelColumn: def test_no_separator_in_semantic_name(self): """Line 177->180 False: semantic_name has no dot → column/node stay None.""" - col = _make_col("nodot", semantic_type="dimension") + col = _make_col("nodot", semantic_type="dimension", name="nodot_alias") result = _v3_col_to_model_column(col) assert result.column is None assert result.node is None - assert ( - result.name == "nodot" - ) # no dot → uses semantic_entity.replace(".", "_DOT_") == "nodot" + assert result.name == "nodot_alias" def test_empty_semantic_name(self): """Line 177->180 False: empty semantic_name → column/node stay None.""" @@ -40,15 +38,20 @@ def test_empty_semantic_name(self): result = _v3_col_to_model_column(col) assert result.column is None assert result.node is None - assert result.name == "m_hash" # measure path → col.name + assert result.name == "m_hash" def test_dotted_dimension(self): - """Dotted semantic_name → column/node extracted correctly.""" - col = _make_col("default.hard_hat.city", semantic_type="dimension") + """Dotted semantic_name → column/node extracted; name stays the v3 alias.""" + col = _make_col( + "default.hard_hat.city", + semantic_type="dimension", + name="city", + ) result = _v3_col_to_model_column(col) assert result.column == "city" assert result.node == "default.hard_hat" - assert result.name == "default_DOT_hard_hat_DOT_city" + assert result.name == "city" + assert result.semantic_entity == "default.hard_hat.city" def test_metric_type_collapses_to_measure(self): """metric/metric_component/metric_input semantic types map to 'measure'.""" @@ -62,6 +65,22 @@ def test_metric_type_collapses_to_measure(self): assert result.semantic_type == "measure" assert result.name == "count_hash" + def test_role_qualified_dimension_passes_through_cleanly(self): + """Role-qualified semantic name (``foo.dateint[role]``) keeps the v3 alias. + + The brackets only live in ``semantic_entity``; ``name`` is the SQL alias + from v3 and must not contain bracket characters. + """ + col = _make_col( + "default.dim.dateint[event_date]", + semantic_type="dimension", + name="dateint", + ) + result = _v3_col_to_model_column(col) + assert result.name == "dateint" + assert "[" not in result.name and "]" not in result.name + assert result.semantic_entity == "default.dim.dateint[event_date]" + def _make_node(name, node_type, current=None): node = MagicMock() From 0e48870f9b87d0100c94c996f0c5d6647a1f4c5e Mon Sep 17 00:00:00 2001 From: Yian Shang Date: Mon, 11 May 2026 14:35:48 -0700 Subject: [PATCH 7/7] Fix tests --- .../datajunction_server/config.py | 6 + .../models/cube_materialization.py | 6 +- datajunction-server/tests/api/cubes_test.py | 150 +++++++++--------- 3 files changed, 86 insertions(+), 76 deletions(-) diff --git a/datajunction-server/datajunction_server/config.py b/datajunction-server/datajunction_server/config.py index a543ae14c..9dbdb8ef0 100644 --- a/datajunction-server/datajunction_server/config.py +++ b/datajunction-server/datajunction_server/config.py @@ -163,6 +163,12 @@ class Settings(BaseSettings): # pragma: no cover # $dj_logical_timestamp dj_logical_timestamp_format: Optional[str] = "${dj_logical_timestamp}" + # Prefix applied to Druid datasource names built by ``build_druid_spec``. + # All DJ envs share a single Druid cluster; the prefix env-tags datasources + # so test/prod cubes with the same definition don't collide. Default is the + # prod value; set ``DRUID_DATASOURCE_PREFIX=dj_test__`` in the test deploy. + druid_datasource_prefix: str = "dj__" + # DJ UI host, used for OAuth redirection frontend_host: Optional[str] = "http://localhost:3000" diff --git a/datajunction-server/datajunction_server/models/cube_materialization.py b/datajunction-server/datajunction_server/models/cube_materialization.py index 790fe7e9c..c0b7d3726 100644 --- a/datajunction-server/datajunction_server/models/cube_materialization.py +++ b/datajunction-server/datajunction_server/models/cube_materialization.py @@ -346,7 +346,11 @@ def build_druid_spec(self): " on this cube or it cannot be materialized to Druid.", ) - druid_datasource_name = f"dj__{self.output_table_name}" + from datajunction_server.utils import get_settings # noqa: PLC0415 + + druid_datasource_name = ( + f"{get_settings().druid_datasource_prefix}{self.output_table_name}" + ) # if there are categorical partitions, we can additionally include one of them # in the partitionDimension field under partitionsSpec diff --git a/datajunction-server/tests/api/cubes_test.py b/datajunction-server/tests/api/cubes_test.py index 19d011c77..03d37c84a 100644 --- a/datajunction-server/tests/api/cubes_test.py +++ b/datajunction-server/tests/api/cubes_test.py @@ -3143,7 +3143,7 @@ async def test_cube_materialization_metadata( "columns": [ { "column": "country", - "name": "default_DOT_hard_hat_DOT_country", + "name": "country", "node": "default.hard_hat", "semantic_entity": "default.hard_hat.country", "semantic_type": "dimension", @@ -3151,7 +3151,7 @@ async def test_cube_materialization_metadata( }, { "column": "postal_code", - "name": "default_DOT_hard_hat_DOT_postal_code", + "name": "postal_code", "node": "default.hard_hat", "semantic_entity": "default.hard_hat.postal_code", "semantic_type": "dimension", @@ -3159,7 +3159,7 @@ async def test_cube_materialization_metadata( }, { "column": "city", - "name": "default_DOT_hard_hat_DOT_city", + "name": "city", "node": "default.hard_hat", "semantic_entity": "default.hard_hat.city", "semantic_type": "dimension", @@ -3167,7 +3167,7 @@ async def test_cube_materialization_metadata( }, { "column": "hire_date", - "name": "default_DOT_hard_hat_DOT_hire_date", + "name": "hire_date", "node": "default.hard_hat", "semantic_entity": "default.hard_hat.hire_date", "semantic_type": "dimension", @@ -3175,7 +3175,7 @@ async def test_cube_materialization_metadata( }, { "column": "state", - "name": "default_DOT_hard_hat_DOT_state", + "name": "state", "node": "default.hard_hat", "semantic_entity": "default.hard_hat.state", "semantic_type": "dimension", @@ -3183,7 +3183,7 @@ async def test_cube_materialization_metadata( }, { "column": "company_name", - "name": "default_DOT_dispatcher_DOT_company_name", + "name": "company_name", "node": "default.dispatcher", "semantic_entity": "default.dispatcher.company_name", "semantic_type": "dimension", @@ -3191,7 +3191,7 @@ async def test_cube_materialization_metadata( }, { "column": "local_region", - "name": "default_DOT_municipality_dim_DOT_local_region", + "name": "local_region", "node": "default.municipality_dim", "semantic_entity": "default.municipality_dim.local_region", "semantic_type": "dimension", @@ -3255,22 +3255,22 @@ async def test_cube_materialization_metadata( }, ], "dimensions": [ - "default_DOT_hard_hat_DOT_country", - "default_DOT_hard_hat_DOT_postal_code", - "default_DOT_hard_hat_DOT_city", - "default_DOT_hard_hat_DOT_hire_date", - "default_DOT_hard_hat_DOT_state", - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_municipality_dim_DOT_local_region", + "country", + "postal_code", + "city", + "hire_date", + "state", + "company_name", + "local_region", ], "grain": [ - "default_DOT_hard_hat_DOT_city", - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_hard_hat_DOT_country", - "default_DOT_hard_hat_DOT_hire_date", - "default_DOT_municipality_dim_DOT_local_region", - "default_DOT_hard_hat_DOT_postal_code", - "default_DOT_hard_hat_DOT_state", + "city", + "company_name", + "country", + "hire_date", + "local_region", + "postal_code", + "state", ], "granularity": "day", "measures": [ @@ -3360,7 +3360,7 @@ async def test_cube_materialization_metadata( "output_table_name": mock.ANY, "query": mock.ANY, "spark_conf": None, - "timestamp_column": "default_DOT_hard_hat_DOT_hire_date", + "timestamp_column": "hire_date", "timestamp_format": "yyyyMMdd", "upstream_tables": [ "default.roads.dispatchers", @@ -3376,7 +3376,7 @@ async def test_cube_materialization_metadata( "columns": [ { "column": "country", - "name": "default_DOT_hard_hat_DOT_country", + "name": "country", "node": "default.hard_hat", "semantic_entity": "default.hard_hat.country", "semantic_type": "dimension", @@ -3384,7 +3384,7 @@ async def test_cube_materialization_metadata( }, { "column": "postal_code", - "name": "default_DOT_hard_hat_DOT_postal_code", + "name": "postal_code", "node": "default.hard_hat", "semantic_entity": "default.hard_hat.postal_code", "semantic_type": "dimension", @@ -3392,7 +3392,7 @@ async def test_cube_materialization_metadata( }, { "column": "city", - "name": "default_DOT_hard_hat_DOT_city", + "name": "city", "node": "default.hard_hat", "semantic_entity": "default.hard_hat.city", "semantic_type": "dimension", @@ -3400,7 +3400,7 @@ async def test_cube_materialization_metadata( }, { "column": "hire_date", - "name": "default_DOT_hard_hat_DOT_hire_date", + "name": "hire_date", "node": "default.hard_hat", "semantic_entity": "default.hard_hat.hire_date", "semantic_type": "dimension", @@ -3408,7 +3408,7 @@ async def test_cube_materialization_metadata( }, { "column": "state", - "name": "default_DOT_hard_hat_DOT_state", + "name": "state", "node": "default.hard_hat", "semantic_entity": "default.hard_hat.state", "semantic_type": "dimension", @@ -3416,7 +3416,7 @@ async def test_cube_materialization_metadata( }, { "column": "company_name", - "name": "default_DOT_dispatcher_DOT_company_name", + "name": "company_name", "node": "default.dispatcher", "semantic_entity": "default.dispatcher.company_name", "semantic_type": "dimension", @@ -3424,7 +3424,7 @@ async def test_cube_materialization_metadata( }, { "column": "local_region", - "name": "default_DOT_municipality_dim_DOT_local_region", + "name": "local_region", "node": "default.municipality_dim", "semantic_entity": "default.municipality_dim.local_region", "semantic_type": "dimension", @@ -3440,22 +3440,22 @@ async def test_cube_materialization_metadata( }, ], "dimensions": [ - "default_DOT_hard_hat_DOT_country", - "default_DOT_hard_hat_DOT_postal_code", - "default_DOT_hard_hat_DOT_city", - "default_DOT_hard_hat_DOT_hire_date", - "default_DOT_hard_hat_DOT_state", - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_municipality_dim_DOT_local_region", + "country", + "postal_code", + "city", + "hire_date", + "state", + "company_name", + "local_region", ], "grain": [ - "default_DOT_hard_hat_DOT_city", - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_hard_hat_DOT_country", - "default_DOT_hard_hat_DOT_hire_date", - "default_DOT_municipality_dim_DOT_local_region", - "default_DOT_hard_hat_DOT_postal_code", - "default_DOT_hard_hat_DOT_state", + "city", + "company_name", + "country", + "hire_date", + "local_region", + "postal_code", + "state", ], "granularity": "day", "measures": [ @@ -3476,10 +3476,10 @@ async def test_cube_materialization_metadata( "version": mock.ANY, "display_name": "default.roads.repair_order_details", }, - "output_table_name": "default_repair_order_details_v1_2_b94093b6088c190e", + "output_table_name": "default_repair_order_details_v1_2_89dc71bdf1b527c5", "query": mock.ANY, "spark_conf": None, - "timestamp_column": "default_DOT_hard_hat_DOT_hire_date", + "timestamp_column": "hire_date", "timestamp_format": "yyyyMMdd", "upstream_tables": [ "default.roads.dispatchers", @@ -3574,7 +3574,7 @@ async def test_cube_materialization_metadata( "query": mock.ANY, "columns": [ { - "name": "default_DOT_hard_hat_DOT_city", + "name": "city", "type": "string", "column": "city", "node": "default.hard_hat", @@ -3582,7 +3582,7 @@ async def test_cube_materialization_metadata( "semantic_type": "dimension", }, { - "name": "default_DOT_dispatcher_DOT_company_name", + "name": "company_name", "type": "string", "column": "company_name", "node": "default.dispatcher", @@ -3590,7 +3590,7 @@ async def test_cube_materialization_metadata( "semantic_type": "dimension", }, { - "name": "default_DOT_hard_hat_DOT_country", + "name": "country", "type": "string", "column": "country", "node": "default.hard_hat", @@ -3598,7 +3598,7 @@ async def test_cube_materialization_metadata( "semantic_type": "dimension", }, { - "name": "default_DOT_hard_hat_DOT_hire_date", + "name": "hire_date", "type": "timestamp", "column": "hire_date", "node": "default.hard_hat", @@ -3606,7 +3606,7 @@ async def test_cube_materialization_metadata( "semantic_type": "dimension", }, { - "name": "default_DOT_municipality_dim_DOT_local_region", + "name": "local_region", "type": "string", "column": "local_region", "node": "default.municipality_dim", @@ -3614,7 +3614,7 @@ async def test_cube_materialization_metadata( "semantic_type": "dimension", }, { - "name": "default_DOT_hard_hat_DOT_postal_code", + "name": "postal_code", "type": "string", "column": "postal_code", "node": "default.hard_hat", @@ -3622,7 +3622,7 @@ async def test_cube_materialization_metadata( "semantic_type": "dimension", }, { - "name": "default_DOT_hard_hat_DOT_state", + "name": "state", "type": "string", "column": "state", "node": "default.hard_hat", @@ -3695,22 +3695,22 @@ async def test_cube_materialization_metadata( }, ], "grain": [ - "default_DOT_hard_hat_DOT_city", - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_hard_hat_DOT_country", - "default_DOT_hard_hat_DOT_hire_date", - "default_DOT_municipality_dim_DOT_local_region", - "default_DOT_hard_hat_DOT_postal_code", - "default_DOT_hard_hat_DOT_state", + "city", + "company_name", + "country", + "hire_date", + "local_region", + "postal_code", + "state", ], "dimensions": [ - "default_DOT_hard_hat_DOT_city", - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_hard_hat_DOT_country", - "default_DOT_hard_hat_DOT_hire_date", - "default_DOT_municipality_dim_DOT_local_region", - "default_DOT_hard_hat_DOT_postal_code", - "default_DOT_hard_hat_DOT_state", + "city", + "company_name", + "country", + "hire_date", + "local_region", + "postal_code", + "state", ], "measures": [ { @@ -3778,7 +3778,7 @@ async def test_cube_materialization_metadata( "rule": {"type": "full", "level": None}, }, ], - "timestamp_column": "default_DOT_hard_hat_DOT_hire_date", + "timestamp_column": "hire_date", "timestamp_format": "yyyyMMdd", "granularity": "day", "upstream_tables": [], @@ -3790,17 +3790,17 @@ async def test_cube_materialization_metadata( "format": "parquet", "dimensionsSpec": { "dimensions": [ - "default_DOT_dispatcher_DOT_company_name", - "default_DOT_hard_hat_DOT_city", - "default_DOT_hard_hat_DOT_country", - "default_DOT_hard_hat_DOT_hire_date", - "default_DOT_hard_hat_DOT_postal_code", - "default_DOT_hard_hat_DOT_state", - "default_DOT_municipality_dim_DOT_local_region", + "city", + "company_name", + "country", + "hire_date", + "local_region", + "postal_code", + "state", ], }, "timestampSpec": { - "column": "default_DOT_hard_hat_DOT_hire_date", + "column": "hire_date", "format": "yyyyMMdd", }, },