Skip to content

Commit

Permalink
Merge af03a4f into 24db2b6
Browse files Browse the repository at this point in the history
  • Loading branch information
emontnemery committed Nov 16, 2018
2 parents 24db2b6 + af03a4f commit adfa09a
Show file tree
Hide file tree
Showing 8 changed files with 427 additions and 62 deletions.
109 changes: 70 additions & 39 deletions homeassistant/components/binary_sensor/mqtt.py
Expand Up @@ -5,7 +5,6 @@
https://home-assistant.io/components/binary_sensor.mqtt/
"""
import logging
from typing import Optional

import voluptuous as vol

Expand All @@ -19,7 +18,8 @@
from homeassistant.components.mqtt import (
ATTR_DISCOVERY_HASH, CONF_STATE_TOPIC, CONF_AVAILABILITY_TOPIC,
CONF_PAYLOAD_AVAILABLE, CONF_PAYLOAD_NOT_AVAILABLE, CONF_QOS,
MqttAvailability, MqttDiscoveryUpdate, MqttEntityDeviceInfo)
MqttAvailability, MqttDiscoveryUpdate, MqttEntityDeviceInfo,
subscription)
from homeassistant.components.mqtt.discovery import MQTT_DISCOVERY_NEW
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_connect
Expand Down Expand Up @@ -79,57 +79,80 @@ async def _async_setup_entity(hass, config, async_add_entities,
value_template.hass = hass

async_add_entities([MqttBinarySensor(
config.get(CONF_NAME),
config.get(CONF_STATE_TOPIC),
config.get(CONF_AVAILABILITY_TOPIC),
config.get(CONF_DEVICE_CLASS),
config.get(CONF_QOS),
config.get(CONF_FORCE_UPDATE),
config.get(CONF_OFF_DELAY),
config.get(CONF_PAYLOAD_ON),
config.get(CONF_PAYLOAD_OFF),
config.get(CONF_PAYLOAD_AVAILABLE),
config.get(CONF_PAYLOAD_NOT_AVAILABLE),
value_template,
config.get(CONF_UNIQUE_ID),
config.get(CONF_DEVICE),
discovery_hash,
config,
discovery_hash
)])


class MqttBinarySensor(MqttAvailability, MqttDiscoveryUpdate,
MqttEntityDeviceInfo, BinarySensorDevice):
"""Representation a binary sensor that is updated by MQTT."""

def __init__(self, name, state_topic, availability_topic, device_class,
qos, force_update, off_delay, payload_on, payload_off,
payload_available, payload_not_available, value_template,
unique_id: Optional[str], device_config: Optional[ConfigType],
discovery_hash):
def __init__(self, config, discovery_hash):
"""Initialize the MQTT binary sensor."""
MqttAvailability.__init__(self, availability_topic, qos,
payload_available, payload_not_available)
MqttDiscoveryUpdate.__init__(self, discovery_hash)
MqttEntityDeviceInfo.__init__(self, device_config)
self._name = name
self._config = config
self._state = None
self._state_topic = state_topic
self._device_class = device_class
self._payload_on = payload_on
self._payload_off = payload_off
self._qos = qos
self._force_update = force_update
self._off_delay = off_delay
self._template = value_template
self._unique_id = unique_id
self._discovery_hash = discovery_hash
self._sub_state = None
self._delay_listener = None

self._name = None
self._state_topic = None
self._device_class = None
self._payload_on = None
self._payload_off = None
self._qos = None
self._force_update = None
self._off_delay = None
self._template = None
self._unique_id = None

# Load config
self._setup_from_config(config)

availability_topic = config.get(CONF_AVAILABILITY_TOPIC)
payload_available = config.get(CONF_PAYLOAD_AVAILABLE)
payload_not_available = config.get(CONF_PAYLOAD_NOT_AVAILABLE)
device_config = config.get(CONF_DEVICE)

MqttAvailability.__init__(self, availability_topic, self._qos,
payload_available, payload_not_available)
MqttDiscoveryUpdate.__init__(self, discovery_hash,
self.discovery_update)
MqttEntityDeviceInfo.__init__(self, device_config)

async def async_added_to_hass(self):
"""Subscribe mqtt events."""
await MqttAvailability.async_added_to_hass(self)
await MqttDiscoveryUpdate.async_added_to_hass(self)
await self._subscribe_topics()

async def discovery_update(self, discovery_payload):
"""Handle updated discovery message."""
config = PLATFORM_SCHEMA(discovery_payload)
self._setup_from_config(config)
await self.availability_discovery_update(config)
await self._subscribe_topics()
self.async_schedule_update_ha_state()

def _setup_from_config(self, config):
"""(Re)Setup the entity."""
self._name = config.get(CONF_NAME)
self._state_topic = config.get(CONF_STATE_TOPIC)
self._device_class = config.get(CONF_DEVICE_CLASS)
self._qos = config.get(CONF_QOS)
self._force_update = config.get(CONF_FORCE_UPDATE)
self._off_delay = config.get(CONF_OFF_DELAY)
self._payload_on = config.get(CONF_PAYLOAD_ON)
self._payload_off = config.get(CONF_PAYLOAD_OFF)
value_template = config.get(CONF_VALUE_TEMPLATE)
if value_template is not None and value_template.hass is None:
value_template.hass = self.hass
self._template = value_template

self._unique_id = config.get(CONF_UNIQUE_ID)

async def _subscribe_topics(self):
"""(Re)Subscribe to topics."""
@callback
def off_delay_listener(now):
"""Switch device off after a delay."""
Expand Down Expand Up @@ -163,8 +186,16 @@ def state_message_received(_topic, payload, _qos):

self.async_schedule_update_ha_state()

await mqtt.async_subscribe(
self.hass, self._state_topic, state_message_received, self._qos)
self._sub_state = await subscription.async_subscribe_topics(
self.hass, self._sub_state,
{'state_topic': {'topic': self._state_topic,
'msg_callback': state_message_received,
'qos': self._qos}})

async def async_will_remove_from_hass(self):
"""Unsubscribe when removed."""
await subscription.async_unsubscribe_topics(self.hass, self._sub_state)
await MqttAvailability.async_will_remove_from_hass(self)

@property
def should_poll(self):
Expand Down
40 changes: 35 additions & 5 deletions homeassistant/components/mqtt/__init__.py
Expand Up @@ -832,12 +832,30 @@ def __init__(self, availability_topic: Optional[str], qos: Optional[int],
self._available = availability_topic is None # type: bool
self._payload_available = payload_available
self._payload_not_available = payload_not_available
self._availability_sub_state = None

async def async_added_to_hass(self) -> None:
"""Subscribe MQTT events.
This method must be run in the event loop and returns a coroutine.
"""
await self._availability_subscribe_topics()

async def availability_discovery_update(self, config: dict):
"""Handle updated discovery message."""
self._availability_setup_from_config(config)
await self._availability_subscribe_topics()

def _availability_setup_from_config(self, config):
"""(Re)Setup."""
self._availability_topic = config.get(CONF_AVAILABILITY_TOPIC)
self._payload_available = config.get(CONF_PAYLOAD_AVAILABLE)
self._payload_not_available = config.get(CONF_PAYLOAD_NOT_AVAILABLE)

async def _availability_subscribe_topics(self):
"""(Re)Subscribe to topics."""
from .subscription import async_subscribe_topics

@callback
def availability_message_received(topic: str,
payload: SubscribePayloadType,
Expand All @@ -850,10 +868,17 @@ def availability_message_received(topic: str,

self.async_schedule_update_ha_state()

if self._availability_topic is not None:
await async_subscribe(
self.hass, self._availability_topic,
availability_message_received, self._availability_qos)
self._availability_sub_state = await async_subscribe_topics(
self.hass, self._availability_sub_state,
{'availability_topic': {
'topic': self._availability_topic,
'msg_callback': availability_message_received,
'qos': self._availability_qos}})

async def async_will_remove_from_hass(self):
"""Unsubscribe when removed."""
from .subscription import async_unsubscribe_topics
await async_unsubscribe_topics(self.hass, self._availability_sub_state)

@property
def available(self) -> bool:
Expand All @@ -864,9 +889,10 @@ def available(self) -> bool:
class MqttDiscoveryUpdate(Entity):
"""Mixin used to handle updated discovery message."""

def __init__(self, discovery_hash) -> None:
def __init__(self, discovery_hash, discovery_update=None) -> None:
"""Initialize the discovery update mixin."""
self._discovery_hash = discovery_hash
self._discovery_update = discovery_update
self._remove_signal = None

async def async_added_to_hass(self) -> None:
Expand All @@ -886,6 +912,10 @@ def discovery_callback(payload):
self.hass.async_create_task(self.async_remove())
del self.hass.data[ALREADY_DISCOVERED][self._discovery_hash]
self._remove_signal()
elif self._discovery_update:
# Non-empty payload: Notify component
_LOGGER.info("Updating component: %s", self.entity_id)
self.hass.async_create_task(self._discovery_update(payload))

if self._discovery_hash:
self._remove_signal = async_dispatcher_connect(
Expand Down
35 changes: 19 additions & 16 deletions homeassistant/components/mqtt/discovery.py
Expand Up @@ -207,38 +207,41 @@ async def async_device_message_received(topic, payload, qos):
if value[-1] == TOPIC_BASE and key.endswith('_topic'):
payload[key] = "{}{}".format(value[:-1], base)

# If present, the node_id will be included in the discovered object id
discovery_id = '_'.join((node_id, object_id)) if node_id else object_id

if ALREADY_DISCOVERED not in hass.data:
hass.data[ALREADY_DISCOVERED] = {}

# If present, unique_id is used as the discovered object id. Otherwise,
# if present, the node_id will be included in the discovered object id
discovery_id = payload.get(
'unique_id', ' '.join(
(node_id, object_id)) if node_id else object_id)
discovery_hash = (component, discovery_id)

if discovery_hash in hass.data[ALREADY_DISCOVERED]:
_LOGGER.info(
"Component has already been discovered: %s %s, sending update",
component, discovery_id)
async_dispatcher_send(
hass, MQTT_DISCOVERY_UPDATED.format(discovery_hash), payload)
elif payload:
# Add component
if payload:
platform = payload.get(CONF_PLATFORM, 'mqtt')
if platform not in ALLOWED_PLATFORMS.get(component, []):
_LOGGER.warning("Platform %s (component %s) is not allowed",
platform, component)
return

payload[CONF_PLATFORM] = platform

if CONF_STATE_TOPIC not in payload:
payload[CONF_STATE_TOPIC] = '{}/{}/{}{}/state'.format(
discovery_topic, component,
'%s/' % node_id if node_id else '', object_id)

hass.data[ALREADY_DISCOVERED][discovery_hash] = None
payload[ATTR_DISCOVERY_HASH] = discovery_hash

if ALREADY_DISCOVERED not in hass.data:
hass.data[ALREADY_DISCOVERED] = {}
if discovery_hash in hass.data[ALREADY_DISCOVERED]:
# Dispatch update
_LOGGER.info(
"Component has already been discovered: %s %s, sending update",
component, discovery_id)
async_dispatcher_send(
hass, MQTT_DISCOVERY_UPDATED.format(discovery_hash), payload)
elif payload:
# Add component
_LOGGER.info("Found new component: %s %s", component, discovery_id)
hass.data[ALREADY_DISCOVERED][discovery_hash] = None

if platform not in CONFIG_ENTRY_PLATFORMS.get(component, []):
await async_load_platform(
Expand Down
54 changes: 54 additions & 0 deletions homeassistant/components/mqtt/subscription.py
@@ -0,0 +1,54 @@
"""
Helper to handle a set of topics to subscribe to.
For more details about this component, please refer to the documentation at
https://home-assistant.io/components/mqtt/
"""
import logging

from homeassistant.components import mqtt
from homeassistant.components.mqtt import DEFAULT_QOS
from homeassistant.loader import bind_hass
from homeassistant.helpers.typing import HomeAssistantType

_LOGGER = logging.getLogger(__name__)


@bind_hass
async def async_subscribe_topics(hass: HomeAssistantType, sub_state: dict,
topics: dict):
"""(Re)Subscribe to a set of MQTT topics.
State is kept in sub_state.
"""
cur_state = sub_state if sub_state is not None else {}
sub_state = {}
for key in topics:
topic = topics[key].get('topic', None)
msg_callback = topics[key].get('msg_callback', None)
qos = topics[key].get('qos', DEFAULT_QOS)
encoding = topics[key].get('encoding', 'utf-8')
topic = (topic, msg_callback, qos, encoding)
(cur_topic, unsub) = cur_state.pop(
key, ((None, None, None, None), None))

if topic != cur_topic and topic[0] is not None:
if unsub is not None:
unsub()
unsub = await mqtt.async_subscribe(
hass, topic[0], topic[1], topic[2], topic[3])
sub_state[key] = (topic, unsub)

for key, (topic, unsub) in list(cur_state.items()):
if unsub is not None:
unsub()

return sub_state


@bind_hass
async def async_unsubscribe_topics(hass: HomeAssistantType, sub_state: dict):
"""Unsubscribe from all MQTT topics managed by async_subscribe_topics."""
await async_subscribe_topics(hass, sub_state, {})

return sub_state
1 change: 1 addition & 0 deletions tests/common.py
Expand Up @@ -295,6 +295,7 @@ def async_mock_mqtt_component(hass, config=None):
with patch('paho.mqtt.client.Client') as mock_client:
mock_client().connect.return_value = 0
mock_client().subscribe.return_value = (0, 0)
mock_client().unsubscribe.return_value = (0, 0)
mock_client().publish.return_value = (0, 0)

result = yield from async_setup_component(hass, mqtt.DOMAIN, {
Expand Down

0 comments on commit adfa09a

Please sign in to comment.