From 1cde56369874c6e6d97825a0b18106811028fbca Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Thu, 21 May 2026 13:29:55 +0200 Subject: [PATCH 01/10] Implemented unit conversion support for KeyValueStoreSolver - Added `unit_conversion_table` parameter to `MeasurementDBConfig` and updated related methods to handle unit conversions. - Modified `load_blob` methods across various cache classes to accept `uses_alias` parameter for compatibility with unit conversion logic. - Updated `SolverConfig` to include unit conversion mappings and properties. - Introduced unit conversion tests to validate functionality and ensure correct behavior when using aliased selectors. - Enhanced documentation and comments for clarity on unit conversion processes. --- .../metadata/time_series_expression.py | 2 +- .../analyze/query/solvers/blob_solver.py | 5 +- .../analyze/query/solvers/delta_solver.py | 5 +- .../analyze/query/solvers/empty_cache.py | 5 +- .../query/solvers/key_value_store_solver.py | 198 +++++++++++- .../analyze/query/solvers/series_cache.py | 9 +- .../analyze/query/solvers/solver_config.py | 34 +++ src/impulse_query_engine/measurement_db.py | 12 + src/impulse_reporting/config/config_parser.py | 6 + src/impulse_reporting/core/report.py | 7 + tests/conftest.py | 66 ++++ .../key_value_store_solver_wide_only_test.py | 1 + .../key_value_store_unit_conversion_test.py | 284 ++++++++++++++++++ .../solvers/kvs_solver_column_mapping_test.py | 1 + .../query/solvers/solver_config_test.py | 5 +- .../unit/meta/container_dimensions_test.py | 2 +- .../channel_mapping.csv | 6 + .../unit_conversion.csv | 6 + 18 files changed, 633 insertions(+), 21 deletions(-) create mode 100644 tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py create mode 100644 tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv create mode 100644 tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv diff --git 
a/src/impulse_query_engine/analyze/metadata/time_series_expression.py b/src/impulse_query_engine/analyze/metadata/time_series_expression.py index fbdacd2..7df22c1 100644 --- a/src/impulse_query_engine/analyze/metadata/time_series_expression.py +++ b/src/impulse_query_engine/analyze/metadata/time_series_expression.py @@ -598,7 +598,7 @@ def build(self, cache: SeriesCache) -> SampleSeries: # TODO: select candidate mid = candidates.container_id.iloc[0] cid = candidates.channel_id.iloc[0] - return cache.load_blob(mid, cid) + return cache.load_blob(mid, cid, uses_alias=self.uses_alias) def get_required_tag_exprs(self) -> set[TagExpression]: """ diff --git a/src/impulse_query_engine/analyze/query/solvers/blob_solver.py b/src/impulse_query_engine/analyze/query/solvers/blob_solver.py index 089190d..6e5bced 100644 --- a/src/impulse_query_engine/analyze/query/solvers/blob_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/blob_solver.py @@ -49,7 +49,7 @@ def resolve(self, selection): idx = selection._expr.build_pandas(self.df) return self.df[idx] - def load_blob(self, container_id, channel_id): + def load_blob(self, container_id, channel_id, uses_alias: bool = False): """ Load a time series blob from disk. @@ -59,6 +59,9 @@ def load_blob(self, container_id, channel_id): Container ID. channel_id : Any Channel ID. + uses_alias : bool, optional + Unused by this cache (no unit conversion); accepted for + interface compatibility with :class:`SeriesCache`. 
Returns ------- diff --git a/src/impulse_query_engine/analyze/query/solvers/delta_solver.py b/src/impulse_query_engine/analyze/query/solvers/delta_solver.py index cddf9df..30b545a 100644 --- a/src/impulse_query_engine/analyze/query/solvers/delta_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/delta_solver.py @@ -61,7 +61,7 @@ def resolve(self, selection): idx = selection._expr.build_pandas(self.mdf) return self.mdf[idx] - def load_blob(self, mid, cid): + def load_blob(self, mid, cid, uses_alias: bool = False): """ Load a time series blob from the DataFrame. @@ -71,6 +71,9 @@ def load_blob(self, mid, cid): Container or measurement ID. cid : Any Channel ID. + uses_alias : bool, optional + Unused by this cache (no unit conversion); accepted for + interface compatibility with :class:`SeriesCache`. Returns ------- diff --git a/src/impulse_query_engine/analyze/query/solvers/empty_cache.py b/src/impulse_query_engine/analyze/query/solvers/empty_cache.py index 84254bf..32ca0d1 100644 --- a/src/impulse_query_engine/analyze/query/solvers/empty_cache.py +++ b/src/impulse_query_engine/analyze/query/solvers/empty_cache.py @@ -25,7 +25,7 @@ def resolve(self, selection): """ return [] - def load_blob(self, mid, cid): + def load_blob(self, mid, cid, uses_alias: bool = False): """ Return an empty SampleSeries for any container and channel ID. @@ -35,6 +35,9 @@ def load_blob(self, mid, cid): Container or measurement ID. cid : Any Channel ID. + uses_alias : bool, optional + Unused by this cache; accepted for interface compatibility + with :class:`SeriesCache`. 
Returns ------- diff --git a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py index 2a1e518..dca8f1f 100644 --- a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py @@ -30,16 +30,24 @@ def __init__(self, pdf, col_map: dict[str, str]): Parameters ---------- pdf : pd.DataFrame - DataFrame containing time series data. + DataFrame containing time series data. When the column named by + ``col_map["conv"]`` is present, :meth:`load_blob` multiplies the + loaded values by that per-channel factor. All rows of a given + ``(cid, ch)`` slice are expected to share the same factor. col_map : dict[str, str] Mapping with keys ``"cid"``, ``"ch"``, ``"ts"``, ``"te"``, - ``"val"`` to the actual column names in *pdf*. + ``"val"``, ``"conv"`` to the actual column names in *pdf*. The + ``"conv"`` column is optional in *pdf*. """ self._cid_col = col_map["cid"] self._ch_col = col_map["ch"] self._ts_col = col_map["ts"] self._te_col = col_map["te"] self._val_col = col_map["val"] + self._conv_col = col_map.get("conv") + self._has_conversion = ( + self._conv_col is not None and self._conv_col in pdf.columns + ) meta = pdf.drop(columns=[self._ts_col, self._te_col, self._val_col]) self.mdf = meta.drop_duplicates(subset=[self._cid_col, self._ch_col]).reset_index() @@ -67,16 +75,26 @@ def resolve(self, selection): idx = selection._expr.build_pandas(self.mdf) return self.mdf[idx] - def load_blob(self, mid, cid): + def load_blob(self, mid, cid, uses_alias: bool = False): """ Load a time series blob from the DataFrame. + When the underlying *pdf* carries a conversion-factor column (the + column named by ``col_map["conv"]``) **and** the caller is an + aliased selector (``uses_alias=True``), the returned values are + multiplied by that factor. 
Direct selectors on the same physical + channel always receive raw values — unit conversion is a property + of the alias, not of the channel. + Parameters ---------- mid : Any Container or measurement ID. cid : Any Channel ID. + uses_alias : bool, optional + ``True`` when the calling selector resolved via channel_mapping. + Gates the per-channel conversion factor; defaults to ``False``. Returns ------- @@ -84,7 +102,12 @@ def load_blob(self, mid, cid): The loaded sample series object. """ s = self.pdf[(self.pdf[self._cid_col] == mid) & (self.pdf[self._ch_col] == cid)] - return SampleSeries(s[self._ts_col], s[self._te_col], s[self._val_col]) + values = s[self._val_col] + if self._has_conversion and len(s) > 0 and uses_alias: + factor = s[self._conv_col].iloc[0] + if pd.notna(factor): + values = values * factor + return SampleSeries(s[self._ts_col], s[self._te_col], values) class KeyValueStoreSolver(QuerySolver): @@ -389,13 +412,25 @@ def filter_aliased_channel_metrics( ) alias_priority_col = self.config.alias_priority_col + source_unit_col = self.config.source_unit_col + target_unit_col = self.config.target_unit_col + has_unit_cols = ( + db.config.unit_conversion_table is not None + and source_unit_col in resolved_mapping.columns + and target_unit_col in resolved_mapping.columns + ) + + mapping_select_cols = [ + F.col("source_channel").alias("_map_source_channel"), + F.col("data_key").alias("_map_data_key"), + F.col("channel_alias"), + F.col(alias_priority_col), + ] + if has_unit_cols: + mapping_select_cols.extend([F.col(source_unit_col), F.col(target_unit_col)]) + resolved = channel_metrics.join( - resolved_mapping.select( - F.col("source_channel").alias("_map_source_channel"), - F.col("data_key").alias("_map_data_key"), - F.col("channel_alias"), - F.col(alias_priority_col), - ), + resolved_mapping.select(*mapping_select_cols), on=[ channel_metrics["channel_name"] == F.col("_map_source_channel"), channel_metrics["data_key"] == F.col("_map_data_key"), @@ -412,7 
+447,10 @@ def filter_aliased_channel_metrics( resolved = resolved.withColumn( "selector_ids", F.array(self._build_selector_id_expr(selectors)) ) - return resolved.select(container_id_col, channel_id_col, "selector_ids") + out_cols = [container_id_col, channel_id_col, "selector_ids"] + if has_unit_cols: + out_cols.extend([source_unit_col, target_unit_col]) + return resolved.select(*out_cols) def resolve_channel_selections( self, spark, channel_metrics_df, aliased_channel_metrics_df @@ -420,6 +458,13 @@ def resolve_channel_selections( """ Union direct and aliased channel metrics, combining selector_ids. + When the aliased side carries ``source_unit`` / ``target_unit`` + columns (added by :meth:`filter_aliased_channel_metrics` when a + unit conversion table is configured), those columns are preserved + through the union and aggregation. Direct selectors produce null + unit columns, which causes the downstream conversion-factor join + in :meth:`solve` to leave their values unchanged. + Parameters ---------- spark : SparkSession @@ -432,13 +477,114 @@ def resolve_channel_selections( Returns ------- pyspark.sql.DataFrame - Merged DataFrame with ``(container_id, channel_id, selector_ids)``. + Merged DataFrame with ``(container_id, channel_id, selector_ids)`` + (plus ``source_unit`` / ``target_unit`` when present on the + aliased side). 
""" - merged = channel_metrics_df.unionByName(aliased_channel_metrics_df) + source_unit_col = self.config.source_unit_col + target_unit_col = self.config.target_unit_col + has_unit_cols = ( + source_unit_col in aliased_channel_metrics_df.columns + and target_unit_col in aliased_channel_metrics_df.columns + ) + + merged = channel_metrics_df.unionByName( + aliased_channel_metrics_df, allowMissingColumns=has_unit_cols + ) + + agg_exprs = [F.flatten(F.collect_list("selector_ids")).alias("selector_ids")] + if has_unit_cols: + agg_exprs.append(F.first(source_unit_col, ignorenulls=True).alias(source_unit_col)) + agg_exprs.append(F.first(target_unit_col, ignorenulls=True).alias(target_unit_col)) + return merged.groupBy( self.config.container_id_col, self.config.channel_id_col, - ).agg(F.flatten(F.collect_list("selector_ids")).alias("selector_ids")) + ).agg(*agg_exprs) + + # ------------------------------------------------------------------ + # Unit conversion + # ------------------------------------------------------------------ + + def _compute_conversion_factors(self, spark, query, channels_df: DataFrame) -> DataFrame: + """ + Join *channels_df* with the unit conversion table to compute a + per-channel combined conversion factor. + + The unit conversion table associates each unit with a base-unit + scaling factor inside a unit family (``group_id``). For a row with + ``source_unit = S``, ``target_unit = T`` belonging to family ``G``: + + - ``_src_factor`` converts a value in ``S`` to the base unit of ``G``. + - ``_tgt_factor`` converts a value in ``T`` to the base unit of ``G``. + - The combined factor that converts ``S`` to ``T`` is + ``_src_factor / _tgt_factor``. + + Rows whose source or target unit is missing on the table — or whose + source/target units belong to different families — receive a null + factor. Null factors are treated as "no conversion" by the cache. + + Parameters + ---------- + spark : SparkSession + Active Spark session. 
+ query : QueryBuilder + Query object carrying the configured ``db``. + channels_df : pyspark.sql.DataFrame + DataFrame that already carries ``source_unit`` / ``target_unit`` + columns (added by :meth:`filter_aliased_channel_metrics`). + + Returns + ------- + pyspark.sql.DataFrame + *channels_df* augmented with a ``conversion_factor`` column. + """ + uc_table = query.db.unit_conversion(spark) + uc_table = self._apply_column_mapping( + uc_table, self.config.unit_conversion.column_name_mapping + ) + + unit_col = self.config.unit_col + group_id_col = self.config.group_id_col + factor_col = self.config.conversion_factor_col + source_unit_col = self.config.source_unit_col + target_unit_col = self.config.target_unit_col + + # Source-side join: fetch _src_factor and _src_group_id. + channels_df = channels_df.join( + F.broadcast( + uc_table.select( + F.col(unit_col).alias("_src_unit"), + F.col(factor_col).alias("_src_factor"), + F.col(group_id_col).alias("_src_group_id"), + ) + ), + on=[channels_df[source_unit_col] == F.col("_src_unit")], + how="left", + ).drop("_src_unit") + + # Target-side join: must belong to the same unit family. + channels_df = channels_df.join( + F.broadcast( + uc_table.select( + F.col(unit_col).alias("_tgt_unit"), + F.col(factor_col).alias("_tgt_factor"), + F.col(group_id_col).alias("_tgt_group_id"), + ) + ), + on=[ + channels_df[target_unit_col] == F.col("_tgt_unit"), + F.col("_src_group_id") == F.col("_tgt_group_id"), + ], + how="left", + ).drop("_tgt_unit", "_tgt_group_id") + + channels_df = channels_df.withColumn( + factor_col, + F.col("_src_factor") / F.col("_tgt_factor"), + ).drop("_src_factor", "_src_group_id", "_tgt_factor") + + return channels_df # ------------------------------------------------------------------ # Solve @@ -478,6 +624,13 @@ def solve(self, query, channels_df, selections, dtypes) -> DataFrame: """ Solve the query by grouping channels and applying selections. 
+ When a ``unit_conversion_table`` is configured on the database and + *channels_df* carries ``source_unit`` / ``target_unit`` columns + (added upstream by :meth:`filter_aliased_channel_metrics`), + per-channel conversion factors are computed and propagated into + the grouped-map UDF so that time-series values are converted from + the source to the target unit on the fly. + Parameters ---------- query : QueryBuilder @@ -495,6 +648,23 @@ def solve(self, query, channels_df, selections, dtypes) -> DataFrame: DataFrame containing results for each container. """ col_map = self.config.col_map + source_unit_col = self.config.source_unit_col + target_unit_col = self.config.target_unit_col + + has_conversion_table = ( + getattr(query.db.config, "unit_conversion_table", None) is not None + ) + has_unit_cols = ( + source_unit_col in channels_df.columns + and target_unit_col in channels_df.columns + ) + + if has_conversion_table and has_unit_cols: + channels_df = self._compute_conversion_factors(self.spark, query, channels_df) + + for col_name in (source_unit_col, target_unit_col): + if col_name in channels_df.columns: + channels_df = channels_df.drop(col_name) q = query.db.channels(self.spark) q = self._apply_column_mapping(q, self.config.channels.column_name_mapping) diff --git a/src/impulse_query_engine/analyze/query/solvers/series_cache.py b/src/impulse_query_engine/analyze/query/solvers/series_cache.py index b2f7e2f..7ad955f 100644 --- a/src/impulse_query_engine/analyze/query/solvers/series_cache.py +++ b/src/impulse_query_engine/analyze/query/solvers/series_cache.py @@ -24,7 +24,7 @@ def resolve(self, selection) -> pd.DataFrame: pass @abstractmethod - def load_blob(self, mid, cid) -> SampleSeries: + def load_blob(self, mid, cid, uses_alias: bool = False) -> SampleSeries: """ Resolve given mid and cid to a series. @@ -34,6 +34,13 @@ def load_blob(self, mid, cid) -> SampleSeries: Container or measurement ID. cid : Any Channel ID. 
+ uses_alias : bool, optional + ``True`` when the calling selector resolves the channel via a + ``channel_mapping`` alias. Caches that perform unit conversion + (e.g. :class:`KVSTimeSeriesCache`) only apply the per-channel + conversion factor when this is ``True``, so a direct selector + on the same physical channel always returns raw values. + Defaults to ``False`` (direct / no-conversion semantics). Returns ------- diff --git a/src/impulse_query_engine/analyze/query/solvers/solver_config.py b/src/impulse_query_engine/analyze/query/solvers/solver_config.py index 5536b3f..8c27add 100644 --- a/src/impulse_query_engine/analyze/query/solvers/solver_config.py +++ b/src/impulse_query_engine/analyze/query/solvers/solver_config.py @@ -64,6 +64,8 @@ class SolverConfig(BaseModel): Column mappings and filters for the channel mapping (alias) table. channels : TableConfig Column mappings and filters for the channel data table. + unit_conversion : TableConfig + Column mappings and filters for the unit conversion table. """ project_id: str | None = None @@ -74,6 +76,7 @@ class SolverConfig(BaseModel): channel_metrics: TableConfig = TableConfig() channel_mapping: TableConfig = TableConfig() channels: TableConfig = TableConfig() + unit_conversion: TableConfig = TableConfig() # ------------------------------------------------------------------ # Class methods @@ -176,6 +179,36 @@ def parent_id_col(self) -> str: """Internal column name for the parent/scope identifier.""" return "parent_id" + @property + def conversion_factor_col(self) -> str: + """Internal column name for the conversion factor on the unit_conversion table. + + Also used as the column that carries the per-channel combined factor + downstream from :meth:`KeyValueStoreSolver._compute_conversion_factors` + into the grouped-map UDF. 
+ """ + return "conversion_factor" + + @property + def source_unit_col(self) -> str: + """Internal column name for the source unit on the channel_mapping table.""" + return "source_unit" + + @property + def target_unit_col(self) -> str: + """Internal column name for the target unit on the channel_mapping table.""" + return "target_unit" + + @property + def unit_col(self) -> str: + """Internal column name for the unit name on the unit_conversion table.""" + return "unit" + + @property + def group_id_col(self) -> str: + """Internal column name for the unit group id on the unit_conversion table.""" + return "group_id" + @property def col_map(self) -> dict[str, str]: """Short-key → internal-column-name mapping for UDFs and caches.""" @@ -185,4 +218,5 @@ def col_map(self) -> dict[str, str]: "ts": self.tstart_col, "te": self.tend_col, "val": self.value_col, + "conv": self.conversion_factor_col, } diff --git a/src/impulse_query_engine/measurement_db.py b/src/impulse_query_engine/measurement_db.py index 36a1c26..c0ba27c 100644 --- a/src/impulse_query_engine/measurement_db.py +++ b/src/impulse_query_engine/measurement_db.py @@ -15,6 +15,7 @@ def __init__( channel_metrics_table=None, channels_uri=None, channel_mapping_table=None, + unit_conversion_table=None, table_locations: str = "external_locations", ): self.container_tags_table = container_tags_table @@ -23,6 +24,7 @@ def __init__( self.channel_metrics_table = channel_metrics_table self.channels_uri = channels_uri self.channel_mapping_table = channel_mapping_table + self.unit_conversion_table = unit_conversion_table self.table_locations = table_locations self.debug_tables = None @@ -31,6 +33,7 @@ def for_unity_catalog( catalog_name: str, core_schema_name: str = "core", channel_mapping_table: str | None = None, + unit_conversion_table: str | None = None, ): return MeasurementDBConfig( container_tags_table=f"{catalog_name}.{core_schema_name}.container_tags", @@ -39,6 +42,7 @@ def for_unity_catalog( 
channel_metrics_table=f"{catalog_name}.{core_schema_name}.channel_metrics", channels_uri=f"{catalog_name}.{core_schema_name}.channels", channel_mapping_table=channel_mapping_table, + unit_conversion_table=unit_conversion_table, table_locations="unity_catalog", ) @@ -57,6 +61,9 @@ def for_debug(debug_tables): channel_mapping_table=( "channel_mapping" if "channel_mapping" in debug_tables else None ), + unit_conversion_table=( + "unit_conversion" if "unit_conversion" in debug_tables else None + ), table_locations="debug", ) cfg.debug_tables = debug_tables @@ -101,6 +108,11 @@ def channel_mapping(self, spark) -> DataFrame: raise ValueError("channel_mapping_table is not configured") return self._read_table(spark, self.config.channel_mapping_table) + def unit_conversion(self, spark) -> DataFrame: + if self.config.unit_conversion_table is None: + raise ValueError("unit_conversion_table is not configured") + return self._read_table(spark, self.config.unit_conversion_table) + def channel_uri(self): return self.config.channels_uri diff --git a/src/impulse_reporting/config/config_parser.py b/src/impulse_reporting/config/config_parser.py index 198b445..4953448 100644 --- a/src/impulse_reporting/config/config_parser.py +++ b/src/impulse_reporting/config/config_parser.py @@ -204,6 +204,11 @@ class Source(BaseModel): channel_mapping_table : str, optional Full Unity Catalog path to the channel mapping table. Required when using ``channel_with_alias()`` for logical alias resolution. + unit_conversion_table : str, optional + Full Unity Catalog path to the unit conversion table. When set together + with a ``channel_mapping_table`` whose rows carry ``source_unit`` and + ``target_unit`` columns, the query engine converts time-series values + from the source to the target unit during ``solve()``. 
Notes ----- @@ -217,6 +222,7 @@ class Source(BaseModel): channel_metrics_table: Annotated[str, AfterValidator(is_valid_table_name)] channels_uri: Annotated[str, AfterValidator(is_valid_table_name)] channel_mapping_table: Annotated[str, AfterValidator(is_valid_table_name)] | None = None + unit_conversion_table: Annotated[str, AfterValidator(is_valid_table_name)] | None = None class UnitySink(BaseModel): diff --git a/src/impulse_reporting/core/report.py b/src/impulse_reporting/core/report.py index ab09ee4..7b73278 100644 --- a/src/impulse_reporting/core/report.py +++ b/src/impulse_reporting/core/report.py @@ -859,6 +859,13 @@ def determine_report(self, is_incremental: bool = None): # Validate that every aggregation references a registered event self._validate_aggregation_events() + # TODO: port unit-consistency sanity check from MDA Framework + # (`mda_reporting/util/unit_sanity_check.py`). When a + # `unit_conversion_table` is configured, walk all aggregation / + # event expressions and emit a UserWarning for each aliased + # selector whose source_unit differs from target_unit so the + # caller knows to express formula constants in target units. + # Clean up temp tables from previous runs self._cleanup_temp_tables() diff --git a/tests/conftest.py b/tests/conftest.py index d29fa74..e91a383 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -334,3 +334,69 @@ def key_value_store_alias_db( cfg = MeasurementDBConfig.for_debug(tables) cfg.channel_mapping_table = "channel_mapping" return MeasurementDB(cfg, ws=mock_workspace_client) + + +@pytest.fixture(scope="session") +def unit_conversion_dataframes(spark): + """Load unit-conversion test CSVs into cached in-memory DataFrames. + + Hands DataFrames directly to MeasurementDB (via ``for_debug``) instead of + persisting them through Delta — the alias-style write-then-read fixture + occasionally hit Delta ``ProtocolChangedException`` during macOS test + runs. Caching the DataFrames once per session keeps the data stable. 
+ """ + base_path = os.path.dirname(os.path.abspath(__file__)) + base_path = base_path[: base_path.find("tests")] + + container_tags_path = f"{base_path}/tests/unit/data/key_value_store_csv/container_metrics.csv" + container_metric_path = f"{base_path}/tests/unit/data/basic_narrow_csv/container_metrics.csv" + channel_metric_path = ( + f"{base_path}/tests/unit/data/key_value_store_alias_csv/channel_metrics.csv" + ) + channels_path = f"{base_path}/tests/unit/data/basic_narrow_csv/channel_data.csv" + channel_mapping_path = ( + f"{base_path}/tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv" + ) + unit_conversion_path = ( + f"{base_path}/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv" + ) + + options = {"header": "True", "delimiter": ",", "inferSchema": "True"} + + def _load(path): + df = spark.read.options(**options).csv(path).cache() + df.count() + return df + + return { + "container_tags": _load(container_tags_path), + "container_metrics": _load(container_metric_path), + "channel_metrics": _load(channel_metric_path), + "channels": _load(channels_path), + "channel_mapping": _load(channel_mapping_path), + "unit_conversion": _load(unit_conversion_path), + } + + +@pytest.fixture +def key_value_store_unit_conversion_db( + unit_conversion_dataframes, mock_workspace_client +) -> MeasurementDB: + """Return a key-value-store MeasurementDB with unit conversion configured.""" + cfg = MeasurementDBConfig.for_debug(unit_conversion_dataframes) + cfg.channel_mapping_table = "channel_mapping" + cfg.unit_conversion_table = "unit_conversion" + return MeasurementDB(cfg, ws=mock_workspace_client) + + +@pytest.fixture +def key_value_store_unit_conversion_db_no_table( + unit_conversion_dataframes, mock_workspace_client +) -> MeasurementDB: + """Same data as ``key_value_store_unit_conversion_db`` but with + ``unit_conversion_table=None`` to test the opt-out path.""" + tables = {k: v for k, v in unit_conversion_dataframes.items() if k != 
"unit_conversion"} + cfg = MeasurementDBConfig.for_debug(tables) + cfg.channel_mapping_table = "channel_mapping" + # Explicitly leave unit_conversion_table = None + return MeasurementDB(cfg, ws=mock_workspace_client) diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py index ad0590a..f4d98b4 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_solver_wide_only_test.py @@ -270,6 +270,7 @@ def test_col_map_always_returns_internal_names(self, spark: SparkSession): "ts": "tstart", "te": "tend", "val": "value", + "conv": "conversion_factor", } def test_config_properties_return_internal_names(self, spark: SparkSession): diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py new file mode 100644 index 0000000..287de01 --- /dev/null +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py @@ -0,0 +1,284 @@ +# pylint: disable=missing-function-docstring + +import os + +import numpy as np +import pandas as pd +import pytest +from pyspark.sql import SparkSession + +from impulse_query_engine.analyze.query.solvers.key_value_store_solver import ( + KeyValueStoreSolver, +) +from impulse_query_engine.analyze.query.solvers.solver_config import ( + SolverConfig, + TableConfig, +) +from impulse_query_engine.measurement_db import MeasurementDB + + +def _solver(spark: SparkSession) -> KeyValueStoreSolver: + return KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + 
channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + ), + ) + + +def _expected_raw_values(channels_csv_path: str, container_id: int, channel_id: int) -> np.ndarray: + raw = pd.read_csv(channels_csv_path) + rows = raw[(raw["container_id"] == container_id) & (raw["channel_id"] == channel_id)] + return rows.sort_values("tstart")["value"].values.astype(np.float64) + + +@pytest.fixture +def channels_csv_path() -> str: + base_path = os.path.dirname(os.path.abspath(__file__)) + base_path = base_path[: base_path.find("tests")] + return f"{base_path}/tests/unit/data/basic_narrow_csv/channel_data.csv" + + +class TestUnitConversionSolve: + def test_solve_with_unit_conversion( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + + pdf = query.select(vehicle_speed).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + assert pdf["container_id"].tolist() == [1, 2, 3] + + factor = 0.277778 + # Containers 1 and 2 resolve vehicle_speed -> "Vehicle Speed Sensor" (channel 7). + for cid in (1, 2): + expected = _expected_raw_values(channels_csv_path, cid, 7) * factor + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.vehicle_speed.values, expected, rtol=1e-6) + + # Container 3 resolves to channel 7 via Spd_Vhcl / ProjSpecREC_10Hz. 
+ expected3 = _expected_raw_values(channels_csv_path, 3, 7) * factor + row3 = pdf.loc[pdf["container_id"] == 3].iloc[0] + np.testing.assert_allclose(row3.vehicle_speed.values, expected3, rtol=1e-6) + + def test_solve_no_conversion_when_same_unit( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias( + "engine_speed" + ) + + pdf = query.select(engine_speed).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + for cid in (1, 2, 3): + expected = _expected_raw_values(channels_csv_path, cid, 5) + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.engine_speed.values, expected, rtol=1e-12) + + def test_solve_no_conversion_when_table_not_configured( + self, + spark: SparkSession, + key_value_store_unit_conversion_db_no_table: MeasurementDB, + channels_csv_path: str, + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db_no_table.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + + pdf = query.select(vehicle_speed).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + # No conversion: values are returned exactly as-is from the raw channel data. 
+ for cid in (1, 2, 3): + expected = _expected_raw_values(channels_csv_path, cid, 7) + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.vehicle_speed.values, expected, rtol=1e-12) + + def test_solve_no_conversion_for_direct_selectors( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + # Direct selector — no alias, so no unit metadata, so no conversion. + vehicle_speed_direct = query.channel( + channel_name="Vehicle Speed Sensor", data_key="TM" + ).alias("vehicle_speed_direct") + + pdf = query.select(vehicle_speed_direct).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + for cid in (1, 2): + expected = _expected_raw_values(channels_csv_path, cid, 7) + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.vehicle_speed_direct.values, expected, rtol=1e-12) + + def test_solve_same_channel_direct_stays_raw_aliased_converts( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # When a direct selector and an aliased selector resolve to the same + # (container_id, channel_id) (both land on channel 7), conversion is a + # property of the alias — the direct selector returns raw values, + # the aliased selector returns raw * factor (km/h -> m/s). 
+ solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + direct = query.channel(channel_name="Vehicle Speed Sensor", data_key="TM").alias( + "vehicle_speed_raw" + ) + aliased = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed_converted" + ) + + pdf = query.select(direct, aliased).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + factor = 0.277778 + for cid in (1, 2): + raw = _expected_raw_values(channels_csv_path, cid, 7) + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.vehicle_speed_raw.values, raw, rtol=1e-12) + np.testing.assert_allclose( + row.vehicle_speed_converted.values, raw * factor, rtol=1e-6 + ) + + def test_solve_mixed_direct_and_aliased_disjoint_channels( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # Direct selector targets a *different* channel than the aliased one. + # Direct: Ambient Air Temperature (channel 6, no conversion). + # Aliased: vehicle_speed (channel 7, km/h -> m/s). + # + # Note: the overlapping case, where a direct selector and an aliased + # selector resolve to the same (container_id, channel_id), is covered by + # test_solve_same_channel_direct_stays_raw_aliased_converts: conversion + # is a property of the alias, so the direct selector still returns raw + # values there. This test covers the disjoint case.
+ solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + direct = query.channel(channel_name="Ambient Air Temperature", data_key="TM").alias( + "ambient_temp" + ) + aliased = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed_converted" + ) + + pdf = query.select(direct, aliased).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + factor = 0.277778 + for cid in (1, 2): + ambient_raw = _expected_raw_values(channels_csv_path, cid, 6) + speed_raw = _expected_raw_values(channels_csv_path, cid, 7) + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.ambient_temp.values, ambient_raw, rtol=1e-12) + np.testing.assert_allclose( + row.vehicle_speed_converted.values, speed_raw * factor, rtol=1e-6 + ) + + def test_solve_cross_family_units_leave_values_unchanged( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # cross_family_alias maps Engine RPM (rotation family) -> m/s + # (speed family). The group_id mismatch makes the target-side join + # miss, leaving conversion_factor null and values unchanged. + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + cross = query.channel_with_alias(channel_alias="cross_family_alias").alias("cross") + + pdf = query.select(cross).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + # The mapping only references Engine RPM/TM, which exists for containers 1 and 2. 
+ for cid in (1, 2): + expected = _expected_raw_values(channels_csv_path, cid, 5) + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.cross.values, expected, rtol=1e-12) + + +class TestComputeConversionFactors: + def test_factor_one_for_identical_units( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + + channels_df = spark.createDataFrame( + [(1, 5, "RPM", "RPM"), (2, 5, "RPM", "RPM")], + schema=["container_id", "channel_id", "source_unit", "target_unit"], + ) + + result = solver._compute_conversion_factors(spark, query, channels_df).collect() + factors = {row.container_id: row.conversion_factor for row in result} + assert pytest.approx(factors[1], rel=1e-12) == 1.0 + assert pytest.approx(factors[2], rel=1e-12) == 1.0 + + def test_factor_for_known_speed_conversion( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + + channels_df = spark.createDataFrame( + [(1, 7, "km/h", "m/s")], + schema=["container_id", "channel_id", "source_unit", "target_unit"], + ) + + row = solver._compute_conversion_factors(spark, query, channels_df).collect()[0] + assert row.conversion_factor == pytest.approx(0.277778, rel=1e-6) + + def test_null_factor_for_cross_family( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + + channels_df = spark.createDataFrame( + [(1, 5, "RPM", "m/s")], + schema=["container_id", "channel_id", "source_unit", "target_unit"], + ) + + row = solver._compute_conversion_factors(spark, query, channels_df).collect()[0] + assert row.conversion_factor is None + + def test_null_factor_for_unknown_unit( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + solver = 
_solver(spark) + query = key_value_store_unit_conversion_db.query + + channels_df = spark.createDataFrame( + [(1, 5, "furlongs/fortnight", "m/s")], + schema=["container_id", "channel_id", "source_unit", "target_unit"], + ) + + row = solver._compute_conversion_factors(spark, query, channels_df).collect()[0] + assert row.conversion_factor is None diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/kvs_solver_column_mapping_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/kvs_solver_column_mapping_test.py index 8606f7b..a08c635 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/kvs_solver_column_mapping_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/kvs_solver_column_mapping_test.py @@ -610,6 +610,7 @@ def test_col_map_always_returns_internal_names(self, spark): "ts": "tstart", "te": "tend", "val": "value", + "conv": "conversion_factor", } def test_mapping_entries_stored_correctly(self, spark): diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py index 509f06a..5cf3705 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py @@ -35,6 +35,7 @@ "ts": "tstart", "te": "tend", "val": "value", + "conv": "conversion_factor", } @@ -137,7 +138,7 @@ class TestColMap: def test_col_map_keys(self, cfg: SolverConfig): """col_map should contain exactly the expected short keys.""" - assert set(cfg.col_map.keys()) == {"cid", "ch", "ts", "te", "val"} + assert set(cfg.col_map.keys()) == {"cid", "ch", "ts", "te", "val", "conv"} def test_col_map_default_config(self): """Default SolverConfig col_map should match hardcoded defaults.""" @@ -148,6 +149,7 @@ def test_col_map_default_config(self): "ts": "tstart", "te": "tend", "val": "value", + "conv": "conversion_factor", } def 
test_col_map_consistent_with_properties(self, cfg: SolverConfig): @@ -157,6 +159,7 @@ def test_col_map_consistent_with_properties(self, cfg: SolverConfig): assert cfg.col_map["ts"] == cfg.tstart_col assert cfg.col_map["te"] == cfg.tend_col assert cfg.col_map["val"] == cfg.value_col + assert cfg.col_map["conv"] == cfg.conversion_factor_col def test_col_map_values(self, cfg: SolverConfig): assert cfg.col_map == _EXPECTED_COL_MAP diff --git a/tests/impulse_reporting/unit/meta/container_dimensions_test.py b/tests/impulse_reporting/unit/meta/container_dimensions_test.py index e40bf0b..acad8f8 100644 --- a/tests/impulse_reporting/unit/meta/container_dimensions_test.py +++ b/tests/impulse_reporting/unit/meta/container_dimensions_test.py @@ -68,7 +68,7 @@ def test_config_hashing(spark): schema = T.StructType([T.StructField(col, T.StringType(), True) for col in silver_columns]) df = spark.createDataFrame([("test_vehicle",)], schema) result = df.transform(ContainerDimension._add_config_hash(impulse_config)) - expected_result = [Row(uut_id="test_vehicle", config_hash=1983688711)] + expected_result = [Row(uut_id="test_vehicle", config_hash=1267386821)] assert expected_result == result.collect() diff --git a/tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv b/tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv new file mode 100644 index 0000000..eab3979 --- /dev/null +++ b/tests/unit/data/key_value_store_unit_conversion_csv/channel_mapping.csv @@ -0,0 +1,6 @@ +project_id,toolbox_id,channel_alias,source_channel,data_key,priority,source_unit,target_unit +SAMPLE_PROJECT,container_concept,engine_speed,Engine RPM,TM,,RPM,RPM +SAMPLE_PROJECT,container_concept,engine_speed,EngSpd,ProjSpecREC_10Hz,,RPM,RPM +SAMPLE_PROJECT,container_concept,vehicle_speed,Vehicle Speed Sensor,TM,,km/h,m/s +SAMPLE_PROJECT,container_concept,vehicle_speed,Spd_Vhcl,ProjSpecREC_10Hz,,km/h,m/s +SAMPLE_PROJECT,container_concept,cross_family_alias,Engine RPM,TM,,RPM,m/s 
diff --git a/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv b/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv new file mode 100644 index 0000000..25964d6 --- /dev/null +++ b/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv @@ -0,0 +1,6 @@ +group_id,unit,conversion_factor +speed,m/s,1.0 +speed,km/h,0.277778 +speed,mph,0.44704 +rotation,RPM,1.0 +rotation,rad/s,0.10472 From 029703bb9907b40da8477c90c657cd887e8e6d3b Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Thu, 21 May 2026 14:20:29 +0200 Subject: [PATCH 02/10] Refactor key_value_store_solver.py and unit conversion tests for improved readability --- .../analyze/query/solvers/key_value_store_solver.py | 11 +++-------- .../solvers/key_value_store_unit_conversion_test.py | 8 ++------ 2 files changed, 5 insertions(+), 14 deletions(-) diff --git a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py index dca8f1f..88b848c 100644 --- a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py @@ -45,9 +45,7 @@ def __init__(self, pdf, col_map: dict[str, str]): self._te_col = col_map["te"] self._val_col = col_map["val"] self._conv_col = col_map.get("conv") - self._has_conversion = ( - self._conv_col is not None and self._conv_col in pdf.columns - ) + self._has_conversion = self._conv_col is not None and self._conv_col in pdf.columns meta = pdf.drop(columns=[self._ts_col, self._te_col, self._val_col]) self.mdf = meta.drop_duplicates(subset=[self._cid_col, self._ch_col]).reset_index() @@ -651,12 +649,9 @@ def solve(self, query, channels_df, selections, dtypes) -> DataFrame: source_unit_col = self.config.source_unit_col target_unit_col = self.config.target_unit_col - has_conversion_table = ( - getattr(query.db.config, "unit_conversion_table", None) is not None - ) + 
has_conversion_table = getattr(query.db.config, "unit_conversion_table", None) is not None has_unit_cols = ( - source_unit_col in channels_df.columns - and target_unit_col in channels_df.columns + source_unit_col in channels_df.columns and target_unit_col in channels_df.columns ) if has_conversion_table and has_unit_cols: diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py index 287de01..7795ba5 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py @@ -79,9 +79,7 @@ def test_solve_no_conversion_when_same_unit( ): solver = _solver(spark) query = key_value_store_unit_conversion_db.query - engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias( - "engine_speed" - ) + engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias("engine_speed") pdf = query.select(engine_speed).toPandas(spark, solver=solver) pdf = pdf.sort_values("container_id").reset_index(drop=True) @@ -160,9 +158,7 @@ def test_solve_same_channel_direct_stays_raw_aliased_converts( raw = _expected_raw_values(channels_csv_path, cid, 7) row = pdf.loc[pdf["container_id"] == cid].iloc[0] np.testing.assert_allclose(row.vehicle_speed_raw.values, raw, rtol=1e-12) - np.testing.assert_allclose( - row.vehicle_speed_converted.values, raw * factor, rtol=1e-6 - ) + np.testing.assert_allclose(row.vehicle_speed_converted.values, raw * factor, rtol=1e-6) def test_solve_mixed_direct_and_aliased_disjoint_channels( self, From 0d92c1d6424b19ecbaf94524e0e3d6db1a421b95 Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Thu, 21 May 2026 14:50:23 +0200 Subject: [PATCH 03/10] Update unit conversion support in documentation and configuration - Added `unit_conversion_table` parameter to relevant 
configuration sections, detailing its role in converting time-series values during `solve()`. - Enhanced documentation across multiple files to clarify the usage of `source_unit` and `target_unit` columns in the `channel_mapping` table. - Updated `KeyValueStoreSolver` documentation to reflect changes in unit conversion handling and its integration with aliased selectors. - Improved clarity on the configuration and functionality of unit conversion within the impulse framework. --- docs/impulse/docs/config/configuration.md | 32 ++++++++++++ .../docs/data_model/silver_layer_schema.md | 25 ++++++++++ .../query/solvers/key_value_store_solver.md | 18 ++++++- .../analyze/query/solvers/solver_config.md | 50 +++++++++++++++++++ .../impulse_reporting/config/config_parser.md | 4 ++ docs/impulse/docs/references/query_engine.md | 1 + docs/impulse/docs/references/tsal.md | 6 +++ 7 files changed, 135 insertions(+), 1 deletion(-) diff --git a/docs/impulse/docs/config/configuration.md b/docs/impulse/docs/config/configuration.md index 2fdca38..db96e47 100644 --- a/docs/impulse/docs/config/configuration.md +++ b/docs/impulse/docs/config/configuration.md @@ -62,6 +62,7 @@ Maps the silver-layer input tables. | `container_tags_table` | `str` | No | Full Unity Catalog path. Container EAV tags. | | `channel_tags_table` | `str` | No | Full Unity Catalog path. Channel EAV tags. | | `channel_mapping_table` | `str` | No | Full Unity Catalog path. Logical-to-physical channel alias table. Required when using `QueryBuilder.channel_with_alias()` (currently supported by `KeyValueStoreSolver`). | +| `unit_conversion_table` | `str` | No | Full Unity Catalog path. Per-unit-family conversion factors. When configured together with a `channel_mapping_table` whose rows carry `source_unit` / `target_unit` columns, aliased selectors auto-convert values from source to target unit during `solve()` (currently supported by `KeyValueStoreSolver`). 
| Tag tables are required for solvers that consume tag-based filters (`DeltaSolver` with tag filters, `KeyValueStoreSolver`). @@ -174,6 +175,7 @@ Per-table sections (each a `TableConfig`): | `channel_metrics` | All solvers | Custom channel_id column, custom value/timestamp columns | | `channel_mapping` | KeyValueStoreSolver | Alias-table column renames; `priority` column | | `channels` | All solvers | RLE column renames (`tstart`/`tend`/`value`) | +| `unit_conversion` | KeyValueStoreSolver | Unit-conversion table column renames (`unit`, `group_id`, `conversion_factor`) | Internal column names that mappings can target: @@ -187,6 +189,10 @@ Internal column names that mappings can target: | `priority` | Tie-breaker column on the `channel_mapping` table | | `project_id` | Project scoping column | | `parent_id` | Parent/scope identifier | +| `source_unit`, `target_unit` | Source/target unit columns on the `channel_mapping` table | +| `unit` | Unit name column on the `unit_conversion` table | +| `group_id` | Unit-family identifier on the `unit_conversion` table | +| `conversion_factor` | Per-unit factor on `unit_conversion`; also the per-channel factor name carried into the solve UDF | :::note Per-solver feature support @@ -235,6 +241,32 @@ However, only the parts each solver supports are actually consumed: Sections you don't customize can be omitted; defaults are an empty mapping and no filters. +### Unit conversion (optional) + +Set `source.unit_conversion_table` and extend `channel_mapping` with `source_unit` / `target_unit` columns +to have aliased selectors auto-convert values from source to target unit during `solve()`. Direct selectors +via `query.channel(...)` always return raw values, even on a channel that an aliased sibling converts — +conversion is a property of the alias, not of the channel. See +[`unit_conversion`](../data_model/silver_layer_schema.md#unit_conversion-optional) for the table schema. 
+ +```python +"source": { + "container_metrics_table": "my_catalog.silver.container_metrics", + "channel_metrics_table": "my_catalog.silver.channel_metrics", + "channels_uri": "my_catalog.silver.channels", + "channel_mapping_table": "my_catalog.silver.channel_mapping", + "unit_conversion_table": "my_catalog.silver.unit_conversion" +}, +"query_engine": { + "solver": "KeyValueStoreSolver", + "solver_config": { + "unit_conversion": { + "column_name_mapping": {} + } + } +} +``` + ### When to use what - **`solver_config.<table>.column_name_mapping`** — your silver-layer column is named differently from diff --git a/docs/impulse/docs/data_model/silver_layer_schema.md b/docs/impulse/docs/data_model/silver_layer_schema.md index 32d859d..22fe298 100644 --- a/docs/impulse/docs/data_model/silver_layer_schema.md +++ b/docs/impulse/docs/data_model/silver_layer_schema.md @@ -260,7 +260,32 @@ channel name to one or more physical channels keyed by `project_id` / | `channel_name` | `string` | No | Logical channel name to match against `channel_with_alias` selectors. | | `data_key` | `string` | No | Physical lookup key joined to `channel_metrics`. | | `priority` | `int` | Yes | Tie-breaker when multiple physical channels match a logical name. | +| `source_unit` | `string` | Yes | Unit of the raw channel data. When non-null together with `target_unit` and a configured `unit_conversion_table`, the solver converts values from source to target unit on aliased reads. | +| `target_unit` | `string` | Yes | Target unit for aliased reads of this mapping. | Configured via `source.channel_mapping_table` (see [Configuration](../config/configuration.md)). Joins to `channel_metrics` on `(project_id, data_key, channel_name)`. + +--- + +## unit_conversion (optional) + +Per-unit-family conversion factors. Read by `KeyValueStoreSolver` at +solve time when `source.unit_conversion_table` is configured and the +`channel_mapping` table carries `source_unit` / `target_unit` columns.
+ +| Column | Type | Nullable | Description | +|---------------------|----------|----------|------------------------------------------------------------------------------------------------------------| +| `group_id` | `string` | No | Unit family identifier (e.g. `speed`, `rotation`). Only units within the same family can convert into each other. | +| `unit` | `string` | No | Unit name. Matches the `source_unit` / `target_unit` values on `channel_mapping`. | +| `conversion_factor` | `double` | No | Multiplier that converts a value in this unit to the family's base unit. The base unit has factor `1.0`. | + +For each aliased channel the solver looks up `source_factor` (the row +whose `unit` matches `source_unit`) and `target_factor` (the row whose +`unit` matches `target_unit`, constrained to the same `group_id`) and +multiplies values by `source_factor / target_factor`. Missing rows or a +`group_id` mismatch yield a null factor and no conversion. + +Configured via `source.unit_conversion_table` (see +[Configuration](../config/configuration.md)). diff --git a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md index be9e711..34afe34 100644 --- a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md +++ b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md @@ -171,6 +171,13 @@ def resolve_channel_selections(spark, channel_metrics_df, Union direct and aliased channel metrics, combining selector_ids. +When the aliased side carries ``source_unit`` / ``target_unit`` +columns (added by :meth:`filter_aliased_channel_metrics` when a +unit conversion table is configured), those columns are preserved +through the union and aggregation. 
Direct selectors produce null +unit columns, which causes the downstream conversion-factor join +in :meth:`solve` to leave their values unchanged. + **Arguments**: - `spark` (`SparkSession`): Spark session used for query execution. @@ -179,7 +186,9 @@ Union direct and aliased channel metrics, combining selector_ids. **Returns**: -`pyspark.sql.DataFrame`: Merged DataFrame with ``(container_id, channel_id, selector_ids)``. +`pyspark.sql.DataFrame`: Merged DataFrame with ``(container_id, channel_id, selector_ids)`` +(plus ``source_unit`` / ``target_unit`` when present on the +aliased side). #### solve @@ -189,6 +198,13 @@ def solve(query, channels_df, selections, dtypes) -> DataFrame Solve the query by grouping channels and applying selections. +When a ``unit_conversion_table`` is configured on the database and +*channels_df* carries ``source_unit`` / ``target_unit`` columns +(added upstream by :meth:`filter_aliased_channel_metrics`), +per-channel conversion factors are computed and propagated into +the grouped-map UDF so that time-series values are converted from +the source to the target unit on the fly. + **Arguments**: - `query` (`QueryBuilder`): Query object containing database and filter information. diff --git a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md index 7b0a8fa..b15ce7d 100644 --- a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md +++ b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md @@ -60,6 +60,7 @@ so that solver code can always reference the same constants. - `channel_metrics` (`TableConfig`): Column mappings and filters for the channel metrics table. - `channel_mapping` (`TableConfig`): Column mappings and filters for the channel mapping (alias) table. 
- `channels` (`TableConfig`): Column mappings and filters for the channel data table. +- `unit_conversion` (`TableConfig`): Column mappings and filters for the unit conversion table. #### from\_json @@ -194,6 +195,55 @@ def parent_id_col() -> str Internal column name for the parent/scope identifier. +#### conversion\_factor\_col + +```python +def conversion_factor_col() -> str +``` + +Internal column name for the conversion factor on the unit_conversion table. + +Also used as the column that carries the per-channel combined factor +downstream from :meth:`KeyValueStoreSolver._compute_conversion_factors` +into the grouped-map UDF. + + +#### source\_unit\_col + +```python +def source_unit_col() -> str +``` + +Internal column name for the source unit on the channel_mapping table. + + +#### target\_unit\_col + +```python +def target_unit_col() -> str +``` + +Internal column name for the target unit on the channel_mapping table. + + +#### unit\_col + +```python +def unit_col() -> str +``` + +Internal column name for the unit name on the unit_conversion table. + + +#### group\_id\_col + +```python +def group_id_col() -> str +``` + +Internal column name for the unit group id on the unit_conversion table. + + #### col\_map ```python diff --git a/docs/impulse/docs/references/api/impulse_reporting/config/config_parser.md b/docs/impulse/docs/references/api/impulse_reporting/config/config_parser.md index 2eccf27..c12be71 100644 --- a/docs/impulse/docs/references/api/impulse_reporting/config/config_parser.md +++ b/docs/impulse/docs/references/api/impulse_reporting/config/config_parser.md @@ -146,6 +146,10 @@ configured) regardless of whether ``container_tags_table`` is set. - `channels_uri` (`str`): Full Unity Catalog path to the channels data table. - `channel_mapping_table` (`str`): Full Unity Catalog path to the channel mapping table. Required when using ``channel_with_alias()`` for logical alias resolution. 
+- `unit_conversion_table` (`str`): Full Unity Catalog path to the unit conversion table. When set together +with a ``channel_mapping_table`` whose rows carry ``source_unit`` and +``target_unit`` columns, the query engine converts time-series values +from the source to the target unit during ``solve()``. ## UnitySink diff --git a/docs/impulse/docs/references/query_engine.md b/docs/impulse/docs/references/query_engine.md index 150e928..fa07dfb 100644 --- a/docs/impulse/docs/references/query_engine.md +++ b/docs/impulse/docs/references/query_engine.md @@ -47,6 +47,7 @@ attributes are already wide on `container_metrics` itself. | `container_tags` | required (narrow EAV) | optional (narrow EAV) | | `channel_tags` | required (narrow EAV) | not used | | `channel_mapping` | not used | optional (channel aliases) | +| `unit_conversion` | not used | optional (per-alias unit conversion) | See the [Silver Layer Schema](../data_model/silver_layer_schema.md) for the columns each table is expected to carry. diff --git a/docs/impulse/docs/references/tsal.md b/docs/impulse/docs/references/tsal.md index d94d654..9542df7 100644 --- a/docs/impulse/docs/references/tsal.md +++ b/docs/impulse/docs/references/tsal.md @@ -51,6 +51,12 @@ Each keyword argument becomes a tag filter on the `channel_mapping` table; the s to the physical channels at read time. Use this when the consuming code should not need to know which physical signal backs a given logical name. +When the `channel_mapping` table carries `source_unit` and `target_unit` columns and the report config sets +`source.unit_conversion_table`, values returned from `channel_with_alias()` are automatically converted from source +to target unit before any expression is evaluated. Constants and parameters in expressions over an aliased selector +must therefore be expressed in the target unit. Direct selectors via `channel(...)` on the same physical channel are +unaffected — conversion is a property of the alias, not of the channel. 
+ --- ## Operators From 85bf6ef0f4536275ccb5913e6c6bc367f3db9b8a Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Thu, 21 May 2026 16:33:18 +0200 Subject: [PATCH 04/10] Enhance channel mapping configuration and alias resolution in documentation and code - Introduced `JoinKey` class to define custom join keys for alias resolution between `channel_mapping` and `channel_metrics`. - Updated `ChannelMappingConfig` to include an optional `join_keys` attribute, allowing for flexible alias-resolution configurations. - Enhanced `SolverConfig` to support the new `ChannelMappingConfig` and its properties. - Improved documentation to clarify the usage of `join_keys` and internal column names related to channel mapping. - Added unit tests to validate the behavior of configurable join keys and their integration with the `KeyValueStoreSolver`. --- docs/impulse/docs/config/configuration.md | 66 ++++++++- .../analyze/query/solvers/solver_config.md | 105 ++++++++++++- .../query/solvers/key_value_store_solver.py | 22 ++- .../analyze/query/solvers/solver_config.py | 129 +++++++++++++++- .../solvers/key_value_store_alias_test.py | 139 ++++++++++++++++++ .../query/solvers/solver_config_test.py | 102 +++++++++++++ 6 files changed, 549 insertions(+), 14 deletions(-) diff --git a/docs/impulse/docs/config/configuration.md b/docs/impulse/docs/config/configuration.md index db96e47..c7c53e5 100644 --- a/docs/impulse/docs/config/configuration.md +++ b/docs/impulse/docs/config/configuration.md @@ -173,7 +173,7 @@ Per-table sections (each a `TableConfig`): | `container_metrics`| All solvers | Custom container_id column, custom timestamp columns | | `channel_tags` | DeltaSolver | Tag key/value column renames | | `channel_metrics` | All solvers | Custom channel_id column, custom value/timestamp columns | -| `channel_mapping` | KeyValueStoreSolver | Alias-table column renames; `priority` column | +| `channel_mapping` | KeyValueStoreSolver | Alias-table column renames; `priority` column; optional 
`join_keys` for non-default alias-resolution composite keys | | `channels` | All solvers | RLE column renames (`tstart`/`tend`/`value`) | | `unit_conversion` | KeyValueStoreSolver | Unit-conversion table column renames (`unit`, `group_id`, `conversion_factor`) | @@ -189,6 +189,10 @@ Internal column names that mappings can target: | `priority` | Tie-breaker column on the `channel_mapping` table | | `project_id` | Project scoping column | | `parent_id` | Parent/scope identifier | +| `source_channel`| Source-channel identifier on the `channel_mapping` table | +| `data_key` | Data-key identifier (default present on both `channel_mapping` and `channel_metrics`) | +| `channel_alias` | Alias identifier on the `channel_mapping` table | +| `channel_name` | Channel-name identifier on the `channel_metrics` table | | `source_unit`, `target_unit` | Source/target unit columns on the `channel_mapping` table | | `unit` | Unit name column on the `unit_conversion` table | | `group_id` | Unit-family identifier on the `unit_conversion` table | @@ -267,6 +271,66 @@ conversion is a property of the alias, not of the channel. See } ``` +### Alias-resolution join keys (optional) + +`KeyValueStoreSolver.filter_aliased_channel_metrics` joins `channel_mapping` +to `channel_metrics` to resolve aliased selectors. The default composite key +is `(source_channel, channel_name) + (data_key, data_key)`. Override +`channel_mapping.join_keys` to change the arity or column choice — for +example, a single-column join when `data_key` is not part of the channel +identity in your silver layout: + +```python +"solver_config": { + "channel_mapping": { + "join_keys": [ + {"mapping_col": "source_channel", "metrics_col": "channel_name"} + ] + } +} +``` + +Each `mapping_col` / `metrics_col` is an **internal** name (the name as the +solver sees the column **after** `column_name_mapping` has been applied on +the respective table). 
The two sides of a pair are independent, so the same +column can carry different names on the two tables. For instance, a layout +where the data-key column has different physical names on the two tables +has two equivalent paths: + +```python +# Path 1 — rename both physical columns to the same internal name; the +# default join_keys then works unchanged. +"solver_config": { + "channel_mapping": { + "column_name_mapping": {"mapping_data_key": "data_key"} + }, + "channel_metrics": { + "column_name_mapping": {"metrics_data_key": "data_key"} + } +} + +# Path 2 — leave the physical names as-is and reference them directly. +"solver_config": { + "channel_mapping": { + "join_keys": [ + {"mapping_col": "source_channel", "metrics_col": "channel_name"}, + {"mapping_col": "mapping_data_key", "metrics_col": "metrics_data_key"} + ] + } +} +``` + +`query.channel(...)` and `query.channel_with_alias(...)` kwargs are column +references against the **post-`column_name_mapping`** schema. If you +override `join_keys` (or skip renames) so that the solver sees a column +under a non-default name, the same name must be used as the kwarg. Example: +if `join_keys` references `metrics_col: "my_chan_name"` and the column is +not renamed via `column_name_mapping`, call +`query.channel(my_chan_name=...)`. The internal-name properties on +`SolverConfig` exist primarily to remove magic strings from the solver +code; the user-facing contract is "kwarg name == column name as the solver +sees it". + ### When to use what - **`solver_config.<table>.column_name_mapping`** — your silver-layer column is named differently from diff --git a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md index b15ce7d..e8369c0 100644 --- a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md +++ b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md @@ -36,6 +36,47 @@ names used by the solver. An empty dict means no renaming Keys are internal column names; values are the literal values to match. +## JoinKey + +```python +class JoinKey(BaseModel) +``` + +A single column pair in the ``channel_mapping`` → ``channel_metrics`` join. + +Used by :class:`ChannelMappingConfig.join_keys` to override the default +alias-resolution composite key. + +Both fields reference column names **after** ``column_name_mapping`` has +been applied on the respective table; the two sides are independent, so +a column may appear under different names on the two tables. + +**Arguments**: + +- `mapping_col` (`str`): Column name on ``channel_mapping`` after its ``column_name_mapping`` +has been applied. +- `metrics_col` (`str`): Column name on ``channel_metrics`` after its ``column_name_mapping`` +has been applied. + +## ChannelMappingConfig + +```python +class ChannelMappingConfig(TableConfig) +``` + +``TableConfig`` plus an optional alias-resolution join-key spec. + +**Arguments**: + +- `join_keys` (`list[JoinKey] or None`): Custom composite key for the ``channel_mapping`` → ``channel_metrics`` +join performed by ``KeyValueStoreSolver.filter_aliased_channel_metrics``. +When ``None`` (the default), the solver uses the backward-compatible +pair ``[(source_channel, channel_name), (data_key, data_key)]`` +sourced from :class:`SolverConfig` internal-name properties. +Provide a custom list to change the join arity or column choice +(e.g.
a single-column join when ``data_key`` is not part of the +channel identity in your silver layout). + ## SolverConfig ```python @@ -58,7 +99,8 @@ so that solver code can always reference the same constants. - `container_metrics` (`TableConfig`): Column mappings and filters for the container metrics table. - `channel_tags` (`TableConfig`): Column mappings and filters for the channel tags table. - `channel_metrics` (`TableConfig`): Column mappings and filters for the channel metrics table. -- `channel_mapping` (`TableConfig`): Column mappings and filters for the channel mapping (alias) table. +- `channel_mapping` (`ChannelMappingConfig`): Column mappings, filters, and the alias-resolution ``join_keys`` +override for the channel mapping (alias) table. - `channels` (`TableConfig`): Column mappings and filters for the channel data table. - `unit_conversion` (`TableConfig`): Column mappings and filters for the unit conversion table. @@ -177,6 +219,50 @@ def alias_priority_col() -> str Internal column name for the alias priority on the channel_mapping table. +#### source\_channel\_col + +```python +def source_channel_col() -> str +``` + +Internal column name for the source-channel identifier on the channel_mapping table. + + +#### data\_key\_col + +```python +def data_key_col() -> str +``` + +Internal column name for the data-key identifier. + +Default present on both ``channel_mapping`` and ``channel_metrics``; +used by the default :meth:`effective_alias_join_keys` for both sides. +Layouts where the two tables carry the data-key column under different +physical names can either rename both to ``"data_key"`` via per-table +``column_name_mapping`` or override ``ChannelMappingConfig.join_keys`` with explicit ``mapping_col`` / ``metrics_col`` values. + + +#### channel\_alias\_col + +```python +def channel_alias_col() -> str +``` + +Internal column name for the alias identifier on the channel_mapping table.
+ +Referenced by the dedup window in ``KeyValueStoreSolver.filter_aliased_channel_metrics``. + + +#### channel\_name\_col + +```python +def channel_name_col() -> str +``` + +Internal column name for the channel-name identifier on the channel_metrics table. + + #### project\_id\_col ```python @@ -244,6 +330,23 @@ def group_id_col() -> str Internal column name for the unit group id on the unit_conversion table. +#### effective\_alias\_join\_keys + +```python +def effective_alias_join_keys() -> list[tuple[str, str]] +``` + +Return the resolved alias-resolution join keys as ``(mapping_col, metrics_col)`` tuples. + +Falls back to the default composite key +``[(source_channel_col, channel_name_col), (data_key_col, data_key_col)]`` +when :attr:`ChannelMappingConfig.join_keys` is ``None``. Otherwise +returns the configured list. + +Both members of each tuple are column names **after** +``column_name_mapping`` has been applied on the respective table. + + #### col\_map ```python diff --git a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py index 88b848c..b8d9898 100644 --- a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py @@ -403,12 +403,18 @@ def filter_aliased_channel_metrics( resolved_mapping = channel_mapping.where(self._build_expr(selectors)) - channel_metrics = db.channel_metrics(spark).join( + channel_metrics = db.channel_metrics(spark) + channel_metrics = self._apply_column_mapping( + channel_metrics, self.config.channel_metrics.column_name_mapping + ) + channel_metrics = channel_metrics.join( F.broadcast(container_df.select(container_id_col)), on=[container_id_col], how="inner", ) alias_priority_col = self.config.alias_priority_col + channel_alias_col = self.config.channel_alias_col + join_keys = self.config.effective_alias_join_keys source_unit_col = self.config.source_unit_col target_unit_col =
self.config.target_unit_col @@ -418,25 +424,25 @@ def filter_aliased_channel_metrics( and target_unit_col in resolved_mapping.columns ) + # Mapping-side projection: one aliased copy per mapping_col plus the + # alias / priority columns (and the optional unit columns). mapping_select_cols = [ - F.col("source_channel").alias("_map_source_channel"), - F.col("data_key").alias("_map_data_key"), - F.col("channel_alias"), - F.col(alias_priority_col), + F.col(mapping_col).alias(f"_map_{mapping_col}") for mapping_col, _ in join_keys ] + mapping_select_cols.extend([F.col(channel_alias_col), F.col(alias_priority_col)]) if has_unit_cols: mapping_select_cols.extend([F.col(source_unit_col), F.col(target_unit_col)]) resolved = channel_metrics.join( resolved_mapping.select(*mapping_select_cols), on=[ - channel_metrics["channel_name"] == F.col("_map_source_channel"), - channel_metrics["data_key"] == F.col("_map_data_key"), + channel_metrics[metrics_col] == F.col(f"_map_{mapping_col}") + for mapping_col, metrics_col in join_keys ], how="inner", ) - dedup_window = Window.partitionBy(container_id_col, "channel_alias").orderBy( + dedup_window = Window.partitionBy(container_id_col, channel_alias_col).orderBy( F.col(alias_priority_col).asc_nulls_last() ) resolved = resolved.withColumn("_rank", F.row_number().over(dedup_window)) diff --git a/src/impulse_query_engine/analyze/query/solvers/solver_config.py b/src/impulse_query_engine/analyze/query/solvers/solver_config.py index 8c27add..4fa9c9b 100644 --- a/src/impulse_query_engine/analyze/query/solvers/solver_config.py +++ b/src/impulse_query_engine/analyze/query/solvers/solver_config.py @@ -16,7 +16,7 @@ import json -from pydantic import BaseModel +from pydantic import BaseModel, field_validator class TableConfig(BaseModel): @@ -38,6 +38,49 @@ class TableConfig(BaseModel): filters: dict[str, str] = {} +class JoinKey(BaseModel): + """A single column pair in the ``channel_mapping`` → ``channel_metrics`` join. 
+ + Used by :class:`ChannelMappingConfig.join_keys` to override the default + alias-resolution composite key. + + Both fields reference column names **after** ``column_name_mapping`` has + been applied on the respective table; the two sides are independent, so + a column may appear under different names on the two tables. + + Attributes + ---------- + mapping_col : str + Column name on ``channel_mapping`` after its ``column_name_mapping`` + has been applied. + metrics_col : str + Column name on ``channel_metrics`` after its ``column_name_mapping`` + has been applied. + """ + + mapping_col: str + metrics_col: str + + +class ChannelMappingConfig(TableConfig): + """``TableConfig`` plus an optional alias-resolution join-key spec. + + Attributes + ---------- + join_keys : list[JoinKey] or None + Custom composite key for the ``channel_mapping`` → ``channel_metrics`` + join performed by ``KeyValueStoreSolver.filter_aliased_channel_metrics``. + When ``None`` (the default), the solver uses the backward-compatible + pair ``[(source_channel, channel_name), (data_key, data_key)]`` + sourced from :class:`SolverConfig` internal-name properties. + Provide a custom list to change the join arity or column choice + (e.g. a single-column join when ``data_key`` is not part of the + channel identity in your silver layout). + """ + + join_keys: list[JoinKey] | None = None + + class SolverConfig(BaseModel): """Per-table configuration for solver column name mappings and filters. @@ -60,8 +103,9 @@ class SolverConfig(BaseModel): Column mappings and filters for the channel tags table. channel_metrics : TableConfig Column mappings and filters for the channel metrics table. - channel_mapping : TableConfig - Column mappings and filters for the channel mapping (alias) table. + channel_mapping : ChannelMappingConfig + Column mappings, filters, and the alias-resolution ``join_keys`` + override for the channel mapping (alias) table. 
channels : TableConfig Column mappings and filters for the channel data table. unit_conversion : TableConfig @@ -74,10 +118,28 @@ class SolverConfig(BaseModel): container_metrics: TableConfig = TableConfig() channel_tags: TableConfig = TableConfig() channel_metrics: TableConfig = TableConfig() - channel_mapping: TableConfig = TableConfig() + channel_mapping: ChannelMappingConfig = ChannelMappingConfig() channels: TableConfig = TableConfig() unit_conversion: TableConfig = TableConfig() + @field_validator("channel_mapping", mode="before") + @classmethod + def _coerce_channel_mapping(cls, v): + """Accept a plain ``TableConfig`` instance and coerce to ``ChannelMappingConfig``. + + Lets callers pass ``TableConfig(filters=...)`` (or any subclass) for + backward compatibility with code written before ``join_keys`` existed. + ``ChannelMappingConfig`` instances pass through unchanged; dicts are + validated by pydantic in the usual way. + """ + if isinstance(v, ChannelMappingConfig): + return v + if isinstance(v, TableConfig): + return ChannelMappingConfig( + column_name_mapping=v.column_name_mapping, filters=v.filters + ) + return v + # ------------------------------------------------------------------ # Class methods # ------------------------------------------------------------------ @@ -169,6 +231,44 @@ def alias_priority_col(self) -> str: """Internal column name for the alias priority on the channel_mapping table.""" return "priority" + @property + def source_channel_col(self) -> str: + """Internal column name for the source-channel identifier on the channel_mapping table.""" + return "source_channel" + + @property + def data_key_col(self) -> str: + """Internal column name for the data-key identifier. + + Default present on both ``channel_mapping`` and ``channel_metrics``; + used by the default :meth:`effective_alias_join_keys` for both sides. 
+ Layouts where the two tables carry the data-key column under different + physical names can either rename both to ``"data_key"`` via per-table + ``column_name_mapping`` or override + :attr:`ChannelMappingConfig.join_keys` with explicit + ``mapping_col`` / ``metrics_col`` values. + """ + return "data_key" + + @property + def channel_alias_col(self) -> str: + """Internal column name for the alias identifier on the channel_mapping table. + + Referenced by the dedup window in + :meth:`KeyValueStoreSolver.filter_aliased_channel_metrics` and is the + conventional kwarg name passed to + :meth:`QueryBuilder.channel_with_alias` (e.g. + ``channel_with_alias(channel_alias="vehicle_speed")``). The kwarg name + must match the column name as seen by the solver after + ``column_name_mapping`` is applied. + """ + return "channel_alias" + + @property + def channel_name_col(self) -> str: + """Internal column name for the channel-name identifier on the channel_metrics table.""" + return "channel_name" + @property def project_id_col(self) -> str: """Internal column name for the project identifier.""" @@ -209,6 +309,27 @@ def group_id_col(self) -> str: """Internal column name for the unit group id on the unit_conversion table.""" return "group_id" + @property + def effective_alias_join_keys(self) -> list[tuple[str, str]]: + """Return the resolved alias-resolution join keys as ``(mapping_col, metrics_col)`` tuples. + + Falls back to the default composite key + ``[(source_channel_col, channel_name_col), (data_key_col, data_key_col)]`` + when :attr:`ChannelMappingConfig.join_keys` is ``None``. Otherwise + returns the configured list. + + Both members of each tuple are column names **after** + ``column_name_mapping`` has been applied on the respective table. 
+ """ + if self.channel_mapping.join_keys is None: + return [ + (self.source_channel_col, self.channel_name_col), + (self.data_key_col, self.data_key_col), + ] + return [ + (jk.mapping_col, jk.metrics_col) for jk in self.channel_mapping.join_keys + ] + @property def col_map(self) -> dict[str, str]: """Short-key → internal-column-name mapping for UDFs and caches.""" diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py index 02c3798..7c61c19 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py @@ -11,6 +11,8 @@ KeyValueStoreSolver, ) from impulse_query_engine.analyze.query.solvers.solver_config import ( + ChannelMappingConfig, + JoinKey, SolverConfig, TableConfig, ) @@ -304,3 +306,140 @@ def test_alias_returns_same_channel_data_as_direct_engine_rpm( def test_channel_with_alias_without_mapping_raises(self, key_value_store_db: MeasurementDB): with pytest.raises(ValueError, match="channel_mapping_table is not configured"): key_value_store_db.query.channel_with_alias(channel_alias="engine_speed") + + +class TestConfigurableJoinKeys: + """Behavior of the configurable ``channel_mapping.join_keys`` override.""" + + def test_single_column_join_key( + self, spark: SparkSession, key_value_store_alias_db: MeasurementDB + ): + # Single-column join on source_channel == channel_name only; data_key + # is intentionally dropped from the join. The alias resolution still + # works and the (container_id, channel_alias) dedup keeps results + # unique. 
+ solver = KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_mapping=ChannelMappingConfig( + filters={"toolbox_id": "container_concept"}, + join_keys=[ + JoinKey(mapping_col="source_channel", metrics_col="channel_name"), + ], + ), + ), + ) + query = key_value_store_alias_db.query + engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias( + "engine_speed" + ) + + pdf = query.select(engine_speed).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + + assert pdf["container_id"].tolist() == [1, 2, 3] + assert all(length > 0 for length in pdf["engine_speed"].map(len)) + + def test_different_data_key_names_per_side_via_rename( + self, spark: SparkSession, key_value_store_alias_db: MeasurementDB + ): + # Path 1: rename both physical `data_key` columns to a common + # internal name (here we use a non-default name `dk`). Two + # JoinKey entries cover the composite key. 
+ solver = KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_metrics=TableConfig(column_name_mapping={"data_key": "dk"}), + channel_mapping=ChannelMappingConfig( + column_name_mapping={"data_key": "dk"}, + filters={"toolbox_id": "container_concept"}, + join_keys=[ + JoinKey(mapping_col="source_channel", metrics_col="channel_name"), + JoinKey(mapping_col="dk", metrics_col="dk"), + ], + ), + ), + ) + query = key_value_store_alias_db.query + engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias( + "engine_speed" + ) + + pdf = query.select(engine_speed).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + assert pdf["container_id"].tolist() == [1, 2, 3] + assert all(length > 0 for length in pdf["engine_speed"].map(len)) + + def test_different_data_key_names_per_side_via_join_keys( + self, spark: SparkSession, key_value_store_alias_db: MeasurementDB + ): + # Path 2: rename the two physical `data_key` columns to *different* + # internal names per table and reference them directly in + # join_keys. No common-name rename — the JoinKey's two sides are + # independent. 
+ solver = KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_metrics=TableConfig(column_name_mapping={"data_key": "metrics_dk"}), + channel_mapping=ChannelMappingConfig( + column_name_mapping={"data_key": "map_dk"}, + filters={"toolbox_id": "container_concept"}, + join_keys=[ + JoinKey(mapping_col="source_channel", metrics_col="channel_name"), + JoinKey(mapping_col="map_dk", metrics_col="metrics_dk"), + ], + ), + ), + ) + query = key_value_store_alias_db.query + engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias( + "engine_speed" + ) + + pdf = query.select(engine_speed).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + assert pdf["container_id"].tolist() == [1, 2, 3] + assert all(length > 0 for length in pdf["engine_speed"].map(len)) + + def test_tag_kwarg_must_match_post_rename_name( + self, spark: SparkSession, key_value_store_alias_db: MeasurementDB + ): + # When channel_metrics.channel_name is renamed via column_name_mapping + # to a non-default internal name, the direct selector's kwarg must use + # the renamed name AND the override `join_keys` must reference it. + solver = KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_metrics=TableConfig( + column_name_mapping={"channel_name": "chan"} + ), + channel_mapping=ChannelMappingConfig( + filters={"toolbox_id": "container_concept"}, + join_keys=[ + JoinKey(mapping_col="source_channel", metrics_col="chan"), + JoinKey(mapping_col="data_key", metrics_col="data_key"), + ], + ), + ), + ) + query = key_value_store_alias_db.query + # Direct selector — kwarg `chan` must match the renamed column name. 
+ engine_rpm = query.channel(chan="Engine RPM", data_key="TM").alias( + "engine_rpm" + ) + + pdf = query.select(engine_rpm).toPandas(spark, solver=solver) + pdf = pdf.sort_values("container_id").reset_index(drop=True) + # Direct selector — containers with no matching channel drop out. + # Containers 1 and 2 have "Engine RPM"/data_key="TM"; container 3 + # only carries it under "EngSpd"/"ProjSpecREC_10Hz" (no match). + assert sorted(pdf["container_id"].tolist()) == [1, 2] + assert all(length > 0 for length in pdf["engine_rpm"].map(len)) diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py index 5cf3705..c74f7d4 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py @@ -15,6 +15,8 @@ import pytest from impulse_query_engine.analyze.query.solvers.solver_config import ( + ChannelMappingConfig, + JoinKey, SolverConfig, TableConfig, ) @@ -163,3 +165,103 @@ def test_col_map_consistent_with_properties(self, cfg: SolverConfig): def test_col_map_values(self, cfg: SolverConfig): assert cfg.col_map == _EXPECTED_COL_MAP + + +# --------------------------------------------------------------------------- +# TestAliasInternalNameProperties – channel mapping / metrics internal names +# --------------------------------------------------------------------------- + + +class TestAliasInternalNameProperties: + """Internal-name properties for the alias-resolution columns.""" + + def test_source_channel_col(self): + assert SolverConfig().source_channel_col == "source_channel" + + def test_data_key_col(self): + assert SolverConfig().data_key_col == "data_key" + + def test_channel_alias_col(self): + assert SolverConfig().channel_alias_col == "channel_alias" + + def test_channel_name_col(self): + assert SolverConfig().channel_name_col == "channel_name" + + +# 
--------------------------------------------------------------------------- +# TestEffectiveAliasJoinKeys – default + override behavior +# --------------------------------------------------------------------------- + + +class TestEffectiveAliasJoinKeys: + def test_default_when_join_keys_none(self): + cfg = SolverConfig() + assert cfg.channel_mapping.join_keys is None + assert cfg.effective_alias_join_keys == [ + ("source_channel", "channel_name"), + ("data_key", "data_key"), + ] + + def test_single_column_override(self): + cfg = SolverConfig( + channel_mapping=ChannelMappingConfig( + join_keys=[JoinKey(mapping_col="source_channel", metrics_col="channel_name")] + ) + ) + assert cfg.effective_alias_join_keys == [("source_channel", "channel_name")] + + def test_different_names_per_side(self): + cfg = SolverConfig( + channel_mapping=ChannelMappingConfig( + join_keys=[ + JoinKey(mapping_col="source_channel", metrics_col="channel_name"), + JoinKey(mapping_col="map_dk", metrics_col="metrics_dk"), + ] + ) + ) + assert cfg.effective_alias_join_keys == [ + ("source_channel", "channel_name"), + ("map_dk", "metrics_dk"), + ] + + +# --------------------------------------------------------------------------- +# TestChannelMappingConfigCoercion – field validator + JSON round-trip +# --------------------------------------------------------------------------- + + +class TestChannelMappingConfigCoercion: + def test_accepts_plain_table_config(self): + cfg = SolverConfig( + channel_mapping=TableConfig(filters={"toolbox_id": "tb"}) + ) + # Coerced to ChannelMappingConfig with join_keys=None. 
+ assert isinstance(cfg.channel_mapping, ChannelMappingConfig) + assert cfg.channel_mapping.filters == {"toolbox_id": "tb"} + assert cfg.channel_mapping.join_keys is None + + def test_accepts_channel_mapping_config_instance(self): + cm = ChannelMappingConfig( + filters={"toolbox_id": "tb"}, + join_keys=[JoinKey(mapping_col="source_channel", metrics_col="channel_name")], + ) + cfg = SolverConfig(channel_mapping=cm) + assert cfg.channel_mapping is cm + + def test_json_round_trip_with_join_keys(self): + raw = { + "channel_mapping": { + "column_name_mapping": {"alias": "channel_alias"}, + "filters": {"toolbox_id": "tb"}, + "join_keys": [ + {"mapping_col": "source_channel", "metrics_col": "channel_name"} + ], + } + } + cfg = SolverConfig.from_dict(raw) + assert isinstance(cfg.channel_mapping, ChannelMappingConfig) + assert cfg.channel_mapping.column_name_mapping == {"alias": "channel_alias"} + assert cfg.channel_mapping.filters == {"toolbox_id": "tb"} + assert cfg.channel_mapping.join_keys == [ + JoinKey(mapping_col="source_channel", metrics_col="channel_name") + ] From 211667b4a60b87846e6d1082f90676a0fec5ff3c Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Thu, 21 May 2026 16:50:31 +0200 Subject: [PATCH 05/10] Refactor channel mapping configuration in SolverConfig - Removed the field validator for channel mapping coercion, simplifying the handling of `ChannelMappingConfig`. - Updated tests to ensure compatibility with the new configuration, replacing instances of `TableConfig` with `ChannelMappingConfig` where applicable. - Enhanced unit tests to validate the behavior of the updated channel mapping configuration and its integration with the `KeyValueStoreSolver`. 
--- .../analyze/query/solvers/solver_config.py | 24 +---------- .../integration/kvs_solver_test.py | 7 ++-- .../solvers/key_value_store_alias_test.py | 42 ++++++++----------- .../key_value_store_unit_conversion_test.py | 3 +- .../query/solvers/solver_config_test.py | 17 ++------ 5 files changed, 28 insertions(+), 65 deletions(-) diff --git a/src/impulse_query_engine/analyze/query/solvers/solver_config.py b/src/impulse_query_engine/analyze/query/solvers/solver_config.py index 4fa9c9b..a743e4c 100644 --- a/src/impulse_query_engine/analyze/query/solvers/solver_config.py +++ b/src/impulse_query_engine/analyze/query/solvers/solver_config.py @@ -16,7 +16,7 @@ import json -from pydantic import BaseModel, field_validator +from pydantic import BaseModel class TableConfig(BaseModel): @@ -122,24 +122,6 @@ class SolverConfig(BaseModel): channels: TableConfig = TableConfig() unit_conversion: TableConfig = TableConfig() - @field_validator("channel_mapping", mode="before") - @classmethod - def _coerce_channel_mapping(cls, v): - """Accept a plain ``TableConfig`` instance and coerce to ``ChannelMappingConfig``. - - Lets callers pass ``TableConfig(filters=...)`` (or any subclass) for - backward compatibility with code written before ``join_keys`` existed. - ``ChannelMappingConfig`` instances pass through unchanged; dicts are - validated by pydantic in the usual way. 
- """ - if isinstance(v, ChannelMappingConfig): - return v - if isinstance(v, TableConfig): - return ChannelMappingConfig( - column_name_mapping=v.column_name_mapping, filters=v.filters - ) - return v - # ------------------------------------------------------------------ # Class methods # ------------------------------------------------------------------ @@ -326,9 +308,7 @@ def effective_alias_join_keys(self) -> list[tuple[str, str]]: (self.source_channel_col, self.channel_name_col), (self.data_key_col, self.data_key_col), ] - return [ - (jk.mapping_col, jk.metrics_col) for jk in self.channel_mapping.join_keys - ] + return [(jk.mapping_col, jk.metrics_col) for jk in self.channel_mapping.join_keys] @property def col_map(self) -> dict[str, str]: diff --git a/tests/impulse_query_engine/integration/kvs_solver_test.py b/tests/impulse_query_engine/integration/kvs_solver_test.py index 6fca0d2..c79825e 100644 --- a/tests/impulse_query_engine/integration/kvs_solver_test.py +++ b/tests/impulse_query_engine/integration/kvs_solver_test.py @@ -19,6 +19,7 @@ KeyValueStoreSolver, ) from impulse_query_engine.analyze.query.solvers.solver_config import ( + ChannelMappingConfig, SolverConfig, TableConfig, ) @@ -29,7 +30,7 @@ def _kvs_cfg( project_id: str = "SAMPLE_PROJECT", container_tags: TableConfig | None = None, container_metrics: TableConfig | None = None, - channel_mapping: TableConfig | None = None, + channel_mapping: ChannelMappingConfig | None = None, ) -> SolverConfig: """Build a SolverConfig wired up for the KVS test data. 
@@ -42,7 +43,7 @@ def _kvs_cfg( container_tags=container_tags or TableConfig(column_name_mapping={"element_id": "key"}), container_metrics=container_metrics or TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=channel_mapping or TableConfig(), + channel_mapping=channel_mapping or ChannelMappingConfig(), ) @@ -222,7 +223,7 @@ def test_solve_with_aliased_channel( solver = KeyValueStoreSolver( spark, config=_kvs_cfg( - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py index 7c61c19..e229b4f 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_alias_test.py @@ -33,7 +33,7 @@ def test_no_aliased_selections_returns_empty( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -54,7 +54,7 @@ def test_alias_resolves_to_correct_channels( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -82,7 +82,7 @@ def test_alias_scoped_by_project_id( config=SolverConfig( project_id="NON_EXISTENT_PROJECT", 
container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -102,7 +102,9 @@ def test_alias_scoped_by_toolbox_id( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "non_existent_toolbox"}), + channel_mapping=ChannelMappingConfig( + filters={"toolbox_id": "non_existent_toolbox"} + ), ), ) query = key_value_store_alias_db.query @@ -122,7 +124,7 @@ def test_selector_id_consistent_for_same_expression( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -142,7 +144,7 @@ def test_multiple_aliases(self, spark: SparkSession, key_value_store_alias_db: M config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -167,7 +169,7 @@ def test_solve_with_alias_only( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -187,7 +189,7 @@ def test_solve_with_mixed_direct_and_alias( config=SolverConfig( 
project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -211,7 +213,7 @@ def test_solve_deduplication( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -243,7 +245,7 @@ def test_alias_returns_same_channel_data_as_direct_engine_rpm( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) query = key_value_store_alias_db.query @@ -332,9 +334,7 @@ def test_single_column_join_key( ), ) query = key_value_store_alias_db.query - engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias( - "engine_speed" - ) + engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias("engine_speed") pdf = query.select(engine_speed).toPandas(spark, solver=solver) pdf = pdf.sort_values("container_id").reset_index(drop=True) @@ -365,9 +365,7 @@ def test_different_data_key_names_per_side_via_rename( ), ) query = key_value_store_alias_db.query - engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias( - "engine_speed" - ) + engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias("engine_speed") pdf = query.select(engine_speed).toPandas(spark, solver=solver) pdf = pdf.sort_values("container_id").reset_index(drop=True) @@ -398,9 +396,7 @@ def 
test_different_data_key_names_per_side_via_join_keys( ), ) query = key_value_store_alias_db.query - engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias( - "engine_speed" - ) + engine_speed = query.channel_with_alias(channel_alias="engine_speed").alias("engine_speed") pdf = query.select(engine_speed).toPandas(spark, solver=solver) pdf = pdf.sort_values("container_id").reset_index(drop=True) @@ -418,9 +414,7 @@ def test_tag_kwarg_must_match_post_rename_name( config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_metrics=TableConfig( - column_name_mapping={"channel_name": "chan"} - ), + channel_metrics=TableConfig(column_name_mapping={"channel_name": "chan"}), channel_mapping=ChannelMappingConfig( filters={"toolbox_id": "container_concept"}, join_keys=[ @@ -432,9 +426,7 @@ def test_tag_kwarg_must_match_post_rename_name( ) query = key_value_store_alias_db.query # Direct selector — kwarg `chan` must match the renamed column name. 
- engine_rpm = query.channel(chan="Engine RPM", data_key="TM").alias( - "engine_rpm" - ) + engine_rpm = query.channel(chan="Engine RPM", data_key="TM").alias("engine_rpm") pdf = query.select(engine_rpm).toPandas(spark, solver=solver) pdf = pdf.sort_values("container_id").reset_index(drop=True) diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py index 7795ba5..993c3df 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py @@ -11,6 +11,7 @@ KeyValueStoreSolver, ) from impulse_query_engine.analyze.query.solvers.solver_config import ( + ChannelMappingConfig, SolverConfig, TableConfig, ) @@ -23,7 +24,7 @@ def _solver(spark: SparkSession) -> KeyValueStoreSolver: config=SolverConfig( project_id="SAMPLE_PROJECT", container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), - channel_mapping=TableConfig(filters={"toolbox_id": "container_concept"}), + channel_mapping=ChannelMappingConfig(filters={"toolbox_id": "container_concept"}), ), ) diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py index c74f7d4..6bc29ba 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/solver_config_test.py @@ -226,20 +226,11 @@ def test_different_names_per_side(self): # --------------------------------------------------------------------------- -# TestChannelMappingConfigCoercion – field validator + JSON round-trip +# TestChannelMappingConfig – type acceptance + JSON round-trip # --------------------------------------------------------------------------- -class 
TestChannelMappingConfigCoercion: - def test_accepts_plain_table_config(self): - cfg = SolverConfig( - channel_mapping=TableConfig(filters={"toolbox_id": "tb"}) - ) - # Coerced to ChannelMappingConfig with join_keys=None. - assert isinstance(cfg.channel_mapping, ChannelMappingConfig) - assert cfg.channel_mapping.filters == {"toolbox_id": "tb"} - assert cfg.channel_mapping.join_keys is None - +class TestChannelMappingConfig: def test_accepts_channel_mapping_config_instance(self): cm = ChannelMappingConfig( filters={"toolbox_id": "tb"}, @@ -253,9 +244,7 @@ def test_json_round_trip_with_join_keys(self): "channel_mapping": { "column_name_mapping": {"alias": "channel_alias"}, "filters": {"toolbox_id": "tb"}, - "join_keys": [ - {"mapping_col": "source_channel", "metrics_col": "channel_name"} - ], + "join_keys": [{"mapping_col": "source_channel", "metrics_col": "channel_name"}], } } cfg = SolverConfig.from_dict(raw) From 4311b713f75f4c9e0efd7390670972bb3ea88a67 Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Sat, 23 May 2026 16:01:03 +0200 Subject: [PATCH 06/10] Update unit conversion CSV to include base unit indicator - Modified the `unit_conversion.csv` file to add an `is_base` column, indicating whether each unit is a base unit for its group. - Updated entries for speed and rotation units to reflect their base status, enhancing clarity for unit conversion processes. 
--- .../unit_conversion.csv | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv b/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv index 25964d6..2aa4d36 100644 --- a/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv +++ b/tests/unit/data/key_value_store_unit_conversion_csv/unit_conversion.csv @@ -1,6 +1,6 @@ -group_id,unit,conversion_factor -speed,m/s,1.0 -speed,km/h,0.277778 -speed,mph,0.44704 -rotation,RPM,1.0 -rotation,rad/s,0.10472 +group_id,unit,conversion_factor,is_base +speed,m/s,1.0,true +speed,km/h,0.277778,false +speed,mph,0.44704,false +rotation,RPM,1.0,true +rotation,rad/s,0.10472,false From 96815817242b40953b385b728de5495b5bd5ed37 Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Sun, 24 May 2026 15:32:15 +0200 Subject: [PATCH 07/10] Enhance unit conversion handling in KeyValueStoreSolver and documentation - Updated the `KeyValueStoreSolver` to propagate effective `source_unit` and `target_unit` during query resolution, ensuring correct unit conversions based on `channel_metrics` and `channel_mapping`. - Enhanced documentation to clarify the behavior of `source_unit` and `target_unit`, including their precedence and fallback mechanisms. - Added new unit tests to validate the effective unit resolution logic, ensuring accurate conversions and fallback scenarios. - Refactored related code for improved clarity and maintainability. 
--- .../docs/data_model/silver_layer_schema.md | 12 +- .../query/solvers/key_value_store_solver.md | 14 +- .../analyze/query/solvers/solver_config.md | 17 +- .../query/solvers/key_value_store_solver.py | 43 ++++- .../analyze/query/solvers/solver_config.py | 18 +- tests/conftest.py | 2 +- .../key_value_store_unit_conversion_test.py | 155 +++++++++++++++++- .../channel_metrics.csv | 13 ++ 8 files changed, 261 insertions(+), 13 deletions(-) create mode 100644 tests/unit/data/key_value_store_unit_conversion_csv/channel_metrics.csv diff --git a/docs/impulse/docs/data_model/silver_layer_schema.md b/docs/impulse/docs/data_model/silver_layer_schema.md index 22fe298..c698c29 100644 --- a/docs/impulse/docs/data_model/silver_layer_schema.md +++ b/docs/impulse/docs/data_model/silver_layer_schema.md @@ -183,6 +183,14 @@ pre-filtering before scanning the much larger `channels` table. | `pz90` | `float` | Yes | 90th percentile. | | `pz99` | `float` | Yes | 99th percentile. | +An optional `unit: string` column may also be present. When the report +config sets a `unit_conversion_table` and the solver resolves an aliased +selector, this column is treated as the authoritative source unit of the +physical channel and takes precedence over `channel_mapping.source_unit` +via `COALESCE(channel_metrics.unit, channel_mapping.source_unit)`. The +column is not part of the canonical schema — omit it for layouts that +don't need per-channel physical units. + --- ## channel_tags @@ -260,8 +268,8 @@ channel name to one or more physical channels keyed by `project_id` / | `channel_name` | `string` | No | Logical channel name to match against `channel_with_alias` selectors. | | `data_key` | `string` | No | Physical lookup key joined to `channel_metrics`. | | `priority` | `int` | Yes | Tie-breaker when multiple physical channels match a logical name. | -| `source_unit` | `string` | Yes | Unit of the raw channel data. 
When non-null together with `target_unit` and a configured `unit_conversion_table`, the solver converts values from source to target unit on aliased reads. | -| `target_unit` | `string` | Yes | Target unit for aliased reads of this mapping. | +| `source_unit` | `string` | Yes | **Fallback** source unit for aliased reads of this mapping. The solver resolves the effective source unit as `COALESCE(channel_metrics.unit, channel_mapping.source_unit)`, so `channel_mapping.source_unit` only takes effect when `channel_metrics.unit` is null or absent. When configured together with `target_unit` and a `unit_conversion_table`, the solver converts values from source to target unit on aliased reads. | +| `target_unit` | `string` | Yes | Target unit for aliased reads of this mapping. Always taken from the mapping (there is no analogous column on `channel_metrics`). | Configured via `source.channel_mapping_table` (see [Configuration](../config/configuration.md)). Joins to `channel_metrics` diff --git a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md index 34afe34..1c77a5a 100644 --- a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md +++ b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md @@ -150,6 +150,16 @@ columns, then applies the top-level ``project_id`` filter and any per-table ``channel_mapping.filters``, and finally joins with channel_metrics to resolve aliases. +When the database is configured with a ``unit_conversion_table`` and +the ``channel_mapping`` table carries ``source_unit`` / ``target_unit`` +columns, this method also propagates the effective unit pair on each +resolved row. 
The effective ``source_unit`` is computed as +``COALESCE(channel_metrics.unit, channel_mapping.source_unit)`` so +that the authoritative per-channel physical unit on +``channel_metrics`` takes precedence over the mapping-level default +when present. ``target_unit`` is always taken from the mapping — +there is no analogous column on ``channel_metrics``. + **Arguments**: - `spark` (`SparkSession`): Spark session used for query execution. @@ -160,7 +170,9 @@ channel_metrics to resolve aliases. **Returns**: `pyspark.sql.DataFrame`: DataFrame with ``(container_id, channel_id, selector_ids)`` -where ``selector_ids`` is an array column. +where ``selector_ids`` is an array column. When unit conversion +is active (see above), also carries ``source_unit`` and +``target_unit`` columns. #### resolve\_channel\_selections diff --git a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md index e8369c0..f1389a0 100644 --- a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md +++ b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/solver_config.md @@ -318,7 +318,22 @@ Internal column name for the target unit on the channel_mapping table. def unit_col() -> str ``` -Internal column name for the unit name on the unit_conversion table. +Internal column name for the unit identifier. + +Used in two places that happen to share the same default name: + +- On the ``unit_conversion`` table, as the key joined against + ``channel_mapping.source_unit`` / ``target_unit`` to look up a + conversion factor. +- On the ``channel_metrics`` table (optional), as the authoritative + physical unit of a channel. When present, takes precedence over + ``channel_mapping.source_unit`` for aliased reads via the + :meth:`KeyValueStoreSolver.filter_aliased_channel_metrics` + coalesce. 
+ +Users with different internal names per table can rename physical +columns to ``unit`` on each table independently via the per-table +``column_name_mapping``. #### group\_id\_col diff --git a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py index b8d9898..8246e1c 100644 --- a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py @@ -365,6 +365,16 @@ def filter_aliased_channel_metrics( per-table ``channel_mapping.filters``, and finally joins with channel_metrics to resolve aliases. + When the database is configured with a ``unit_conversion_table`` and + the ``channel_mapping`` table carries ``source_unit`` / ``target_unit`` + columns, this method also propagates the effective unit pair on each + resolved row. The effective ``source_unit`` is computed as + ``COALESCE(channel_metrics.unit, channel_mapping.source_unit)`` so + that the authoritative per-channel physical unit on + ``channel_metrics`` takes precedence over the mapping-level default + when present. ``target_unit`` is always taken from the mapping — + there is no analogous column on ``channel_metrics``. + Parameters ---------- spark : SparkSession @@ -380,7 +390,9 @@ def filter_aliased_channel_metrics( ------- pyspark.sql.DataFrame DataFrame with ``(container_id, channel_id, selector_ids)`` - where ``selector_ids`` is an array column. + where ``selector_ids`` is an array column. When unit conversion + is active (see above), also carries ``source_unit`` and + ``target_unit`` columns. 
""" container_id_col = self.config.container_id_col channel_id_col = self.config.channel_id_col @@ -418,20 +430,29 @@ def filter_aliased_channel_metrics( source_unit_col = self.config.source_unit_col target_unit_col = self.config.target_unit_col + unit_col = self.config.unit_col has_unit_cols = ( db.config.unit_conversion_table is not None and source_unit_col in resolved_mapping.columns and target_unit_col in resolved_mapping.columns ) + metrics_has_unit = unit_col in channel_metrics.columns # Mapping-side projection: one aliased copy per mapping_col plus the - # alias / priority columns (and the optional unit columns). + # alias / priority columns (and the optional unit columns, aliased + # with the ``_map_`` prefix so we can coalesce the source unit with + # ``channel_metrics.unit`` after the join). mapping_select_cols = [ F.col(mapping_col).alias(f"_map_{mapping_col}") for mapping_col, _ in join_keys ] mapping_select_cols.extend([F.col(channel_alias_col), F.col(alias_priority_col)]) if has_unit_cols: - mapping_select_cols.extend([F.col(source_unit_col), F.col(target_unit_col)]) + mapping_select_cols.extend( + [ + F.col(source_unit_col).alias("_map_source_unit"), + F.col(target_unit_col).alias("_map_target_unit"), + ] + ) resolved = channel_metrics.join( resolved_mapping.select(*mapping_select_cols), @@ -442,6 +463,22 @@ def filter_aliased_channel_metrics( how="inner", ) + # Materialize the effective source_unit / target_unit. The source unit + # comes from ``channel_metrics.unit`` when present (authoritative + # physical unit of the channel) and falls back to the mapping + # ``source_unit`` otherwise. The target unit is always taken from + # the mapping — there is no per-channel "target" on + # ``channel_metrics``; the target is a user choice on the alias. 
+ if has_unit_cols: + if metrics_has_unit: + resolved = resolved.withColumn( + source_unit_col, + F.coalesce(channel_metrics[unit_col], F.col("_map_source_unit")), + ) + else: + resolved = resolved.withColumn(source_unit_col, F.col("_map_source_unit")) + resolved = resolved.withColumn(target_unit_col, F.col("_map_target_unit")) + dedup_window = Window.partitionBy(container_id_col, channel_alias_col).orderBy( F.col(alias_priority_col).asc_nulls_last() ) diff --git a/src/impulse_query_engine/analyze/query/solvers/solver_config.py b/src/impulse_query_engine/analyze/query/solvers/solver_config.py index a743e4c..48fc0bf 100644 --- a/src/impulse_query_engine/analyze/query/solvers/solver_config.py +++ b/src/impulse_query_engine/analyze/query/solvers/solver_config.py @@ -283,7 +283,23 @@ def target_unit_col(self) -> str: @property def unit_col(self) -> str: - """Internal column name for the unit name on the unit_conversion table.""" + """Internal column name for the unit identifier. + + Used in two places that happen to share the same default name: + + - On the ``unit_conversion`` table, as the key joined against + ``channel_mapping.source_unit`` / ``target_unit`` to look up a + conversion factor. + - On the ``channel_metrics`` table (optional), as the authoritative + physical unit of a channel. When present, takes precedence over + ``channel_mapping.source_unit`` for aliased reads via the + :meth:`KeyValueStoreSolver.filter_aliased_channel_metrics` + coalesce. + + Users with different internal names per table can rename physical + columns to ``unit`` on each table independently via the per-table + ``column_name_mapping``. 
+ """ return "unit" @property diff --git a/tests/conftest.py b/tests/conftest.py index e91a383..431fb42 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -351,7 +351,7 @@ def unit_conversion_dataframes(spark): container_tags_path = f"{base_path}/tests/unit/data/key_value_store_csv/container_metrics.csv" container_metric_path = f"{base_path}/tests/unit/data/basic_narrow_csv/container_metrics.csv" channel_metric_path = ( - f"{base_path}/tests/unit/data/key_value_store_alias_csv/channel_metrics.csv" + f"{base_path}/tests/unit/data/key_value_store_unit_conversion_csv/channel_metrics.csv" ) channels_path = f"{base_path}/tests/unit/data/basic_narrow_csv/channel_data.csv" channel_mapping_path = ( diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py index 993c3df..272e191 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py @@ -61,16 +61,21 @@ def test_solve_with_unit_conversion( assert pdf["container_id"].tolist() == [1, 2, 3] factor = 0.277778 - # Containers 1 and 2 resolve vehicle_speed -> "Vehicle Speed Sensor" (channel 7). + # Containers 1 and 2 resolve vehicle_speed -> "Vehicle Speed Sensor" (channel 7); + # channel_metrics.unit == "km/h" matches channel_mapping.source_unit, so the + # coalesce yields "km/h" and values scale by ~0.277778 to reach m/s. for cid in (1, 2): expected = _expected_raw_values(channels_csv_path, cid, 7) * factor row = pdf.loc[pdf["container_id"] == cid].iloc[0] np.testing.assert_allclose(row.vehicle_speed.values, expected, rtol=1e-6) - # Container 3 resolves to channel 7 via Spd_Vhcl / ProjSpecREC_10Hz. 
- expected3 = _expected_raw_values(channels_csv_path, 3, 7) * factor + # Container 3 resolves to channel 7 via Spd_Vhcl / ProjSpecREC_10Hz. Its + # channel_metrics.unit is "m/s" (overrides channel_mapping.source_unit="km/h" + # via COALESCE), and target_unit is also "m/s", so the conversion factor is + # 1.0 and values are unchanged from raw. + expected3 = _expected_raw_values(channels_csv_path, 3, 7) row3 = pdf.loc[pdf["container_id"] == 3].iloc[0] - np.testing.assert_allclose(row3.vehicle_speed.values, expected3, rtol=1e-6) + np.testing.assert_allclose(row3.vehicle_speed.values, expected3, rtol=1e-12) def test_solve_no_conversion_when_same_unit( self, @@ -221,6 +226,148 @@ def test_solve_cross_family_units_leave_values_unchanged( np.testing.assert_allclose(row.cross.values, expected, rtol=1e-12) +class TestSourceUnitResolution: + """Effective source_unit = COALESCE(channel_metrics.unit, channel_mapping.source_unit).""" + + def test_source_unit_from_channel_metrics_overrides_mapping( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # Container 3's Spd_Vhcl row has channel_metrics.unit = "m/s" while the + # mapping's source_unit is "km/h". Coalesce yields "m/s"; mapping's + # target_unit is also "m/s"; effective factor = 1.0 (no scaling). 
+ solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + + pdf = query.select(vehicle_speed).toPandas(spark, solver=solver) + row3 = pdf.loc[pdf["container_id"] == 3].iloc[0] + expected = _expected_raw_values(channels_csv_path, 3, 7) + np.testing.assert_allclose(row3.vehicle_speed.values, expected, rtol=1e-12) + + def test_source_unit_falls_back_to_mapping_when_metrics_unit_null( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # Containers 1 and 2 have channel_metrics.unit = "km/h" (which equals + # the mapping's source_unit, so they coalesce identically). To + # exercise the null-fallback specifically, construct a custom + # channel_metrics where the unit cell is null for the row of interest + # — the coalesce must then return the mapping's source_unit. + from pyspark.sql import functions as F # noqa: PLR0402 local import + + # Replace the unit cell on (cid=1, ch=7) with null. + cm = key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] + cm_null = cm.withColumn( + "unit", + F.when( + (F.col("container_id") == 1) & (F.col("channel_id") == 7), + F.lit(None).cast("string"), + ).otherwise(F.col("unit")), + ) + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm_null + + try: + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + pdf = query.select(vehicle_speed).toPandas(spark, solver=solver) + + # Container 1: unit null → fall back to mapping source_unit="km/h" + # → factor 0.277778. 
+ expected = _expected_raw_values(channels_csv_path, 1, 7) * 0.277778 + row1 = pdf.loc[pdf["container_id"] == 1].iloc[0] + np.testing.assert_allclose(row1.vehicle_speed.values, expected, rtol=1e-6) + finally: + # Restore the fixture so subsequent tests in this session see + # the original DataFrame. + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm + + def test_source_unit_falls_back_when_channel_metrics_lacks_unit_column( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # Drop the `unit` column from channel_metrics entirely. The solver + # detects its absence (metrics_has_unit = False) and falls back to + # the mapping's source_unit directly. + cm = key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] + cm_no_unit = cm.drop("unit") + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm_no_unit + + try: + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + pdf = query.select(vehicle_speed).toPandas(spark, solver=solver) + + # All three containers: no unit column → mapping source_unit + # "km/h" wins → factor 0.277778. + for cid in (1, 2, 3): + expected = _expected_raw_values(channels_csv_path, cid, 7) * 0.277778 + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.vehicle_speed.values, expected, rtol=1e-6) + finally: + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm + + def test_channel_metrics_unit_col_is_configurable( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # Rename the physical `unit` column to `phys_unit` on channel_metrics, + # then point the solver at it via channel_metrics.column_name_mapping. 
+ # The configurable unit_col property (default "unit") is what the + # solver references; rename brings the physical name to the internal + # name. + cm = key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] + cm_renamed = cm.withColumnRenamed("unit", "phys_unit") + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm_renamed + + try: + solver = KeyValueStoreSolver( + spark, + config=SolverConfig( + project_id="SAMPLE_PROJECT", + container_metrics=TableConfig(column_name_mapping={"project": "project_id"}), + channel_metrics=TableConfig(column_name_mapping={"phys_unit": "unit"}), + channel_mapping=ChannelMappingConfig( + filters={"toolbox_id": "container_concept"} + ), + ), + ) + query = key_value_store_unit_conversion_db.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + pdf = query.select(vehicle_speed).toPandas(spark, solver=solver) + + # Renamed column carries through: container 3 still resolves to + # m/s (no scaling); containers 1/2 still scale by 0.277778. 
+ expected3 = _expected_raw_values(channels_csv_path, 3, 7) + row3 = pdf.loc[pdf["container_id"] == 3].iloc[0] + np.testing.assert_allclose(row3.vehicle_speed.values, expected3, rtol=1e-12) + + for cid in (1, 2): + expected = _expected_raw_values(channels_csv_path, cid, 7) * 0.277778 + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.vehicle_speed.values, expected, rtol=1e-6) + finally: + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm + + class TestComputeConversionFactors: def test_factor_one_for_identical_units( self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB diff --git a/tests/unit/data/key_value_store_unit_conversion_csv/channel_metrics.csv b/tests/unit/data/key_value_store_unit_conversion_csv/channel_metrics.csv new file mode 100644 index 0000000..0e1d966 --- /dev/null +++ b/tests/unit/data/key_value_store_unit_conversion_csv/channel_metrics.csv @@ -0,0 +1,13 @@ +container_id,channel_id,channel_name,data_key,group_idx,channel_idx,unit,sample_count,min,max,mean,begin_ms,end_ms,duration_ms,sample_rate,value_type +1,6,Ambient Air Temperature,TM,2,2,C,59625,13,23,15.904821802935011,1499929242072000,1499934640063000,5397991000,11.045776104480352,DOUBLE +1,9,Intake Air Temperature,TM,4,1,C,59625,-8,150,29.0571572327044,1499929242072000,1499934640063000,5397991000,11.045776104480352,DOUBLE +3,8,Intake Air Temperature,TM,4,1,C,46340,-7,146,29.205761760897712,1499238991257000,1499242935793999,3944536999,11.747893355227216,DOUBLE +2,7,Vehicle Speed Sensor,TM,3,1,km/h,57240,0,182,75.85866526904263,1499367269349000,1499372240481000,4971132000,11.514480001738036,DOUBLE +2,5,Engine RPM,TM,2,1,RPM,57240,0,3072,1595.6337875611462,1499367269349000,1499372240481000,4971132000,11.514480001738036,DOUBLE +3,7,Spd_Vhcl,ProjSpecREC_10Hz,3,1,m/s,46340,0,126,49.57703927492447,1499238991257000,1499242935793999,3944536999,11.747893355227216,DOUBLE +3,2,Ambient Air 
Temperature,TM,0,2,C,46340,16,23,19.51277514026759,1499238991257000,1499242935793999,3944536999,11.747893355227216,DOUBLE +2,9,Intake Air Temperature,TM,4,1,C,57240,-8,141,36.545946890286515,1499367269349000,1499372240481000,4971132000,11.514480001738036,DOUBLE +3,5,EngSpd,ProjSpecREC_10Hz,2,1,RPM,46340,0,2689,1378.3660552438498,1499238991257000,1499242935793999,3944536999,11.747893355227216,DOUBLE +2,6,Ambient Air Temperature,TM,2,2,C,57240,21,33,28.793081761006288,1499367269349000,1499372240481000,4971132000,11.514480001738036,DOUBLE +1,7,Vehicle Speed Sensor,TM,3,1,km/h,59625,0,217,68.22906498951782,1499929242072000,1499934640063000,5397991000,11.045776104480352,DOUBLE +1,5,Engine RPM,TM,2,1,RPM,59625,0,3658,1490.707790356394,1499929242072000,1499934640063000,5397991000,11.045776104480352,DOUBLE From 1df609b9261cc7613655e3ce1dbb151a16eb0a01 Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Sun, 24 May 2026 16:06:34 +0200 Subject: [PATCH 08/10] Enhance unit conversion validation in KeyValueStoreSolver - Updated the `KeyValueStoreSolver` to validate that each `(container_id, channel_id)` pair has at most one distinct `source_unit` and one distinct `target_unit`, preventing silent mis-conversions. - Added error handling to raise a `ValueError` when conflicting unit conversions are detected for aliased selectors, providing detailed information about the conflicts. - Enhanced documentation to clarify the constraints on unit conversions and the behavior of the solver when encountering conflicting aliases. - Introduced new unit tests to validate the conflict detection logic and ensure correct error handling for unit conversion scenarios. 
--- .../docs/data_model/silver_layer_schema.md | 12 ++ .../query/solvers/key_value_store_solver.md | 13 ++ .../query/solvers/key_value_store_solver.py | 70 ++++++++- .../key_value_store_unit_conversion_test.py | 139 ++++++++++++++++++ 4 files changed, 230 insertions(+), 4 deletions(-) diff --git a/docs/impulse/docs/data_model/silver_layer_schema.md b/docs/impulse/docs/data_model/silver_layer_schema.md index c698c29..a561181 100644 --- a/docs/impulse/docs/data_model/silver_layer_schema.md +++ b/docs/impulse/docs/data_model/silver_layer_schema.md @@ -275,6 +275,18 @@ Configured via `source.channel_mapping_table` (see [Configuration](../config/configuration.md)). Joins to `channel_metrics` on `(project_id, data_key, channel_name)`. +**Per-channel unit conversion is single-target per query.** Storing two +distinct aliases that resolve to the same physical channel (same +`(source_channel, data_key)` → same `channel_metrics.channel_id`) with +different `target_unit` (or different `source_unit`) values is allowed at +the table level. The constraint only applies at query time: if a single +query selects **both** such aliases via `channel_with_alias()`, the solver +raises `ValueError`. The current per-channel factor model attaches one +conversion factor per physical channel and cannot apply two distinct +conversions to the same channel in the same query. Workarounds: select +the conflicting aliases in **separate queries**, or align the mapping rows +so they agree on the unit pair per physical channel. 
+ --- ## unit_conversion (optional) diff --git a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md index 1c77a5a..3a7b9cd 100644 --- a/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md +++ b/docs/impulse/docs/references/api/impulse_query_engine/analyze/query/solvers/key_value_store_solver.md @@ -190,12 +190,25 @@ through the union and aggregation. Direct selectors produce null unit columns, which causes the downstream conversion-factor join in :meth:`solve` to leave their values unchanged. +Validates that each ``(container_id, channel_id)`` carries at most +one distinct ``source_unit`` and one distinct ``target_unit``. Per +physical channel the unit-conversion model can attach only one +factor; conflicting aliases would otherwise pick an arbitrary +target and silently mis-convert one of them. + **Arguments**: - `spark` (`SparkSession`): Spark session used for query execution. - `channel_metrics_df` (`pyspark.sql.DataFrame`): Direct channel metrics with ``selector_ids`` array column. - `aliased_channel_metrics_df` (`pyspark.sql.DataFrame`): Aliased channel metrics with ``selector_ids`` array column. +**Raises**: + +- `ValueError`: If two or more aliased selectors resolve to the same physical +channel with conflicting ``source_unit`` or ``target_unit`` +values. Up to three offending channels are listed in the +message. 
+ **Returns**: `pyspark.sql.DataFrame`: Merged DataFrame with ``(container_id, channel_id, selector_ids)`` diff --git a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py index 8246e1c..bcfc40d 100644 --- a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py @@ -506,6 +506,12 @@ def resolve_channel_selections( unit columns, which causes the downstream conversion-factor join in :meth:`solve` to leave their values unchanged. + Validates that each ``(container_id, channel_id)`` carries at most + one distinct ``source_unit`` and one distinct ``target_unit``. Per + physical channel the unit-conversion model can attach only one + factor; conflicting aliases would otherwise pick an arbitrary + target and silently mis-convert one of them. + Parameters ---------- spark : SparkSession @@ -521,6 +527,14 @@ def resolve_channel_selections( Merged DataFrame with ``(container_id, channel_id, selector_ids)`` (plus ``source_unit`` / ``target_unit`` when present on the aliased side). + + Raises + ------ + ValueError + If two or more aliased selectors resolve to the same physical + channel with conflicting ``source_unit`` or ``target_unit`` + values. Up to three offending channels are listed in the + message. 
""" source_unit_col = self.config.source_unit_col target_unit_col = self.config.target_unit_col @@ -535,14 +549,62 @@ def resolve_channel_selections( agg_exprs = [F.flatten(F.collect_list("selector_ids")).alias("selector_ids")] if has_unit_cols: - agg_exprs.append(F.first(source_unit_col, ignorenulls=True).alias(source_unit_col)) - agg_exprs.append(F.first(target_unit_col, ignorenulls=True).alias(target_unit_col)) - - return merged.groupBy( + # collect_set serves a dual purpose: (a) it deduplicates so we + # can detect a conflict by size > 1, and (b) the single + # remaining element materializes the scalar unit value the + # downstream code expects. + agg_exprs.append(F.collect_set(source_unit_col).alias("_source_units")) + agg_exprs.append(F.collect_set(target_unit_col).alias("_target_units")) + + grouped = merged.groupBy( self.config.container_id_col, self.config.channel_id_col, ).agg(*agg_exprs) + if has_unit_cols: + # TODO(unit-conversion): lift this limitation by attaching the + # conversion factor to the selector instead of the channel row + # (see PR #30 review). + conflicts = ( + grouped.where( + (F.size("_source_units") > 1) | (F.size("_target_units") > 1) + ) + .select( + self.config.container_id_col, + self.config.channel_id_col, + "_source_units", + "_target_units", + ) + .limit(3) + .collect() + ) + if conflicts: + details = [ + f"(container_id={row[self.config.container_id_col]}, " + f"channel_id={row[self.config.channel_id_col]}): " + f"source_units={sorted(row['_source_units'])}, " + f"target_units={sorted(row['_target_units'])}" + for row in conflicts + ] + raise ValueError( + "Conflicting unit conversions on the same physical channel " + "(first 3 shown):\n" + "\n".join(details) + ) + # Empty sets (direct-only channels) yield null via + # try_element_at, matching the prior F.first(ignorenulls=True) + # behavior. Plain element_at raises on empty arrays in Spark 4. 
+ grouped = ( + grouped.withColumn( + source_unit_col, F.try_element_at("_source_units", F.lit(1)) + ) + .withColumn( + target_unit_col, F.try_element_at("_target_units", F.lit(1)) + ) + .drop("_source_units", "_target_units") + ) + + return grouped + # ------------------------------------------------------------------ # Unit conversion # ------------------------------------------------------------------ diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py index 272e191..93ae1e0 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py @@ -368,6 +368,145 @@ def test_channel_metrics_unit_col_is_configurable( key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm +class TestAliasUnitConflictDetection: + """Per-channel unit conversion supports only one (source_unit, target_unit) pair. + + When two aliases on the same physical channel disagree, the solver + must raise rather than silently mis-converting one of them. 
+ """ + + @staticmethod + def _mapping_with(spark: SparkSession, rows): + """Build a channel_mapping DataFrame from rows matching the + unit-conversion fixture schema.""" + from pyspark.sql.types import IntegerType, StringType, StructField, StructType + + schema = StructType( + [ + StructField("project_id", StringType(), nullable=False), + StructField("toolbox_id", StringType(), nullable=False), + StructField("channel_alias", StringType(), nullable=False), + StructField("source_channel", StringType(), nullable=False), + StructField("data_key", StringType(), nullable=False), + StructField("priority", IntegerType(), nullable=True), + StructField("source_unit", StringType(), nullable=True), + StructField("target_unit", StringType(), nullable=True), + ] + ) + return spark.createDataFrame(rows, schema=schema) + + def test_conflict_on_target_unit_raises( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + # Two aliases both resolve to (container_id, channel_id) = (1, 7) and + # (2, 7) via Vehicle Speed Sensor / TM, but request different + # target_units. The solver must raise. 
+ original = key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] + conflicting = self._mapping_with( + spark, + [ + ("SAMPLE_PROJECT", "container_concept", "vehicle_speed_mph", + "Vehicle Speed Sensor", "TM", None, "km/h", "mph"), + ("SAMPLE_PROJECT", "container_concept", "vehicle_speed_ms", + "Vehicle Speed Sensor", "TM", None, "km/h", "m/s"), + ], + ) + key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = conflicting + + try: + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + mph = query.channel_with_alias(channel_alias="vehicle_speed_mph").alias("mph") + ms = query.channel_with_alias(channel_alias="vehicle_speed_ms").alias("ms") + + with pytest.raises(ValueError, match="Conflicting unit conversions") as excinfo: + query.select(mph, ms).toPandas(spark, solver=solver) + + msg = str(excinfo.value) + assert "channel_id=7" in msg + assert "mph" in msg + assert "m/s" in msg + finally: + key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = original + + def test_conflict_on_source_unit_raises( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + # Same physical channel, agreeing target_unit, but disagreeing + # source_unit. (The coalesce in filter_aliased_channel_metrics + # prefers channel_metrics.unit, but if it's null/absent the + # mapping's source_unit wins — and these two mappings disagree.) + original = key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] + conflicting = self._mapping_with( + spark, + [ + ("SAMPLE_PROJECT", "container_concept", "vehicle_speed_a", + "Vehicle Speed Sensor", "TM", None, "km/h", "m/s"), + ("SAMPLE_PROJECT", "container_concept", "vehicle_speed_b", + "Vehicle Speed Sensor", "TM", None, "mph", "m/s"), + ], + ) + # Also drop channel_metrics.unit so neither alias has a value to + # coalesce against — both rely on mapping.source_unit. 
+ original_cm = key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] + cm_no_unit = original_cm.drop("unit") + key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = conflicting + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = cm_no_unit + + try: + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + a = query.channel_with_alias(channel_alias="vehicle_speed_a").alias("a") + b = query.channel_with_alias(channel_alias="vehicle_speed_b").alias("b") + + with pytest.raises(ValueError, match="Conflicting unit conversions") as excinfo: + query.select(a, b).toPandas(spark, solver=solver) + + msg = str(excinfo.value) + assert "channel_id=7" in msg + assert "km/h" in msg + assert "mph" in msg + finally: + key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = original + key_value_store_unit_conversion_db.config.debug_tables["channel_metrics"] = original_cm + + def test_no_conflict_when_aliases_agree( + self, + spark: SparkSession, + key_value_store_unit_conversion_db: MeasurementDB, + channels_csv_path: str, + ): + # Two aliases on the same physical channel agree on (source_unit, + # target_unit). Both selectors should resolve and produce the same + # converted values. 
+ original = key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] + agreeing = self._mapping_with( + spark, + [ + ("SAMPLE_PROJECT", "container_concept", "vehicle_speed_a", + "Vehicle Speed Sensor", "TM", None, "km/h", "m/s"), + ("SAMPLE_PROJECT", "container_concept", "vehicle_speed_b", + "Vehicle Speed Sensor", "TM", None, "km/h", "m/s"), + ], + ) + key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = agreeing + + try: + solver = _solver(spark) + query = key_value_store_unit_conversion_db.query + a = query.channel_with_alias(channel_alias="vehicle_speed_a").alias("a") + b = query.channel_with_alias(channel_alias="vehicle_speed_b").alias("b") + + pdf = query.select(a, b).toPandas(spark, solver=solver) + for cid in (1, 2): + expected = _expected_raw_values(channels_csv_path, cid, 7) * 0.277778 + row = pdf.loc[pdf["container_id"] == cid].iloc[0] + np.testing.assert_allclose(row.a.values, expected, rtol=1e-6) + np.testing.assert_allclose(row.b.values, expected, rtol=1e-6) + finally: + key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = original + + class TestComputeConversionFactors: def test_factor_one_for_identical_units( self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB From 6b075105d5d5e5c8580eff9f2fcdc095ffa52765 Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Sun, 24 May 2026 16:07:03 +0200 Subject: [PATCH 09/10] fix formatting --- .../query/solvers/key_value_store_solver.py | 12 +--- .../key_value_store_unit_conversion_test.py | 72 +++++++++++++++---- 2 files changed, 63 insertions(+), 21 deletions(-) diff --git a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py index bcfc40d..870d1df 100644 --- a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py @@ -566,9 +566,7 @@ 
def resolve_channel_selections( # conversion factor to the selector instead of the channel row # (see PR #30 review). conflicts = ( - grouped.where( - (F.size("_source_units") > 1) | (F.size("_target_units") > 1) - ) + grouped.where((F.size("_source_units") > 1) | (F.size("_target_units") > 1)) .select( self.config.container_id_col, self.config.channel_id_col, @@ -594,12 +592,8 @@ def resolve_channel_selections( # try_element_at, matching the prior F.first(ignorenulls=True) # behavior. Plain element_at raises on empty arrays in Spark 4. grouped = ( - grouped.withColumn( - source_unit_col, F.try_element_at("_source_units", F.lit(1)) - ) - .withColumn( - target_unit_col, F.try_element_at("_target_units", F.lit(1)) - ) + grouped.withColumn(source_unit_col, F.try_element_at("_source_units", F.lit(1))) + .withColumn(target_unit_col, F.try_element_at("_target_units", F.lit(1))) .drop("_source_units", "_target_units") ) diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py index 93ae1e0..3c9e3a2 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py @@ -405,10 +405,26 @@ def test_conflict_on_target_unit_raises( conflicting = self._mapping_with( spark, [ - ("SAMPLE_PROJECT", "container_concept", "vehicle_speed_mph", - "Vehicle Speed Sensor", "TM", None, "km/h", "mph"), - ("SAMPLE_PROJECT", "container_concept", "vehicle_speed_ms", - "Vehicle Speed Sensor", "TM", None, "km/h", "m/s"), + ( + "SAMPLE_PROJECT", + "container_concept", + "vehicle_speed_mph", + "Vehicle Speed Sensor", + "TM", + None, + "km/h", + "mph", + ), + ( + "SAMPLE_PROJECT", + "container_concept", + "vehicle_speed_ms", + "Vehicle Speed Sensor", + "TM", + None, + "km/h", + "m/s", + ), ], ) 
key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = conflicting @@ -440,10 +456,26 @@ def test_conflict_on_source_unit_raises( conflicting = self._mapping_with( spark, [ - ("SAMPLE_PROJECT", "container_concept", "vehicle_speed_a", - "Vehicle Speed Sensor", "TM", None, "km/h", "m/s"), - ("SAMPLE_PROJECT", "container_concept", "vehicle_speed_b", - "Vehicle Speed Sensor", "TM", None, "mph", "m/s"), + ( + "SAMPLE_PROJECT", + "container_concept", + "vehicle_speed_a", + "Vehicle Speed Sensor", + "TM", + None, + "km/h", + "m/s", + ), + ( + "SAMPLE_PROJECT", + "container_concept", + "vehicle_speed_b", + "Vehicle Speed Sensor", + "TM", + None, + "mph", + "m/s", + ), ], ) # Also drop channel_metrics.unit so neither alias has a value to @@ -483,10 +515,26 @@ def test_no_conflict_when_aliases_agree( agreeing = self._mapping_with( spark, [ - ("SAMPLE_PROJECT", "container_concept", "vehicle_speed_a", - "Vehicle Speed Sensor", "TM", None, "km/h", "m/s"), - ("SAMPLE_PROJECT", "container_concept", "vehicle_speed_b", - "Vehicle Speed Sensor", "TM", None, "km/h", "m/s"), + ( + "SAMPLE_PROJECT", + "container_concept", + "vehicle_speed_a", + "Vehicle Speed Sensor", + "TM", + None, + "km/h", + "m/s", + ), + ( + "SAMPLE_PROJECT", + "container_concept", + "vehicle_speed_b", + "Vehicle Speed Sensor", + "TM", + None, + "km/h", + "m/s", + ), ], ) key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = agreeing From d4bde5525450d34e4353996218207cf3ec7a6ece Mon Sep 17 00:00:00 2001 From: "tom.bonfert" Date: Sun, 24 May 2026 19:41:23 +0200 Subject: [PATCH 10/10] Enhance validation for unit conversion factors in KeyValueStoreSolver - Introduced a new method `_validate_unit_conversion_table` to ensure that the `conversion_factor` in the unit conversion table is a positive non-null number, raising a `ValueError` for invalid entries. 
- Updated the `_compute_conversion_factors` method to call the validation method, ensuring that any malformed data is caught early in the processing pipeline. - Added comprehensive unit tests to validate the new error handling for zero, negative, and null conversion factors, providing clear feedback on invalid rows. --- .../docs/data_model/silver_layer_schema.md | 2 +- .../query/solvers/key_value_store_solver.py | 56 ++++++++++++ .../key_value_store_unit_conversion_test.py | 87 +++++++++++++++++++ 3 files changed, 144 insertions(+), 1 deletion(-) diff --git a/docs/impulse/docs/data_model/silver_layer_schema.md b/docs/impulse/docs/data_model/silver_layer_schema.md index a561181..47add63 100644 --- a/docs/impulse/docs/data_model/silver_layer_schema.md +++ b/docs/impulse/docs/data_model/silver_layer_schema.md @@ -299,7 +299,7 @@ solve time when `source.unit_conversion_table` is configured and the |---------------------|----------|----------|------------------------------------------------------------------------------------------------------------| | `group_id` | `string` | No | Unit family identifier (e.g. `speed`, `rotation`). Only units within the same family can convert into each other. | | `unit` | `string` | No | Unit name. Matches the `source_unit` / `target_unit` values on `channel_mapping`. | -| `conversion_factor` | `double` | No | Multiplier that converts a value in this unit to the family's base unit. The base unit has factor `1.0`. | +| `conversion_factor` | `double` | No | Multiplier that converts a value in this unit to the family's base unit. The base unit has factor `1.0`. **Required to be a positive non-null number** — a row with `conversion_factor` null, zero, or negative is rejected at query time with `ValueError` (validation runs once per query that uses unit conversion). 
| For each aliased channel the solver looks up `source_factor` (the row whose `unit` matches `source_unit`) and `target_factor` (the row whose diff --git a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py index 870d1df..d5e5e5c 100644 --- a/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py +++ b/src/impulse_query_engine/analyze/query/solvers/key_value_store_solver.py @@ -603,6 +603,53 @@ def resolve_channel_selections( # Unit conversion # ------------------------------------------------------------------ + def _validate_unit_conversion_table(self, uc_table: DataFrame) -> None: + """Raise ``ValueError`` if the unit_conversion table contains rows + whose ``conversion_factor`` is null, zero, or negative. + + ``conversion_factor`` is conceptually a strictly-positive number. + A zero on the source side silently corrupts values to all-zero; + a zero on the target side raises a cryptic Spark + ``ArithmeticException`` deep in the conversion path under Spark 4 + ANSI mode; a negative value flips signs; a null silently skips + conversion (contract violation, not corruption). Catching all + four cases here turns each into a clear, actionable error + naming the offending row. + + Parameters + ---------- + uc_table : pyspark.sql.DataFrame + The ``unit_conversion`` table **after** + ``_apply_column_mapping`` has been applied. + + Raises + ------ + ValueError + If any row has ``conversion_factor IS NULL`` or + ``conversion_factor <= 0``. Up to three offending rows are + listed in the message. 
+ """ + unit_col = self.config.unit_col + group_id_col = self.config.group_id_col + factor_col = self.config.conversion_factor_col + + bad_rows = ( + uc_table.where(F.col(factor_col).isNull() | (F.col(factor_col) <= 0)) + .select(group_id_col, unit_col, factor_col) + .limit(3) + .collect() + ) + if bad_rows: + details = [ + f"(group_id={row[group_id_col]}, unit={row[unit_col]}, " + f"conversion_factor={row[factor_col]})" + for row in bad_rows + ] + raise ValueError( + "Invalid conversion_factor in unit_conversion table " + "(must be a positive non-null number; first 3 shown):\n" + "\n".join(details) + ) + def _compute_conversion_factors(self, spark, query, channels_df: DataFrame) -> DataFrame: """ Join *channels_df* with the unit conversion table to compute a @@ -635,11 +682,20 @@ def _compute_conversion_factors(self, spark, query, channels_df: DataFrame) -> D ------- pyspark.sql.DataFrame *channels_df* augmented with a ``conversion_factor`` column. + + Raises + ------ + ValueError + If the ``unit_conversion`` table contains a row with a null, + zero, or negative ``conversion_factor``. See + :meth:`_validate_unit_conversion_table` for the underlying + check. 
""" uc_table = query.db.unit_conversion(spark) uc_table = self._apply_column_mapping( uc_table, self.config.unit_conversion.column_name_mapping ) + self._validate_unit_conversion_table(uc_table) unit_col = self.config.unit_col group_id_col = self.config.group_id_col diff --git a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py index 3c9e3a2..0f71e8b 100644 --- a/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py +++ b/tests/impulse_query_engine/unit/analyze/query/solvers/key_value_store_unit_conversion_test.py @@ -555,6 +555,93 @@ def test_no_conflict_when_aliases_agree( key_value_store_unit_conversion_db.config.debug_tables["channel_mapping"] = original +class TestConversionFactorValidation: + """`unit_conversion.conversion_factor` must be a positive non-null number. + + Catches malformed reference rows early so the user sees a clear error + instead of silent data corruption (zero/negative) or silent contract + violation (null). + """ + + @staticmethod + def _uc_with(spark: SparkSession, rows): + """Build a unit_conversion DataFrame with an explicit schema so the + nullable factor case doesn't confuse Spark's type inference.""" + from pyspark.sql.types import ( + BooleanType, + DoubleType, + StringType, + StructField, + StructType, + ) + + schema = StructType( + [ + StructField("group_id", StringType(), nullable=False), + StructField("unit", StringType(), nullable=False), + StructField("conversion_factor", DoubleType(), nullable=True), + StructField("is_base", BooleanType(), nullable=True), + ] + ) + return spark.createDataFrame(rows, schema=schema) + + def _run_with_uc_table(self, spark, db, uc_rows): + """Replace the unit_conversion debug table, run a vehicle_speed + aliased query, restore the original. Returns nothing — used inside + a ``pytest.raises`` block. 
+ """ + original = db.config.debug_tables["unit_conversion"] + db.config.debug_tables["unit_conversion"] = self._uc_with(spark, uc_rows) + try: + solver = _solver(spark) + query = db.query + vehicle_speed = query.channel_with_alias(channel_alias="vehicle_speed").alias( + "vehicle_speed" + ) + query.select(vehicle_speed).toPandas(spark, solver=solver) + finally: + db.config.debug_tables["unit_conversion"] = original + + def test_zero_factor_raises( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + rows = [ + ("speed", "m/s", 1.0, True), + ("speed", "km/h", 0.0, False), # bad + ] + with pytest.raises(ValueError, match="Invalid conversion_factor") as excinfo: + self._run_with_uc_table(spark, key_value_store_unit_conversion_db, rows) + msg = str(excinfo.value) + assert "km/h" in msg + assert "conversion_factor=0" in msg + + def test_negative_factor_raises( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + rows = [ + ("speed", "m/s", 1.0, True), + ("speed", "km/h", -1.0, False), # bad + ] + with pytest.raises(ValueError, match="Invalid conversion_factor") as excinfo: + self._run_with_uc_table(spark, key_value_store_unit_conversion_db, rows) + msg = str(excinfo.value) + assert "km/h" in msg + assert "conversion_factor=-1" in msg + + def test_null_factor_raises( + self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB + ): + rows = [ + ("speed", "m/s", 1.0, True), + ("speed", "km/h", None, False), # bad + ] + with pytest.raises(ValueError, match="Invalid conversion_factor") as excinfo: + self._run_with_uc_table(spark, key_value_store_unit_conversion_db, rows) + msg = str(excinfo.value) + assert "km/h" in msg + assert "conversion_factor=None" in msg + + class TestComputeConversionFactors: def test_factor_one_for_identical_units( self, spark: SparkSession, key_value_store_unit_conversion_db: MeasurementDB