Skip to content

Commit

Permalink
Merge 2dfb015 into 93689d6
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinHjelmare committed Nov 2, 2018
2 parents 93689d6 + 2dfb015 commit 920fcec
Show file tree
Hide file tree
Showing 8 changed files with 217 additions and 131 deletions.
9 changes: 7 additions & 2 deletions homeassistant/components/device_tracker/mysensors.py
Expand Up @@ -19,11 +19,16 @@ async def async_setup_scanner(hass, config, async_see, discovery_info=None):
return False

for device in new_devices:
gateway_id = id(device.gateway)
dev_id = (
id(device.gateway), device.node_id, device.child_id,
gateway_id, device.node_id, device.child_id,
device.value_type)
async_dispatcher_connect(
hass, mysensors.const.SIGNAL_CALLBACK.format(*dev_id),
hass, mysensors.const.CHILD_CALLBACK.format(*dev_id),
device.async_update_callback)
async_dispatcher_connect(
hass,
mysensors.const.NODE_CALLBACK.format(gateway_id, device.node_id),
device.async_update_callback)

return True
Expand Down
4 changes: 2 additions & 2 deletions homeassistant/components/mysensors/__init__.py
Expand Up @@ -22,7 +22,7 @@
from .device import get_mysensors_devices
from .gateway import get_mysensors_gateway, setup_gateways, finish_setup

REQUIREMENTS = ['pymysensors==0.17.0']
REQUIREMENTS = ['pymysensors==0.18.0']

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -135,7 +135,7 @@ def setup_mysensors_platform(
# Only act if called via MySensors by discovery event.
# Otherwise gateway is not set up.
if not discovery_info:
return
return None
if device_args is None:
device_args = ()
new_devices = []
Expand Down
4 changes: 3 additions & 1 deletion homeassistant/components/mysensors/const.py
Expand Up @@ -16,10 +16,12 @@
CONF_VERSION = 'version'

DOMAIN = 'mysensors'
MYSENSORS_GATEWAY_READY = 'mysensors_gateway_ready_{}'
MYSENSORS_GATEWAYS = 'mysensors_gateways'
PLATFORM = 'platform'
SCHEMA = 'schema'
SIGNAL_CALLBACK = 'mysensors_callback_{}_{}_{}_{}'
CHILD_CALLBACK = 'mysensors_child_callback_{}_{}_{}_{}'
NODE_CALLBACK = 'mysensors_node_callback_{}_{}'
TYPE = 'type'

# MySensors const schemas
Expand Down
10 changes: 7 additions & 3 deletions homeassistant/components/mysensors/device.py
Expand Up @@ -7,7 +7,7 @@
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import Entity

from .const import SIGNAL_CALLBACK
from .const import CHILD_CALLBACK, NODE_CALLBACK

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -103,7 +103,11 @@ def async_update_callback(self):

async def async_added_to_hass(self):
"""Register update callback."""
dev_id = id(self.gateway), self.node_id, self.child_id, self.value_type
gateway_id = id(self.gateway)
dev_id = gateway_id, self.node_id, self.child_id, self.value_type
async_dispatcher_connect(
self.hass, SIGNAL_CALLBACK.format(*dev_id),
self.hass, CHILD_CALLBACK.format(*dev_id),
self.async_update_callback)
async_dispatcher_connect(
self.hass, NODE_CALLBACK.format(gateway_id, self.node_id),
self.async_update_callback)
134 changes: 12 additions & 122 deletions homeassistant/components/mysensors/gateway.py
Expand Up @@ -4,32 +4,28 @@
import logging
import socket
import sys
from timeit import default_timer as timer

import async_timeout
import voluptuous as vol

from homeassistant.const import (
CONF_NAME, CONF_OPTIMISTIC, EVENT_HOMEASSISTANT_STOP)
CONF_OPTIMISTIC, EVENT_HOMEASSISTANT_STOP)
from homeassistant.core import callback
from homeassistant.helpers import discovery
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.setup import async_setup_component

from .const import (
ATTR_DEVICES, CONF_BAUD_RATE, CONF_DEVICE, CONF_GATEWAYS, CONF_NODES,
CONF_BAUD_RATE, CONF_DEVICE, CONF_GATEWAYS, CONF_NODES,
CONF_PERSISTENCE, CONF_PERSISTENCE_FILE, CONF_RETAIN, CONF_TCP_PORT,
CONF_TOPIC_IN_PREFIX, CONF_TOPIC_OUT_PREFIX, CONF_VERSION, DOMAIN,
MYSENSORS_CONST_SCHEMA, MYSENSORS_GATEWAYS, PLATFORM, SCHEMA,
SIGNAL_CALLBACK, TYPE)
from .device import get_mysensors_devices
MYSENSORS_GATEWAY_READY, MYSENSORS_GATEWAYS)
from .handler import HANDLERS
from .helpers import discover_mysensors_platform, validate_child

_LOGGER = logging.getLogger(__name__)

GATEWAY_READY_TIMEOUT = 15.0
MQTT_COMPONENT = 'mqtt'
MYSENSORS_GATEWAY_READY = 'mysensors_gateway_ready_{}'


def is_serial_port(value):
Expand Down Expand Up @@ -167,25 +163,16 @@ async def _discover_persistent_devices(hass, hass_config, gateway):
for node_id in gateway.sensors:
node = gateway.sensors[node_id]
for child in node.children.values():
validated = _validate_child(gateway, node_id, child)
validated = validate_child(gateway, node_id, child)
for platform, dev_ids in validated.items():
new_devices[platform].extend(dev_ids)
for platform, dev_ids in new_devices.items():
tasks.append(_discover_mysensors_platform(
tasks.append(discover_mysensors_platform(
hass, hass_config, platform, dev_ids))
if tasks:
await asyncio.wait(tasks, loop=hass.loop)


@callback
def _discover_mysensors_platform(hass, hass_config, platform, new_devices):
"""Discover a MySensors platform."""
task = hass.async_create_task(discovery.async_load_platform(
hass, platform, DOMAIN,
{ATTR_DEVICES: new_devices, CONF_NAME: DOMAIN}, hass_config))
return task


async def _gw_start(hass, gateway):
"""Start the gateway."""
# Don't use hass.async_create_task to avoid holding up setup indefinitely.
Expand Down Expand Up @@ -222,112 +209,15 @@ def _gw_callback_factory(hass, hass_config):
@callback
def mysensors_callback(msg):
"""Handle messages from a MySensors gateway."""
start = timer()
_LOGGER.debug(
"Node update: node %s child %s", msg.node_id, msg.child_id)

_set_gateway_ready(hass, msg)
msg_type = msg.gateway.const.MessageType(msg.type)
msg_handler = HANDLERS.get(msg_type.name)

try:
child = msg.gateway.sensors[msg.node_id].children[msg.child_id]
except KeyError:
_LOGGER.debug("Not a child update for node %s", msg.node_id)
if msg_handler is None:
return

signals = []

# Update all platforms for the device via dispatcher.
# Add/update entity if schema validates to true.
validated = _validate_child(msg.gateway, msg.node_id, child)
for platform, dev_ids in validated.items():
devices = get_mysensors_devices(hass, platform)
new_dev_ids = []
for dev_id in dev_ids:
if dev_id in devices:
signals.append(SIGNAL_CALLBACK.format(*dev_id))
else:
new_dev_ids.append(dev_id)
if new_dev_ids:
_discover_mysensors_platform(
hass, hass_config, platform, new_dev_ids)
for signal in set(signals):
# Only one signal per device is needed.
# A device can have multiple platforms, ie multiple schemas.
# FOR LATER: Add timer to not signal if another update comes in.
async_dispatcher_send(hass, signal)
end = timer()
if end - start > 0.1:
_LOGGER.debug(
"Callback for node %s child %s took %.3f seconds",
msg.node_id, msg.child_id, end - start)
return mysensors_callback


@callback
def _set_gateway_ready(hass, msg):
"""Set asyncio future result if gateway is ready."""
if (msg.type != msg.gateway.const.MessageType.internal or
msg.sub_type != msg.gateway.const.Internal.I_GATEWAY_READY):
return
gateway_ready = hass.data.get(MYSENSORS_GATEWAY_READY.format(
id(msg.gateway)))
if gateway_ready is None or gateway_ready.cancelled():
return
gateway_ready.set_result(True)


def _validate_child(gateway, node_id, child):
"""Validate that a child has the correct values according to schema.
Return a dict of platform with a list of device ids for validated devices.
"""
validated = defaultdict(list)
hass.async_create_task(msg_handler(hass, hass_config, msg))

if not child.values:
_LOGGER.debug(
"No child values for node %s child %s", node_id, child.id)
return validated
if gateway.sensors[node_id].sketch_name is None:
_LOGGER.debug("Node %s is missing sketch name", node_id)
return validated
pres = gateway.const.Presentation
set_req = gateway.const.SetReq
s_name = next(
(member.name for member in pres if member.value == child.type), None)
if s_name not in MYSENSORS_CONST_SCHEMA:
_LOGGER.warning("Child type %s is not supported", s_name)
return validated
child_schemas = MYSENSORS_CONST_SCHEMA[s_name]

def msg(name):
"""Return a message for an invalid schema."""
return "{} requires value_type {}".format(
pres(child.type).name, set_req[name].name)

for schema in child_schemas:
platform = schema[PLATFORM]
v_name = schema[TYPE]
value_type = next(
(member.value for member in set_req if member.name == v_name),
None)
if value_type is None:
continue
_child_schema = child.get_schema(gateway.protocol_version)
vol_schema = _child_schema.extend(
{vol.Required(set_req[key].value, msg=msg(key)):
_child_schema.schema.get(set_req[key].value, val)
for key, val in schema.get(SCHEMA, {v_name: cv.string}).items()},
extra=vol.ALLOW_EXTRA)
try:
vol_schema(child.values)
except vol.Invalid as exc:
level = (logging.WARNING if value_type in child.values
else logging.DEBUG)
_LOGGER.log(
level,
"Invalid values: %s: %s platform: node %s child %s: %s",
child.values, platform, node_id, child.id, exc)
continue
dev_id = id(gateway), node_id, child.id, value_type
validated[platform].append(dev_id)
return validated
return mysensors_callback
104 changes: 104 additions & 0 deletions homeassistant/components/mysensors/handler.py
@@ -0,0 +1,104 @@
"""Handle MySensors messages."""
import logging

from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.util import decorator

from .const import MYSENSORS_GATEWAY_READY, CHILD_CALLBACK, NODE_CALLBACK
from .device import get_mysensors_devices
from .helpers import discover_mysensors_platform, validate_child

_LOGGER = logging.getLogger(__name__)
HANDLERS = decorator.Registry()


@HANDLERS.register('presentation')
async def handle_presentation(hass, hass_config, msg):
"""Handle a mysensors presentation message."""
# Handle both node and child presentation.
from mysensors.const import SYSTEM_CHILD_ID
if msg.child_id == SYSTEM_CHILD_ID:
return
_handle_child_update(hass, hass_config, msg)


@HANDLERS.register('set')
async def handle_set(hass, hass_config, msg):
"""Handle a mysensors set message."""
_handle_child_update(hass, hass_config, msg)


@HANDLERS.register('internal')
async def handle_internal(hass, hass_config, msg):
"""Handle a mysensors internal message."""
internal = msg.gateway.const.Internal(msg.sub_type)
handler = HANDLERS.get(internal.name)
if handler is None:
return
await handler(hass, hass_config, msg)


@HANDLERS.register('I_BATTERY_LEVEL')
async def handle_battery_level(hass, hass_config, msg):
"""Handle an internal battery level message."""
_handle_node_update(hass, msg)


@HANDLERS.register('I_SKETCH_NAME')
async def handle_sketch_name(hass, hass_config, msg):
"""Handle an internal sketch name message."""
_handle_node_update(hass, msg)


@HANDLERS.register('I_SKETCH_VERSION')
async def handle_sketch_version(hass, hass_config, msg):
"""Handle an internal sketch version message."""
_handle_node_update(hass, msg)


@HANDLERS.register('I_GATEWAY_READY')
async def handle_gateway_ready(hass, hass_config, msg):
"""Handle an internal gateway ready message.
Set asyncio future result if gateway is ready.
"""
gateway_ready = hass.data.get(MYSENSORS_GATEWAY_READY.format(
id(msg.gateway)))
if gateway_ready is None or gateway_ready.cancelled():
return
gateway_ready.set_result(True)


@callback
def _handle_child_update(hass, hass_config, msg):
"""Handle a child update."""
child = msg.gateway.sensors[msg.node_id].children[msg.child_id]
signals = []

# Update all platforms for the device via dispatcher.
# Add/update entity if schema validates to true.
validated = validate_child(msg.gateway, msg.node_id, child)
for platform, dev_ids in validated.items():
devices = get_mysensors_devices(hass, platform)
new_dev_ids = []
for dev_id in dev_ids:
if dev_id in devices:
signals.append(CHILD_CALLBACK.format(*dev_id))
else:
new_dev_ids.append(dev_id)
if new_dev_ids:
discover_mysensors_platform(
hass, hass_config, platform, new_dev_ids)
for signal in set(signals):
# Only one signal per device is needed.
# A device can have multiple platforms, ie multiple schemas.
# FOR LATER: Add timer to not signal if another update comes in.
async_dispatcher_send(hass, signal)


@callback
def _handle_node_update(hass, msg):
"""Handle a node update."""
signal = NODE_CALLBACK.format(id(msg.gateway), msg.node_id)
async_dispatcher_send(hass, signal)

0 comments on commit 920fcec

Please sign in to comment.