Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
234 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
""" | ||
homeassistant.components.mqtt_eventstream | ||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||
Connect two Home Assistant instances via mqtt. | ||
Configuration: | ||
To use the mqtt_eventstream component you will need to add the following to | ||
your configuration.yaml file. | ||
If you do not specify a publish_topic you will not forward events to the queue. | ||
If you do not specify a subscribe_topic then you will not receive events from | ||
the remote server. | ||
mqtt_eventstream: | ||
publish_topic: MyServerName | ||
subscribe_topic: OtherHaServerName | ||
""" | ||
import json | ||
from homeassistant.core import EventOrigin, State | ||
from homeassistant.const import ( | ||
MATCH_ALL, | ||
EVENT_TIME_CHANGED, | ||
EVENT_CALL_SERVICE, | ||
EVENT_SERVICE_EXECUTED, | ||
EVENT_STATE_CHANGED, | ||
) | ||
|
||
import homeassistant.loader as loader | ||
from homeassistant.remote import JSONEncoder | ||
|
||
# The domain of your component. Should be equal to the name of your component | ||
DOMAIN = "mqtt_eventstream" | ||
|
||
# List of component names (string) your component depends upon | ||
DEPENDENCIES = ['mqtt'] | ||
|
||
|
||
def setup(hass, config): | ||
""" Setup our mqtt_eventstream component. """ | ||
def _event_handler(event): | ||
""" Handle events by publishing them on the mqtt queue. """ | ||
if event.origin != EventOrigin.local: | ||
return | ||
if event.event_type in ( | ||
EVENT_TIME_CHANGED, | ||
EVENT_CALL_SERVICE, | ||
EVENT_SERVICE_EXECUTED | ||
): | ||
return | ||
event = {'event_type': event.event_type, 'event_data': event.data} | ||
msg = json.dumps(event, cls=JSONEncoder) | ||
mqtt.publish(hass, pub_topic, msg) | ||
|
||
mqtt = loader.get_component('mqtt') | ||
pub_topic = config[DOMAIN].get('publish_topic', None) | ||
sub_topic = config[DOMAIN].get('subscribe_topic', None) | ||
|
||
# Only listen for local events if you are going to publish them | ||
if (pub_topic): | ||
hass.bus.listen(MATCH_ALL, _event_handler) | ||
|
||
# Process events from a remote server that are received on a queue | ||
def _event_receiver(topic, payload, qos): | ||
""" | ||
A new MQTT message, published by the other HA instance, | ||
has been received. | ||
""" | ||
# TODO error handling | ||
event = json.loads(payload) | ||
event_type = event.get('event_type') | ||
event_data = event.get('event_data') | ||
|
||
# Special case handling for event STATE_CHANGED | ||
# We will try to convert state dicts back to State objects | ||
if event_type == EVENT_STATE_CHANGED and event_data: | ||
for key in ('old_state', 'new_state'): | ||
state = State.from_dict(event_data.get(key)) | ||
|
||
if state: | ||
event_data[key] = state | ||
|
||
hass.bus.fire( | ||
event_type, | ||
event_data=event_data, | ||
origin=EventOrigin.remote | ||
) | ||
|
||
# Only subscribe if you specified a topic | ||
if (sub_topic): | ||
mqtt.subscribe(hass, sub_topic, _event_receiver) | ||
|
||
hass.states.set('{domain}.initialized'.format(domain=DOMAIN), True) | ||
# return boolean to indicate that initialization was successful | ||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
""" | ||
tests.test_component_mqtt_eventstream | ||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | ||
Tests MQTT eventstream component. | ||
""" | ||
import json | ||
import unittest | ||
from unittest.mock import ANY, patch | ||
|
||
import homeassistant.components.mqtt_eventstream as eventstream | ||
from homeassistant.const import EVENT_STATE_CHANGED | ||
from homeassistant.core import State | ||
from homeassistant.remote import JSONEncoder | ||
import homeassistant.util.dt as dt_util | ||
|
||
from tests.common import ( | ||
get_test_home_assistant, | ||
mock_mqtt_component, | ||
fire_mqtt_message, | ||
mock_state_change_event, | ||
fire_time_changed | ||
) | ||
|
||
|
||
class TestMqttEventStream(unittest.TestCase): | ||
""" Test the MQTT eventstream module. """ | ||
|
||
def setUp(self): # pylint: disable=invalid-name | ||
super(TestMqttEventStream, self).setUp() | ||
self.hass = get_test_home_assistant() | ||
self.mock_mqtt = mock_mqtt_component(self.hass) | ||
|
||
def tearDown(self): # pylint: disable=invalid-name | ||
""" Stop down stuff we started. """ | ||
self.hass.stop() | ||
|
||
def add_eventstream(self, sub_topic=None, pub_topic=None): | ||
""" Add a mqtt_eventstream component to the hass. """ | ||
config = {} | ||
if sub_topic: | ||
config['subscribe_topic'] = sub_topic | ||
if pub_topic: | ||
config['publish_topic'] = pub_topic | ||
return eventstream.setup(self.hass, {eventstream.DOMAIN: config}) | ||
|
||
def test_setup_succeeds(self): | ||
self.assertTrue(self.add_eventstream()) | ||
|
||
def test_setup_with_pub(self): | ||
# Should start off with no listeners for all events | ||
self.assertEqual(self.hass.bus.listeners.get('*'), None) | ||
|
||
self.assertTrue(self.add_eventstream(pub_topic='bar')) | ||
self.hass.pool.block_till_done() | ||
|
||
# Verify that the event handler has been added as a listener | ||
self.assertEqual(self.hass.bus.listeners.get('*'), 1) | ||
|
||
@patch('homeassistant.components.mqtt.subscribe') | ||
def test_subscribe(self, mock_sub): | ||
sub_topic = 'foo' | ||
self.assertTrue(self.add_eventstream(sub_topic=sub_topic)) | ||
self.hass.pool.block_till_done() | ||
|
||
# Verify that the this entity was subscribed to the topic | ||
mock_sub.assert_called_with(self.hass, sub_topic, ANY) | ||
|
||
@patch('homeassistant.components.mqtt.publish') | ||
@patch('homeassistant.core.dt_util.datetime_to_str') | ||
def test_state_changed_event_sends_message(self, mock_datetime, mock_pub): | ||
now = '00:19:19 11-01-2016' | ||
e_id = 'fake.entity' | ||
pub_topic = 'bar' | ||
mock_datetime.return_value = now | ||
|
||
# Add the eventstream component for publishing events | ||
self.assertTrue(self.add_eventstream(pub_topic=pub_topic)) | ||
self.hass.pool.block_till_done() | ||
|
||
# Reset the mock because it will have already gotten calls for the | ||
# mqtt_eventstream state change on initialization, etc. | ||
mock_pub.reset_mock() | ||
|
||
# Set a state of an entity | ||
mock_state_change_event(self.hass, State(e_id, 'on')) | ||
self.hass.pool.block_till_done() | ||
|
||
# The order of the JSON is indeterminate, | ||
# so first just check that publish was called | ||
mock_pub.assert_called_with(self.hass, pub_topic, ANY) | ||
self.assertTrue(mock_pub.called) | ||
|
||
# Get the actual call to publish and make sure it was the one | ||
# we were looking for | ||
msg = mock_pub.call_args[0][2] | ||
event = {} | ||
event['event_type'] = EVENT_STATE_CHANGED | ||
new_state = { | ||
"last_updated": now, | ||
"state": "on", | ||
"entity_id": e_id, | ||
"attributes": {}, | ||
"last_changed": now | ||
} | ||
event['event_data'] = {"new_state": new_state, "entity_id": e_id} | ||
|
||
# Verify that the message received was that expected | ||
self.assertEqual(json.loads(msg), event) | ||
|
||
@patch('homeassistant.components.mqtt.publish') | ||
def test_time_event_does_not_send_message(self, mock_pub): | ||
self.assertTrue(self.add_eventstream(pub_topic='bar')) | ||
self.hass.pool.block_till_done() | ||
|
||
# Reset the mock because it will have already gotten calls for the | ||
# mqtt_eventstream state change on initialization, etc. | ||
mock_pub.reset_mock() | ||
|
||
fire_time_changed(self.hass, dt_util.utcnow()) | ||
self.assertFalse(mock_pub.called) | ||
|
||
def test_receiving_remote_event_fires_hass_event(self): | ||
sub_topic = 'foo' | ||
self.assertTrue(self.add_eventstream(sub_topic=sub_topic)) | ||
self.hass.pool.block_till_done() | ||
|
||
calls = [] | ||
self.hass.bus.listen_once('test_event', lambda _: calls.append(1)) | ||
self.hass.pool.block_till_done() | ||
|
||
payload = json.dumps( | ||
{'event_type': 'test_event', 'event_data': {}}, | ||
cls=JSONEncoder | ||
) | ||
fire_mqtt_message(self.hass, sub_topic, payload) | ||
self.hass.pool.block_till_done() | ||
|
||
self.assertEqual(1, len(calls)) |