Skip to content

Commit

Permalink
Adding expire_after to mqtt sensor to expire outdated values (#6708)
Browse files Browse the repository at this point in the history
* Adding expire_after to mqtt sensor to expire outdated values

* Extending test case

* mqtt: refactoring expire_after to use timed events instead of polling; lint

* refactor to reset unused trigger

* Fix: handler must be set to None after execution or removal to avoid warning

* Commenting out non-working test

* Fix lint

* Commit to trigger new build

* Commit to trigger new build

* Make testcase work

* Undo unnecessary change

* Remove default value, add extra check
  • Loading branch information
micw authored and emlove committed Mar 23, 2017
1 parent 3acd926 commit 6c59898
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 2 deletions.
33 changes: 32 additions & 1 deletion homeassistant/components/sensor/mqtt.py
Expand Up @@ -6,6 +6,7 @@
"""
import asyncio
import logging
from datetime import timedelta

import voluptuous as vol

Expand All @@ -16,10 +17,13 @@
from homeassistant.helpers.entity import Entity
import homeassistant.components.mqtt as mqtt
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.util import dt as dt_util

_LOGGER = logging.getLogger(__name__)

CONF_FORCE_UPDATE = 'force_update'
CONF_EXPIRE_AFTER = 'expire_after'

DEFAULT_NAME = 'MQTT Sensor'
DEFAULT_FORCE_UPDATE = False
Expand All @@ -28,6 +32,7 @@
PLATFORM_SCHEMA = mqtt.MQTT_RO_PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
vol.Optional(CONF_EXPIRE_AFTER): cv.positive_int,
vol.Optional(CONF_FORCE_UPDATE, default=DEFAULT_FORCE_UPDATE): cv.boolean,
})

Expand All @@ -48,6 +53,7 @@ def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
config.get(CONF_QOS),
config.get(CONF_UNIT_OF_MEASUREMENT),
config.get(CONF_FORCE_UPDATE),
config.get(CONF_EXPIRE_AFTER),
value_template,
)])

Expand All @@ -56,7 +62,7 @@ class MqttSensor(Entity):
"""Representation of a sensor that can be updated using MQTT."""

def __init__(self, name, state_topic, qos, unit_of_measurement,
force_update, value_template):
force_update, expire_after, value_template):
"""Initialize the sensor."""
self._state = STATE_UNKNOWN
self._name = name
Expand All @@ -65,6 +71,8 @@ def __init__(self, name, state_topic, qos, unit_of_measurement,
self._unit_of_measurement = unit_of_measurement
self._force_update = force_update
self._template = value_template
self._expire_after = expire_after
self._expiration_trigger = None

def async_added_to_hass(self):
"""Subscribe mqtt events.
Expand All @@ -74,6 +82,22 @@ def async_added_to_hass(self):
@callback
def message_received(topic, payload, qos):
"""A new MQTT message has been received."""
# auto-expire enabled?
if self._expire_after is not None and self._expire_after > 0:
# Reset old trigger
if self._expiration_trigger:
self._expiration_trigger()
self._expiration_trigger = None

# Set new trigger
expiration_at = (
dt_util.utcnow() + timedelta(seconds=self._expire_after))

self._expiration_trigger = async_track_point_in_utc_time(
self.hass,
self.value_is_expired,
expiration_at)

if self._template is not None:
payload = self._template.async_render_with_possible_json_value(
payload, self._state)
Expand All @@ -83,6 +107,13 @@ def message_received(topic, payload, qos):
return mqtt.async_subscribe(
self.hass, self._state_topic, message_received, self._qos)

@callback
def value_is_expired(self, *_):
"""Triggered when value is expired."""
self._expiration_trigger = None
self._state = STATE_UNKNOWN
self.hass.async_add_job(self.async_update_ha_state())

@property
def should_poll(self):
"""No polling needed."""
Expand Down
73 changes: 72 additions & 1 deletion tests/components/sensor/test_mqtt.py
@@ -1,12 +1,16 @@
"""The tests for the MQTT sensor platform."""
import unittest

from datetime import timedelta, datetime
from unittest.mock import patch

import homeassistant.core as ha
from homeassistant.setup import setup_component
import homeassistant.components.sensor as sensor
from homeassistant.const import EVENT_STATE_CHANGED
from tests.common import mock_mqtt_component, fire_mqtt_message
import homeassistant.util.dt as dt_util

from tests.common import mock_mqtt_component, fire_mqtt_message
from tests.common import get_test_home_assistant, mock_component


Expand Down Expand Up @@ -42,6 +46,69 @@ def test_setting_sensor_value_via_mqtt_message(self):
self.assertEqual('fav unit',
state.attributes.get('unit_of_measurement'))

@patch('homeassistant.core.dt_util.utcnow')
def test_setting_sensor_value_expires(self, mock_utcnow):
"""Test the expiration of the value."""
mock_component(self.hass, 'mqtt')
assert setup_component(self.hass, sensor.DOMAIN, {
sensor.DOMAIN: {
'platform': 'mqtt',
'name': 'test',
'state_topic': 'test-topic',
'unit_of_measurement': 'fav unit',
'expire_after': '4',
'force_update': True
}
})

state = self.hass.states.get('sensor.test')
self.assertEqual('unknown', state.state)

now = datetime(2017, 1, 1, 1, tzinfo=dt_util.UTC)
mock_utcnow.return_value = now
fire_mqtt_message(self.hass, 'test-topic', '100')
self.hass.block_till_done()

# Value was set correctly.
state = self.hass.states.get('sensor.test')
self.assertEqual('100', state.state)

# Time jump +3s
now = now + timedelta(seconds=3)
self._send_time_changed(now)
self.hass.block_till_done()

# Value is not yet expired
state = self.hass.states.get('sensor.test')
self.assertEqual('100', state.state)

# Next message resets timer
mock_utcnow.return_value = now
fire_mqtt_message(self.hass, 'test-topic', '101')
self.hass.block_till_done()

# Value was updated correctly.
state = self.hass.states.get('sensor.test')
self.assertEqual('101', state.state)

# Time jump +3s
now = now + timedelta(seconds=3)
self._send_time_changed(now)
self.hass.block_till_done()

# Value is not yet expired
state = self.hass.states.get('sensor.test')
self.assertEqual('101', state.state)

# Time jump +2s
now = now + timedelta(seconds=2)
self._send_time_changed(now)
self.hass.block_till_done()

# Value is expired now
state = self.hass.states.get('sensor.test')
self.assertEqual('unknown', state.state)

def test_setting_sensor_value_via_mqtt_json_message(self):
"""Test the setting of the value via MQTT with JSON playload."""
mock_component(self.hass, 'mqtt')
Expand Down Expand Up @@ -117,3 +184,7 @@ def callback(event):
fire_mqtt_message(self.hass, 'test-topic', '100')
self.hass.block_till_done()
self.assertEqual(2, len(events))

def _send_time_changed(self, now):
"""Send a time changed event."""
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: now})

0 comments on commit 6c59898

Please sign in to comment.