# Elevator/AI Experiment: Auxiliar functions

In [1]:
import pandas as pd
import numpy as np
import csv
import os
from numpy import linalg as LA
import random
import copy
import time

# Plot parameters:
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator
import seaborn as sns

markers = ["o","<","s","d","v","p","P","^","*","D",">"]

In [2]:
def rotate_3D(vector,n,theta_deg):
    """
    Rotates a vector or matrix by an angle theta around the axis n.

    --- Inputs ---
    
    {vector}: Vector to be rotated, must be a numpy array with three rows (x,y,z).
    {n}: Rotation axis, must be provided as a numpy array with 3 elements (x,y,z). No normalisation
    needed at this stage.
    {theta_deg}: Rotation angle in [degrees].
    
    --- Outputs ---
    
    {vector_R}: Numpy array; Rotated vector or matrix.
    """
    # Normalise u and convert angle to [rad]:
    u = n/LA.norm(n) # Normalised unit vector
    ux, uy, uz = u[0], u[1], u[2] # Cartesian components
    theta = theta_deg/360*2*np.pi # Rotation angle [rad]
    # Define the rotation matrix:
    c = np.cos(theta)
    q = 1-c
    s = np.sin(theta)
    R = np.array([[c+ux**2*q,ux*uy*q-uz*s,ux*uz*q+uy*s],
                  [uy*ux*q+uz*s,c+uy**2*q,uy*uz*q-ux*s],
                  [uz*ux*q-uy*s,ux*uy*q+ux*s,c+uz**2*q]
    ])
    return np.matmul(R,vector)

In [3]:
# Simulations:

mu0 = 1.256637e-16 # Vacuum permeability, SI units [N/A^2]
def single_dipole(r_traj,m_vec):
    """
    Calculate the vector magnetic field produced by a single dipole with magnetic moment m_vec at a 
    distance r_traj.
    {r_traj}: Numpy array; (x,y,z) elevator's trajectory sensed by the magnetometer. The output format has
    three rows, corresponding to (x,y,z), and each column is a timestep dt=1/{f_samp}. Units are [m].
    {m_vec} = Magnetic moment for the dipole. It must be a numpy array or list in the laboratory frame
    with format (X,Y,Z) [A*m^2]
    --- Returns ---
    {B_sd}: Numpy array; magnetic stray fields sensed by the magnetometer during the elevator's trajectory.
    The output format has three rows, corresponding to (Bx,By,Bz), and each column is a timestep correlated
    with the {r_traj} vector. Units are [T].
    """
    # Calculate modulus series [m]:
    r = np.sqrt(r_traj[0]*r_traj[0]+r_traj[1]*r_traj[1]+r_traj[2]*r_traj[2]) # Modulus series [m]

    # Calculate stray fields series:
    B_sd = 3*r_traj*np.dot(m_vec,r_traj)/r**5 # First term [A/m]
    B_sd += np.array([m_vec[0]/r**3,m_vec[1]/r**3,m_vec[2]/r**3]) # Add the second term [A/m]
    B_sd *= mu0/(4*np.pi) # Scale it to appropriate units [T].

    return B_sd

def optimize_sims_2SD(t,z,r1,r2,exp_data,m1_grid,m2_grid,d_12=28):
    """
    Optimizes the parameters used in 2-single-dipoles simulations to reproduce experimental measurements.
    * Note: the coordinates system is centered at the elevator's door at level 0, and the z-axis is solidary with
    the elevator's axis, poiting upwards.
    --- Inputs ---        
    {t}: Numpy array; time vector with uniform time steps [s].,
    {z}: Numpy array; z-position vector for the elevator's cage [m].
    {r1,r2}: Lists or numpy arrays; sensor position in (X,Y,Z) format, in [m], seen from the
    dipoles number 1 and number 2, respectively, when they are closest to the sensor.
    {d_12}: Float; maximum distance between the cage and counterweight [m]. Default: 28m.
    {exp_comp}: Dataframe with the following columns: 'Time_s', 'Bx_nT','By_nT','Bz_nT'; experimental
    measurements which simulations try to reproduce.
    {m1_grid,m2_grid}: List whose elements are Numpy arrays; each element is an option for the magnetic moments in 
    (mx,my,mz) format, in [A*m^2], for the two single dipoles.
    --- Outputs ---
    """
    # Translate the z-pos vector so it tracks the magnetometer position seen from the elevator's elements:
    r1_traj = np.array([np.linspace(-r1[0],-r1[0],len(z)),np.linspace(-r1[1],-r1[1],len(z)),-z]) # [m]
    r2_traj = np.array([np.linspace(-r2[0],-r2[0],len(z)),np.linspace(-r2[1],-r2[1],len(z)),z-d_12]) # [m]

    best_sum_dif = np.inf # Initiate scoring (worst possible result) [nT]
    best_config = [0,0]
    for m1 in m1_grid:
        for m2 in m2_grid:
            # Calculate the stray fields (Bx,By,Bz) sensed by the magnetometer:
            B1 = single_dipole(r1_traj,m1)*1e9 # Magnetic fields originated by the cage [nT]
            B2 = single_dipole(r2_traj,m2)*1e9 # Magnetic fields originated by the counterweight [nT]
            B = B1 + B2 # Total magnetic field [nT]
            # Evaluate simulationa and keep best parameters:
            sum_dif = np.sum(np.abs(B[0]-exp_data['Bx_nT']))*10 \
            + np.sum(np.abs(B[1]-exp_data['Bx_nT'])) \
            + np.sum(np.abs(B[2]-exp_data['Bz_nT'])) # [nT]
            if sum_dif < best_sum_dif:
                best_sum_dif = sum_dif
                best_config = [m1,m2]
                
    return best_config[0], best_config[1]

def sim_B_2SD(t,z,m1,m2,r1,r2,noise_X,noise_YZ,d_12=28,exp_comp=None,t_zoom=None,
              save_name=None,save_format='png'):    
    """
    Simulates the stray magnetic fields sensed by the magnetometer according to a two-single-dipoles model
    (symbolyzing the elevator's cage and the counterweight) and plots the cage and counterweight trajectories
    along with the sensed stray magnetic fields.
    * Note: the coordinates system is centered at the elevator's door at level 0, and the z-axis is solidary with
    the elevator's axis, poiting upwards.
    {t}: Numpy array; time vector with uniform time steps [s].,
    {z}: Numpy array; z-position vector for the elevator's cage [m].
    {m1,m2}: Numpy arrays; magnetic moments in (mx,my,mz) format, in [A*m^2] for the two different single
    dipoles. They are assumed to be constant.
    {r1,r2}: Lists or numpy arrays; sensor position in (X,Y,Z) format, in [m], seen from the
    dipoles number 1 and number 2, respectively.
    {d_12}: Float; maximum distance between the cage and counterweight [m]. Default: 28m.
    {noise_X}: Float; noise standard deviation (Gaussian distribution) along X-component, mean=0, in [nT].
    {noise_YZ}: Float; noise standard deviation (Gaussian distribution) along X,Y-components, mean=0, in [nT].
    {t_zoom}: List, numpy array or None. If None (default), the entire time range will be plotted. If values
    are provided, the time range will be restricted to t_zoom[0] to t_zoom[1], units [s].
    {exp_comp}: Dataframe with the following columns: 'Time_s', 'Bx_nT','By_nT','Bz_nT'.
    If provided, it will be used to compare with the simulations. If None (default), there is no comparison.
    {save_name}: String that allows to save the figure in the selected path. Format is .png by default.
    {save_format}: Chooses the format to save the figure, don't include the dot. Default: 'png'.
    """
    # Translate the z-pos vector so it tracks the magnetometer position seen from the elevator's elements:
    r1_traj = np.array([np.linspace(-r1[0],-r1[0],len(z)),np.linspace(-r1[1],-r1[1],len(z)),-z]) # [m]
    r2_traj = np.array([np.linspace(-r2[0],-r2[0],len(z)),np.linspace(-r2[1],-r2[1],len(z)),z-d_12]) # [m]  
    # Calculate the stray fields (Bx,By,Bz) sensed by the magnetometer:
    B1 = single_dipole(r1_traj,m1)*1e9 # Magnetic fields originated by the cage [nT]
    B2 = single_dipole(r2_traj,m2)*1e9 # Magnetic fields originated by the counterweight [nT]
    B = B1 + B2 # Total magnetic field [nT]
    # Add magnetic noise:
    B[0,:] += np.random.normal(loc=0.0, scale=noise_X, size=len(B[0])) # X-comp [nT]
    B[1,:] += np.random.normal(loc=0.0, scale=noise_YZ, size=len(B[1])) # Y-comp [nT]
    B[2,:] += np.random.normal(loc=0.0, scale=noise_YZ, size=len(B[2])) # Z-comp [nT] 
    
    # Plot the trajectory and magnetic fields seen and sensed by the magnetometer:
    fig = plt.figure(figsize=(6,4))
    gs = fig.add_gridspec(3, 2, hspace=0, wspace=0.5)
    (ax_X, ax_Bx), (ax_Y, ax_By), (ax_Z, ax_Bz) = gs.subplots(sharex='col')
    axes_pos, axes_B = [ax_X, ax_Y, ax_Z], [ax_Bx, ax_By, ax_Bz]
    ylabels_pos = ['$X$ [m]','$Y$ [m]','$Z$ [m]']
    ylabels_B = ['$B_{X}$ [nT]','$B_{Y}$ [nT]','$B_{Z}$ [nT]']
    colors = ['#33a1ffff','#3349ffff','#a433ffff'] # Bx, By, Bz
    for i in range(len(axes_pos)): # Trajectory
        axes_pos[i].plot(t,-r1_traj[i],'-',lw=1,color='#367642ff',label='Cage',alpha=1)
        axes_pos[i].plot(t,-r2_traj[i],'--',lw=1,color='#19b436ff',label='Ctwt',alpha=0.9)
        axes_pos[i].set_ylabel(ylabels_pos[i])
        axes_pos[i].legend(fontsize=6)
        if i == 0 or i == 1:
            axes_pos[i].set_yticks([r1[i],r2[i]])
    for i in range(len(axes_B)): # Stray fields
        axes_B[i].plot(t,B[i],color=colors[i],alpha=1,label='Sim.')
        axes_B[i].set_ylabel(ylabels_B[i])
        if exp_comp is not None: # If provided, compare with experiments
            comp = ['Bx_nT','By_nT','Bz_nT'][i]
            axes_B[i].plot(exp_comp['Time_s'],exp_comp[comp]-(exp_comp[comp].iloc[0]-B[i][0]),'o',color='gray',
                           alpha=0.7,markersize=0.3,label='Exp.')
        axes_B[i].legend(fontsize=6)
    if isinstance(t_zoom,(list,np.ndarray)): # If provided, set y limits
        for ax in axes_pos+axes_B:
            ax.set_xlim([t_zoom[0],t_zoom[1]])
    axes_pos[-1].set_xlabel(f'Time [s]')    
    axes_B[-1].set_xlabel(f'Time [s]')
    if save_name:
        save_file(save_name,save_format=save_format)    
    plt.show()

In [4]:
class Time_Wdw:
    """
    The Time Window object <Time_Wdw> can process an input dataframe with information about time, magnetic fields,
    z-position labels and optionally the ground truth z-position labels, all given in the Laboratory rotational
    frame, defined as RF1. <Time_Wdw> can return numpy arrays representing time windows for the magnetic projections
    Bx, By and Bz, in the requested rotational frame. All time windows will be correlated to z-position labels, and
    eventually to the ground truth labels, if they were provided in the input dataframe.

    --- Attributes ---
    
    GENERAL:
    {self.name}: String; name for the <Time_Wdw> object.
    {self.pp}: Integer; number of points for each time window.
    {self.gr_tr}: Boolean; specifies whether the ground truth labels are included (True) or not (False).
    {self.rotFrame}: List [Numpy array,Numeric,String]; specifies the rotated frame that is currently active for time
    windowing. The elements of the list are the following: [rotation axis in (x,y,z) format,rotation
    angle in degree units,  RF name]. By default, the rotated frame is the original lab frame,
    then [None,None,'RF1'].
    {self.norm_value}: Float; normalizing value for future reference, in [nT] units.
    ORIGINAL DATA:
    {self.time}: Numpy array; time vector from the original data, before windowing. Units: s.
    {self.z}: Numpy array: z-position vector from the original data, before windowing. Units: m.
    {self.Bx_RF1}: Numpy array; Bx vector from the original data in RF1, before windowing. Units: nT.
    {self.By_RF1}: Numpy array; By vector from the original data in RF1, before windowing. Units: nT.
    {self.Bz_RF1}: Numpy array; Bz vector from the original data in RF1, before windowing. Units: nT.
    {self.B_RF1}: Numpy array; calculated B (scalar) vector from the original data, before windowing. Units: nT.
    WINDOWED DATA:
    The following attributes are Numpy arrays containing the time windows' information. Each time window instance is
    stored as a row with self.pp points. Then, the matrix has a size (N x self.pp), where N is the number of time windows. 
    {self.time_wdw}: Numpy array; time matrix. Units: s.
    {self.Bx_wdw_RF1}: Numpy array; Bx matrix in RF1. Units: nT.
    {self.By_wdw_RF1}: Numpy array; By matrix in RF1. Units: nT.
    {self.Bz_wdw_RF1}: Numpy array; Bz matrix in RF1. Units: nT.
    {self.B_wdw}: Numpy array; B (scalar) matrix (rotational invariant). Units: nT.
    {self.Bx_wdw_RFX}: Numpy array; Bx matrix in the current rotated frame self.rotFrame, if any. Units: nT.
    {self.By_wdw_RFX}: Numpy array; By matrix in the current rotated frame self.rotFrame, if any. Units: nT.
    {self.Bz_wdw_RFX}: Numpy array; Bz matrix in the current rotated frame self.rotFrame, if any. Units: nT.
    {self.z_labels}: Numpy array; z-position vector from the original data, after windowing. Units: m.
    Note: windowed data can be augmented artificially, in which case the new augmented data will replace the previous
    attributes, but original data (not windowed) will remain the same.
    {self.augm_N}: Integer; specifies how many times the data was artificially augmented (if there
    is no augmentation, then augm_N=0).
    {self.augm_noise}: List [Bx_noise, By_noise, Bz_noise]; each element specifies the noise level
    which was used to augmentate the data, in [nT], for the [Bx,By,Bz] original data in RF1.
    """
    def __init__(self, pp, name, gr_tr=False):
        """
        --- Inputs ---
        {pp}: Integer; number of points for each time window.
        {name}: String; name for the <Time_Wdw> object, it may contain the segment number.
        {gr_tr}: Boolean; specifies whether the <Time_Wdw> object is labeled according to
        the ground truth (True) or not (False, default).
        """
        self.name = name # Name
        self.pp = pp # Number of points for every time window
        self.gr_tr = gr_tr # Informs whether ground truth is available (True) or not (False)
        self.rotFrame = [None,None,'RF1'] # At the beginning, there is no rotated frame

    def store_orig_data(self, df):
        """
        Receives a dataframe with the measurements data (t,Bx,By,Bz,z), eventually z_true as 
        well, in RF1. If <self.gr_tr>=True, then it will be assumed that the dataframe 
        includes the ground truth labels.        
        --- Inputs ---        
        {df}: Pandas dataframe; it must have the following columns: 'Time_s', 'Bx_nT', 'By_nT',
        'Bz_nT', 'zAut_m' and, optionally, 'zTrue_m'.
        """
        # Prepare and store original data, except z: t, Bx, By, Bz, B (scalar)
        t, Bx, By, Bz = df['Time_s'], df['Bx_nT'], df['By_nT'], df['Bz_nT'] # [s,nT,nT,nT]
        self.time = t.to_numpy() # Time [s]
        self.Bx_RF1 = Bx.to_numpy() # Bx fields in RF1 [nT]
        self.By_RF1 = By.to_numpy() # By fields in RF1 [nT]
        self.Bz_RF1 = Bz.to_numpy() # Bz fields in RF1 [nT]
        self.B_RF1 = np.sqrt(self.Bx_RF1**2 + self.By_RF1**2 + self.Bz_RF1**2) # Scalar B field [nT]
        # Store z-position, using the ground truth when available:
        z = df['zTrue_m'] if self.gr_tr else df['zAut_m'] # [m]
        self.z = z.to_numpy() # Z-position [m]
    
    def window_data(self, N_augm=0, Bx_noise=1, By_noise=1, Bz_noise=1):
        """
        Generates time windows in the original dataframe RF1, with predictors
        (magnetic fields) and targets (z-pos) from the original data according to the 
        attributes <self.pp> and <self.dp>, with the possibility of augmentating the data
        using random noise.
        --- Inputs ---
        {N_augm}: Integer; specifies how many times the data will be augmented. If 0 (default),
        there will be no augmentation.
        {Bx_noise}: Float; value for the standard deviation in the random normal distribution,
        in [nT] units. Default: 1 nT.
        {By_noise, Bz_noise}: idem noise_Bx, but for By and Bz components. Default: 1 nT.
        WARNING: This function will only work if the original data was previously stored.
        """
        # Store the [time, Bx, By, Bz, B, z] time windows and labels in RF1:
        N = len(self.z)-self.pp # Total number of time windows
        self.time_wdw = np.array([np.array(self.time[i:i+self.pp]) for i in range(N)]) # Store times [s]
        self.Bx_wdw_RF1 = np.array([self.Bx_RF1[i:i+self.pp] for i in range(N)]) # Store Bx fields windows in RF1 [nT]
        self.By_wdw_RF1 = np.array([self.By_RF1[i:i+self.pp] for i in range(N)]) # Store By fields windows in RF1 [nT]
        self.Bz_wdw_RF1 = np.array([self.Bz_RF1[i:i+self.pp] for i in range(N)]) # Store Bz fields windows in RF1 [nT]
        self.B_wdw = np.array([self.B_RF1[i:i+self.pp] for i in range(N)]) # Store scalar B fields windows [nT]
        self.z_labels = self.z[self.pp:] # z-position labels, for time windows [m]

        # Augmentate data if required:
        self.augm_N = N_augm # State how many times the data was augmented
        self.augm_noise = [Bx_noise,By_noise,Bz_noise] # State the augmentation noise levels [nT]
        if N_augm:
            # Initiate augmentated data with the original windows:
            Bx_wdw_augm = np.copy(self.Bx_wdw_RF1) # [nT]
            By_wdw_augm = np.copy(self.By_wdw_RF1) # [nT]
            Bz_wdw_augm = np.copy(self.Bz_wdw_RF1) # [nT]
            B_wdw_augm = np.copy(self.B_wdw) # [nT]
            z_labels_augm = np.copy(self.z_labels) # [m]
            # Determine noise factor for scalar B:
            B_noise = np.sqrt(Bx_noise**2+By_noise**2+Bz_noise**2) # [nT]
            # Augmentate data
            for i in range(N_augm):
                # Generate random noise matrices:
                new_Bx_wdw = self.Bx_wdw_RF1+np.random.normal(size=self.Bx_wdw_RF1.shape)*Bx_noise # [nT]
                new_By_wdw = self.By_wdw_RF1+np.random.normal(size=self.By_wdw_RF1.shape)*By_noise # [nT]
                new_Bz_wdw = self.Bz_wdw_RF1+np.random.normal(size=self.Bz_wdw_RF1.shape)*Bz_noise # [nT]
                new_B_wdw = self.B_wdw+np.random.normal(size=self.B_wdw.shape)*B_noise # [nT]
                # Augmentate windowed data:
                Bx_wdw_augm = np.vstack([Bx_wdw_augm,new_Bx_wdw]) # [nT]
                By_wdw_augm = np.vstack([By_wdw_augm,new_By_wdw]) # [nT]
                Bz_wdw_augm = np.vstack([Bz_wdw_augm,new_Bz_wdw]) # [nT]
                B_wdw_augm = np.vstack([B_wdw_augm,new_B_wdw]) # [nT]
                z_labels_augm = np.hstack([z_labels_augm,self.z_labels]) # [m]
            # Update attributes:    
            self.Bx_wdw_RF1 = Bx_wdw_augm # Update windowed data [nT]
            self.By_wdw_RF1 = By_wdw_augm # Update windowed data [nT]
            self.Bz_wdw_RF1 = Bz_wdw_augm # Update windowed data [nT]
            self.B_wdw = B_wdw_augm # Update windowed data [nT]
            self.z_labels = z_labels_augm # Update windowed data [m]

    def rotate_frame(self,RF_info):
        """
        Generates time windows in a rotated frame, labeled as "RFX" in the object's attributes.
        Only one rotated frame can be recorded at a time.        
        --- Inputs ---        
        {RF_info}: List [Numpy array,Numeric,String]; specifies the rotated frame that is currently stored. The elements
        of the list are the following: [rotation axis in (x,y,z) format,rotation angle in degree units,  RF name]. If
        RF_info[2] == 'RF1', then the data is kept in the original frame.
        """
        n_vec, theta_deg, RF_name = RF_info[0], RF_info[1], RF_info[2]
        self.rotFrame = [n_vec,theta_deg,RF_name] # RFX information: rotation axis, rotation angle [degree], name
        # Keep original frame:
        if RF_name == 'RF1':
            self.Bx_wdw_RFX = self.Bx_wdw_RF1
            self.By_wdw_RFX = self.By_wdw_RF1
            self.Bz_wdw_RFX = self.Bz_wdw_RF1
        else:
            # Rotate original information (RF1), window by window:
            # {vector}: Vector to be rotated, must be a numpy array with three rows (x,y,z).
            # Initiate:
            self.Bx_wdw_RFX = np.empty(shape=(self.Bx_wdw_RF1.shape))
            self.By_wdw_RFX = np.empty(shape=(self.By_wdw_RF1.shape))
            self.Bz_wdw_RFX = np.empty(shape=(self.Bz_wdw_RF1.shape))
            for i in range(len(self.Bx_wdw_RF1)):
                v_rot = rotate_3D(np.array([self.Bx_wdw_RF1[i],self.By_wdw_RF1[i],self.Bz_wdw_RF1[i]]),
                                  n_vec,theta_deg) # Rotated fields Bx', By', Bz' [nT]
                self.Bx_wdw_RFX[i] = v_rot[0] # [nT]
                self.By_wdw_RFX[i] = v_rot[1] # [nT]
                self.Bz_wdw_RFX[i] = v_rot[2] # [nT]
    
    def info(self):
        """
        Prints in screen a summary of the object.
        """
        print('-'*30)
        print('Time window object - Summary:')
        print('Name:',self.name)
        print('Points in each time window:',self.pp)
        print('Ground truth available:',self.gr_tr)
        if 'z_labels' in dir(self):
            print('Number of windows:',len(self.z_labels))
        print('Rotated Frame:',self.rotFrame[2])
        if 'augm_N' in dir(self):
            print(f'Data was augmented {self.augm_N} times.')
            print(f'Augmentation noise levels in RF1 (Bx,By,Bz): ({self.augm_noise}) nT')
        else:
            print('No data augmentation')
        print('-'*30)

    def plot_instances(self,stride_pp=None,start_wdw=0,RF=False,instances=5,
                       savefig=None,save_format='png'):
        """
        Plots some instances of time windows along with z-position labels.
        
        --- Inputs ---
        
        {stride_pp}: Integer; number of points by which the time windows' first point will be spaced in the plot.
        If None (default), then it will be chosen automatically.
        {start_pp}: Integer; first time window to start the plot. Default: 0 (first window).
        {RF_name}: Boolean; if False (default), the original rotational frame (RF1) is used. If True, then the
        current self.rotFrame is used.
        {instances}: Integer; number of instances to be plotted. Default: 5.
        {savefig}: String that allows to save the figure in .png format by default.
        {save_format}: Choose the saving format, 'png' by default, don't include the dot.   
        """
        # Prepare data:
        if stride_pp is None:
            stride_pp = max(1,round(self.pp*0.8))
        pp_i, pp_f = start_wdw, start_wdw+stride_pp*instances # Boundary indexes
        t_data = self.time_wdw[pp_i:pp_f:stride_pp] # [s]
        z_data = self.z_labels[pp_i:pp_f:stride_pp] # [m]
        Bx_data = self.Bx_wdw_RFX[pp_i:pp_f:stride_pp] # [nT]
        By_data = self.By_wdw_RFX[pp_i:pp_f:stride_pp] # [nT]
        Bz_data = self.Bz_wdw_RFX[pp_i:pp_f:stride_pp] # [nT]
        RF_title = self.rotFrame[2]
        # Plot figure:
        colors = plt.cm.viridis(np.linspace(0, 1, instances))
        fig, (ax_Bz, ax_By, ax_Bx, ax_z) = plt.subplots(4,1,figsize=(6,4))
        # Time windows:
        for i in range(instances):
            ax_Bx.scatter(t_data[i],Bx_data[i],color=colors[i],alpha=0.8)
            ax_By.scatter(t_data[i],By_data[i],color=colors[i],alpha=0.8)
            ax_Bz.scatter(t_data[i],Bz_data[i],color=colors[i],alpha=0.8)
            ax_z.scatter(t_data[i][-1],z_data[i],color=colors[i],alpha=0.8)
        # Floor references:
        label = 'Levels'
        for z in np.append(0,4.1+np.arange(0,3.7*6+0.1,3.7)):
            ax_z.axhline(z,ls='--',alpha=0.2,label=label)
            label = None # Prevent further labeling
        # Configuration:
        ax_z.legend(), ax_z.set(xlabel='Time [s]',ylabel='z labels [m]')
        ax_z.set_ylim([np.min(z_data)-1,np.max(z_data)+1]) # z limits [m]
        ax_z.spines['right'].set_visible(False), ax_z.spines['top'].set_visible(False)
        axes_B = [ax_Bx, ax_By, ax_Bz]; labels_B = ['Bx','By','Bz']
        for i in range(3):
            axes_B[i].set(ylabel=f'{labels_B[i]} [nT]')
            axes_B[i].set_xticks([])
            for spin in ['right','bottom','top']:
                axes_B[i].spines[spin].set_visible(False)
        for ax in axes_B+[ax_z]:
            ax.set_xlim([t_data[0][0]-1,t_data[-1][-1]+1])
        plt.suptitle(f'{self.name} ; Rotational Frame {RF_title}')
        fig.tight_layout()
        if savefig:
            save_file(savefig,save_format=save_format)
        plt.show()

    def matrix_format(self,mag_comps):
        """
        Returns a Numpy array with the magnetic signals in the current rotated frame RFX, in a matrix format
        suitable for Machine Learning models.
        
        --- Inputs ---
        
        {mag_comps}: List; each element is a string representing the magnetic components that will be used, must choose
        from options "Bx", "By", "Bz" and/or "B".
        
        --- Outputs ---
        
        Numpy array with shape [samples,time_wdw_pp,channels], in [nT] units, each channel is a magnetic component.
        The order of magnetic components is always Bx, By, Bz, B (ignoring not required components).
        """
        mag_data = [] # Initiate list with all magnetic numpy arrays
        if "Bx" in mag_comps:
            mag_data.append(self.Bx_wdw_RFX)
        if "By" in mag_comps:
            mag_data.append(self.By_wdw_RFX)
        if "Bz" in mag_comps:
            mag_data.append(self.Bz_wdw_RFX)
        if "B" in mag_comps:
            mag_data.append(self.B_wdw)
            
        return np.stack(mag_data,axis=2)

In [5]:
def copy_and_reduce_TimeWdw(time_wdw,p_train):
    """
    Copy the basic attributes of a Time_Wdw object, except for its time windows if any,
    reduce the original data to a fraction (cutting off from the last chronological data)
    and returns a new Time_Wdw object. Ignores any rotated data.
    {time_wdw}: Time Window object; information must be already stored, except for time
    windows (which will not be copied).
    {p_train}: Float; fraction between 0 and 1 that sets how much of the original training
    dataset is going to be preserved.
    """
    # Initiate object and copy attributes:
    new_time_wdw = Time_Wdw(time_wdw.pp, time_wdw.name, gr_tr=time_wdw.gr_tr)
    N_max = int(len(time_wdw.time)*p_train) # Last point to be preserved
    new_time_wdw.time = time_wdw.time[:N_max] # [s]
    new_time_wdw.Bx_RF1 = time_wdw.Bx_RF1[:N_max] # [nT]
    new_time_wdw.By_RF1 = time_wdw.By_RF1[:N_max] # [nT]
    new_time_wdw.Bz_RF1 = time_wdw.Bz_RF1[:N_max] # [nT]
    new_time_wdw.B_RF1 = time_wdw.B_RF1[:N_max] # [nT]
    new_time_wdw.z = time_wdw.z[:N_max] # Z-position [m]
    new_time_wdw.norm_value = time_wdw.norm_value # [nT]

    return new_time_wdw     

In [6]:
def summarize_TW_segments(t_wdws):
    """
    Summarizes the segment distribution for time windows.
    --- Inputs ---
    {t_wdws}. Type: List, each element is a Time_Wdw object. 
    Definition: List containing all time windows associated to different segments.
    All Time Windows must have the same number of points.
    """
    N = 0 # Initiate total number of points for the t_wdws collection
    for wdw in t_wdws:
        print(f'{wdw.name}: {len(wdw.z)} points, {int(len(wdw.z)/600)} min')
        N += len(wdw.z)
    print(f'Total points/time: {N} / {np.round(N/36000,2)} hours')

In [7]:
import tensorflow as tf
import keras
from keras.models import Sequential
from keras.layers import InputLayer,Reshape,Dense,Dropout,Conv1D,MaxPooling1D,Flatten,GlobalAveragePooling1D
from keras.backend import count_params# as K
from keras.regularizers import l2

2024-10-18 16:40:47.247972: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-10-18 16:40:47.273591: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-10-18 16:40:47.273625: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-10-18 16:40:47.274471: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-10-18 16:40:47.278815: I tensorflow/core/platform/cpu_feature_guar

In [8]:
class ML_Model:
    """
    The Machine Learning Model object <ML_Model> contains an supervised artificial intelligence model
    that can be trained and tested with <Time_Wdw> objects. It predicts the z-position of the elevator
    based on magnetic field signals. It can work in arbitrary rotational frames.

    --- Attributes ---
    
    {self.hyps}: Dictionary; it contains all the hyper-parameters information to build the Keras model, 
    with the following keys and value examples:
        * "Time_Window_pp": 20
        * "Magnetic_Components": ["Bx","Bz"] # Order does not matter
        * "Loss_Function": "MAE"
        * "Last_Activation_Function": 'linear'
        * "Batch_Size": 128,
        * "Epochs": 50
        * "Training_p_val": 0.2 # Validation dataset fraction
        * "Early_Stop_Monitor": "val_loss" # Criterium for early stopping during training.
        * "Earlt_Stop_Min_Delta": 0 # Minimum improvement to prevent early stopping.
        * "Early_Stop_Patience": 10 # Epochs during which the early stopping happens if no
        improvement is registered.
        * "Early_Stop_Start_From_Epoch": 20 # First epochs in which early stopping can't occur.
        * "Early_Stop_Restore_Best_Weights": True # Allows to restore best weights when the
        training is early stopped.
        * "z_thres": 1 # z-value threshold for accuracy, in [m]
        * "Activation_Function": 'tanh'
        * "Optimizer": 'adam'
        * "Learning_Rate": 0.0003
        * "RF": [[np.array([0.41,0.75,0.52]),90,'RF2']] # List with rotational frames, each element is a list with
        the format [Numpy array,String,Numeric]; meaning [rotation axis in (x,y,z) format, rotation angle in degree,
        RF name]. If RF name is 'RF1', then the data in kept in the original frame (defined as "RF1").
        * "Convolutional_Network": True # Allow to use Convolutional Layers before the Dense layers
        * "Conv_Layers": [[64,5],[128,5]] # List with Conv. layers, in order, in the format [filters,kernel size]
        * "Pool_Layers": [None,None] # List with pooling layers, in order, each element is the size (None=ignore layer)
        * "Flatten_Average": If True, just flattens the input data. If False, makes a global averaging for the many Conv filters.
        * "Dens_Layers": [1000,500] # List with dense layers, in order, each element is the number of neurons
        * "Dropout_Fraction": 0.2 # Fraction for dropout layers after each dense layer (0=ignore layer)
        * "Model_Name": "S1_Conv_5_5" # Usually contains the Stage and architecture information
        * "Full_Name": "S1_Conv_5_5_wdw2s_Bx_By_" # Contains all the information
        * "Train_Segms": "segm1segm3" # Specifies the segments used in training.
        * "Test_Segms": "segm1segm3" # Specifies the segments used in testing.
        * "p_train": 1 # Float between 0 and 1; fraction of the training dataset that is going
        to be used. If 1: entire training dataset.
        * "seed": 0 # Random initialization seed.
        * "N_augm": 1 # Number of times the original training dataset will be augmented.
        * "noise": [1,1,1] # List comprised of 3 float numbers; noise intensity for data
        augmentation in the [Bx,By,Bz] components, in [nT] units. Only relevant if N_augm>0.
    {self.model}: Keras object; Machine Learning model.
    {self.seed}: Integer; seed used in weights initialization when building the model.
    """
    def __init__(self, full_hyp, seed=0, load_model=None):
        """
        --- Inputs ---
        {full_hyp}: Dictionary with hyperparameters information.
        {load_model}: None (default) or Keras object. If None, starts a new model. Else, load a previous model.
        {seed}: Integer; seed for weight initialization. Default: 0.
        """
        self.hyps = full_hyp # Incorporate hyper-parameters information
        self.seed = seed # Record seed
        # Start building model:
        if load_model is not None:
            self.model = tf.keras.models.load_model(load_model)
        else:
            self.model = Sequential() # Initiate Keras object
            self.model.add(InputLayer(input_shape=(self.hyps["Time_Window_pp"],
                                                   len(self.hyps["Magnetic_Components"])))) # First (input) layer
            # Add Convolutional, Pooling and Flattening layers (if any):
            if self.hyps["Convolutional_Network"]:
                for i in range(len(self.hyps["Conv_Layers"])):
                    self.model.add(Conv1D(
                        filters=self.hyps["Conv_Layers"][i][0],
                        kernel_size=self.hyps["Conv_Layers"][i][1],
                        activation=self.hyps["Activation_Function"],
                        kernel_initializer=tf.keras.initializers.GlorotUniform(seed=self.seed),
                        kernel_regularizer=l2(0.01), bias_regularizer=l2(0.01)
                    ))
                    if self.hyps["Pool_Layers"][i] is not None:
                        self.model.add(MaxPooling1D(pool_size=self.hyps["Pool_Layers"][i]))
                self.model.add(Flatten()) if self.hyps["Flatten_Average"] else self.model.add(GlobalAveragePooling1D()) 
            else:
                self.model.add(Flatten())
            # Add Dense layers (and Dropout, if any):
            for i in range(len(self.hyps["Dens_Layers"])):
                self.model.add(Dense(
                    self.hyps["Dens_Layers"][i],
                    activation=self.hyps["Activation_Function"],
                    kernel_initializer='random_normal'))
                if self.hyps["Dropout_Fraction"]:
                    self.model.add(Dropout(self.hyps["Dropout_Fraction"]))
            # Last layers:
            self.model.add(Dense(1, activation=self.hyps["Last_Activation_Function"],
                                kernel_regularizer=l2(0.01), bias_regularizer=l2(0.01)
                                ))
            # Compile model:
            self.model.compile(
                loss=self.hyps["Loss_Function"],
                optimizer=self.hyps["Optimizer"],
            )
    
    def info(self):
        """
        Prints in screen information about the ML model.
        """
        print('-'*30)
        print('Full name:',self.hyps["Full_Name"])
        print("Time windows' points:",self.hyps["Time_Window_pp"])
        print("Input magnetic components:",self.hyps["Magnetic_Components"])
        print("Model's trainable parameters:",np.sum([count_params(w) for w in self.model.trainable_weights]))
        print('-'*30)
    
    def train_model(self,time_wdws,RF_info,savefigs_path=None,save_format='png',
                    exp_model_path=None):
        """
        Trains the ML model based on provided <Time_Wdw> objects and specified rotational frame. Data is randomly
        split between training and validation datasets.
        
        --- Inputs ---
        
        {time_wdws}: List; each element is a <Time_Wdw> object.
        {RF_info}: List; each element is a rotational frame, a list with the format [Numpy array,Numeric,String];
        meaning [rotation axis in (x,y,z) format, rotation angle in degree, RF name]. If RF[2]='RF1', then the
        data is kept in the original frame.
        {savefigs_path}: String; if specified, saves the training figures in .png format by default.
        {save_format}: String; Choose the saving format, 'png' by default, don't include the dot.
        {exp_model_path}: String; if specified, exports the ML model.
        
        --- Outputs ---
        
        {results}: Dictionary; contains information about the model and its results.
        """
        # Prepare data according to magnetic components and rotational frame:
        mag_data, z_data = [], [] # Initiate predictors and targets 
        for time_wdw in time_wdws:
            time_wdw.rotate_frame(RF_info)
            mag_data.append(time_wdw.matrix_format(self.hyps["Magnetic_Components"])) # Array format [samples,wdw_pp,channels], units [nT]
            z_data.append(time_wdw.z_labels) # z-position labels, units [m]
        all_mag_data = np.concatenate(mag_data,axis=0) # Concatenate all arrays, units [nT]
        all_z_data = np.concatenate(z_data,axis=0) # Concatenate all arrays, units [m]
        # Normalize magnetic data:
        all_mag_data /= time_wdws[0].norm_value # Adimensional units
        # Split training/validating datasets:
        msk = np.random.rand(all_mag_data.shape[0]) < 1-self.hyps["Training_p_val"] # Mask for training indexes
        X_train, Y_train = all_mag_data[msk], all_z_data[msk] # Training predictors and targets
        X_val, Y_val = all_mag_data[~msk], all_z_data[~msk] # Validating predictors and targets
        # Prepare callbacks:
        early_stop = keras.callbacks.EarlyStopping(monitor=self.hyps["Early_Stop_Monitor"],
                                                   min_delta=self.hyps["Early_Stop_Min_Delta"],
                                                   mode='min',
                                                   patience=self.hyps["Early_Stop_Patience"],
                                                   start_from_epoch=self.hyps["Early_Stop_Start_From_Epoch"],
                                                   restore_best_weights=self.hyps["Early_Stop_Restore_Best_Weights"],
                                                   verbose=1
                                                  )
        # Train the model:
        with tf.device('/GPU:0'):
            history = self.model.fit(X_train,Y_train,
                                     validation_data=(X_val, Y_val),
                                     epochs=self.hyps["Epochs"],
                                     callbacks=[plot_losses,early_stop],
                                     batch_size=self.hyps["Batch_Size"],
                                     verbose=1)
        # Save the training figure:
        history_frame = pd.DataFrame(history.history) # Generate training dataframe
        epochs = range(1,len(history_frame)+1) # (Range) Effective epochs
        if savefigs_path is not None:
            fig_name = f'Training_{self.hyps["Full_Name"]}'
            fig, ax = plt.subplots(figsize=(6,3))
            ax.plot(epochs,history_frame['val_loss'],'-s', label="val_loss", color='gray', alpha=0.8)
            ax.plot(epochs,history_frame['loss'],'-o', label="loss", color='brown', alpha=0.8)
            # ax.axhline(2,ls='--',color='orange'), 
            # ax.axhline(1,ls='--',color='g')
            ax.set(xlabel='Epochs',ylabel='Loss (MAE) [m]')
            ax.legend()
            fig.tight_layout()
            save_file(savefigs_path+fig_name,save_format=save_format)
            plt.close(fig)

        # Predictions on validation dataset:
        z_val_pred = np.squeeze(self.model.predict(X_val,verbose=1)) # Get the predictions [m]
        AE_val = np.transpose(np.abs(Y_val-z_val_pred)) # Array with absolute errors [m]
        # Calculate accuracy according to different threshold:
        acc_z = sum(AE_val<self.hyps["z_thres"])/len(AE_val)*100 # [%]
        print('-'*30)
        print(f'VALIDATING; Accuracy using {self.hyps["z_thres"]}m threshold: {np.round(acc_z,1)}%')
        print('-'*30)
        
        # Save histogram-results figure (validation dataset):
        # if savefigs_path is not None:
        #     weights = np.ones_like(AE_val) / len(AE_val)
        #     fig_name = f'Preds_Hist_Val_{self.hyps["Full_Name"]}_{RF_info[2]}_seed{self.seed}'
        #     fig_name += f'_pp{self.hyps["Time_Window_pp"]}'
        #     fig, ax = plt.subplots(figsize=(6,3))
        #     ax.hist(AE_val, weights=weights, bins=np.arange(0, max(AE_val) + 0.25, 0.25),
        #             color='gray', alpha=0.8, label = "Counts")
        #     ax.hist(AE_val, weights=weights, cumulative=True, bins=np.arange(0, max(AE_val) + 0.25, 0.25),
        #             color='green', alpha=0.2, label = "Cumulative")
        #     ax.axvline(2,ls='--',color='orange', label="Minimum AE")
        #     ax.axvline(1,ls='--',color='g', label="Optimal AE")
        #     ax.legend()
        #     ax.set(xlabel='Absolute Error [m]',ylabel='Prob. Density')
        #     fig.tight_layout()
        #     save_file(savefigs_path+f'Preds_Hist_Val_{self.hyps["Full_Name"]}',
        #               save_format=save_format)
        #     plt.close(fig)
            
        # Export the ML model:
        if exp_model_path is not None:
            self.model.save(exp_model_path+self.hyps["Full_Name"]+'.keras')
      
        # Generate dictionary with results:
        res_train = { # First basic parameters:
                "Full_Name": self.hyps["Full_Name"],
                "Wdw_pp": self.hyps["Time_Window_pp"],
                "Mag_Comps": "".join(self.hyps["Magnetic_Components"]),
                "RF": RF_info[2],
                "Seed": self.seed,
                "Train_Segms": self.hyps["Train_Segms"],
                "Batch": self.hyps["Batch_Size"],
                "Epochs": epochs[-1],
                "p_val": self.hyps["Training_p_val"],
                "Activ_func": self.hyps["Activation_Function"],
                "Optimizer": self.hyps["Optimizer"],
                "Learn_rate": self.hyps["Learning_Rate"],
                "Model_Name": self.hyps["Model_Name"],
                "Best_MAE_val": min(history_frame['val_loss']), # [m]
                "Best_MAE_train": min(history_frame['loss']), # [m]
                "z_thres": self.hyps["z_thres"], # [m]
                "Acc_Val_z": acc_z, # [%]
                "Trainable_pars": np.sum([count_params(w) for w in self.model.trainable_weights])
                }
        # Then optional parameters:
        for par in ['p_train','N_augm','noise']:
            if par in self.hyps:
                if par=='noise':
                    res_train['Bx_noise'] = self.hyps[par][0]
                    res_train['By_noise'] = self.hyps[par][1]
                    res_train['Bz_noise'] = self.hyps[par][2]
                else:
                    res_train[par] = self.hyps[par]
        
        return res_train
    
    def test_model(self,time_wdws,RF_info,return_preds=False,savefigs_path=None,save_format='png'):
        """
        Tests the ML model (already trained) on provided <Time_Wdw> objects and specified
        rotational frame. Returns the performance results and, if requested, the ground truth
        and predicted arrays.
        
        --- Inputs ---
        
        {time_wdws}: List; each element is a <Time_Wdw> object.
        {RF_info}: List; each element is a rotational frame, a list with the format [Numpy array,Numeric,String];
        meaning [rotation axis in (x,y,z) format, rotation angle in degree, RF name]. If RF[2]='RF1', then the
        data is kept in the original frame.
        {return_preds}: Boolean; if True, returns the ground truth and predicted arrays. Default: False.
        {savefigs_path}: String; if specified, saves the training figures in .png format by default.
        {save_format}: String; Choose the saving format, 'png' by default, don't include the dot.
        
        --- Outputs ---
        
        {results}: Dictionary; contains information about the model and its results.
        Optional:
        {t_data}: List; each element is a Numpy array representing the time, correlated with
        z-position, for a single segment. Units [s].
        {z_data}: List; each element is a Numpy array representing the ground truth for z-position
        for a single segment. Units [m].
        {z_preds}: List; each element is a Numpy array representing the predictions for z-position
        for a single segment. Units [m].
        """
        # Prepare data according to magnetic components and rotational frame:
        segms = [time_wdw.name for time_wdw in time_wdws]
        self.hyps["Test_Segms"] = "".join(segms)
        mag_data, z_data, t_data = [], [], [] # Initiate predictors, targets and time
        for time_wdw in time_wdws:
            time_wdw.rotate_frame(RF_info)
            mag_data.append(time_wdw.matrix_format(self.hyps["Magnetic_Components"])) # Array format [samples,wdw_pp,channels], units [nT]
            z_data.append(time_wdw.z_labels) # z-position labels, units [m]
            t_data.append(time_wdw.time[time_wdw.pp:]) # Time, units [s]
        all_mag_data = np.concatenate(mag_data,axis=0) # Concatenate all arrays, units [nT]
        all_z_data = np.concatenate(z_data,axis=0) # Concatenate all arrays, units [m]
        all_t_data = np.concatenate(t_data,axis=0) # Concatenate all arrays, units [s]
        # Normalize magnetic data:
        all_mag_data /= time_wdws[0].norm_value # Adimensional units
        for vec in mag_data:
            vec /= time_wdws[0].norm_value # Adimensional units
        
        # Obtain predictions and calculate performance metrics for ALL data:
        z_pred = np.squeeze(self.model.predict(all_mag_data,verbose=1)) # Get the predictions [m]
        AE_test = np.transpose(np.abs(all_z_data-z_pred)) # Array with absolute errors [m]
        # Calculate accuracy according to different threshold:
        acc_z = sum(AE_test<self.hyps["z_thres"])/len(AE_test)*100 # [%]
        print('-'*30)
        print(f'TESTING; Accuracy using {self.hyps["z_thres"]}m threshold: {np.round(acc_z,1)}%')
        print('-'*30)
        # Obtain predictions for each segment:
        z_preds = [np.squeeze(self.model.predict(mag_data[i],verbose=2)) for i in range(len(mag_data))] # [m]

        # Save the predictions figure:
        if savefigs_path is not None:
            for i in range(len(t_data)):
                fig_name = f'Testing_{self.hyps["Full_Name"]}_{time_wdws[i].name}'
                fig, ax = plt.subplots(figsize=(8,3))
                ax.plot(t_data[i],z_data[i],'--',color='teal',lw=0.5,label='Actual',alpha=0.8)
                ax.plot(t_data[i],z_preds[i],'-r',lw=0.5,label='Predictions',alpha=0.8)
                for z in np.append(0,4.1+np.arange(0,3.7*6+0.1,3.7)):
                    ax.axhline(z,ls='-',lw=0.5,alpha=0.2)         
                ax.set(xlabel='Time [s]',ylabel='z-pos [m]')
                ax.legend()
                fig.tight_layout()
                save_file(savefigs_path+fig_name,save_format=save_format)
                plt.show()

        # Save histogram-results figure (testing dataset):
        # if savefigs_path is not None:
        #     weights = np.ones_like(AE_test) / len(AE_test)
        #     fig_name = f'Preds_Hist_Test_{self.hyps["Full_Name"]}_{RF_info[2]}_seed{self.seed}'
        #     fig_name += f'_pp{self.hyps["Time_Window_pp"]}'
        #     fig, ax = plt.subplots(figsize=(6,3))
        #     ax.hist(AE_test, weights=weights, bins=np.arange(0, max(AE_test) + 0.25, 0.25),
        #             color='gray', alpha=0.8, label = "Counts")
        #     ax.hist(AE_test, weights=weights, cumulative=True, bins=np.arange(0, max(AE_test) + 0.25, 0.25),
        #             color='green', alpha=0.2, label = "Cumulative")
        #     ax.axvline(2,ls='--',color='orange', label="Minimum AE")
        #     ax.axvline(1,ls='--',color='g', label="Optimal AE")
        #     ax.legend()
        #     ax.set(xlabel='Absolute Error [m]',ylabel='Prob. Density')
        #     fig.tight_layout()
        #     save_file(savefigs_path+f'Preds_Hist_Test_{self.hyps["Full_Name"]}',
        #               save_format=save_format)
        #     plt.show()

        res_test = {
            "Test_Segms": self.hyps["Test_Segms"],
            "MAE_test": AE_test.mean(),
            "Acc_Test_z": acc_z,
        }
        if return_preds:
            return res_test, t_data, z_data, z_preds
        else:
            return res_test

In [9]:
from IPython.display import clear_output

class PlotLosses(keras.callbacks.Callback):
    """
    This class will be used to plot live charts of the training process.
    """
    def on_train_begin(self, logs={}):
        self.i = 0
        self.x = []
        self.losses = []
        self.val_losses = []        
        self.logs = []

    def on_epoch_end(self, epoch, logs={}):        
        self.logs.append(logs)
        self.x.append(self.i+1)
        self.losses.append(logs.get('loss'))
        self.val_losses.append(logs.get('val_loss'))
        self.i += 1
        self.fig = plt.figure(figsize=(6,3))
        clear_output(wait=True)
        if not all(elem is None for elem in self.val_losses):
            plt.plot(self.x, self.val_losses,'-s', label="val_loss", color='gray', alpha=0.8)        
        plt.plot(self.x, self.losses,'-o', label="loss", color='brown', alpha=0.8)
        # plt.axhline(1,ls='--',color='g')
        plt.legend()
        plt.xlabel("Epochs"), plt.ylabel("Loss (MAE) [m]")
        #plt.yscale('log') 
        plt.show()
        
plot_losses = PlotLosses()

In [10]:
def plot_training_results(df_results,test_col,val_col,groupby=None):
    """
    Plots the validating and testing results, to show overfitting or 
    underfitting. If requested, the plot will be replicated for different
    grouping features.
    --- Inputs ---
    {df_results}: Type: Dataframe. It must have the columns {test_col}
    and {val_col}.
    {test_col}: String; name for testing results. Content must be in [%] units.
    {val_col}: String; name for validating results. Content must be in [%] units.
    {groupby}: List or None. If a list is provided, each element must be a
    feature present in {df_results}. For each feature, a scoring plots will
    be shown, grouped by that feature.
    """
    if groupby is not None:
        for feature in groupby:
            group_name = df_results[feature].unique()
            colors = plt.cm.viridis(np.linspace(0, 1,len(group_name)+1))
            fig, ax = plt.subplots(figsize=(6,4))    
            # Plot results:
            for i, group in enumerate(group_name):
                data = df_results[df_results[feature]==group]
                ax.plot(data[test_col],data[val_col],
                        markers[i%len(markers)],color=colors[i],
                        alpha=0.7,label=group)
            # Plot identity line:
            xmin,xmax = ax.get_xlim()    
            ID = np.linspace(xmin,xmax,3)
            ax.plot(ID,ID,'--k')
            ax.set(xlabel="Testing Accuracy [%]",ylabel="Training Accuracy")
            ax.set_title(f'Results grouped by {feature}')
            ax.legend(title=feature)
            fig.tight_layout()
            plt.show()
    else:
        fig, ax = plt.subplots(figsize=(6,4))    
        # Plot results:
        ax.scatter(df_results[test_col],df_results[val_col],
                   color='orange',edgecolor='gray',alpha=0.6)
        # Plot identity line:
        xmin,xmax = ax.get_xlim()    
        ID = np.linspace(xmin,xmax,3)
        ax.plot(ID,ID,'--k')
        ax.set(xlabel="Testing Accuracy [%]",ylabel="Training Accuracy")
        ax.text(xmin,max(df_results[val_col])*0.95, "Overfitting ", ha='left')
        ax.text(xmax, ax.get_ylim()[0]*1.01, "Underfitting ", ha='right')
        ax.set_title('All results')
        fig.tight_layout()
        plt.show()

In [11]:
def train_stage1(B_comps_opts,arch_opts,seed_opts,RF_opts,
                 t_wdws_train,t_wdws_test,savefigs_path=None,
                 exp_model_path=None,results_path='./',
                 check_rep_model=None,quick_timing_test=False):
    """
    Training procedure for Stage 1, focusing on magnetic components. It trains many
    ML models using different options for magnetic components, architectures, seeds
    and rotational frames. It returns and exports a dataframe with the results.

    --- Inputs ---

    {B_comps_opts}: List; each element is a list in which the different magnetic
    components are included as elements. Options: 'Bx', 'By', 'Bz', 'B'.
    {arch_opts}: List; each element is a dictionary with the ML model hyper-parameters,
    see {self.hyps} attribute for <ML_Model> object.
    {seed_opts}: List; each element is an integer meaning a random initialization seed.
    {RF_opts}: List; each element is a list with the format [Numpy array,Numeric,String];
    meaning [rotation axis in (x,y,z) format, rotation angle in degree, RF name].
    If RF[2]='RF1', then the data is kept in the original frame.
    {t_wdws_train}: List; each element is a <Time_Wdw> object that will be used for
    the training process.
    {t_wdws_test}: List; each element is a <Time_Wdw> object that will be used for
    the testing process.
    {savefigs_path}: String; if specified, saves the training figures in .png format by default.
    {save_format}: String; Choose the saving format, 'png' by default, don't include the dot.
    {exp_model_path}: String; if specified, exports the ML model.
    {results_path}: String; path for exporting results. Default: current directory.
    {check_rep_model}: String or None; if provided, use a .csv file as a reference, same format
    as the output of this function, to skip any already trained model. Default: None.
    {quick_timing_test}: Boolean. If True, ONLY runs a dry test (no savings of any type) for 5 epochs
    and estimate the total training times.

    --- Outputs ---

    {pd_results}: Pandas dataframe; information about the ML models' results.
    
    """
    # If provided, load the reference file:
    if check_rep_model is not None:
        pd_ref = pd.read_csv(check_rep_model)
        trained_models = list(pd_ref["Full_Name"]) # Extract the full names of trained ML models
    else:
        trained_models = []
    # Initiate dataframe:
    pd_results = pd.DataFrame(columns=['Full_Name','Wdw_pp','Mag_Comps','RF','Seed',
                                       'Train_Segms','Test_Segms',
                                       'Batch','Epochs','p_val','Activ_func','Optimizer',
                                       'Learn_rate','Model_Name','Best_MAE_val',
                                       'Best_MAE_train','MAE_test',
                                       'z_thres','Acc_Val_z','Acc_Test_z'])
    # Prepare time windows:
    segms_train = [time_wdw.name for time_wdw in t_wdws_train] # List with training segments' names
    segms_test = [time_wdw.name for time_wdw in t_wdws_test] # List with testing segments' names
    for subset in [t_wdws_train,t_wdws_test]:
        for t_wdw in subset: 
            t_wdw.window_data()

    # Train all models:    
    N_models = 0 # Initiatie auxiliar count
    for B_comps in B_comps_opts: # Iterate over magnetic components
        for arch in arch_opts: # Iterate over ML architectures
            for RF in RF_opts: # Iterate over rotational frames
                for seed in seed_opts: # Iterate over random initialization seeds
                    # Prepare hyper-parameter options for model:
                    full_hyp = arch.copy() # Initiate Full Hyper-parameters dictionary
                    full_hyp["Magnetic_Components"] = B_comps
                    full_hyp["Full_Name"] = arch["Model_Name"]+'_'+'_'.join([b for b in B_comps])
                    full_hyp["Full_Name"] += f'_{RF[2]}_seed{seed}_{"".join(segms_train)}'
                    # Train the model (if not trained yet):
                    if full_hyp["Full_Name"] not in trained_models:
                        N_models += 1 # Update number of trainable models
                        full_hyp["Train_Segms"] = "".join(segms_train)
                        full_hyp["Test_Segms"] = "".join(segms_test)
                        if not quick_timing_test:
                            # Train the model:
                            keras.backend.clear_session()
                            ML_model = ML_Model(full_hyp,seed=seed)
                            res_train = ML_model.train_model(t_wdws_train,RF,savefigs_path=savefigs_path,
                                                             exp_model_path=exp_model_path)
                            # Test the model:
                            res_test = ML_model.test_model(t_wdws_test,RF,savefigs_path=savefigs_path)
                            # Update results:
                            results = dict(list(res_train.items())+list(res_test.items()))
                            pd_results.loc[len(pd_results)] = results
                            # Export .csv file (overwrites in everystep):
                            name = f'AI_S1_{len(B_comps_opts)}magcomps_{len(seed_opts)}seeds'
                            name += f'_{len(RF_opts)}RFs_wdw{t_wdws_train[0].pp}pp.csv'
                            pd_results.to_csv(results_path+name, index=False)
    if quick_timing_test:
        print('-'*10,' QUICK TIMING TEST MODE ','-'*10)
        print('Running dry test for 5 epochs...')
        time.sleep(1)
        # Only train last ML model:
        full_hyp['Epochs'] = 5
        start = time.time()
        keras.backend.clear_session()
        ML_model = ML_Model(full_hyp,seed=seed)
        _ = ML_model.train_model(t_wdws_train,RF)
        _ = ML_model.test_model(t_wdws_test,RF)
        end = time.time()
        elapsed_time = end - start # [s]
        single_model_est_time = elapsed_time/5*arch['Epochs']/60 # [min]
        # Print estimated times:
        print('\n','-'*40,'\n')
        print(f'Trainable models in this training session:',N_models)
        print(f'Estimated training time per model: {np.round(single_model_est_time,1)} min')
        print(f'Estimated total time: {np.round(single_model_est_time*N_models/60,1)} hours')
        print('\n','-'*10,' IMPORTANT ','-'*10,'\n')
        print('If you want to train all models, select quick_timing_test=False')
            
    return pd_results

In [12]:
def train_stage2(pp_opts,arch_opts,seed_opts,RF_opts,
                 t_wdws_train,t_wdws_test,savefigs_path=None,
                 exp_model_path=None,results_path='./',
                 check_rep_model=None,quick_timing_test=False):
    """
    Training procedure for Stage 2, focusing on time windows' length. It trains many
    ML models using different options for magnetic components, architectures, seeds
    and rotational frames. It returns and exports a dataframe with the results.

    --- Inputs ---

    {pp_opts}: List; each element is an integer meaning the number of points for each
    time window.
    {arch_opts}: List; each element is a dictionary with the ML model hyper-parameters,
    see {self.hyps} attribute for <ML_Model> object.
    {seed_opts}: List; each element is an integer meaning a random initialization seed.
    {RF_opts}: List; each element is a list with the format [Numpy array,Numeric,String];
    meaning [rotation axis in (x,y,z) format, rotation angle in degree, RF name].
    If RF[2]='RF1', then the data is kept in the original frame.
    {t_wdws_train}: List; each element is a <Time_Wdw> object that will be used for
    the training process. The number of points for each time window doesn't matter.
    {t_wdws_test}: List; each element is a <Time_Wdw> object that will be used for
    the testing process. The number of points for each time window doesn't matter.
    {savefigs_path}: String; if specified, saves the training figures in .png format by default.
    {save_format}: String; Choose the saving format, 'png' by default, don't include the dot.
    {exp_model_path}: String; if specified, exports the ML model.
    {results_path}: String; path for exporting results. Default: current directory.
    {check_rep_model}: String or None; if provided, use a .csv file as a reference, same format
    as the output of this function, to skip any already trained model. Default: None.    
    {quick_timing_test}: Boolean. If True, ONLY runs a dry test (no savings of any type) for 5 epochs
    and estimate the total training times.
    
    --- Outputs ---

    {pd_results}: Pandas dataframe; information about the ML models' results.
    
    """
    # If provided, load the reference file:
    if check_rep_model is not None:
        pd_ref = pd.read_csv(check_rep_model)
        trained_models = list(pd_ref["Full_Name"]) # Extract the full names of trained ML models
    else:
        trained_models = []
    # Initiate dataframe:
    pd_results = pd.DataFrame(columns=['Full_Name','Wdw_pp','Mag_Comps','RF','Seed',
                                       'Train_Segms','Test_Segms',
                                       'Batch','Epochs','p_val','Activ_func','Optimizer',
                                       'Learn_rate','Model_Name','Best_MAE_val',
                                       'Best_MAE_train','MAE_test',
                                       'z_thres','Acc_Val_z','Acc_Test_z'])
    # First, get the segments' names (time windows):
    segms_train = [time_wdw.name for time_wdw in t_wdws_train] # List with training segments' names
    segms_test = [time_wdw.name for time_wdw in t_wdws_test] # List with testing segments' names

    # Train all models:
    N_models = 0 # Initiatie auxiliar count
    for pp in pp_opts: # Iterate over number of points for time windows
        # Iterate over ML architectures
        for arch in arch_opts:
            # Only train architectures which are valid for the current number of points:
            if arch["Convolutional_Network"]:
                if np.sum([filt_kern[1] for filt_kern in arch["Conv_Layers"]])>pp:
                    continue # Skip this training
            for RF in RF_opts: # Iterate over rotational frames
                for seed in seed_opts: # Iterate over random initialization seeds
                    # Prepare hyper-parameter options for model:
                    full_hyp = arch.copy() # Initiate Full Hyper-parameters dictionary
                    full_hyp["Time_Window_pp"] = pp
                    full_hyp["Full_Name"] = arch["Model_Name"]+f'_{pp}pp'
                    full_hyp["Full_Name"] += f'_{RF[2]}_seed{seed}_{"".join(segms_train)}'
                    # Train the model (if not trained yet):
                    if full_hyp["Full_Name"] not in trained_models:
                        N_models += 1 # Update number of trainable models
                        full_hyp["Train_Segms"] = "".join(segms_train)
                        full_hyp["Test_Segms"] = "".join(segms_test)
                        if not quick_timing_test:
                            # Prepare Time windows:
                            for t_wdw in t_wdws_train:
                                t_wdw.pp = pp # Define time window's points
                                t_wdw.window_data() # Window data
                            for t_wdw in t_wdws_test:
                                t_wdw.pp = pp # Define time window's points
                                t_wdw.window_data() # Window data
                            # Train the model:
                            keras.backend.clear_session()
                            ML_model = ML_Model(full_hyp,seed=seed)
                            res_train = ML_model.train_model(t_wdws_train,RF,savefigs_path=savefigs_path,
                                                             exp_model_path=exp_model_path)
                            # Test the model:
                            res_test = ML_model.test_model(t_wdws_test,RF,savefigs_path=savefigs_path)
                            # Update results:
                            results = dict(list(res_train.items())+list(res_test.items()))
                            pd_results.loc[len(pd_results)] = results
                            # Export .csv file (overwrites in everystep):
                            B_comps = ''.join([b for b in full_hyp["Magnetic_Components"]])
                            name = f'AI_S2_{len(pp_opts)}ppOptions_{len(seed_opts)}seeds'
                            name += f'_{len(RF_opts)}RFs_comps{B_comps}.csv'
                            pd_results.to_csv(results_path+name, index=False)

    if quick_timing_test:
        print('-'*10,' QUICK TIMING TEST MODE ','-'*10)
        print('Running dry test for 5 epochs...')
        time.sleep(1)
        # Only train last ML model:
        full_hyp['Epochs'] = 5
        start = time.time()
        keras.backend.clear_session()
        # Prepare Time windows:
        for t_wdw in t_wdws_train:
            t_wdw.pp = pp # Define time window's points
            t_wdw.window_data() # Window data
        for t_wdw in t_wdws_test:
            t_wdw.pp = pp # Define time window's points
            t_wdw.window_data() # Window data
        # Train model:
        ML_model = ML_Model(full_hyp,seed=seed)
        _ = ML_model.train_model(t_wdws_train,RF)
        _ = ML_model.test_model(t_wdws_test,RF)
        end = time.time()
        elapsed_time = end - start # [s]
        single_model_est_time = elapsed_time/5*arch['Epochs']/60 # [min]
        # Print estimated times:
        print('\n','-'*40,'\n')
        print(f'Trainable models in this training session:',N_models)
        print(f'Estimated training time per model: {np.round(single_model_est_time,1)} min')
        print(f'Estimated total time: {np.round(single_model_est_time*N_models/60,1)} hours')
        print('\n','-'*10,' IMPORTANT ','-'*10,'\n')
        print('If you want to train all models, select quick_timing_test=False')
            
    return pd_results

In [13]:
def train_stage3(arch_opts,seed_opts,RF_opts,
                 t_wdws_train,t_wdws_test,savefigs_path=None,
                 exp_model_path=None,results_path='./',
                 check_rep_model=None,quick_timing_test=False):
    """
    Training procedure for Stage 3, focusing on ML model's main architecture. It trains
    many ML models using different options for seeds and rotational frames.
    It returns and exports a dataframe with the results.

    --- Inputs ---

    {arch_opts}: List; each element is a dictionary with the ML model hyper-parameters,
    see {self.hyps} attribute for <ML_Model> object.
    {seed_opts}: List; each element is an integer meaning a random initialization seed.
    {RF_opts}: List; each element is a list with the format [Numpy array,Numeric,String];
    meaning [rotation axis in (x,y,z) format, rotation angle in degree, RF name].
    If RF[2]='RF1', then the data is kept in the original frame.
    {t_wdws_train}: List; each element is a <Time_Wdw> object that will be used for
    the training process. The number of points for each time window doesn't matter.
    {t_wdws_test}: List; each element is a <Time_Wdw> object that will be used for
    the testing process. The number of points for each time window doesn't matter.
    {savefigs_path}: String; if specified, saves the training figures in .png format by default.
    {save_format}: String; Choose the saving format, 'png' by default, don't include the dot.
    {exp_model_path}: String; if specified, exports the ML model.
    {results_path}: String; path for exporting results. Default: current directory.
    {check_rep_model}: String or None; if provided, use a .csv file as a reference, same format
    as the output of this function, to skip any already trained model. Default: None.
    {quick_timing_test}: Boolean. If True, ONLY runs a dry test (no savings of any type) for 5 epochs
    and estimate the total training times.
    
    --- Outputs ---

    {pd_results}: Pandas dataframe; information about the ML models' results.
    
    """
    # If provided, load the reference file:
    if check_rep_model is not None:
        pd_ref = pd.read_csv(check_rep_model)
        trained_models = list(pd_ref["Full_Name"]) # Extract the full names of trained ML models
    else:
        trained_models = []
    # Initiate dataframe:
    pd_results = pd.DataFrame(columns=['Full_Name','Wdw_pp','Mag_Comps','RF','Seed',
                                       'Train_Segms','Test_Segms',
                                       'Batch','Epochs','p_val','Activ_func','Optimizer',
                                       'Learn_rate','Model_Name','Best_MAE_val',
                                       'Best_MAE_train','MAE_test',
                                       'z_thres','Acc_Val_z','Acc_Test_z'])
    # Prepare time windows:
    segms_train = [time_wdw.name for time_wdw in t_wdws_train] # List with training segments' names
    segms_test = [time_wdw.name for time_wdw in t_wdws_test] # List with testing segments' names
    for subset in [t_wdws_train,t_wdws_test]:
        for t_wdw in subset: 
            t_wdw.window_data()    
    # Train all models:
    N_models = 0 # Initiatie auxiliar count
    for arch in arch_opts: # Iterate over architectures
        for RF in RF_opts: # Iterate over rotational frames
            for seed in seed_opts: # Iterate over random initialization seeds
                # Prepare hyper-parameter options for model:
                full_hyp = arch.copy() # Initiate Full Hyper-parameters dictionary
                full_hyp["Full_Name"] = arch["Model_Name"]+f'_{RF[2]}_seed{seed}_{"".join(segms_train)}'
                # Train the model (if not trained yet):
                if full_hyp["Full_Name"] not in trained_models:
                    N_models += 1 # Update number of trainable models
                    full_hyp["Train_Segms"] = "".join(segms_train)
                    full_hyp["Test_Segms"] = "".join(segms_test)
                    if not quick_timing_test:
                        # Train the model:
                        keras.backend.clear_session()
                        ML_model = ML_Model(full_hyp,seed=seed)
                        res_train = ML_model.train_model(t_wdws_train,RF,savefigs_path=savefigs_path,
                                                         exp_model_path=exp_model_path)
                        # Test the model:
                        res_test = ML_model.test_model(t_wdws_test,RF,savefigs_path=savefigs_path)
                        # Update results:
                        results = dict(list(res_train.items())+list(res_test.items()))
                        pd_results.loc[len(pd_results)] = results
                        # Export .csv file (overwrites in everystep):
            
                        name = f'AI_S3_{len(seed_opts)}seeds_{len(RF_opts)}RFs.csv'
                        pd_results.to_csv(results_path+name, index=False)

    if quick_timing_test:
        print('-'*10,' QUICK TIMING TEST MODE ','-'*10)
        print('Running dry test for 5 epochs...')
        time.sleep(1)
        # Only train last ML model:
        full_hyp['Epochs'] = 5
        start = time.time()
        keras.backend.clear_session()
        # Train model:
        ML_model = ML_Model(full_hyp,seed=seed)
        _ = ML_model.train_model(t_wdws_train,RF)
        _ = ML_model.test_model(t_wdws_test,RF)
        end = time.time()
        elapsed_time = end - start # [s]
        single_model_est_time = elapsed_time/5*arch['Epochs']/60 # [min]
        # Print estimated times:
        print('\n','-'*40,'\n')
        print(f'Trainable models in this training session:',N_models)
        print(f'Estimated training time per model: {np.round(single_model_est_time,1)} min')
        print(f'Estimated total time: {np.round(single_model_est_time*N_models/60,1)} hours')
        print('\n','-'*10,' IMPORTANT ','-'*10,'\n')
        print('If you want to train all models, select quick_timing_test=False')
        
    return pd_results

In [14]:
def train_stage4(gen_hyps,activ_opts,optim_opts,lr_opts,seed_opts,
                 RF_opts,t_wdws_train,t_wdws_test,savefigs_path=None,
                 exp_model_path=None,results_path='./',
                 check_rep_model=None,quick_timing_test=False):
    """
    Training procedure for Stage 4, focusing on ML model's hyper-parameters. It trains
    many ML models using different options for seeds and rotational frames.
    It returns and exports a dataframe with the results.

    --- Inputs ---

    {gen_hyps}: Dictionary, general hyper-parameters for all ML models.
    {activ_opts}: List; each element is an activation function to be used in all layers
    except the last one.
    {optim_opts}: List; each element is an optimizer.
    {lr_opts}: List; each element is a learning rate value.    
    {seed_opts}: List; each element is an integer meaning a random initialization seed.
    {RF_opts}: List; each element is a list with the format [Numpy array,Numeric,String];
    meaning [rotation axis in (x,y,z) format, rotation angle in degree, RF name].
    If RF[2]='RF1', then the data is kept in the original frame.
    {t_wdws_train}: List; each element is a <Time_Wdw> object that will be used for
    the training process. The number of points for each time window doesn't matter.
    {t_wdws_test}: List; each element is a <Time_Wdw> object that will be used for
    the testing process. The number of points for each time window doesn't matter.
    {savefigs_path}: String; if specified, saves the training figures in .png format by default.
    {save_format}: String; Choose the saving format, 'png' by default, don't include the dot.
    {exp_model_path}: String; if specified, exports the ML model.
    {results_path}: String; path for exporting results. Default: current directory.
    {check_rep_model}: String or None; if provided, use a .csv file as a reference, same format
    as the output of this function, to skip any already trained model. Default: None.
    {quick_timing_test}: Boolean. If True, ONLY runs a dry test (no savings of any type) for 5 epochs
    and estimate the total training times.
    
    --- Outputs ---

    {pd_results}: Pandas dataframe; information about the ML models' results.
    
    """
    # If provided, load the reference file:
    if check_rep_model is not None:
        pd_ref = pd.read_csv(check_rep_model)
        trained_models = list(pd_ref["Full_Name"]) # Extract the full names of trained ML models
    else:
        trained_models = []
    # Initiate dataframe:
    pd_results = pd.DataFrame(columns=['Full_Name','Wdw_pp','Mag_Comps','RF','Seed',
                                       'Train_Segms','Test_Segms',
                                       'Batch','Epochs','p_val','Activ_func','Optimizer',
                                       'Learn_rate','Model_Name','Best_MAE_val',
                                       'Best_MAE_train','MAE_test',
                                       'z_thres','Acc_Val_z','Acc_Test_z'])    
   # Prepare time windows:
    segms_train = [time_wdw.name for time_wdw in t_wdws_train] # List with training segments' names
    segms_test = [time_wdw.name for time_wdw in t_wdws_test] # List with testing segments' names
    for subset in [t_wdws_train,t_wdws_test]:
        for t_wdw in subset: 
            t_wdw.window_data()
    # Train all models:
    N_models = 0 # Initiatie auxiliar count    
    for activ in activ_opts: 
        for optim in optim_opts: 
            for lr in lr_opts: 
                for RF in RF_opts: # Iterate over rotational frames
                    for seed in seed_opts: # Iterate over random initialization seeds
                        # Prepare hyper-parameter options for model:
                        full_hyp = gen_hyps.copy() # Initiate Full Hyper-parameters dictionary
                        full_hyp["Activation_Function"] = activ
                        full_hyp["Optimizer"] = optim
                        full_hyp["Learning_Rate"] = lr
                        fullname = f"activ{activ}_optim{optim}"
                        fullname += f"_lr{lr}_{RF[2]}_seed{seed}"
                        fullname += f'_{"".join(segms_train)}'
                        full_hyp["Full_Name"] = fullname
                        # Train the model (if not trained yet):
                        if full_hyp["Full_Name"] not in trained_models:
                            N_models += 1 # Update number of trainable models
                            full_hyp["Train_Segms"] = "".join(segms_train)
                            full_hyp["Test_Segms"] = "".join(segms_test)
                            if not quick_timing_test:
                                # Train the model:
                                keras.backend.clear_session()
                                ML_model = ML_Model(full_hyp,seed=seed)
                                res_train = ML_model.train_model(t_wdws_train,RF,savefigs_path=savefigs_path,
                                                                 exp_model_path=exp_model_path)
                                # Test the model:
                                res_test = ML_model.test_model(t_wdws_test,RF,savefigs_path=savefigs_path)
                                # Update results:
                                results = dict(list(res_train.items())+list(res_test.items()))
                                pd_results.loc[len(pd_results)] = results
                                # Export .csv file (overwrites in everystep):
                                B_comps = ''.join([b for b in full_hyp["Magnetic_Components"]])
                                name = f'AI_S4_{len(seed_opts)}seeds_{len(RF_opts)}RFs.csv'
                                pd_results.to_csv(results_path+name, index=False)
            
    if quick_timing_test:
        print('-'*10,' QUICK TIMING TEST MODE ','-'*10)
        print('Running dry test for 5 epochs...')
        time.sleep(1)
        # Only train last ML model:
        full_hyp['Epochs'] = 5
        start = time.time()
        keras.backend.clear_session()
        # Train model:
        ML_model = ML_Model(full_hyp,seed=seed)
        _ = ML_model.train_model(t_wdws_train,RF)
        _ = ML_model.test_model(t_wdws_test,RF)
        end = time.time()
        elapsed_time = end - start # [s]
        single_model_est_time = elapsed_time/5*gen_hyps['Epochs']/60 # [min]
        # Print estimated times:
        print('\n','-'*40,'\n')
        print(f'Trainable models in this training session:',N_models)
        print(f'Estimated training time per model: {np.round(single_model_est_time,1)} min')
        print(f'Estimated total time: {np.round(single_model_est_time*N_models/60,1)} hours')
        print('\n','-'*10,' IMPORTANT ','-'*10,'\n')
        print('If you want to train all models, select quick_timing_test=False')
        
    return pd_results

In [15]:
def train_stage5(gen_hyps,filter_opts,drop_opts,seed_opts,
                 RF_opts,t_wdws_train,t_wdws_test,savefigs_path=None,
                 exp_model_path=None,results_path='./',
                 check_rep_model=None,quick_timing_test=False):
    """
    Training procedure for Stage 5, focusing on ML model's fine architecture.
    It trains many ML models using different options for seeds and rotational frames.
    It returns and exports a dataframe with the results.

    --- Inputs ---

    {gen_hyps}: Dictionary, general hyper-parameters for all ML models.
    {filter_opts}: List, each element is an option for the convolutional layers.
    {drop_opts}: List; each element is a value for the dropout fraction in dense layers.
    {seed_opts}: List; each element is an integer meaning a random initialization seed.
    {RF_opts}: List; each element is a list with the format [Numpy array,Numeric,String];
    meaning [rotation axis in (x,y,z) format, rotation angle in degree, RF name].
    If RF[2]='RF1', then the data is kept in the original frame.
    {t_wdws_train}: List; each element is a <Time_Wdw> object that will be used for
    the training process. The number of points for each time window doesn't matter.
    {t_wdws_test}: List; each element is a <Time_Wdw> object that will be used for
    the testing process. The number of points for each time window doesn't matter.
    {savefigs_path}: String; if specified, saves the training figures in .png format by default.
    {save_format}: String; Choose the saving format, 'png' by default, don't include the dot.
    {exp_model_path}: String; if specified, exports the ML model.
    {results_path}: String; path for exporting results. Default: current directory.
    {check_rep_model}: String or None; if provided, use a .csv file as a reference, same format
    as the output of this function, to skip any already trained model. Default: None.
    {quick_timing_test}: Boolean. If True, ONLY runs a dry test (no savings of any type) for 5 epochs
    and estimate the total training times.
    
    --- Outputs ---

    {pd_results}: Pandas dataframe; information about the ML models' results.
    
    """
    # If provided, load the reference file:
    if check_rep_model is not None:
        pd_ref = pd.read_csv(check_rep_model)
        trained_models = list(pd_ref["Full_Name"]) # Extract the full names of trained ML models
    else:
        trained_models = []
    # Initiate dataframe:
    pd_results = pd.DataFrame(columns=['Full_Name','Wdw_pp','Mag_Comps','RF','Seed',
                                       'Train_Segms','Test_Segms',
                                       'Batch','Epochs','p_val','Activ_func','Optimizer',
                                       'Learn_rate','Model_Name','Best_MAE_val',
                                       'Best_MAE_train','MAE_test',
                                       'z_thres','Acc_Val_z','Acc_Test_z'])       
    # Prepare time windows:
    segms_train = [time_wdw.name for time_wdw in t_wdws_train] # List with training segments' names
    segms_test = [time_wdw.name for time_wdw in t_wdws_test] # List with testing segments' names
    for subset in [t_wdws_train,t_wdws_test]:
        for t_wdw in subset: 
            t_wdw.window_data()
    # Train all models:
    N_models = 0 # Initiatie auxiliar count    
    for filter in filter_opts: 
        for drop in drop_opts: 
            for RF in RF_opts: # Iterate over rotational frames
                for seed in seed_opts: # Iterate over random initialization seeds
                    # Prepare hyper-parameter options for model:
                    full_hyp = gen_hyps.copy() # Initiate Full Hyper-parameters dictionary
                    full_hyp["Conv_Layers"] = filter
                    full_hyp["Dropout_Fraction"] = drop
                    str_filters = '_'.join([str(f[0]) for f in filter])
                    fullname = f"Filters{str_filters}_dropout{drop}"
                    fullname += f"_{RF[2]}_seed{seed}_{''.join(segms_train)}"
                    full_hyp["Full_Name"] = fullname
                    # Train the model (if not trained yet):
                    if full_hyp["Full_Name"] not in trained_models:
                        N_models += 1 # Update number of trainable models
                        full_hyp["Train_Segms"] = "".join(segms_train)
                        full_hyp["Test_Segms"] = "".join(segms_test)
                        if not quick_timing_test:
                            # Train the model:
                            keras.backend.clear_session()
                            ML_model = ML_Model(full_hyp,seed=seed)
                            res_train = ML_model.train_model(t_wdws_train,RF,savefigs_path=savefigs_path,
                                                             exp_model_path=exp_model_path)
                            # Test the model:
                            res_test = ML_model.test_model(t_wdws_test,RF,savefigs_path=savefigs_path)
                            # Update results:
                            results = dict(list(res_train.items())+list(res_test.items()))
                            pd_results.loc[len(pd_results)] = results
                            # Export .csv file (overwrites in everystep):
                            name = f'AI_S5_{len(seed_opts)}seeds_{len(RF_opts)}RFs.csv'
                            pd_results.to_csv(results_path+name, index=False)
            
    if quick_timing_test:
        print('-'*10,' QUICK TIMING TEST MODE ','-'*10)
        print('Running dry test for 5 epochs...')
        time.sleep(1)
        # Only train last ML model:
        full_hyp['Epochs'] = 5
        start = time.time()
        keras.backend.clear_session()
        # Train model:
        ML_model = ML_Model(full_hyp,seed=seed)
        _ = ML_model.train_model(t_wdws_train,RF)
        _ = ML_model.test_model(t_wdws_test,RF)
        end = time.time()
        elapsed_time = end - start # [s]
        single_model_est_time = elapsed_time/5*gen_hyps['Epochs']/60 # [min]
        # Print estimated times:
        print('\n','-'*40,'\n')
        print(f'Trainable models in this training session:',N_models)
        print(f'Estimated training time per model: {np.round(single_model_est_time,1)} min')
        print(f'Estimated total time: {np.round(single_model_est_time*N_models/60,1)} hours')
        print('\n','-'*10,' IMPORTANT ','-'*10,'\n')
        print('If you want to train all models, select quick_timing_test=False')
    
    return pd_results

In [16]:
def train_stage6(gen_hyps,p_train_opts,N_augm_opts,noise_opts,
                 seed_opts,RF_opts,t_wdws_train,t_wdws_test,
                 savefigs_path=None,exp_model_path=None,results_path='./',
                 check_rep_model=None,quick_timing_test=False):
    """
    Training procedure for Stage 6, focusing on the effect of data augmentation and
    the size of the training dataset.
    It trains many ML models using different options for seeds and rotational frames.
    It returns and exports a dataframe with the results.

    --- Inputs ---

    {gen_hyps}: Dictionary, general hyper-parameters for all ML models.
    {p_train_opts}: List, each element is a fraction between 0 and 1, it is an option for
    the fraction of the training dataset that is going to be used.
    {N_augm_opts}: List; each element is an integer, it is an option the number of times
    that the original training dataset will be augmented.
    {noise_opts}: List, each element is a list comprised of 3 float numbers, representing an
    option for the noise intensity for data augmentation, in [nT] units.
    {seed_opts}: List; each element is an integer meaning a random initialization seed.
    {RF_opts}: List; each element is a list with the format [Numpy array,Numeric,String];
    meaning [rotation axis in (x,y,z) format, rotation angle in degree, RF name].
    If RF[2]='RF1', then the data is kept in the original frame.
    {t_wdws_train}: List; each element is a <Time_Wdw> object that will be used for
    the training process. The number of points for each time window doesn't matter.
    {t_wdws_test}: List; each element is a <Time_Wdw> object that will be used for
    the testing process. The number of points for each time window doesn't matter.
    {savefigs_path}: String; if specified, saves the training figures in .png format by default.
    {save_format}: String; Choose the saving format, 'png' by default, don't include the dot.
    {exp_model_path}: String; if specified, exports the ML model.
    {results_path}: String; path for exporting results. Default: current directory.
    {check_rep_model}: String or None; if provided, use a .csv file as a reference, same format
    as the output of this function, to skip any already trained model. Default: None.
    {quick_timing_test}: Boolean. If True, ONLY runs a dry test (no savings of any type) for 5 epochs
    and estimate the total training times.
    
    --- Outputs ---

    {pd_results}: Pandas dataframe; information about the ML models' results.
    
    """
    # If provided, load the reference file:
    if check_rep_model is not None:
        pd_ref = pd.read_csv(check_rep_model)
        trained_models = list(pd_ref["Full_Name"]) # Extract the full names of trained ML models
    else:
        trained_models = []
    # Initiate dataframe:
    pd_results = pd.DataFrame(columns=['Full_Name','Wdw_pp','Mag_Comps','RF','Seed',
                                       'Train_Segms','Test_Segms',
                                       'p_train','N_augm','Bx_noise','By_noise','Bz_noise',
                                       'Batch','Epochs','p_val','Activ_func','Optimizer',
                                       'Learn_rate','Model_Name','Best_MAE_val',
                                       'Best_MAE_train','MAE_test',
                                       'z_thres','Acc_Val_z','Acc_Test_z'])       
    # Prepare time windows:
    segms_train = [time_wdw.name for time_wdw in t_wdws_train] # List with training segments' names
    segms_test = [time_wdw.name for time_wdw in t_wdws_test] # List with testing segments' names
    for t_wdw in t_wdws_test: 
        t_wdw.window_data()    
    # Train all models:
    N_models = 0 # Initiatie auxiliar count    
    for p_train in p_train_opts:
        for N_augm in N_augm_opts: 
            for noise in noise_opts:
                for RF in RF_opts: # Iterate over rotational frames
                    for seed in seed_opts: # Iterate over random initialization seeds
                        # Prepare hyper-parameter options for model:
                        full_hyp = gen_hyps.copy() # Initiate Full Hyper-parameters dictionary
                        fullname = f"ptrain{p_train}_Naugm{N_augm}_"
                        fullname += f"noiseBx{noise[0]}_By{noise[1]}_Bz{noise[2]}_nT"
                        fullname += f"_{RF[2]}_seed{seed}_{''.join(segms_train)}"
                        full_hyp["Full_Name"] = fullname
                        # Train the model (if not trained yet):
                        if full_hyp["Full_Name"] not in trained_models:
                            N_models += 1 # Update number of trained models
                            full_hyp["Train_Segms"] = "".join(segms_train)
                            full_hyp["Test_Segms"] = "".join(segms_test)
                            if not quick_timing_test:
                                # Reduce original training dataset:
                                t_wdws_train_reduced = [copy_and_reduce_TimeWdw(time_wdw,p_train) for time_wdw in t_wdws_train]        
                                # Augmentate dataset:
                                for t_wdw in t_wdws_train_reduced:
                                    t_wdw.window_data(N_augm=N_augm,
                                                      Bx_noise=noise[0], By_noise=noise[1], Bz_noise=noise[2])
                                # Train the model:
                                keras.backend.clear_session()
                                ML_model = ML_Model(full_hyp,seed=seed)
                                res_train = ML_model.train_model(t_wdws_train_reduced,RF,savefigs_path=savefigs_path,
                                                                 exp_model_path=exp_model_path)
                                # Test the model:
                                res_test = ML_model.test_model(t_wdws_test,RF,savefigs_path=savefigs_path)
                                # Update results:
                                results = dict(list(res_train.items())+list(res_test.items()))
                                results['p_train'] = p_train
                                results['N_augm'] = N_augm
                                results['Bx_noise'] = noise[0] # [nT]
                                results['By_noise'] = noise[1] # [nT]
                                results['Bz_noise'] = noise[2] # [nT]
                                pd_results.loc[len(pd_results)] = results
                                # Export .csv file (overwrites in everystep):
                                name = f'AI_S6_{len(p_train_opts)}frac_opts_{len(N_augm_opts)}'
                                name += f'N_augm_opts_{len(noise_opts)}_noise_opts.csv'
                                pd_results.to_csv(results_path+name, index=False)
            
    if quick_timing_test:
        print('-'*10,' QUICK TIMING TEST MODE ','-'*10)
        print('Running dry test for 5 epochs...')
        time.sleep(1)
        # Only train last ML model:
        full_hyp['Epochs'] = 5
        start = time.time()
        keras.backend.clear_session()
        # Prepare Time windows:
        t_wdws_train_reduced = [copy_and_reduce_TimeWdw(time_wdw,p_train) for time_wdw in t_wdws_train]        
        # Augmentate dataset:
        for t_wdw in t_wdws_train_reduced:
            t_wdw.window_data(N_augm=N_augm,Bx_noise=noise[0],
                              By_noise=noise[1], Bz_noise=noise[2])
        # Train model:
        ML_model = ML_Model(full_hyp,seed=seed)
        _ = ML_model.train_model(t_wdws_train_reduced,RF)
        _ = ML_model.test_model(t_wdws_test,RF)
        end = time.time()
        elapsed_time = end - start # [s]
        single_model_est_time = elapsed_time/5*gen_hyps['Epochs']/60 # [min]
        # Print estimated times:
        print('\n','-'*40,'\n')
        print(f'Trainable models in this training session:',N_models)
        print(f'Estimated training time per model: {np.round(single_model_est_time,1)} min')
        print(f'Estimated total time: {np.round(single_model_est_time*N_models/60,1)} hours')
        print('\n','-'*10,' IMPORTANT ','-'*10,'\n')
        print('If you want to train all models, select quick_timing_test=False')
        
    return pd_results

In [17]:
def train_stage7(gen_hyps,t_wdws_train,t_wdws_test,
                 seed_opts,savefigs_path=None,
                 results_path='./',check_rep_model=None,
                 quick_timing_test=False):
    """
    Train a single model, export it and return the predictions, using the rotated frames
    provided in the hyperparameters description.

    --- Inputs ---

    {gen_hyps}: Dictionary, general hyper-parameters for all ML models.
    {t_wdws_train}: List; each element is a <Time_Wdw> object that will be used for
    the training process. The number of points for each time window doesn't matter.
    {t_wdws_test}: List; each element is a <Time_Wdw> object that will be used for
    the testing process. The number of points for each time window doesn't matter.
    {seed_opts}: List; each element is an integer meaning a random initialization seed.
    {savefigs_path}: String; if specified, saves the training figures in .png format by default.
    {save_format}: String; Choose the saving format, 'png' by default, don't include the dot.
    {results_path}: String; path for exporting results. Default: current directory.
    {check_rep_model}: String or None; if provided, use a .csv file as a reference, same format
    as the output of this function, to skip any already trained model. Default: None.
    {quick_timing_test}: Boolean. If True, ONLY runs a dry test (no savings of any type) for 5 epochs
    and estimate the total training times.    
    """
    # If provided, load the reference file:
    if check_rep_model is not None:
        pd_ref = pd.read_csv(check_rep_model)
        trained_models = list(pd_ref["Full_Name"]) # Extract the full names of trained ML models
    else:
        trained_models = []
    # Initiate dataframe:
    df_results = pd.DataFrame(columns=['Full_Name','Wdw_pp','Mag_Comps','RF','Seed',
                                   'Train_Segms','Test_Segms',
                                   'p_train','N_augm','Bx_noise','By_noise','Bz_noise',
                                   'Batch','Epochs','p_val','Activ_func','Optimizer',
                                   'Learn_rate','Model_Name','Best_MAE_val',
                                   'Best_MAE_train','MAE_test',
                                   'z_thres','Acc_Val_z','Acc_Test_z'])
    
    # Start preparing the ML model name:
    ML_name = f'MLmodel_{gen_hyps["Model_Name"]}' # Model name
    segms_train = [time_wdw.name for time_wdw in t_wdws_train] # ID name
    segms_test = [time_wdw.name for time_wdw in t_wdws_test] # ID name
    gen_hyps["Train_Segms"] = "".join(segms_train) # Add training segments information
    gen_hyps["Test_Segms"] = "".join(segms_test) # Add testing segments information
    ML_name += f'_{gen_hyps["Train_Segms"]}' # Update name
    
    # Reduce training dataset (if requested) and window data:
    if gen_hyps["p_train"] < 1:
        t_wdws_train_reduced = [copy_and_reduce_TimeWdw(time_wdw,p_train) for time_wdw in t_wdws_train]
        ML_name += f'_ptrain{gen_hyps["p_train"]}' # Update name
    else: 
        t_wdws_train_reduced = t_wdws_train

    # Window test data:
    for t_wdw in t_wdws_test: 
        t_wdw.window_data()    
        
    # Augment training dataset (if requested) and window training data:
    ML_name += f'_Naugm{gen_hyps["N_augm"]}'
    if gen_hyps["N_augm"] > 0:
        ML_name += f'_noise_{gen_hyps["noise"][0]}_' # Update name
        ML_name += f'{gen_hyps["noise"][1]}_{gen_hyps["noise"][2]}_nT' # Update name
    for t_wdw in t_wdws_train_reduced:
        t_wdw.window_data(N_augm=gen_hyps["N_augm"],Bx_noise=gen_hyps["noise"][0], 
                          By_noise=gen_hyps["noise"][1], Bz_noise=gen_hyps["noise"][2])        

    # Train all models, make and save predictions:
    N_models = 0 # Initiatie auxiliar count    
    predictions = {} # Initiate
    for RF in gen_hyps['RF']: # Iterate over rotational frames
        for seed in seed_opts: # Iterate over random initialization seeds
            full_hyp = gen_hyps.copy() # Initiate Full Hyper-parameters dictionary
            full_hyp["Full_Name"] = ML_name + f"_{RF[2]}_seed{seed}_{''.join(segms_train)}" # Add full name for the model
            if full_hyp["Full_Name"] not in trained_models:
                N_models += 1 # Update number of trained models
                full_hyp["Train_Segms"] = "".join(segms_train)
                full_hyp["Test_Segms"] = "".join(segms_test)
                if not quick_timing_test:
                    # Train the model:
                    keras.backend.clear_session()
                    ML_model = ML_Model(full_hyp,seed=seed)
                    res_train = ML_model.train_model(t_wdws_train_reduced,RF,savefigs_path=savefigs_path)
                    # Test the model and obtain ground truth and predictions, in [m]:
                    res_test, t, true_z, pred_z = ML_model.test_model(t_wdws_test,RF,return_preds=True,
                                                                      savefigs_path=savefigs_path)        
                    predictions[RF[2]] = [t, true_z, pred_z] # Update dictionary for current RF; [s],[m],[m]
                    # Update results:
                    results = dict(list(res_train.items())+list(res_test.items())) # Compile results
                    df_results.loc[len(df_results)] = results # Add new row in dataframe
                    # Export .csv file (overwrites in everystep):
                    name = f'AI_S7_{ML_name}_{full_hyp["Epochs"]}.csv'
                    df_results.to_csv(results_path+name, index=False)
                    # Export predictions file:
                    for i in range(len(t)):
                        np.savetxt(results_path+f"Preds_{ML_name}_{RF[2]}_seed{seed}_test{i}.csv",
                                   np.transpose(np.array([t[i],true_z[i], pred_z[i]])),
                                   delimiter=",",header='time[s]_trueZ[m]_predZ[m]',fmt='%.7e')
                        
    if quick_timing_test:
        print('-'*10,' QUICK TIMING TEST MODE ','-'*10)
        print('Running dry test for 5 epochs...')
        time.sleep(1)
        # Only train last ML model:
        full_hyp['Epochs'] = 5
        start = time.time()
        keras.backend.clear_session()
        # Train model:
        ML_model = ML_Model(full_hyp,seed=seed)
        _ = ML_model.train_model(t_wdws_train,RF)
        _ = ML_model.test_model(t_wdws_test,RF)
        end = time.time()
        elapsed_time = end - start # [s]
        single_model_est_time = elapsed_time/5*gen_hyps['Epochs']/60 # [min]
        # Print estimated times:
        print('\n','-'*40,'\n')
        print(f'Trainable models in this training session:',N_models)
        print(f'Estimated training time per model: {np.round(single_model_est_time,1)} min')
        print(f'Estimated total time: {np.round(single_model_est_time*N_models/60,1)} hours')
        print('\n','-'*10,' IMPORTANT ','-'*10,'\n')
        print('If you want to train all models, select quick_timing_test=False')    