Increase max bind vars based on database version (#101464)
bdraco committed Oct 6, 2023
1 parent c7d533d commit da1d5fc
Showing 13 changed files with 243 additions and 163 deletions.
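Editor's note: before this commit the recorder hard-coded every batch to SQLITE_MAX_BIND_VARS (998), the safe ceiling for SQLite builds older than 3.32.0, whose default host-parameter limit is 999. The commit moves the limit onto the connected DatabaseEngine, so modern SQLite (via the new SQLITE_MODERN_MAX_BIND_VARS) and, presumably, the other supported backends (via DEFAULT_MAX_BIND_VARS) can batch up to 4000 bind variables instead. The snippet below is a minimal illustration of the failure mode the old 998 cap guards against; the table name is made up, and the error only occurs on SQLite builds older than 3.32.0 (newer builds default to 32766 host parameters).

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE states (state_id INTEGER PRIMARY KEY)")

ids = list(range(1200))  # more placeholders than the pre-3.32.0 default of 999
placeholders = ",".join("?" * len(ids))
try:
    conn.execute(
        f"SELECT state_id FROM states WHERE state_id IN ({placeholders})", ids
    )
except sqlite3.OperationalError as err:
    print(err)  # "too many SQL variables" on old SQLite builds; no error on 3.32.0+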
6 changes: 6 additions & 0 deletions homeassistant/components/recorder/const.py
@@ -30,6 +30,12 @@
# have upgraded their sqlite version
SQLITE_MAX_BIND_VARS = 998

# The maximum bind vars for sqlite 3.32.0 and above, but
# capped at 4000 to avoid performance issues
SQLITE_MODERN_MAX_BIND_VARS = 4000

DEFAULT_MAX_BIND_VARS = 4000

DB_WORKER_PREFIX = "DbWorker"

ALL_DOMAIN_EXCLUDE_ATTRS = {ATTR_ATTRIBUTION, ATTR_RESTORED, ATTR_SUPPORTED_FEATURES}
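Editor's note: a minimal sketch of how these constants could be selected once the SQLite version is known. The helper name and the threshold constant are assumptions; the real selection logic lives in util.py, which is not part of this excerpt (the 3.32.0 threshold itself comes from the comment above).

from awesomeversion import AwesomeVersion

# Assumed name; SQLITE_MAX_BIND_VARS and SQLITE_MODERN_MAX_BIND_VARS are the
# constants defined in const.py above.
MIN_SQLITE_MODERN_BIND_VARS = AwesomeVersion("3.32.0")

def sqlite_max_bind_vars(version: AwesomeVersion | None) -> int:
    """Pick the bind-var cap for the detected SQLite version."""
    if version is not None and version >= MIN_SQLITE_MODERN_BIND_VARS:
        return SQLITE_MODERN_MAX_BIND_VARS  # 4000, capped for performance
    return SQLITE_MAX_BIND_VARS  # 998, safe for builds older than 3.32.0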
9 changes: 9 additions & 0 deletions homeassistant/components/recorder/core.py
@@ -55,6 +55,7 @@
MYSQLDB_PYMYSQL_URL_PREFIX,
MYSQLDB_URL_PREFIX,
QUEUE_PERCENTAGE_ALLOWED_AVAILABLE_MEMORY,
SQLITE_MAX_BIND_VARS,
SQLITE_URL_PREFIX,
STATES_META_SCHEMA_VERSION,
STATISTICS_ROWS_SCHEMA_VERSION,
@@ -242,6 +243,13 @@ def __init__(
self._dialect_name: SupportedDialect | None = None
self.enabled = True

# For safety we default to the lowest value for max_bind_vars
# of all the DB types (SQLITE_MAX_BIND_VARS).
#
# We update the value once we connect to the DB
# and determine what is actually supported.
self.max_bind_vars = SQLITE_MAX_BIND_VARS

@property
def backlog(self) -> int:
"""Return the number of items in the recorder backlog."""
@@ -1351,6 +1359,7 @@ def _setup_recorder_connection(
not self._completed_first_database_setup,
):
self.database_engine = database_engine
self.max_bind_vars = database_engine.max_bind_vars
self._completed_first_database_setup = True

def _setup_connection(self) -> None:
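Editor's note: the hunks above default Recorder.max_bind_vars to the most conservative value and only raise it once the first connection reveals what the engine supports. A self-contained sketch of that pattern, with stand-in class names:

from dataclasses import dataclass

SQLITE_MAX_BIND_VARS = 998  # mirrors const.py above

@dataclass
class EngineInfo:  # stand-in for DatabaseEngine
    max_bind_vars: int

class RecorderSketch:  # stand-in for Recorder
    def __init__(self) -> None:
        # Safe default until the first connection tells us what is supported.
        self.max_bind_vars = SQLITE_MAX_BIND_VARS

    def on_first_connection(self, engine: EngineInfo) -> None:
        self.max_bind_vars = engine.max_bind_vars

recorder = RecorderSketch()
assert recorder.max_bind_vars == 998
recorder.on_first_connection(EngineInfo(max_bind_vars=4000))
assert recorder.max_bind_vars == 4000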
16 changes: 12 additions & 4 deletions homeassistant/components/recorder/migration.py
@@ -1366,7 +1366,9 @@ def migrate_states_context_ids(instance: Recorder) -> bool:
session_maker = instance.get_session
_LOGGER.debug("Migrating states context_ids to binary format")
with session_scope(session=session_maker()) as session:
if states := session.execute(find_states_context_ids_to_migrate()).all():
if states := session.execute(
find_states_context_ids_to_migrate(instance.max_bind_vars)
).all():
session.execute(
update(States),
[
@@ -1401,7 +1403,9 @@ def migrate_events_context_ids(instance: Recorder) -> bool:
session_maker = instance.get_session
_LOGGER.debug("Migrating context_ids to binary format")
with session_scope(session=session_maker()) as session:
if events := session.execute(find_events_context_ids_to_migrate()).all():
if events := session.execute(
find_events_context_ids_to_migrate(instance.max_bind_vars)
).all():
session.execute(
update(Events),
[
@@ -1436,7 +1440,9 @@ def migrate_event_type_ids(instance: Recorder) -> bool:
_LOGGER.debug("Migrating event_types")
event_type_manager = instance.event_type_manager
with session_scope(session=session_maker()) as session:
if events := session.execute(find_event_type_to_migrate()).all():
if events := session.execute(
find_event_type_to_migrate(instance.max_bind_vars)
).all():
event_types = {event_type for _, event_type in events}
if None in event_types:
# event_type should never be None but we need to be defensive
@@ -1505,7 +1511,9 @@ def migrate_entity_ids(instance: Recorder) -> bool:
_LOGGER.debug("Migrating entity_ids")
states_meta_manager = instance.states_meta_manager
with session_scope(session=instance.get_session()) as session:
if states := session.execute(find_entity_ids_to_migrate()).all():
if states := session.execute(
find_entity_ids_to_migrate(instance.max_bind_vars)
).all():
entity_ids = {entity_id for _, entity_id in states}
if None in entity_ids:
# entity_id should never be None but we need to be defensive
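Editor's note: each migration helper now threads instance.max_bind_vars into its query builder, so a single pass never selects more rows than one statement can later update. A hedged sketch of what such a builder might look like; the column list is an approximation, and the real statements live in queries.py, which is not shown in this diff.

from sqlalchemy import lambda_stmt, select
from sqlalchemy.sql.lambdas import StatementLambdaElement

from .db_schema import States

def find_states_context_ids_to_migrate(
    max_bind_vars: int,
) -> StatementLambdaElement:
    """Select a bounded batch of states whose context ids still need migrating."""
    return lambda_stmt(
        lambda: select(States.state_id, States.context_id)
        .filter(States.context_id_bin.is_(None))
        .limit(max_bind_vars)
    )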
1 change: 1 addition & 0 deletions homeassistant/components/recorder/models/database.py
@@ -18,6 +18,7 @@ class DatabaseEngine:

dialect: SupportedDialect
optimizer: DatabaseOptimizer
max_bind_vars: int
version: AwesomeVersion | None


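Editor's note: with the new field, the code that inspects the connection (in util.py, not shown here) has to supply max_bind_vars when it builds the DatabaseEngine. A rough sketch with example values; the imports are within the recorder package, and the DatabaseOptimizer argument is an assumption about that dataclass's fields.

from awesomeversion import AwesomeVersion

from .const import SupportedDialect
from .models import DatabaseEngine, DatabaseOptimizer

database_engine = DatabaseEngine(
    dialect=SupportedDialect.SQLITE,
    optimizer=DatabaseOptimizer(slow_range_in_select=False),  # assumed field
    max_bind_vars=SQLITE_MODERN_MAX_BIND_VARS,  # 4000 on SQLite >= 3.32.0
    version=AwesomeVersion("3.41.2"),  # example detected version
)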
67 changes: 37 additions & 30 deletions homeassistant/components/recorder/purge.py
@@ -12,7 +12,6 @@

import homeassistant.util.dt as dt_util

from .const import SQLITE_MAX_BIND_VARS
from .db_schema import Events, States, StatesMeta
from .models import DatabaseEngine
from .queries import (
@@ -72,7 +71,7 @@ def purge_old_data(
purge_before.isoformat(sep=" ", timespec="seconds"),
)
with session_scope(session=instance.get_session()) as session:
# Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states or events record
# Purge a max of max_bind_vars, based on the oldest states or events record
has_more_to_purge = False
if instance.use_legacy_events_index and _purging_legacy_format(session):
_LOGGER.debug(
@@ -93,9 +92,11 @@
instance, session, events_batch_size, purge_before
)

statistics_runs = _select_statistics_runs_to_purge(session, purge_before)
statistics_runs = _select_statistics_runs_to_purge(
session, purge_before, instance.max_bind_vars
)
short_term_statistics = _select_short_term_statistics_to_purge(
session, purge_before
session, purge_before, instance.max_bind_vars
)
if statistics_runs:
_purge_statistics_runs(session, statistics_runs)
@@ -141,7 +142,7 @@ def _purge_legacy_format(
attributes_ids,
data_ids,
) = _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
session, purge_before
session, purge_before, instance.max_bind_vars
)
_purge_state_ids(instance, session, state_ids)
_purge_unused_attributes_ids(instance, session, attributes_ids)
@@ -157,7 +158,7 @@
detached_state_ids,
detached_attributes_ids,
) = _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
session, purge_before
session, purge_before, instance.max_bind_vars
)
_purge_state_ids(instance, session, detached_state_ids)
_purge_unused_attributes_ids(instance, session, detached_attributes_ids)
@@ -187,11 +188,12 @@ def _purge_states_and_attributes_ids(
# There are more states relative to attributes_ids so
# we purge enough state_ids to try to generate a full
# size batch of attributes_ids that will be around the size
# SQLITE_MAX_BIND_VARS
# max_bind_vars
attributes_ids_batch: set[int] = set()
max_bind_vars = instance.max_bind_vars
for _ in range(states_batch_size):
state_ids, attributes_ids = _select_state_attributes_ids_to_purge(
session, purge_before
session, purge_before, max_bind_vars
)
if not state_ids:
has_remaining_state_ids_to_purge = False
@@ -221,10 +223,13 @@ def _purge_events_and_data_ids(
# There are more events relative to data_ids so
# we purge enough event_ids to try to generate a full
# size batch of data_ids that will be around the size
# SQLITE_MAX_BIND_VARS
# max_bind_vars
data_ids_batch: set[int] = set()
max_bind_vars = instance.max_bind_vars
for _ in range(events_batch_size):
event_ids, data_ids = _select_event_data_ids_to_purge(session, purge_before)
event_ids, data_ids = _select_event_data_ids_to_purge(
session, purge_before, max_bind_vars
)
if not event_ids:
has_remaining_event_ids_to_purge = False
break
@@ -240,13 +245,13 @@


def _select_state_attributes_ids_to_purge(
session: Session, purge_before: datetime
session: Session, purge_before: datetime, max_bind_vars: int
) -> tuple[set[int], set[int]]:
"""Return sets of state and attribute ids to purge."""
state_ids = set()
attributes_ids = set()
for state_id, attributes_id in session.execute(
find_states_to_purge(dt_util.utc_to_timestamp(purge_before))
find_states_to_purge(dt_util.utc_to_timestamp(purge_before), max_bind_vars)
).all():
state_ids.add(state_id)
if attributes_id:
@@ -260,13 +265,13 @@ def _select_state_attributes_ids_to_purge(


def _select_event_data_ids_to_purge(
session: Session, purge_before: datetime
session: Session, purge_before: datetime, max_bind_vars: int
) -> tuple[set[int], set[int]]:
"""Return sets of event and data ids to purge."""
event_ids = set()
data_ids = set()
for event_id, data_id in session.execute(
find_events_to_purge(dt_util.utc_to_timestamp(purge_before))
find_events_to_purge(dt_util.utc_to_timestamp(purge_before), max_bind_vars)
).all():
event_ids.add(event_id)
if data_id:
@@ -323,7 +328,7 @@ def _select_unused_attributes_ids(
#
# We used to generate a query based on how many attribute_ids to find but
# that meant sqlalchemy Transparent SQL Compilation Caching was working against
# us by cached up to SQLITE_MAX_BIND_VARS different statements which could be
# us by caching up to max_bind_vars different statements, which could be
# up to 500 MB for large databases due to the complexity of the ORM objects.
#
# We now break the query into groups of 100 and use a lambda_stmt to ensure
@@ -405,13 +410,15 @@


def _select_statistics_runs_to_purge(
session: Session, purge_before: datetime
session: Session, purge_before: datetime, max_bind_vars: int
) -> list[int]:
"""Return a list of statistic runs to purge.
Takes care to keep the newest run.
"""
statistic_runs = session.execute(find_statistics_runs_to_purge(purge_before)).all()
statistic_runs = session.execute(
find_statistics_runs_to_purge(purge_before, max_bind_vars)
).all()
statistic_runs_list = [run_id for (run_id,) in statistic_runs]
# Exclude the newest statistics run
if (
@@ -424,18 +431,18 @@


def _select_short_term_statistics_to_purge(
session: Session, purge_before: datetime
session: Session, purge_before: datetime, max_bind_vars: int
) -> list[int]:
"""Return a list of short term statistics to purge."""
statistics = session.execute(
find_short_term_statistics_to_purge(purge_before)
find_short_term_statistics_to_purge(purge_before, max_bind_vars)
).all()
_LOGGER.debug("Selected %s short term statistics to remove", len(statistics))
return [statistic_id for (statistic_id,) in statistics]


def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
session: Session, purge_before: datetime
session: Session, purge_before: datetime, max_bind_vars: int
) -> tuple[set[int], set[int]]:
"""Return a list of state, and attribute ids to purge.
@@ -445,7 +452,7 @@ def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
"""
states = session.execute(
find_legacy_detached_states_and_attributes_to_purge(
dt_util.utc_to_timestamp(purge_before)
dt_util.utc_to_timestamp(purge_before), max_bind_vars
)
).all()
_LOGGER.debug("Selected %s state ids to remove", len(states))
@@ -460,7 +467,7 @@ def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(


def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
session: Session, purge_before: datetime
session: Session, purge_before: datetime, max_bind_vars: int
) -> tuple[set[int], set[int], set[int], set[int]]:
"""Return a list of event, state, and attribute ids to purge linked by the event_id.
@@ -470,7 +477,7 @@ def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
"""
events = session.execute(
find_legacy_event_state_and_attributes_and_data_ids_to_purge(
dt_util.utc_to_timestamp(purge_before)
dt_util.utc_to_timestamp(purge_before), max_bind_vars
)
).all()
_LOGGER.debug("Selected %s event ids to remove", len(events))
@@ -511,8 +518,8 @@ def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int])
def _purge_batch_attributes_ids(
instance: Recorder, session: Session, attributes_ids: set[int]
) -> None:
"""Delete old attributes ids in batches of SQLITE_MAX_BIND_VARS."""
for attributes_ids_chunk in chunked(attributes_ids, SQLITE_MAX_BIND_VARS):
"""Delete old attributes ids in batches of max_bind_vars."""
for attributes_ids_chunk in chunked(attributes_ids, instance.max_bind_vars):
deleted_rows = session.execute(
delete_states_attributes_rows(attributes_ids_chunk)
)
@@ -525,8 +532,8 @@ def _purge_batch_attributes_ids(
def _purge_batch_data_ids(
instance: Recorder, session: Session, data_ids: set[int]
) -> None:
"""Delete old event data ids in batches of SQLITE_MAX_BIND_VARS."""
for data_ids_chunk in chunked(data_ids, SQLITE_MAX_BIND_VARS):
"""Delete old event data ids in batches of max_bind_vars."""
for data_ids_chunk in chunked(data_ids, instance.max_bind_vars):
deleted_rows = session.execute(delete_event_data_rows(data_ids_chunk))
_LOGGER.debug("Deleted %s data events", deleted_rows)

@@ -671,7 +678,7 @@ def _purge_filtered_states(
session.query(States.state_id, States.attributes_id, States.event_id)
.filter(States.metadata_id.in_(metadata_ids_to_purge))
.filter(States.last_updated_ts < purge_before_timestamp)
.limit(SQLITE_MAX_BIND_VARS)
.limit(instance.max_bind_vars)
.all()
)
if not to_purge:
@@ -709,7 +716,7 @@ def _purge_filtered_events(
session.query(Events.event_id, Events.data_id)
.filter(Events.event_type_id.in_(excluded_event_type_ids))
.filter(Events.time_fired_ts < purge_before_timestamp)
.limit(SQLITE_MAX_BIND_VARS)
.limit(instance.max_bind_vars)
.all()
)
if not to_purge:
@@ -760,7 +767,7 @@ def purge_entity_data(
if not selected_metadata_ids:
return True

# Purge a max of SQLITE_MAX_BIND_VARS, based on the oldest states
# Purge a max of max_bind_vars, based on the oldest states
# or events record.
if not _purge_filtered_states(
instance,
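Editor's note: throughout purge.py the hard-coded SQLITE_MAX_BIND_VARS is replaced with instance.max_bind_vars, which drives both the SELECT limit and the size of the DELETE chunks. A self-contained sketch of that batching pattern, using a stand-in for the chunked helper the module imports:

from itertools import islice
from typing import Iterable, Iterator, TypeVar

_T = TypeVar("_T")

def chunked(iterable: Iterable[_T], chunk_size: int) -> Iterator[list[_T]]:
    """Yield lists of at most chunk_size items (stand-in for the real helper)."""
    iterator = iter(iterable)
    while chunk := list(islice(iterator, chunk_size)):
        yield chunk

def purge_ids(ids: set[int], max_bind_vars: int) -> None:
    """Delete ids in chunks so no statement exceeds the backend's parameter limit."""
    for batch in chunked(sorted(ids), max_bind_vars):
        # In the real code: session.execute(delete_event_data_rows(batch))
        print(f"would delete {len(batch)} rows in one statement")

purge_ids(set(range(10_000)), 4000)  # 3 statements: 4000 + 4000 + 2000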
