Skip to content

Commit

Permalink
Refactor Waze Travel Time & Update Requirements (#22428)
Browse files Browse the repository at this point in the history
* Refactor Waze Travel Time & Update Requirements

Refactored Waze Travel Time to contain a data object.

Changed error retrieving data to a warning.
Added distance conversion depending on region.
Removed dependency on TRACKABLE_DOMAINS list.
Update to use WazeRouteCalculator 0.10

3rd time's a charm.  Deleted fork, caused last PR to screw up.  So here we are.

* Update requirements_all.txt

* Revert package upgrade.

* Revert package upgrade.
  • Loading branch information
Petro31 authored and balloob committed Jun 6, 2019
1 parent 1c13638 commit 2c341f2
Showing 1 changed file with 107 additions and 56 deletions.
163 changes: 107 additions & 56 deletions homeassistant/components/waze_travel_time/sensor.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
"""Support for Waze travel time sensor."""
from datetime import timedelta
import logging
import re

import voluptuous as vol

from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import (
ATTR_ATTRIBUTION, CONF_NAME, CONF_REGION, EVENT_HOMEASSISTANT_START,
ATTR_LATITUDE, ATTR_LONGITUDE)
ATTR_LATITUDE, ATTR_LONGITUDE, CONF_UNIT_SYSTEM_METRIC,
CONF_UNIT_SYSTEM_IMPERIAL)
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers import location
from homeassistant.helpers.entity import Entity
from homeassistant.util import Throttle

_LOGGER = logging.getLogger(__name__)

Expand All @@ -26,18 +27,21 @@
CONF_INCL_FILTER = 'incl_filter'
CONF_EXCL_FILTER = 'excl_filter'
CONF_REALTIME = 'realtime'
CONF_UNITS = 'units'
CONF_VEHICLE_TYPE = 'vehicle_type'

DEFAULT_NAME = 'Waze Travel Time'
DEFAULT_REALTIME = True
DEFAULT_VEHICLE_TYPE = 'car'

ICON = 'mdi:car'

UNITS = [CONF_UNIT_SYSTEM_METRIC, CONF_UNIT_SYSTEM_IMPERIAL]

REGIONS = ['US', 'NA', 'EU', 'IL', 'AU']
VEHICLE_TYPES = ['car', 'taxi', 'motorcycle']

SCAN_INTERVAL = timedelta(minutes=5)
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)

TRACKABLE_DOMAINS = ['device_tracker', 'sensor', 'zone', 'person']

PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
vol.Required(CONF_ORIGIN): cv.string,
Expand All @@ -47,6 +51,9 @@
vol.Optional(CONF_INCL_FILTER): cv.string,
vol.Optional(CONF_EXCL_FILTER): cv.string,
vol.Optional(CONF_REALTIME, default=DEFAULT_REALTIME): cv.boolean,
vol.Optional(CONF_VEHICLE_TYPE,
default=DEFAULT_VEHICLE_TYPE): vol.In(VEHICLE_TYPES),
vol.Optional(CONF_UNITS): vol.In(UNITS)
})


Expand All @@ -59,9 +66,14 @@ def setup_platform(hass, config, add_entities, discovery_info=None):
incl_filter = config.get(CONF_INCL_FILTER)
excl_filter = config.get(CONF_EXCL_FILTER)
realtime = config.get(CONF_REALTIME)
vehicle_type = config.get(CONF_VEHICLE_TYPE)
units = config.get(CONF_UNITS, hass.config.units.name)

data = WazeTravelTimeData(None, None, region, incl_filter,
excl_filter, realtime, units,
vehicle_type)

sensor = WazeTravelTime(name, origin, destination, region,
incl_filter, excl_filter, realtime)
sensor = WazeTravelTime(name, origin, destination, data)

add_entities([sensor])

Expand All @@ -79,27 +91,28 @@ def _get_location_from_attributes(state):
class WazeTravelTime(Entity):
"""Representation of a Waze travel time sensor."""

def __init__(self, name, origin, destination, region,
incl_filter, excl_filter, realtime):
def __init__(self, name, origin, destination, waze_data):
"""Initialize the Waze travel time sensor."""
self._name = name
self._region = region
self._incl_filter = incl_filter
self._excl_filter = excl_filter
self._realtime = realtime
self._waze_data = waze_data
self._state = None
self._origin_entity_id = None
self._destination_entity_id = None

if origin.split('.', 1)[0] in TRACKABLE_DOMAINS:
# Attempt to find entity_id without finding address with period.
pattern = "(?<![a-zA-Z0-9 ])[a-z_]+[.][a-zA-Z0-9_]+"

if re.fullmatch(pattern, origin):
_LOGGER.debug("Found origin source entity %s", origin)
self._origin_entity_id = origin
else:
self._origin = origin
self._waze_data.origin = origin

if destination.split('.', 1)[0] in TRACKABLE_DOMAINS:
if re.fullmatch(pattern, destination):
_LOGGER.debug("Found destination source entity %s", destination)
self._destination_entity_id = destination
else:
self._destination = destination
self._waze_data.destination = destination

@property
def name(self):
Expand All @@ -109,11 +122,9 @@ def name(self):
@property
def state(self):
"""Return the state of the sensor."""
if self._state is None:
return None
if self._waze_data.duration is not None:
return round(self._waze_data.duration)

if 'duration' in self._state:
return round(self._state['duration'])
return None

@property
Expand All @@ -129,16 +140,13 @@ def icon(self):
@property
def device_state_attributes(self):
"""Return the state attributes of the last update."""
if self._state is None:
if self._waze_data.duration is None:
return None

res = {ATTR_ATTRIBUTION: ATTRIBUTION}
if 'duration' in self._state:
res[ATTR_DURATION] = self._state['duration']
if 'distance' in self._state:
res[ATTR_DISTANCE] = self._state['distance']
if 'route' in self._state:
res[ATTR_ROUTE] = self._state['route']
res[ATTR_DURATION] = self._waze_data.duration
res[ATTR_DISTANCE] = self._waze_data.distance
res[ATTR_ROUTE] = self._waze_data.route
return res

def _get_location_from_entity(self, entity_id):
Expand All @@ -149,11 +157,12 @@ def _get_location_from_entity(self, entity_id):
_LOGGER.error("Unable to find entity %s", entity_id)
return None

# Check if the entity has location attributes (zone)
# Check if the entity has location attributes.
if location.has_location(state):
_LOGGER.debug("Getting %s location", entity_id)
return _get_location_from_attributes(state)

# Check if device is in a zone (device_tracker)
# Check if device is inside a zone.
zone_state = self.hass.states.get('zone.{}'.format(state.state))
if location.has_location(zone_state):
_LOGGER.debug(
Expand All @@ -162,11 +171,11 @@ def _get_location_from_entity(self, entity_id):
)
return _get_location_from_attributes(zone_state)

# If zone was not found in state then use the state as the location
# If zone was not found in state then use the state as the location.
if entity_id.startswith('sensor.'):
return state.state

# When everything fails just return nothing
# When everything fails just return nothing.
return None

def _resolve_zone(self, friendly_name):
Expand All @@ -178,46 +187,88 @@ def _resolve_zone(self, friendly_name):

return friendly_name

@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
"""Fetch new state data for the sensor."""
import WazeRouteCalculator

_LOGGER.debug("Fetching Route for %s", self._name)
# Get origin latitude and longitude from entity_id.
if self._origin_entity_id is not None:
self._origin = self._get_location_from_entity(
self._waze_data.origin = self._get_location_from_entity(
self._origin_entity_id)

# Get destination latitude and longitude from entity_id.
if self._destination_entity_id is not None:
self._destination = self._get_location_from_entity(
self._waze_data.destination = self._get_location_from_entity(
self._destination_entity_id)

self._destination = self._resolve_zone(self._destination)
self._origin = self._resolve_zone(self._origin)
# Get origin from zone name.
self._waze_data.origin = self._resolve_zone(
self._waze_data.origin)

# Get destination from zone name.
self._waze_data.destination = self._resolve_zone(
self._waze_data.destination)

self._waze_data.update()


if self._destination is not None and self._origin is not None:
class WazeTravelTimeData():
"""WazeTravelTime Data object."""

def __init__(self, origin, destination, region, include, exclude,
realtime, units, vehicle_type):
"""Set up WazeRouteCalculator."""
import WazeRouteCalculator

self._calc = WazeRouteCalculator

self.origin = origin
self.destination = destination
self.region = region
self.include = include
self.exclude = exclude
self.realtime = realtime
self.units = units
self.duration = None
self.distance = None
self.route = None

# Currently WazeRouteCalc only supports PRIVATE, TAXI, MOTORCYCLE.
if vehicle_type.upper() == 'CAR':
# Empty means PRIVATE for waze which translates to car.
self.vehicle_type = ''
else:
self.vehicle_type = vehicle_type.upper()

def update(self):
"""Update WazeRouteCalculator Sensor."""
if self.origin is not None and self.destination is not None:
try:
params = WazeRouteCalculator.WazeRouteCalculator(
self._origin, self._destination, self._region,
log_lvl=logging.DEBUG)
routes = params.calc_all_routes_info(real_time=self._realtime)
params = self._calc.WazeRouteCalculator(
self.origin, self.destination, self.region,
self.vehicle_type, log_lvl=logging.DEBUG)
routes = params.calc_all_routes_info(real_time=self.realtime)

if self._incl_filter is not None:
if self.include is not None:
routes = {k: v for k, v in routes.items() if
self._incl_filter.lower() in k.lower()}
self.include.lower() in k.lower()}

if self._excl_filter is not None:
if self.exclude is not None:
routes = {k: v for k, v in routes.items() if
self._excl_filter.lower() not in k.lower()}
self.exclude.lower() in k.lower()}

route = sorted(routes, key=(lambda key: routes[key][0]))[0]
duration, distance = routes[route]
self._state = {
'duration': duration,
'distance': distance,
'route': route,
}
except WazeRouteCalculator.WRCError as exp:
_LOGGER.error("Error on retrieving data: %s", exp)

self.duration, distance = routes[route]

if self.units == CONF_UNIT_SYSTEM_IMPERIAL:
# Convert to miles.
self.distance = distance / 1.609
else:
self.distance = distance

self.route = route
except self._calc.WRCError as exp:
_LOGGER.warning("Error on retrieving data: %s", exp)
return
except KeyError:
_LOGGER.error("Error retrieving data from server")
Expand Down

0 comments on commit 2c341f2

Please sign in to comment.