# In [4]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import math
from tqdm import tqdm
from time import time
from sklearn import linear_model
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error

def err1(x, y):
    """Return the square root of the mean absolute error between x and y."""
    return np.sqrt(mean_absolute_error(x, y))


def err2(x, y):
    """Return the square root of the mean squared error between x and y."""
    return np.sqrt(mean_squared_error(x, y))


def Error1(x, y):
    """Return err1(x, y) divided by err1 of an all-zero array against y.

    The zero array has the shape of y, so y must expose a ``shape`` attribute.
    """
    numerator = err1(x, y)
    baseline = err1(np.zeros(y.shape), y)
    return numerator / baseline


def Error2(x, y):
    """Return err2(x, y) divided by err2 of an all-zero array against y.

    The zero array has the shape of y, so y must expose a ``shape`` attribute.
    """
    numerator = err2(x, y)
    baseline = err2(np.zeros(y.shape), y)
    return numerator / baseline

# In [3]:
class Experiment():
    """Learn labels from model features at fixed time/space positions.

    Features are read from each realization of a model at position
    ``(t_f, x_f)``; labels are read from the results at ``(t_l, x_l)``.
    ``algorithm`` is fitted on a train subset and evaluated on a test subset.

    Attributes:
        F: features DataFrame, one row per realization, one column per tree.
        L: labels (DataFrame built by ``Create_Labels`` or the raw results).
        learning_model: value returned by ``algorithm.fit``.
        train, test: index labels of ``F`` used for training / testing.
        prediction: predictions of ``learning_model`` on the test features.
        error, tangent: error and regression slope of the latest evaluation.
    """

    def __init__(self, t_f, x_f, t_l=None, x_l=None, algorithm=None):
        """Store the positions and the learning algorithm.

        Args:
            t_f: position on the time (index) axis of the Model.
            x_f: position on the space (column) axis of the Model.
            t_l: position on the time axis of the labels, or None.
            x_l: position on the space axis of the labels, or None.
            algorithm: estimator exposing ``fit`` and ``predict``. When None,
                a new ``linear_model.LinearRegression()`` is created for this
                instance.
        """
        self.t_f = t_f  # Position of time (or index axis) of the Model
        self.x_f = x_f  # Position of space (or column axis) of the Model
        self.t_l = t_l  # Position of time (or index axis) of the Labels (E.g. solution for SPDEs)
        self.x_l = x_l  # Position of space (or column axis) of the Labels (E.g. solution for SPDEs)

        # A default estimator in the signature would be created once and
        # shared (and re-fitted) by every instance, so build it per instance.
        if algorithm is None:
            algorithm = linear_model.LinearRegression()
        self.algorithm = algorithm  # algorithm used for learning

        self.F = None  # Features df
        self.L = None  # Labels df
        self.learning_model = None
        self.train = None  # list of features that are taken for training
        self.test = None  # list of features that are taken for testing
        self.prediction = None  # predicted values using the testing features
        self.error = None  # error for the current split
        self.tangent = None  # tangent for the current split

    def Create_Features(self, Models):
        """Build the features DataFrame ``self.F`` from ``Models``.

        Args:
            Models: either a list with one entry per realization, each entry
                mapping a tree to a DataFrame, or a single DataFrame whose
                top column level holds the realizations 'M1', 'M2', ...
                When None, a message is printed and nothing is created.
        """
        if Models is None:
            print('Models are not given')
            return
        print('Creating features dataset.')
        given_as_list = isinstance(Models, list)
        if given_as_list:  # List of Models is given
            num = len(Models)  # number of realizations
            trees = list(Models[0].keys())  # all the trees
            X = Models[0][trees[0]].columns  # all the space points
        else:  # df of Models is given
            num = Models.columns.levshape[0]
            first = Models["M1"]
            # Stride by the size of the second column level to take one entry
            # per tree (assumes columns are grouped by tree -- TODO confirm).
            trees = [m[0] for m in first.columns[::first.columns.levshape[1]]]
            X = first[trees[0]].columns

        self.F = pd.DataFrame(index=np.arange(num), columns=trees)  # Features data of model
        space_point = X[self.x_f]  # same column for every realization

        for i in tqdm(range(num)):  # i-th datapoint of the model
            model = Models[i] if given_as_list else Models['M' + str(i + 1)]
            self.F.iloc[i] = [model[A][space_point].iloc[self.t_f] for A in trees]

    def Create_Labels(self, Results):
        """Build the labels ``self.L`` from ``Results``.

        If both ``t_l`` and ``x_l`` are None, ``Results`` is stored as is.
        Otherwise one value per realization is read from ``Results[i]``:
        a missing ``t_l`` falls back to the last time index (-1) and a
        missing ``x_l`` falls back to the feature space position ``x_f``.

        Args:
            Results: the results to take labels from. When None, a message is
                printed and nothing is created.
        """
        if Results is None:
            print('Results are not given')
            return

        if self.t_l is None and self.x_l is None:
            # Results are already the labels.
            self.L = Results
            return

        print('Creating labels dataset.')
        num = Results.shape[0]
        self.L = pd.DataFrame(index=np.arange(num), columns=['(' + str(self.t_l) + ',' + str(self.x_l) + ')'])

        time_pos = -1 if self.t_l is None else self.t_l
        space_pos = self.x_f if self.x_l is None else self.x_l
        for i in tqdm(range(num)):  # i-th datapoint of the results
            self.L.iloc[i] = Results[i][time_pos, space_pos]

    def one_experiment(self, Models=None, Results=None, split=True, test_size=0.3, columns=None):
        """Fit ``self.algorithm`` on a train subset and predict the test subset.

        Features and labels are created first if they do not exist yet.
        By default the learning algorithm is linear regression.

        Args:
            Models: passed to ``Create_Features`` when ``self.F`` is None.
            Results: passed to ``Create_Labels`` when ``self.L`` is None.
            split: when True a new random train/test split is drawn and stored
                in ``self.train`` / ``self.test``; when False the stored split
                is reused.
            test_size: forwarded to ``train_test_split``.
            columns: feature columns to learn from; all columns when None.

        Raises:
            ValueError: if features or labels are unavailable, or if
                ``split`` is False and no split has been stored.
        """
        if self.F is None:
            self.Create_Features(Models)
        if self.L is None:
            self.Create_Labels(Results)
        if self.F is None or self.L is None:
            raise ValueError('Features and labels are required: provide Models and Results.')
        if columns is None:
            columns = self.F.columns

        features = self.F[columns]

        # split data into test and train
        if split:
            X_train, X_test, y_train, _ = train_test_split(features, self.L, test_size=test_size)
            self.test, self.train = list(X_test.index), list(X_train.index)
        else:
            if self.train is None or self.test is None:
                raise ValueError('No stored train/test split: run with split=True first.')
            # The stored labels are used positionally; they coincide with
            # positions for the RangeIndex built by Create_Features.
            X_train = features.iloc[self.train]
            X_test = features.iloc[self.test]
            y_train = self.L.iloc[self.train]

        self.learning_model = self.algorithm.fit(X_train, y_train)  # fit the model with train data
        self.prediction = self.learning_model.predict(X_test)  # compute the prediction

    def _real_and_predicted(self):
        """Return the test labels as a 2-D array and predictions in the same shape."""
        real = np.asarray(self.L.loc[self.test].values)
        if real.ndim == 1:
            # LinearRegression needs a 2-D input.
            real = real.reshape(-1, 1)
        predicted = np.asarray(self.prediction).reshape(real.shape)
        return real, predicted

    @staticmethod
    def _score(metric, normalise, real, predicted):
        """Return metric(predicted, real), optionally divided by metric(0, real)."""
        error = metric(predicted, real)
        if normalise:
            error = error / metric(np.zeros(real.shape), real)
        return error

    def show_regression_experiment(self, metric=err2, normalise=True):
        """Plot predicted against actual test values and report the error.

        Fits the regression line ``prediction = a * real + b`` (ideal case:
        a = 1, b = 0), stores the slope in ``self.tangent`` and shows a
        scatter plot with that line and the ``y = x`` line.

        Args:
            metric: callable ``metric(x, y)``; when falsy no error is computed.
            normalise: divide the error by the metric of an all-zero
                prediction against the actual values.
        """
        if self.learning_model is None:
            print('There is no learning model created')
            return

        real, predicted = self._real_and_predicted()  # actual and predicted values
        fitting = linear_model.LinearRegression().fit(real, predicted)
        b, a = fitting.intercept_, fitting.coef_
        self.tangent = a[0][0]

        plt.figure(figsize=(14, 9))
        plt.scatter(real, predicted)
        plt.plot(real, b + real * a, 'r', label='Regression line between predicted and u(t,x)')
        plt.plot(real, real, 'g', label='y=x line')
        plt.xlabel("Values of u(t,x)", fontsize=20)
        plt.ylabel("Predicted Values", fontsize=20)
        plt.legend(loc=2, prop={'size': 15})
        plt.show()

        # If metric for computing an error is given, compute errors. By default error is an l^2 error normalised
        if metric:
            self.error = self._score(metric, normalise, real, predicted)
            print("Error:", self.error, ". Tangent:", a[0][0])
        return

    def regression_error(self, metric=err2, normalise=True):
        """Compute the error and regression slope for the current split.

        Args:
            metric: callable ``metric(x, y)`` used to measure the error.
            normalise: divide the error by the metric of an all-zero
                prediction against the actual values.

        Returns:
            ``(error, tangent)``, also stored in ``self.error`` and
            ``self.tangent``; None when no model has been fitted yet.
        """
        if self.learning_model is None:
            print('There is not model created.')
            return

        # Predictions are reshaped like the labels so that coef_ is 2-D even
        # for estimators returning 1-D predictions.
        real, predicted = self._real_and_predicted()
        fitting = linear_model.LinearRegression().fit(real, predicted)
        self.tangent = fitting.coef_[0][0]

        self.error = self._score(metric, normalise, real, predicted)
        return self.error, self.tangent

    def many_regression_experiments(self, num, Models=None, Results=None, test_size=0.3, metric=err2, normalise=True, columns=None, mini=False, full=False):
        """Repeat the experiment on ``num`` random splits and summarise it.

        Args:
            num: number of experiments to run.
            Models, Results, test_size, columns: forwarded to ``one_experiment``.
            metric, normalise: forwarded to ``regression_error``.
            mini: return the minimum error and minimum slope.
            full: return the lists of all errors and slopes (checked after
                ``mini``).

        Returns:
            By default the average error and average slope over all runs.
        """
        errors, tangents = [], []

        for _ in tqdm(range(num)):
            self.one_experiment(Models, Results, test_size=test_size, columns=columns)
            e, a = self.regression_error(metric=metric, normalise=normalise)
            errors.append(e)
            tangents.append(a)
        if mini:
            return min(errors), min(tangents)
        if full:
            return errors, tangents

        return sum(errors) / len(errors), sum(tangents) / len(tangents)

    def save_Features(self, name):
        """Write the features to the CSV file ``name`` if they exist."""
        if self.F is None:
            print('Features are not created yet.')
        else:
            self.F.to_csv(name)

    def save_Labels(self, name):
        """Write the labels to the CSV file ``name`` if they exist."""
        if self.L is None:
            print('Labels are not created yet.')
        else:
            self.L.to_csv(name)