# Turn Off Fritz!DECT Smart Switch Only When Devices Are Idle

I had a rather specific problem. The TL;DR version is that I have a smart switch controling an appliance which irregularly draws power without giving a clear indication. I'm a little paranoid and I prefer to turn off the switch only when the appliance is idle. The smart switch provides real time (well, almost...) power measurements and I used to manually check the power read out manufacturers app and wait for an idle state to turn of the machine. The goal was to automate this somewhat tedious procedure with a Python script.


Now the full story. The appliance is my dual boiler espresso machine which takes about 20 minutes to heat up before use and, when running, has irregular heating periods to keep the boilers at operating temperature. During the heating periods the machine draws several hundreds of Watts of power up to about 1.4 kH, when idle the power consumption is about 3 W. The initial heating period is about 10-15 minutes long while the other periods are sometimes less than a minute. There is no visual indication for heating activity and the heating is mostly inaudible as well. Due to the long initial heating period I have the machine control by a Fritz!DECT 200 smart switch connected to my Fritz!Box router so that I can turn on the machine automatically in the morning so that it's ready to go when I get up. In this setup the smart switch completely replaces the machine's actual on/off switch which is constantly set to 'on'.

Now, I'm not an electrician but I certified service technician has assured me of two things:
1. The smart switch has pretty much the same effect as the actual on/off switch.
2. Switching off the machine during heating does not harm the machine.

Nevertheless, I feel better when the smart switch only goes to 'off' when the machine is idle.


## Controlling Fritz!DECT devices using Python and firtzconnection

The Fritz!Box allows third party interaction with the device. The official documentation is available in German on the manufacturer's homepage (https://fritz.com/service/schnittstellen/). Luckily, there is a Python library called `fritzconnection` which simplifies the process. In particular, the relevant functions for Fritz!DECT smart switches are:
1. Retrieving the power measurements.
2. Setting the switch state.

The documentation for `fritzconnection` can be found here: https://fritzconnection.readthedocs.io/en/1.15.0/


## A problem for the implementation: latency

Recall that the goal is to write a function, say `turnOffWhenIdle(...)`, that does what the name suggests. At first, the solution seems obvious. Simply connect to the Fritz!Box, monitor the power of the smart switch, wait until an idle value is reported, and only then turn off the switch. This would be easy using `firtzconnection` if the smart switch actually provided real time measurements as the manufacturer claims. However, a little expirementation suggests that this isn't quite the case. As far as I can tell, the following is true:
* Power measurements are only recorded every 10 seconds.
* The time stamp of the measurement only has second precision. The actual time of measurement is truncated.
* Requesting and receiving the power measurements takes somewhere between 0.7 and 1 seconds. I'm assuming that it takes about half of this time for the request to reach the smart switch.
* The first request after longer periods without requests is not always reliable. This is indicated by an outdated timestamp.

Unfortunately, this complicates the design of an optimal `turnOffWhenIdle(...)` function:
* Several requests of power data are necessary to obtain reliable values. It can take more than 10 seconds after an initial request until the data is reliable.
* Even in the case, the read out is accurate, the latency between the measurement and retrieving the information can be over 12 seconds in the worst case.
* Even in the best case, some latency has to be expected because the measuring cycle is not known precisely and it is next to impossible to set up a completely synchronized request cycle due to fluctuating request durations.

With this understood the main task is to set up a request cycle using `fritzconnection` that is reasonably accurately synchronized with the power measurement cycle on the smart switch. 

# The Implementation

## Connecting to the Fritz!Box using fritzconnection
The first task is to import (and install, if needed) `fritzconnection` and other needed packages.

In [1]:
# !pip install fritzconnection
import fritzconnection
from fritzconnection import FritzConnection
from fritzconnection.lib.fritzhomeauto import FritzHomeAutomation

import json 
import time, datetime

Next we have to connect to the Fritz!Box and its home automation services

In [2]:
# get IP address, user name and password from config file
with open("fritz_access.ini","r") as file:
    config = json.load(file)
fritzbox_ip = config['ip']
fritzbox_user = config['user']
fritzbox_pw = config['pw']
# Note: fritz_access.ini is not provided to keep my data private
# It is contains a single JSON dictionary of the following form
# {"ip": "<ip_address>", "user": "<user_name", "pw": "password"}
# You can also declare the variables manually

# Connect to FritzBox
fc = FritzConnection(address=fritzbox_ip, user=fritzbox_user, password=fritzbox_pw)
# Connect to home automation services
fh = FritzHomeAutomation(fc)

# Print some information
print(fc)
print("-"*10)
print(type(fc))
print(type(fh))

FRITZ!Box 7490 at http://192.168.178.1
FRITZ!OS: 7.60
----------
<class 'fritzconnection.core.fritzconnection.FritzConnection'>
<class 'fritzconnection.lib.fritzhomeauto.FritzHomeAutomation'>


As you can see, `fritzconnection` provides classes called `FritzConnection` and `FritzHomeAutomation` to interact with the Fritz!Box and its home automation services. Similarly, there is a class `HomeAutomationDevice` for smart home devices such as switches, radiator controls, etc. We can conveniently create instances for all either all devices or only the switches using the functions below. I'll focus on the switches.

In [3]:
def getDectDevices(fh):
    """Obtain all smart home devices as HomeAutomationDevice() objects."""
    devices = fh.get_homeautomation_devices()
    # devices = [device for device in devices if device.is_switchable]
    devices = [device for device in devices if 'FRITZ!DECT' in device.ProductName]
    return devices

def getDectSwitches(fh):
    """Get only the smart switches as HomeAutomationDevice() objects."""
    devices = fh.get_homeautomation_devices()
    switches = [device for device in devices if device.is_switchable]
    return switches

# get a list of HomeAutomationDevice() instances, one for each smart switch
switches = getDectSwitches(fh)
# print information
first_switch = switches[0]
print(first_switch)
print(type(first_switch))

ain: 08761 0406876, AVM - FRITZ!DECT 200
<class 'fritzconnection.lib.fritzhomeauto.HomeAutomationDevice'>


The first line might be a little cryptic. So let's take a closer look at the `HomeAutomationDevice`class and print some attributes.

In [4]:
header = f"{"AIN":16}{"Device Name":21}{"Product Type"}"
print(header)
print("-" * 60)
for switch in switches:
    print(f"{switch.identifier:16}{switch.DeviceName:21}{switch.Manufacturer:4}{switch.ProductName}")

AIN             Device Name          Product Type
------------------------------------------------------------
08761 0406876   l'angolo del caffè   AVM FRITZ!DECT 200
11630 0073749   TV etc               AVM FRITZ!DECT 200


The AIN is a unique identifier assigned to each smart home device which is used to communicate with the device. The device name is the user assigned name to the smart device. 

The `HomeAutomationClass`also has several methods. We will make use of the following:
* `get_basic_device_stats()` returns a nested dictionary with various statistics recorded on the device (including power usage).
* `get_switch_state()` returns the current switch state (`True` if on, `False` if off).
* `set_switch(<boolean>)` sets the swtich state (`True` = on, `False` = off).

## "Turn Off When Idle" as a Class Method

I will implement the desired function `turn_of_when_idle` as a method for a class `SmartSwitch` which "manually inherits" the relevant attributes and methods from the `HomeAutomationDevice` class. Here "manual inheritance" means that each instance of `SmartSwitch` holds an instance of `HomeAutomationDevice` as a private attribute from which some attributes are copied to the `SmartSwitch` instance. The reason for using the crux of "manual inheritace" is that the initialization of `HomeAutomationDevice` instances within `fritzconnection` is somewhat convoluted.

The methods defined for `SmartSwitch` could easily be intergrated into `fritzconnection`. However, the methods are arguably too specialized to warrant the inclusion. 

### An outline of the strategy
Here is an outline of the strategy used to construct the `turn_of_when_idle` method:
1. Request the most recent power record on the device and make sure it is reliable.
2. Set up a request cycle parallel to the measurement cycle of the smart switch. As initial base time use the timestamp of the power record obtained in 1.
3. Synchronize the request cycle with reasonbale accuracy to the measuring cycle by repeatedly checking the latency and adjusting the base time accordingly. The is done by approximating the offset of the actual time of measurement from the reported timestamp from below using a divide and conquer strategy and setting the offset for the request cycle accordingly while compensating for the request duration.

The algorithm reliably results in power measurements with latency below 1 second. Since the request duration typically at least 0.7 seconds, this is close to optimal.

### Some helper functions

It will be necessarily (and beneficial) to work with timestamps and timedeltas from the `datetime` library. For convenience, I will define two related functions: one that adjusts a given timestamp by a given number of seconds, and another one that formats a timestamp as a string of the form HH:MM:SS or HH:MM:SS.ddd... 

In [5]:
def nudge_timestamp(timestamp,seconds):
    """Nudges a timestamp by a given number of seconds."""
    # create timedelta from seconds
    timedelta= datetime.timedelta(seconds=seconds)
    # add timedelta to timestamp
    nudged_timestamp = timestamp + timedelta
    return nudged_timestamp

def print_timestamp(timestamp, dig=None):
    """Convert timestamp to a string in the format HH:MM:SS.digits with 
    digits controlling the number of digits for the seconds value."""
    try:
        if dig == 0:
            time_string = timestamp.strftime("%H:%M:%S")
        elif 0 < dig <=6:
            time_string = timestamp.strftime("%H:%M:%S.%f")[:dig-6]
    except:
        time_string = timestamp.strftime("%H:%M:%S.%f")
    return time_string

### Definition of the SmartSwitch class


In [6]:
# from fritzconnection.lib.fritzhomeauto import HomeAutomationDevice

class SmartSwitch():
    """Provides convenient methods to interact with Fritz!DECT smart switches. 
    Some methods and attributes are shared with the HomeAutomationDevice class
    from the firtzconnection library.
    
    Attributes:
    - __device: hidden atrribute holding an instance of HomeAutomationDevice
    - ain: identifier used to communicate the the smart switch
    - name: name of the smart switch as assigned by the used
    - product_name: model name and number of the smart switch
    - idle_threshold: threshold for power in idle state (measured in Watts)

    Methods:
    - is_switchable(): checks if the HomeAutomationDevice is actually a switch
    - get_switch_state(): get the on/off status of the switch as a boolean
    - set_switch(): changes the current switch state
    - get_basic_device_stats(): get statistics recorded by the smart switch
    - get_power_stats(): gets only the statistics related to power 
    - get_latest_power_record(): gets only the latest power record and related time information
    - get_reliable_power_record(): specialized method to get a reliable power record
    - get_next_power_record(): specialized method to schedule a new power record
    - turn_off_if_idle(): checks if the switch is idle, and if so, turns it off
    - turn_off_when_idle(): waits for the device to be reliably idle and turns it off
    """

    def __init__(self,fritzdevice,idle_threshold=5):
        # private attribute containing a HomeAutomationDevice() instance
        self.__device = fritzdevice
        # some attributes inherited from the HomeAutomationDevice() instance
        self.identifier = self.__device.identifier
        self.DeviceName = self.__device.DeviceName
        self.model = f"{self.__device.Manufacturer} {self.__device.ProductName}"
        # additional attribute: threshold for power in idle state (measured in Watts)
        self.idle_threshold = idle_threshold
    
    def __str__(self):
        return f"{self.DeviceName} ({self.model}, AIN: {self.identifier})"

    # some methods inherited from the HomeAutomationDevice() instance
    def is_switchable(self):
        return self.__device.is_switchable()
    def get_switch_state(self):
        return self.__device.get_switch_state()
    def set_switch(self,arg):
        return self.__device.set_switch(arg)
    def get_basic_device_stats(self):
        return self.__device.get_basic_device_stats()
    
    # some customized methods
    def get_power_stats(self):
        """Get the power statistics recorded by the smart switch."""
        power_stats = self.__device.get_basic_device_stats()['power']
        return power_stats
    
    def get_latest_power_record(self):
        """Returns information regarding the latest power value recorded
        by a FritzDECT device.

        Returns:
            info : dictionary containing the following information:
                'power' : lastest power value recorded by device
                'record time' : timestamp of record
                'request time' : timestamp of request
                'repsonse time' : timestamp of response
                'duration' : time between request and response in seconds
                'latency' : time between record time and response in seconds
        """
        # Get timestamp of request time
        request_time = datetime.datetime.now()
        # Get stats
        power_stats = self.get_power_stats()
        # Get timestamp of response time
        response_time = datetime.datetime.now()
        # Extract latest power record and convert to Watt
        # (Note: power is recorded as integer multiple of 0.01 W)
        power = power_stats['data'][0] / 100
        # Extract time stamp of record
        record_time = power_stats['datatime']
        # Compute latency and duration
        duration = (response_time - request_time).total_seconds()
        latency = (response_time - record_time).total_seconds()
        # Package the information in dictionary and return
        data = {
            'power' : power,
            'record time' : record_time,
            'request time' : request_time,
            'response time' : response_time,
            'duration' : duration,
            'latency' : latency,
        }
        return data
    
    def get_reliable_power_record(self, interval=2):
        """Returns a reliable power record. 
        
        Explanation:
        Note that .get_latest_power_record() does not always give reliable 
        information after no power stats have been requested from the device 
        in a while. This can be seen by an outdated timestamp in the power 
        record. The way out is to send a few requests a few seconds apart 
        until the timestamp changes. This process may take up to ~12 seconds.

        Arguments:
        - interval: time to pause until the next request
        Returns:
        - power_record: a power record as returned by .get_latest_power_record()
        """
        # get initial power record and extract its timestamp
        power_record = self.get_latest_power_record()
        init_time = power_record['record time']
        # do the same once more right away
        power_record = self.get_latest_power_record()
        next_time = power_record['record time']
        # unless the timestamp has changed, way a bit and repeat until it changes
        while next_time == init_time:
            time.sleep(interval)
            power_record = self.get_latest_power_record()
            next_time = power_record['record time']
        # return the final power record
        return power_record
    
    def get_next_power_record(self, base_time, cycle=10):
        """Schedules the next iteration in a cycle of power record requests
        starting at a given timestamp (base_time) repeating every (cycle) seconds.

        Arguments:
        - base_time: the timestamp of the start of the cycle
        - cycle: the cycle length in seconds
        Returns:
        - power_record: a power record as returned by .get_latest_power_record()
        """
        exec_time = base_time
        while exec_time < datetime.datetime.now():
            exec_time += datetime.timedelta(seconds=cycle)
        sleep_time = (exec_time - datetime.datetime.now()).total_seconds()
        time.sleep(sleep_time)
        power_record = self.get_latest_power_record()
        return power_record


    def turn_off_if_idle(self,power_record,allowed_latency=2.5):
        """Check if the switch was reported idle with an acceptable latency, and if so, 
        turn it off.
        
        Arguments:
        - power_record: a power record as returned by .get_latest_power_record()
        - allowed_latency: maximal latency considered to be reliable
        Returns:
        - boolean indicating the final switch state (True indicates that)
        """
        # compare the power record to the idle threshold
        device_idle = power_record['power'] < self.idle_threshold
        # check the latency against 
        latency_ok = 0 < power_record['latency'] < allowed_latency
        # act accordingly
        if device_idle and latency_ok:
            self.set_switch(False)
            return False
        else:
            return True
        
    def turn_off_when_idle(self, allowed_latency:float=1, cycle_detection_precision:int=2, silent:bool = True) -> None:
        """Turns the switch off only when a power record with acceptable latency
        indicates idle state. 

        The strategy is to set up a request cycle that and synchronize it to
        the power measurement cycle of the switch. The latter is approximated 
        with the specified precision using a divide and conquer strategy.

        Arguments:
        - allowed_latency: sets the maximal allowed latency in seconds (default: 1)
        - cycle_detection_precision: determines the precision of the approximation
        as 10**(-cycle_detection_precision)
        - silent: controls if status updates are given as console output
        Returns:
        - None
        """
        # Help functions for console output
        def _status_update(status_string:str)->None:
            """Print input string to console if silent==False."""
            if not silent:
                print(status_string)
        def _power_update(power_record:dict,initial:bool=False)->None:
            """Print formatted extract of power record."""
            power = f"{power_record['power']:7.2f} W"
            record_time = print_timestamp(power_record['record time'],dig=0)
            request_time = print_timestamp(power_record['request time'],dig=cycle_detection_precision)
            latency = f"{power_record['latency']:5.2f} s"
            if initial:
                status_string = f"Power: {power:12} Latency: {latency:10} Record Time: {record_time}"
            else:
                status_string = f"Power: {power:12} Latency: {latency:10} Request Time: {request_time:15} (Current Precision: {increment:0.{cycle_detection_precision}f} s)"
            _status_update(status_string)
        def _power_off_update(switch_is_on:bool)->None:
            """Power off notification."""
            if not switch_is_on:
                _status_update("Device reported idle with low latency. Turning off...")

        ## MAIN ROUTINE
        # check if switch is on
        switch_is_on = self.get_switch_state()
        # if not, do nothing
        if not switch_is_on:
            _status_update(f"{self.DeviceName} is already off.")
            return
        # get first reliable power record
        _status_update("REQUESTING INITIAL POWER RECORD...") 
        power_record = self.get_reliable_power_record()
        _power_update(power_record, initial=True)
        # check if switch is idle with near optimal latency
        switch_is_on = self.turn_off_if_idle(power_record,allowed_latency=0.5)
        _power_off_update(switch_is_on)

        # start detection loop
        # set initial base time of detection loop to latest power record time
        base_time = power_record['record time']
        # initialize parameters to adjust base time for detection
        offset = 0
        lower_bound = -1
        increment = 1/4
        precision_is_low = increment > 10**(-cycle_detection_precision)
        _status_update("STARTING MAIN LOOP...")
        while switch_is_on:
            # get next power record
            power_record = self.get_next_power_record(base_time)
            _power_update(power_record)
            # extract information
            record_time = power_record['record time']
            latency = power_record['latency']
            # check if switch is reported idle with allowed latency, if so, turn off
            switch_is_on = self.turn_off_if_idle(power_record,allowed_latency)
            _power_off_update(switch_is_on)
            # adjust parameters if needed
            # at this point the latency should be between 0 and 12.5 seconds
            # latency 10 or higher indicates that the request was sent too soon
            if 9 < latency < 12.5:
                # adjust offset and its lower bound accordingly
                lower_bound = max([offset,lower_bound])
                offset += increment
            # in case of reasonably low latency, keep the lower bound, and reduce
            # the offset. 
            elif 0 < latency < 2.5:
                precision_can_be_increased = offset - increment == lower_bound
                if precision_is_low and precision_can_be_increased:
                    increment /= 2
                offset -= increment
            # in all other cases, something went wrong and the offset is reset to 0
            else:
                offset = 0
            # update the base time of the detection cycle
            base_time = nudge_timestamp(record_time,seconds=offset)


### A test run


Let's try to get an example of an "unreliable first power record"

In [7]:
test_run = SmartSwitch(first_switch)

test_run.get_latest_power_record()

{'power': 428.0,
 'record time': datetime.datetime(2025, 5, 27, 16, 11, 59),
 'request time': datetime.datetime(2025, 5, 27, 16, 12, 3, 256199),
 'response time': datetime.datetime(2025, 5, 27, 16, 12, 5, 821379),
 'duration': 2.56518,
 'latency': 6.821379}

And now a demonstration of the main function.

In [8]:
test_run.turn_off_when_idle(silent=False, allowed_latency=1, cycle_detection_precision=3)

REQUESTING INITIAL POWER RECORD...
Power:  427.36 W    Latency:  1.42 s    Record Time: 16:12:15
STARTING MAIN LOOP...
Power:  427.36 W    Latency: 10.82 s    Request Time: 16:12:25.000    (Current Precision: 0.250 s)
Power:    2.93 W    Latency:  1.06 s    Request Time: 16:12:35.250    (Current Precision: 0.250 s)
Power:    3.07 W    Latency:  0.95 s    Request Time: 16:12:45.125    (Current Precision: 0.125 s)
Device reported idle with low latency. Turning off...


# Comments on the current version

* As it stands, `turn_off_when_idle` can take quiet some time. For example, if the switch is currently not idle the main loop run will not time out by itself. And even if the switch is idle, if few iterations are needed to achieve low latency measurements. However, it does appear to do the job that it's supposed to do.
* The "manual inheritance" is not the most elegant concept. Maybe there is a better solution.

# Possible plans for the future

* Add time out feature to `turn_off_when_idle`.
* Add more specialized methods to `SmartSwitch` for monitoring and plotting power recods
* Find a way to change the "switch mode" from manual and automatic. Unfortunately, this is currently not possible using `fritzconnection'. And I'm not sure if it is documented how the official Fritz!Box interfaces achieve this. In the worst case, some minor network packet sniffing will be my best shot. 
* Along the same lines, it would be great to create a "gone for the weekend" template for the entire smart home setup. 
* Add GUI and create stand-alone executable.