In [1]:
import numpy as np
import numpy as np
import pandas as  pd
import tensorflow as tf
from tensorflow import keras
#import keras
import matplotlib.pyplot as plt 
from tensorflow.keras import layers
from sklearn import *
import math
from matplotlib.pyplot import figure
import matplotlib.pyplot as plt
import scipy
tf.autograph.set_verbosity(0)


In [2]:
def reconstruct_validation(mpc, y_pred, X_test):
    
    valid_reconstruct = np.empty((y_pred.shape[0], mpc*8,))

    indexes = np.arange(mpc * 8)

    idx_sb1 = np.arange(0,(mpc*2))
    idx_sb2 = np.arange((mpc*2),(mpc*4))
   

    idx_sb1_dis = np.array(list(set(indexes) - set(idx_sb1)))
    idx_sb2_dis = np.array(list(set(indexes) - set(idx_sb2)))
   
    sb_counter_1 = 0
    sb_counter_2 = -1
   
    for i in range(y_pred.shape[0],):

        if sb_counter_1%4==0:

            valid_reconstruct[i, idx_sb1] = X_test[i,:]
            valid_reconstruct[i, idx_sb1_dis] = y_pred[i,:]

        elif sb_counter_2%4==0:

            valid_reconstruct[i, idx_sb2] = X_test[i,:]
            valid_reconstruct[i, idx_sb2_dis] = y_pred[i,:]

        sb_counter_1 += 1
        sb_counter_2 += 1
        
    return valid_reconstruct


In [3]:
def m2tex(model,modelName):
    stringlist = []
    model.summary(line_length=70, print_fn=lambda x: stringlist.append(x))
    del stringlist[1:-4:2]
    del stringlist[-1]
    for ix in range(1,len(stringlist)-3):
        tmp = stringlist[ix]
        stringlist[ix] = tmp[0:31]+"& "+tmp[31:59]+"& "+tmp[59:]+"\\\\ \hline"
    stringlist[0] = "Model: {} \\\\ \hline".format(modelName)
    stringlist[1] = stringlist[1]+" \hline"
    stringlist[-4] += " \hline"
    stringlist[-3] += " \\\\"
    stringlist[-2] += " \\\\"
    stringlist[-1] += " \\\\ \hline"
    prefix = ["\\begin{table}[]", "\\begin{tabular}{lll}"]
    suffix = ["\end{tabular}", "\caption{{Model summary for {}.}}".format(modelName), "\label{tab:model-summary}" , "\end{table}"]
    stringlist = prefix + stringlist + suffix 
    out_str = " \n".join(stringlist)
    out_str = out_str.replace("_", "\_")
    out_str = out_str.replace("#", "\#")
    print(out_str)


In [4]:
class Sampling(layers.Layer):
    """Uses (z_mean, z_log_var) to sample z, the vector encoding a digit."""

    def call(self, inputs):
        z_mean, z_log_var = inputs
        batch = tf.shape(z_mean)[0]
        dim = tf.shape(z_mean)[1]
        epsilon = tf.keras.backend.random_normal(shape=(batch, dim)) # Parametrization trick
        return z_mean + tf.exp(0.5 * z_log_var) * epsilon 

In [5]:
class VAE(keras.Model):
    def __init__(self, encoder, decoder, KL_hyperparam, **kwargs):
        super(VAE, self).__init__(**kwargs)
        self.KL_hyperparam = KL_hyperparam
        self.encoder = encoder
        self.decoder = decoder
        self.total_loss_tracker = keras.metrics.Mean(name="total_loss")  #MEAN? 
        self.reconstruction_loss_tracker = keras.metrics.Mean(    #MEAN? 
            name="MSE"
        )
        self.kl_loss_tracker = keras.metrics.Mean(name="kl_loss")   #MEAN? 
        
        self.total_loss_tracker_val = keras.metrics.Mean(name="total_loss_val")  #MEAN? 
        self.reconstruction_loss_tracker_val = keras.metrics.Mean(    #MEAN? 
            name="MSE_val"
        )
        self.kl_loss_tracker_val = keras.metrics.Mean(name="kl_loss_val")   #MEAN? 

    @property
    def metrics(self):
        return [
            self.total_loss_tracker,
            self.reconstruction_loss_tracker,
            self.kl_loss_tracker,
            
            self.total_loss_tracker_val,
            self.reconstruction_loss_tracker_val,
            self.kl_loss_tracker_val
        ]

    def train_step(self, data):
        x, y_true = data
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.encoder(x)
            reconstruction = self.decoder(z)
            reconstruction_loss = keras.losses.mean_squared_error(y_true, reconstruction)
            
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            #kl_loss = tf.reduce_mean(kl_loss, axis=1)
            
            total_loss = reconstruction_loss + kl_loss * self.KL_hyperparam
            
        grads = tape.gradient(total_loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
        
        self.total_loss_tracker.update_state(total_loss)
        
        self.reconstruction_loss_tracker.update_state(reconstruction_loss)
        
        self.kl_loss_tracker.update_state(kl_loss)
        
        return {
            "loss": self.total_loss_tracker.result(),
            "MSE": self.reconstruction_loss_tracker.result(),
            "kl_loss": self.kl_loss_tracker.result(),
        }

    def test_step(self, data):
        # Unpack the data
        x, y_true = data
        # Compute predictions
        z_mean, z_log_var, z = self.encoder(x)
        reconstruction = self.decoder(z)
        
        ## Losses
        reconstruction_loss = keras.losses.mean_squared_error(y_true, reconstruction)
        
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        
        total_loss = reconstruction_loss + kl_loss * self.KL_hyperparam

        # Updates the metrics tracking the loss
        self.total_loss_tracker_val.update_state(total_loss)
        
        self.reconstruction_loss_tracker_val.update_state(reconstruction_loss)
        
        self.kl_loss_tracker_val.update_state(kl_loss)
        
        return {
            "loss_val": self.total_loss_tracker_val.result(),
            "MSE_val": self.reconstruction_loss_tracker_val.result(),
            "kl_loss_val": self.kl_loss_tracker_val.result(),
        }



    def call(self, inputs):

        z_mean, z_log_var, z = self.encoder(inputs)
        reconstruction = self.decoder(z)
                


        return reconstruction
    


In [6]:
def nrmse(y_pred, y_test):
    
    num = tf.math.reduce_sum(tf.square(y_pred - y_test))
    num = num / y_test.shape[1]
    
    den = tf.math.reduce_sum(tf.square(y_test))/y_test.shape[1]
    
    return (num/den).numpy()

def mse(y_pred, y_test):
    
    num = tf.math.reduce_sum(tf.square(y_pred - y_test))
    num = num / y_test.shape[1]
    
    return num.numpy()

def mae(y_pred, y_test):
    
    num = tf.math.reduce_sum(tf.abs(y_pred - y_test))
    num = num / y_test.shape[1]
    
    return num.numpy()


In [7]:
def plot_mse_loss(history, name_ds):
    plt.plot(history.history['MSE'])
    plt.title('MSE Training Loss {}'.format(name_ds), fontsize ='xx-large')
    plt.xlabel('Epoch', fontsize ='xx-large')
    plt.show()

In [8]:
def plot_kl_mse(history, name_ds):
    plt.plot(history.history['kl_loss'])
    #plt.plot(history.history['val_total_loss'])
    plt.title('Model Training Set Loss {}'.format(name_ds), fontsize ='xx-large')
    plt.ylabel('Loss', fontsize ='xx-large')
    plt.xlabel('epoch', fontsize ='xx-large')
    plt.plot(history.history['MSE'])
    plt.legend(['KL_loss', 'MSE'], loc='upper right', fontsize ='xx-large')
    plt.show()

In [9]:
def plot_train_test(history, name_ds):
    
    plt_1 = plt.figure(figsize=(6, 3))

    plt.plot(history.history['MSE'])
    #plt.plot(history.history['val_total_loss'])
    plt.ylabel('Loss', fontsize ='xx-large')
    plt.xlabel('epoch', fontsize ='xx-large')
    plt.plot(history.history['val_MSE_val'])

    plt.legend(['train', 'test'], loc='upper center', fontsize ='12')
    plt.title('Model Loss During Training {}'.format(name_ds), fontsize='xx-large')

    plt.show()

In [10]:
def  plot_pred_vs_true(y_pred, y_test, name_ds):
    
    plt_1 = plt.figure(figsize=(6, 3))
    plt.scatter(y_test.flatten(), y_pred.flatten(), s=2)
    plt.title('VED Predictions {}'.format(name_ds), fontsize='xx-large')
    plt.ylabel('Prediction', fontsize ='xx-large')
    plt.xlabel('Background Truth', fontsize ='xx-large')

    plt.show()

In [11]:
def plot_sorted_vs_true(y_pred, y_test, name_ds):

    plt_1 = plt.figure(figsize=(10, 6))

    plt.plot(np.sort(y_test.flatten()))
    x_idx = np.arange(len(y_test.flatten()))
    plt.scatter(x_idx, y_pred.flatten()[np.argsort(y_test.flatten())], s=2, c="red")

    plt.title('Sorted Predictions of Validation Set {}'.format(name_ds), fontsize='xx-large')
    plt.ylabel('Prediction', fontsize ='xx-large')
    plt.xlabel('Background Truth', fontsize ='xx-large')
    plt.legend(['Sorted Background Truth', 'Predictions'], loc='upper right', fontsize ='xx-large')

    plt.show()


In [12]:
def plot_1TTI(y_test, y_pred, X_test, samp_num, name_ds): 

    plt_1 = plt.figure(figsize=(10, 6))
    sample_1 = np.r_[ y_test[samp_num,], X_test[samp_num,]]
    idx = np.arange(len(sample_1))
    plt.scatter(np.arange(len(sample_1)), sample_1, s=2, c="black")
    plt.scatter(idx[-len(y_pred[samp_num,]):], y_pred[samp_num,], color='red',  s=2)
    plt.title('Prediction of TTI n={} {}'.format(samp_num, name_ds) , fontsize ='xx-large')
    plt.legend(['Background Truth', 'Predictions'],labelcolor = ['black', 'red'] ,loc='upper right', fontsize ='xx-large')
    plt.ylabel('Feature', fontsize ='xx-large')
    plt.xlabel('Amplitude', fontsize ='xx-large')

    plt.show()
    
    

In [13]:
def plot_modeling_prediction_error(data, y_test, y_pred, name_ds, nbins):

    plt_1 = plt.figure(figsize=(10, 6))

    # Error to be fitted
    data = y_test.flatten() - y_pred.flatten()

    # Fit of Normal Distribution
    _, bins, _ = plt.hist(data, nbins, density=1, alpha=0.5)
    mu, sigma = scipy.stats.norm.fit(data)
    best_fit_line = scipy.stats.norm.pdf(bins, mu, sigma)


    # Fit of Student-t
    param = scipy.stats.t.fit(data)
    pdf_fitted = scipy.stats.t.pdf(data, loc=param[1], scale=param[2], df=param[0])

    plt.title('Histogram of the prediction error of VED {}'.format(name_ds), fontsize='xx-large')
    plt.plot(bins, best_fit_line, c='red')
    plt.scatter(data, pdf_fitted, s=2, c='green')
    plt.legend(['Normal Fitted Curve', 'Student-T Fitted Curve'],
               labelcolor = ['red', 'green'],  loc='upper right', fontsize ='xx-large')
    plt.xlim([-0.6, 0.6])
    plt.xticks(fontsize='x-large')
    plt.yticks(fontsize='x-large')

    plt.show()

In [14]:
def plot_MPC_overtime(data, mpc, name_ds, Real):
    
    if Real:
        plt.plot(data[:,0], label='1st MPC SB1')
        plt.plot(data[:, mpc*2], label='1st MPC SB2')
        plt.plot(data[:, mpc*4], label='1st MPC SB3')
        plt.plot(data[:, mpc*6], label='1st MPC SB4')


        plt.title('plot of 1st MPC in 4 subbands Over Time (Real Part) {}'.format(name_ds),  fontsize='xx-large')
        plt.xlabel('TTI', fontsize='xx-large')
        plt.ylabel('Amplitude', fontsize='xx-large')
        plt.legend(fontsize='xx-large')
        plt.tight_layout()
        plt.show() 
    
    else:
        plt.plot(data[:,mpc], label='1st MPC SB1')
        plt.plot(data[:,mpc*3], label='1st MPC SB2')
        plt.plot(data[:,mpc*5], label='1st MPC SB3')
        plt.plot(data[:,mpc*7], label='1st MPC SB4')


        plt.title('plot of 1st MPC in 4 subbands Over Time (Imaginary Part) {}'.format(name_ds),  fontsize='xx-large')
        plt.xlabel('TTI', fontsize='xx-large')
        plt.ylabel('Amplitude', fontsize='xx-large')
        plt.legend(fontsize='xx-large')
        plt.tight_layout()
        plt.show() 