In [1]:
from time import sleep
from gpiozero import DigitalOutputDevice
from datetime import datetime
from dataclasses import dataclass
from IPython.display import clear_output
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = [8, 5]

In [2]:
try:
    from max31855 import MAX31855
except ImportError:
    print("Stubbing MAX31855 & Power")
    class MAX31855:
        stubbed = True
        def getTemperature(self):
            return 75
    class DigitalOutputDevice:
        stubbed = True
        def __init__(self, pin, initial_value):
            self.value = initial_value
        def on(self):
            self.value = True
        def off(self):
            self.value = False

In [3]:
class queue:
    def __init__(self, depth):
        self.depth = depth
        self.queue = []

    def __getitem__(self, key):
        try:
            return self.queue[key]
        except:
            return None

    def __call__(self, value):
        self.queue.insert(0, value)
        while len(self.queue) > self.depth:
            self.queue.pop()

In [4]:
class KilnDrone:
    def __init__(self):
        self.targetTemp = 0

        self.cycleCount = 0
        self.cycleTime = 0.1

        self.tempProbe = MAX31855()
    
        self.tempMemoryDepth = 60000
        self.tempMemory = queue(self.tempMemoryDepth)
        self.tempMemory(self.temperature)
        
        self.setPower = 0
        self.setPowerMemory = queue(60000)
        self.currentPower = 0
        self.powerWindow = 0.5

        self.power = DigitalOutputDevice(pin="GPIO17", initial_value=False)
        self.powerMemoryDepth = 60000
        self.powerOnMemory = queue(self.powerMemoryDepth)
        self.powerMemory = queue(self.powerMemoryDepth)
        
        self.restrictPowerChangeUntil = 0
        
        self.extendObservations(70)
    
    @property
    def filteredTemp(self):
        window = 10
        fT = []
        for i in range(0, len(self.tempMemory.queue) - window, window):
            selected = sorted(self.tempMemory.queue[i:i + window])[int(window / 2)]
            fT.extend([selected for i in range(window)])
        return fT[:self.tempMemoryDepth]
    
    @property
    def temperature(self):
        return self.tempProbe.getTemperature()
    
    def getCurrentPower(self):
        return (sum(self.powerOnMemory.queue) / len(self.powerOnMemory.queue)) * 100
    
    def startObservation(self):
        self.observationStart = self.cycleCount
    
    @dataclass
    class ObservationsCharacter:
        timestamp: str
        cycle: int
        temperature: float
        target: float
        power: float
        setPower: float
        haltPowerChange: int
        powerStep: float
        slope: float
        error: float
        percentError: float

    def characterizeObservations(self, start=None, stop=None):
        start = 0 if start is None else start
        stop = 1000 if stop is None else stop
        obs = self.filteredTemp[start:stop]
        delta = stop - start
        error = self.targetTemp - obs[0]
        percentError = (error / (self.targetTemp if self.targetTemp != 0 else 1/10000)) * 100
        slope = obs[0] - obs[-1]
        powerStep = min(max(error / 800, 0.1), 3)
        char = self.ObservationsCharacter(**{
            "timestamp": str(datetime.utcnow()),
            "cycle": self.cycleCount,
            "temperature": obs[0],
            "target": self.targetTemp,
            "power": self.getCurrentPower(),
            "setPower": self.setPower,
            "haltPowerChange": self.restrictPowerChangeUntil,
            "powerStep": powerStep,
            "slope": slope,
            "error": error,
            "percentError": percentError
        })
        return char
    
    def tune(self):
        if self.targetTemp < 80:
            print("Kiln Disabled")
            self.setPower = 0
            return
        elif self.power.value:
            return

        try:
            obChar = self.characterizeObservations(stop=30)
            prevObChar = self.characterizeObservations(start=30, stop=60)
        except IndexError:
            print("Insufficient observations")
            raise
            return
        print(f"Tuning: {obChar}")

        accel = (obChar.slope + prevObChar.slope) / 2
        foresight = obChar.temperature + 3 * (obChar.slope + accel)
        
        if obChar.percentError < 0.01:
            print("Cutting Power")
            self.setPower = 0
        elif foresight > self.targetTemp:
            print("Preemptively Cutting Power")
            self.setPower = 0
        elif obChar.percentError > 0 and accel <= 0.01:
            if self.cycleCount > self.restrictPowerChangeUntil and prevObChar.slope <= 0:
                print("Add Energy")
                self.setPower = max(obChar.power + obChar.powerStep, self.setPower + obChar.powerStep)
                self.restrictPowerChangeUntil = self.cycleCount + 50

        self.setPower = max(min(self.setPower, 50), 0)
        print(f"Accel: {accel:7.4}. Foresight: {foresight:7.4}. Tuned power to {self.setPower}")

    def observe(self):
        self.setPowerMemory(self.setPower)
        self.powerMemory(self.getCurrentPower())
        self.powerOnMemory(self.power.value)
        self.tempMemory(self.temperature)
    
    def extendObservations(self, n):
        self.setPowerMemory.queue = [self.setPower for i in range(n)] + self.setPowerMemory.queue
        self.powerOnMemory.queue = [self.power.value for i in range(n)] + self.powerOnMemory.queue
        
        pMValue = self.getCurrentPower()
        self.powerMemory.queue = [pMValue for i in range(n)] + self.powerMemory.queue
        
        currentTemp = self.temperature
        self.tempMemory.queue = [currentTemp for i in range(n)] + self.tempMemory.queue
    
    def render(self, cyclesAgo=6000, renderPowerOn=True):
        startTime = datetime.utcnow()
        if self.power.value:
            print("Can't render while powering")
            return
        
        print(f"Summary:\nMaxTemp: {max(self.filteredTemp)} | MinTemp: {min(self.filteredTemp)}")

        fig, ax = plt.subplots()
        ax.set_xlabel("Cycles Ago")
        ax2 = ax.twinx()
        ax.set_ylabel("Temperature (*F)")

        ln1 = ax.plot(self.filteredTemp[:cyclesAgo], color="green", label="Temperature", zorder=8)
        ln2 = ax.plot([self.targetTemp for i in self.tempMemory.queue[:cyclesAgo]], color="blue", label="Target Temperature", zorder=7)
        
        setPoint = self.setPowerMemory.queue[:cyclesAgo]
        if renderPowerOn:
            powerOn = self.powerOnMemory.queue[:cyclesAgo]
            powerOn = [i * sp for i, sp in zip(powerOn, setPoint)]
            ln3 = ax2.plot(powerOn, color="red", label="Power Impulse", zorder=6)
    
        ln4 = ax2.plot(setPoint, color="darkred", label="Set Power", zorder=5)
        ln5 = ax2.plot(self.powerMemory.queue[:cyclesAgo], color="turquoise", label="Power", zorder=4)
    
        ax2.set_ylabel("Power Duty Cycle")
        lines = ln1 + ln2 + ln4 + ln5
        if renderPowerOn:
            lines = lines + ln3
        labels = [l.get_label() for l in lines]
        ax2.legend(lines, labels, loc="upper right").set_zorder(100)
        plt.title("Temperature and Power over Time")
        ax.set_zorder(ax2.get_zorder() + 1)
        ax.patch.set_visible(False)
        plt.show()
        renderTime = datetime.utcnow() - startTime
        self.extendObservations(int(renderTime.total_seconds() / self.cycleTime))

    def processPower(self):
        if self.power.value:
            self.power.off()
        else:
            if self.getCurrentPower() < min(self.setPower, 50):
                self.power.on()
            else:
                self.power.off()
    
    def cycle(self):
        sleep(self.cycleTime)
        self.cycleCount += 1
        self.observe()
        self.processPower()
        self.tune()
    
    def setTargetTemperature(self, targetTemperature):
        self.targetTemp = min(int(targetTemperature), 2300)
        self.restrictPowerChangeUntil = 0
    
    def __repr__(self):
        return f"\n{self.characterizeObservations()}"

In [5]:
k = KilnDrone()

In [6]:
try:
    k.setTargetTemperature(1000)
    renderWindow = 35
    nextRender = renderWindow
    clearWindow = 3000
    nextClear = clearWindow
    k.restrictPowerChangeUntil = 0
    k.cycle()
    k.tune()
    while True:
        k.cycle()
        if k.cycleCount > nextClear:
            nextClear = k.cycleCount + clearWindow - (k.cycleCount % clearWindow)
            clear_output(wait=False)
            k.render(cyclesAgo=60000)
        elif k.cycleCount > nextRender:
            nextRender = k.cycleCount + renderWindow - (k.cycleCount % renderWindow)
            k.render()
finally:
    k.power.off()

Tuning: KilnDrone.ObservationsCharacter(timestamp='2021-09-22 02:39:55.663370', cycle=1, temperature=435.2, target=1000, power=0.0, setPower=0, haltPowerChange=0, powerStep=0.706, slope=0.0, error=564.8, percentError=56.48)
Add Energy
Accel:     0.0. Foresight:   435.2. Tuned power to 0.706
Tuning: KilnDrone.ObservationsCharacter(timestamp='2021-09-22 02:39:55.671526', cycle=1, temperature=435.2, target=1000, power=0.0, setPower=0.706, haltPowerChange=51, powerStep=0.706, slope=0.0, error=564.8, percentError=56.48)
Accel:     0.0. Foresight:   435.2. Tuned power to 0.706
Tuning: KilnDrone.ObservationsCharacter(timestamp='2021-09-22 02:39:56.442949', cycle=3, temperature=435.2, target=1000, power=1.36986301369863, setPower=0.706, haltPowerChange=51, powerStep=0.706, slope=0.0, error=564.8, percentError=56.48)
Accel:     0.0. Foresight:   435.2. Tuned power to 0.706
Tuning: KilnDrone.ObservationsCharacter(timestamp='2021-09-22 02:39:56.818622', cycle=4, temperature=435.2, target=1000, po

KeyboardInterrupt: 

In [7]:
k.power.off()

In [None]:
k.observe()
k.render(cyclesAgo=60000)