diff --git a/homeassistant/components/recorder/const.py b/homeassistant/components/recorder/const.py
index 7389cbf8ddffcf..dbfa1a2ff7358a 100644
--- a/homeassistant/components/recorder/const.py
+++ b/homeassistant/components/recorder/const.py
@@ -30,6 +30,12 @@
 # have upgraded their sqlite version
 SQLITE_MAX_BIND_VARS = 998
 
+# The maximum bind vars for sqlite 3.32.0 and above, but
+# capped at 4000 to avoid performance issues
+SQLITE_MODERN_MAX_BIND_VARS = 4000
+
+DEFAULT_MAX_BIND_VARS = 4000
+
 DB_WORKER_PREFIX = "DbWorker"
 
 ALL_DOMAIN_EXCLUDE_ATTRS = {ATTR_ATTRIBUTION, ATTR_RESTORED, ATTR_SUPPORTED_FEATURES}
diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py
index 0e926ad2a2216d..a8746a0a807f63 100644
--- a/homeassistant/components/recorder/core.py
+++ b/homeassistant/components/recorder/core.py
@@ -55,6 +55,7 @@
     MYSQLDB_PYMYSQL_URL_PREFIX,
     MYSQLDB_URL_PREFIX,
     QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY,
+    SQLITE_MAX_BIND_VARS,
     SQLITE_URL_PREFIX,
     STATES_META_SCHEMA_VERSION,
     STATISTICS_ROWS_SCHEMA_VERSION,
@@ -242,6 +243,13 @@ def __init__(
         self._dialect_name: SupportedDialect | None = None
         self.enabled = True
 
+        # For safety we default to the lowest value for max_bind_vars
+        # of all the DB types (SQLITE_MAX_BIND_VARS).
+        #
+        # We update the value once we connect to the DB
+        # and determine what is actually supported.
+        self.max_bind_vars = SQLITE_MAX_BIND_VARS
+
     @property
     def backlog(self) -> int:
         """Return the number of items in the recorder backlog."""
@@ -1351,6 +1359,7 @@ def _setup_recorder_connection(
             not self._completed_first_database_setup,
         ):
             self.database_engine = database_engine
+            self.max_bind_vars = database_engine.max_bind_vars
             self._completed_first_database_setup = True
 
     def _setup_connection(self) -> None:
diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py
index f07e91ddaead06..7655002b45ffe1 100644
--- a/homeassistant/components/recorder/migration.py
+++ b/homeassistant/components/recorder/migration.py
@@ -1366,7 +1366,9 @@ def migrate_states_context_ids(instance: Recorder) -> bool:
     session_maker = instance.get_session
     _LOGGER.debug("Migrating states context_ids to binary format")
     with session_scope(session=session_maker()) as session:
-        if states := session.execute(find_states_context_ids_to_migrate()).all():
+        if states := session.execute(
+            find_states_context_ids_to_migrate(instance.max_bind_vars)
+        ).all():
             session.execute(
                 update(States),
                 [
@@ -1401,7 +1403,9 @@ def migrate_events_context_ids(instance: Recorder) -> bool:
     session_maker = instance.get_session
     _LOGGER.debug("Migrating context_ids to binary format")
     with session_scope(session=session_maker()) as session:
-        if events := session.execute(find_events_context_ids_to_migrate()).all():
+        if events := session.execute(
+            find_events_context_ids_to_migrate(instance.max_bind_vars)
+        ).all():
             session.execute(
                 update(Events),
                 [
@@ -1436,7 +1440,9 @@ def migrate_event_type_ids(instance: Recorder) -> bool:
     _LOGGER.debug("Migrating event_types")
     event_type_manager = instance.event_type_manager
     with session_scope(session=session_maker()) as session:
-        if events := session.execute(find_event_type_to_migrate()).all():
+        if events := session.execute(
+            find_event_type_to_migrate(instance.max_bind_vars)
+        ).all():
             event_types = {event_type for _, event_type in events}
             if None in event_types:
                 # event_type should never be None but we need to be defensive
@@ -1505,7 +1511,9 @@ def migrate_entity_ids(instance: Recorder) -> bool:
     _LOGGER.debug("Migrating entity_ids")
     states_meta_manager = instance.states_meta_manager
     with session_scope(session=instance.get_session()) as session:
-        if states := session.execute(find_entity_ids_to_migrate()).all():
+        if states := session.execute(
+            find_entity_ids_to_migrate(instance.max_bind_vars)
+        ).all():
             entity_ids = {entity_id for _, entity_id in states}
             if None in entity_ids:
                 # entity_id should never be None but we need to be defensive
diff --git a/homeassistant/components/recorder/models/database.py b/homeassistant/components/recorder/models/database.py
index e39f05cd9c5a50..a8c23d2006199d 100644
--- a/homeassistant/components/recorder/models/database.py
+++ b/homeassistant/components/recorder/models/database.py
@@ -18,6 +18,7 @@ class DatabaseEngine:
 
     dialect: SupportedDialect
     optimizer: DatabaseOptimizer
+    max_bind_vars: int
     version: AwesomeVersion | None
 
 
diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py
index 9dff59d1f593b5..8bc6584c5a1b97 100644
--- a/homeassistant/components/recorder/purge.py
+++ b/homeassistant/components/recorder/purge.py
@@ -12,7 +12,6 @@
 
 import homeassistant.util.dt as dt_util
 
-from .const import SQLITE_MAX_BIND_VARS
 from .db_schema import Events, States, StatesMeta
 from .models import DatabaseEngine
 from .queries import (
@@ -72,7 +71,7 @@ def purge_old_data(
         purge_before.isoformat(sep=" ", timespec="seconds"),
     )
     with session_scope(session=instance.get_session()) as session:
-        # Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states or events record
+        # Purge a max of max_bind_vars, based on the oldest states or events record
         has_more_to_purge = False
         if instance.use_legacy_events_index and _purging_legacy_format(session):
             _LOGGER.debug(
@@ -93,9 +92,11 @@ def purge_old_data(
                 instance, session, events_batch_size, purge_before
             )
 
-        statistics_runs = _select_statistics_runs_to_purge(session, purge_before)
+        statistics_runs = _select_statistics_runs_to_purge(
+            session, purge_before, instance.max_bind_vars
+        )
         short_term_statistics = _select_short_term_statistics_to_purge(
-            session, purge_before
+            session, purge_before, instance.max_bind_vars
         )
         if statistics_runs:
             _purge_statistics_runs(session, statistics_runs)
@@ -141,7 +142,7 @@ def _purge_legacy_format(
         attributes_ids,
         data_ids,
     ) = _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
-        session, purge_before
+        session, purge_before, instance.max_bind_vars
     )
     _purge_state_ids(instance, session, state_ids)
     _purge_unused_attributes_ids(instance, session, attributes_ids)
@@ -157,7 +158,7 @@ def _purge_legacy_format(
         detached_state_ids,
         detached_attributes_ids,
     ) = _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
-        session, purge_before
+        session, purge_before, instance.max_bind_vars
     )
     _purge_state_ids(instance, session, detached_state_ids)
     _purge_unused_attributes_ids(instance, session, detached_attributes_ids)
@@ -187,11 +188,12 @@ def _purge_states_and_attributes_ids(
     # There are more states relative to attributes_ids so
     # we purge enough state_ids to try to generate a full
     # size batch of attributes_ids that will be around the size
-    # SQLITE_MAX_BIND_VARS
+    # max_bind_vars
     attributes_ids_batch: set[int] = set()
+    max_bind_vars = instance.max_bind_vars
     for _ in range(states_batch_size):
         state_ids, attributes_ids = _select_state_attributes_ids_to_purge(
-            session, purge_before
+            session, purge_before, max_bind_vars
         )
         if not state_ids:
             has_remaining_state_ids_to_purge = False
@@ -221,10 +223,13 @@ def _purge_events_and_data_ids(
     # There are more events relative to data_ids so
     # we purge enough event_ids to try to generate a full
     # size batch of data_ids that will be around the size
-    # SQLITE_MAX_BIND_VARS
+    # max_bind_vars
     data_ids_batch: set[int] = set()
+    max_bind_vars = instance.max_bind_vars
     for _ in range(events_batch_size):
-        event_ids, data_ids = _select_event_data_ids_to_purge(session, purge_before)
+        event_ids, data_ids = _select_event_data_ids_to_purge(
+            session, purge_before, max_bind_vars
+        )
         if not event_ids:
             has_remaining_event_ids_to_purge = False
             break
@@ -240,13 +245,13 @@ def _purge_events_and_data_ids(
 
 
 def _select_state_attributes_ids_to_purge(
-    session: Session, purge_before: datetime
+    session: Session, purge_before: datetime, max_bind_vars: int
 ) -> tuple[set[int], set[int]]:
     """Return sets of state and attribute ids to purge."""
     state_ids = set()
     attributes_ids = set()
     for state_id, attributes_id in session.execute(
-        find_states_to_purge(dt_util.utc_to_timestamp(purge_before))
+        find_states_to_purge(dt_util.utc_to_timestamp(purge_before), max_bind_vars)
     ).all():
         state_ids.add(state_id)
         if attributes_id:
@@ -260,13 +265,13 @@ def _select_state_attributes_ids_to_purge(
 
 
 def _select_event_data_ids_to_purge(
-    session: Session, purge_before: datetime
+    session: Session, purge_before: datetime, max_bind_vars: int
 ) -> tuple[set[int], set[int]]:
     """Return sets of event and data ids to purge."""
     event_ids = set()
     data_ids = set()
     for event_id, data_id in session.execute(
-        find_events_to_purge(dt_util.utc_to_timestamp(purge_before))
+        find_events_to_purge(dt_util.utc_to_timestamp(purge_before), max_bind_vars)
     ).all():
         event_ids.add(event_id)
         if data_id:
@@ -323,7 +328,7 @@ def _select_unused_attributes_ids(
     #
     # We used to generate a query based on how many attribute_ids to find but
     # that meant sqlalchemy Transparent SQL Compilation Caching was working against
-    # us by cached up to SQLITE_MAX_BIND_VARS different statements which could be
+    # us by cached up to max_bind_vars different statements which could be
     # up to 500MB for large database due to the complexity of the ORM objects.
     #
     # We now break the query into groups of 100 and use a lambda_stmt to ensure
@@ -405,13 +410,15 @@ def _purge_unused_data_ids(
 
 
 def _select_statistics_runs_to_purge(
-    session: Session, purge_before: datetime
+    session: Session, purge_before: datetime, max_bind_vars: int
 ) -> list[int]:
     """Return a list of statistic runs to purge.
 
     Takes care to keep the newest run.
     """
-    statistic_runs = session.execute(find_statistics_runs_to_purge(purge_before)).all()
+    statistic_runs = session.execute(
+        find_statistics_runs_to_purge(purge_before, max_bind_vars)
+    ).all()
     statistic_runs_list = [run_id for (run_id,) in statistic_runs]
     # Exclude the newest statistics run
     if (
@@ -424,18 +431,18 @@ def _select_statistics_runs_to_purge(
 
 
 def _select_short_term_statistics_to_purge(
-    session: Session, purge_before: datetime
+    session: Session, purge_before: datetime, max_bind_vars: int
 ) -> list[int]:
     """Return a list of short term statistics to purge."""
     statistics = session.execute(
-        find_short_term_statistics_to_purge(purge_before)
+        find_short_term_statistics_to_purge(purge_before, max_bind_vars)
     ).all()
     _LOGGER.debug("Selected %s short term statistics to remove", len(statistics))
     return [statistic_id for (statistic_id,) in statistics]
 
 
 def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
-    session: Session, purge_before: datetime
+    session: Session, purge_before: datetime, max_bind_vars: int
 ) -> tuple[set[int], set[int]]:
     """Return a list of state, and attribute ids to purge.
 
@@ -445,7 +452,7 @@ def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
     """
     states = session.execute(
         find_legacy_detached_states_and_attributes_to_purge(
-            dt_util.utc_to_timestamp(purge_before)
+            dt_util.utc_to_timestamp(purge_before), max_bind_vars
         )
     ).all()
     _LOGGER.debug("Selected %s state ids to remove", len(states))
@@ -460,7 +467,7 @@ def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
 
 
 def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
-    session: Session, purge_before: datetime
+    session: Session, purge_before: datetime, max_bind_vars: int
 ) -> tuple[set[int], set[int], set[int], set[int]]:
     """Return a list of event, state, and attribute ids to purge linked by the event_id.
 
@@ -470,7 +477,7 @@ def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
     """
     events = session.execute(
         find_legacy_event_state_and_attributes_and_data_ids_to_purge(
-            dt_util.utc_to_timestamp(purge_before)
+            dt_util.utc_to_timestamp(purge_before), max_bind_vars
        )
     ).all()
     _LOGGER.debug("Selected %s event ids to remove", len(events))
@@ -511,8 +518,8 @@ def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int])
 def _purge_batch_attributes_ids(
     instance: Recorder, session: Session, attributes_ids: set[int]
 ) -> None:
-    """Delete old attributes ids in batches of SQLITE_MAX_BIND_VARS."""
-    for attributes_ids_chunk in chunked(attributes_ids, SQLITE_MAX_BIND_VARS):
+    """Delete old attributes ids in batches of max_bind_vars."""
+    for attributes_ids_chunk in chunked(attributes_ids, instance.max_bind_vars):
         deleted_rows = session.execute(
             delete_states_attributes_rows(attributes_ids_chunk)
         )
@@ -525,8 +532,8 @@ def _purge_batch_attributes_ids(
 def _purge_batch_data_ids(
     instance: Recorder, session: Session, data_ids: set[int]
 ) -> None:
-    """Delete old event data ids in batches of SQLITE_MAX_BIND_VARS."""
-    for data_ids_chunk in chunked(data_ids, SQLITE_MAX_BIND_VARS):
+    """Delete old event data ids in batches of max_bind_vars."""
+    for data_ids_chunk in chunked(data_ids, instance.max_bind_vars):
         deleted_rows = session.execute(delete_event_data_rows(data_ids_chunk))
         _LOGGER.debug("Deleted %s data events", deleted_rows)
 
@@ -671,7 +678,7 @@ def _purge_filtered_states(
         session.query(States.state_id, States.attributes_id, States.event_id)
         .filter(States.metadata_id.in_(metadata_ids_to_purge))
        .filter(States.last_updated_ts < purge_before_timestamp)
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(instance.max_bind_vars)
         .all()
     )
     if not to_purge:
@@ -709,7 +716,7 @@ def _purge_filtered_events(
         session.query(Events.event_id, Events.data_id)
         .filter(Events.event_type_id.in_(excluded_event_type_ids))
         .filter(Events.time_fired_ts < purge_before_timestamp)
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(instance.max_bind_vars)
         .all()
     )
     if not to_purge:
@@ -760,7 +767,7 @@ def purge_entity_data(
     if not selected_metadata_ids:
         return True
 
-    # Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states
+    # Purge a max of max_bind_vars, based on the oldest states
     # or events record.
     if not _purge_filtered_states(
         instance,
diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py
index 71a996f0381560..d44094878c22b8 100644
--- a/homeassistant/components/recorder/queries.py
+++ b/homeassistant/components/recorder/queries.py
@@ -8,7 +8,6 @@
 from sqlalchemy.sql.lambdas import StatementLambdaElement
 from sqlalchemy.sql.selectable import Select
 
-from .const import SQLITE_MAX_BIND_VARS
 from .db_schema import (
     EventData,
     Events,
@@ -612,44 +611,48 @@ def delete_recorder_runs_rows(
     )
 
 
-def find_events_to_purge(purge_before: float) -> StatementLambdaElement:
+def find_events_to_purge(
+    purge_before: float, max_bind_vars: int
+) -> StatementLambdaElement:
     """Find events to purge."""
     return lambda_stmt(
         lambda: select(Events.event_id, Events.data_id)
         .filter(Events.time_fired_ts < purge_before)
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
 
 
-def find_states_to_purge(purge_before: float) -> StatementLambdaElement:
+def find_states_to_purge(
+    purge_before: float, max_bind_vars: int
+) -> StatementLambdaElement:
     """Find states to purge."""
     return lambda_stmt(
         lambda: select(States.state_id, States.attributes_id)
         .filter(States.last_updated_ts < purge_before)
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
 
 
 def find_short_term_statistics_to_purge(
-    purge_before: datetime,
+    purge_before: datetime, max_bind_vars: int
 ) -> StatementLambdaElement:
     """Find short term statistics to purge."""
     purge_before_ts = purge_before.timestamp()
     return lambda_stmt(
         lambda: select(StatisticsShortTerm.id)
         .filter(StatisticsShortTerm.start_ts < purge_before_ts)
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
 
 
 def find_statistics_runs_to_purge(
-    purge_before: datetime,
+    purge_before: datetime, max_bind_vars: int
 ) -> StatementLambdaElement:
     """Find statistics_runs to purge."""
     return lambda_stmt(
         lambda: select(StatisticsRuns.run_id)
         .filter(StatisticsRuns.start < purge_before)
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
@@ -659,7 +662,7 @@ def find_latest_statistics_runs_run_id() -> StatementLambdaElement:
 
 
 def find_legacy_event_state_and_attributes_and_data_ids_to_purge(
-    purge_before: float,
+    purge_before: float, max_bind_vars: int
 ) -> StatementLambdaElement:
     """Find the latest row in the legacy format to purge."""
     return lambda_stmt(
@@ -668,12 +671,12 @@ def find_legacy_event_state_and_attributes_and_data_ids_to_purge(
         )
         .outerjoin(States, Events.event_id == States.event_id)
         .filter(Events.time_fired_ts < purge_before)
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
 
 
 def find_legacy_detached_states_and_attributes_to_purge(
-    purge_before: float,
+    purge_before: float, max_bind_vars: int
 ) -> StatementLambdaElement:
     """Find states rows with event_id set but not linked event_id in Events."""
     return lambda_stmt(
@@ -684,7 +687,7 @@ def find_legacy_detached_states_and_attributes_to_purge(
             (States.last_updated_ts < purge_before) | States.last_updated_ts.is_(None)
         )
         .filter(Events.event_id.is_(None))
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
 
 
@@ -693,7 +696,7 @@ def find_legacy_row() -> StatementLambdaElement:
     return lambda_stmt(lambda: select(func.max(States.event_id)))
 
 
-def find_events_context_ids_to_migrate() -> StatementLambdaElement:
+def find_events_context_ids_to_migrate(max_bind_vars: int) -> StatementLambdaElement:
     """Find events context_ids to migrate."""
     return lambda_stmt(
         lambda: select(
@@ -704,11 +707,11 @@ def find_events_context_ids_to_migrate() -> StatementLambdaElement:
             Events.context_parent_id,
         )
         .filter(Events.context_id_bin.is_(None))
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
 
 
-def find_event_type_to_migrate() -> StatementLambdaElement:
+def find_event_type_to_migrate(max_bind_vars: int) -> StatementLambdaElement:
     """Find events event_type to migrate."""
     return lambda_stmt(
         lambda: select(
@@ -716,11 +719,11 @@ def find_event_type_to_migrate() -> StatementLambdaElement:
             Events.event_type,
         )
         .filter(Events.event_type_id.is_(None))
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
 
 
-def find_entity_ids_to_migrate() -> StatementLambdaElement:
+def find_entity_ids_to_migrate(max_bind_vars: int) -> StatementLambdaElement:
     """Find entity_id to migrate."""
     return lambda_stmt(
         lambda: select(
@@ -728,7 +731,7 @@ def find_entity_ids_to_migrate() -> StatementLambdaElement:
             States.entity_id,
         )
         .filter(States.metadata_id.is_(None))
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
 
 
@@ -792,7 +795,7 @@ def has_entity_ids_to_migrate() -> StatementLambdaElement:
     )
 
 
-def find_states_context_ids_to_migrate() -> StatementLambdaElement:
+def find_states_context_ids_to_migrate(max_bind_vars: int) -> StatementLambdaElement:
     """Find events context_ids to migrate."""
     return lambda_stmt(
         lambda: select(
@@ -803,7 +806,7 @@ def find_states_context_ids_to_migrate() -> StatementLambdaElement:
             States.context_parent_id,
         )
         .filter(States.context_id_bin.is_(None))
-        .limit(SQLITE_MAX_BIND_VARS)
+        .limit(max_bind_vars)
     )
diff --git a/homeassistant/components/recorder/table_managers/event_data.py b/homeassistant/components/recorder/table_managers/event_data.py
index 85266a3793989b..4c46b1b9faf4bf 100644
--- a/homeassistant/components/recorder/table_managers/event_data.py
+++ b/homeassistant/components/recorder/table_managers/event_data.py
@@ -10,7 +10,6 @@
 from homeassistant.core import Event
 from homeassistant.util.json import JSON_ENCODE_EXCEPTIONS
 
-from ..const import SQLITE_MAX_BIND_VARS
 from ..db_schema import EventData
 from ..queries import get_shared_event_datas
 from ..util import chunked, execute_stmt_lambda_element
@@ -95,7 +94,7 @@ def _load_from_hashes(
         """
         results: dict[str, int | None] = {}
         with session.no_autoflush:
-            for hashs_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS):
+            for hashs_chunk in chunked(hashes, self.recorder.max_bind_vars):
                 for data_id, shared_data in execute_stmt_lambda_element(
                     session, get_shared_event_datas(hashs_chunk), orm_rows=False
                 ):
diff --git a/homeassistant/components/recorder/table_managers/event_types.py b/homeassistant/components/recorder/table_managers/event_types.py
index fd03bdd14d2f49..45b3b96353c601 100644
--- a/homeassistant/components/recorder/table_managers/event_types.py
+++ b/homeassistant/components/recorder/table_managers/event_types.py
@@ -9,7 +9,6 @@
 
 from homeassistant.core import Event
 
-from ..const import SQLITE_MAX_BIND_VARS
 from ..db_schema import EventTypes
 from ..queries import find_event_type_ids
 from ..tasks import RefreshEventTypesTask
@@ -78,7 +77,7 @@ def get_many(
             return results
 
         with session.no_autoflush:
-            for missing_chunk in chunked(missing, SQLITE_MAX_BIND_VARS):
+            for missing_chunk in chunked(missing, self.recorder.max_bind_vars):
                 for event_type_id, event_type in execute_stmt_lambda_element(
                     session, find_event_type_ids(missing_chunk), orm_rows=False
                 ):
diff --git a/homeassistant/components/recorder/table_managers/state_attributes.py b/homeassistant/components/recorder/table_managers/state_attributes.py
index 653ef1689bd3d2..725bacae71c5cc 100644
--- a/homeassistant/components/recorder/table_managers/state_attributes.py
+++ b/homeassistant/components/recorder/table_managers/state_attributes.py
@@ -11,7 +11,6 @@
 from homeassistant.helpers.entity import entity_sources
 from homeassistant.util.json import JSON_ENCODE_EXCEPTIONS
 
-from ..const import SQLITE_MAX_BIND_VARS
 from ..db_schema import StateAttributes
 from ..queries import get_shared_attributes
 from ..util import chunked, execute_stmt_lambda_element
@@ -108,7 +107,7 @@ def _load_from_hashes(
         """
         results: dict[str, int | None] = {}
         with session.no_autoflush:
-            for hashs_chunk in chunked(hashes, SQLITE_MAX_BIND_VARS):
+            for hashs_chunk in chunked(hashes, self.recorder.max_bind_vars):
                 for attributes_id, shared_attrs in execute_stmt_lambda_element(
                     session, get_shared_attributes(hashs_chunk), orm_rows=False
                 ):
diff --git a/homeassistant/components/recorder/table_managers/states_meta.py b/homeassistant/components/recorder/table_managers/states_meta.py
index b8f6204d3182db..9b7aa1f7f966e1 100644
--- a/homeassistant/components/recorder/table_managers/states_meta.py
+++ b/homeassistant/components/recorder/table_managers/states_meta.py
@@ -8,7 +8,6 @@
 
 from homeassistant.core import Event
 
-from ..const import SQLITE_MAX_BIND_VARS
 from ..db_schema import StatesMeta
 from ..queries import find_all_states_metadata_ids, find_states_metadata_ids
 from ..util import chunked, execute_stmt_lambda_element
@@ -104,7 +103,7 @@ def get_many(
         update_cache = from_recorder or not self._did_first_load
 
         with session.no_autoflush:
-            for missing_chunk in chunked(missing, SQLITE_MAX_BIND_VARS):
+            for missing_chunk in chunked(missing, self.recorder.max_bind_vars):
                 for metadata_id, entity_id in execute_stmt_lambda_element(
                     session, find_states_metadata_ids(missing_chunk)
                 ):
diff --git a/homeassistant/components/recorder/util.py b/homeassistant/components/recorder/util.py
index d438cbede9fd54..f94601bb2cbb96 100644
--- a/homeassistant/components/recorder/util.py
+++ b/homeassistant/components/recorder/util.py
@@ -31,7 +31,15 @@
 from homeassistant.helpers import config_validation as cv, issue_registry as ir
 import homeassistant.util.dt as dt_util
 
-from .const import DATA_INSTANCE, DOMAIN, SQLITE_URL_PREFIX, SupportedDialect
+from .const import (
+    DATA_INSTANCE,
+    DEFAULT_MAX_BIND_VARS,
+    DOMAIN,
+    SQLITE_MAX_BIND_VARS,
+    SQLITE_MODERN_MAX_BIND_VARS,
+    SQLITE_URL_PREFIX,
+    SupportedDialect,
+)
 from .db_schema import (
     TABLE_RECORDER_RUNS,
     TABLE_SCHEMA_CHANGES,
@@ -87,6 +95,7 @@ def _simple_version(version: str) -> AwesomeVersion:
 MIN_VERSION_MYSQL = _simple_version("8.0.0")
 MIN_VERSION_PGSQL = _simple_version("12.0")
 MIN_VERSION_SQLITE = _simple_version("3.31.0")
+MIN_VERSION_SQLITE_MODERN_BIND_VARS = _simple_version("3.32.0")
 
 
 # This is the maximum time after the recorder ends the session
@@ -471,6 +480,7 @@ def setup_connection_for_dialect(
     version: AwesomeVersion | None = None
     slow_range_in_select = False
     if dialect_name == SupportedDialect.SQLITE:
+        max_bind_vars = SQLITE_MAX_BIND_VARS
         if first_connection:
             old_isolation = dbapi_connection.isolation_level  # type: ignore[attr-defined]
             dbapi_connection.isolation_level = None  # type: ignore[attr-defined]
@@ -488,6 +498,9 @@ def setup_connection_for_dialect(
                 version or version_string, "SQLite", MIN_VERSION_SQLITE
             )
 
+        if version and version > MIN_VERSION_SQLITE_MODERN_BIND_VARS:
+            max_bind_vars = SQLITE_MODERN_MAX_BIND_VARS
+
         # The upper bound on the cache size is approximately 16MiB of memory
         execute_on_connection(dbapi_connection, "PRAGMA cache_size = -16384")
 
@@ -506,6 +519,7 @@ def setup_connection_for_dialect(
         execute_on_connection(dbapi_connection, "PRAGMA foreign_keys=ON")
 
     elif dialect_name == SupportedDialect.MYSQL:
+        max_bind_vars = DEFAULT_MAX_BIND_VARS
         execute_on_connection(dbapi_connection, "SET session wait_timeout=28800")
         if first_connection:
             result = query_on_connection(dbapi_connection, "SELECT VERSION()")
@@ -546,6 +560,7 @@ def setup_connection_for_dialect(
             # Ensure all times are using UTC to avoid issues with daylight savings
             execute_on_connection(dbapi_connection, "SET time_zone = '+00:00'")
     elif dialect_name == SupportedDialect.POSTGRESQL:
+        max_bind_vars = DEFAULT_MAX_BIND_VARS
         if first_connection:
             # server_version_num was added in 2006
             result = query_on_connection(dbapi_connection, "SHOW server_version")
@@ -566,6 +581,7 @@ def setup_connection_for_dialect(
         dialect=SupportedDialect(dialect_name),
         version=version,
         optimizer=DatabaseOptimizer(slow_range_in_select=slow_range_in_select),
+        max_bind_vars=max_bind_vars,
     )
 
 
diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py
index 096108e0349224..eedbd2c0e29956 100644
--- a/tests/components/recorder/test_purge.py
+++ b/tests/components/recorder/test_purge.py
@@ -10,11 +10,7 @@
 from sqlalchemy.orm.session import Session
 
 from homeassistant.components import recorder
-from homeassistant.components.recorder import purge, queries
-from homeassistant.components.recorder.const import (
-    SQLITE_MAX_BIND_VARS,
-    SupportedDialect,
-)
+from homeassistant.components.recorder.const import SupportedDialect
 from homeassistant.components.recorder.db_schema import (
     Events,
     EventTypes,
@@ -71,6 +67,39 @@ def mock_use_sqlite(request):
         yield
 
 
+async def test_purge_big_database(
+    async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
+) -> None:
+    """Test deleting 2/3 old states from a big database."""
+
+    instance = await async_setup_recorder_instance(hass)
+
+    for _ in range(25):
+        await _add_test_states(hass, wait_recording_done=False)
+    await async_wait_recording_done(hass)
+
+    with patch.object(instance, "max_bind_vars", 100), patch.object(
+        instance.database_engine, "max_bind_vars", 100
+    ), session_scope(hass=hass) as session:
+        states = session.query(States)
+        state_attributes = session.query(StateAttributes)
+        assert states.count() == 150
+        assert state_attributes.count() == 3
+
+        purge_before = dt_util.utcnow() - timedelta(days=4)
+
+        finished = purge_old_data(
+            instance,
+            purge_before,
+            states_batch_size=1,
+            events_batch_size=1,
+            repack=False,
+        )
+        assert not finished
+        assert states.count() == 50
+        assert state_attributes.count() == 1
+
+
 async def test_purge_old_states(
     async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant
 ) -> None:
@@ -628,7 +657,7 @@ async def _add_db_entries(hass: HomeAssistant, cutoff: datetime, rows: int) -> N
     service_data = {"keep_days": 2}
 
     # Force multiple purge batches to be run
-    rows = SQLITE_MAX_BIND_VARS + 1
+    rows = 999
     cutoff = dt_util.utcnow() - timedelta(days=service_data["keep_days"])
     await _add_db_entries(hass, cutoff, rows)
 
@@ -1411,7 +1440,7 @@ def _add_keep_records(hass: HomeAssistant) -> None:
         assert states.count() == 0
 
 
-async def _add_test_states(hass: HomeAssistant):
+async def _add_test_states(hass: HomeAssistant, wait_recording_done: bool = True):
     """Add multiple states to the db for testing."""
     utcnow = dt_util.utcnow()
     five_days_ago = utcnow - timedelta(days=5)
@@ -1421,24 +1450,26 @@ async def set_state(entity_id, state, **kwargs):
         """Set the state."""
         hass.states.async_set(entity_id, state, **kwargs)
-        await hass.async_block_till_done()
-        await async_wait_recording_done(hass)
+        if wait_recording_done:
+            await hass.async_block_till_done()
+            await async_wait_recording_done(hass)
+
+    with freeze_time() as freezer:
+        for event_id in range(6):
+            if event_id < 2:
+                timestamp = eleven_days_ago
+                state = f"autopurgeme_{event_id}"
+                attributes = {"autopurgeme": True, **base_attributes}
+            elif event_id < 4:
+                timestamp = five_days_ago
+                state = f"purgeme_{event_id}"
+                attributes = {"purgeme": True, **base_attributes}
+            else:
+                timestamp = utcnow
+                state = f"dontpurgeme_{event_id}"
+                attributes = {"dontpurgeme": True, **base_attributes}
 
-    for event_id in range(6):
-        if event_id < 2:
-            timestamp = eleven_days_ago
-            state = f"autopurgeme_{event_id}"
-            attributes = {"autopurgeme": True, **base_attributes}
-        elif event_id < 4:
-            timestamp = five_days_ago
-            state = f"purgeme_{event_id}"
-            attributes = {"purgeme": True, **base_attributes}
-        else:
-            timestamp = utcnow
-            state = f"dontpurgeme_{event_id}"
-            attributes = {"dontpurgeme": True, **base_attributes}
-
-        with freeze_time(timestamp):
+            freezer.move_to(timestamp)
             await set_state("test.recorder2", state, attributes=attributes)
@@ -1453,18 +1484,19 @@ async def _add_test_events(hass: HomeAssistant, iterations: int = 1):
     # thread as well can cause the test to fail
     await async_wait_recording_done(hass)
 
-    for _ in range(iterations):
-        for event_id in range(6):
-            if event_id < 2:
-                timestamp = eleven_days_ago
-                event_type = "EVENT_TEST_AUTOPURGE"
-            elif event_id < 4:
-                timestamp = five_days_ago
-                event_type = "EVENT_TEST_PURGE"
-            else:
-                timestamp = utcnow
-                event_type = "EVENT_TEST"
-            with freeze_time(timestamp):
+    with freeze_time() as freezer:
+        for _ in range(iterations):
+            for event_id in range(6):
+                if event_id < 2:
+                    timestamp = eleven_days_ago
+                    event_type = "EVENT_TEST_AUTOPURGE"
+                elif event_id < 4:
+                    timestamp = five_days_ago
+                    event_type = "EVENT_TEST_PURGE"
+                else:
+                    timestamp = utcnow
+                    event_type = "EVENT_TEST"
+                freezer.move_to(timestamp)
                 hass.bus.async_fire(event_type, event_data)
 
     await async_wait_recording_done(hass)
@@ -1605,11 +1637,11 @@ async def test_purge_many_old_events(
 ) -> None:
     """Test deleting old events."""
     old_events_count = 5
-    with patch.object(queries, "SQLITE_MAX_BIND_VARS", old_events_count), patch.object(
-        purge, "SQLITE_MAX_BIND_VARS", old_events_count
-    ):
-        instance = await async_setup_recorder_instance(hass)
+    instance = await async_setup_recorder_instance(hass)
 
+    with patch.object(instance, "max_bind_vars", old_events_count), patch.object(
+        instance.database_engine, "max_bind_vars", old_events_count
+    ):
         await _add_test_events(hass, old_events_count)
 
         with session_scope(hass=hass) as session:
diff --git a/tests/components/recorder/test_purge_v32_schema.py b/tests/components/recorder/test_purge_v32_schema.py
index 3b315481f4ed13..b3c20ad4e26ba6 100644
--- a/tests/components/recorder/test_purge_v32_schema.py
+++ b/tests/components/recorder/test_purge_v32_schema.py
@@ -13,10 +13,7 @@
 
 from homeassistant.components import recorder
 from homeassistant.components.recorder import migration
-from homeassistant.components.recorder.const import (
-    SQLITE_MAX_BIND_VARS,
-    SupportedDialect,
-)
+from homeassistant.components.recorder.const import SupportedDialect
 from homeassistant.components.recorder.history import get_significant_states
 from homeassistant.components.recorder.purge import purge_old_data
 from homeassistant.components.recorder.services import (
@@ -631,7 +628,7 @@ async def _add_db_entries(hass: HomeAssistant, cutoff: datetime, rows: int) -> N
     service_data = {"keep_days": 2}
 
     # Force multiple purge batches to be run
-    rows = SQLITE_MAX_BIND_VARS + 1
+    rows = 999
     cutoff = dt_util.utcnow() - timedelta(days=service_data["keep_days"])
     await _add_db_entries(hass, cutoff, rows)
 
@@ -718,21 +715,22 @@ async def set_state(entity_id, state, **kwargs):
         await hass.async_block_till_done()
         await async_wait_recording_done(hass)
 
-    for event_id in range(6):
-        if event_id < 2:
-            timestamp = eleven_days_ago
-            state = f"autopurgeme_{event_id}"
-            attributes = {"autopurgeme": True, **base_attributes}
-        elif event_id < 4:
-            timestamp = five_days_ago
-            state = f"purgeme_{event_id}"
-            attributes = {"purgeme": True, **base_attributes}
-        else:
-            timestamp = utcnow
-            state = f"dontpurgeme_{event_id}"
-            attributes = {"dontpurgeme": True, **base_attributes}
-
-        with freeze_time(timestamp):
+    with freeze_time() as freezer:
+        for event_id in range(6):
+            if event_id < 2:
+                timestamp = eleven_days_ago
+                state = f"autopurgeme_{event_id}"
+                attributes = {"autopurgeme": True, **base_attributes}
+            elif event_id < 4:
+                timestamp = five_days_ago
+                state = f"purgeme_{event_id}"
+                attributes = {"purgeme": True, **base_attributes}
+            else:
+                timestamp = utcnow
+                state = f"dontpurgeme_{event_id}"
+                attributes = {"dontpurgeme": True, **base_attributes}
+
+            freezer.move_to(timestamp)
             await set_state("test.recorder2", state, attributes=attributes)
@@ -952,46 +950,50 @@ async def test_purge_many_old_events(
     instance = await async_setup_recorder_instance(hass)
     await _async_attach_db_engine(hass)
 
-    await _add_test_events(hass, SQLITE_MAX_BIND_VARS)
-
-    with session_scope(hass=hass) as session:
-        events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
-        assert events.count() == SQLITE_MAX_BIND_VARS * 6
-
-        purge_before = dt_util.utcnow() - timedelta(days=4)
-
-        # run purge_old_data()
-        finished = purge_old_data(
-            instance,
-            purge_before,
-            repack=False,
-            states_batch_size=3,
-            events_batch_size=3,
-        )
-        assert not finished
-        assert events.count() == SQLITE_MAX_BIND_VARS * 3
-
-        # we should only have 2 groups of events left
-        finished = purge_old_data(
-            instance,
-            purge_before,
-            repack=False,
-            states_batch_size=3,
-            events_batch_size=3,
-        )
-        assert finished
-        assert events.count() == SQLITE_MAX_BIND_VARS * 2
+    old_events_count = 5
+    with patch.object(instance, "max_bind_vars", old_events_count), patch.object(
+        instance.database_engine, "max_bind_vars", old_events_count
+    ):
+        await _add_test_events(hass, old_events_count)
 
-    # we should now purge everything
-    finished = purge_old_data(
-        instance,
-        dt_util.utcnow(),
-        repack=False,
-        states_batch_size=20,
-        events_batch_size=20,
-    )
-    assert finished
-    assert events.count() == 0
+        with session_scope(hass=hass) as session:
+            events = session.query(Events).filter(Events.event_type.like("EVENT_TEST%"))
+            assert events.count() == old_events_count * 6
+
+            purge_before = dt_util.utcnow() - timedelta(days=4)
+
+            # run purge_old_data()
+            finished = purge_old_data(
+                instance,
+                purge_before,
+                repack=False,
+                states_batch_size=3,
+                events_batch_size=3,
+            )
+            assert not finished
+            assert events.count() == old_events_count * 3
+
+            # we should only have 2 groups of events left
+            finished = purge_old_data(
+                instance,
+                purge_before,
+                repack=False,
+                states_batch_size=3,
+                events_batch_size=3,
+            )
+            assert finished
+            assert events.count() == old_events_count * 2
+
+            # we should now purge everything
+            finished = purge_old_data(
+                instance,
+                dt_util.utcnow(),
+                repack=False,
+                states_batch_size=20,
+                events_batch_size=20,
+            )
+            assert finished
+            assert events.count() == 0
 
 
 async def test_purge_can_mix_legacy_and_new_format(
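Reviewer note, not part of the patch: the sketch below condenses the per-dialect cap selection the diff adds to setup_connection_for_dialect(). The constants and the strict greater-than version gate mirror the diff above; choose_max_bind_vars is a hypothetical helper and the plain string dialect names stand in for SupportedDialect.

# Illustrative sketch only -- mirrors the bind-var gate introduced by this diff.
# choose_max_bind_vars and the string dialect names are hypothetical stand-ins.
from awesomeversion import AwesomeVersion

SQLITE_MAX_BIND_VARS = 998
SQLITE_MODERN_MAX_BIND_VARS = 4000
DEFAULT_MAX_BIND_VARS = 4000
MIN_VERSION_SQLITE_MODERN_BIND_VARS = AwesomeVersion("3.32.0")


def choose_max_bind_vars(dialect_name: str, version: AwesomeVersion | None) -> int:
    """Pick the bind-var cap used to size chunked queries and purge batches."""
    if dialect_name == "sqlite":
        # SQLite builds older than 3.32.0 default to 999 host parameters, so the
        # recorder stays just below that; newer builds allow far more, but the
        # patch caps the value at 4000 (strict > mirrors the diff).
        if version and version > MIN_VERSION_SQLITE_MODERN_BIND_VARS:
            return SQLITE_MODERN_MAX_BIND_VARS
        return SQLITE_MAX_BIND_VARS
    # MySQL and PostgreSQL both receive the 4000 default in this patch.
    return DEFAULT_MAX_BIND_VARS


print(choose_max_bind_vars("sqlite", AwesomeVersion("3.41.2")))  # 4000
print(choose_max_bind_vars("sqlite", AwesomeVersion("3.31.1")))  # 998
print(choose_max_bind_vars("mysql", None))  # 4000

Per the diff, the Recorder starts from the conservative SQLITE_MAX_BIND_VARS default and only adopts the engine-reported cap after the first database connection, so every chunked() batch and LIMIT clause stays within what the active database accepts.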