-
Notifications
You must be signed in to change notification settings - Fork 10
AppdaemonApp
gpbenton edited this page Dec 27, 2019
·
1 revision
Sample AppDaemon apps. etrv_manual.py provides basic manual operation; etrv.py provides automation.
import logged_app
import datetime
#
# manual control for energenie trvs, including sending mqtt messages
#
# Args:
# trv_id - id number of trv
# input_slider - that determines temperature
# input_select - that determines the valve state
# target_offset - difference between target set and stable temperature achieved
class eTRVManual(logged_app.LoggedApp):
    """Manual control for one energenie eTRV, mirrored over MQTT.

    Changes to the configured input slider (target temperature) and input
    select (valve state) are published as MQTT commands.  The matching
    command sensors are listened to, so commands that arrive over MQTT are
    written back to the slider/select; those write-backs are tagged with a
    ``reason`` attribute so they are not published again.  While the valve
    is "Open" an exercise command is scheduled every
    ``valve_exercise_delay`` seconds.
    """

    # Valve state code (MQTT payload / command sensor state) -> select option.
    _VALVE_OPTIONS = {"0": "Open", "1": "Closed", "2": "Normal"}
    # Select option -> valve state code.
    _VALVE_CODES = {"Open": "0", "Closed": "1", "Normal": "2"}

    def initialize(self):
        """Read the app arguments, sync the select and register listeners.

        Returns early (after logging an error) when a required argument
        is missing, in which case no listeners are registered.
        """
        self.log("Initializing")
        # Exercise the valve every week it is not used
        self.valve_exercise_delay = 60 * 60 * 24 * 7

        try:
            # str() so that a numeric id in the YAML config can be concatenated.
            trv_id = str(self.args["trv_id"])
            self.target_temp_topic = "/energenie/eTRV/Command/Temperature/" + trv_id
            self.target_temp_sensor = "sensor.command_temperature_" + trv_id
            self.valve_state_topic = "/energenie/eTRV/Command/ValveState/" + trv_id
            self.valve_state_sensor = "sensor.command_valvestate_" + trv_id
            self.input_slider = self.args["input_slider"]
            self.input_select = self.args["input_select"]
        except KeyError as e:
            self.log("Argument not found : {}".format(e), level="ERROR")
            return

        try:
            self.target_offset = float(self.args["target_offset"])
        except KeyError:
            self.target_offset = 0
        except (ValueError, TypeError):
            self.log("Argument target_offset must be a float. Using default 0",
                     level="WARNING")
            self.target_offset = 0

        # Bring the select in line with the last commanded valve state.
        stored_valve_state = self.get_state(self.valve_state_sensor)
        if stored_valve_state in self._VALVE_OPTIONS:
            self.set_state(self.input_select,
                           state=self._VALVE_OPTIONS[stored_valve_state])
        elif stored_valve_state == "unknown":
            self.set_state(self.input_select, state="Open")
        else:
            self.error("Invalid stored valve state {}".format(stored_valve_state))

        self.listen_state(self.slider_changed, self.input_slider)
        self.listen_state(self.select_changed, self.input_select)
        self.listen_state(self.temp_command_event, self.target_temp_sensor)
        self.listen_state(self.valve_state_event, self.valve_state_sensor)
        # Open the valve when automatic temperature control is switched off
        self.listen_state(self.open_valve, "input_boolean.automatic_temperature",
                          new="off")

        # Start the exercise timer if valve is in open state
        if self.get_state(self.input_select) == "Open":
            self.log("Starting Exercise Timer")
            self._start_exercise_timer()

    def _cancel_exercise_timer(self):
        """Cancel and forget the pending exercise timer; return True if one existed."""
        if not hasattr(self, 'exercise_timer'):
            return False
        self.cancel_timer(self.exercise_timer)
        del self.exercise_timer
        return True

    def _start_exercise_timer(self):
        """Schedule the next valve exercise, replacing any pending timer."""
        self._cancel_exercise_timer()
        self.exercise_timer = self.run_in(self.exercise_valve_again,
                                          self.valve_exercise_delay)

    def exercise_valve_again(self, kwargs):
        """Timer callback: publish an exercise command and re-arm the timer."""
        self.log("Exercising Valve Again")
        self.call_service("mqtt/publish",
                          topic="/energenie/eTRV/Command/Exercise/"
                          + str(self.args["trv_id"]),
                          payload="1")
        # The handle that just fired is spent, so simply replace it.
        self.exercise_timer = self.run_in(self.exercise_valve_again,
                                          self.valve_exercise_delay)

    def slider_changed(self, entity, attribute, old, new, kwargs):
        """State callback for the slider: publish the new target temperature.

        Nothing is published when the value is unchanged, is not numeric,
        or the change was caused by an incoming MQTT message (marked by
        the ``reason`` attribute, which is then cleared).
        """
        try:
            new_value = float(new)
        except (ValueError, TypeError):
            self.log("Ignoring non-numeric slider value {}".format(new),
                     level="WARNING")
            return
        try:
            unchanged = float(old) == new_value
        except (ValueError, TypeError):
            # No usable previous value, so treat it as a change.
            unchanged = False
        if unchanged:
            self.log("Not sending repeat message", level="DEBUG")
            return

        reason = self.get_state(entity, attribute="reason")
        self.log("reason: {}".format(reason), level="DEBUG")
        if reason == "mqtt_message":
            self.log("Not sending because came from mqtt_message",
                     level="DEBUG")
            self.set_state(entity, attributes={"reason": ""})
            return

        adjusted_target = new_value + self.target_offset
        self.log("Sending mqtt message topic {} with payload {}".format(
            self.target_temp_topic, adjusted_target),
            level="DEBUG")
        self.call_service("mqtt/publish", topic=self.target_temp_topic,
                          payload=adjusted_target, qos="2", retain="true")

    def select_changed(self, entity, attribute, old, new, kwargs):
        """State callback for the select: publish the new valve state.

        Also starts the exercise timer when the valve becomes "Open" and
        cancels it otherwise.  Changes caused by an incoming MQTT message
        are not published again.
        """
        self.log("select_changed", level="DEBUG")
        if old == new:
            self.log("Not sending repeat message", level="DEBUG")
            return

        # Start or stop the exercise timer
        if new == "Open":
            self._start_exercise_timer()
        elif self._cancel_exercise_timer():
            self.log("canceled exercise timer", level="DEBUG")

        reason = self.get_state(entity, attribute="reason")
        self.log("reason: {}".format(reason), level="DEBUG")
        if reason == "mqtt_message":
            self.log("Not sending because came from mqtt_message",
                     level="DEBUG")
            self.set_state(entity, attributes={"reason": ""})
            return

        newstate = self._VALVE_CODES.get(new)
        if newstate is None:
            self.error("Invalid new state {}".format(new))
            return

        self.log("Sending mqtt message topic {} with payload {}".format(
            self.valve_state_topic, newstate),
            level="DEBUG")
        self.call_service("mqtt/publish", topic=self.valve_state_topic,
                          payload=newstate, qos="2", retain="true")

    def temp_command_event(self, entity, attribute, old, new, kwargs):
        """State callback for the temperature command sensor.

        Copies the commanded temperature (minus ``target_offset``) to the
        slider when it differs from the slider's current value.
        """
        self.log("temp_command_event: new = {}".format(new), level="DEBUG")
        try:
            adjusted_target = float(new) - self.target_offset
        except (ValueError, TypeError):
            self.log("Ignoring non-numeric temperature command {}".format(new),
                     level="WARNING")
            return
        try:
            current = float(self.get_state(self.input_slider))
        except (ValueError, TypeError):
            # Slider has no numeric state yet, so always update it.
            current = None
        # Stores mqtt_message in state attributes so that another message
        # is not sent in the callback handler
        if current != adjusted_target:
            self.set_state(self.input_slider, state=adjusted_target,
                           attributes={"reason": "mqtt_message"})

    def valve_state_event(self, entity, attribute, old, new, kwargs):
        """State callback for the valve state command sensor.

        Maps the code "0"/"1"/"2" to "Open"/"Closed"/"Normal" and updates
        the select when it differs; other codes are reported as errors.
        """
        self.log("valve_state_event: new = {}".format(new), level="DEBUG")
        option = self._VALVE_OPTIONS.get(new)
        if option is None:
            self.error("Received invalid valve state {}".format(new))
            return
        # Stores mqtt_message in state attributes so that another message
        # is not sent in the callback handler
        if self.get_state(self.input_select) != option:
            self.set_state(self.input_select, state=option,
                           attributes={"reason": "mqtt_message"})

    def open_valve(self, entity, attribute, old, new, kwargs):
        """State callback: set the select to "Open"."""
        self.log("open_valve", level="DEBUG")
        self.set_state(self.input_select, state="Open")
import logged_app
import datetime
class eTRV(logged_app.LoggedApp):
    """
    energenie trv controller

    Args:
    manual_app - the app that does the manual control for this trv
    sensor_temp - sensor that stores the temperature
    away_temp - target temperature when nobody at home
    power_mode_switch - switch that turns lower power mode on
    schedule - list of times and temperatures to switch to, in format
    time>temp
    holiday_schedule - list of times and temperatures to switch to, in format
    time>temp on non workdays
    overshoot - float default 1.3 Temperature over target when valve is closed
    window_sensor - optional; when its state is "on" as the boiler turns on,
    the valve is closed and a warning is raised
    low_temperature_boost - optional; added to a scheduled target when
    sensor.dark_sky_temperature is below 4.0
    """

    def initialize(self):
        """Read the arguments, install the schedules and register listeners.

        Returns early (after logging) when a required argument is missing
        or the manual app cannot be resolved.
        """
        self.log("Initializing")
        try:
            self.manual_app = self.get_app(self.args["manual_app"])
            # str() so that a numeric id in the YAML config can be concatenated.
            self.trv_id = str(self.manual_app.args["trv_id"])
            self.slider_set_temp = self.manual_app.args["input_slider"]
            self.select_state = self.manual_app.args["input_select"]
            self.sensor_temp = self.args["sensor_temp"]
            self.away_temp = self.args["away_temp"]
            self.power_mode_switch = self.args["power_mode_switch"]
            # Bare lookups: only here to raise KeyError when missing.
            self.args["schedule"]
            self.args["holiday_schedule"]
            self.register_constraint("is_workday")
        except KeyError as e:
            self.error("Argument not found : {}".format(e), level="ERROR")
            self.log("Argument not found : {}".format(e), level="ERROR")
            return
        except AttributeError as e:
            self.error("Invalid app name : {}".format(e), level="ERROR")
            self.log("Invalid app name : {}".format(e), level="ERROR")
            return

        try:
            self.overshoot = float(self.args["overshoot"])
        except KeyError:
            self.overshoot = 1.3
        except (ValueError, TypeError):
            self.log("Argument overshoot must be a float. Using default 1.3",
                     level="WARNING")
            self.overshoot = 1.3

        # Get the target offset from the manual app args
        self.get_target_offset()

        # Set the schedule
        self._schedule_targets(self.args["schedule"], True)
        # Set the holiday
        self._schedule_targets(self.args["holiday_schedule"], False,
                               log_prefix="holidays ")

        self.target_temp_topic = "/energenie/eTRV/Command/Temperature/" + self.trv_id
        command_sensor = "sensor.command_temperature_" + self.trv_id
        try:
            slider_value = (float(self.get_state(command_sensor))
                            - float(self.target_offset))
        except (ValueError, TypeError):
            # Couldn't decode the state, so set defaults
            slider_value = 18.0
            self.set_state(command_sensor,
                           state=slider_value + float(self.target_offset))
        self.set_value(self.slider_set_temp, slider_value)
        self.set_home_target()

        self.listen_state(self.boiler_on, "switch.central_heating", new="on")
        self.listen_state(self.boiler_off, "switch.central_heating", new="off")
        self.listen_state(self.temperature_changed, self.sensor_temp)
        # listen to see if anyone at home
        self.listen_state(self.arrive_home, "group.all_devices", new="home")
        self.listen_state(self.everyone_out, "group.all_devices", new="not_home")
        # resend target temperature every 30 mins
        self.resend_target_timer = self.run_every(self.target_resend,
                                                  self.datetime(), 60 * 30)
        self.listen_state(self.auto_on, "input_boolean.automatic_temperature",
                          new="on")

    def _schedule_targets(self, schedule, is_workday, log_prefix=""):
        """Register a daily ``set_target`` callback for each ``time>temp`` entry.

        An entry that cannot be parsed or scheduled is logged as a warning
        and skipped; the remaining entries are still scheduled.
        """
        for timetemperature in self.split_device_list(schedule):
            try:
                at_time, temperature = timetemperature.split(">")
                self.log("{}{} at {}".format(log_prefix, temperature, at_time))
                runtime = self.parse_time(at_time.strip())
                self.run_daily(self.set_target, runtime, newtemp=temperature,
                               constrain_presence="anyone",
                               is_workday=is_workday)
            except Exception as e:
                # Report the raw entry: the split names may not be bound yet.
                self.log(
                    "Unable to schedule entry {!r}: {}".format(timetemperature, e),
                    level="WARNING"
                )

    def valve_state_resend(self):
        """Publish the valve state currently shown by the select.

        "Normal" is sent as 2, "Closed" as 1 and anything else as 0.
        """
        valve_state = self.get_state(self.select_state)
        if valve_state == "Normal":
            send_value = 2
        elif valve_state == "Closed":
            send_value = 1
        else:
            send_value = 0
        self.call_service("mqtt/publish",
                          topic="/energenie/eTRV/Command/ValveState/" + self.trv_id,
                          payload=send_value, qos="1", retain="true")

    def target_resend(self, kwargs):
        """Timer callback: publish the target temperature again.

        Does nothing when the current or target temperature is not numeric.
        """
        self.log("target_resend", level="DEBUG")
        try:
            current_temp = float(self.get_state(self.sensor_temp))
            target_temp = float(self.get_state(self.slider_set_temp))
        except (ValueError, TypeError) as e:
            self.log("unable to send temperature: {}".format(e), level="DEBUG")
            return
        self.get_target_offset()
        # NOTE(review): the original indentation was lost; only the valve
        # state resend is taken to be conditional here - confirm.
        if abs(current_temp - target_temp) > 1.0:
            self.valve_state_resend()
        adjusted_target = target_temp + self.target_offset
        self.call_service("mqtt/publish", topic=self.target_temp_topic,
                          payload=adjusted_target, qos="1", retain="true")

    def boiler_on(self, entity, attribute, old, new, kwargs):
        """State callback for the central heating switching on.

        Closes the valve and warns when the optional window sensor is
        "on", then turns the power mode switch off.
        """
        self.log("boiler_on", level="DEBUG")
        window_sensor = self.args.get("window_sensor")
        if window_sensor is not None and self.get_state(window_sensor) == "on":
            self.log("Turning off because window is open")
            self.select_option(self.select_state, "Closed")
            warn_msg = "{} is open and heating is on".format(
                self.friendly_name(window_sensor))
            if self.get_state("input_boolean.notifications") == "on":
                self.notify(warn_msg, title="Window Open")
            # NOTE(review): the original indentation was lost; this check is
            # taken to be independent of the notifications switch - confirm.
            if self.anyone_home():
                self.fire_event("appd_tts", msg=warn_msg)
        self.turn_off(self.power_mode_switch)

    def boiler_off(self, entity, attribute, old, new, kwargs):
        """State callback for the central heating switching off."""
        self.log("boiler_off", level="DEBUG")
        self.turn_on(self.power_mode_switch)
        self.select_option(self.select_state, "Normal")

    def temperature_changed(self, entity, attribute, old, new, kwargs):
        """State callback for the room temperature sensor.

        While the heating is on, closes a "Normal" valve once the room is
        more than ``overshoot`` above target, and returns a "Closed" valve
        to "Normal" once the room drops below target.
        """
        if self.get_state("switch.central_heating") == "off":
            return
        self.log("temperature_changed", level="DEBUG")
        try:
            target_temp = float(self.get_state(self.slider_set_temp))
            new_temp = float(new)
        except (ValueError, TypeError) as e:
            self.log("ValueError: {}".format(e), level="DEBUG")
            return
        self.log("target_temp: {}\nnew: {}".format(target_temp, new_temp),
                 level="DEBUG")
        valve_state = self.get_state(self.select_state)
        if new_temp > target_temp + self.overshoot and valve_state == "Normal":
            self.log("Turning off because too hot")
            self.select_option(self.select_state, "Closed")
        elif new_temp < target_temp and valve_state == "Closed":
            self.log("Turning to normal")
            self.select_option(self.select_state, "Normal")

    def set_target(self, kwargs):
        """Scheduler callback: apply the scheduled target ``kwargs["newtemp"]``.

        Falls back to 18 when the scheduled temperature is not numeric.
        The optional low temperature boost is skipped, not treated as an
        invalid target, when it or the outside temperature is unreadable.
        """
        self.log("Setting target temperature to {}".format(kwargs["newtemp"]))
        try:
            newtemp = float(kwargs["newtemp"])
        except (ValueError, TypeError) as e:
            self.error("Invalid Temperature: {}".format(e))
            self.set_value(self.slider_set_temp, 18)
            self.home_target = 18
            return

        if "low_temperature_boost" in self.args:
            try:
                outside = float(self.get_state("sensor.dark_sky_temperature"))
                boost = float(self.args["low_temperature_boost"])
            except (ValueError, TypeError) as e:
                self.log("Skipping low temp boost: {}".format(e),
                         level="WARNING")
            else:
                if outside < 4.0:
                    self.log("Adding low temp boost")
                    newtemp = newtemp + boost

        self.set_value(self.slider_set_temp, newtemp)
        self.home_target = newtemp

    # 'enity' is a typo kept so the callback's parameter names stay unchanged.
    def arrive_home(self, enity, attribute, old, new, kwargs):
        """State callback: somebody arrived, so restore the scheduled target."""
        self.set_home_target()
        self.set_value(self.slider_set_temp, self.home_target)

    # set the home target at startup or when arrive home
    def set_home_target(self):
        """Work out which scheduled temperature applies now.

        Walks the workday or holiday schedule in order and stores the
        result in ``self.home_target``; 18 is used when no entry applies.
        """
        # Set a default value if nothing happens
        self.home_target = 18
        # start at midnight
        previous_time = "00:00:01"
        previous_temperature = 18
        if self.is_workday(True):
            schedule = self.args["schedule"]
        else:
            schedule = self.args["holiday_schedule"]
        for timetemperature in self.split_device_list(schedule):
            try:
                at_time, temperature = timetemperature.split(">")
                at_time = at_time.strip()
                # Keep home_target numeric, as set_target does.
                temperature = float(temperature)
                if self.now_is_between(previous_time, at_time):
                    self.home_target = previous_temperature
                    self.log("Home target is {}".format(self.home_target))
                    break
                previous_time = at_time
                previous_temperature = temperature
            except Exception as e:
                self.log("Unable to set current home target: {}".format(e),
                         level="WARNING")
        else:
            # NOTE(review): read as the for-loop's else (no entry matched,
            # so the last entry applies); the indentation was lost - confirm.
            self.home_target = previous_temperature
            self.log("Final Home target is {}".format(self.home_target))

    def everyone_out(self, entity, attribute, old, new, kwargs):
        """State callback: nobody is home, so use the away temperature."""
        # Set target to away temp
        self.set_value(self.slider_set_temp, self.away_temp)

    def get_target_offset(self):
        """
        Gets the target_offset parameter from the manual app.
        It has to be done each time as it could be changed without this
        app reloading.
        """
        try:
            self.target_offset = float(self.manual_app.args["target_offset"])
        except KeyError:
            self.target_offset = 0
        except (ValueError, TypeError):
            self.log("Argument target_offset must be a float. Using default 0",
                     level="WARNING")
            self.target_offset = 0
        self.log("target_offset = {}".format(self.target_offset), level="DEBUG")

    def auto_on(self, entity, attribute, old, new, kwargs):
        """
        Automatic temperature control turned on.
        Sets the correct valve state and temperature.
        """
        self.log("auto_on", level="DEBUG")
        self.set_state(self.select_state, state="Normal")
        self.set_home_target()
        if self.anyone_home():
            self.set_value(self.slider_set_temp, self.home_target)
        else:
            self.set_value(self.slider_set_temp, self.away_temp)
        if self.get_state("input_boolean.boiler") == "on":
            self.turn_off(self.power_mode_switch)
        else:
            self.turn_on(self.power_mode_switch)

    def set_value(self, entity_id, new_value):
        """Call the ``input_number/set_value`` service for ``entity_id``."""
        self.call_service("input_number/set_value", entity_id=entity_id,
                          value=new_value)

    # constraint to determine if workday or not
    # value should be True or False
    def is_workday(self, value):
        """Return True when the workday sensor being "on" equals ``value``."""
        return (self.get_state("binary_sensor.workday_sensor") == "on") == value