In [2]:
from os import listdir
from os.path import isfile, join
import pandas as pd
import numpy as np
import joblib

class DataHandler:
    def __init__(self):
        pass     

    def import_raw(self, volume, path):
        full_data = pd.read_csv(path)    
        raw_data = pd.DataFrame()
        list_of_features = ['Epoch Time', 'Z1 BlkTemp', 'Z1 HeatSink', 'Z1 Iset', 'Z1 Imea']
        raw_data = pd.concat([raw_data, full_data[list_of_features]], axis=1)    
        raw_data['Volume'] = volume
        return raw_data
    
    def rename_labels(self, raw_data):
        raw_data = raw_data.rename(columns={'Z1 BlkTemp': 'Block Temp',
                                            'Z1 HeatSink': 'Heat Sink Temp',
                                            'Z1 Iset': 'Iset',
                                            'Z1 Imea': 'Imeasure'
                                           })        
        return raw_data
    
    def replace_with_period(self, raw_data):
        # Calculate and use delta_t instead of epoch time
        next_epoch = raw_data['Epoch Time'][1::]
        next_epoch.index -= 1
        
        raw_data['Epoch Time'] = next_epoch - raw_data['Epoch Time']
        raw_data = raw_data.rename(columns={'Epoch Time': 'Period'})
        raw_data = raw_data.dropna()
        
        return raw_data
    
    def add_block_rate(self, raw_data):
        prev_temp = raw_data['Block Temp'][:-1]
        prev_temp.index += 1
        raw_data['Block Rate'] = (raw_data['Block Temp'] - prev_temp) / raw_data['Period']
        raw_data = raw_data.dropna()
        
        return raw_data
    
    def add_new_block_temp(self, raw_data):        
        new_block_temp = raw_data['Block Temp'][1::]
        new_block_temp.index -= 1
        raw_data['New Block Temp'] = new_block_temp
        
        raw_data = raw_data.dropna()
        
        return raw_data
    
    def reorder_labels(self, raw_data):
        raw_data = raw_data[['Volume',
                             'Period',
                             'Heat Sink Temp',
                             'Block Temp',
                             'Block Rate', 'Iset',
                             'Imeasure',
                             'New Block Temp'
                            ]]
        return raw_data

    def process_data(self, raw_data):      
        raw_data = self.rename_labels(raw_data)
        raw_data = self.replace_with_period(raw_data)
        raw_data = self.add_block_rate(raw_data)
        raw_data = self.add_new_block_temp(raw_data)
        raw_data = self.reorder_labels(raw_data)
        return raw_data
  



In [13]:
handler = DataHandler()
dataset = pd.DataFrame()

for volume in [5, 10, 30, 50]:
    dir_path = f"train/raw/{volume}ul/"
    file_list = [join(dir_path, f) for f in listdir(dir_path) if isfile(join(dir_path, f))]
    for path in file_list:
        new_data = handler.import_raw(volume=volume, path=path)
        new_data = handler.process_data(new_data)
        dataset = dataset.append(new_data)

dataset.to_csv('train/training_set.csv', index=False)

dataset = pd.DataFrame()

for volume in [10, 30, 50]:
    dir_path = f"test/raw/{volume}ul/"
    file_list = [join(dir_path, f) for f in listdir(dir_path) if isfile(join(dir_path, f))]
    for path in file_list:
        new_data = handler.import_raw(volume=volume, path=path)
        new_data = handler.process_data(new_data)
        dataset = dataset.append(new_data)

dataset.to_csv('test/testing_set.csv', index=False)


In [6]:
import pandas as pd
import joblib
from sklearn.tree import DecisionTreeRegressor

class MachineLearning:
    def __init__(self):
        self.accuracy_window = 0    
        
    def set_accuracy_window(self, value):
        self.accuracy_window = value
    
    def load_data(self, path):
        dataset = pd.read_csv(path)        
        condition = dataset.iloc[:, :-1].values        
        result = dataset.iloc[:, -1].values        
    
        return condition, result
    
    def train_model(self, path):
        train_condition, train_result = self.load_data(path)
        # Training the model
        model = DecisionTreeRegressor(random_state=0)
        model.fit(train_condition, train_result)

        return model
    
    def test_model(self, model, path):
        test_condition, test_result = self.load_data(path)
        test_prediction = model.predict(test_condition)
        window = 0.25
        total = len(test_prediction)
        correct = 0
        for i in range(0, total):
            if test_result[i] - self.accuracy_window <= test_prediction[i] <= test_result[i] + self.accuracy_window:
                correct += 1
        print(f"The accuracy of model is {(correct * 100 / total):.2f}")
    
    def save_model(self, model, path):
        # save the model to disk        
         joblib.dump(model, path)

    def load_model(self, path):
        # load the model from disk
        return joblib.load(path)

In [63]:
import math

class  PCR_Machine:
    def __init__(self, 
                 model, 
                 sample_volume, 
                 sample_temp, 
                 block_temp, 
                 heat_sink_temp, 
                 block_rate=0, 
                 sample_rate=0,
                 update_period=0.2
                ):
        self.model = model
        self._sample_volume = sample_volume
        self._sample_temp = sample_temp
        self._block_temp = block_temp
        self._heat_sink_temp = heat_sink_temp
        self._block_rate = block_rate
        self._sample_rate = sample_rate
        self._update_period = update_period                
        self.calculate_conversion_constant(self._sample_volume)
        
    def calculate_conversion_constant(self, volume):
        heat_coeffs = [1.9017543860, 0.0604385965, -0.0000643860, 0.0000002982]
        cool_coeffs = [2.6573099415, 0.0906608187, -0.0006599415, 0.0000020760]     
        no_coeffs = len(heat_coeffs)
        heat_const = 0
        cool_const = 0        
        for i in range(0, no_coeffs):
            heat_const += heat_coeffs[i] * volume**i
            cool_const += cool_coeffs[i] * volume**i
        self.heat_conv = 1 - math.exp(-self.update_period / heat_const)
        self.heat_conv = 1 - math.exp(-self.update_period / cool_const)

    @property
    def sample_volume(self):        
        return self._sample_volume
    
    @sample_volume.setter
    def sample_volume(self, value):        
        self._sample_volume = value  
        
    @property
    def sample_temp(self):        
        return self._sample_temp
    
    @sample_temp.setter
    def sample_temp(self, value):        
        self._sample_temp = value        
        
    @property
    def block_temp(self):        
        return self._block_temp
    
    @block_temp.setter
    def block_temp(self, value):        
        self._block_temp = value

    @property
    def heat_sink_temp(self):        
        return self._heat_sink_temp
    
    @heat_sink_temp.setter
    def heat_sink_temp(self, value):        
        self._heat_sink_temp = value        
        
    @property
    def block_rate(self):        
        return self._block_rate
    
    @block_rate.setter
    def block_rate(self, value):        
        self._block_rate = value
        
    @property
    def sample_rate(self):        
        return self._sample_rate
    
    @block_rate.setter
    def sample_rate(self, value):        
        self._sample_rate = value
        
    @property
    def update_period(self):        
        return self._update_period
    
    @update_period.setter
    def update_period(self, value):        
        self._update_period = value        
        
    def update_sample_params(self, new_block_temp):
        if new_block_temp > self._block_temp: # block is heating up
            conv_const = self.heat_conv
        else: # block is cooling down
            conv_const = self.cool_conv        
        new_sample_temp = self._sample_temp + conv_const * (new_block_temp - self._sample_temp)            
        self._sample_rate = (new_sample_temp - self._sample_temp) / self._update_period
        self._sample_temp = new_sample_temp
    
    
    def update_block_params(self, new_block_temp):
        self._block_rate = (new_block_temp - self._block_temp) / self._update_period
        self._block_temp = new_block_temp
    
    def update_heat_sink_params(self, delta_Tblock):        
        if delta_Tblock > 0: # block is heating up
            self._heat_sink_temp -= 0.01 * delta_Tblock
        else: # block is cooling down
            self._heat_sink_temp -= 0.05 * delta_Tblock
    
    def update(self, Iset, Imeasure):
        condition = [self._sample_volume,
                     self._update_period,
                     self._heat_sink_temp,
                     self._block_temp,
                     self._block_rate,
                     Iset,
                     Imeasure
                    ]
        new_block_temp = self.model.predict(condition)
        self.update_heat_sink_temp(new_block_temp - self._block_temp)
        self.update_sample_params(new_block_temp)        
        self.update_block_params(new_block_temp)
