In [2]:
# Code from https://www.nickpascucci.com/literate-pid/index.html

class PIDController():
    """Stateful proportional-integral-derivative (PID) controller.

    The controller is meant to be driven from an outer loop: feed it one
    (time, state) sample per call to `next` and it returns the combined control
    signal. Between calls it keeps the running error integral plus the previous
    error and timestamp, which is all the history the I and D terms require.
    """

    def __init__(self,
                 kp: float,
                 ki: float,
                 kd: float,
                 target: float,
                 initial_state: float,
                 t_0: int) -> None:
        """Set up the gains, the setpoint and the initial history.

        Parameters
        ----------
        kp, ki, kd    : Gains of the proportional, integral and derivative terms.
        target        : The setpoint, i.e. the state the controller steers towards.
        initial_state : The state of the system at time t_0.
        t_0           : The time of the initial sample.
        """
        # Setpoint and gains.
        self._target: float = target
        self._kp: float = kp
        self._ki: float = ki
        self._kd: float = kd

        # Running integral of the error; nothing has been integrated yet.
        self._accumulated_error: float = 0.0
        # Error and timestamp of the most recent sample, needed for d_t and the D term.
        # Note the sign convention: error is (state - target).
        self._last_error: float = initial_state - target
        self._last_t: int = t_0

    def _proportional(self, error: float) -> float:
        """Proportional term: the gain times the current error."""
        return self._kp * error

    def _integral(self, d_t: float, error: float) -> float:
        """Integral term; also folds this interval into the running integral.

        The area under the error curve over the interval is taken as a
        rectangle under the smaller of the two samples plus a triangle
        covering the difference between them (i.e. the trapezoid rule).
        """
        previous = self._last_error
        rectangle = min(error, previous) * d_t
        triangle = abs(error - previous) * d_t / 2.0
        self._accumulated_error += rectangle + triangle
        return self._ki * self._accumulated_error

    def _derivative(self, d_t: float, error: float) -> float:
        """Derivative term: gain times the error slope, or 0 when d_t is not positive."""
        if not d_t > 0:
            # No elapsed time means no defined slope.
            return 0
        slope = (error - self._last_error) / d_t
        return self._kd * slope

    def next(self, t: int, state: float) -> float:
        """Consume the sample taken at time t and return the control value.

        The controller keeps history, so callers are expected to pass
        non-decreasing values of t from one call to the next.

        Parameters
        ----------
        t     : The time at which the sample was taken.
        state : The system state at time t.
        """
        error = state - self._target
        d_t = t - self._last_t

        # All three terms must be computed before the history is overwritten.
        p = self._proportional(error)
        i = self._integral(d_t, error)
        d = self._derivative(d_t, error)

        self._last_t, self._last_error = t, error
        return p + i + d

# Simulations

In [41]:
# Also from https://www.nickpascucci.com/literate-pid/index.html
# For testing

from typing import Callable, Iterable, Tuple, List

# Plotting stuff
import plotly.graph_objs as go
from plotly.subplots import make_subplots
from IPython.display import display, clear_output
import numpy as np
import plotly.offline as pyo
import time
from datetime import datetime

class ThermalMass():
    """A body with a single temperature that changes as heat is added or removed."""

    def __init__(self, mass: float, cp: float, start_temp: float):
        """Record the physical properties and starting temperature of the body.

        Parameters
        ----------
        mass       : Mass of the body in kilograms.
        cp         : Specific heat capacity of the body in J/K/kg.
        start_temp : Initial temperature of the body in Kelvin.
        """
        self._mass, self._cp, self._temp = mass, cp, start_temp

    def current_temperature(self) -> float:
        """Return the body's present temperature."""
        return self._temp

    def update(self, heat_in: float, heat_out: float):
        """Apply one step of heat exchange to the body.

        Parameters
        ----------
        heat_in  : Heat added to the body, in Joules.
        heat_out : Heat removed from the body, in Joules.
        """
        net_heat = heat_in - heat_out
        # Total heat capacity (J/K) converts the net heat into a temperature change.
        heat_capacity = self._mass * self._cp
        self._temp += net_heat / heat_capacity


class ThermalSystem():
    """A thermal mass together with the heat flows acting on it.

    The inflow and outflow are callables, so they may be constants, functions
    of time, or the output of a feedback controller.
    """

    def __init__(self,
                 mass: ThermalMass,
                 in_flow: Callable[[int], float],
                 out_flow: Callable[[int, float], float]):
        """Create a simple thermal system with the given mass and flow functions.

        Parameters
        ----------
        mass     : A thermal mass to use as the system's body of interest.
        in_flow  : The heat flow rate (in J/s) into the body as a function of time.
        out_flow : The heat flow rate (in J/s) out of the body as a function of time and
                   temperature.
        """
        self._mass = mass
        self._in_flow = in_flow
        self._out_flow = out_flow

    @staticmethod
    def _build_figure(name):
        """Create the three-row Plotly figure (temperature, flow out, flow in) titled `name`."""
        fig = make_subplots(
            rows=3,
            cols=1,
            subplot_titles=("Temperature", "Flow Out", "Flow In"),
            row_heights=[0.4, 0.3, 0.3],
        )
        fig.update_layout(title=name)
        for row, label in enumerate(("Temperature", "Flow Out", "Flow In"), start=1):
            fig.add_trace(go.Scatter(x=[], y=[], mode='lines', name=label), row=row, col=1)
        return fig

    @staticmethod
    def _refresh_figure(fig, trace):
        """Push the whole trace into the figure and redraw it in the cell output."""
        times = [sample[0] for sample in trace]
        # Trace tuples are (t, temp, flow_in, flow_out); the figure rows are
        # temperature, flow out, flow in - hence the column order 1, 3, 2.
        with fig.batch_update():
            for series, column in zip(fig.data, (1, 3, 2)):
                series.x = times
                series.y = [sample[column] for sample in trace]
        clear_output(wait=True)
        display(fig)

    def simulate(self,
                 name,
                 timesteps: Iterable[int],
                 step_delay: float = 1.0,
                 realtime: bool = True,
                 live_plot: bool = False,
                 verbose: bool = False) -> List[Tuple[float, float, float, float]]:
        """Simulate the system behavior over time, and give a state trace.

        An initial sample is always recorded at t = 0. After that, one step is
        taken per element of `timesteps`: the heat that flowed during the
        elapsed interval (using the flow rates from the previous sample) is
        applied to the thermal mass, the flow functions are re-evaluated, and
        a new sample is appended to the trace.

        In the default real-time mode the *values* in `timesteps` are ignored
        and only their count matters: the time of each sample is the
        wall-clock time elapsed since the call started, and the loop pauses
        `step_delay` seconds after every step. With `realtime=False` the values
        of `timesteps` are used as the sample times and no pause is inserted,
        which makes runs fast and repeatable.

        Parameters
        ----------
        name       : Title used for the live plot.
        timesteps  : One element per simulation step. In simulated-time mode, a
                     monotonically increasing sequence of timepoints in seconds.
        step_delay : Seconds to sleep after each step in real-time mode.
        realtime   : Use wall-clock time (True) or the timestep values (False).
        live_plot  : Redraw a Plotly figure of the trace after every step.
        verbose    : Print one status line per step.

        Returns
        -------
        A list of (time, temperature, flow_in, flow_out) tuples, one per sample.
        """
        trace = []

        start_milliseconds = time.time_ns() // 1_000_000
        last_t = 0

        temp = self._mass.current_temperature()
        flow_in = self._in_flow(last_t)
        flow_out = self._out_flow(last_t, temp)
        trace.append((0, temp, flow_in, flow_out))

        # Only pay for figure construction when it is actually going to be shown.
        fig = self._build_figure(name) if live_plot else None

        for step in timesteps:
            if realtime:
                # Seconds since the start of the run, at millisecond resolution.
                t = (time.time_ns() // 1_000_000 - start_milliseconds) / 1000
            else:
                t = step
            dt = t - last_t
            last_t = t

            # Integrate the flows that were in effect over the elapsed interval.
            self._mass.update(flow_in * dt, flow_out * dt)

            temp = self._mass.current_temperature()
            flow_in = self._in_flow(t)
            flow_out = self._out_flow(t, temp)
            trace.append((t, temp, flow_in, flow_out))

            if verbose:
                print(f"t={t:.3f}s dt={dt:.3f}s temp={temp:.3f}K flow_in={flow_in} flow_out={flow_out}")
            if fig is not None:
                self._refresh_figure(fig, trace)
            if realtime and step_delay > 0:
                time.sleep(step_delay)

        return trace


In [37]:
# Scenario: equal heat in and out, so the temperature should stay put.
mass = ThermalMass(
    mass=6,  # 6 litres of water weigh 6 kg
    cp=4181,  # J/K/kg (water); the previous 4.186 was 1000x too small for these units
    start_temp=25 + 273.15,  # Kelvin (25 C)
)
in_flow = lambda t: 100.0  # 100 J/s = 0.1 kJ/s
out_flow = lambda t, s: 100.0  # 100 J/s = 0.1 kJ/s, independent of temperature
system = ThermalSystem(mass, in_flow, out_flow)

# 100 steps; the simulator paces itself at about one second per step.
traces = system.simulate('equilibrium', range(0, 100))


KeyboardInterrupt: 

In [38]:
# Scenario: far more heat flows in than out, so the temperature keeps climbing.
mass = ThermalMass(
    mass=6,  # 6 litres of water weigh 6 kg
    cp=4181,  # J/K/kg (water); the previous 4.186 was 1000x too small for these units
    start_temp=25 + 273.15,  # Kelvin (25 C)
)
in_flow = lambda t: 200.0  # 200 J/s = 0.2 kJ/s
out_flow = lambda t, s: 5.0  # 5 J/s = 0.005 kJ/s, independent of temperature
system = ThermalSystem(mass, in_flow, out_flow)

# 100 steps; the simulator paces itself at about one second per step.
traces = system.simulate('runaway', range(0, 100))


KeyboardInterrupt: 

In [39]:
import random

# Seed the generator so the sequence of random flow rates is repeatable.
random.seed(0)

# Scenario: both flows fluctuate randomly and independently of each other.
mass = ThermalMass(
    mass=6,  # 6 litres of water weigh 6 kg
    cp=4181,  # J/K/kg (water); the previous 4.186 was 1000x too small for these units
    start_temp=25 + 273.15,  # Kelvin (25 C)
)
in_flow = lambda t: 100 * random.random()  # uniform in [0, 100) J/s
out_flow = lambda t, s: 100 * random.random()  # uniform in [0, 100) J/s
system = ThermalSystem(mass, in_flow, out_flow)

# Titled 'random' rather than the copy-pasted 'runaway' so the plot is labelled correctly.
traces = system.simulate('random', range(0, 100))

In [43]:
# Scenario: constant heating, with the heat removal set by the PID controller.
START_TEMP = 25 + 273.15  # Kelvin (25 C)
TARGET_TEMP = 30 + 273.15  # Kelvin (30 C)

# The gains are a first guess, not tuned values.
controller = PIDController(
    kp=1.0,
    ki=0.2,
    kd=0.1,
    target=TARGET_TEMP,
    initial_state=START_TEMP,
    t_0=0,
)

# 6 kg of water (6 litres) with cp in J/K/kg, starting at START_TEMP.
mass = ThermalMass(mass=6, cp=4181, start_temp=START_TEMP)


def in_flow(t):
    """Constant heat input, in J/s."""
    return 100


def out_flow(t, s):
    """Heat removal in J/s: the controller output scaled by 100.

    The controller's error is (state - target), so below the setpoint the
    result is negative, which the thermal mass sees as extra heating.
    """
    return controller.next(t, s) * 100


system = ThermalSystem(mass, in_flow, out_flow)

traces = system.simulate("pid_linear", range(0, 1000))


KeyboardInterrupt: 

In [26]:
import plotly.graph_objs as go
from plotly.subplots import make_subplots
from IPython.display import display, clear_output
import numpy as np
import plotly.offline as pyo
import time

# Build a single-panel figure holding three initially empty line traces.
fig = make_subplots(rows=1, cols=1)
trace_heat, trace_flow_out, trace_flow_in = (
    go.Scatter(x=[], y=[], mode='lines', name=label)
    for label in ('Heat', 'Flow Out', 'Flow In')
)
fig.add_trace(trace_heat)
fig.add_trace(trace_flow_out)
fig.add_trace(trace_flow_in)

# Backing lists for the plotted series; update_plot() appends to them in place.
x_data, y_heat, y_flow_out, y_flow_in = [], [], [], []

# Function to update the plot
def update_plot(frame):
    """Append one synthetic sample at x = `frame` and refresh the figure's traces.

    The module-level lists `x_data`, `y_heat`, `y_flow_out` and `y_flow_in` are
    extended in place, then assigned to the three traces of the module-level
    `fig` inside a single batched update.
    """
    sine, cosine = np.sin(frame), np.cos(frame)

    # Stand-in data: heat = sin, flow out = cos, flow in = sin + cos.
    x_data.append(frame)
    y_heat.append(sine)
    y_flow_out.append(cosine)
    y_flow_in.append(sine + cosine)

    # Hand the grown lists to the traces in one batched update.
    with fig.batch_update():
        for plotted, y_values in zip(fig.data, (y_heat, y_flow_out, y_flow_in)):
            plotted.x = x_data
            plotted.y = y_values

# Animate the figure with 600 synthetic frames spanning 0..8*pi, redrawing the
# cell output after each one (600 frames x 0.1 s pause is about a minute).
# Replace this loop with real controller data when it is available.
for frame in np.linspace(0, 8 * np.pi, num=600):
    update_plot(frame)
    clear_output(wait=True)
    display(fig)
    time.sleep(0.1)

# The figure stays in the cell output once the loop has finished.

# Export the final figure to a standalone HTML file for offline viewing.
pyo.plot(fig, filename='live_plot.html', auto_open=False)

'live_plot.html'

In [28]:
!pip install --upgrade matplotlib ipywidgets


Collecting ipywidgets
  Downloading ipywidgets-8.1.1-py3-none-any.whl (139 kB)
[K     |████████████████████████████████| 139 kB 5.8 MB/s eta 0:00:01
[?25hCollecting widgetsnbextension~=4.0.9
  Downloading widgetsnbextension-4.0.9-py3-none-any.whl (2.3 MB)
[K     |████████████████████████████████| 2.3 MB 21.1 MB/s eta 0:00:01
Collecting jupyterlab-widgets~=3.0.9
  Downloading jupyterlab_widgets-3.0.9-py3-none-any.whl (214 kB)
[K     |████████████████████████████████| 214 kB 16.3 MB/s eta 0:00:01
Installing collected packages: widgetsnbextension, jupyterlab-widgets, ipywidgets
  Attempting uninstall: widgetsnbextension
    Found existing installation: widgetsnbextension 3.5.1
    Uninstalling widgetsnbextension-3.5.1:
      Successfully uninstalled widgetsnbextension-3.5.1
  Attempting uninstall: jupyterlab-widgets
    Found existing installation: jupyterlab-widgets 1.0.0
    Uninstalling jupyterlab-widgets-1.0.0:
      Successfully uninstalled jupyterlab-widgets-1.0.0
  Attempting u

In [29]:
import time

import numpy as np
import pandas as pd
from tqdm import tqdm

from tinkerforge.ip_connection import IPConnection
from tinkerforge.bricklet_thermocouple_v2 import BrickletThermocoupleV2
from tinkerforge.bricklet_analog_in_v3 import BrickletAnalogInV3

# Address at which brickd is reachable.
HOST = "localhost"
PORT = 4223
# UIDs of the two bricklets read by the calibration cells below.
THERMOCOUPLE_UID = 'Msr'
ANALOG_IN_UID = '27fh'

ipcon = IPConnection()  # Create IP connection
tc = BrickletThermocoupleV2(THERMOCOUPLE_UID, ipcon)  # Create device object
ai = BrickletAnalogInV3(ANALOG_IN_UID, ipcon)  # Create device object

ipcon.connect(HOST, PORT)  # Connect to brickd

In [14]:
voltage_readings = []
temperature_readings = []

# The lists are never reset, so every report below is the running mean over all
# samples collected so far (1000 more per batch).
for batch in range(10):
    for sample in range(1000):
        # Raw analog-in reading; the values (~1650) are millivolts, not Volts.
        voltage_readings.append(ai.get_voltage())
        # Thermocouple reading scaled by 0.01 to give degrees Celsius.
        temperature_readings.append(0.01 * tc.get_temperature())

    print(
        f"Calibration complete. Voltage {np.mean(voltage_readings)} mV corresponds with temperature {np.mean(temperature_readings)} C"
    )


Caibration complete. Voltage 1650.165 V corresponds with temperature 23.59176 C
Caibration complete. Voltage 1650.214 V corresponds with temperature 23.583805 C
Caibration complete. Voltage 1650.208 V corresponds with temperature 23.590136666666663 C
Caibration complete. Voltage 1650.23975 V corresponds with temperature 23.59752 C
Caibration complete. Voltage 1650.2274 V corresponds with temperature 23.596974 C
Caibration complete. Voltage 1650.2486666666666 V corresponds with temperature 23.599033333333335 C
Caibration complete. Voltage 1650.2431428571429 V corresponds with temperature 23.602515714285712 C
Caibration complete. Voltage 1650.255625 V corresponds with temperature 23.604757499999998 C
Caibration complete. Voltage 1650.2647777777777 V corresponds with temperature 23.606599999999997 C
Caibration complete. Voltage 1650.2367 V corresponds with temperature 23.609935999999998 C


In [32]:
voltage_readings = []
temperature_readings = []

# The lists are never reset, so every report below is the running mean over all
# samples collected so far (1000 more per batch).
for batch in range(10):
    for sample in range(1000):
        # Raw analog-in reading; the values (~1720) are millivolts, not Volts.
        voltage_readings.append(ai.get_voltage())
        # Thermocouple reading scaled by 0.01 to give degrees Celsius.
        temperature_readings.append(0.01 * tc.get_temperature())

    print(
        f"Calibration complete. Voltage {np.mean(voltage_readings)} mV corresponds with temperature {np.mean(temperature_readings)} C"
    )


Caibration complete. Voltage 1718.193 V corresponds with temperature 26.400200000000005 C
Caibration complete. Voltage 1719.1925 V corresponds with temperature 26.421900000000004 C
Caibration complete. Voltage 1719.9303333333332 V corresponds with temperature 26.40536666666667 C
Caibration complete. Voltage 1720.1935 V corresponds with temperature 26.279870000000003 C
Caibration complete. Voltage 1720.3028 V corresponds with temperature 26.198278000000002 C
Caibration complete. Voltage 1720.5031666666666 V corresponds with temperature 26.14083166666667 C
Caibration complete. Voltage 1720.8237142857142 V corresponds with temperature 26.10933857142857 C
Caibration complete. Voltage 1721.254 V corresponds with temperature 26.086582500000006 C
Caibration complete. Voltage 1721.7253333333333 V corresponds with temperature 26.072297777777777 C
Caibration complete. Voltage 1722.2549 V corresponds with temperature 26.065116999999997 C


In [20]:
voltage_readings = []
temperature_readings = []

# The lists are never reset, so every report below is the running mean over all
# samples collected so far (1000 more per batch).
for batch in range(30):
    for sample in range(1000):
        # Raw analog-in reading; the values (~2100) are millivolts, not Volts.
        voltage_readings.append(ai.get_voltage())
        # Thermocouple reading scaled by 0.01 to give degrees Celsius.
        temperature_readings.append(0.01 * tc.get_temperature())

    print(
        f"Calibration complete. Voltage {np.mean(voltage_readings)} mV corresponds with temperature {np.mean(temperature_readings)} C"
    )


Caibration complete. Voltage 2101.901 V corresponds with temperature 33.04426 C
Caibration complete. Voltage 2102.021 V corresponds with temperature 33.05713000000001 C
Caibration complete. Voltage 2102.3953333333334 V corresponds with temperature 33.074126666666665 C
Caibration complete. Voltage 2102.9325 V corresponds with temperature 33.0826125 C
Caibration complete. Voltage 2103.5224 V corresponds with temperature 33.098312 C
Caibration complete. Voltage 2104.1648333333333 V corresponds with temperature 33.118903333333336 C
Caibration complete. Voltage 2104.815 V corresponds with temperature 33.13677 C
Caibration complete. Voltage 2105.47125 V corresponds with temperature 33.15013750000001 C
Caibration complete. Voltage 2106.1174444444446 V corresponds with temperature 33.15923333333334 C
Caibration complete. Voltage 2106.7473 V corresponds with temperature 33.16771800000001 C
Caibration complete. Voltage 2107.3675454545455 V corresponds with temperature 33.17431090909091 C
Caibrat

2116.6112 mV corresponds with temperature 33.190557 C
1650.23975 mV corresponds with temperature 23.59752 C


In [38]:
# Spot check against the warm calibration point: a mean of 2116.6112 mV was
# recorded alongside a thermocouple temperature of 33.190557 C.
# NOTE: resistance_from_voltage and b_param_equation are defined in a cell
# further down the notebook; that cell has to be run first.
res = resistance_from_voltage(2.1166112)  # argument is in Volts
print('resistance', res)
b_param_equation(res) # should be 33.19

resistance 118125.55024560013


33.19000010551764

In [37]:
# Spot check against a reference point near 25 C.
# NOTE: resistance_from_voltage and b_param_equation are defined in a cell
# further down the notebook; that cell has to be run first.
res = resistance_from_voltage(1.6953891428571428)  # argument is in Volts
print('resistance', res)
b_param_equation(res) # should be 25.080002

resistance 169553.91515003215


24.544196140894428

In [54]:
# Spot check against the calibration reading of 1722.2549 mV, which was
# recorded alongside a thermocouple temperature of 26.065116999999997 C.
# NOTE: resistance_from_voltage and b_param_equation are defined in a cell
# further down the notebook; that cell has to be run first.
res = resistance_from_voltage(1.7222549)  # argument is in Volts
print('resistance', res)

b_param_equation(res) # should be 26.065116999999997

resistance 165522.71503887142


25.10461705490178

In [53]:
import numpy as np

def b_param_equation(
    R,  # Ohms
    B=3812.2763685,  # Kelvin, the datasheet says 3950 +- 1%
    T_23=23.59752 + 273.15,  # Kelvin
    R_23=176624.21076543574, # Ohms
):
    """Convert an NTC thermistor resistance to a temperature in Celsius.

    Uses the B-parameter equation for NTC thermistors
    (https://en.wikipedia.org/wiki/Thermistor):

        1/T = 1/T_23 + (1/B) * ln(R / R_23)

    Parameters
    ----------
    R    : Thermistor resistance in Ohms.
    B    : B parameter in Kelvin. The datasheet value is taken from
           https://www.newegg.com/p/298-00KZ-002P5?item=9SIBPPKJYZ5208&source=region
    T_23 : Reference temperature in Kelvin, taken from measurement.
    R_23 : Resistance in Ohms at the reference temperature, taken from measurement.
    """
    inverse_kelvin = (1 / T_23) + (1 / B) * np.log(R / R_23)
    # Invert to Kelvin, then shift to Celsius.
    return (1 / inverse_kelvin) - 273.15


def resistance_from_voltage(
    V,  # Volts
    R_ref=88.87,  # kilo-Ohms
    V_in=4.93,  # Volts
):
    """Return the unknown divider resistance in Ohms for a measured voltage.

    Solves V = V_in * R_ref / (R_ref + R) for R, then converts from
    kilo-Ohms to Ohms.

    Parameters
    ----------
    V     : Measured voltage in Volts.
    R_ref : Reference resistor in kilo-Ohms.
    V_in  : Supply voltage of the divider in Volts.
    """
    # R_ref * V_in / V is the total series resistance (R_ref + R) in kilo-Ohms.
    total_kohm = (R_ref * V_in) / V
    return 1000 * (total_kohm - R_ref)


# Spot check of the full chain at 1.651 V: voltage -> resistance (Ohms) ->
# temperature (Celsius), using the default calibration constants.
resistance = resistance_from_voltage(1.651)
temperature = b_param_equation(resistance)
print(resistance)
print(temperature)

# R_0 = 1846.0461  # Convert voltage to resistance (ohms)
# T_0 = 22.189687 + 273.15  # Convert to Kelvin


176501.9563900666
23.61351476083786


In [44]:
# Tabulate voltage -> resistance -> temperature from 1.000 V to 2.999 V in
# 1 mV steps. The rows are collected in a list and the frame is built once:
# DataFrame.append copied the whole frame on every call and no longer exists
# in pandas 2.x.
rows = []
for milli_voltage in range(1000, 3000, 1):
    voltage = milli_voltage / 1000
    resistance = resistance_from_voltage(voltage)
    temperature = b_param_equation(resistance)
    rows.append(
        {
            "Voltage (Volts)": voltage,
            "Resistance (Ohms)": resistance,
            "Temperature (Celsius)": temperature,
        }
    )

df = pd.DataFrame(
    rows,
    columns=["Voltage (Volts)", "Resistance (Ohms)", "Temperature (Celsius)"],
)


In [67]:
# Reference measurements to compare against the fitted curve. They are defined
# here so that this cell also works on a fresh kernel.
experiments_df = pd.DataFrame({
    "Resistance (Ohms)": [169553.91515003215, 165522.71503887142],
    "Temperature (Celsius)": [25.080002, 26.065116999999997],
    "color": ['Red', 'Red'],
})
experiments_df

Unnamed: 0,Resistance (Ohms),Temperature (Celsius),color
0,169553.91515,25.080002,Red
1,165522.715039,26.065117,Red


In [73]:
import plotly.express as px
import matplotlib.pyplot as plt

# Plot the tabulated thermistor curve: temperature as a function of resistance.
fig = px.line(
    df,
    title="Resistance to Temperature",
    x="Resistance (Ohms)",
    y="Temperature (Celsius)",
)

# Disabled overlay of the measured reference points on top of the curve.
# NOTE(review): px.scatter returns a whole figure, so adding it to `fig`
# probably needs its traces (e.g. scatter_trace.data) - confirm before re-enabling.
# experiments_df = pd.DataFrame({
#     "Resistance (Ohms)": [169553.91515003215, 165522.71503887142],
#     "Temperature (Celsius)": [25.080002, 26.065116999999997],
#     "color": ['Red', 'Red']
# })

# scatter_trace = px.scatter(
#     experiments_df,
#     x="Resistance (Ohms)",
#     y="Temperature (Celsius)",
#     color='color',
# )

# Add the scatter trace to the existing figure
# fig.add_trace(scatter_trace)
fig.show()

