Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding expire_after to mqtt sensor to expire outdated values #6708

Merged
merged 12 commits into from Mar 23, 2017
33 changes: 32 additions & 1 deletion homeassistant/components/sensor/mqtt.py
Expand Up @@ -6,6 +6,7 @@
"""
import asyncio
import logging
from datetime import timedelta

import voluptuous as vol

Expand All @@ -16,10 +17,13 @@
from homeassistant.helpers.entity import Entity
import homeassistant.components.mqtt as mqtt
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.util import dt as dt_util

_LOGGER = logging.getLogger(__name__)

CONF_FORCE_UPDATE = 'force_update'
CONF_EXPIRE_AFTER = 'expire_after'

DEFAULT_NAME = 'MQTT Sensor'
DEFAULT_FORCE_UPDATE = False
Expand All @@ -28,6 +32,7 @@
PLATFORM_SCHEMA = mqtt.MQTT_RO_PLATFORM_SCHEMA.extend({
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_UNIT_OF_MEASUREMENT): cv.string,
vol.Optional(CONF_EXPIRE_AFTER): cv.positive_int,
vol.Optional(CONF_FORCE_UPDATE, default=DEFAULT_FORCE_UPDATE): cv.boolean,
})

Expand All @@ -48,6 +53,7 @@ def async_setup_platform(hass, config, async_add_devices, discovery_info=None):
config.get(CONF_QOS),
config.get(CONF_UNIT_OF_MEASUREMENT),
config.get(CONF_FORCE_UPDATE),
config.get(CONF_EXPIRE_AFTER),
value_template,
)])

Expand All @@ -56,7 +62,7 @@ class MqttSensor(Entity):
"""Representation of a sensor that can be updated using MQTT."""

def __init__(self, name, state_topic, qos, unit_of_measurement,
force_update, value_template):
force_update, expire_after, value_template):
"""Initialize the sensor."""
self._state = STATE_UNKNOWN
self._name = name
Expand All @@ -65,6 +71,8 @@ def __init__(self, name, state_topic, qos, unit_of_measurement,
self._unit_of_measurement = unit_of_measurement
self._force_update = force_update
self._template = value_template
self._expire_after = expire_after
self._expiration_trigger = None

def async_added_to_hass(self):
"""Subscribe mqtt events.
Expand All @@ -74,6 +82,22 @@ def async_added_to_hass(self):
@callback
def message_received(topic, payload, qos):
"""A new MQTT message has been received."""
# auto-expire enabled?
if self._expire_after is not None and self._expire_after > 0:
# Reset old trigger
if self._expiration_trigger:
self._expiration_trigger()
self._expiration_trigger = None

# Set new trigger
expiration_at = (
dt_util.utcnow() + timedelta(seconds=self._expire_after))

self._expiration_trigger = async_track_point_in_utc_time(
self.hass,
self.value_is_expired,
expiration_at)

if self._template is not None:
payload = self._template.async_render_with_possible_json_value(
payload, self._state)
Expand All @@ -83,6 +107,13 @@ def message_received(topic, payload, qos):
return mqtt.async_subscribe(
self.hass, self._state_topic, message_received, self._qos)

@callback
def value_is_expired(self, *_):
"""Triggered when value is expired."""
self._expiration_trigger = None
self._state = STATE_UNKNOWN
self.hass.async_add_job(self.async_update_ha_state())

@property
def should_poll(self):
"""No polling needed."""
Expand Down
73 changes: 72 additions & 1 deletion tests/components/sensor/test_mqtt.py
@@ -1,12 +1,16 @@
"""The tests for the MQTT sensor platform."""
import unittest

from datetime import timedelta, datetime
from unittest.mock import patch

import homeassistant.core as ha
from homeassistant.setup import setup_component
import homeassistant.components.sensor as sensor
from homeassistant.const import EVENT_STATE_CHANGED
from tests.common import mock_mqtt_component, fire_mqtt_message
import homeassistant.util.dt as dt_util

from tests.common import mock_mqtt_component, fire_mqtt_message
from tests.common import get_test_home_assistant, mock_component


Expand Down Expand Up @@ -42,6 +46,69 @@ def test_setting_sensor_value_via_mqtt_message(self):
self.assertEqual('fav unit',
state.attributes.get('unit_of_measurement'))

@patch('homeassistant.core.dt_util.utcnow')
def test_setting_sensor_value_expires(self, mock_utcnow):
"""Test the expiration of the value."""
mock_component(self.hass, 'mqtt')
assert setup_component(self.hass, sensor.DOMAIN, {
sensor.DOMAIN: {
'platform': 'mqtt',
'name': 'test',
'state_topic': 'test-topic',
'unit_of_measurement': 'fav unit',
'expire_after': '4',
'force_update': True
}
})

state = self.hass.states.get('sensor.test')
self.assertEqual('unknown', state.state)

now = datetime(2017, 1, 1, 1, tzinfo=dt_util.UTC)
mock_utcnow.return_value = now
fire_mqtt_message(self.hass, 'test-topic', '100')
self.hass.block_till_done()

# Value was set correctly.
state = self.hass.states.get('sensor.test')
self.assertEqual('100', state.state)

# Time jump +3s
now = now + timedelta(seconds=3)
self._send_time_changed(now)
self.hass.block_till_done()

# Value is not yet expired
state = self.hass.states.get('sensor.test')
self.assertEqual('100', state.state)

# Next message resets timer
mock_utcnow.return_value = now
fire_mqtt_message(self.hass, 'test-topic', '101')
self.hass.block_till_done()

# Value was updated correctly.
state = self.hass.states.get('sensor.test')
self.assertEqual('101', state.state)

# Time jump +3s
now = now + timedelta(seconds=3)
self._send_time_changed(now)
self.hass.block_till_done()

# Value is not yet expired
state = self.hass.states.get('sensor.test')
self.assertEqual('101', state.state)

# Time jump +2s
now = now + timedelta(seconds=2)
self._send_time_changed(now)
self.hass.block_till_done()

# Value is expired now
state = self.hass.states.get('sensor.test')
self.assertEqual('unknown', state.state)

def test_setting_sensor_value_via_mqtt_json_message(self):
"""Test the setting of the value via MQTT with JSON playload."""
mock_component(self.hass, 'mqtt')
Expand Down Expand Up @@ -117,3 +184,7 @@ def callback(event):
fire_mqtt_message(self.hass, 'test-topic', '100')
self.hass.block_till_done()
self.assertEqual(2, len(events))

def _send_time_changed(self, now):
"""Send a time changed event."""
self.hass.bus.fire(ha.EVENT_TIME_CHANGED, {ha.ATTR_NOW: now})