History query and schema optimizations for huge performance boost (#8748)

* Add DEBUG-level log for db row to native object conversion

This is now the bottleneck (by a large margin) for big history queries, so I'm leaving this log in place to help diagnose slow history pages for users.

* Rewrite of the "first synthetic datapoint" query for multiple entities

The old method was written in a manner that prevented an index from being used in the innermost GROUP BY statement, causing massive performance issues, especially when querying over a large time period.

The new query does have one material change that will cause it to return different results than before: instead of using `max(state_id)` to get the latest entry, we now use `max(last_updated)`. This is more appropriate (the primary key should not be assumed to be in event-firing order) and allows an index to be used in the innermost query. I added another JOIN layer to account for cases where there are two entries with the exact same `last_updated` for a given entity; in that case we do use `state_id` as a tiebreaker.

For performance reasons, the domain filters were also moved to the outermost query; it is far more efficient to apply them there than on the innermost query as before, because of the same GROUP BY indexing problem described above.

The result is a query that only needs to do a filesort on the final result set, which contains only as many rows as there are entities.
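
For reference, a minimal SQLAlchemy sketch of the new query shape (illustrative only; the real implementation is the `get_states` change in the history.py diff below, and `first_datapoint_query`, `run_start`, and `ignore_domains` are made-up names for this sketch):

```python
from sqlalchemy import and_, func


def first_datapoint_query(session, States, run_start, utc_point_in_time,
                          ignore_domains=()):
    """Sketch: latest state row per entity before utc_point_in_time."""
    # Innermost query: newest last_updated per entity. With no other
    # filters in the way, the GROUP BY can be served by an index on
    # (entity_id, last_updated).
    latest = session.query(
        States.entity_id.label('max_entity_id'),
        func.max(States.last_updated).label('max_last_updated')
    ).filter(
        (States.last_updated >= run_start) &
        (States.last_updated < utc_point_in_time)
    ).group_by(States.entity_id).subquery()

    # Second layer: if two rows share the same last_updated for an
    # entity, take the highest state_id as the tiebreaker.
    latest_ids = session.query(
        func.max(States.state_id).label('max_state_id')
    ).join(latest, and_(
        States.entity_id == latest.c.max_entity_id,
        States.last_updated == latest.c.max_last_updated
    )).group_by(States.entity_id).subquery()

    # Outermost query: fetch the rows and apply the domain filter here,
    # where it only touches one row per entity.
    return session.query(States).join(
        latest_ids,
        States.state_id == latest_ids.c.max_state_id
    ).filter(~States.domain.in_(ignore_domains))
```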

* Remove the ORDER BY entity_id when fetching states, and add logging

Having this ORDER BY in the query prevents it from using an index due to the range filter, so it has been removed.

We already do a `groupby` in the `states_to_json` method, which accomplishes the same per-entity grouping the ORDER BY was providing, so this change causes no functional difference (see the sketch below).

Also added DEBUG-level logging to allow diagnosing a user's slow history page.
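
As a side note on why dropping the ORDER BY is safe (a standalone sketch, not the actual `states_to_json` code): `itertools.groupby` only merges consecutive rows with the same key, so without the ORDER BY an entity's rows may arrive in several runs, but accumulating each run into a per-entity list produces the same final result, still in `last_updated` order.

```python
from collections import defaultdict
from itertools import groupby

# Rows ordered only by last_updated; one entity's rows arrive in two runs.
rows = [
    ('light.kitchen', 'on'),
    ('sensor.temp', '21.5'),
    ('light.kitchen', 'off'),
]

result = defaultdict(list)
for ent_id, group in groupby(rows, lambda row: row[0]):
    # groupby yields light.kitchen twice here; extend() merges the runs.
    result[ent_id].extend(group)

assert result['light.kitchen'] == [('light.kitchen', 'on'),
                                   ('light.kitchen', 'off')]
```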

* Add DEBUG-level logging for the synthetic-first-datapoint query

For diagnosing a user's slow history page

* Missed a couple instances of `created` that should be `last_updated`

* Remove `entity_id` sorting from state_changes; match significant_update

This is the same change as 09b3498, but applied to the `state_changes_during_period` method, which I missed before. This should give the same performance boost to the history sensor component!

* Bugfix in History query used for History Sensor

The date filter was using different columns for the upper and lower bounds. It would work, but it would be slow!
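
In sketch form (illustrative; the real change is inside `state_changes_during_period` in the history.py diff below, and `apply_period_filter` is not a function in the codebase), the fix makes both bounds compare against `last_updated` so one indexed column covers the whole range:

```python
def apply_period_filter(query, States, start_time, end_time=None):
    """Constrain a States query to a time period using a single column."""
    query = query.filter(
        (States.last_changed == States.last_updated) &
        # The lower bound previously compared last_changed; using
        # last_updated here as well keeps the whole range on one
        # indexed column.
        (States.last_updated > start_time))
    if end_time is not None:
        query = query.filter(States.last_updated < end_time)
    return query
```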

* Update Recorder purge script to use more appropriate columns

Two reasons:
1. The `created` column's meaning is fairly arbitrary and does not represent when an event or state change actually occurred. It seems more correct to purge based on the event date than on the time the database row was written.
2. The new columns are indexed, which will speed up this purge script by orders of magnitude.
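
In sketch form (illustrative names only; the actual change is in the recorder/purge.py diff below), the cutoff now compares against the indexed event-time columns:

```python
from datetime import datetime, timedelta, timezone


def purge_filters(States, Events, purge_days):
    """Sketch: build the delete filters used by the purge job."""
    purge_before = datetime.now(timezone.utc) - timedelta(days=purge_days)
    # last_updated and time_fired record when the state change or event
    # actually happened, and both are indexed, so the delete can seek to
    # the cutoff instead of scanning every row.
    return (States.last_updated < purge_before,
            Events.time_fired < purge_before)
```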

* Updating db model to match new query optimizations

A few things here:
1. A new schema version with a new index and several removed indexes.
2. A new method in the migration script to drop old indexes.
3. An INFO-level log message when a new index is about to be added, as this can take quite some time on a Raspberry Pi.
OverloadUT authored and balloob committed Aug 5, 2017
1 parent 52cff83 commit 6e17851
Showing 6 changed files with 149 additions and 38 deletions.
68 changes: 48 additions & 20 deletions homeassistant/components/history.py
@@ -57,6 +57,7 @@ def get_significant_states(hass, start_time, end_time=None, entity_id=None,
as well as all states from certain domains (for instance
thermostat so that we get current temperature in our graphs).
"""
timer_start = time.perf_counter()
from homeassistant.components.recorder.models import States

entity_ids = (entity_id.lower(), ) if entity_id is not None else None
@@ -73,12 +74,18 @@ def get_significant_states(hass, start_time, end_time=None, entity_id=None,
if end_time is not None:
query = query.filter(States.last_updated < end_time)

query = query.order_by(States.last_updated)

states = (
state for state in execute(
query.order_by(States.entity_id, States.last_updated))
state for state in execute(query)
if (_is_significant(state) and
not state.attributes.get(ATTR_HIDDEN, False)))

if _LOGGER.isEnabledFor(logging.DEBUG):
elapsed = time.perf_counter() - timer_start
_LOGGER.debug(
'get_significant_states took %fs', elapsed)

return states_to_json(hass, states, start_time, entity_id, filters)


@@ -90,7 +97,7 @@ def state_changes_during_period(hass, start_time, end_time=None,
with session_scope(hass=hass) as session:
query = session.query(States).filter(
(States.last_changed == States.last_updated) &
(States.last_changed > start_time))
(States.last_updated > start_time))

if end_time is not None:
query = query.filter(States.last_updated < end_time)
@@ -99,7 +106,7 @@ def state_changes_during_period(hass, start_time, end_time=None,
query = query.filter_by(entity_id=entity_id.lower())

states = execute(
query.order_by(States.entity_id, States.last_updated))
query.order_by(States.last_updated))

return states_to_json(hass, states, start_time, entity_id)

@@ -125,39 +132,54 @@ def get_states(hass, utc_point_in_time, entity_ids=None, run=None,
most_recent_state_ids = session.query(
States.state_id.label('max_state_id')
).filter(
(States.created < utc_point_in_time) &
(States.last_updated < utc_point_in_time) &
(States.entity_id.in_(entity_ids))
).order_by(
States.created.desc())

if filters:
most_recent_state_ids = filters.apply(most_recent_state_ids,
entity_ids)
States.last_updated.desc())

most_recent_state_ids = most_recent_state_ids.limit(1)

else:
# We have more than one entity to look at (most commonly we want
# all entities,) so we need to do a search on all states since the
# last recorder run started.
most_recent_state_ids = session.query(
func.max(States.state_id).label('max_state_id')

most_recent_states_by_date = session.query(
States.entity_id.label('max_entity_id'),
func.max(States.last_updated).label('max_last_updated')
).filter(
(States.created >= run.start) &
(States.created < utc_point_in_time) &
(~States.domain.in_(IGNORE_DOMAINS)))
(States.last_updated >= run.start) &
(States.last_updated < utc_point_in_time)
)

if filters:
most_recent_state_ids = filters.apply(most_recent_state_ids,
entity_ids)
if entity_ids:
most_recent_states_by_date.filter(
States.entity_id.in_(entity_ids))

most_recent_states_by_date = most_recent_states_by_date.group_by(
States.entity_id)

most_recent_states_by_date = most_recent_states_by_date.subquery()

most_recent_state_ids = session.query(
func.max(States.state_id).label('max_state_id')
).join(most_recent_states_by_date, and_(
States.entity_id == most_recent_states_by_date.c.max_entity_id,
States.last_updated == most_recent_states_by_date.c.
max_last_updated))

most_recent_state_ids = most_recent_state_ids.group_by(
States.entity_id)

most_recent_state_ids = most_recent_state_ids.subquery()

query = session.query(States).join(most_recent_state_ids, and_(
States.state_id == most_recent_state_ids.c.max_state_id))
query = session.query(States).join(
most_recent_state_ids,
States.state_id == most_recent_state_ids.c.max_state_id
).filter((~States.domain.in_(IGNORE_DOMAINS)))

if filters:
query = filters.apply(query, entity_ids)

return [state for state in execute(query)
if not state.attributes.get(ATTR_HIDDEN, False)]
@@ -178,11 +200,17 @@ def states_to_json(hass, states, start_time, entity_id, filters=None):
entity_ids = [entity_id] if entity_id is not None else None

# Get the states at the start time
timer_start = time.perf_counter()
for state in get_states(hass, start_time, entity_ids, filters=filters):
state.last_changed = start_time
state.last_updated = start_time
result[state.entity_id].append(state)

if _LOGGER.isEnabledFor(logging.DEBUG):
elapsed = time.perf_counter() - timer_start
_LOGGER.debug(
'getting %d first datapoints took %fs', len(result), elapsed)

# Append all changes to it
for ent_id, group in groupby(states, lambda state: state.entity_id):
result[ent_id].extend(group)
85 changes: 81 additions & 4 deletions homeassistant/components/recorder/migration.py
@@ -30,7 +30,7 @@ def migrate_schema(instance):
new_version = version + 1
_LOGGER.info("Upgrading recorder db schema to version %s",
new_version)
_apply_update(instance.engine, new_version)
_apply_update(instance.engine, new_version, current_version)
session.add(SchemaChanges(schema_version=new_version))

_LOGGER.info("Upgrade to version %s done", new_version)
@@ -50,11 +50,71 @@ def _create_index(engine, table_name, index_name):
# Look up the index object by name from the table in the models
index = next(idx for idx in table.indexes if idx.name == index_name)
_LOGGER.debug("Creating %s index", index_name)
_LOGGER.info("Adding index `%s` to database. Note: this can take several "
"minutes on large databases and slow computers. Please "
"be patient!", index_name)
index.create(engine)
_LOGGER.debug("Finished creating %s", index_name)


def _apply_update(engine, new_version):
def _drop_index(engine, table_name, index_name):
"""Drop an index from a specified table.
There is no universal way to do something like `DROP INDEX IF EXISTS`
so we will simply execute the DROP command and ignore any exceptions
WARNING: Due to some engines (MySQL at least) being unable to use bind
parameters in a DROP INDEX statement (at least via SQLAlchemy), the query
string here is generated from the method parameters without sanitizing.
DO NOT USE THIS FUNCTION IN ANY OPERATION THAT TAKES USER INPUT.
"""
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError

_LOGGER.debug("Dropping index %s from table %s", index_name, table_name)
success = False

# Engines like DB2/Oracle
try:
engine.execute(text("DROP INDEX {index}".format(
index=index_name)))
except SQLAlchemyError:
pass
else:
success = True

# Engines like SQLite, SQL Server
if not success:
try:
engine.execute(text("DROP INDEX {table}.{index}".format(
index=index_name,
table=table_name)))
except SQLAlchemyError:
pass
else:
success = True

if not success:
# Engines like MySQL, MS Access
try:
engine.execute(text("DROP INDEX {index} ON {table}".format(
index=index_name,
table=table_name)))
except SQLAlchemyError:
pass
else:
success = True

if success:
_LOGGER.debug("Finished dropping index %s from table %s",
index_name, table_name)
else:
_LOGGER.warning("Failed to drop index %s from table %s. Schema "
"Migration will continue; this is not a "
"critical operation.", index_name, table_name)


def _apply_update(engine, new_version, old_version):
"""Perform operations to bring schema up to date."""
if new_version == 1:
_create_index(engine, "events", "ix_events_time_fired")
@@ -63,9 +123,26 @@ def _apply_update(engine, new_version):
_create_index(engine, "recorder_runs", "ix_recorder_runs_start_end")
# Create indexes for states
_create_index(engine, "states", "ix_states_last_updated")
_create_index(engine, "states", "ix_states_entity_id_created")
elif new_version == 3:
_create_index(engine, "states", "ix_states_created_domain")
# There used to be a new index here, but it was removed in version 4.
pass
elif new_version == 4:
# Queries were rewritten in this schema release. Most indexes from
# earlier versions of the schema are no longer needed.

if old_version == 3:
# Remove index that was added in version 3
_drop_index(engine, "states", "ix_states_created_domain")
if old_version == 2:
# Remove index that was added in version 2
_drop_index(engine, "states", "ix_states_entity_id_created")

# Remove indexes that were added in version 0
_drop_index(engine, "states", "states__state_changes")
_drop_index(engine, "states", "states__significant_changes")
_drop_index(engine, "states", "ix_states_entity_id_created")

_create_index(engine, "states", "ix_states_entity_id_last_updated")
else:
raise ValueError("No schema migration defined for version {}"
.format(new_version))
15 changes: 6 additions & 9 deletions homeassistant/components/recorder/models.py
@@ -16,7 +16,7 @@
# pylint: disable=invalid-name
Base = declarative_base()

SCHEMA_VERSION = 3
SCHEMA_VERSION = 4

_LOGGER = logging.getLogger(__name__)

@@ -70,14 +70,11 @@ class States(Base): # type: ignore
index=True)
created = Column(DateTime(timezone=True), default=datetime.utcnow)

__table_args__ = (Index('states__state_changes',
'last_changed', 'last_updated', 'entity_id'),
Index('states__significant_changes',
'domain', 'last_updated', 'entity_id'),
Index('ix_states_entity_id_created',
'entity_id', 'created'),
Index('ix_states_created_domain',
'created', 'domain'),)
__table_args__ = (
# Used for fetching the state of entities at a specific time
# (get_states in history.py)
Index(
'ix_states_entity_id_last_updated', 'entity_id', 'last_updated'),)

@staticmethod
def from_event(event):
4 changes: 2 additions & 2 deletions homeassistant/components/recorder/purge.py
@@ -16,12 +16,12 @@ def purge_old_data(instance, purge_days):

with session_scope(session=instance.get_session()) as session:
deleted_rows = session.query(States) \
.filter((States.created < purge_before)) \
.filter((States.last_updated < purge_before)) \
.delete(synchronize_session=False)
_LOGGER.debug("Deleted %s states", deleted_rows)

deleted_rows = session.query(Events) \
.filter((Events.created < purge_before)) \
.filter((Events.time_fired < purge_before)) \
.delete(synchronize_session=False)
_LOGGER.debug("Deleted %s events", deleted_rows)

11 changes: 10 additions & 1 deletion homeassistant/components/recorder/util.py
@@ -58,10 +58,19 @@ def execute(qry):

for tryno in range(0, RETRIES):
try:
return [
timer_start = time.perf_counter()
result = [
row for row in
(row.to_native() for row in qry)
if row is not None]

if _LOGGER.isEnabledFor(logging.DEBUG):
elapsed = time.perf_counter() - timer_start
_LOGGER.debug('converting %d rows to native objects took %fs',
len(result),
elapsed)

return result
except SQLAlchemyError as err:
_LOGGER.error("Error executing query: %s", err)

4 changes: 2 additions & 2 deletions tests/components/recorder/test_migrate.py
@@ -37,7 +37,7 @@ def test_schema_update_calls(hass):
yield from wait_connection_ready(hass)

update.assert_has_calls([
call(hass.data[DATA_INSTANCE].engine, version+1) for version
call(hass.data[DATA_INSTANCE].engine, version+1, 0) for version
in range(0, SCHEMA_VERSION)])


@@ -64,4 +64,4 @@ def test_schema_migrate(hass):
def test_invalid_update():
"""Test that an invalid new version raises an exception."""
with pytest.raises(ValueError):
migration._apply_update(None, -1)
migration._apply_update(None, -1, 0)
