-
-
Notifications
You must be signed in to change notification settings - Fork 28.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Haveibeenpwned sensor platform (#3618)
* Initial version of "haveibeenpwned" sensor component * 2 flake8 fixes * remove debugging error message * Increase scan_interval as well as throttle to make sure that during initial startup of hass the request happens with 5 seconds delays and after startup with 15 minutes delays. Scan_interval is increased also to not call update as often * update .coveragerc * remove (ssl) verify=False * - use dict to keep the request values with email as key - use track_point_in_time system to make sure data updates initially at 5 seconds between each call until all sensor's email have a result in the dict. * fix a pylint error that happend on the py35 tests
- Loading branch information
1 parent
7b40a64
commit 8f8bba4
Showing
2 changed files
with
182 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,181 @@ | ||
""" | ||
Support for haveibeenpwned (email breaches) sensor. | ||
For more details about this platform, please refer to the documentation at | ||
https://home-assistant.io/components/sensor.haveibeenpwned/ | ||
""" | ||
from datetime import timedelta | ||
import logging | ||
|
||
import voluptuous as vol | ||
import requests | ||
|
||
from homeassistant.components.sensor import PLATFORM_SCHEMA | ||
from homeassistant.const import (STATE_UNKNOWN, CONF_EMAIL) | ||
from homeassistant.helpers.entity import Entity | ||
import homeassistant.helpers.config_validation as cv | ||
from homeassistant.util import Throttle | ||
import homeassistant.util.dt as dt_util | ||
from homeassistant.helpers.event import track_point_in_time | ||
|
||
_LOGGER = logging.getLogger(__name__) | ||
|
||
DATE_STR_FORMAT = "%Y-%m-%d %H:%M:%S" | ||
USER_AGENT = "Home Assistant HaveIBeenPwned Sensor Component" | ||
|
||
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=15) | ||
MIN_TIME_BETWEEN_FORCED_UPDATES = timedelta(seconds=5) | ||
|
||
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({ | ||
vol.Required(CONF_EMAIL): vol.All(cv.ensure_list, [cv.string]), | ||
}) | ||
|
||
|
||
# pylint: disable=unused-argument | ||
def setup_platform(hass, config, add_devices, discovery_info=None): | ||
"""Setup the RESTful sensor.""" | ||
emails = config.get(CONF_EMAIL) | ||
data = HaveIBeenPwnedData(emails) | ||
|
||
devices = [] | ||
for email in emails: | ||
devices.append(HaveIBeenPwnedSensor(data, hass, email)) | ||
|
||
add_devices(devices) | ||
|
||
# To make sure we get initial data for the sensors | ||
# ignoring the normal throttle of 15 minutes but using | ||
# an update throttle of 5 seconds | ||
for sensor in devices: | ||
sensor.update_nothrottle() | ||
|
||
|
||
class HaveIBeenPwnedSensor(Entity): | ||
"""Implementation of HaveIBeenPwnedSensor.""" | ||
|
||
def __init__(self, data, hass, email): | ||
"""Initialize the HaveIBeenPwnedSensor sensor.""" | ||
self._state = STATE_UNKNOWN | ||
self._data = data | ||
self._hass = hass | ||
self._email = email | ||
self._unit_of_measurement = "Breaches" | ||
|
||
@property | ||
def name(self): | ||
"""Return the name of the sensor.""" | ||
return "Breaches {}".format(self._email) | ||
|
||
@property | ||
def unit_of_measurement(self): | ||
"""Return the unit the value is expressed in.""" | ||
return self._unit_of_measurement | ||
|
||
@property | ||
def state(self): | ||
"""Return the state of the device.""" | ||
return self._state | ||
|
||
@property | ||
def state_attributes(self): | ||
"""Return the atrributes of the sensor.""" | ||
val = {} | ||
if self._email not in self._data.data: | ||
return val | ||
|
||
for idx, value in enumerate(self._data.data[self._email]): | ||
tmpname = "breach {}".format(idx+1) | ||
tmpvalue = "{} {}".format( | ||
value["Title"], | ||
dt_util.as_local(dt_util.parse_datetime( | ||
value["AddedDate"])).strftime(DATE_STR_FORMAT)) | ||
val[tmpname] = tmpvalue | ||
|
||
return val | ||
|
||
def update_nothrottle(self, dummy=None): | ||
"""Update sensor without throttle.""" | ||
self._data.update_no_throttle() | ||
|
||
# Schedule a forced update 5 seconds in the future if the | ||
# update above returned no data for this sensors email. | ||
# this is mainly to make sure that we don't | ||
# get http error "too many requests" and to have initial | ||
# data after hass startup once we have the data it will | ||
# update as normal using update | ||
if self._email not in self._data.data: | ||
track_point_in_time(self._hass, | ||
self.update_nothrottle, | ||
dt_util.now() + | ||
MIN_TIME_BETWEEN_FORCED_UPDATES) | ||
return | ||
|
||
if self._email in self._data.data: | ||
self._state = len(self._data.data[self._email]) | ||
self.update_ha_state() | ||
|
||
def update(self): | ||
"""Update data and see if it contains data for our email.""" | ||
self._data.update() | ||
|
||
if self._email in self._data.data: | ||
self._state = len(self._data.data[self._email]) | ||
|
||
|
||
class HaveIBeenPwnedData(object): | ||
"""Class for handling the data retrieval.""" | ||
|
||
def __init__(self, emails): | ||
"""Initialize the data object.""" | ||
self._email_count = len(emails) | ||
self._current_index = 0 | ||
self.data = {} | ||
self._email = emails[0] | ||
self._emails = emails | ||
|
||
def set_next_email(self): | ||
"""Set the next email to be looked up.""" | ||
self._current_index = (self._current_index + 1) % self._email_count | ||
self._email = self._emails[self._current_index] | ||
|
||
def update_no_throttle(self): | ||
"""Get the data for a specific email.""" | ||
self.update(no_throttle=True) | ||
|
||
@Throttle(MIN_TIME_BETWEEN_UPDATES, MIN_TIME_BETWEEN_FORCED_UPDATES) | ||
def update(self, **kwargs): | ||
"""Get the latest data for current email from REST service.""" | ||
try: | ||
url = "https://haveibeenpwned.com/api/v2/breachedaccount/{}". \ | ||
format(self._email) | ||
|
||
_LOGGER.info("Checking for breaches for email %s", self._email) | ||
|
||
req = requests.get(url, headers={"User-agent": USER_AGENT}, | ||
allow_redirects=True, timeout=5) | ||
|
||
except requests.exceptions.RequestException: | ||
_LOGGER.error("failed fetching HaveIBeenPwned Data for '%s'", | ||
self._email) | ||
return | ||
|
||
if req.status_code == 200: | ||
self.data[self._email] = sorted(req.json(), | ||
key=lambda k: k["AddedDate"], | ||
reverse=True) | ||
|
||
# only goto next email if we had data so that | ||
# the forced updates try this current email again | ||
self.set_next_email() | ||
|
||
elif req.status_code == 404: | ||
self.data[self._email] = [] | ||
|
||
# only goto next email if we had data so that | ||
# the forced updates try this current email again | ||
self.set_next_email() | ||
|
||
else: | ||
_LOGGER.error("failed fetching HaveIBeenPwned Data for '%s'" | ||
"(HTTP Status_code = %d)", self._email, | ||
req.status_code) |