Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate attrs into another table (reduces database size) #68224

Merged
merged 62 commits into from Mar 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
bd97dcc
Separate attrs into another table
bdraco Mar 16, 2022
aafcea4
tweak
bdraco Mar 16, 2022
3d5395c
tweak
bdraco Mar 16, 2022
9ed711f
fix queries
bdraco Mar 16, 2022
9ae17de
adjust
bdraco Mar 17, 2022
38e92ab
tweak
bdraco Mar 17, 2022
386455f
tweak
bdraco Mar 17, 2022
0e1c41b
fix purge
bdraco Mar 17, 2022
2220099
Update homeassistant/components/recorder/purge.py
bdraco Mar 17, 2022
36b5235
fix purge
bdraco Mar 17, 2022
e9f141d
Merge branch 'state_attr_table_poc' of github.com:bdraco/home-assista…
bdraco Mar 17, 2022
f766fa7
fixes
bdraco Mar 17, 2022
3b96923
wip
bdraco Mar 17, 2022
684a7a8
group_by is faster
bdraco Mar 17, 2022
a1e869d
Update homeassistant/components/recorder/purge.py
bdraco Mar 17, 2022
5f85dba
Update homeassistant/components/recorder/purge.py
bdraco Mar 17, 2022
3f07757
Update homeassistant/components/recorder/purge.py
bdraco Mar 17, 2022
05d865f
naming
bdraco Mar 17, 2022
6b9ed3d
naming
bdraco Mar 17, 2022
8862804
naming
bdraco Mar 17, 2022
6d29401
Update homeassistant/components/recorder/purge.py
bdraco Mar 17, 2022
5e10ac2
test fixes
bdraco Mar 17, 2022
22e10a0
test fixes
bdraco Mar 17, 2022
80e9c65
Merge branch 'state_attr_table_poc' of github.com:bdraco/home-assista…
bdraco Mar 17, 2022
d23968b
fix
bdraco Mar 17, 2022
8df930a
Update homeassistant/components/recorder/history.py
bdraco Mar 17, 2022
1064e65
Update homeassistant/components/recorder/history.py
bdraco Mar 17, 2022
2b53921
Update homeassistant/components/recorder/history.py
bdraco Mar 17, 2022
ceacc24
Update homeassistant/components/recorder/history.py
bdraco Mar 17, 2022
91d85f9
Update homeassistant/components/recorder/history.py
bdraco Mar 17, 2022
cdd2731
Update homeassistant/components/recorder/history.py
bdraco Mar 17, 2022
709cabe
Update homeassistant/components/recorder/util.py
bdraco Mar 17, 2022
6d7c648
purge tests
bdraco Mar 17, 2022
b218137
fix purge entities
bdraco Mar 17, 2022
52c7120
fixes
bdraco Mar 17, 2022
6d98163
fixes
bdraco Mar 17, 2022
9e905d2
fixes
bdraco Mar 17, 2022
d7751e5
comments
bdraco Mar 17, 2022
3410cb6
handle broken json
bdraco Mar 17, 2022
198319e
tweak messages
bdraco Mar 17, 2022
8f6f360
refactor
bdraco Mar 17, 2022
d22c4ef
tweaks
bdraco Mar 17, 2022
4ee5d10
small cleanups
bdraco Mar 17, 2022
f03ea54
fix mysql
bdraco Mar 17, 2022
6c9b696
fix refactoring error
bdraco Mar 17, 2022
3d2f431
fix plant tests
bdraco Mar 17, 2022
b1a0c67
fix
bdraco Mar 17, 2022
37e533e
Merge branch 'dev' into state_attr_table_poc
bdraco Mar 17, 2022
1bc48c2
Update homeassistant/components/statistics/sensor.py
bdraco Mar 17, 2022
33c18ec
coverage
bdraco Mar 17, 2022
97a4a1d
more cover
bdraco Mar 18, 2022
8401212
revert empty context change
bdraco Mar 18, 2022
d8bf27b
ensure length is 20
bdraco Mar 18, 2022
a2ad0ac
Update homeassistant/components/logbook/__init__.py
bdraco Mar 18, 2022
dd5f0a9
Update homeassistant/components/plant/__init__.py
bdraco Mar 18, 2022
aae19cb
Update homeassistant/components/recorder/models.py
bdraco Mar 18, 2022
fa2a7ef
delete ENABLE_LOAD_HISTORY
bdraco Mar 18, 2022
d7c5e58
Add comment about _state_attributes_ids LRU size
bdraco Mar 18, 2022
c4f31a8
add coverage for missing line in logbook
bdraco Mar 18, 2022
762c015
add coverage to make sure states without attributes_id are purged cor…
bdraco Mar 18, 2022
79e3cc6
Update homeassistant/components/statistics/sensor.py
bdraco Mar 18, 2022
54b3160
Merge branch 'dev' into state_attr_table_poc
bdraco Mar 18, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
35 changes: 23 additions & 12 deletions homeassistant/components/logbook/__init__.py
Expand Up @@ -5,6 +5,7 @@
from itertools import groupby
import json
import re
from typing import Any

import sqlalchemy
from sqlalchemy.orm import aliased
Expand All @@ -18,6 +19,7 @@
from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import (
Events,
StateAttributes,
States,
process_timestamp_to_utc_isoformat,
)
Expand Down Expand Up @@ -494,6 +496,7 @@ def _generate_events_query(session):
States.entity_id,
States.domain,
States.attributes,
StateAttributes.shared_attrs,
)


Expand All @@ -504,6 +507,7 @@ def _generate_events_query_without_states(session):
literal(value=None, type_=sqlalchemy.String).label("entity_id"),
literal(value=None, type_=sqlalchemy.String).label("domain"),
literal(value=None, type_=sqlalchemy.Text).label("attributes"),
literal(value=None, type_=sqlalchemy.Text).label("shared_attrs"),
)


Expand All @@ -519,6 +523,9 @@ def _generate_states_query(session, start_day, end_day, old_state, entity_ids):
(States.last_updated == States.last_changed)
& States.entity_id.in_(entity_ids)
)
.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)
bdraco marked this conversation as resolved.
Show resolved Hide resolved
)


Expand All @@ -534,7 +541,9 @@ def _apply_events_types_and_states_filter(hass, query, old_state):
(Events.event_type != EVENT_STATE_CHANGED) | _continuous_entity_matcher()
)
)
return _apply_event_types_filter(hass, events_query, ALL_EVENT_TYPES)
return _apply_event_types_filter(hass, events_query, ALL_EVENT_TYPES).outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)


def _missing_state_matcher(old_state):
Expand All @@ -556,6 +565,9 @@ def _continuous_entity_matcher():
return sqlalchemy.or_(
sqlalchemy.not_(States.domain.in_(CONTINUOUS_DOMAINS)),
sqlalchemy.not_(States.attributes.contains(UNIT_OF_MEASUREMENT_JSON)),
sqlalchemy.not_(
StateAttributes.shared_attrs.contains(UNIT_OF_MEASUREMENT_JSON)
),
)


Expand Down Expand Up @@ -709,8 +721,9 @@ def attributes_icon(self):
"""Extract the icon from the decoded attributes or json."""
if self._attributes:
return self._attributes.get(ATTR_ICON)

result = ICON_JSON_EXTRACT.search(self._row.attributes)
result = ICON_JSON_EXTRACT.search(
self._row.shared_attrs or self._row.attributes
)
return result and result.group(1)

@property
Expand All @@ -734,14 +747,12 @@ def data_domain(self):
@property
def attributes(self):
"""State attributes."""
if not self._attributes:
if (
self._row.attributes is None
or self._row.attributes == EMPTY_JSON_OBJECT
):
if self._attributes is None:
source = self._row.shared_attrs or self._row.attributes
if source == EMPTY_JSON_OBJECT or source is None:
self._attributes = {}
else:
self._attributes = json.loads(self._row.attributes)
self._attributes = json.loads(source)
bdraco marked this conversation as resolved.
Show resolved Hide resolved
return self._attributes

@property
Expand Down Expand Up @@ -772,12 +783,12 @@ class EntityAttributeCache:
that are expected to change state.
"""

def __init__(self, hass):
def __init__(self, hass: HomeAssistant) -> None:
"""Init the cache."""
self._hass = hass
self._cache = {}
self._cache: dict[str, dict[str, Any]] = {}

def get(self, entity_id, attribute, event):
def get(self, entity_id: str, attribute: str, event: LazyEventPartialState) -> Any:
"""Lookup an attribute for an entity or get it from the cache."""
if entity_id in self._cache:
if attribute in self._cache[entity_id]:
Expand Down
23 changes: 14 additions & 9 deletions homeassistant/components/plant/__init__.py
Expand Up @@ -7,7 +7,7 @@
import voluptuous as vol

from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.models import States
from homeassistant.components.recorder.models import StateAttributes, States
from homeassistant.components.recorder.util import execute, session_scope
from homeassistant.const import (
ATTR_TEMPERATURE,
Expand Down Expand Up @@ -110,11 +110,6 @@
CONFIG_SCHEMA = vol.Schema({DOMAIN: {cv.string: PLANT_SCHEMA}}, extra=vol.ALLOW_EXTRA)


# Flag for enabling/disabling the loading of the history from the database.
# This feature is turned off right now as its tests are not 100% stable.
ENABLE_LOAD_HISTORY = False


async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Plant component."""
component = EntityComponent(_LOGGER, DOMAIN, hass)
Expand Down Expand Up @@ -282,7 +277,7 @@ def _check_max(self, sensor_name, value, params):

async def async_added_to_hass(self):
"""After being added to hass, load from history."""
if ENABLE_LOAD_HISTORY and "recorder" in self.hass.config.components:
if "recorder" in self.hass.config.components:
# only use the database if it's configured
await get_instance(self.hass).async_add_executor_job(
self._load_history_from_db
Expand Down Expand Up @@ -315,14 +310,24 @@ def _load_history_from_db(self):
_LOGGER.debug("Initializing values for %s from the database", self._name)
with session_scope(hass=self.hass) as session:
query = (
session.query(States)
session.query(States, StateAttributes)
.filter(
(States.entity_id == entity_id.lower())
and (States.last_updated > start_date)
)
.outerjoin(
StateAttributes,
States.attributes_id == StateAttributes.attributes_id,
)
.order_by(States.last_updated.asc())
)
states = execute(query, to_native=True, validate_entity_ids=False)
states = []
if results := execute(query, to_native=False, validate_entity_ids=False):
for state, attributes in results:
native = state.to_native()
if not native.attributes:
native.attributes = attributes.to_native()
states.append(native)

for state in states:
# filter out all None, NaN and "unknown" states
Expand Down
75 changes: 58 additions & 17 deletions homeassistant/components/recorder/__init__.py
Expand Up @@ -14,6 +14,7 @@
import time
from typing import Any, TypeVar

from lru import LRU # pylint: disable=no-name-in-module
from sqlalchemy import create_engine, event as sqlalchemy_event, exc, func, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import scoped_session, sessionmaker
Expand Down Expand Up @@ -67,6 +68,7 @@
Base,
Events,
RecorderRuns,
StateAttributes,
States,
StatisticsRuns,
process_timestamp,
Expand Down Expand Up @@ -131,6 +133,15 @@
# States and Events objects
EXPIRE_AFTER_COMMITS = 120

# The number of attribute ids to cache in memory
#
# Based on:
# - The number of overlapping attributes
# - How frequently states with overlapping attributes will change
# - How much memory our low end hardware has
STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048


DB_LOCK_TIMEOUT = 30
DB_LOCK_QUEUE_CHECK_TIMEOUT = 1

Expand Down Expand Up @@ -541,6 +552,8 @@ def __init__(
self._commits_without_expire = 0
self._keepalive_count = 0
self._old_states: dict[str, States] = {}
self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE)
self._pending_state_attributes: dict[str, StateAttributes] = {}
self._pending_expunge: list[States] = []
self.event_session = None
self.get_session = None
Expand Down Expand Up @@ -964,33 +977,58 @@ def _process_one_event(self, event):
dbevent.event_data = None
else:
dbevent = Events.from_event(event)
self.event_session.add(dbevent)
except (TypeError, ValueError):
_LOGGER.warning("Event is not JSON serializable: %s", event)
return

self.event_session.add(dbevent)
if event.event_type == EVENT_STATE_CHANGED:
try:
dbstate = States.from_event(event)
has_new_state = event.data.get("new_state")
if dbstate.entity_id in self._old_states:
old_state = self._old_states.pop(dbstate.entity_id)
if old_state.state_id:
dbstate.old_state_id = old_state.state_id
else:
dbstate.old_state = old_state
if not has_new_state:
dbstate.state = None
dbstate.event = dbevent
self.event_session.add(dbstate)
if has_new_state:
self._old_states[dbstate.entity_id] = dbstate
self._pending_expunge.append(dbstate)
bdraco marked this conversation as resolved.
Show resolved Hide resolved
except (TypeError, ValueError):
dbstate_attributes = StateAttributes.from_event(event)
except (TypeError, ValueError) as ex:
_LOGGER.warning(
"State is not JSON serializable: %s",
"State is not JSON serializable: %s: %s",
event.data.get("new_state"),
ex,
)
return

dbstate.attributes = None
shared_attrs = dbstate_attributes.shared_attrs
# Matching attributes found in the pending commit
if pending_attributes := self._pending_state_attributes.get(shared_attrs):
dbstate.state_attributes = pending_attributes
# Matching attributes id found in the cache
elif attributes_id := self._state_attributes_ids.get(shared_attrs):
dbstate.attributes_id = attributes_id
# Matching attributes found in the database
elif (
attributes := self.event_session.query(StateAttributes.attributes_id)
.filter(StateAttributes.hash == dbstate_attributes.hash)
.filter(StateAttributes.shared_attrs == shared_attrs)
.first()
bdraco marked this conversation as resolved.
Show resolved Hide resolved
):
dbstate.attributes_id = attributes[0]
self._state_attributes_ids[shared_attrs] = attributes[0]
# No matching attributes found, save them in the DB
else:
dbstate.state_attributes = dbstate_attributes
self._pending_state_attributes[shared_attrs] = dbstate_attributes
self.event_session.add(dbstate_attributes)

if old_state := self._old_states.pop(dbstate.entity_id, None):
if old_state.state_id:
dbstate.old_state_id = old_state.state_id
else:
dbstate.old_state = old_state
if event.data.get("new_state"):
self._old_states[dbstate.entity_id] = dbstate
self._pending_expunge.append(dbstate)
else:
dbstate.state = None
self.event_session.add(dbstate)
dbstate.event = dbevent

# If they do not have a commit interval
# than we commit right away
Expand Down Expand Up @@ -1042,6 +1080,7 @@ def _commit_event_session(self):
if dbstate in self.event_session:
self.event_session.expunge(dbstate)
self._pending_expunge = []
self._pending_state_attributes = {}
self.event_session.commit()

# Expire is an expensive operation (frequently more expensive
Expand All @@ -1062,6 +1101,8 @@ def _handle_sqlite_corruption(self):
def _close_event_session(self):
"""Close the event session."""
self._old_states = {}
self._state_attributes_ids = {}
self._pending_state_attributes = {}

if not self.event_session:
return
Expand Down
26 changes: 24 additions & 2 deletions homeassistant/components/recorder/history.py
Expand Up @@ -13,7 +13,12 @@
from homeassistant.core import split_entity_id
import homeassistant.util.dt as dt_util

from .models import LazyState, States, process_timestamp_to_utc_isoformat
from .models import (
LazyState,
StateAttributes,
States,
process_timestamp_to_utc_isoformat,
)
from .util import execute, session_scope

# mypy: allow-untyped-defs, no-check-untyped-defs
Expand Down Expand Up @@ -46,6 +51,7 @@
States.attributes,
States.last_changed,
States.last_updated,
StateAttributes.shared_attrs,
]

HISTORY_BAKERY = "recorder_history_bakery"
Expand Down Expand Up @@ -114,6 +120,9 @@ def get_significant_states_with_session(
if end_time is not None:
baked_query += lambda q: q.filter(States.last_updated < bindparam("end_time"))

baked_query += lambda q: q.outerjoin(
bdraco marked this conversation as resolved.
Show resolved Hide resolved
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
baked_query += lambda q: q.order_by(States.entity_id, States.last_updated)

states = execute(
Expand Down Expand Up @@ -159,6 +168,9 @@ def state_changes_during_period(hass, start_time, end_time=None, entity_id=None)
baked_query += lambda q: q.filter_by(entity_id=bindparam("entity_id"))
entity_id = entity_id.lower()

baked_query += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
baked_query += lambda q: q.order_by(States.entity_id, States.last_updated)

states = execute(
Expand Down Expand Up @@ -186,6 +198,9 @@ def get_last_state_changes(hass, number_of_states, entity_id):
baked_query += lambda q: q.filter_by(entity_id=bindparam("entity_id"))
entity_id = entity_id.lower()

baked_query += lambda q: q.outerjoin(
bdraco marked this conversation as resolved.
Show resolved Hide resolved
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
baked_query += lambda q: q.order_by(
States.entity_id, States.last_updated.desc()
)
Expand Down Expand Up @@ -263,6 +278,8 @@ def _get_states_with_session(
query = query.join(
most_recent_state_ids,
States.state_id == most_recent_state_ids.c.max_state_id,
).outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)
else:
# We did not get an include-list of entities, query all states in the inner
Expand Down Expand Up @@ -301,7 +318,9 @@ def _get_states_with_session(
query = query.filter(~States.domain.in_(IGNORE_DOMAINS))
if filters:
query = filters.apply(query)

query = query.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)
return [LazyState(row) for row in execute(query)]


Expand All @@ -315,6 +334,9 @@ def _get_single_entity_states_with_session(hass, session, utc_point_in_time, ent
States.last_updated < bindparam("utc_point_in_time"),
States.entity_id == bindparam("entity_id"),
)
baked_query += lambda q: q.outerjoin(
StateAttributes, States.attributes_id == StateAttributes.attributes_id
)
baked_query += lambda q: q.order_by(States.last_updated.desc())
baked_query += lambda q: q.limit(1)

Expand Down
2 changes: 1 addition & 1 deletion homeassistant/components/recorder/manifest.json
Expand Up @@ -2,7 +2,7 @@
"domain": "recorder",
"name": "Recorder",
"documentation": "https://www.home-assistant.io/integrations/recorder",
"requirements": ["sqlalchemy==1.4.32"],
"requirements": ["sqlalchemy==1.4.32","fnvhash==0.1.0","lru-dict==1.1.7"],
"codeowners": ["@home-assistant/core"],
"quality_scale": "internal",
"iot_class": "local_push"
Expand Down