Skip to content
This repository was archived by the owner on Aug 27, 2022. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 30 additions & 13 deletions custom_components/sensor/authenticated.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
For more details about this component, please refer to the documentation at
https://github.com/custom-components/sensor.authenticated
"""
import logging
import socket
from datetime import timedelta
from pathlib import Path
import logging
import requests

import voluptuous as vol
import yaml
import homeassistant.helpers.config_validation as cv
Expand All @@ -20,9 +20,12 @@

CONF_NOTIFY = 'enable_notification'

ATTR_HOSTNAME = 'hostname'
ATTR_COUNTRY = 'country'
ATTR_REGION = 'region'
ATTR_CITY = 'city'
ATTR_LAST_AUTHENTICATE_TIME = 'last_authenticated_time'
ATTR_PREVIOUS_AUTHENTICATE_TIME = 'previous_authenticated_time'

SCAN_INTERVAL = timedelta(seconds=30)

Expand Down Expand Up @@ -52,24 +55,27 @@ def __init__(self, hass, notify, log, out):
"""Initialize the sensor."""
_LOGGER.info('version %s is starting, if you have ANY issues with this, please report them here: https://github.com/custom-components/%s', __version__, __name__.split('.')[1] + '.' + __name__.split('.')[2])
self._state = None
self._hostname = None
self._country = None
self._region = None
self._city = None
self._last_authenticated_time = None
self._previous_authenticated_time = None
self._notify = notify
self._log = log
self._out = out
self.hass = hass
self.update()

def first_ip(self, ip_address, access_time):
def first_ip(self, ip_address, access_time, hostname):
"""If the IP is the first"""
_LOGGER.debug('First IP, creating file...')
with open(self._out, 'a') as the_file:
the_file.write(ip_address + ':')
self.new_ip(ip_address, access_time)
self.new_ip(ip_address, access_time, hostname)
the_file.close()

def new_ip(self, ip_address, access_time):
def new_ip(self, ip_address, access_time, hostname):
"""If the IP is new"""
_LOGGER.debug('Found new IP %s', ip_address)
fetchurl = 'https://ipapi.co/' + ip_address + '/json/'
Expand All @@ -79,7 +85,7 @@ def new_ip(self, ip_address, access_time):
geo = 'none'
else:
geo = geo
if 'reserved' in geo:
if 'reserved' in geo or geo['error']:
_LOGGER.debug('The IP is reserved, no GEO info available...')
geo_country = 'none'
geo_region = 'none'
Expand All @@ -88,33 +94,37 @@ def new_ip(self, ip_address, access_time):
geo_country = geo['country']
geo_region = geo['region']
geo_city = geo['city']
self.write_file(ip_address, access_time, geo_country, geo_region, geo_city)
self.write_file(ip_address, access_time, hostname, geo_country, geo_region, geo_city)
if self._notify == 'True':
self.hass.components.persistent_notification.create('{}'.format(ip_address + ' (' + geo_country + ', ' + geo_region + ', ' + geo_city + ')'), 'New successful login from')
else:
_LOGGER.debug('persistent_notifications is disabled in config, enable_notification=%s', self._notify)

def update_ip(self, ip_address, access_time):
def update_ip(self, ip_address, access_time, hostname):
"""If we know this IP"""
_LOGGER.debug('Found known IP %s, updating access_timestamp.', ip_address)
_LOGGER.debug('Found known IP %s, updating timestamps.', ip_address)
with open(self._out) as f:
doc = yaml.load(f)
f.close()

doc[ip_address]['previous_authenticated_time'] = doc[ip_address]['last_authenticated']
doc[ip_address]['last_authenticated'] = access_time
doc[ip_address]['hostname'] = hostname

with open(self._out, 'w') as f:
yaml.dump(doc, f, default_flow_style=False)
f.close()

def write_file(self, ip_address, access_time, country='none', region='none', city='none'):
def write_file(self, ip_address, access_time, hostname, country='none', region='none', city='none'):
"""Writes info to out control file"""
with open(self._out) as f:
doc = yaml.load(f)
f.close()

doc[ip_address] = dict(
hostname=hostname,
last_authenticated=access_time,
previous_authenticated_time='none',
country=country,
region=region,
city=city
Expand All @@ -137,21 +147,25 @@ def update(self):
self._state = None
else:
ip_address = get_ip.split(' ')[8]
hostname = socket.getfqdn(ip_address)
access_time = get_ip.split(' ')[0] + ' ' + get_ip.split(' ')[1]
checkpath = Path(self._out)
if checkpath.exists():
if str(ip_address) in open(self._out).read():
self.update_ip(ip_address, access_time)
self.update_ip(ip_address, access_time, hostname)
else:
self.new_ip(ip_address, access_time)
self.new_ip(ip_address, access_time, hostname)
else:
self.first_ip(ip_address, access_time)
self.first_ip(ip_address, access_time, hostname)
self._state = ip_address
stream = open(self._out, 'r')
geo_info = yaml.load(stream)
self._hostname = geo_info[ip_address]['hostname']
self._country = geo_info[ip_address]['country']
self._region = geo_info[ip_address]['region']
self._city = geo_info[ip_address]['city']
self._last_authenticated_time = geo_info[ip_address]['last_authenticated']
self._previous_authenticated_time = geo_info[ip_address]['previous_authenticated_time']

@property
def name(self):
Expand All @@ -172,7 +186,10 @@ def icon(self):
def device_state_attributes(self):
"""Return attributes for the sensor."""
return {
ATTR_HOSTNAME: self._hostname,
ATTR_COUNTRY: self._country,
ATTR_REGION: self._region,
ATTR_CITY: self._city,
ATTR_LAST_AUTHENTICATE_TIME: self._last_authenticated_time,
ATTR_PREVIOUS_AUTHENTICATE_TIME: self._previous_authenticated_time,
}