Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New Linky sensor #20535

Closed
wants to merge 26 commits into from
Closed
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
210 changes: 177 additions & 33 deletions homeassistant/components/sensor/linky.py
Expand Up @@ -10,83 +10,227 @@

import voluptuous as vol

from homeassistant.const import CONF_USERNAME, CONF_PASSWORD, CONF_TIMEOUT
from homeassistant.const import (CONF_USERNAME, CONF_PASSWORD, CONF_TIMEOUT,
grea09 marked this conversation as resolved.
Show resolved Hide resolved
STATE_UNAVAILABLE, CONF_NAME, CONF_SCAN_INTERVAL, CONF_MONITORED_VARIABLES)
grea09 marked this conversation as resolved.
Show resolved Hide resolved
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle
import homeassistant.helpers.config_validation as cv

REQUIREMENTS = ['pylinky==0.1.8']

KILOWATT_HOUR = 'kWh'

DEFAULT_NAME = 'Linky'

PEAK_HOURS = 'peak_hours'
PEAK_HOURS_COST = 'peak_hours_cost'
OFFPEAK_HOURS_COST = 'offpeak_hours_cost'

CONSUMPTION = "conso"
TIME = "time"

_LOGGER = logging.getLogger(__name__)

SCAN_INTERVAL = timedelta(minutes=10)
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=30)
SCAN_INTERVAL = timedelta(minutes=30)
DEFAULT_TIMEOUT = 10

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_USERNAME): cv.string,
vol.Required(CONF_PASSWORD): cv.string,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Optional(CONF_SCAN_INTERVAL, default=MIN_TIME_BETWEEN_UPDATES):
vol.All(cv.time_period, cv.positive_timedelta),
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Required(PEAK_HOURS): vol.All(cv.ensure_list),
vol.Required(PEAK_HOURS_COST): vol.Coerce(float),
vol.Required(OFFPEAK_HOURS_COST): vol.Coerce(float),
})


def setup_platform(hass, config, add_entities, discovery_info=None):
"""Configure the platform and add the Linky sensor."""
username = config[CONF_USERNAME]
password = config[CONF_PASSWORD]
timeout = config[CONF_TIMEOUT]
"""Set up the Linky sensor."""
username = config.get(CONF_USERNAME)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use [] syntax for required and optional with default keys.

password = config.get(CONF_PASSWORD)
timeout = config.get(CONF_TIMEOUT)
SCAN_INTERVAL = config.get(CONF_SCAN_INTERVAL)
grea09 marked this conversation as resolved.
Show resolved Hide resolved
name = config.get(CONF_NAME)
peak_hours = config.get(PEAK_HOURS)
peak_hours_cost = config.get(PEAK_HOURS_COST)
offpeak_hours_cost = config.get(OFFPEAK_HOURS_COST)

from pylinky.client import LinkyClient, PyLinkyError
client = LinkyClient(username, password, None, timeout)
try:
client.fetch_data()
except PyLinkyError as exp:
_LOGGER.error(exp)
client.close_session()
return
linky_data = LinkyData(username, password, timeout)

devices = [LinkySensor('Linky', client)]
add_entities(devices, True)
add_entities([LinkySensor(name, linky_data, peak_hours,
peak_hours_cost, offpeak_hours_cost)])
return True


class LinkySensor(Entity):
"""Representation of a sensor entity for Linky."""

def __init__(self, name, client):
def __init__(self, name, linky_data,
peak_hours, peak_hours_cost,
offpeak_hours_cost):
"""Initialize the sensor."""
self._name = name
self._client = client
self._peak_hours = peak_hours
self._peak_hours_cost = peak_hours_cost
self._offpeak_hours_cost = offpeak_hours_cost
self._unit = KILOWATT_HOUR
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use homeassistant.const.ENERGY_KILO_WATT_HOUR

self._icon = "mdi:flash"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make this a constant

self._lk = linky_data
self._state = None
self._attributes = {}
self.update()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call this outside the init()


@property
def name(self):
"""Return the name of the sensor."""
return self._name

@property
def unit_of_measurement(self):
"""Return the unit the value is expressed in."""
return self._unit

@property
def state_attributes(self):
"""Return the state attributes."""
return self._attributes

@property
def state(self):
"""Return the state of the sensor."""
return self._state

@property
def unit_of_measurement(self):
"""Return the unit of measurement."""
return 'kWh'
def icon(self):
"""Return the icon of the sensor."""
return self._icon

@Throttle(SCAN_INTERVAL)
def update(self):
"""Fetch new state data for the sensor."""
"""Fetch new state data for the sensor.
"""
_LOGGER.debug("Start of update of linky data")
self._lk.update()
if not self._lk.success:
self._state = STATE_UNAVAILABLE
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set this to None. Instead add another instance variable named _available to the sensor and expose it with the property available. Don't forget to mark available = True when the update is successful.

@property
def available(self):
    """Return the availablility of this sensor."""
   return self._available

self._attributes = []
return

self._state = self._lk.daily[1][CONSUMPTION]
self._attributes["halfhourly"] = [d[CONSUMPTION]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't do these calculations as part of the integration, instead it should be made a standalone integration, like the previous mentioned utility meter.

for d in self._lk.halfhourly]
self._attributes["daily"] = [d[CONSUMPTION]
for d in self._lk.daily]
self._attributes["peak_hours"] = sum([
d[CONSUMPTION]
if any([between(h[0], h[1], d[TIME])
for h in self._peak_hours])
else 0 for d in self._lk.halfhourly]) / 2
# From kW for 30 minutes to kWh
self._attributes["offpeak_hours"] = sum(
[0 if any([between(h[0], h[1], d[TIME])
for h in self._peak_hours])
else d[CONSUMPTION]
grea09 marked this conversation as resolved.
Show resolved Hide resolved
for d in self._lk.halfhourly]) / 2
# From kW for 30 minutes to kWh
self._attributes["peak_offpeak_percent"] = ((self._attributes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some users share on the forum to have divison by Zero here. Perhaps add some protections.

["peak_hours"] *
100) /
(self._attributes["peak_hours"] +
grea09 marked this conversation as resolved.
Show resolved Hide resolved
self._attributes["offpeak_hours"]))
grea09 marked this conversation as resolved.
Show resolved Hide resolved
self._attributes["daily_cost"] = (self
._peak_hours_cost *
grea09 marked this conversation as resolved.
Show resolved Hide resolved
self._attributes["peak_hours"] +
grea09 marked this conversation as resolved.
Show resolved Hide resolved
self._offpeak_hours_cost *
grea09 marked this conversation as resolved.
Show resolved Hide resolved
self._attributes["offpeak_hours"])
grea09 marked this conversation as resolved.
Show resolved Hide resolved
self._attributes["monthly_evolution"] = (
1 - ((self._lk.monthly[0][CONSUMPTION]) /
grea09 marked this conversation as resolved.
Show resolved Hide resolved
(self._lk.compare_month))) * 100
grea09 marked this conversation as resolved.
Show resolved Hide resolved
_LOGGER.debug("Computed values: " +
str(self._attributes))


def hour_to_min(hour):
grea09 marked this conversation as resolved.
Show resolved Hide resolved
return sum(map(lambda x, y: int(x)*y, hour.split(":"), [60,1]))
grea09 marked this conversation as resolved.
Show resolved Hide resolved


def between(start, end, hour):
grea09 marked this conversation as resolved.
Show resolved Hide resolved
min_start = hour_to_min(start)
min_end = hour_to_min(end)
min_hour = hour_to_min(hour)
return (min_start <= min_hour <= min_end
if min_start < min_end
grea09 marked this conversation as resolved.
Show resolved Hide resolved
else min_start >= min_hour >= min_end)
grea09 marked this conversation as resolved.
Show resolved Hide resolved


class LinkyData:
grea09 marked this conversation as resolved.
Show resolved Hide resolved
"""The class for handling the data retrieval."""

def __init__(self, username, password, timeout):
"""Initialize the data object."""
self._username = username
self._password = password
self._timeout = timeout
self.client = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not create the client here? Regardless it should not be {} but rather None if it doesn't exist.

self.data = {}
self.success = False

@Throttle(MIN_TIME_BETWEEN_UPDATES)
def _fetch_data(self):
"""Fetch latest data from Linky."""
from pylinky.client import PyLinkyError
from pylinky import LinkyClient
from datetime import datetime
grea09 marked this conversation as resolved.
Show resolved Hide resolved
from datetime import date
from dateutil.relativedelta import relativedelta
try:
self._client.fetch_data()
self.client = LinkyClient(self._username, self._password,
None, self._timeout)
self.client.fetch_data()
_LOGGER.info("Connected to Enedis server successfully.")
self.data = self.client.get_data()
today = date.today()
# Fixes a bug in PyLinky
self.data["monthly"][0] = (self.client
._get_data_per_month(
grea09 marked this conversation as resolved.
Show resolved Hide resolved
(today.replace(day=1) - relativedelta(months=12))
.strftime("%d/%m/%Y"),
((today - relativedelta(months=11)).replace(day=1)
- relativedelta(days=1)).strftime("%d/%m/%Y"))[0])
_LOGGER.info("Second request for bugfix")
# Get partial CONSUMPTION of the same month last year
self.compare_month = sum([d[CONSUMPTION]
for d in self.client
._get_data_per_month(
(today.replace(day=1) -
grea09 marked this conversation as resolved.
Show resolved Hide resolved
relativedelta(months=12))
grea09 marked this conversation as resolved.
Show resolved Hide resolved
.strftime("%d/%m/%Y"),
grea09 marked this conversation as resolved.
Show resolved Hide resolved
(today - relativedelta
grea09 marked this conversation as resolved.
Show resolved Hide resolved
(months=12))
grea09 marked this conversation as resolved.
Show resolved Hide resolved
.strftime("%d/%m/%Y"))])
grea09 marked this conversation as resolved.
Show resolved Hide resolved
_LOGGER.info("Same month last year " +
"(from 1st to same day): " +
str(self.compare_month))
except PyLinkyError as exp:
_LOGGER.error(exp)
self._client.close_session()
return

_LOGGER.debug(json.dumps(self._client.get_data(), indent=2))
_LOGGER.warning("Unable to fetch Linky data " +
"(maybe due to night maintenance " +
"downtime schedule): %s", exp)
return False
return True

if self._client.get_data():
# get the last past day data
self._state = self._client.get_data()['daily'][-2]['conso']
else:
self._state = None
def update(self):
"""Return the latest collected data from Linky."""
self._fetch_data()
if not self.data :
grea09 marked this conversation as resolved.
Show resolved Hide resolved
return
_LOGGER.debug("Linky data retrieved: " + str(self.data))
self.halfhourly = list(reversed(self.data["hourly"]))
self.daily = list(reversed(self.data["daily"]))
self.monthly = list(reversed(self.data["monthly"]))
self.yearly = list(reversed(self.data["yearly"]))
self.success = True