Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Live db migrations and recovery #49036

Merged
merged 5 commits into from Apr 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
395 changes: 240 additions & 155 deletions homeassistant/components/recorder/__init__.py

Large diffs are not rendered by default.

31 changes: 18 additions & 13 deletions homeassistant/components/recorder/migration.py
Expand Up @@ -11,15 +11,14 @@
)
from sqlalchemy.schema import AddConstraint, DropConstraint

from .const import DOMAIN
from .models import SCHEMA_VERSION, TABLE_STATES, Base, SchemaChanges
from .util import session_scope

_LOGGER = logging.getLogger(__name__)


def migrate_schema(instance):
"""Check if the schema needs to be upgraded."""
def get_schema_version(instance):
"""Get the schema version."""
with session_scope(session=instance.get_session()) as session:
res = (
session.query(SchemaChanges)
Expand All @@ -34,21 +33,27 @@ def migrate_schema(instance):
"No schema version found. Inspected version: %s", current_version
)

if current_version == SCHEMA_VERSION:
return
return current_version


def schema_is_current(current_version):
"""Check if the schema is current."""
return current_version == SCHEMA_VERSION


def migrate_schema(instance, current_version):
"""Check if the schema needs to be upgraded."""
with session_scope(session=instance.get_session()) as session:
_LOGGER.warning(
"Database is about to upgrade. Schema version: %s", current_version
)
for version in range(current_version, SCHEMA_VERSION):
new_version = version + 1
_LOGGER.info("Upgrading recorder db schema to version %s", new_version)
_apply_update(instance.engine, new_version, current_version)
session.add(SchemaChanges(schema_version=new_version))

with instance.hass.timeout.freeze(DOMAIN):
for version in range(current_version, SCHEMA_VERSION):
new_version = version + 1
_LOGGER.info("Upgrading recorder db schema to version %s", new_version)
_apply_update(instance.engine, new_version, current_version)
session.add(SchemaChanges(schema_version=new_version))

_LOGGER.info("Upgrade to version %s done", new_version)
_LOGGER.info("Upgrade to version %s done", new_version)


def _create_index(engine, table_name, index_name):
Expand Down
5 changes: 2 additions & 3 deletions homeassistant/components/recorder/purge.py
Expand Up @@ -6,7 +6,7 @@
import time
from typing import TYPE_CHECKING

from sqlalchemy.exc import OperationalError, SQLAlchemyError
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import distinct

Expand Down Expand Up @@ -69,8 +69,7 @@ def purge_old_data(
return False

_LOGGER.warning("Error purging history: %s", err)
except SQLAlchemyError as err:
bdraco marked this conversation as resolved.
Show resolved Hide resolved
_LOGGER.warning("Error purging history: %s", err)

return True


Expand Down
41 changes: 17 additions & 24 deletions homeassistant/components/recorder/util.py
Expand Up @@ -14,8 +14,13 @@
from homeassistant.helpers.typing import HomeAssistantType
import homeassistant.util.dt as dt_util

from .const import CONF_DB_INTEGRITY_CHECK, DATA_INSTANCE, SQLITE_URL_PREFIX
from .models import ALL_TABLES, process_timestamp
from .const import DATA_INSTANCE, SQLITE_URL_PREFIX
from .models import (
ALL_TABLES,
TABLE_RECORDER_RUNS,
TABLE_SCHEMA_CHANGES,
process_timestamp,
)

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -117,15 +122,15 @@ def execute(qry, to_native=False, validate_entity_ids=True):
time.sleep(QUERY_RETRY_WAIT)


def validate_or_move_away_sqlite_database(dburl: str, db_integrity_check: bool) -> bool:
def validate_or_move_away_sqlite_database(dburl: str) -> bool:
"""Ensure that the database is valid or move it away."""
dbpath = dburl_to_path(dburl)

if not os.path.exists(dbpath):
# Database does not exist yet, this is OK
return True

if not validate_sqlite_database(dbpath, db_integrity_check):
if not validate_sqlite_database(dbpath):
move_away_broken_database(dbpath)
return False

Expand Down Expand Up @@ -161,18 +166,21 @@ def basic_sanity_check(cursor):
"""Check tables to make sure select does not fail."""

for table in ALL_TABLES:
cursor.execute(f"SELECT * FROM {table} LIMIT 1;") # nosec # not injection
if table in (TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES):
cursor.execute(f"SELECT * FROM {table};") # nosec # not injection
else:
cursor.execute(f"SELECT * FROM {table} LIMIT 1;") # nosec # not injection

return True


def validate_sqlite_database(dbpath: str, db_integrity_check: bool) -> bool:
def validate_sqlite_database(dbpath: str) -> bool:
"""Run a quick check on an sqlite database to see if it is corrupt."""
import sqlite3 # pylint: disable=import-outside-toplevel

try:
conn = sqlite3.connect(dbpath)
run_checks_on_open_db(dbpath, conn.cursor(), db_integrity_check)
run_checks_on_open_db(dbpath, conn.cursor())
conn.close()
except sqlite3.DatabaseError:
_LOGGER.exception("The database at %s is corrupt or malformed", dbpath)
Expand All @@ -181,24 +189,14 @@ def validate_sqlite_database(dbpath: str, db_integrity_check: bool) -> bool:
return True


def run_checks_on_open_db(dbpath, cursor, db_integrity_check):
def run_checks_on_open_db(dbpath, cursor):
"""Run checks that will generate a sqlite3 exception if there is corruption."""
sanity_check_passed = basic_sanity_check(cursor)
last_run_was_clean = last_run_was_recently_clean(cursor)

if sanity_check_passed and last_run_was_clean:
_LOGGER.debug(
"The quick_check will be skipped as the system was restarted cleanly and passed the basic sanity check"
)
return

if not db_integrity_check:
# Always warn so when it does fail they remember it has
# been manually disabled
_LOGGER.warning(
"The quick_check on the sqlite3 database at %s was skipped because %s was disabled",
dbpath,
CONF_DB_INTEGRITY_CHECK,
"The system was restarted cleanly and passed the basic sanity check"
)
return

Expand All @@ -214,11 +212,6 @@ def run_checks_on_open_db(dbpath, cursor, db_integrity_check):
dbpath,
)

_LOGGER.info(
"A quick_check is being performed on the sqlite3 database at %s", dbpath
)
cursor.execute("PRAGMA QUICK_CHECK")


def move_away_broken_database(dbfile: str) -> None:
"""Move away a broken sqlite3 database."""
Expand Down