Skip to content

Commit

Permalink
Run database migrations in the background
Browse files Browse the repository at this point in the history
- Database migrations no longer block startup
- Events are queued and processed when the migration is completed
- There is a safety to start discarding events if more than 10000 happen before migration is completed
- A notification is shown when the migration is in progress "The database is being upgraded. Integrations such as logbook and history that read the database may return inconsistent results until the migration completes."
- The database recovery logic can now recover at point after setup including purge, migration, and event insert. In short we can _always_ (hopefully) recover and start a new db  without a restart.
- The quick check is no longer performed on unclean shutdown since we can always recover live. The `db_integrity_check` option has been deprecated.
  • Loading branch information
bdraco committed Apr 11, 2021
1 parent 62182ea commit 163cd74
Show file tree
Hide file tree
Showing 8 changed files with 669 additions and 302 deletions.
384 changes: 232 additions & 152 deletions homeassistant/components/recorder/__init__.py

Large diffs are not rendered by default.

31 changes: 18 additions & 13 deletions homeassistant/components/recorder/migration.py
Expand Up @@ -11,15 +11,14 @@
)
from sqlalchemy.schema import AddConstraint, DropConstraint

from .const import DOMAIN
from .models import SCHEMA_VERSION, TABLE_STATES, Base, SchemaChanges
from .util import session_scope

_LOGGER = logging.getLogger(__name__)


def migrate_schema(instance):
"""Check if the schema needs to be upgraded."""
def get_schema_version(instance):
"""Get the schema version."""
with session_scope(session=instance.get_session()) as session:
res = (
session.query(SchemaChanges)
Expand All @@ -34,21 +33,27 @@ def migrate_schema(instance):
"No schema version found. Inspected version: %s", current_version
)

if current_version == SCHEMA_VERSION:
return
return current_version


def schema_is_current(current_version):
"""Check if the schema is current."""
return current_version == SCHEMA_VERSION


def migrate_schema(instance, current_version):
"""Check if the schema needs to be upgraded."""
with session_scope(session=instance.get_session()) as session:
_LOGGER.warning(
"Database is about to upgrade. Schema version: %s", current_version
)
for version in range(current_version, SCHEMA_VERSION):
new_version = version + 1
_LOGGER.info("Upgrading recorder db schema to version %s", new_version)
_apply_update(instance.engine, new_version, current_version)
session.add(SchemaChanges(schema_version=new_version))

with instance.hass.timeout.freeze(DOMAIN):
for version in range(current_version, SCHEMA_VERSION):
new_version = version + 1
_LOGGER.info("Upgrading recorder db schema to version %s", new_version)
_apply_update(instance.engine, new_version, current_version)
session.add(SchemaChanges(schema_version=new_version))

_LOGGER.info("Upgrade to version %s done", new_version)
_LOGGER.info("Upgrade to version %s done", new_version)


def _create_index(engine, table_name, index_name):
Expand Down
5 changes: 2 additions & 3 deletions homeassistant/components/recorder/purge.py
Expand Up @@ -6,7 +6,7 @@
import time
from typing import TYPE_CHECKING

from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import distinct

Expand Down Expand Up @@ -69,8 +69,7 @@ def purge_old_data(
return False

_LOGGER.warning("Error purging history: %s", err)
except SQLAlchemyError as err:
_LOGGER.warning("Error purging history: %s", err)

return True


Expand Down
41 changes: 17 additions & 24 deletions homeassistant/components/recorder/util.py
Expand Up @@ -14,8 +14,13 @@
from homeassistant.helpers.typing import HomeAssistantType
import homeassistant.util.dt as dt_util

from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, SQLITE_URL_PREFIX
from .models import ALL_TABLES, process_timestamp
from .const import DATA_INSTANCE, SQLITE_URL_PREFIX
from .models import (
ALL_TABLES,
TABLE_RECORDER_RUNS,
TABLE_SCHEMA_CHANGES,
process_timestamp,
)

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -117,15 +122,15 @@ def execute(qry, to_native=False, validate_entity_ids=True):
time.sleep(QUERY_RETRY_WAIT)


def validate_or_move_away_sqlite_database(dburl: str, db_integrity_check: bool) -> bool:
def validate_or_move_away_sqlite_database(dburl: str) -> bool:
"""Ensure that the database is valid or move it away."""
dbpath = dburl_to_path(dburl)

if not os.path.exists(dbpath):
# Database does not exist yet, this is OK
return True

if not validate_sqlite_database(dbpath, db_integrity_check):
if not validate_sqlite_database(dbpath):
move_away_broken_database(dbpath)
return False

Expand Down Expand Up @@ -161,18 +166,21 @@ def basic_sanity_check(cursor):
"""Check tables to make sure select does not fail."""

for table in ALL_TABLES:
cursor.execute(f"SELECT * FROM {table} LIMIT 1;") # nosec # not injection
if table in (TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES):
cursor.execute(f"SELECT * FROM {table};") # nosec # not injection
else:
cursor.execute(f"SELECT * FROM {table} LIMIT 1;") # nosec # not injection

return True


def validate_sqlite_database(dbpath: str, db_integrity_check: bool) -> bool:
def validate_sqlite_database(dbpath: str) -> bool:
"""Run a quick check on an sqlite database to see if it is corrupt."""
import sqlite3 # pylint: disable=import-outside-toplevel

try:
conn = sqlite3.connect(dbpath)
run_checks_on_open_db(dbpath, conn.cursor(), db_integrity_check)
run_checks_on_open_db(dbpath, conn.cursor())
conn.close()
except sqlite3.DatabaseError:
_LOGGER.exception("The database at %s is corrupt or malformed", dbpath)
Expand All @@ -181,24 +189,14 @@ def validate_sqlite_database(dbpath: str, db_integrity_check: bool) -> bool:
return True


def run_checks_on_open_db(dbpath, cursor, db_integrity_check):
def run_checks_on_open_db(dbpath, cursor):
"""Run checks that will generate a sqlite3 exception if there is corruption."""
sanity_check_passed = basic_sanity_check(cursor)
last_run_was_clean = last_run_was_recently_clean(cursor)

if sanity_check_passed and last_run_was_clean:
_LOGGER.debug(
"The quick_check will be skipped as the system was restarted cleanly and passed the basic sanity check"
)
return

if not db_integrity_check:
# Always warn so when it does fail they remember it has
# been manually disabled
_LOGGER.warning(
"The quick_check on the sqlite3 database at %s was skipped because %s was disabled",
dbpath,
CONF_DB_INTEGRITY_CHECK,
"The system was restarted cleanly and passed the basic sanity check"
)
return

Expand All @@ -214,11 +212,6 @@ def run_checks_on_open_db(dbpath, cursor, db_integrity_check):
dbpath,
)

_LOGGER.info(
"A quick_check is being performed on the sqlite3 database at %s", dbpath
)
cursor.execute("PRAGMA QUICK_CHECK")


def move_away_broken_database(dbfile: str) -> None:
"""Move away a broken sqlite3 database."""
Expand Down

0 comments on commit 163cd74

Please sign in to comment.