Skip to content

Commit

Permalink
Rework mqtt callbacks for camera, image and event
Browse files Browse the repository at this point in the history
  • Loading branch information
jbouwh committed May 25, 2024
1 parent 6fc6d10 commit 80f8178
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 143 deletions.
30 changes: 17 additions & 13 deletions homeassistant/components/mqtt/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

from base64 import b64decode
from functools import partial
import logging
from typing import TYPE_CHECKING

Expand All @@ -20,7 +21,6 @@
from . import subscription
from .config import MQTT_BASE_SCHEMA
from .const import CONF_QOS, CONF_TOPIC
from .debug_info import log_messages
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import ReceiveMessage
from .schemas import MQTT_ENTITY_COMMON_SCHEMA
Expand Down Expand Up @@ -97,27 +97,31 @@ def config_schema() -> vol.Schema:
"""Return the config schema."""
return DISCOVERY_SCHEMA

@callback
def _image_received(self, msg: ReceiveMessage) -> None:
"""Handle new MQTT messages."""
if CONF_IMAGE_ENCODING in self._config:
self._last_image = b64decode(msg.payload)
else:
if TYPE_CHECKING:
assert isinstance(msg.payload, bytes)
self._last_image = msg.payload

def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""

@callback
@log_messages(self.hass, self.entity_id)
def message_received(msg: ReceiveMessage) -> None:
"""Handle new MQTT messages."""
if CONF_IMAGE_ENCODING in self._config:
self._last_image = b64decode(msg.payload)
else:
if TYPE_CHECKING:
assert isinstance(msg.payload, bytes)
self._last_image = msg.payload

self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass,
self._sub_state,
{
"state_topic": {
"topic": self._config[CONF_TOPIC],
"msg_callback": message_received,
"msg_callback": partial(
self._message_callback,
self._image_received,
None,
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": None,
}
Expand Down
159 changes: 80 additions & 79 deletions homeassistant/components/mqtt/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

from collections.abc import Callable
from functools import partial
import logging
from typing import Any

Expand Down Expand Up @@ -31,7 +32,6 @@
PAYLOAD_EMPTY_JSON,
PAYLOAD_NONE,
)
from .debug_info import log_messages
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import (
DATA_MQTT,
Expand Down Expand Up @@ -113,90 +113,91 @@ def _setup_from_config(self, config: ConfigType) -> None:
self._config.get(CONF_VALUE_TEMPLATE), entity=self
).async_render_with_possible_json_value

@callback
def _event_received(self, msg: ReceiveMessage) -> None:
"""Handle new MQTT messages."""
if msg.retain:
_LOGGER.debug(
"Ignoring event trigger from replayed retained payload '%s' on topic %s",
msg.payload,
msg.topic,
)
return
event_attributes: dict[str, Any] = {}
event_type: str
try:
payload = self._template(msg.payload, PayloadSentinel.DEFAULT)
except MqttValueTemplateException as exc:
_LOGGER.warning(exc)
return
if (
not payload
or payload is PayloadSentinel.DEFAULT
or payload in (PAYLOAD_NONE, PAYLOAD_EMPTY_JSON)
):
_LOGGER.debug(
"Ignoring empty payload '%s' after rendering for topic %s",
payload,
msg.topic,
)
return
try:
event_attributes = json_loads_object(payload)
event_type = str(event_attributes.pop(event.ATTR_EVENT_TYPE))
_LOGGER.debug(
(
"JSON event data detected after processing payload '%s' on"
" topic %s, type %s, attributes %s"
),
payload,
msg.topic,
event_type,
event_attributes,
)
except KeyError:
_LOGGER.warning(
("`event_type` missing in JSON event payload, " " '%s' on topic %s"),
payload,
msg.topic,
)
return
except JSON_DECODE_EXCEPTIONS:
_LOGGER.warning(
(
"No valid JSON event payload detected, "
"value after processing payload"
" '%s' on topic %s"
),
payload,
msg.topic,
)
return
try:
self._trigger_event(event_type, event_attributes)
except ValueError:
_LOGGER.warning(
"Invalid event type %s for %s received on topic %s, payload %s",
event_type,
self.entity_id,
msg.topic,
payload,
)
return
mqtt_data = self.hass.data[DATA_MQTT]
mqtt_data.state_write_requests.write_state_request(self)

def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""
topics: dict[str, dict[str, Any]] = {}

@callback
@log_messages(self.hass, self.entity_id)
def message_received(msg: ReceiveMessage) -> None:
"""Handle new MQTT messages."""
if msg.retain:
_LOGGER.debug(
"Ignoring event trigger from replayed retained payload '%s' on topic %s",
msg.payload,
msg.topic,
)
return
event_attributes: dict[str, Any] = {}
event_type: str
try:
payload = self._template(msg.payload, PayloadSentinel.DEFAULT)
except MqttValueTemplateException as exc:
_LOGGER.warning(exc)
return
if (
not payload
or payload is PayloadSentinel.DEFAULT
or payload in (PAYLOAD_NONE, PAYLOAD_EMPTY_JSON)
):
_LOGGER.debug(
"Ignoring empty payload '%s' after rendering for topic %s",
payload,
msg.topic,
)
return
try:
event_attributes = json_loads_object(payload)
event_type = str(event_attributes.pop(event.ATTR_EVENT_TYPE))
_LOGGER.debug(
(
"JSON event data detected after processing payload '%s' on"
" topic %s, type %s, attributes %s"
),
payload,
msg.topic,
event_type,
event_attributes,
)
except KeyError:
_LOGGER.warning(
(
"`event_type` missing in JSON event payload, "
" '%s' on topic %s"
),
payload,
msg.topic,
)
return
except JSON_DECODE_EXCEPTIONS:
_LOGGER.warning(
(
"No valid JSON event payload detected, "
"value after processing payload"
" '%s' on topic %s"
),
payload,
msg.topic,
)
return
try:
self._trigger_event(event_type, event_attributes)
except ValueError:
_LOGGER.warning(
"Invalid event type %s for %s received on topic %s, payload %s",
event_type,
self.entity_id,
msg.topic,
payload,
)
return
mqtt_data = self.hass.data[DATA_MQTT]
mqtt_data.state_write_requests.write_state_request(self)

topics["state_topic"] = {
"topic": self._config[CONF_STATE_TOPIC],
"msg_callback": message_received,
"msg_callback": partial(
self._message_callback,
self._event_received,
None,
),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": self._config[CONF_ENCODING] or None,
}
Expand Down
90 changes: 44 additions & 46 deletions homeassistant/components/mqtt/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from base64 import b64decode
import binascii
from collections.abc import Callable
from functools import partial
import logging
from typing import TYPE_CHECKING, Any

Expand All @@ -26,7 +27,6 @@
from . import subscription
from .config import MQTT_BASE_SCHEMA
from .const import CONF_ENCODING, CONF_QOS
from .debug_info import log_messages
from .mixins import MqttEntity, async_setup_entity_entry_helper
from .models import (
DATA_MQTT,
Expand Down Expand Up @@ -143,6 +143,45 @@ def _setup_from_config(self, config: ConfigType) -> None:
config.get(CONF_URL_TEMPLATE), entity=self
).async_render_with_possible_json_value

@callback
def _image_data_received(self, msg: ReceiveMessage) -> None:
"""Handle new MQTT messages."""
try:
if CONF_IMAGE_ENCODING in self._config:
self._last_image = b64decode(msg.payload)
else:
if TYPE_CHECKING:
assert isinstance(msg.payload, bytes)
self._last_image = msg.payload
except (binascii.Error, ValueError, AssertionError) as err:
_LOGGER.error(
"Error processing image data received at topic %s: %s",
msg.topic,
err,
)
self._last_image = None
self._attr_image_last_updated = dt_util.utcnow()
self.hass.data[DATA_MQTT].state_write_requests.write_state_request(self)

@callback
def _image_from_url_request_received(self, msg: ReceiveMessage) -> None:
"""Handle new MQTT messages."""
try:
url = cv.url(self._url_template(msg.payload))
self._attr_image_url = url
except MqttValueTemplateException as exc:
_LOGGER.warning(exc)
return
except vol.Invalid:
_LOGGER.error(
"Invalid image URL '%s' received at topic %s",
msg.payload,
msg.topic,
)
self._attr_image_last_updated = dt_util.utcnow()
self._cached_image = None
self.hass.data[DATA_MQTT].state_write_requests.write_state_request(self)

def _prepare_subscribe_topics(self) -> None:
"""(Re)Subscribe to topics."""

Expand All @@ -159,56 +198,15 @@ def add_subscribe_topic(topic: str, msg_callback: MessageCallbackType) -> bool:
if has_topic := self._topic[topic] is not None:
topics[topic] = {
"topic": self._topic[topic],
"msg_callback": msg_callback,
"msg_callback": partial(self._message_callback, msg_callback, None),
"entity_id": self.entity_id,
"qos": self._config[CONF_QOS],
"encoding": encoding,
}
return has_topic

@callback
@log_messages(self.hass, self.entity_id)
def image_data_received(msg: ReceiveMessage) -> None:
"""Handle new MQTT messages."""
try:
if CONF_IMAGE_ENCODING in self._config:
self._last_image = b64decode(msg.payload)
else:
if TYPE_CHECKING:
assert isinstance(msg.payload, bytes)
self._last_image = msg.payload
except (binascii.Error, ValueError, AssertionError) as err:
_LOGGER.error(
"Error processing image data received at topic %s: %s",
msg.topic,
err,
)
self._last_image = None
self._attr_image_last_updated = dt_util.utcnow()
self.hass.data[DATA_MQTT].state_write_requests.write_state_request(self)

add_subscribe_topic(CONF_IMAGE_TOPIC, image_data_received)

@callback
@log_messages(self.hass, self.entity_id)
def image_from_url_request_received(msg: ReceiveMessage) -> None:
"""Handle new MQTT messages."""
try:
url = cv.url(self._url_template(msg.payload))
self._attr_image_url = url
except MqttValueTemplateException as exc:
_LOGGER.warning(exc)
return
except vol.Invalid:
_LOGGER.error(
"Invalid image URL '%s' received at topic %s",
msg.payload,
msg.topic,
)
self._attr_image_last_updated = dt_util.utcnow()
self._cached_image = None
self.hass.data[DATA_MQTT].state_write_requests.write_state_request(self)

add_subscribe_topic(CONF_URL_TOPIC, image_from_url_request_received)
add_subscribe_topic(CONF_IMAGE_TOPIC, self._image_data_received)
add_subscribe_topic(CONF_URL_TOPIC, self._image_from_url_request_received)

self._sub_state = subscription.async_prepare_subscribe_topics(
self.hass, self._sub_state, topics
Expand Down
12 changes: 7 additions & 5 deletions homeassistant/components/mqtt/mixins.py
Original file line number Diff line number Diff line change
Expand Up @@ -1254,13 +1254,15 @@ def _attrs_have_changed(
def _message_callback(
self,
msg_callback: MessageCallbackType,
attributes: set[str],
attributes: set[str] | None,
msg: ReceiveMessage,
) -> None:
"""Process the message callback."""
attrs_snapshot: tuple[tuple[str, Any | UndefinedType], ...] = tuple(
(attribute, getattr(self, attribute, UNDEFINED)) for attribute in attributes
)
if attributes is not None:
attrs_snapshot: tuple[tuple[str, Any | UndefinedType], ...] = tuple(
(attribute, getattr(self, attribute, UNDEFINED))
for attribute in attributes
)
mqtt_data = self.hass.data[DATA_MQTT]
messages = mqtt_data.debug_info_entities[self.entity_id]["subscriptions"][
msg.subscribed_topic
Expand All @@ -1274,7 +1276,7 @@ def _message_callback(
_LOGGER.warning(exc)
return

if self._attrs_have_changed(attrs_snapshot):
if attributes is not None and self._attrs_have_changed(attrs_snapshot):
mqtt_data.state_write_requests.write_state_request(self)


Expand Down

0 comments on commit 80f8178

Please sign in to comment.