From ada6f7b3fb6d85c2730b8ac86111435ae8e1eae4 Mon Sep 17 00:00:00 2001
From: Aidan Timson
Date: Thu, 2 Oct 2025 22:44:28 +0100
Subject: [PATCH 1/3] Update ovoenergy to 3.0.2 (#153488)

---
 homeassistant/components/ovo_energy/manifest.json | 2 +-
 requirements_all.txt                              | 2 +-
 requirements_test_all.txt                         | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/homeassistant/components/ovo_energy/manifest.json b/homeassistant/components/ovo_energy/manifest.json
index da6fb5232f71b..824b3305543cc 100644
--- a/homeassistant/components/ovo_energy/manifest.json
+++ b/homeassistant/components/ovo_energy/manifest.json
@@ -7,5 +7,5 @@
   "integration_type": "service",
   "iot_class": "cloud_polling",
   "loggers": ["ovoenergy"],
-  "requirements": ["ovoenergy==3.0.1"]
+  "requirements": ["ovoenergy==3.0.2"]
 }
diff --git a/requirements_all.txt b/requirements_all.txt
index b9a377b524fee..a41b4d3bedb98 100644
--- a/requirements_all.txt
+++ b/requirements_all.txt
@@ -1670,7 +1670,7 @@ orvibo==1.1.2
 ourgroceries==1.5.4
 
 # homeassistant.components.ovo_energy
-ovoenergy==3.0.1
+ovoenergy==3.0.2
 
 # homeassistant.components.p1_monitor
 p1monitor==3.2.0
diff --git a/requirements_test_all.txt b/requirements_test_all.txt
index 403439864ac53..bac55bfab433c 100644
--- a/requirements_test_all.txt
+++ b/requirements_test_all.txt
@@ -1420,7 +1420,7 @@ oralb-ble==0.17.6
 ourgroceries==1.5.4
 
 # homeassistant.components.ovo_energy
-ovoenergy==3.0.1
+ovoenergy==3.0.2
 
 # homeassistant.components.p1_monitor
 p1monitor==3.2.0

From c7d3512ad2b2586efa96ce8e650d5d2c01b5ef0a Mon Sep 17 00:00:00 2001
From: puddly <32534428+puddly@users.noreply.github.com>
Date: Thu, 2 Oct 2025 18:18:05 -0400
Subject: [PATCH 2/3] Bump universal-silabs-flasher to 0.0.35 (#153500)

---
 homeassistant/components/homeassistant_hardware/manifest.json | 2 +-
 requirements_all.txt                                          | 2 +-
 requirements_test_all.txt                                     | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/homeassistant/components/homeassistant_hardware/manifest.json b/homeassistant/components/homeassistant_hardware/manifest.json
index 510c1fc6d6cf8..192aecc93bf98 100644
--- a/homeassistant/components/homeassistant_hardware/manifest.json
+++ b/homeassistant/components/homeassistant_hardware/manifest.json
@@ -6,7 +6,7 @@
   "documentation": "https://www.home-assistant.io/integrations/homeassistant_hardware",
   "integration_type": "system",
   "requirements": [
-    "universal-silabs-flasher==0.0.34",
+    "universal-silabs-flasher==0.0.35",
     "ha-silabs-firmware-client==0.2.0"
   ]
 }
diff --git a/requirements_all.txt b/requirements_all.txt
index a41b4d3bedb98..3ead88e99b324 100644
--- a/requirements_all.txt
+++ b/requirements_all.txt
@@ -3063,7 +3063,7 @@ unifi_ap==0.0.2
 unifiled==0.11
 
 # homeassistant.components.homeassistant_hardware
-universal-silabs-flasher==0.0.34
+universal-silabs-flasher==0.0.35
 
 # homeassistant.components.upb
 upb-lib==0.6.1
diff --git a/requirements_test_all.txt b/requirements_test_all.txt
index bac55bfab433c..c7f71ee6fb905 100644
--- a/requirements_test_all.txt
+++ b/requirements_test_all.txt
@@ -2534,7 +2534,7 @@ ultraheat-api==0.5.7
 unifi-discovery==1.2.0
 
 # homeassistant.components.homeassistant_hardware
-universal-silabs-flasher==0.0.34
+universal-silabs-flasher==0.0.35
 
 # homeassistant.components.upb
 upb-lib==0.6.1

From 982166df3cd420ea6b16c6b284838e0cea873a9f Mon Sep 17 00:00:00 2001
From: Erik Montnemery
Date: Fri, 3 Oct 2025 01:00:09 +0200
Subject: [PATCH 3/3] Remove module recorder.history.modern (#153502)

---
 .../components/recorder/history/__init__.py   | 922 +++++++++++++++--
.../components/recorder/history/modern.py | 925 ------------------ tests/components/recorder/test_history.py | 2 +- tests/components/recorder/test_util.py | 4 +- 4 files changed, 860 insertions(+), 993 deletions(-) delete mode 100644 homeassistant/components/recorder/history/modern.py diff --git a/homeassistant/components/recorder/history/__init__.py b/homeassistant/components/recorder/history/__init__.py index 20453a0b1c892..32e0b4f9a7169 100644 --- a/homeassistant/components/recorder/history/__init__.py +++ b/homeassistant/components/recorder/history/__init__.py @@ -2,21 +2,53 @@ from __future__ import annotations +from collections.abc import Callable, Iterable, Iterator from datetime import datetime -from typing import Any +from itertools import groupby +from operator import itemgetter +from typing import TYPE_CHECKING, Any, cast +from sqlalchemy import ( + CompoundSelect, + Select, + StatementLambdaElement, + Subquery, + and_, + func, + lambda_stmt, + literal, + select, + union_all, +) +from sqlalchemy.engine.row import Row from sqlalchemy.orm.session import Session -from homeassistant.core import HomeAssistant, State +from homeassistant.const import COMPRESSED_STATE_LAST_UPDATED, COMPRESSED_STATE_STATE +from homeassistant.core import HomeAssistant, State, split_entity_id +from homeassistant.helpers.recorder import get_instance +from homeassistant.util import dt as dt_util +from homeassistant.util.collection import chunked_or_all +from ..const import MAX_IDS_FOR_INDEXED_GROUP_BY +from ..db_schema import ( + SHARED_ATTR_OR_LEGACY_ATTRIBUTES, + StateAttributes, + States, + StatesMeta, +) from ..filters import Filters -from .const import NEED_ATTRIBUTE_DOMAINS, SIGNIFICANT_DOMAINS -from .modern import ( - get_full_significant_states_with_session as _modern_get_full_significant_states_with_session, - get_last_state_changes as _modern_get_last_state_changes, - get_significant_states as _modern_get_significant_states, - get_significant_states_with_session as _modern_get_significant_states_with_session, - state_changes_during_period as _modern_state_changes_during_period, +from ..models import ( + LazyState, + datetime_to_timestamp_or_none, + extract_metadata_ids, + row_to_compressed_state, +) +from ..util import execute_stmt_lambda_element, session_scope +from .const import ( + LAST_CHANGED_KEY, + NEED_ATTRIBUTE_DOMAINS, + SIGNIFICANT_DOMAINS, + STATE_KEY, ) # These are the APIs of this package @@ -30,41 +62,166 @@ "state_changes_during_period", ] +_FIELD_MAP = { + "metadata_id": 0, + "state": 1, + "last_updated_ts": 2, +} -def get_full_significant_states_with_session( + +def _stmt_and_join_attributes( + no_attributes: bool, + include_last_changed: bool, + include_last_reported: bool, +) -> Select: + """Return the statement and if StateAttributes should be joined.""" + _select = select(States.metadata_id, States.state, States.last_updated_ts) + if include_last_changed: + _select = _select.add_columns(States.last_changed_ts) + if include_last_reported: + _select = _select.add_columns(States.last_reported_ts) + if not no_attributes: + _select = _select.add_columns(SHARED_ATTR_OR_LEGACY_ATTRIBUTES) + return _select + + +def _stmt_and_join_attributes_for_start_state( + no_attributes: bool, + include_last_changed: bool, + include_last_reported: bool, +) -> Select: + """Return the statement and if StateAttributes should be joined.""" + _select = select(States.metadata_id, States.state) + _select = _select.add_columns(literal(value=0).label("last_updated_ts")) + if include_last_changed: + _select = 
_select.add_columns(literal(value=0).label("last_changed_ts")) + if include_last_reported: + _select = _select.add_columns(literal(value=0).label("last_reported_ts")) + if not no_attributes: + _select = _select.add_columns(SHARED_ATTR_OR_LEGACY_ATTRIBUTES) + return _select + + +def _select_from_subquery( + subquery: Subquery | CompoundSelect, + no_attributes: bool, + include_last_changed: bool, + include_last_reported: bool, +) -> Select: + """Return the statement to select from the union.""" + base_select = select( + subquery.c.metadata_id, + subquery.c.state, + subquery.c.last_updated_ts, + ) + if include_last_changed: + base_select = base_select.add_columns(subquery.c.last_changed_ts) + if include_last_reported: + base_select = base_select.add_columns(subquery.c.last_reported_ts) + if no_attributes: + return base_select + return base_select.add_columns(subquery.c.attributes) + + +def get_significant_states( hass: HomeAssistant, - session: Session, start_time: datetime, end_time: datetime | None = None, entity_ids: list[str] | None = None, filters: Filters | None = None, include_start_time_state: bool = True, significant_changes_only: bool = True, + minimal_response: bool = False, no_attributes: bool = False, -) -> dict[str, list[State]]: - """Return a dict of significant states during a time period.""" - return _modern_get_full_significant_states_with_session( - hass, - session, - start_time, - end_time, - entity_ids, - filters, - include_start_time_state, - significant_changes_only, - no_attributes, - ) + compressed_state_format: bool = False, +) -> dict[str, list[State | dict[str, Any]]]: + """Wrap get_significant_states_with_session with an sql session.""" + with session_scope(hass=hass, read_only=True) as session: + return get_significant_states_with_session( + hass, + session, + start_time, + end_time, + entity_ids, + filters, + include_start_time_state, + significant_changes_only, + minimal_response, + no_attributes, + compressed_state_format, + ) -def get_last_state_changes( - hass: HomeAssistant, number_of_states: int, entity_id: str -) -> dict[str, list[State]]: - """Return the last number_of_states.""" - return _modern_get_last_state_changes(hass, number_of_states, entity_id) +def _significant_states_stmt( + start_time_ts: float, + end_time_ts: float | None, + single_metadata_id: int | None, + metadata_ids: list[int], + metadata_ids_in_significant_domains: list[int], + significant_changes_only: bool, + no_attributes: bool, + include_start_time_state: bool, + run_start_ts: float | None, + slow_dependent_subquery: bool, +) -> Select | CompoundSelect: + """Query the database for significant state changes.""" + include_last_changed = not significant_changes_only + stmt = _stmt_and_join_attributes(no_attributes, include_last_changed, False) + if significant_changes_only: + # Since we are filtering on entity_id (metadata_id) we can avoid + # the join of the states_meta table since we already know which + # metadata_ids are in the significant domains. 
+ if metadata_ids_in_significant_domains: + stmt = stmt.filter( + States.metadata_id.in_(metadata_ids_in_significant_domains) + | (States.last_changed_ts == States.last_updated_ts) + | States.last_changed_ts.is_(None) + ) + else: + stmt = stmt.filter( + (States.last_changed_ts == States.last_updated_ts) + | States.last_changed_ts.is_(None) + ) + stmt = stmt.filter(States.metadata_id.in_(metadata_ids)).filter( + States.last_updated_ts > start_time_ts + ) + if end_time_ts: + stmt = stmt.filter(States.last_updated_ts < end_time_ts) + if not no_attributes: + stmt = stmt.outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ) + if not include_start_time_state or not run_start_ts: + return stmt.order_by(States.metadata_id, States.last_updated_ts) + unioned_subquery = union_all( + _select_from_subquery( + _get_start_time_state_stmt( + start_time_ts, + single_metadata_id, + metadata_ids, + no_attributes, + include_last_changed, + slow_dependent_subquery, + ).subquery(), + no_attributes, + include_last_changed, + False, + ), + _select_from_subquery( + stmt.subquery(), no_attributes, include_last_changed, False + ), + ).subquery() + return _select_from_subquery( + unioned_subquery, + no_attributes, + include_last_changed, + False, + ).order_by(unioned_subquery.c.metadata_id, unioned_subquery.c.last_updated_ts) -def get_significant_states( +def get_significant_states_with_session( hass: HomeAssistant, + session: Session, start_time: datetime, end_time: datetime | None = None, entity_ids: list[str] | None = None, @@ -75,22 +232,133 @@ def get_significant_states( no_attributes: bool = False, compressed_state_format: bool = False, ) -> dict[str, list[State | dict[str, Any]]]: - """Return a dict of significant states during a time period.""" - return _modern_get_significant_states( - hass, - start_time, - end_time, + """Return states changes during UTC period start_time - end_time. + + entity_ids is an optional iterable of entities to include in the results. + + filters is an optional SQLAlchemy filter which will be applied to the database + queries unless entity_ids is given, in which case its ignored. + + Significant states are all states where there is a state change, + as well as all states from certain domains (for instance + thermostat so that we get current temperature in our graphs). 
+ """ + if filters is not None: + raise NotImplementedError("Filters are no longer supported") + if not entity_ids: + raise ValueError("entity_ids must be provided") + entity_id_to_metadata_id: dict[str, int | None] | None = None + metadata_ids_in_significant_domains: list[int] = [] + instance = get_instance(hass) + if not ( + entity_id_to_metadata_id := instance.states_meta_manager.get_many( + entity_ids, session, False + ) + ) or not (possible_metadata_ids := extract_metadata_ids(entity_id_to_metadata_id)): + return {} + metadata_ids = possible_metadata_ids + if significant_changes_only: + metadata_ids_in_significant_domains = [ + metadata_id + for entity_id, metadata_id in entity_id_to_metadata_id.items() + if metadata_id is not None + and split_entity_id(entity_id)[0] in SIGNIFICANT_DOMAINS + ] + oldest_ts: float | None = None + if include_start_time_state and not ( + oldest_ts := _get_oldest_possible_ts(hass, start_time) + ): + include_start_time_state = False + start_time_ts = start_time.timestamp() + end_time_ts = datetime_to_timestamp_or_none(end_time) + single_metadata_id = metadata_ids[0] if len(metadata_ids) == 1 else None + rows: list[Row] = [] + if TYPE_CHECKING: + assert instance.database_engine is not None + slow_dependent_subquery = instance.database_engine.optimizer.slow_dependent_subquery + if include_start_time_state and slow_dependent_subquery: + # https://github.com/home-assistant/core/issues/137178 + # If we include the start time state we need to limit the + # number of metadata_ids we query for at a time to avoid + # hitting limits in the MySQL optimizer that prevent + # the start time state query from using an index-only optimization + # to find the start time state. + iter_metadata_ids = chunked_or_all(metadata_ids, MAX_IDS_FOR_INDEXED_GROUP_BY) + else: + iter_metadata_ids = (metadata_ids,) + for metadata_ids_chunk in iter_metadata_ids: + stmt = _generate_significant_states_with_session_stmt( + start_time_ts, + end_time_ts, + single_metadata_id, + metadata_ids_chunk, + metadata_ids_in_significant_domains, + significant_changes_only, + no_attributes, + include_start_time_state, + oldest_ts, + slow_dependent_subquery, + ) + row_chunk = cast( + list[Row], + execute_stmt_lambda_element(session, stmt, None, end_time, orm_rows=False), + ) + if rows: + rows += row_chunk + else: + # If we have no rows yet, we can just assign the chunk + # as this is the common case since its rare that + # we exceed the MAX_IDS_FOR_INDEXED_GROUP_BY limit + rows = row_chunk + return _sorted_states_to_dict( + rows, + start_time_ts if include_start_time_state else None, entity_ids, - filters, - include_start_time_state, - significant_changes_only, + entity_id_to_metadata_id, minimal_response, - no_attributes, compressed_state_format, + no_attributes=no_attributes, ) -def get_significant_states_with_session( +def _generate_significant_states_with_session_stmt( + start_time_ts: float, + end_time_ts: float | None, + single_metadata_id: int | None, + metadata_ids: list[int], + metadata_ids_in_significant_domains: list[int], + significant_changes_only: bool, + no_attributes: bool, + include_start_time_state: bool, + oldest_ts: float | None, + slow_dependent_subquery: bool, +) -> StatementLambdaElement: + return lambda_stmt( + lambda: _significant_states_stmt( + start_time_ts, + end_time_ts, + single_metadata_id, + metadata_ids, + metadata_ids_in_significant_domains, + significant_changes_only, + no_attributes, + include_start_time_state, + oldest_ts, + slow_dependent_subquery, + ), + track_on=[ + 
bool(single_metadata_id), + bool(metadata_ids_in_significant_domains), + bool(end_time_ts), + significant_changes_only, + no_attributes, + include_start_time_state, + slow_dependent_subquery, + ], + ) + + +def get_full_significant_states_with_session( hass: HomeAssistant, session: Session, start_time: datetime, @@ -99,23 +367,89 @@ def get_significant_states_with_session( filters: Filters | None = None, include_start_time_state: bool = True, significant_changes_only: bool = True, - minimal_response: bool = False, no_attributes: bool = False, - compressed_state_format: bool = False, -) -> dict[str, list[State | dict[str, Any]]]: - """Return a dict of significant states during a time period.""" - return _modern_get_significant_states_with_session( - hass, - session, - start_time, - end_time, - entity_ids, - filters, - include_start_time_state, - significant_changes_only, - minimal_response, +) -> dict[str, list[State]]: + """Variant of get_significant_states_with_session. + + Difference with get_significant_states_with_session is that it does not + return minimal responses. + """ + return cast( + dict[str, list[State]], + get_significant_states_with_session( + hass=hass, + session=session, + start_time=start_time, + end_time=end_time, + entity_ids=entity_ids, + filters=filters, + include_start_time_state=include_start_time_state, + significant_changes_only=significant_changes_only, + minimal_response=False, + no_attributes=no_attributes, + ), + ) + + +def _state_changed_during_period_stmt( + start_time_ts: float, + end_time_ts: float | None, + single_metadata_id: int, + no_attributes: bool, + limit: int | None, + include_start_time_state: bool, + run_start_ts: float | None, +) -> Select | CompoundSelect: + stmt = ( + _stmt_and_join_attributes(no_attributes, False, True) + .filter( + ( + (States.last_changed_ts == States.last_updated_ts) + | States.last_changed_ts.is_(None) + ) + & (States.last_updated_ts > start_time_ts) + ) + .filter(States.metadata_id == single_metadata_id) + ) + if end_time_ts: + stmt = stmt.filter(States.last_updated_ts < end_time_ts) + if not no_attributes: + stmt = stmt.outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ) + if limit: + stmt = stmt.limit(limit) + stmt = stmt.order_by(States.metadata_id, States.last_updated_ts) + if not include_start_time_state or not run_start_ts: + # If we do not need the start time state or the + # oldest possible timestamp is newer than the start time + # we can return the statement as is as there will + # never be a start time state. 
+ return stmt + return _select_from_subquery( + union_all( + _select_from_subquery( + _get_single_entity_start_time_stmt( + start_time_ts, + single_metadata_id, + no_attributes, + False, + True, + ).subquery(), + no_attributes, + False, + True, + ), + _select_from_subquery( + stmt.subquery(), + no_attributes, + False, + True, + ), + ).subquery(), no_attributes, - compressed_state_format, + False, + True, ) @@ -129,14 +463,474 @@ def state_changes_during_period( limit: int | None = None, include_start_time_state: bool = True, ) -> dict[str, list[State]]: - """Return a list of states that changed during a time period.""" - return _modern_state_changes_during_period( - hass, - start_time, - end_time, - entity_id, + """Return states changes during UTC period start_time - end_time.""" + if not entity_id: + raise ValueError("entity_id must be provided") + entity_ids = [entity_id.lower()] + + with session_scope(hass=hass, read_only=True) as session: + instance = get_instance(hass) + if not ( + possible_metadata_id := instance.states_meta_manager.get( + entity_id, session, False + ) + ): + return {} + single_metadata_id = possible_metadata_id + entity_id_to_metadata_id: dict[str, int | None] = { + entity_id: single_metadata_id + } + oldest_ts: float | None = None + if include_start_time_state and not ( + oldest_ts := _get_oldest_possible_ts(hass, start_time) + ): + include_start_time_state = False + start_time_ts = start_time.timestamp() + end_time_ts = datetime_to_timestamp_or_none(end_time) + stmt = lambda_stmt( + lambda: _state_changed_during_period_stmt( + start_time_ts, + end_time_ts, + single_metadata_id, + no_attributes, + limit, + include_start_time_state, + oldest_ts, + ), + track_on=[ + bool(end_time_ts), + no_attributes, + bool(limit), + include_start_time_state, + ], + ) + return cast( + dict[str, list[State]], + _sorted_states_to_dict( + execute_stmt_lambda_element( + session, stmt, None, end_time, orm_rows=False + ), + start_time_ts if include_start_time_state else None, + entity_ids, + entity_id_to_metadata_id, + descending=descending, + no_attributes=no_attributes, + ), + ) + + +def _get_last_state_changes_single_stmt(metadata_id: int) -> Select: + return ( + _stmt_and_join_attributes(False, False, False) + .join( + ( + lastest_state_for_metadata_id := ( + select( + States.metadata_id.label("max_metadata_id"), + func.max(States.last_updated_ts).label("max_last_updated"), + ) + .filter(States.metadata_id == metadata_id) + .group_by(States.metadata_id) + .subquery() + ) + ), + and_( + States.metadata_id == lastest_state_for_metadata_id.c.max_metadata_id, + States.last_updated_ts + == lastest_state_for_metadata_id.c.max_last_updated, + ), + ) + .outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ) + .order_by(States.state_id.desc()) + ) + + +def _get_last_state_changes_multiple_stmt( + number_of_states: int, metadata_id: int +) -> Select: + return ( + _stmt_and_join_attributes(False, False, True) + .where( + States.state_id + == ( + select(States.state_id) + .filter(States.metadata_id == metadata_id) + .order_by(States.last_updated_ts.desc()) + .limit(number_of_states) + .subquery() + ).c.state_id + ) + .outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ) + .order_by(States.state_id.desc()) + ) + + +def get_last_state_changes( + hass: HomeAssistant, number_of_states: int, entity_id: str +) -> dict[str, list[State]]: + """Return the last number_of_states.""" + entity_id_lower = entity_id.lower() + entity_ids = 
[entity_id_lower] + + # Calling this function with number_of_states > 1 can cause instability + # because it has to scan the table to find the last number_of_states states + # because the metadata_id_last_updated_ts index is in ascending order. + + with session_scope(hass=hass, read_only=True) as session: + instance = get_instance(hass) + if not ( + possible_metadata_id := instance.states_meta_manager.get( + entity_id, session, False + ) + ): + return {} + metadata_id = possible_metadata_id + entity_id_to_metadata_id: dict[str, int | None] = {entity_id_lower: metadata_id} + if number_of_states == 1: + stmt = lambda_stmt( + lambda: _get_last_state_changes_single_stmt(metadata_id), + ) + else: + stmt = lambda_stmt( + lambda: _get_last_state_changes_multiple_stmt( + number_of_states, metadata_id + ), + ) + states = list(execute_stmt_lambda_element(session, stmt, orm_rows=False)) + return cast( + dict[str, list[State]], + _sorted_states_to_dict( + reversed(states), + None, + entity_ids, + entity_id_to_metadata_id, + no_attributes=False, + ), + ) + + +def _get_start_time_state_for_entities_stmt_dependent_sub_query( + epoch_time: float, + metadata_ids: list[int], + no_attributes: bool, + include_last_changed: bool, +) -> Select: + """Baked query to get states for specific entities.""" + # Engine has a fast dependent subquery optimizer + # This query is the result of significant research in + # https://github.com/home-assistant/core/issues/132865 + # A reverse index scan with a limit 1 is the fastest way to get the + # last state change before a specific point in time for all supported + # databases. Since all databases support this query as a join + # condition we can use it as a subquery to get the last state change + # before a specific point in time for all entities. + stmt = ( + _stmt_and_join_attributes_for_start_state( + no_attributes=no_attributes, + include_last_changed=include_last_changed, + include_last_reported=False, + ) + .select_from(StatesMeta) + .join( + States, + and_( + States.last_updated_ts + == ( + select(States.last_updated_ts) + .where( + (StatesMeta.metadata_id == States.metadata_id) + & (States.last_updated_ts < epoch_time) + ) + .order_by(States.last_updated_ts.desc()) + .limit(1) + ) + .scalar_subquery() + .correlate(StatesMeta), + States.metadata_id == StatesMeta.metadata_id, + ), + ) + .where(StatesMeta.metadata_id.in_(metadata_ids)) + ) + if no_attributes: + return stmt + return stmt.outerjoin( + StateAttributes, (States.attributes_id == StateAttributes.attributes_id) + ) + + +def _get_start_time_state_for_entities_stmt_group_by( + epoch_time: float, + metadata_ids: list[int], + no_attributes: bool, + include_last_changed: bool, +) -> Select: + """Baked query to get states for specific entities.""" + # Simple group-by for MySQL, must use less + # than 1000 metadata_ids in the IN clause for MySQL + # or it will optimize poorly. Callers are responsible + # for ensuring that the number of metadata_ids is less + # than 1000. 
+ most_recent_states_for_entities_by_date = ( + select( + States.metadata_id.label("max_metadata_id"), + func.max(States.last_updated_ts).label("max_last_updated"), + ) + .filter( + (States.last_updated_ts < epoch_time) & States.metadata_id.in_(metadata_ids) + ) + .group_by(States.metadata_id) + .subquery() + ) + stmt = ( + _stmt_and_join_attributes_for_start_state( + no_attributes=no_attributes, + include_last_changed=include_last_changed, + include_last_reported=False, + ) + .join( + most_recent_states_for_entities_by_date, + and_( + States.metadata_id + == most_recent_states_for_entities_by_date.c.max_metadata_id, + States.last_updated_ts + == most_recent_states_for_entities_by_date.c.max_last_updated, + ), + ) + .filter( + (States.last_updated_ts < epoch_time) & States.metadata_id.in_(metadata_ids) + ) + ) + if no_attributes: + return stmt + return stmt.outerjoin( + StateAttributes, (States.attributes_id == StateAttributes.attributes_id) + ) + + +def _get_oldest_possible_ts( + hass: HomeAssistant, utc_point_in_time: datetime +) -> float | None: + """Return the oldest possible timestamp. + + Returns None if there are no states as old as utc_point_in_time. + """ + + oldest_ts = get_instance(hass).states_manager.oldest_ts + if oldest_ts is not None and oldest_ts < utc_point_in_time.timestamp(): + return oldest_ts + return None + + +def _get_start_time_state_stmt( + epoch_time: float, + single_metadata_id: int | None, + metadata_ids: list[int], + no_attributes: bool, + include_last_changed: bool, + slow_dependent_subquery: bool, +) -> Select: + """Return the states at a specific point in time.""" + if single_metadata_id: + # Use an entirely different (and extremely fast) query if we only + # have a single entity id + return _get_single_entity_start_time_stmt( + epoch_time, + single_metadata_id, + no_attributes, + include_last_changed, + False, + ) + # We have more than one entity to look at so we need to do a query on states + # since the last recorder run started. + if slow_dependent_subquery: + return _get_start_time_state_for_entities_stmt_group_by( + epoch_time, + metadata_ids, + no_attributes, + include_last_changed, + ) + + return _get_start_time_state_for_entities_stmt_dependent_sub_query( + epoch_time, + metadata_ids, no_attributes, - descending, - limit, - include_start_time_state, + include_last_changed, + ) + + +def _get_single_entity_start_time_stmt( + epoch_time: float, + metadata_id: int, + no_attributes: bool, + include_last_changed: bool, + include_last_reported: bool, +) -> Select: + # Use an entirely different (and extremely fast) query if we only + # have a single entity id + stmt = ( + _stmt_and_join_attributes_for_start_state( + no_attributes, include_last_changed, include_last_reported + ) + .filter( + States.last_updated_ts < epoch_time, + States.metadata_id == metadata_id, + ) + .order_by(States.last_updated_ts.desc()) + .limit(1) ) + if no_attributes: + return stmt + return stmt.outerjoin( + StateAttributes, States.attributes_id == StateAttributes.attributes_id + ) + + +def _sorted_states_to_dict( + states: Iterable[Row], + start_time_ts: float | None, + entity_ids: list[str], + entity_id_to_metadata_id: dict[str, int | None], + minimal_response: bool = False, + compressed_state_format: bool = False, + descending: bool = False, + no_attributes: bool = False, +) -> dict[str, list[State | dict[str, Any]]]: + """Convert SQL results into JSON friendly data structure. 
+ + This takes our state list and turns it into a JSON friendly data + structure {'entity_id': [list of states], 'entity_id2': [list of states]} + + States must be sorted by entity_id and last_updated + + We also need to go back and create a synthetic zero data point for + each list of states, otherwise our graphs won't start on the Y + axis correctly. + """ + field_map = _FIELD_MAP + state_class: Callable[ + [Row, dict[str, dict[str, Any]], float | None, str, str, float | None, bool], + State | dict[str, Any], + ] + if compressed_state_format: + state_class = row_to_compressed_state + attr_time = COMPRESSED_STATE_LAST_UPDATED + attr_state = COMPRESSED_STATE_STATE + else: + state_class = LazyState + attr_time = LAST_CHANGED_KEY + attr_state = STATE_KEY + + # Set all entity IDs to empty lists in result set to maintain the order + result: dict[str, list[State | dict[str, Any]]] = { + entity_id: [] for entity_id in entity_ids + } + metadata_id_to_entity_id: dict[int, str] = {} + metadata_id_to_entity_id = { + v: k for k, v in entity_id_to_metadata_id.items() if v is not None + } + # Get the states at the start time + if len(entity_ids) == 1: + metadata_id = entity_id_to_metadata_id[entity_ids[0]] + assert metadata_id is not None # should not be possible if we got here + states_iter: Iterable[tuple[int, Iterator[Row]]] = ( + (metadata_id, iter(states)), + ) + else: + key_func = itemgetter(field_map["metadata_id"]) + states_iter = groupby(states, key_func) + + state_idx = field_map["state"] + last_updated_ts_idx = field_map["last_updated_ts"] + + # Append all changes to it + for metadata_id, group in states_iter: + entity_id = metadata_id_to_entity_id[metadata_id] + attr_cache: dict[str, dict[str, Any]] = {} + ent_results = result[entity_id] + if ( + not minimal_response + or split_entity_id(entity_id)[0] in NEED_ATTRIBUTE_DOMAINS + ): + ent_results.extend( + [ + state_class( + db_state, + attr_cache, + start_time_ts, + entity_id, + db_state[state_idx], + db_state[last_updated_ts_idx], + False, + ) + for db_state in group + ] + ) + continue + + prev_state: str | None = None + # With minimal response we only provide a native + # State for the first and last response. All the states + # in-between only provide the "state" and the + # "last_changed". 
+ if not ent_results: + if (first_state := next(group, None)) is None: + continue + prev_state = first_state[state_idx] + ent_results.append( + state_class( + first_state, + attr_cache, + start_time_ts, + entity_id, + prev_state, + first_state[last_updated_ts_idx], + no_attributes, + ) + ) + + # + # minimal_response only makes sense with last_updated == last_updated + # + # We use last_updated for for last_changed since its the same + # + # With minimal response we do not care about attribute + # changes so we can filter out duplicate states + if compressed_state_format: + # Compressed state format uses the timestamp directly + ent_results.extend( + [ + { + attr_state: (prev_state := state), + attr_time: row[last_updated_ts_idx], + } + for row in group + if (state := row[state_idx]) != prev_state + ] + ) + continue + + # Non-compressed state format returns an ISO formatted string + _utc_from_timestamp = dt_util.utc_from_timestamp + ent_results.extend( + [ + { + attr_state: (prev_state := state), + attr_time: _utc_from_timestamp( + row[last_updated_ts_idx] + ).isoformat(), + } + for row in group + if (state := row[state_idx]) != prev_state + ] + ) + + if descending: + for ent_results in result.values(): + ent_results.reverse() + + # Filter out the empty lists if some states had 0 results. + return {key: val for key, val in result.items() if val} diff --git a/homeassistant/components/recorder/history/modern.py b/homeassistant/components/recorder/history/modern.py deleted file mode 100644 index a636ed34ef4d8..0000000000000 --- a/homeassistant/components/recorder/history/modern.py +++ /dev/null @@ -1,925 +0,0 @@ -"""Provide pre-made queries on top of the recorder component.""" - -from __future__ import annotations - -from collections.abc import Callable, Iterable, Iterator -from datetime import datetime -from itertools import groupby -from operator import itemgetter -from typing import TYPE_CHECKING, Any, cast - -from sqlalchemy import ( - CompoundSelect, - Select, - StatementLambdaElement, - Subquery, - and_, - func, - lambda_stmt, - literal, - select, - union_all, -) -from sqlalchemy.engine.row import Row -from sqlalchemy.orm.session import Session - -from homeassistant.const import COMPRESSED_STATE_LAST_UPDATED, COMPRESSED_STATE_STATE -from homeassistant.core import HomeAssistant, State, split_entity_id -from homeassistant.helpers.recorder import get_instance -from homeassistant.util import dt as dt_util -from homeassistant.util.collection import chunked_or_all - -from ..const import MAX_IDS_FOR_INDEXED_GROUP_BY -from ..db_schema import ( - SHARED_ATTR_OR_LEGACY_ATTRIBUTES, - StateAttributes, - States, - StatesMeta, -) -from ..filters import Filters -from ..models import ( - LazyState, - datetime_to_timestamp_or_none, - extract_metadata_ids, - row_to_compressed_state, -) -from ..util import execute_stmt_lambda_element, session_scope -from .const import ( - LAST_CHANGED_KEY, - NEED_ATTRIBUTE_DOMAINS, - SIGNIFICANT_DOMAINS, - STATE_KEY, -) - -_FIELD_MAP = { - "metadata_id": 0, - "state": 1, - "last_updated_ts": 2, -} - - -def _stmt_and_join_attributes( - no_attributes: bool, - include_last_changed: bool, - include_last_reported: bool, -) -> Select: - """Return the statement and if StateAttributes should be joined.""" - _select = select(States.metadata_id, States.state, States.last_updated_ts) - if include_last_changed: - _select = _select.add_columns(States.last_changed_ts) - if include_last_reported: - _select = _select.add_columns(States.last_reported_ts) - if not no_attributes: - _select = 
_select.add_columns(SHARED_ATTR_OR_LEGACY_ATTRIBUTES) - return _select - - -def _stmt_and_join_attributes_for_start_state( - no_attributes: bool, - include_last_changed: bool, - include_last_reported: bool, -) -> Select: - """Return the statement and if StateAttributes should be joined.""" - _select = select(States.metadata_id, States.state) - _select = _select.add_columns(literal(value=0).label("last_updated_ts")) - if include_last_changed: - _select = _select.add_columns(literal(value=0).label("last_changed_ts")) - if include_last_reported: - _select = _select.add_columns(literal(value=0).label("last_reported_ts")) - if not no_attributes: - _select = _select.add_columns(SHARED_ATTR_OR_LEGACY_ATTRIBUTES) - return _select - - -def _select_from_subquery( - subquery: Subquery | CompoundSelect, - no_attributes: bool, - include_last_changed: bool, - include_last_reported: bool, -) -> Select: - """Return the statement to select from the union.""" - base_select = select( - subquery.c.metadata_id, - subquery.c.state, - subquery.c.last_updated_ts, - ) - if include_last_changed: - base_select = base_select.add_columns(subquery.c.last_changed_ts) - if include_last_reported: - base_select = base_select.add_columns(subquery.c.last_reported_ts) - if no_attributes: - return base_select - return base_select.add_columns(subquery.c.attributes) - - -def get_significant_states( - hass: HomeAssistant, - start_time: datetime, - end_time: datetime | None = None, - entity_ids: list[str] | None = None, - filters: Filters | None = None, - include_start_time_state: bool = True, - significant_changes_only: bool = True, - minimal_response: bool = False, - no_attributes: bool = False, - compressed_state_format: bool = False, -) -> dict[str, list[State | dict[str, Any]]]: - """Wrap get_significant_states_with_session with an sql session.""" - with session_scope(hass=hass, read_only=True) as session: - return get_significant_states_with_session( - hass, - session, - start_time, - end_time, - entity_ids, - filters, - include_start_time_state, - significant_changes_only, - minimal_response, - no_attributes, - compressed_state_format, - ) - - -def _significant_states_stmt( - start_time_ts: float, - end_time_ts: float | None, - single_metadata_id: int | None, - metadata_ids: list[int], - metadata_ids_in_significant_domains: list[int], - significant_changes_only: bool, - no_attributes: bool, - include_start_time_state: bool, - run_start_ts: float | None, - slow_dependent_subquery: bool, -) -> Select | CompoundSelect: - """Query the database for significant state changes.""" - include_last_changed = not significant_changes_only - stmt = _stmt_and_join_attributes(no_attributes, include_last_changed, False) - if significant_changes_only: - # Since we are filtering on entity_id (metadata_id) we can avoid - # the join of the states_meta table since we already know which - # metadata_ids are in the significant domains. 
- if metadata_ids_in_significant_domains: - stmt = stmt.filter( - States.metadata_id.in_(metadata_ids_in_significant_domains) - | (States.last_changed_ts == States.last_updated_ts) - | States.last_changed_ts.is_(None) - ) - else: - stmt = stmt.filter( - (States.last_changed_ts == States.last_updated_ts) - | States.last_changed_ts.is_(None) - ) - stmt = stmt.filter(States.metadata_id.in_(metadata_ids)).filter( - States.last_updated_ts > start_time_ts - ) - if end_time_ts: - stmt = stmt.filter(States.last_updated_ts < end_time_ts) - if not no_attributes: - stmt = stmt.outerjoin( - StateAttributes, States.attributes_id == StateAttributes.attributes_id - ) - if not include_start_time_state or not run_start_ts: - return stmt.order_by(States.metadata_id, States.last_updated_ts) - unioned_subquery = union_all( - _select_from_subquery( - _get_start_time_state_stmt( - start_time_ts, - single_metadata_id, - metadata_ids, - no_attributes, - include_last_changed, - slow_dependent_subquery, - ).subquery(), - no_attributes, - include_last_changed, - False, - ), - _select_from_subquery( - stmt.subquery(), no_attributes, include_last_changed, False - ), - ).subquery() - return _select_from_subquery( - unioned_subquery, - no_attributes, - include_last_changed, - False, - ).order_by(unioned_subquery.c.metadata_id, unioned_subquery.c.last_updated_ts) - - -def get_significant_states_with_session( - hass: HomeAssistant, - session: Session, - start_time: datetime, - end_time: datetime | None = None, - entity_ids: list[str] | None = None, - filters: Filters | None = None, - include_start_time_state: bool = True, - significant_changes_only: bool = True, - minimal_response: bool = False, - no_attributes: bool = False, - compressed_state_format: bool = False, -) -> dict[str, list[State | dict[str, Any]]]: - """Return states changes during UTC period start_time - end_time. - - entity_ids is an optional iterable of entities to include in the results. - - filters is an optional SQLAlchemy filter which will be applied to the database - queries unless entity_ids is given, in which case its ignored. - - Significant states are all states where there is a state change, - as well as all states from certain domains (for instance - thermostat so that we get current temperature in our graphs). 
- """ - if filters is not None: - raise NotImplementedError("Filters are no longer supported") - if not entity_ids: - raise ValueError("entity_ids must be provided") - entity_id_to_metadata_id: dict[str, int | None] | None = None - metadata_ids_in_significant_domains: list[int] = [] - instance = get_instance(hass) - if not ( - entity_id_to_metadata_id := instance.states_meta_manager.get_many( - entity_ids, session, False - ) - ) or not (possible_metadata_ids := extract_metadata_ids(entity_id_to_metadata_id)): - return {} - metadata_ids = possible_metadata_ids - if significant_changes_only: - metadata_ids_in_significant_domains = [ - metadata_id - for entity_id, metadata_id in entity_id_to_metadata_id.items() - if metadata_id is not None - and split_entity_id(entity_id)[0] in SIGNIFICANT_DOMAINS - ] - oldest_ts: float | None = None - if include_start_time_state and not ( - oldest_ts := _get_oldest_possible_ts(hass, start_time) - ): - include_start_time_state = False - start_time_ts = start_time.timestamp() - end_time_ts = datetime_to_timestamp_or_none(end_time) - single_metadata_id = metadata_ids[0] if len(metadata_ids) == 1 else None - rows: list[Row] = [] - if TYPE_CHECKING: - assert instance.database_engine is not None - slow_dependent_subquery = instance.database_engine.optimizer.slow_dependent_subquery - if include_start_time_state and slow_dependent_subquery: - # https://github.com/home-assistant/core/issues/137178 - # If we include the start time state we need to limit the - # number of metadata_ids we query for at a time to avoid - # hitting limits in the MySQL optimizer that prevent - # the start time state query from using an index-only optimization - # to find the start time state. - iter_metadata_ids = chunked_or_all(metadata_ids, MAX_IDS_FOR_INDEXED_GROUP_BY) - else: - iter_metadata_ids = (metadata_ids,) - for metadata_ids_chunk in iter_metadata_ids: - stmt = _generate_significant_states_with_session_stmt( - start_time_ts, - end_time_ts, - single_metadata_id, - metadata_ids_chunk, - metadata_ids_in_significant_domains, - significant_changes_only, - no_attributes, - include_start_time_state, - oldest_ts, - slow_dependent_subquery, - ) - row_chunk = cast( - list[Row], - execute_stmt_lambda_element(session, stmt, None, end_time, orm_rows=False), - ) - if rows: - rows += row_chunk - else: - # If we have no rows yet, we can just assign the chunk - # as this is the common case since its rare that - # we exceed the MAX_IDS_FOR_INDEXED_GROUP_BY limit - rows = row_chunk - return _sorted_states_to_dict( - rows, - start_time_ts if include_start_time_state else None, - entity_ids, - entity_id_to_metadata_id, - minimal_response, - compressed_state_format, - no_attributes=no_attributes, - ) - - -def _generate_significant_states_with_session_stmt( - start_time_ts: float, - end_time_ts: float | None, - single_metadata_id: int | None, - metadata_ids: list[int], - metadata_ids_in_significant_domains: list[int], - significant_changes_only: bool, - no_attributes: bool, - include_start_time_state: bool, - oldest_ts: float | None, - slow_dependent_subquery: bool, -) -> StatementLambdaElement: - return lambda_stmt( - lambda: _significant_states_stmt( - start_time_ts, - end_time_ts, - single_metadata_id, - metadata_ids, - metadata_ids_in_significant_domains, - significant_changes_only, - no_attributes, - include_start_time_state, - oldest_ts, - slow_dependent_subquery, - ), - track_on=[ - bool(single_metadata_id), - bool(metadata_ids_in_significant_domains), - bool(end_time_ts), - 
significant_changes_only, - no_attributes, - include_start_time_state, - slow_dependent_subquery, - ], - ) - - -def get_full_significant_states_with_session( - hass: HomeAssistant, - session: Session, - start_time: datetime, - end_time: datetime | None = None, - entity_ids: list[str] | None = None, - filters: Filters | None = None, - include_start_time_state: bool = True, - significant_changes_only: bool = True, - no_attributes: bool = False, -) -> dict[str, list[State]]: - """Variant of get_significant_states_with_session. - - Difference with get_significant_states_with_session is that it does not - return minimal responses. - """ - return cast( - dict[str, list[State]], - get_significant_states_with_session( - hass=hass, - session=session, - start_time=start_time, - end_time=end_time, - entity_ids=entity_ids, - filters=filters, - include_start_time_state=include_start_time_state, - significant_changes_only=significant_changes_only, - minimal_response=False, - no_attributes=no_attributes, - ), - ) - - -def _state_changed_during_period_stmt( - start_time_ts: float, - end_time_ts: float | None, - single_metadata_id: int, - no_attributes: bool, - limit: int | None, - include_start_time_state: bool, - run_start_ts: float | None, -) -> Select | CompoundSelect: - stmt = ( - _stmt_and_join_attributes(no_attributes, False, True) - .filter( - ( - (States.last_changed_ts == States.last_updated_ts) - | States.last_changed_ts.is_(None) - ) - & (States.last_updated_ts > start_time_ts) - ) - .filter(States.metadata_id == single_metadata_id) - ) - if end_time_ts: - stmt = stmt.filter(States.last_updated_ts < end_time_ts) - if not no_attributes: - stmt = stmt.outerjoin( - StateAttributes, States.attributes_id == StateAttributes.attributes_id - ) - if limit: - stmt = stmt.limit(limit) - stmt = stmt.order_by(States.metadata_id, States.last_updated_ts) - if not include_start_time_state or not run_start_ts: - # If we do not need the start time state or the - # oldest possible timestamp is newer than the start time - # we can return the statement as is as there will - # never be a start time state. 
- return stmt - return _select_from_subquery( - union_all( - _select_from_subquery( - _get_single_entity_start_time_stmt( - start_time_ts, - single_metadata_id, - no_attributes, - False, - True, - ).subquery(), - no_attributes, - False, - True, - ), - _select_from_subquery( - stmt.subquery(), - no_attributes, - False, - True, - ), - ).subquery(), - no_attributes, - False, - True, - ) - - -def state_changes_during_period( - hass: HomeAssistant, - start_time: datetime, - end_time: datetime | None = None, - entity_id: str | None = None, - no_attributes: bool = False, - descending: bool = False, - limit: int | None = None, - include_start_time_state: bool = True, -) -> dict[str, list[State]]: - """Return states changes during UTC period start_time - end_time.""" - if not entity_id: - raise ValueError("entity_id must be provided") - entity_ids = [entity_id.lower()] - - with session_scope(hass=hass, read_only=True) as session: - instance = get_instance(hass) - if not ( - possible_metadata_id := instance.states_meta_manager.get( - entity_id, session, False - ) - ): - return {} - single_metadata_id = possible_metadata_id - entity_id_to_metadata_id: dict[str, int | None] = { - entity_id: single_metadata_id - } - oldest_ts: float | None = None - if include_start_time_state and not ( - oldest_ts := _get_oldest_possible_ts(hass, start_time) - ): - include_start_time_state = False - start_time_ts = start_time.timestamp() - end_time_ts = datetime_to_timestamp_or_none(end_time) - stmt = lambda_stmt( - lambda: _state_changed_during_period_stmt( - start_time_ts, - end_time_ts, - single_metadata_id, - no_attributes, - limit, - include_start_time_state, - oldest_ts, - ), - track_on=[ - bool(end_time_ts), - no_attributes, - bool(limit), - include_start_time_state, - ], - ) - return cast( - dict[str, list[State]], - _sorted_states_to_dict( - execute_stmt_lambda_element( - session, stmt, None, end_time, orm_rows=False - ), - start_time_ts if include_start_time_state else None, - entity_ids, - entity_id_to_metadata_id, - descending=descending, - no_attributes=no_attributes, - ), - ) - - -def _get_last_state_changes_single_stmt(metadata_id: int) -> Select: - return ( - _stmt_and_join_attributes(False, False, False) - .join( - ( - lastest_state_for_metadata_id := ( - select( - States.metadata_id.label("max_metadata_id"), - func.max(States.last_updated_ts).label("max_last_updated"), - ) - .filter(States.metadata_id == metadata_id) - .group_by(States.metadata_id) - .subquery() - ) - ), - and_( - States.metadata_id == lastest_state_for_metadata_id.c.max_metadata_id, - States.last_updated_ts - == lastest_state_for_metadata_id.c.max_last_updated, - ), - ) - .outerjoin( - StateAttributes, States.attributes_id == StateAttributes.attributes_id - ) - .order_by(States.state_id.desc()) - ) - - -def _get_last_state_changes_multiple_stmt( - number_of_states: int, metadata_id: int -) -> Select: - return ( - _stmt_and_join_attributes(False, False, True) - .where( - States.state_id - == ( - select(States.state_id) - .filter(States.metadata_id == metadata_id) - .order_by(States.last_updated_ts.desc()) - .limit(number_of_states) - .subquery() - ).c.state_id - ) - .outerjoin( - StateAttributes, States.attributes_id == StateAttributes.attributes_id - ) - .order_by(States.state_id.desc()) - ) - - -def get_last_state_changes( - hass: HomeAssistant, number_of_states: int, entity_id: str -) -> dict[str, list[State]]: - """Return the last number_of_states.""" - entity_id_lower = entity_id.lower() - entity_ids = [entity_id_lower] - - # 
Calling this function with number_of_states > 1 can cause instability - # because it has to scan the table to find the last number_of_states states - # because the metadata_id_last_updated_ts index is in ascending order. - - with session_scope(hass=hass, read_only=True) as session: - instance = get_instance(hass) - if not ( - possible_metadata_id := instance.states_meta_manager.get( - entity_id, session, False - ) - ): - return {} - metadata_id = possible_metadata_id - entity_id_to_metadata_id: dict[str, int | None] = {entity_id_lower: metadata_id} - if number_of_states == 1: - stmt = lambda_stmt( - lambda: _get_last_state_changes_single_stmt(metadata_id), - ) - else: - stmt = lambda_stmt( - lambda: _get_last_state_changes_multiple_stmt( - number_of_states, metadata_id - ), - ) - states = list(execute_stmt_lambda_element(session, stmt, orm_rows=False)) - return cast( - dict[str, list[State]], - _sorted_states_to_dict( - reversed(states), - None, - entity_ids, - entity_id_to_metadata_id, - no_attributes=False, - ), - ) - - -def _get_start_time_state_for_entities_stmt_dependent_sub_query( - epoch_time: float, - metadata_ids: list[int], - no_attributes: bool, - include_last_changed: bool, -) -> Select: - """Baked query to get states for specific entities.""" - # Engine has a fast dependent subquery optimizer - # This query is the result of significant research in - # https://github.com/home-assistant/core/issues/132865 - # A reverse index scan with a limit 1 is the fastest way to get the - # last state change before a specific point in time for all supported - # databases. Since all databases support this query as a join - # condition we can use it as a subquery to get the last state change - # before a specific point in time for all entities. - stmt = ( - _stmt_and_join_attributes_for_start_state( - no_attributes=no_attributes, - include_last_changed=include_last_changed, - include_last_reported=False, - ) - .select_from(StatesMeta) - .join( - States, - and_( - States.last_updated_ts - == ( - select(States.last_updated_ts) - .where( - (StatesMeta.metadata_id == States.metadata_id) - & (States.last_updated_ts < epoch_time) - ) - .order_by(States.last_updated_ts.desc()) - .limit(1) - ) - .scalar_subquery() - .correlate(StatesMeta), - States.metadata_id == StatesMeta.metadata_id, - ), - ) - .where(StatesMeta.metadata_id.in_(metadata_ids)) - ) - if no_attributes: - return stmt - return stmt.outerjoin( - StateAttributes, (States.attributes_id == StateAttributes.attributes_id) - ) - - -def _get_start_time_state_for_entities_stmt_group_by( - epoch_time: float, - metadata_ids: list[int], - no_attributes: bool, - include_last_changed: bool, -) -> Select: - """Baked query to get states for specific entities.""" - # Simple group-by for MySQL, must use less - # than 1000 metadata_ids in the IN clause for MySQL - # or it will optimize poorly. Callers are responsible - # for ensuring that the number of metadata_ids is less - # than 1000. 
- most_recent_states_for_entities_by_date = ( - select( - States.metadata_id.label("max_metadata_id"), - func.max(States.last_updated_ts).label("max_last_updated"), - ) - .filter( - (States.last_updated_ts < epoch_time) & States.metadata_id.in_(metadata_ids) - ) - .group_by(States.metadata_id) - .subquery() - ) - stmt = ( - _stmt_and_join_attributes_for_start_state( - no_attributes=no_attributes, - include_last_changed=include_last_changed, - include_last_reported=False, - ) - .join( - most_recent_states_for_entities_by_date, - and_( - States.metadata_id - == most_recent_states_for_entities_by_date.c.max_metadata_id, - States.last_updated_ts - == most_recent_states_for_entities_by_date.c.max_last_updated, - ), - ) - .filter( - (States.last_updated_ts < epoch_time) & States.metadata_id.in_(metadata_ids) - ) - ) - if no_attributes: - return stmt - return stmt.outerjoin( - StateAttributes, (States.attributes_id == StateAttributes.attributes_id) - ) - - -def _get_oldest_possible_ts( - hass: HomeAssistant, utc_point_in_time: datetime -) -> float | None: - """Return the oldest possible timestamp. - - Returns None if there are no states as old as utc_point_in_time. - """ - - oldest_ts = get_instance(hass).states_manager.oldest_ts - if oldest_ts is not None and oldest_ts < utc_point_in_time.timestamp(): - return oldest_ts - return None - - -def _get_start_time_state_stmt( - epoch_time: float, - single_metadata_id: int | None, - metadata_ids: list[int], - no_attributes: bool, - include_last_changed: bool, - slow_dependent_subquery: bool, -) -> Select: - """Return the states at a specific point in time.""" - if single_metadata_id: - # Use an entirely different (and extremely fast) query if we only - # have a single entity id - return _get_single_entity_start_time_stmt( - epoch_time, - single_metadata_id, - no_attributes, - include_last_changed, - False, - ) - # We have more than one entity to look at so we need to do a query on states - # since the last recorder run started. - if slow_dependent_subquery: - return _get_start_time_state_for_entities_stmt_group_by( - epoch_time, - metadata_ids, - no_attributes, - include_last_changed, - ) - - return _get_start_time_state_for_entities_stmt_dependent_sub_query( - epoch_time, - metadata_ids, - no_attributes, - include_last_changed, - ) - - -def _get_single_entity_start_time_stmt( - epoch_time: float, - metadata_id: int, - no_attributes: bool, - include_last_changed: bool, - include_last_reported: bool, -) -> Select: - # Use an entirely different (and extremely fast) query if we only - # have a single entity id - stmt = ( - _stmt_and_join_attributes_for_start_state( - no_attributes, include_last_changed, include_last_reported - ) - .filter( - States.last_updated_ts < epoch_time, - States.metadata_id == metadata_id, - ) - .order_by(States.last_updated_ts.desc()) - .limit(1) - ) - if no_attributes: - return stmt - return stmt.outerjoin( - StateAttributes, States.attributes_id == StateAttributes.attributes_id - ) - - -def _sorted_states_to_dict( - states: Iterable[Row], - start_time_ts: float | None, - entity_ids: list[str], - entity_id_to_metadata_id: dict[str, int | None], - minimal_response: bool = False, - compressed_state_format: bool = False, - descending: bool = False, - no_attributes: bool = False, -) -> dict[str, list[State | dict[str, Any]]]: - """Convert SQL results into JSON friendly data structure. 
- - This takes our state list and turns it into a JSON friendly data - structure {'entity_id': [list of states], 'entity_id2': [list of states]} - - States must be sorted by entity_id and last_updated - - We also need to go back and create a synthetic zero data point for - each list of states, otherwise our graphs won't start on the Y - axis correctly. - """ - field_map = _FIELD_MAP - state_class: Callable[ - [Row, dict[str, dict[str, Any]], float | None, str, str, float | None, bool], - State | dict[str, Any], - ] - if compressed_state_format: - state_class = row_to_compressed_state - attr_time = COMPRESSED_STATE_LAST_UPDATED - attr_state = COMPRESSED_STATE_STATE - else: - state_class = LazyState - attr_time = LAST_CHANGED_KEY - attr_state = STATE_KEY - - # Set all entity IDs to empty lists in result set to maintain the order - result: dict[str, list[State | dict[str, Any]]] = { - entity_id: [] for entity_id in entity_ids - } - metadata_id_to_entity_id: dict[int, str] = {} - metadata_id_to_entity_id = { - v: k for k, v in entity_id_to_metadata_id.items() if v is not None - } - # Get the states at the start time - if len(entity_ids) == 1: - metadata_id = entity_id_to_metadata_id[entity_ids[0]] - assert metadata_id is not None # should not be possible if we got here - states_iter: Iterable[tuple[int, Iterator[Row]]] = ( - (metadata_id, iter(states)), - ) - else: - key_func = itemgetter(field_map["metadata_id"]) - states_iter = groupby(states, key_func) - - state_idx = field_map["state"] - last_updated_ts_idx = field_map["last_updated_ts"] - - # Append all changes to it - for metadata_id, group in states_iter: - entity_id = metadata_id_to_entity_id[metadata_id] - attr_cache: dict[str, dict[str, Any]] = {} - ent_results = result[entity_id] - if ( - not minimal_response - or split_entity_id(entity_id)[0] in NEED_ATTRIBUTE_DOMAINS - ): - ent_results.extend( - [ - state_class( - db_state, - attr_cache, - start_time_ts, - entity_id, - db_state[state_idx], - db_state[last_updated_ts_idx], - False, - ) - for db_state in group - ] - ) - continue - - prev_state: str | None = None - # With minimal response we only provide a native - # State for the first and last response. All the states - # in-between only provide the "state" and the - # "last_changed". 
-        if not ent_results:
-            if (first_state := next(group, None)) is None:
-                continue
-            prev_state = first_state[state_idx]
-            ent_results.append(
-                state_class(
-                    first_state,
-                    attr_cache,
-                    start_time_ts,
-                    entity_id,
-                    prev_state,
-                    first_state[last_updated_ts_idx],
-                    no_attributes,
-                )
-            )
-
-        #
-        # minimal_response only makes sense with last_updated == last_updated
-        #
-        # We use last_updated for for last_changed since its the same
-        #
-        # With minimal response we do not care about attribute
-        # changes so we can filter out duplicate states
-        if compressed_state_format:
-            # Compressed state format uses the timestamp directly
-            ent_results.extend(
-                [
-                    {
-                        attr_state: (prev_state := state),
-                        attr_time: row[last_updated_ts_idx],
-                    }
-                    for row in group
-                    if (state := row[state_idx]) != prev_state
-                ]
-            )
-            continue
-
-        # Non-compressed state format returns an ISO formatted string
-        _utc_from_timestamp = dt_util.utc_from_timestamp
-        ent_results.extend(
-            [
-                {
-                    attr_state: (prev_state := state),
-                    attr_time: _utc_from_timestamp(
-                        row[last_updated_ts_idx]
-                    ).isoformat(),
-                }
-                for row in group
-                if (state := row[state_idx]) != prev_state
-            ]
-        )
-
-    if descending:
-        for ent_results in result.values():
-            ent_results.reverse()
-
-    # Filter out the empty lists if some states had 0 results.
-    return {key: val for key, val in result.items() if val}
diff --git a/tests/components/recorder/test_history.py b/tests/components/recorder/test_history.py
index 645d9cede8478..e3a7f2a0d3004 100644
--- a/tests/components/recorder/test_history.py
+++ b/tests/components/recorder/test_history.py
@@ -51,7 +51,7 @@ def multiple_start_time_chunk_sizes(
     to call _generate_significant_states_with_session_stmt multiple times.
     """
     with patch(
-        "homeassistant.components.recorder.history.modern.MAX_IDS_FOR_INDEXED_GROUP_BY",
+        "homeassistant.components.recorder.history.MAX_IDS_FOR_INDEXED_GROUP_BY",
        ids_for_start_time_chunk_sizes,
     ):
         yield
diff --git a/tests/components/recorder/test_util.py b/tests/components/recorder/test_util.py
index b60db68d7138c..7f6ec871fa568 100644
--- a/tests/components/recorder/test_util.py
+++ b/tests/components/recorder/test_util.py
@@ -25,9 +25,7 @@
     SupportedDialect,
 )
 from homeassistant.components.recorder.db_schema import RecorderRuns
-from homeassistant.components.recorder.history.modern import (
-    _get_single_entity_start_time_stmt,
-)
+from homeassistant.components.recorder.history import _get_single_entity_start_time_stmt
 from homeassistant.components.recorder.models import (
     UnsupportedDialect,
     process_timestamp,
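
Note on downstream imports (illustrative, not part of the patch series): after PATCH 3/3
the helpers that previously lived in recorder.history.modern are defined directly in the
recorder.history package, as the test-file hunks above show. A minimal sketch of the
migration for external code that imported from the removed module; the consuming module
is hypothetical, and the imported names are the public API listed in the package:

    # hypothetical consumer module, e.g. a custom component's sensor.py
    # Before this series (no longer works, modern.py is deleted):
    #   from homeassistant.components.recorder.history.modern import (
    #       get_significant_states,
    #   )
    # After this series, import the same API from the package itself:
    from homeassistant.components.recorder.history import (
        get_full_significant_states_with_session,
        get_last_state_changes,
        get_significant_states,
        get_significant_states_with_session,
        state_changes_during_period,
    )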