In [None]:
%matplotlib inline
import ball_heater_driver 
import time
import pandas as pd
import re
import seaborn as sns
import matplotlib.pyplot as plt
from list_ports import list_ports
import itertools
import random
from tqdm.notebook import tqdm
from importlib import reload


from IPython import display


import numpy as np

from pathlib import Path

data_dir = Path("test_data")
data_dir.mkdir(exist_ok=True)

def plot_log_file(log_file_path, x_unit="s", fig= None, axar = None):
    """Loads and plots a log file from the ball heater controller.

    Args:
        log_file_path (Pathlike): Path to the log file.
        x_unit (str, optional): Unit for the x axis. "s" or "min". Defaults to "s".
        fig (Figure, optional): Figure to plot on. Defaults to None, in which case a new figure is created.
        axar (list, optional): list of axes to plot on. Defaults to None.

    Returns:
        _type_: _description_
    """
    if fig is None:
        fig, axar = plt.subplots(2, 1, figsize=(10, 10), sharex=True)
    # else:
    #     for ax in axar:
    #         for artist in ax.lines +ax.collections:
    #             artist.remove()
    # else:
    #     plt.clf()

    df = pd.read_csv(log_file_path)
    df["time"] = df["time"] - df["time"][0]
    if x_unit == 'min':
        df["time"] = df["time"] / 60
    
    sns.lineplot(data=df, x="time", y="target_temp", label="target", ax=axar[0])
    sns.lineplot(data=df, x="time", y="heater_temp", label="heater", ax=axar[0])
    if 'aux_therm_temp' in df.columns and df.aux_therm_temp.mean() > 0:
        sns.lineplot(data=df, x="time", y="aux_therm_temp", label="aux", ax=axar[0])
    sns.lineplot(data=df, x="time", y="ball_heater_pwm", label="heater PWM", ax=axar[1])
    axar[0].set(ylabel="Temperature (°C)")
    axar[1].set(ylabel="Heater PWM (%)", xlabel=f"Time ({x_unit})")
    search_r = re.search(r"kp_([\d\.]+)_ki_([\d\.]+)_kd_([\d\.]+)", log_file_path.name)
    kp, ki, kd = search_r.groups() if search_r is not None else (0, 0, 0)
    fig.suptitle(f"kp:{kp}, ki:{ki}, kd:{kd}")
    return fig, axar



## Basic Control

In [None]:
#  Create a new instance of the BallHeaterDriver class using the first port with a Silicon Labs chip.

reload(ball_heater_driver)

ports = [
    port
    for port in list_ports()
    if port["manufacturer"] is not None and "Silicon Labs" in port["manufacturer"]
]
port = ports[0]["port"]
print(f"Using Port: {port}")
ball_heater = ball_heater_driver.BallHeaterDriver(port=port)

In [None]:
# Get and print the current status of the ball heater controller.
success, status = ball_heater.send_command("status", [])
print("current status is:")
print("\n".join((f"{key} : {value}") for key, value in status.items()))

In [None]:
# Start a thread to log the status of the ball heater controller to a csv file at 1s intervals.
filename = f'test_{time.strftime("%Y-%m-%d_%H-%M-%S")}.csv'
ball_heater.begin_logging(1, data_dir / filename)

In [None]:
# Set the target temperature to 35 (will also set control mode to remote)
ball_heater.send_command("set_target_temp", [35])

In [None]:
# plot the log file
fig, axes = plot_log_file(
    data_dir / filename,
)

In [None]:
# Set the control mode back to standby
ball_heater.send_command("set_control_mode", ["standby"])

In [None]:
# Stop logging
ball_heater.stop_logging()

## Run a step response test with given PID parameter Values

In [35]:
def update_plot(filename, fig=None, axar=None):
    fig, axar = plot_log_file(filename, fig=fig, axar=axar)
    display.display(fig)
    display.clear_output(wait=True)


def do_test_run(
    kp,
    ki,
    kd,
    data_dir,
    filename=None,
    target_temps=[45],
    on_wait=60,
    off_wait=5 * 60,
):
    if not data_dir.exists():
        data_dir.mkdir(exist_ok=True)
    if filename is None:
        filename = (
            data_dir
            / f'pid_test_kp_{kp}_ki_{ki}_kd_{kd}_{time.strftime("%Y-%m-%d_%H-%M-%S")}.csv'
        )
    else:
        filename = data_dir / filename
    # fig, axar = plt.subplots(2, 1, figsize=(10, 10))
    fig, axar = None, None

    ball_heater.send_command("set_pid_parameters", [kp, ki, kd])
    time.sleep(0.5)
    ball_heater.begin_logging(0.5, filename)
    time.sleep(2)
    start_time = time.time()
    try:
        for temp_i, target_temp in enumerate(target_temps, start=1):
            ball_heater.send_command("set_target_temp", [target_temp])

            while (time.time() - start_time) < (temp_i * on_wait):    
                update_plot(filename, fig=fig, axar=axar)
                time.sleep(1)

        ball_heater.send_command("set_target_temp", [20])
        off_time, standby = 0, False        
        while off_time < off_wait:
            off_time = time.time() - start_time - (temp_i * on_wait)
            update_plot(filename)
            time.sleep(1)
            if off_time > 30 and not standby:
                standby = True
                ball_heater.send_command("set_control_mode", [0])

    except Exception as e:
        ball_heater.send_command("set_target_temp", [20])
        raise e

    finally:
        ball_heater.stop_logging()
        ball_heater.send_command("set_target_temp", [20])
        ball_heater.send_command("set_control_mode", [0])
        display.clear_output()
        update_plot(filename, fig=fig, axar=axar)
        return filename

In [None]:
kp, ki, kd = 15, 1.0, 0.1
filename = do_test_run(kp, ki, kd, data_dir, target_temps=[45], on_wait=60, off_wait=5 * 60)


## Parametrically test many PID parameters

In [None]:
data_dir = Path(f'test_data/parametric_test_{time.strftime("%Y-%m-%d_%H-%M")}')
data_dir.mkdir(exist_ok=True)

kps = [10, 12.5, 15]
kis = [0.5, 1, 1.5]
kds = [
    0.1,
]
params = list(itertools.product(kps, kis, kds))

print(f"will take approx {len(params) * 6 / 60} hours")
random.shuffle(params)
params

In [None]:
# wait for temp to drop to 35, if needed.

temp = 50
while temp > 35:
    temp = ball_heater.send_command("status")[1]["heater_temp"]
    display.clear_output(wait=True)
    print(f"waiting for temp to drop to 35, currently {temp}")
    time.sleep(1)

In [None]:
for i, param_set in enumerate(params):
    print(f"run {i} of {len(params)}")
    try:
        kp, ki, kd = param_set
        tqdm.write(f"kp: {kp}, ki: {ki}, kd: {kd}")
        filename = do_test_run(
            kp,
            ki,
            kd,            
            data_dir=data_dir,
        )
    except Exception as e:
        if isinstance(e, KeyboardInterrupt):
            break
        else:
            raise e

In [None]:
# Plot all the log files in the data_dir

plot_dir = data_dir / "plots"
plot_dir.mkdir(exist_ok=True)

for filename in tqdm(data_dir.glob("*.csv")):
    fig, axs = plot_log_file(filename)
    # axs[0].set(xlim=(0, 75))
    fig.savefig(plot_dir / filename.with_suffix(".pdf").name)
    # break

In [None]:
# Combine all the log files into one dataframe and plot

df_list = []

for i, filename in enumerate(data_dir.glob("*.csv")):
    df = pd.read_csv(filename)
    df["kp"] = float(filename.name.split("_")[3])
    df["ki"] = float(filename.name.split("_")[5])
    df["kd"] = float(filename.name.split("_")[7])
    df["run"] = i
    df["time"] = df["time"] - df["time"][0]
    df_list.append(df)
df = pd.concat(df_list).reset_index(drop=True)

df = df.melt(
    id_vars=["time", "kp", "ki", "kd", "run"],
    value_vars=["target_temp", "heater_temp", "ball_heater_pwm"],
    var_name="variable",
    value_name="value",
).query('variable == "heater_temp" or variable == "target_temp"')

g = sns.relplot(
    data=df,
    x="time",
    y="value",
    hue="variable",
    style="run",
    dashes=False,
    col="ki",
    row="kp",
    kind="line",
    facet_kws={"sharey": True, "sharex": True},
)
g.set(
    # xlim=(1000, 1200),
    xlim=(0, 75),
    xlabel="Time (s)",
    ylabel="Temperature (°C)",
)
g.savefig(plot_dir / "combined_runs.pdf")

g.set(ylim=(30, 45))

g.savefig(plot_dir / "combined_runs_zoomed.pdf")