Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IBM Watson IoT Platform component #13664

Merged
merged 1 commit into from Jun 7, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .coveragerc
Expand Up @@ -748,6 +748,7 @@ omit =
homeassistant/components/tts/picotts.py
homeassistant/components/vacuum/mqtt.py
homeassistant/components/vacuum/roomba.py
homeassistant/components/watson_iot.py
homeassistant/components/weather/bom.py
homeassistant/components/weather/buienradar.py
homeassistant/components/weather/darksky.py
Expand Down
214 changes: 214 additions & 0 deletions homeassistant/components/watson_iot.py
@@ -0,0 +1,214 @@
"""
A component which allows you to send data to the IBM Watson IoT Platform.

For more details about this component, please refer to the documentation at
https://home-assistant.io/components/watson_iot/
"""

import logging
import queue
import threading
import time

import voluptuous as vol

from homeassistant.const import (
CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_INCLUDE,
CONF_TOKEN, CONF_TYPE, EVENT_STATE_CHANGED, EVENT_HOMEASSISTANT_STOP,
STATE_UNAVAILABLE, STATE_UNKNOWN)
from homeassistant.helpers import state as state_helper
import homeassistant.helpers.config_validation as cv

REQUIREMENTS = ['ibmiotf==0.3.4']

_LOGGER = logging.getLogger(__name__)

CONF_ORG = 'organization'
CONF_ID = 'id'

DOMAIN = 'watson_iot'

RETRY_DELAY = 20
MAX_TRIES = 3

CONFIG_SCHEMA = vol.Schema({
DOMAIN: vol.All(vol.Schema({
vol.Required(CONF_ORG): cv.string,
vol.Required(CONF_TYPE): cv.string,
vol.Required(CONF_ID): cv.string,
vol.Required(CONF_TOKEN): cv.string,
vol.Optional(CONF_EXCLUDE, default={}): vol.Schema({
vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
vol.Optional(CONF_DOMAINS, default=[]):
vol.All(cv.ensure_list, [cv.string])
}),
vol.Optional(CONF_INCLUDE, default={}): vol.Schema({
vol.Optional(CONF_ENTITIES, default=[]): cv.entity_ids,
vol.Optional(CONF_DOMAINS, default=[]):
vol.All(cv.ensure_list, [cv.string])
}),
})),
}, extra=vol.ALLOW_EXTRA)


def setup(hass, config):
"""Set up the Watson IoT Platform component."""
from ibmiotf import gateway

conf = config[DOMAIN]

include = conf[CONF_INCLUDE]
exclude = conf[CONF_EXCLUDE]
whitelist_e = set(include[CONF_ENTITIES])
whitelist_d = set(include[CONF_DOMAINS])
blacklist_e = set(exclude[CONF_ENTITIES])
blacklist_d = set(exclude[CONF_DOMAINS])

client_args = {
'org': conf[CONF_ORG],
'type': conf[CONF_TYPE],
'id': conf[CONF_ID],
'auth-method': 'token',
'auth-token': conf[CONF_TOKEN],
}
watson_gateway = gateway.Client(client_args)

def event_to_json(event):
"""Add an event to the outgoing list."""
state = event.data.get('new_state')
if state is None or state.state in (
STATE_UNKNOWN, '', STATE_UNAVAILABLE) or \
state.entity_id in blacklist_e or state.domain in blacklist_d:
return

if (whitelist_e and state.entity_id not in whitelist_e) or \
(whitelist_d and state.domain not in whitelist_d):
return

try:
_state_as_value = float(state.state)
except ValueError:
_state_as_value = None

if _state_as_value is None:
try:
_state_as_value = float(state_helper.state_as_number(state))
except ValueError:
_state_as_value = None

out_event = {
'tags': {
'domain': state.domain,
'entity_id': state.object_id,
},
'time': event.time_fired.isoformat(),
'fields': {
'state': state.state
}
}
if _state_as_value is not None:
out_event['fields']['state_value'] = _state_as_value

for key, value in state.attributes.items():
if key != 'unit_of_measurement':
# If the key is already in fields
if key in out_event['fields']:
key = key + "_"
# For each value we try to cast it as float
# But if we can not do it we store the value
# as string
try:
out_event['fields'][key] = float(value)
except (ValueError, TypeError):
out_event['fields'][key] = str(value)

return out_event

instance = hass.data[DOMAIN] = WatsonIOTThread(
hass, watson_gateway, event_to_json)
instance.start()

def shutdown(event):
"""Shut down the thread."""
instance.queue.put(None)
instance.join()

hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, shutdown)

return True


class WatsonIOTThread(threading.Thread):
"""A threaded event handler class."""

def __init__(self, hass, gateway, event_to_json):
"""Initialize the listener."""
threading.Thread.__init__(self, name='WatsonIOT')
self.queue = queue.Queue()
self.gateway = gateway
self.gateway.connect()
self.event_to_json = event_to_json
self.write_errors = 0
self.shutdown = False
hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener)

def _event_listener(self, event):
"""Listen for new messages on the bus and queue them for Watson IOT."""
item = (time.monotonic(), event)
self.queue.put(item)

def get_events_json(self):
"""Return an event formatted for writing."""
events = []

try:
item = self.queue.get()

if item is None:
self.shutdown = True
else:
event_json = self.event_to_json(item[1])
if event_json:
events.append(event_json)

except queue.Empty:
pass

return events

def write_to_watson(self, events):
"""Write preprocessed events to watson."""
import ibmiotf

for event in events:
for retry in range(MAX_TRIES + 1):
try:
for field in event['fields']:
value = event['fields'][field]
device_success = self.gateway.publishDeviceEvent(
event['tags']['domain'],
event['tags']['entity_id'],
field, 'json', value)
if not device_success:
_LOGGER.error(
"Failed to publish message to watson iot")
continue
break
except (ibmiotf.MissingMessageEncoderException, IOError):
if retry < MAX_TRIES:
time.sleep(RETRY_DELAY)
else:
_LOGGER.exception(
"Failed to publish message to watson iot")

def run(self):
"""Process incoming events."""
while not self.shutdown:
event = self.get_events_json()
if event:
self.write_to_watson(event)
self.queue.task_done()

def block_till_done(self):
"""Block till all events processed."""
self.queue.join()
3 changes: 3 additions & 0 deletions requirements_all.txt
Expand Up @@ -438,6 +438,9 @@ hydrawiser==0.1.1
# homeassistant.components.sensor.htu21d
# i2csense==0.0.4

# homeassistant.components.watson_iot
ibmiotf==0.3.4

# homeassistant.components.light.iglo
iglo==1.2.7

Expand Down