Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Haveibeenpwned sensor platform #3618

Merged
merged 8 commits into from Oct 8, 2016
Merged
1 change: 1 addition & 0 deletions .coveragerc
Expand Up @@ -235,6 +235,7 @@ omit =
homeassistant/components/sensor/google_travel_time.py
homeassistant/components/sensor/gpsd.py
homeassistant/components/sensor/gtfs.py
homeassistant/components/sensor/haveibeenpwned.py
homeassistant/components/sensor/hp_ilo.py
homeassistant/components/sensor/imap.py
homeassistant/components/sensor/imap_email_content.py
Expand Down
181 changes: 181 additions & 0 deletions homeassistant/components/sensor/haveibeenpwned.py
@@ -0,0 +1,181 @@
"""
Support for haveibeenpwned (email breaches) sensor.

For more details about this platform, please refer to the documentation at
https://home-assistant.io/components/sensor.haveibeenpwned/
"""
from datetime import timedelta
import logging

import voluptuous as vol
import requests

from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (STATE_UNKNOWN, CONF_EMAIL)
from homeassistant.helpers.entity import Entity
import homeassistant.helpers.config_validation as cv
from homeassistant.util import Throttle
import homeassistant.util.dt as dt_util
from homeassistant.helpers.event import track_point_in_time

_LOGGER = logging.getLogger(__name__)

DATE_STR_FORMAT = "%Y-%m-%d %H:%M:%S"
USER_AGENT = "Home Assistant HaveIBeenPwned Sensor Component"

MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15)
MIN_TIME_BETWEEN_FORCED_UPDATES = timedelta(seconds=5)

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_EMAIL): vol.All(cv.ensure_list, [cv.string]),
})


# pylint: disable=unused-argument
def setup_platform(hass, config, add_devices, discovery_info=None):
"""Setup the RESTful sensor."""
emails = config.get(CONF_EMAIL)
data = HaveIBeenPwnedData(emails)

devices = []
for email in emails:
devices.append(HaveIBeenPwnedSensor(data, hass, email))

add_devices(devices)

# To make sure we get initial data for the sensors
# ignoring the normal throttle of 15 minutes but using
# an update throttle of 5 seconds
for sensor in devices:
sensor.update_nothrottle()


class HaveIBeenPwnedSensor(Entity):
"""Implementation of HaveIBeenPwnedSensor."""

def __init__(self, data, hass, email):
"""Initialize the HaveIBeenPwnedSensor sensor."""
self._state = STATE_UNKNOWN
self._data = data
self._hass = hass
self._email = email
self._unit_of_measurement = "Breaches"

@property
def name(self):
"""Return the name of the sensor."""
return "Breaches {}".format(self._email)

@property
def unit_of_measurement(self):
"""Return the unit the value is expressed in."""
return self._unit_of_measurement

@property
def state(self):
"""Return the state of the device."""
return self._state

@property
def state_attributes(self):
"""Return the atrributes of the sensor."""
val = {}
if self._email not in self._data.data:
return val

for idx, value in enumerate(self._data.data[self._email]):
tmpname = "breach {}".format(idx+1)
tmpvalue = "{} {}".format(
value["Title"],
dt_util.as_local(dt_util.parse_datetime(
value["AddedDate"])).strftime(DATE_STR_FORMAT))
val[tmpname] = tmpvalue

return val

def update_nothrottle(self, dummy=None):
"""Update sensor without throttle."""
self._data.update_no_throttle()

# Schedule a forced update 5 seconds in the future if the
# update above returned no data for this sensors email.
# this is mainly to make sure that we don't
# get http error "too many requests" and to have initial
# data after hass startup once we have the data it will
# update as normal using update
if self._email not in self._data.data:
track_point_in_time(self._hass,
self.update_nothrottle,
dt_util.now() +
MIN_TIME_BETWEEN_FORCED_UPDATES)
return

if self._email in self._data.data:
self._state = len(self._data.data[self._email])
self.update_ha_state()

def update(self):
"""Update data and see if it contains data for our email."""
self._data.update()

if self._email in self._data.data:
self._state = len(self._data.data[self._email])


class HaveIBeenPwnedData(object):
"""Class for handling the data retrieval."""

def __init__(self, emails):
"""Initialize the data object."""
self._email_count = len(emails)
self._current_index = 0
self.data = {}
self._email = emails[0]
self._emails = emails

def set_next_email(self):
"""Set the next email to be looked up."""
self._current_index = (self._current_index + 1) % self._email_count
self._email = self._emails[self._current_index]

def update_no_throttle(self):
"""Get the data for a specific email."""
self.update(no_throttle=True)

@Throttle(MIN_TIME_BETWEEN_UPDATES, MIN_TIME_BETWEEN_FORCED_UPDATES)
def update(self, **kwargs):
"""Get the latest data for current email from REST service."""
try:
url = "https://haveibeenpwned.com/api/v2/breachedaccount/{}". \
format(self._email)

_LOGGER.info("Checking for breaches for email %s", self._email)

req = requests.get(url, headers={"User-agent": USER_AGENT},
allow_redirects=True, timeout=5)

except requests.exceptions.RequestException:
_LOGGER.error("failed fetching HaveIBeenPwned Data for '%s'",
self._email)
return

if req.status_code == 200:
self.data[self._email] = sorted(req.json(),
key=lambda k: k["AddedDate"],
reverse=True)

# only goto next email if we had data so that
# the forced updates try this current email again
self.set_next_email()

elif req.status_code == 404:
self.data[self._email] = []

# only goto next email if we had data so that
# the forced updates try this current email again
self.set_next_email()

else:
_LOGGER.error("failed fetching HaveIBeenPwned Data for '%s'"
"(HTTP Status_code = %d)", self._email,
req.status_code)