In [1]:
# Notebook to compare linear models with linear models based on nonlinear features


import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.colors as clr
import seaborn as sns

from sklearn.linear_model import Ridge
from sklearn.linear_model import LinearRegression
from sklearn.cross_decomposition import PLSRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
import random
# Initialize the random seed to ensure reproducibility of the results in the paper
random.seed(42)
np.random.seed(42) 

import pickle
from jax import jacfwd
import jax.numpy as jnp
import copy
# Import interact function from ipywidgets
from ipywidgets import interact, fixed
# Import float slider from ipywidgets
from ipywidgets import FloatSlider, IntSlider, widgets

import src.featlin
from src.featlin import Featlin
from src.featlin import jax_moment
from src.helper import optimize_cv

import src.basis as basis
from src.basis import BasicsData

plt.style.use('./styles/plots.mplstyle')

%load_ext autoreload
%autoreload 2

In [2]:
# Whether to generate data or load it from file
# Load it from file to reproduce results from the paper
# Otherwise, random noise will be different, leading to different results
generate_data = 1

# Setting this variable to 1 will save the generated data to file
# And overwrite the data in the folder 
save_generated_data = 0

remove_outliers = 1

feature = 'sums'

In [3]:
def construct_y_data(X, targetfun, per_range: list=None):
    """Construct responsese for a given target function
    """
    if per_range is None:
        per_range = [0, 1]

    columns = X.shape[1]
    low_ind = int(per_range[0]*columns)
    high_ind = int(per_range[1]*columns)
    # self.y = targetfun(self.X[:, low_ind:high_ind])

    rows = X.shape[0]
    y = np.zeros([rows])
    for i in range(rows):
        row_i = X[i, :]
        y[i] = targetfun(row_i[low_ind:high_ind])
    return y

def add_wgn(y, snr_y=50):
        """Generates synthethic data to test the linearization methodology. 

        """
        # Add Gaussian noise to the measurements
        # Snippet below partly copied/adapted/inspired by: 
        # https://stackoverflow.com/questions/14058340/adding-noise-to-a-signal-in-python
        # Answer from Noel Evans, accessed: 18.05.2022, 15:37 CET
        # Calculate signal power and convert to dB 

        for i, yi in enumerate(y): 
            sig_avg_watts = np.mean(yi**2)
            sig_avg_db = 10 * np.log10(sig_avg_watts)
            # Calculate noise according to [2] then convert to watts
            noise_avg_db = sig_avg_db - snr_y
            noise_avg_watts = 10 ** (noise_avg_db / 10)
            # Generate an sample of white noise
            mean_noise = 0
            noise = np.random.normal(mean_noise, np.sqrt(noise_avg_watts), 1)
            # Noise up the original signal
            y[i] += noise

        return y

if generate_data:
    # Load the LFP Dataset
    lfp_df = pd.read_csv('./data/lfp_slim.csv', index_col=0)

    X_lfp = np.array(lfp_df.iloc[:, 0:1000])
    X_lfp = X_lfp[:, ::-1]
    y_lfp_true = np.array(lfp_df.iloc[:, 1000])


    X_lfp_train = np.array(X_lfp[lfp_df.iloc[:, 1002]==0, :])
    y_lfp_train_true = np.log10(np.array(y_lfp_true[lfp_df.iloc[:, 1002]==0]))
    X_lfp_test = np.array(X_lfp[lfp_df.iloc[:, 1002]==1, :])
    y_lfp_test_true = np.log10(np.array(y_lfp_true[lfp_df.iloc[:, 1002]==1]))
    X_lfp_test2 = np.array(X_lfp[lfp_df.iloc[:, 1002]==2, :])
    y_lfp_test2_true = np.log10(np.array(y_lfp_true[lfp_df.iloc[:, 1002]==2]))

    sums_function = lambda a : jnp.sum(a**2)
    variance_function = lambda a : jnp.var(a)

    y_train_sums = add_wgn(construct_y_data(X_lfp_train, sums_function), snr_y=40)
    y_test_sums = add_wgn(construct_y_data(X_lfp_test, sums_function), snr_y=40)
    y_test2_sums = add_wgn(construct_y_data(X_lfp_test2, sums_function), snr_y=40)

    #y_train_sums = construct_y_data(X_lfp_train, sums_function)
    #y_test_sums = construct_y_data(X_lfp_test, sums_function)
    #y_test2_sums = construct_y_data(X_lfp_test2, sums_function)

    y_train_var = add_wgn(construct_y_data(X_lfp_train, variance_function), snr_y=40)
    y_test_var = add_wgn(construct_y_data(X_lfp_test, variance_function), snr_y=40)
    y_test2_var = add_wgn(construct_y_data(X_lfp_test2, variance_function), snr_y=40)
    #y_train_var = construct_y_data(X_lfp_train, variance_function)
    #y_test_var = construct_y_data(X_lfp_test, variance_function)
    #y_test2_var = construct_y_data(X_lfp_test2, variance_function)
    print('Data generated')
    # Pickle the data
    if save_generated_data:
        with open('./data/lfp_sums_nonlinex.pickle', 'wb') as f:
            pickle.dump([X_lfp_train, y_train_sums, X_lfp_test, y_test_sums, X_lfp_test2, y_test2_sums], f)
        with open('./data/lfp_variance_nonlinex.pickle', 'wb') as f:
            pickle.dump([X_lfp_train, y_train_var, X_lfp_test, y_test_var, X_lfp_test2, y_test2_var], f)

# Load the data
if not generate_data:
    with open('./data/lfp_sums_nonlinex.pickle', 'rb') as f:
        X_lfp_train, y_train_sums, X_lfp_test, y_test_sums, X_lfp_test2, y_test2_sums = pickle.load(f)
    
    with open('./data/lfp_variance_nonlinex.pickle', 'rb') as f:
        X_lfp_train, y_train_var, X_lfp_test, y_test_var, X_lfp_test2, y_test2_var = pickle.load(f)

if feature == 'sums':
    y_train = y_train_sums
    y_test = y_test_sums
    y_test2 = y_test2_sums
elif feature == 'variance':
    y_train = y_train_var
    y_test = y_test_var
    y_test2 = y_test2_var
    
x_lfp = np.linspace(2.0, 3.5, 1000)
labels_lfp = {'xdata_label': 'Voltage (V)', 'ydata_label': r'$\Delta \mathbf{Q}_{100\mathrm{-}10}$ (Ah)', 'row_label': 'Battery number'}



Data generated


In [4]:
# Remove the two nasty outliers
if remove_outliers:
    # Remove outlier Training
    id_outlier = np.where(np.mean(X_lfp_train, axis=1)==np.min(np.mean(X_lfp_train, axis=1)))
    X_lfp_train = np.delete(X_lfp_train, id_outlier, axis=0)
    y_train = np.delete(y_train, id_outlier, axis=0)

    # Remove outlier Test
    for i in range(2):
        id_outlier = np.where(np.mean(X_lfp_test, axis=1)==np.min(np.mean(X_lfp_test, axis=1)))
        X_lfp_test = np.delete(X_lfp_test, id_outlier, axis=0)
        y_test = np.delete(y_test, id_outlier, axis=0)

In [5]:
# Putting all data in a dictionary
meanX = np.mean(X_lfp_train, axis=0)
stdX = np.std(X_lfp_train, axis=0)
meany = np.mean(y_train)

X_dict = {
    'train': X_lfp_train, 'test': X_lfp_test, 'test2': X_lfp_test2, 'x': x_lfp, 
    'train_': X_lfp_train-meanX, 'test_': X_lfp_test-meanX, 'test2_': X_lfp_test2-meanX,
    'train_std_': (X_lfp_train-meanX)/stdX, 'test_std_': (X_lfp_test-meanX)/stdX, 'test2_std_': (X_lfp_test2-meanX)/stdX,
    'mean': meanX, 'std': stdX}

y_dict = {
    'train': y_train, 'test': y_test, 'test2': y_test2,
    'train_': y_train-meany, 'test_': y_test-meany, 'test2_': y_test2-meany,
    'mean': meany}

In [6]:
if feature == 'sums':
    x_train = np.sum(X_dict['train']**2, axis=1)
    x_test = np.sum(X_dict['test']**2, axis=1)
    x_test2 = np.sum(X_dict['test2']**2, axis=1)
elif feature == 'variance':
    x_train = np.var(X_dict['train'], axis=1)
    x_test = np.var(X_dict['test'], axis=1)
    x_test2 = np.var(X_dict['test2'], axis=1)

# append the variance to the X matrix
X_dict['train_feat'] = x_train
X_dict['test_feat'] = x_test
X_dict['test2_feat'] = x_test2
X_dict['train_feat_'] = X_dict['train_feat']-np.mean(X_dict['train_feat'], axis=0)
X_dict['test_feat_'] = X_dict['test_feat']-np.mean(X_dict['train_feat'], axis=0)
X_dict['test2_feat_'] = X_dict['test2_feat']-np.mean(X_dict['train_feat'], axis=0)
X_dict['train_feat_std_'] = X_dict['train_feat_']/np.std(X_dict['train_feat_'], axis=0)
X_dict['test_feat_std_'] = X_dict['test_feat_']/np.std(X_dict['train_feat_'], axis=0)
X_dict['test2_feat_std_'] = X_dict['test2_feat_']/np.std(X_dict['train_feat_'], axis=0)

In [9]:
# Run crossvalidation to estimate parameters.
# 4 runs, two per model (std and non std dat)
# Compare predictions where y is the cye life

res_dict_rr_ = optimize_cv(X_dict['train_'], y_dict['train_'], max_comps=20, alpha_lim=[10e-7, 10e3], folds=10, nb_stds=1, 
        plot_components=False, std=False, min_distance_search=False, 
        featlin=None, algorithm='RR', plot=False)


res_dict_pls_ = optimize_cv(X_dict['train_'], y_dict['train_'], max_comps=20, alpha_lim=[10e-5, 10e3], folds=10, nb_stds=1, 
        plot_components=False, std=False, min_distance_search=False, 
        featlin=None, algorithm='PLS', plot=False)


res_dict_rr_std = optimize_cv(X_dict['train_std_'], y_dict['train_'], max_comps=20, alpha_lim=[10e-1, 10e2], folds=10, nb_stds=1, 
        plot_components=False, std=False, min_distance_search=False, 
        featlin=None, algorithm='RR', plot=False)

res_dict_pls_std = optimize_cv(X_dict['train_std_'], y_dict['train_'], max_comps=20, alpha_lim=[10e-1, 10e2], folds=10, nb_stds=1, 
        plot_components=False, std=False, min_distance_search=False, 
        featlin=None, algorithm='PLS', plot=False)


In [10]:
print('PLS CV opt: ' + str(res_dict_pls_['cv_res']['rmse_min_param']))
print('PLS CV1sig: ' + str(res_dict_pls_['cv_res']['rmse_std_min_param']))
print('PLS CV opt std: ' + str(res_dict_pls_std['cv_res']['rmse_min_param']))
print('PLS CV1sig std: ' + str(res_dict_pls_std['cv_res']['rmse_std_min_param']))

print('RR CV opt: ' + str(res_dict_rr_['cv_res']['rmse_min_param']))
print('RR CV1sig: ' + str(res_dict_rr_['cv_res']['rmse_std_min_param']))
print('RR CV1 opt std: ' + str(res_dict_rr_std['cv_res']['rmse_min_param']))
print('RR CV1sig std: ' + str(res_dict_rr_std['cv_res']['rmse_std_min_param']))

PLS CV opt: 5
PLS CV1sig: 3
PLS CV opt std: 8
PLS CV1sig std: 1
RR CV opt: 0.0001
RR CV1sig: 0.0007196856730011522
RR CV1 opt std: 15.848931924611133
RR CV1sig std: 138.94954943731375


In [11]:
# Putting all models in a dictionary
# Scale and intercept are set to false becasue I prefer to do it manually
# This list is a bit long, but it is easier to keep track of the models this way for this comparison.
models = {
    'OLS': LinearRegression(fit_intercept=False),
    'PLS_cv': PLSRegression(n_components=res_dict_pls_['cv_res']['rmse_min_param'], scale=False),
    'PLS_cv1sig': PLSRegression(n_components=res_dict_pls_['cv_res']['rmse_std_min_param'], scale=False),
    'PLS_cv_std': PLSRegression(n_components=res_dict_pls_std['cv_res']['rmse_min_param'], scale=False),
    'PLS_cv1sig_std': PLSRegression(n_components=res_dict_pls_std['cv_res']['rmse_std_min_param'], scale=False),
    'RR_cv': Ridge(alpha=res_dict_rr_['cv_res']['rmse_min_param'], fit_intercept=False),
    'RR_cv1sig': Ridge(alpha=res_dict_rr_['cv_res']['rmse_std_min_param'], fit_intercept=False),
    'RR_cv_std': Ridge(alpha=res_dict_rr_std['cv_res']['rmse_min_param'], fit_intercept=False),
    'RR_cv1sig_std': Ridge(alpha=res_dict_rr_std['cv_res']['rmse_std_min_param'], fit_intercept=False)}


# Making a pd dataframe to store the results
results = pd.DataFrame(index=models.keys(), columns=['X', 'y', 'NRMSE Training', 'NRMSE Test', 'NRMSE Test 2', 'RMSE Training', 'RMSE Test', 'RMSE Test 2'])

In [12]:
# Fit all models that only depend on the raw data

for key in models.keys():
    if 'OLS' in key:
        X_ = X_dict['train_feat_'].reshape(-1, 1)
        X_test_ = X_dict['test_feat_'].reshape(-1, 1)
        X_test2_ = X_dict['test2_feat_'].reshape(-1, 1)
        X_tag = 'Sum of Squares'
    else:
        if key.endswith('std'):
            X_ = X_dict['train_std_']
            X_test_ = X_dict['test_std_']
            X_test2_ = X_dict['test2_std_']
            X_tag = 'Z-scored'
        else:
            X_ = X_dict['train_']
            X_test_ = X_dict['test_']
            X_test2_ = X_dict['test2_']
            X_tag = 'Mean-centered'
    y_ = y_dict['train_']
    y_test_ = y_dict['test_']
    y_test2_ = y_dict['test2_']
    y = y_dict['train']
    y_test = y_dict['test']
    y_test2 = y_dict['test2']
    y_tag = 'Sum of Squares, SNR=50'
    
    # Fit the model
    models[key].fit(X_, y_)

    # Predict the training data
    y_pred = models[key].predict(X_)+y_dict['mean']
    y_test_pred = models[key].predict(X_test_)+y_dict['mean']
    y_test2_pred = models[key].predict(X_test2_)+y_dict['mean']
    

    # Calculate the MSE
    mse_train = mean_squared_error(y, y_pred, squared=False)
    mse_test = mean_squared_error(y_test, y_test_pred, squared=False)
    mse_test2 = mean_squared_error(y_test2, y_test2_pred, squared=False)

    # Calculate the NRMSE
    # I am using the std of the training data to calculate the NRMSE    
    nrmse_train = 100*mean_squared_error(y, y_pred, squared=False)/(np.max(y_)-np.min(y_))
    nrmse_test = 100*mean_squared_error(y_test, y_test_pred, squared=False)/(np.max(y_)-np.min(y_))
    nrmse_test2 = 100*mean_squared_error(y_test2, y_test2_pred, squared=False)/(np.max(y_)-np.min(y_))
    
    # Store the results
    results.loc[key, 'Model'] = ''
    results.loc[key, 'X'] = X_tag
    results.loc[key, 'y'] = y_tag
    results.loc[key, 'NRMSE Training'] = nrmse_train
    results.loc[key, 'NRMSE Test'] = nrmse_test
    results.loc[key, 'NRMSE Test 2'] = nrmse_test2
    results.loc[key, 'RMSE Training'] = mse_train
    results.loc[key, 'RMSE Test'] = mse_test
    results.loc[key, 'RMSE Test 2'] = mse_test2

In [13]:
results.iloc[:5, :]

Unnamed: 0,X,y,NRMSE Training,NRMSE Test,NRMSE Test 2,RMSE Training,RMSE Test,RMSE Test 2,Model
OLS,Sum of Squares,"Sum of Squares, SNR=50",0.359425,0.421122,0.200589,0.007388,0.008656,0.004123,
PLS_cv,Mean-centered,"Sum of Squares, SNR=50",3.156452,3.111199,4.769558,0.064879,0.063949,0.098036,
PLS_cv1sig,Mean-centered,"Sum of Squares, SNR=50",3.610099,3.671289,4.830629,0.074204,0.075461,0.099291,
PLS_cv_std,Z-scored,"Sum of Squares, SNR=50",2.390064,3.003339,4.14236,0.049126,0.061732,0.085144,
PLS_cv1sig_std,Z-scored,"Sum of Squares, SNR=50",5.103634,4.719957,5.20599,0.104902,0.097016,0.107006,


In [None]:
latex_df = copy.deepcopy(results.iloc[:5, :5])
if remove_outliers:
    latex_df.iloc[:5, :].to_latex('reg-results_outlier_rem.tex', index=True)
else:
    latex_df.iloc[:5, :].to_latex('reg-results.tex', index=True)

  latex_df.iloc[:5, :].to_latex('reg-results.tex', index=True)


In [None]:
# The model struggles to learn the dependencies when outliers are removed, in the case of this ideal feature. 
# For the cycle life reponse, jthings are more complex, since you cannot simply state that features will make everyhthign better (but help eioth outliers) 
