## Import used libraries :

In [1]:
# Data Preprocessing :
from datetime import datetime
import pandas as pd
import numpy as np
from numpy import array

# Data Visualisation:
import matplotlib.pyplot as plt
import seaborn as sb

# Model Building, Training, and Testing:
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor
from sklearn.ensemble import RandomForestRegressor
from xgboost import XGBRegressor
from sklearn.svm import SVR

# Neural Networks :
from keras.callbacks import ModelCheckpoint, TensorBoard
from keras.models import Sequential, Model
from keras.layers import Dense, Activation, Flatten, LSTM, BatchNormalization
from keras.layers import Dropout, Conv1D, MaxPooling1D, Input, GRU

from keras.models import load_model
from keras.optimizers import Adam


# Utils:
from IPython.display import display, HTML
# from collections import deque  
import itertools
import warnings
import copy
import time
import os
from tqdm.notebook import tqdm

warnings.filterwarnings('ignore')
pd.set_option('display.max_rows', None)

Using TensorFlow backend.


## GRU Model:
The next function is used to build a GRU neural network. 

In [2]:
def GRU_Model(GRU_layers, dense_layers, input_shape):
    # GRU Neural Network:
    GRU_Model = Sequential()
    # Input layer:
    GRU_Model.add(GRU(GRU_layers[0], activation='relu', return_sequences=True, input_shape=input_shape))
    
    # GRU Layers: 
    if len(GRU_layers)>1:
        for i, n_nodes in enumerate(GRU_layers[1:]):
            if len(GRU_layers[1:]) - i >= 2 :
                GRU_Model.add(GRU(n_nodes, activation='relu', return_sequences=True))
            else: 
                GRU_Model.add(GRU(n_nodes, activation='relu'))
    
    # Dense Layers:
    for d in dense_layers:
        GRU_Model.add(Dense(d, activation='relu'))
    
    # Output Layer :
    GRU_Model.add(Dense(1))
    GRU_Model.compile(optimizer='adam', loss='mae')
    
    return GRU_Model

## LSTM Model:
The next function is used to build an LSTM neural network.

In [3]:
def LSTM_Model(LSTM_layers, dense_layers, input_shape):
    # LSTM Neural Network:
    LSTM_Model = Sequential()
    # Input layer:
    LSTM_Model.add(LSTM(LSTM_layers[0], activation='relu', return_sequences=True, input_shape=input_shape))
    
    # LSTM Layers: 
    if len(LSTM_layers)>1:
        for i, n_nodes in enumerate(LSTM_layers[1:]):
            if len(LSTM_layers[1:]) - i >= 2 :
                LSTM_Model.add(LSTM(n_nodes, activation='relu', return_sequences=True))
            else: 
                LSTM_Model.add(LSTM(n_nodes, activation='relu'))
    
    # Dense Layers:
    for d in dense_layers:
        LSTM_Model.add(Dense(d, activation='relu'))
    
    # Output Layer :
    LSTM_Model.add(Dense(1))
    LSTM_Model.compile(optimizer='adam', loss='mae')
    
    return LSTM_Model

## CNN Model:
The next function is used to build a convolutional neural network. 

In [4]:
def conv_block(inp, filters=64, bn=True, kernel_size=2,
               pool=True, dropout = 0.2):
    _ = Conv1D(filters=filters, kernel_size=kernel_size, activation='relu')(inp)
    if bn:
        _ = BatchNormalization()(_)
    if pool:
        _ = MaxPooling1D(pool_size=2)(_)
    if dropout > 0:
        _ = Dropout(0.2)(_)
    return _

def CNN_Model(input_shape, conv_layers, dense_layers):
    input_layer = Input(shape = input_shape)
    _ = conv_block(input_layer, filters=conv_layers[0], bn=False, pool=False)
    if len(conv_layers)>1:
        for c in conv_layers[1:]:
            _ = conv_block(_, filters=c)

    _  = Flatten()(_)

    # for Action1 calculation
    for d in dense_layers:
        _ = Dense(units=d, activation='relu')(_)

    output = Dense(units=1, name='output')(_)

    model = Model(inputs=input_layer, outputs=[output])
    model.compile(optimizer=Adam(lr=0.001),
                  loss={'output': 'mae'},
                  metrics={'output': 'mae'})

    return model

## Batch Class:
The next class is used to hold a data batch and preprocess it.

In [88]:
class Batch:
    def load_data(self):
        self.df = pd.read_csv(self.data_path)

    def __init__(self, name, df = None, data_path = None, test_size = 0.25, n_steps = 4, verbose = False):
        self.name = name
        self.df = df
        self.data_path = data_path
        self.test_size = test_size
        self.n_steps   = n_steps
        self.verbose = verbose
        if data_path and not df:
            self.load_data()
    
    
    def darw_graph(self, df, y_list, x_label ,y_label, n_shape, name, reshape  = False, w=15, h=5):
        if(reshape):
            plt.figure(figsize=(w, h))
        ax = plt.gca()
        for y in y_list:
            df.plot(kind='line',y=y,ax=ax)
        plt.xlabel(f"{x_label}\n\nFigure({n_shape})\n{name}")
        plt.ylabel(y_label)
        plt.show()
        if n_shape : n_shape += 1
        else : n_shape = 1
        return n_shape

    # split a univariate sequence into samples:
    def split_sequence(self, sequence, n_steps):
        X, y = list(), list()
        for i in range(len(sequence)):
            # find the end of this pattern
            end_ix = i + n_steps
            # check if we are beyond the sequence
            if end_ix > len(sequence)-1:
                break
            # gather input and output parts of the pattern
            seq_x, seq_y = sequence[i:end_ix], sequence[end_ix]
            X.append(seq_x)
            y.append(seq_y)
        return array(X), array(y)


    # Process the Data :
    def process_data(self, data_type = "all"):
        # Merge the data of the two tanks:
        self.df["Tank level"] = self.df["T1"] + self.df["T2"]
        # Convert Date Time column from string to DateTime object :
        self.df["Date Time"] = self.df["Date Time"].apply(lambda x: datetime.strptime(x, '%m/%d/%Y %H:%M'))
        # Create columns for (Year, Month, WeekDay, Day, DayQ, Hour ..etc):
        self.df['year'] = self.df['Date Time'].apply(lambda x: x.year)
        self.df['month'] = self.df['Date Time'].apply(lambda x: x.month)
        self.df['day'] = self.df['Date Time'].apply(lambda x: x.day)

        self.df['weekday'] = self.df['Date Time'].apply(lambda x: x.weekday()) # Mon=0, Sun=1, .. Thur=5, Fri = 6
        self.df['weekend'] = self.df['weekday'].apply(lambda x: 1 if x in [5,6] else 0) # = 1 if the day is a weekend day (Thursday, Friday)


        self.df['hour'] = self.df['Date Time'].apply(lambda x: x.hour)
        self.df['dayQ'] = self.df['hour'].apply(lambda x: x//6 +1)
        
        # Create new features (Refilled, Refilled Amount, Consumption):
        self.df["refilled"] = 0
        self.df["refilled_Amount"] = 0.0
        self.df["consumption"] = 0.0

        for i in range(1,len(self.df["refilled"]),1):
            # Compute consumption in leters:
            self.df["consumption"][i] = self.df["Tank level"][i-1] - self.df["Tank level"][i] 
            # Correct some outliers ( 0 > x >= -5  --> x=0):
            if((self.df["consumption"][i] < 0) and (self.df["consumption"][i] >= -5)): self.df["consumption"][i] = 0

            # Knowing what readings have anomalies or readings where the tank was filled before geting the reading :
            if(self.df["Tank level"][i] - self.df["Tank level"][i-1] > 5 ):
                self.df["refilled"][i]        =  1
                self.df["refilled_Amount"][i] =  self.df["Tank level"][i] - self.df["Tank level"][i-1]

        # Get some information about outliers :
        i1 = self.df.loc[ ((self.df["refilled_Amount"] < 20) & (self.df["refilled_Amount"] > 0))].shape[0]
        i2 = self.df.loc[ self.df["refilled_Amount"] >= 20].shape[0]
        i3 = self.df.loc[ self.df["consumption"] < 0].shape[0]

        if self.verbose:
            print(f"Number of samples with reading errors : {i1} ")
            print(f"Number of samples that were taken after refilling the tank : {i2} ")
            print(f"Total number of outliers : {i3} ")
        
        # Delete outliers and reset the dataset without the outliers :
        if self.verbose:
            print(f"Number of samples before deleting the outliers : {self.df.shape[0]}")
            '''Include only non-outlier samples (Exclude outliers) :
               In our data, an outlier is a reading where the consumption value is negative,
               so we will only include samples that have consumption value greater or equal to zero.'''
        self.df = self.df.loc[ self.df["consumption"] >= 0].reset_index(drop=True)
        if self.verbose: print(f"Number of samples after deleting the outliers : {self.df.shape[0]}")
        
        # Shift Consumption column one step to make it predict the future consumption :
        self.df['consumption'] = self.df['consumption'].shift(-1)
        self.df.dropna(inplace = True)

        # Select Columns of Concern:
        data_columns = ["month", "weekday", "weekend", "day", "dayQ", "hour"]
        target_column = ["consumption"]

        if data_type == "normal" or data_type == "all" :
            # Splitting Dataset to Training data and Test data:
            self.train_X, self.test_X, self.train_y, self.test_y = train_test_split(self.df[data_columns].values,
                                                  self.df[target_column].values,
                                                  test_size = self.test_size , random_state = 14,
                                                  shuffle = True)
        elif data_type == "time_series" or data_type == "all":
            # define input sequence
            raw_seq = self.df["consumption"].values
            # choose a number of time steps
            n_steps = self.n_steps
            # split into samples
            X, y = self.split_sequence(raw_seq, n_steps)
            # reshape from [samples, timesteps] into [samples, timesteps, features]
            n_features = 1
            X = X.reshape((X.shape[0], X.shape[1], n_features))

            # Splitting Dataset to Training data and Test data:
            self.ts_train_X, self.ts_test_X, self.ts_train_y, self.ts_test_y = train_test_split(X, y,
                                                                    test_size = self.test_size ,
                                                                    random_state = 14, shuffle = True)



    def visualize_data(self):
        # Visualizing Tank level and Diesel consumption over time:
        # Tank level and consumption values over time before cleaning the data:
        n_shape = self.darw_graph(df=self.df, y_list = ['consumption', 'Tank level'],
                    x_label = "Time" ,y_label = "Consumption - Tank Level",
                    n_shape = n_shape,
                    name = "Tank level and consumption values over time before cleaning the data")

## GridSearch Class:
The next class is used to perform a grid search on several models and choose the best of them.

In [6]:
class GridSearch:
    def __init__(self, model_name, kwargs, train_X, train_y, test_X, test_y,
                 n_best_models = 3, metric = "mean_absolute_error", PATH = "",
                 ITS = None):
        # model_name has to be the name used to call the model from its package
        self.model_name = model_name
        # Number of best models to choose:
        self.n_best_models = n_best_models
        # A list that holds models sorted by their performance
        self.models_sorted_list = [] 
        # Should have the same names of given model's arguments, 
        self.kwargs = kwargs
        self.results = pd.DataFrame(columns = list(self.kwargs.keys()))
        # metric has to be the name used to call the metric function from its package
        # Ex : to use MAE from scikit-learn metric should be "mean_absolute_error"
        self.metric = metric
        self.train_X = train_X
        self.train_y = train_y
        self.test_X = test_X
        self.test_y = test_y
        # ITS : Is Time Series (a dictionary that has some used values if data is time series data)
        self.ITS = ITS 
        self.model_info = {}
        self.PATH = PATH
    
    def make_models(self):
        '''Generates the models to be tested'''
        # This list stores the generated models
        models_list = []
        # 
        grid_search_string = ""
        j = 0
        # Generate the for loops :
        for i,(k,v) in enumerate(self.kwargs.items()):
            j=i+1
            grid_search_string += " "*i*4 + f"for arg{i} in self.kwargs['{k}']:\n"
        temp = "".join([f"{k} = arg{i}, " for i,(k,_) in enumerate(self.kwargs.items())])
        grid_search_string += " "*j*4 + f'models_list.append(eval("{self.model_name}({temp})"))'+"\n"
        temp = "".join(["self.model_info[models_list[-1]]={"]+[f"'{k}' : arg{i}, " for i,(k,_) in enumerate(self.kwargs.items())]+["}"])
        grid_search_string += " "*j*4 + f'exec("{temp}")'
        print(grid_search_string)
        exec(grid_search_string)
        self.models_list = models_list

    def search(self):
        for i,model in tqdm(enumerate(self.models_list),desc= "Training ", unit = "Model"):
            start = time.time()
            if not self.ITS : model.fit(self.train_X, self.train_y)
            else : model.fit(self.train_X, self.train_y, epochs = self.ITS["epochs"], batch_size = self.ITS["batch_size"],
                            verbose = 0, validation_split = 0.3, callbacks=self.ITS["callbacks"]) 
            end = time.time()
            training_time = round(end-start,3)
            # print(training_time)

            # Use Trained model to predict test data :
            model_Pred    = model.predict(self.test_X)
            if self.ITS : model_Pred    = model.predict(self.test_X).flatten()

            # Calculate Goodness:
            exec(f"{self.metric}_res = {self.metric}(self.test_y, model_Pred)")
            self.models_sorted_list.append((model, eval(f"{self.metric}_res")))
            # print(eval(f"{self.metric}_res"))
            # Update Results DataFrames:
            temp = ""
            for k,_ in self.kwargs.items():
                k1 = k
                t1 = eval(f"self.model_info[model]['{k1}']")
                if str(t1).isnumeric():temp += f"'{k}' : {t1}, "
                else: temp += f"'{k}' : '{t1}', "
            temp += f" 'Training Time' : {training_time}, '{self.metric}_res': {eval(f'{self.metric}_res')}" 
            
            
            # print(temp)
            exec("self.results = self.results.append({"+temp+"}, ignore_index=True)")
            self.results = self.results.sort_values(by = f"{self.metric}_res")
            self.results.to_csv(f"{self.PATH}{self.model_name}_Results.csv")
        # Sort the list ascendingly (from min to max):
        self.models_sorted_list = [(model, res) for model, res in sorted(self.models_sorted_list, key=lambda item: item[1])]
        self.models_sorted_list = self.models_sorted_list[:self.n_best_models]

## WeightedAverage Class:
The next class is used to make a fusion of several models and find the best combinatioon of models that would make the best results.

In [7]:
class WeightedAverage:
    def __init__(self, models_dic, is_time_series):
        # models_dic : A list of dictionaries that holds models and info about them
        # models_dic = [{"name":"Model1", "model":modelObject},{"name":"Model2", "model":modelObject2}, ... ]
        self.models_dic = models_dic
        self.is_time_series = is_time_series
        
    def calculate_metric(self, test_X, test_y):
        for model in self.models_dic:
            if self.is_time_series : prediction = model["model"].predict(test_X).flatten()
            else : prediction = model["model"].predict(test_X)
            mae = mean_absolute_error(test_y, prediction)
            model['MAE'] = mae


    def get_weights(self, comb):
        total_mae = sum([self.models_dic[model_idx]["MAE"] for model_idx in comb])
        weights = {}
        for model_idx in comb:
            weights[model_idx]= self.models_dic[model_idx]["MAE"]/total_mae
        return weights

    def find_best_combination(self, data_X, data_y):
        # Generate the combinations :
        self.combs = []
        for n in range(1,len(self.models_dic)+1,1):
            self.combs += list(itertools.combinations(range(len(self.models_dic)), n))
            
        self.combs_res = self.predict(data_X = data_X, data_y = data_y, return_metric=True)
        
        # Get the best combination depending on the best result:
        self.best_combination = self.combs[np.argmin(self.combs_res)]
        self.best_models_list = [self.models_dic[i]['model'] for i in self.best_combination]
    
    def predict(self, data_X, comb=None, data_y=None, return_metric=False):
        # prediction = np.zeros(shape = (data_X.shape[0],))
        if return_metric :
            combs_res = []
            self.comb_to_res = {}
            num_combs = len(self.combs)
            for comb in tqdm(self.combs, desc = "Progress ", unit = "Combination"):
#                 print(f"predict - combs = {self.combs}, comb = {comb}")
                weights = self.get_weights(comb = comb)
                prediction = np.zeros(shape = (data_X.shape[0],))
                for model_idx in comb :
                    if self.is_time_series : 
                        prediction += self.models_dic[model_idx]["model"].predict(data_X).flatten()*weights[model_idx]
                    else : 
                        prediction += self.models_dic[model_idx]["model"].predict(data_X)*weights[model_idx]
                metric_val = mean_absolute_error(data_y, prediction)
                combs_res.append(metric_val)
                self.comb_to_res[comb] = metric_val
            return combs_res
        else:
            weights = self.get_weights(comb = comb)
            prediction = np.zeros(shape = (data_X.shape[0],))
            for model_idx in comb :
                if self.is_time_series: 
                    prediction += self.models_dic[model_idx]["model"].predict(data_X).flatten()*weights[model_idx]
                else:
                    prediction += self.models_dic[model_idx]["model"].predict(data_X)*weights[model_idx]
            return prediction

A Function that tests an input model to measure its performance:

In [8]:
def test_model(model, test_X, test_y, metric = "mean_absolute_error", is_time_series = False):
    prediction = model.predict(test_X)
    if is_time_series: prediction = prediction.flatten()
    metric_val = eval(f"{metric}(test_y, prediction)")
    return metric_val

A function that tests data batches and find the best batch that if used to train a model will enable the model to get the highest possible score.

In [76]:
def testBatches(data_batch_list, model, data_type = "normal", use_Combs = True,
                test_on_self = False, return_batches = False, return_models = False,
                test_size = 0.3, is_time_series = False):
    if is_time_series:
        checkpoint_name = './Weights/Weights-{epoch:03d}--{val_loss:.5f}.hdf5' 
        checkpoint = ModelCheckpoint(checkpoint_name, monitor='val_loss', verbose = 0, save_best_only = True, mode ='auto')
        callbacks_list = [checkpoint]
    
    if not test_on_self:
        # Prepare Test Data (the last batch in the list of batches ):
        # Create a Batch object for the test_data_batch:
        batch_test = Batch(name = f"Batch - Test", df = data_batch_list[-1].copy(), test_size = 0.98)
        batch_test.process_data(data_type = data_type)
        if not is_time_series: test_X = batch_test.test_X
        else : test_X = batch_test.ts_test_X
        if not is_time_series: test_y = batch_test.test_y
        else :test_y = batch_test.ts_test_y
    
    if return_batches : comb2batch = {}
    if return_models : comb2model = {}

    # List of combinations
    combs = []
    # Generate the combinations:
    if use_Combs:
        for n in range(1,len(data_batch_list),1):
            combs += list(itertools.combinations(range(len(data_batch_list)-1), n))
    else : 
        combs = [tuple([i]) for i in range(len(data_batch_list)-1)] 
    results_dic = {} 
    # For every Combination ..
    for c,comb in enumerate(tqdm(combs)) :
        # Make a compy of the model :
        temp_model = copy.deepcopy(model)
        # Merge all the batches of the current combination :
        temp_data = None
        temp_data = data_batch_list[comb[0]].copy()
        if len(comb)>1:
            for i in range(1,len(comb),1):
                temp_data = temp_data.append(data_batch_list[comb[i]].copy(), ignore_index = True)
            
        # Create a Batch object:
        batch_name = f"Batch - {comb}"
        batch = Batch(name = batch_name, df = temp_data, test_size = test_size)
        # Process The Data :
        batch.process_data(data_type = data_type)
        # Fit the model the processed data :
        if not is_time_series : 
            temp_model.fit(batch.train_X, batch.train_y)
        else : 
            model.fit(batch.ts_train_X, batch.ts_train_y, epochs = 512, batch_size = 32,
                      verbose = 0, validation_split = 0.3, callbacks=callbacks_list)
        # Measure the goodness of the model :
        if test_on_self:
            if not is_time_series:
                result = test_model(model = temp_model, test_X = batch.test_X, test_y = batch.test_y,
                                    metric = "mean_absolute_error", is_time_series = is_time_series)
            else :
                result = test_model(model = temp_model, test_X = batch.ts_test_X, test_y = batch.ts_test_y,
                                    metric = "mean_absolute_error", is_time_series = is_time_series)
        else : 
            if not is_time_series:
                result = test_model(model = temp_model, test_X = test_X, test_y = test_y,
                                    metric = "mean_absolute_error", is_time_series = is_time_series)
            else:
                result = test_model(model = temp_model, test_X = test_X, test_y = test_y,
                                    metric = "mean_absolute_error", is_time_series = is_time_series)
        # Add the batch name and its result to the results dictionary:
        results_dic[batch_name] = result
        if return_batches : comb2batch [batch_name] = copy.deepcopy(batch)
        if return_models : comb2model [batch_name] = (copy.deepcopy(temp_model),result)
#         print(f"Done With Comb {c+1} / {len(combs)}")
    
    # Convert the dictionary into a DataFrame :
    results_df = pd.DataFrame(results_dic.items(), columns=['Batch Name', 'Result'])
    results_df = results_df.sort_values(by = "Result")
    
    if return_batches : comb2batch [(-1,)] = copy.deepcopy(batch_test)
    
    # To Do : return the trained models
    if return_batches and return_models :
        return results_df, comb2batch, comb2model
    elif return_batches and not return_models :
        return results_df, comb2batch
    elif not return_batches and return_models :
        return results_df, comb2model
    else:
        return results_df

# Define Data Batches:

In [77]:
# Load the dataset:
df = pd.read_csv("Data/data.csv")

In [78]:
n_samples = df.shape[0] # Total number of samples
# n_batch = (n_samples//1000)
n_batch = 5
SPB = n_samples//n_batch # samples_per_batch
batch_list = []
for i in range(n_batch):
    batch_list.append(df[i*SPB : i*SPB + SPB].reset_index(drop=True))
# batch_list.append(None) # Uncomment when using test_on_self=True

## Define Used Models:

In [79]:
model = SVR(gamma = "scale", kernel = "rbf", degree = 2)

## Data Batch Grid Search:
Finding the best combination of data batches.

In [80]:
results_df, comb2batch, comb2model = testBatches(data_batch_list = batch_list,
                                        model = model, data_type = "normal",
                                        use_Combs = True, return_batches = True,
                                        return_models = True, test_on_self = False)

HBox(children=(FloatProgress(value=0.0, max=15.0), HTML(value='')))




In [81]:
best_comb_name = results_df.reset_index()["Batch Name"][0]
best_batch = comb2batch[best_comb_name]
test_batch = comb2batch[(-1,)]
n_best_models = 5
best_models = [model for _, (model, _) in sorted(comb2model.items(), key=lambda item: item[1][1])]
best_models = best_models[:min(n_best_models, len(best_models))]

## Using the class WeightedAverage: 

In [83]:
# Get the best models from the grid search:
models_dic = [{"name":f"m{i}", "model":m} for i,m in enumerate(best_models)]

In [84]:
wa = WeightedAverage(models_dic,is_time_series=False)

In [85]:
wa.calculate_metric(test_batch.test_X, test_batch.test_y)

In [86]:
start = time.time()
wa.find_best_combination(data_X = test_batch.test_X, data_y = test_batch.test_y)
end = time.time()
print(f"Time = {int(end-start)//60 } min")
duration = 1  # seconds
freq = 350  # Hz
_ = os.system('play -nq -t alsa synth {} sine {}'.format(duration, freq))

HBox(children=(FloatProgress(value=0.0, description='Progress ', max=31.0, style=ProgressStyle(description_wid…


Time = 0 min


In [87]:
print(f"Best Combination is ({wa.best_combination}) with score of ({wa.comb_to_res[wa.best_combination]})")

Best Combination is ((0,)) with score of (13.335218961648316)


# Define the Time Series Model: 

In [89]:
n_steps = 4
CNN_Model = CNN_Model(input_shape = (n_steps,1), conv_layers = [128,128], dense_layers = [64])

## Data Batch Grid Search:
Finding the best combination of data batches.

In [90]:
results_df, comb2batch, comb2model = testBatches(data_batch_list = batch_list,
                                        model = CNN_Model, data_type = "time_series",
                                        use_Combs = True, return_batches = True,
                                        return_models = True, test_on_self = False,
                                        is_time_series = True)

HBox(children=(FloatProgress(value=0.0, max=15.0), HTML(value='')))




In [92]:
best_comb_name = results_df.reset_index()["Batch Name"][0]
best_batch = comb2batch[best_comb_name]
test_batch = comb2batch[(-1,)]
n_best_models = 5
best_models = [model for _, (model, _) in sorted(comb2model.items(), key=lambda item: item[1][1])]
best_models = best_models[:min(n_best_models, len(best_models))]

## Using the class WeightedAverage: 

In [93]:
# Get the best models from the grid search:
models_dic = [{"name":f"m{i}", "model":m} for i,m in enumerate(best_models)]

In [94]:
wa = WeightedAverage(models_dic,is_time_series=True)

In [95]:
wa.calculate_metric(test_batch.ts_test_X, test_batch.ts_test_y)

In [96]:
start = time.time()
wa.find_best_combination(data_X = test_batch.ts_test_X, data_y = test_batch.ts_test_y)
end = time.time()
print(f"Time = {int(end-start)//60 } min")
duration = 1  # seconds
freq = 350  # Hz
_ = os.system('play -nq -t alsa synth {} sine {}'.format(duration, freq))

HBox(children=(FloatProgress(value=0.0, description='Progress ', max=31.0, style=ProgressStyle(description_wid…


Time = 0 min


In [97]:
print(f"Best Combination is ({wa.best_combination}) with score of ({wa.comb_to_res[wa.best_combination]})")

Best Combination is ((0, 1, 2)) with score of (13.117593238450981)


In [11]:
##############################################################################