Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Avoid recording state_changed events in the events table #71165

Merged
merged 8 commits into from May 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
135 changes: 59 additions & 76 deletions homeassistant/components/logbook/__init__.py
Expand Up @@ -111,13 +111,28 @@
*ALL_EVENT_TYPES_EXCEPT_STATE_CHANGED,
]


# Columns selected for every logbook query against the Events table.
# NOTE(review): this span is a rendered diff with +/- markers stripped —
# the unlabeled entries (Events.event_type, ...) and the .label(...)
# entries look like the pre-change and post-change versions of the same
# list interleaved; only one set belongs in the real file. Confirm
# against the repository before relying on this text.
EVENT_COLUMNS = [
Events.event_type,
Events.event_data,
Events.time_fired,
Events.context_id,
Events.context_user_id,
Events.context_parent_id,
Events.event_type.label("event_type"),
Events.event_data.label("event_data"),
Events.time_fired.label("time_fired"),
Events.context_id.label("context_id"),
Events.context_user_id.label("context_user_id"),
Events.context_parent_id.label("context_parent_id"),
]

# State columns labeled so they line up positionally with
# EMPTY_STATE_COLUMNS when the two query shapes are UNIONed together.
STATE_COLUMNS = [
States.state.label("state"),
States.entity_id.label("entity_id"),
States.attributes.label("attributes"),
StateAttributes.shared_attrs.label("shared_attrs"),
]

# NULL placeholders with the same labels/types as STATE_COLUMNS so an
# events-only query keeps an identical column layout for union_all.
EMPTY_STATE_COLUMNS = [
literal(value=None, type_=sqlalchemy.String).label("state"),
literal(value=None, type_=sqlalchemy.String).label("entity_id"),
literal(value=None, type_=sqlalchemy.Text).label("attributes"),
literal(value=None, type_=sqlalchemy.Text).label("shared_attrs"),
]

SCRIPT_AUTOMATION_EVENTS = {EVENT_AUTOMATION_TRIGGERED, EVENT_SCRIPT_STARTED}
Expand Down Expand Up @@ -502,80 +517,70 @@ def yield_events(query: Query) -> Generator[LazyEventPartialState, None, None]:

with session_scope(hass=hass) as session:
old_state = aliased(States, name="old_state")
query: Query
query = _generate_events_query_without_states(session)
query = _apply_event_time_filter(query, start_day, end_day)
query = _apply_event_types_filter(
hass, query, ALL_EVENT_TYPES_EXCEPT_STATE_CHANGED
)

if entity_ids is not None:
query = _generate_events_query_without_states(session)
query = _apply_event_time_filter(query, start_day, end_day)
query = _apply_event_types_filter(
hass, query, ALL_EVENT_TYPES_EXCEPT_STATE_CHANGED
)
if entity_matches_only:
# When entity_matches_only is provided, contexts and events that do not
# contain the entity_ids are not included in the logbook response.
query = _apply_event_entity_id_matchers(query, entity_ids)
query = query.outerjoin(EventData, (Events.data_id == EventData.data_id))

query = query.union_all(
_generate_states_query(
session, start_day, end_day, old_state, entity_ids
)
)
else:
query = _generate_events_query(session)
query = _apply_event_time_filter(query, start_day, end_day)
query = _apply_events_types_and_states_filter(
hass, query, old_state
).filter(
(States.last_updated == States.last_changed)
| (Events.event_type != EVENT_STATE_CHANGED)
)
if filters:
query = query.filter(
filters.entity_filter() | (Events.event_type != EVENT_STATE_CHANGED) # type: ignore[no-untyped-call]
)

if context_id is not None:
query = query.filter(Events.context_id == context_id)

query = query.outerjoin(EventData, (Events.data_id == EventData.data_id))

states_query = _generate_states_query(
session, start_day, end_day, old_state, entity_ids
)
if context_id is not None:
# Once all the old `state_changed` events
# are gone from the database this query can
# be simplified to filter only on States.context_id == context_id
states_query = states_query.outerjoin(
Events, (States.event_id == Events.event_id)
)
states_query = states_query.filter(
(States.context_id == context_id)
| (States.context_id.is_(None) & (Events.context_id == context_id))
)
if filters:
states_query = states_query.filter(filters.entity_filter()) # type: ignore[no-untyped-call]
query = query.union_all(states_query)

query = query.order_by(Events.time_fired)

return list(
humanify(hass, yield_events(query), entity_attr_cache, context_lookup)
)


# Build the base logbook query selecting the shared event columns plus
# the event's shared data and the joined state fields. The joins that
# actually attach States/StateAttributes/EventData are applied later by
# the caller; this only declares the SELECT column list.
# NOTE(review): indentation was lost in this page scrape — restore the
# original 4-space body indent when applying to the real file.
def _generate_events_query(session: Session) -> Query:
return session.query(
*EVENT_COLUMNS,
EventData.shared_data,
States.state,
States.entity_id,
States.attributes,
StateAttributes.shared_attrs,
)


# Build a query that synthesizes event-shaped rows directly from the
# States table: event_type is hard-coded to EVENT_STATE_CHANGED,
# event_data/shared_data are NULL literals, and the event timing/context
# columns are taken from the state row — so the result UNIONs cleanly
# with the real events query.
# NOTE(review): this span is a diff artifact — the trailing
# States.state/entity_id/attributes/shared_attrs entries and the
# *STATE_COLUMNS splat appear to be the pre- and post-change column
# lists interleaved; only one set belongs in the real file. Confirm
# against the repository.
def _generate_events_query_without_data(session: Session) -> Query:
return session.query(
*EVENT_COLUMNS,
literal(value=EVENT_STATE_CHANGED, type_=sqlalchemy.String).label("event_type"),
literal(value=None, type_=sqlalchemy.Text).label("event_data"),
States.last_changed.label("time_fired"),
States.context_id.label("context_id"),
States.context_user_id.label("context_user_id"),
States.context_parent_id.label("context_parent_id"),
literal(value=None, type_=sqlalchemy.Text).label("shared_data"),
States.state,
States.entity_id,
States.attributes,
StateAttributes.shared_attrs,
*STATE_COLUMNS,
)


# Build an events-only query: real event columns plus NULL-literal
# placeholders for the state columns, keeping the column layout
# compatible with the states-derived query for union_all.
# NOTE(review): diff artifact — both the expanded literal columns
# (state/entity_id/attributes/shared_attrs) and the collapsed
# *EMPTY_STATE_COLUMNS one-liner are present here; these are the pre-
# and post-change bodies interleaved. Only one belongs in the real
# file — confirm against the repository.
def _generate_events_query_without_states(session: Session) -> Query:
return session.query(
*EVENT_COLUMNS,
EventData.shared_data,
literal(value=None, type_=sqlalchemy.String).label("state"),
literal(value=None, type_=sqlalchemy.String).label("entity_id"),
literal(value=None, type_=sqlalchemy.Text).label("attributes"),
literal(value=None, type_=sqlalchemy.Text).label("shared_attrs"),
*EVENT_COLUMNS, EventData.shared_data.label("shared_data"), *EMPTY_STATE_COLUMNS
)


Expand All @@ -584,41 +589,19 @@ def _generate_states_query(
start_day: dt,
end_day: dt,
old_state: States,
entity_ids: Iterable[str],
entity_ids: Iterable[str] | None,
) -> Query:
return (
query = (
_generate_events_query_without_data(session)
.outerjoin(Events, (States.event_id == Events.event_id))
.outerjoin(old_state, (States.old_state_id == old_state.state_id))
.filter(_missing_state_matcher(old_state))
.filter(_not_continuous_entity_matcher())
.filter((States.last_updated > start_day) & (States.last_updated < end_day))
.filter(
(States.last_updated == States.last_changed)
& States.entity_id.in_(entity_ids)
)
.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)
)


def _apply_events_types_and_states_filter(
hass: HomeAssistant, query: Query, old_state: States
) -> Query:
events_query = (
query.outerjoin(States, (Events.event_id == States.event_id))
.outerjoin(old_state, (States.old_state_id == old_state.state_id))
.filter(
(Events.event_type != EVENT_STATE_CHANGED)
| _missing_state_matcher(old_state)
)
.filter(
(Events.event_type != EVENT_STATE_CHANGED)
| _not_continuous_entity_matcher()
)
.filter(States.last_updated == States.last_changed)
)
return _apply_event_types_filter(hass, events_query, ALL_EVENT_TYPES).outerjoin(
if entity_ids:
query = query.filter(States.entity_id.in_(entity_ids))
return query.outerjoin(
StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
)

Expand Down
3 changes: 1 addition & 2 deletions homeassistant/components/recorder/__init__.py
Expand Up @@ -1223,8 +1223,8 @@ def _process_event_into_session(self, event: Event) -> None:
] = dbevent_data
self.event_session.add(dbevent_data)

self.event_session.add(dbevent)
if event.event_type != EVENT_STATE_CHANGED:
self.event_session.add(dbevent)
return

try:
Expand Down Expand Up @@ -1272,7 +1272,6 @@ def _process_event_into_session(self, event: Event) -> None:
self._pending_expunge.append(dbstate)
else:
dbstate.state = None
dbstate.event = dbevent
self.event_session.add(dbstate)

def _handle_database_error(self, err: Exception) -> bool:
Expand Down
20 changes: 19 additions & 1 deletion homeassistant/components/recorder/migration.py
Expand Up @@ -442,7 +442,7 @@ def _apply_update(instance, new_version, old_version): # noqa: C901
# and we would have to move to something like
# sqlalchemy alembic to make that work
#
_drop_index(instance, "states", "ix_states_context_id")
# No longer dropping ix_states_context_id since it's recreated in version 28
_drop_index(instance, "states", "ix_states_context_user_id")
# This index won't be there if they were not running
# nightly but we don't treat that as a critical issue
Expand Down Expand Up @@ -652,6 +652,24 @@ def _apply_update(instance, new_version, old_version): # noqa: C901
elif new_version == 27:
_add_columns(instance, "events", [f"data_id {big_int}"])
_create_index(instance, "events", "ix_events_data_id")
elif new_version == 28:
_add_columns(instance, "events", ["origin_idx INTEGER"])
# We never use the user_id or parent_id index
_drop_index(instance, "events", "ix_events_context_user_id")
_drop_index(instance, "events", "ix_events_context_parent_id")
_add_columns(
instance,
"states",
[
"origin_idx INTEGER",
"context_id VARCHAR(36)",
"context_user_id VARCHAR(36)",
"context_parent_id VARCHAR(36)",
],
)
_create_index(instance, "states", "ix_states_context_id")
# Once there are no longer any state_changed events
# in the events table we can drop the index on states.event_id
else:
raise ValueError(f"No schema migration defined for version {new_version}")

Expand Down
45 changes: 32 additions & 13 deletions homeassistant/components/recorder/models.py
Expand Up @@ -17,6 +17,7 @@
Identity,
Index,
Integer,
SmallInteger,
String,
Text,
distinct,
Expand All @@ -43,7 +44,7 @@
# pylint: disable=invalid-name
Base = declarative_base()

SCHEMA_VERSION = 27
SCHEMA_VERSION = 28

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -86,6 +87,8 @@
.with_variant(oracle.DOUBLE_PRECISION(), "oracle")
.with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
)
EVENT_ORIGIN_ORDER = [EventOrigin.local, EventOrigin.remote]
EVENT_ORIGIN_TO_IDX = {origin: idx for idx, origin in enumerate(EVENT_ORIGIN_ORDER)}


class Events(Base): # type: ignore[misc,valid-type]
Expand All @@ -98,14 +101,15 @@ class Events(Base): # type: ignore[misc,valid-type]
{"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"},
)
__tablename__ = TABLE_EVENTS
event_id = Column(Integer, Identity(), primary_key=True)
event_id = Column(Integer, Identity(), primary_key=True) # no longer used
event_type = Column(String(MAX_LENGTH_EVENT_EVENT_TYPE))
event_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql"))
origin = Column(String(MAX_LENGTH_EVENT_ORIGIN))
origin = Column(String(MAX_LENGTH_EVENT_ORIGIN)) # no longer used
origin_idx = Column(SmallInteger)
time_fired = Column(DATETIME_TYPE, index=True)
context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
data_id = Column(Integer, ForeignKey("event_data.data_id"), index=True)
event_data_rel = relationship("EventData")

Expand All @@ -114,7 +118,7 @@ def __repr__(self) -> str:
return (
f"<recorder.Events("
f"id={self.event_id}, type='{self.event_type}', "
f"origin='{self.origin}', time_fired='{self.time_fired}'"
f"origin_idx='{self.origin_idx}', time_fired='{self.time_fired}'"
f", data_id={self.data_id})>"
)

Expand All @@ -124,7 +128,7 @@ def from_event(event: Event) -> Events:
return Events(
event_type=event.event_type,
event_data=None,
origin=str(event.origin.value),
origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
time_fired=event.time_fired,
context_id=event.context.id,
context_user_id=event.context.user_id,
Expand All @@ -142,7 +146,9 @@ def to_native(self, validate_entity_id: bool = True) -> Event | None:
return Event(
self.event_type,
json.loads(self.event_data) if self.event_data else {},
EventOrigin(self.origin),
EventOrigin(self.origin)
if self.origin
else EVENT_ORIGIN_ORDER[self.origin_idx],
process_timestamp(self.time_fired),
context=context,
)
Expand Down Expand Up @@ -222,7 +228,10 @@ class States(Base): # type: ignore[misc,valid-type]
attributes_id = Column(
Integer, ForeignKey("state_attributes.attributes_id"), index=True
)
event = relationship("Events", uselist=False)
context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True)
context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID))
origin_idx = Column(SmallInteger) # 0 is local, 1 is remote
old_state = relationship("States", remote_side=[state_id])
state_attributes = relationship("StateAttributes")

Expand All @@ -242,7 +251,14 @@ def from_event(event: Event) -> States:
"""Create object from a state_changed event."""
entity_id = event.data["entity_id"]
state: State | None = event.data.get("new_state")
dbstate = States(entity_id=entity_id, attributes=None)
dbstate = States(
entity_id=entity_id,
attributes=None,
context_id=event.context.id,
context_user_id=event.context.user_id,
context_parent_id=event.context.parent_id,
origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin),
)

# None state means the state was removed from the state machine
if state is None:
Expand All @@ -258,6 +274,11 @@ def from_event(event: Event) -> States:

def to_native(self, validate_entity_id: bool = True) -> State | None:
"""Convert to an HA state object."""
context = Context(
id=self.context_id,
user_id=self.context_user_id,
parent_id=self.context_parent_id,
)
try:
return State(
self.entity_id,
Expand All @@ -267,9 +288,7 @@ def to_native(self, validate_entity_id: bool = True) -> State | None:
json.loads(self.attributes) if self.attributes else {},
process_timestamp(self.last_changed),
process_timestamp(self.last_updated),
# Join the events table on event_id to get the context instead
# as it will always be there for state_changed events
context=Context(id=None), # type: ignore[arg-type]
context=context,
validate_entity_id=validate_entity_id,
)
except ValueError:
Expand Down