In [1]:
# Load Libraries and Datasets

import os
import pandas as pd
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from scipy.special import binom,legendre, hermitenorm
import warnings
from timeit import default_timer as timer
from joblib import Parallel, delayed
import scipy.stats as stats
import math

mpl.rcParams['font.family'] = 'Times New Roman'
mpl.rcParams['mathtext.fontset'] = 'custom'
mpl.rcParams['mathtext.rm'] = 'Times New Roman'
mpl.rcParams['mathtext.it'] = 'Times New Roman:italic'
mpl.rcParams['mathtext.bf'] = 'Times New Roman:bold'

base_path = os.path.abspath(os.path.join(os.getcwd(), '..', 'Results', 'csvFiles'))
try:
    data_path = base_path + '/rough_surface_statistics.csv'
    rough_dat = pd.read_csv(data_path, header = None)
    feature_name = rough_dat.iloc[0, :].values
    data_rough = rough_dat.iloc[1:, 1:-1].values.astype(float)
    labels = rough_dat.iloc[1:, -1].values
    case_names = rough_dat.iloc[1:, 0].values
except FileNotFoundError:
    print(f"File not found: {data_path}")


id_DNS,id_DNSle,id_DNSF,id_EXP,id_DNSlc = np.where((labels == 'DNS'))[0],np.where((labels == 'DNSLE'))[0],np.where((labels == 'DNSF'))[0],np.where((labels == 'EXP'))[0],np.where((labels == 'DNSLC'))[0]
print(f"DNS: {len(id_DNS)}, DNSLE: {len(id_DNSle)}, DNSF: {len(id_DNSF)}, EXP: {len(id_EXP)}")
print(f"Total TRAINING cases: {data_rough.shape[0]-len(id_DNSlc)}")


used_col = [0, 1, 4, 5, 6, 8, 9, 11, 12, 13]
input_data = data_rough[:, used_col]
Sx,kavg,krms,Ra,Ix,Po,Ex,Sk,Ku,ks = input_data[:,0].reshape(-1, 1), input_data[:,1].reshape(-1, 1), \
    input_data[:,2].reshape(-1, 1), input_data[:,3].reshape(-1, 1), \
    input_data[:,4].reshape(-1, 1), input_data[:,5].reshape(-1, 1), \
    input_data[:,6].reshape(-1, 1), input_data[:,7].reshape(-1, 1), \
    input_data[:,8].reshape(-1, 1), input_data[:,9].reshape(-1, 1)
kt,Iz,Ez = data_rough[:,3].reshape(-1, 1) , data_rough[:,7].reshape(-1, 1), data_rough[:,10].reshape(-1, 1)  # kt, Iz, Ez


# Helper Funcs
def cos(x):
    return np.cos(x)
def tanh(x):
    return np.tanh(x)
def exp(x):
    return np.exp(x)
def square(x):
    return np.square(x)
def abs(x):
    return np.abs(x)



DNS: 28, DNSLE: 24, DNSF: 31, EXP: 13
Total TRAINING cases: 96


In [2]:
# Your model to be analyzed
class symbolic_model():
    
    def __init__(self):
        pass

    @staticmethod
    def SrModel_cop50():
        x1,y1,z1,u1,v1  = Po,Ex,Sk,kavg,Sx
        # 50"
        out_tar = (square(cos(x1) - (x1 * exp(x1))) + 8.331)*\
            (tanh((y1 + -0.084) / 0.294))*\
            (tanh(z1) + (z1 / tanh(z1))) *\
            (u1 + ((u1 + -0.692) - (tanh(-0.024 / square(u1)) * exp(u1)))) *\
            (square(exp(v1)))

        # print(f"SR model's output for {len(id_used_)} ids")
        return out_tar


    @staticmethod
    def SrModel_4PCE(input_points):
        x1,y1,z1,u1,v1  = input_points[:,0].reshape(-1, 1),input_points[:,1].reshape(-1, 1),input_points[:,2].reshape(-1, 1),input_points[:,3].reshape(-1, 1),input_points[:,4].reshape(-1, 1)
        # 50"   
        out_tar = (square(cos(x1) - (x1 * exp(x1))) + 8.331)*\
            (tanh((y1 + -0.084) / 0.294))*\
            (tanh(z1) + (z1 / tanh(z1))) *\
            (u1 + ((u1 + -0.692) - (tanh(-0.024 / square(u1)) * exp(u1)))) *\
            (square(exp(v1)))
        # print(f"SR model's output for {input_points.shape[0]} samples. Rescaled input used.\n")
        return out_tar
    

In [None]:
# PCE Smolyak grid. Not called when employing least squares PCE
def columnize(*args):

    column_arrays = []
    for ele in args:
        if isinstance(ele, (list, tuple)):
            ele = np.array(ele)
        ele.shape = (-1, 1)
        column_arrays.append(ele)

    return column_arrays

def clencurt(rule=5, method='FFT'):

    if method.upper() == 'FFT':
        
        if rule == 1:
            x = np.array([0])
            w = np.array([2])
        else:
            n = rule - 1
            c = np.zeros((rule, 2))
            c[:rule:2,0] = 2 / np.append([1], np.array([1-np.arange(2,rule,2)**2]))
            c[1,1] = 1
            f = np.fft.ifft(np.concatenate((c[:rule,:], c[-2:0:-1,:]),axis=0), axis=0).real
            w = 2 * np.concatenate(([f[0,0]], 2*f[1:rule-1,0], [f[rule-1,0]]))/2
            x = (rule-1) * f[:rule,1]
            x = x[-1::-1]
        return x, w
    
    elif method.upper() == 'EXPLICIT':
        if rule == 1:
            x = np.array([0])
            w = np.array([2])
        else:
            n = rule - 1
            # build points
            theta = np.arange(rule) * np.pi / n
            x = np.cos(theta)
            w = 0

            c = np.ones((rule))
            c[1:-1] = 2
            j = np.arange((n/2)//1) + 1
            
            b = 2 * np.ones(j.shape)
            if np.mod(n/2,1) == 0:
                b[int(n/2) - 1] = 1
            
            j.shape = (1, -1)
            b.shape = (1, -1)
            j_theta = j * theta.reshape(-1,1)
            w = c/n * (1 - np.sum(b/(4*j**2 - 1) * np.cos(2*j_theta), axis=1))

            x = np.round(x[-1::-1] * 1e13) / 1e13
        return x, w

def quad_coeff(rule=5, kind='GL'):

    if kind.upper() == 'GL':
        x, w = np.polynomial.legendre.leggauss(rule)
        
    elif kind.upper() == 'GH':
        x, w = np.polynomial.hermite.hermgauss(rule)
        
    elif kind.upper() == 'CC':
        x, w = clencurt(rule)

    return x, w

class PceSmolyakGrid():

    def __init__(self, polynom, level):
        
        (self.x,
         self.eps,
         self.weight,
         self.unique_x,
         self.inverse_index) = self.smolyak_sparse_grid(polynom, level)


    def smolyak_sparse_grid(self, polynom, level):

        # min and max level
        o_min = max([level + 1, polynom.dim])
        o_max = level + polynom.dim
        
        # get multi-index for all level
        comb = np.empty((0, polynom.dim), dtype=int)
        for k in range(o_min, o_max+1):
            multi_index, _ = self.index_with_sum(polynom.dim, k)
            comb = np.append(comb, multi_index, axis=0)
        
        # initialize final array
        x_points = np.empty((0, polynom.dim))
        eps_points = np.empty((0, polynom.dim))
        weights = np.empty((0,))

        # define integration points and weights
        for lev in comb:
            local_x = []
            local_eps = []
            local_w = []
            coeff = (-1)**(level + polynom.dim - np.sum(lev)) * binom(polynom.dim - 1, level + polynom.dim - np.sum(lev))
            
            # cycle on integration variables
            for l, k, p in zip(lev, polynom.distrib, polynom.param):
                # set integration type depending on distrib
                if k.upper() == 'U':
                    kind = 'CC'
                elif k.upper() == 'N':
                    kind = 'GH'
                
                # get number of integration points
                n = self.growth_rule(l, kind=kind)
                # get gauss points and weight
                x0, w0 = quad_coeff(rule=n, kind=kind)

                # change of variable
                if (k.upper() == 'U'):
                    eps = np.copy(x0)
                    w = np.copy(w0) * 0.5
                    if p != [-1, 1]:
                        x = (p[1] - p[0]) / 2 * x0 + (p[1] + p[0]) / 2
                    else:
                        x = np.copy(x0)
                    
                elif (k.upper() == 'N'):
                    eps = np.sqrt(2) * 1 * x0 + 0 
                    x = eps * p[1] + p[0]
                    w = w0 * (1 / np.sqrt(np.pi))

                # store local points
                local_x.append(x)
                local_eps.append(eps)
                local_w.append(w)
            
            # update final array
            x_points = np.concatenate((x_points, np.concatenate(columnize(*np.meshgrid(*local_x)), axis=1)))
            eps_points = np.concatenate((eps_points, np.concatenate(columnize(*np.meshgrid(*local_eps)), axis=1)))
            weights = np.concatenate((weights, coeff * np.prod(np.concatenate(columnize(*np.meshgrid(*local_w)), axis=1),axis=1)))
        
        # get unique points
        unique_x_points, inverse_index = np.unique(x_points, axis=0, return_inverse=True)

        return x_points, eps_points, weights, unique_x_points, inverse_index


    def index_with_sum(self, dim, val):

        # check feasibility
        if val < dim:
            raise ValueError(f'dim={dim} --> minimum value for val=dim={dim} (here {val} is provided)')

        k = np.ones(dim,dtype=int)
        khat = k * (val - dim + 1)

        index = np.empty((0, dim), dtype=int)

        cnt = 0
        p = 0

        while k[dim - 1] < val:
            if k[p] > khat[p]:
                if (p + 1) != dim:
                    k[p] = 1
                    p += 1
            else:
                khat[:p] = khat[p] - k[p] + 1
                k[0] = khat[0]
                p = 0
                cnt += 1
                index = np.append(index, k.reshape(1,-1), axis=0)
            
            k[p] = k[p] + 1
        return index, cnt


    def growth_rule(self, level, kind='Legendre'):

        method_map = {'GL':1, 'GH':1, 'CC':2}

        if method_map[kind.upper()] == 1: 
            n = 2*level - 1
        elif method_map[kind.upper()] == 2:
            n = 1 if level == 1 else 2**(level - 1) + 1
        
        return n

In [None]:
# Polynomial Chaos Expansion class
class PolyChaos():

    def __str__(self):
        kmax = 5
        msg = ""
        
        msg += "Polynomial Chaos Expansion\n"
        msg += "--------------------------\n"
        msg += f"    dimensions: {self.dim}\n"
        msg += f"    order: {self.order}\n"
        
        msg += "    distrib: ["
        for k, ele in enumerate(self.distrib):
            if k > kmax - 1:
                msg += ' ... '
                break
            elif k == len(self.distrib)  - 1:
                msg += ele.upper()
            else:
                msg += ele.upper() + " , "
        msg += "]\n"

        msg += "    param: ["
        for k, ele in enumerate(self.param):
            if k > kmax - 1:
                msg += ' ... '
                break
            elif k == len(self.param)  - 1:
                msg += str(ele)
            else:
                msg += str(ele) + " , "
        msg += "]\n"

        msg += "    coeff: ["
        if self.coeff.size == 0:
            msg += " to be computed "
        else:
            for k, ele in enumerate(self.coeff):
                if k > kmax - 1:
                    msg += ' ... '
                    break
                elif k == len(self.param) - 1:
                    msg += str(ele)
                else:
                    msg += str(ele) + " , "
        msg += "]\n"
        
        return msg

    def __init__(self, order, distrib, param):

        if len(distrib) != len(param):
            raise ValueError('distrib and param not the same length')
        
        self.dim = len(distrib)
        self.order = order
        self.distrib = distrib
        self.param = param
        self.coeff = np.empty(0)
        self.grid = None
        self.mu = None
        self.sigma = None

        (self.nt,
         self.multi_index,
         self.basis) = self.create_instance(order, distrib)
    

    def create_instance(self, order, distrib):

        # generate multi index
        multi_index, nt = self.generate_multi_index(order)

        # create multivariate polynomials basis 
        def basis(index, eps):

            if index > (nt - 1):
                raise ValueError(f'max index possible is nt-1={self.nt-1}')
            if isinstance(eps, (list, tuple)):
                eps = np.array(eps)

            i = self.multi_index[index]
            
            y = np.ones(eps.shape[0])
           
            for k, dist in enumerate(distrib):
                if dist.upper() == 'U':
                    y = y * np.polyval(legendre(i[k]), eps[..., k])
                elif dist.upper() == 'N':
                    y = y * np.polyval(hermitenorm(i[k]), eps[..., k])
            
            return y
        
        return nt, multi_index, basis

        
    def generate_multi_index(self, order):
    
        index = np.empty((0, self.dim), dtype=int)
        cnt = 0

        for val in range(order + 1):
            k = np.zeros(self.dim,dtype=int)
            khat = np.ones_like(k) * val    
            p = 0

            while k[self.dim - 1] <= val:
                if k[p] > khat[p]:
                    if (p + 1) != self.dim:
                        k[p] = 0
                        p += 1
                else:
                    khat[:p] = khat[p] - k[p]
                    k[0] = khat[0]
                    p = 0
                    cnt += 1
                    index = np.append(index, k.reshape(1,-1), axis=0)
                
                k[p] = k[p] + 1
        
        return index, cnt


    def norm_factor(self, multi_index):

        factor = 1
        for k, index in enumerate(multi_index):
            if self.distrib[k].upper() == 'U':
                factor = factor * (2 * index + 1) / 1
            elif self.distrib[k].upper() == 'N':
                factor = factor / math.factorial(index)
        
        return factor

    
    def spectral_projection(self, fun, level,):
        
        # create sparse grid. Not called when employing least squares PCE
        # ------------------
        t1 = timer()
        print("* generation of smolyak sparse grid ... ", end=' ', flush=True)
        #
        self.grid = PceSmolyakGrid(self, level)

        # evaluate function at unique points
        # ----------------------------------
        
        unique_y = fun(self.grid.unique_x)
        #
                
        # evaluate function at all points
        # -------------------------------
        print(f"* build complete function output ({self.grid.x.shape[0]} points) ... ", end=' ', flush=True)
            #
        y = unique_y[self.grid.inverse_index, ...]
        
        # coefficient computation
        # -----------------------
        print("* coefficient computation ... ", end=' ', flush=True)
        
        # function to compute the k-th coefficient
        def compute_k_coeff(k):
            factor = self.norm_factor(self.multi_index[k])
            return factor * np.sum(y.flatten() * self.basis(k, self.grid.eps).flatten() * self.grid.weight.flatten())
        # parallel computation of all coefficients
        coeff = Parallel(n_jobs=-1, verbose=0)(map(delayed(compute_k_coeff), range(self.nt)))
        self.coeff = np.array(coeff)
    

    def create_latin_hypercube_samples(self,sp_num,rd_state=None):

        dim = self.dim
        sample_points = np.zeros((sp_num, dim))
        rng = np.random.RandomState(rd_state) if rd_state is not None else np.random

        # create the latin hyper-cube 
        randoms = rng.random(sp_num * dim).reshape((dim, sp_num))
        for dim_ in range(dim):
            perm = rng.permutation(sp_num)  # pylint: disable=no-member
            randoms[dim_] = (perm + randoms[dim_]) / sp_num

        for k, (dist,param) in enumerate(zip(self.distrib,self.param)):
        # rescale          
        # from [0, 1] to [a,b] for uniform distribution
        # from [0, 1] to [nu,sigma] for normal distribution 
            if dist.upper() == 'N':
                nu,sigma = param
                std_nm = stats.norm.ppf(randoms.T[:,k])
                sample_points[:,k] = nu + sigma*std_nm
            elif dist.upper() == 'U':
                min_v,max_v = param
                sample_points[:,k] = min_v + (max_v-min_v)*randoms.T[:,k]
        
        print(f'Latin Hypercube sampling: {sp_num} samples in {dim} dimensions created.\n')
        return sample_points
    

    def regression_fit(self,  points, y_samples):
        Ns = points.shape[0]
        if Ns < self.nt:
            raise ValueError(f'number of points ({Ns}) must be >= number of basis functions ({self.nt})\nIncrease Ns or decrease PCE order.')
        if y_samples.shape[0] != Ns:
            raise ValueError(f'number of points {points.shape} and samples {y_samples.shape} must be the same')

        # change of limited range
        std_points = np.zeros_like(points)
        for k, (dist,param) in enumerate(zip(self.distrib,self.param)):
            if dist.upper() == 'U':
                a, b = param
                std_points[:,k] = (2 * points[:,k]-a-b)/(b-a)
            elif dist.upper()=='N':
                nu,sigma = param
                std_points[:,k] = (points[:,k]-nu)/sigma

        # build the regression matrix
        A = np.zeros((Ns,self.nt))
        for k in range(self.nt):
            A[:,k] = self.basis(k,std_points)

        # compute the coefficients
        coeff, residuals, rank, singular_vals = np.linalg.lstsq(A, y_samples, rcond = None)

        self.coeff = coeff.flatten()

        if rank<min(A.shape):
            warnings.warn(f'Regression matrix is rank deficient.Rank={rank}, expected min={min(A.shape)}')

        print(f'Regression-Fit: PCE order={self.nt},residuals(L2)={np.sqrt(residuals/Ns)}\n')

    def compare_on_myData(self,input_latin,verbose = 'y',thres = 0.4):
        sr_rough = symbolic_model()
        real_samples = np.concatenate((Po,Ex,Sk,kavg,Sx), axis=1)
        sr_output_real = sr_rough.SrModel_cop50().squeeze()
        y_samples = sr_rough.SrModel_4PCE(input_latin).squeeze()
        output_pce = self.evaluate(input_latin)
        pce_output_real = self.evaluate(real_samples)

        if verbose =='y':
            figs,axes = plt.subplots(1,2,figsize = (6,3))
            axes[0].plot(y_samples, output_pce, 'o',color = '#c20078',markerfacecolor='none')
            axes[0].set_title('PCE on Latin-samples')
            id_less = np.where(np.abs((sr_output_real-pce_output_real)/sr_output_real)<thres)
            axes[1].plot(sr_output_real[id_less], pce_output_real[id_less], 'o',color = '#00c251',markerfacecolor = 'none')
            axes[1].set_title(f'PCE on Real-samples')
            axes[1].text(0.1,0.9,f'{np.array(id_less).shape[1]}/96 in {thres*100}%', fontsize=12, transform=axes[1].transAxes,
                        bbox=dict(facecolor='white', alpha=0.8, edgecolor='gray'))
            print(f"Rough dataset {np.array(id_less).shape[1]}/96 in {thres*100}%\n")
            plt.show()
        
        return [output_pce, y_samples, pce_output_real, sr_output_real]


    def norm_fit(self, plot='n'):
        
        # evaluation of the mean
        self.mu = self.coeff[0]
        
        # evaluation of the standard deviation
        c_quad = self.coeff[1:] ** 2
        psi_quad = np.array([1/self.norm_factor(k) for k in self.multi_index[1:]])
        self.sigma = np.sqrt(np.sum(c_quad * psi_quad))

        if plot.lower() == 'y':
            # check if mpl is interactive
            if not mpl.is_interactive():
                plt.ion()

            # create data
            x = np.linspace(-4*self.sigma, 4*self.sigma, 501) + self.mu
            y = 1 / np.sqrt(2 * np.pi * self.sigma**2) * np.exp(-(x - self.mu)**2 / (2 * self.sigma**2))

            # plot
            hp = plt.figure()
            plt.plot(x, y , linewidth=2)
            plt.xlabel('x', fontsize=14)
            plt.ylabel('y', fontsize=14)
            plt.xticks(fontsize=14)
            plt.yticks(fontsize=14)
            
            # text box
            ax = plt.gca()
            props = dict(boxstyle='round', facecolor='white', alpha=0.9, linewidth=2)
            ax.text(0.05, 0.95, r"$\mu={self.mu :.4f}$\n$\sigma={self.sigma  :.4f}$",
                    verticalalignment='top', bbox=props, fontsize=14, transform=ax.transAxes)
            
            # set grid and layout
            plt.grid()
            plt.tight_layout()
            
            # return handle to plot
            return hp
    

    def sobol(self, index):
        """
        SOBOL computes the Sobol' indices for a given PCE expansion.

        Paramaters
        ----------
        index (list) :
            list with index, e.g. [1,2] for having S12, [[1], [1,2,3]] for
            having S1 and S123
        
        """
        
        # check precedences
        if self.sigma is None:
            raise ValueError("norm_fit() must be called before sobol() can be executed")

        # handle input type
        if not isinstance(index[0], (list, tuple)):
            index = [index]
        
        sobol = []
        for idx in index:
            # create complementary index
            zero_based_index = [k - 1 for k in idx]
            all_index = np.array(range(len(self.distrib)))
            other_index = np.setdiff1d(all_index, zero_based_index)

            # find elements
            coeff_index = np.array([True] * (self.multi_index.shape[0] - 1))
            for ele in zero_based_index:
                coeff_index = coeff_index * (self.multi_index[1:, ele] != 0)
            for ele in other_index:
                coeff_index = coeff_index * (self.multi_index[1:, ele] == 0)
            
            # computation of the index
            c_quad = self.coeff[1:] ** 2
            psi_quad = np.array([1/self.norm_factor(k) for k in self.multi_index[1:]])
            sobol.append(np.sum(c_quad[coeff_index] * psi_quad[coeff_index]) / (self.sigma ** 2))
        
        # prepare output type
        if len(sobol) == 1:
            sobol = sobol[0]
        
        return sobol


    def evaluate(self, points):
        
        if self.coeff.shape == (0, ):
            import warnings
            warnings.warn('PCE coeffiecients are not yet computed.')
        else:            
            # initialization
            std_points = np.zeros_like(points)

            # change of coordinates
            for k, (dist, param) in enumerate(zip(self.distrib, self.param)):
                if dist.upper() == 'U':
                    std_points[:, k] = (param[0] + param[1] - 2 * points[:, k]) / (param[0] - param[1])
                elif dist.upper() == 'N':
                    std_points[:, k] = (points[:,k] -  param[0]) / param[1]
            
            for k in range(self.nt):
                if k == 0:
                    y = self.coeff[k] * self.basis(k, std_points)
                else:
                    y += self.coeff[k] * self.basis(k, std_points)
            
            return y


In [5]:
# an example of usage
def main(my_pceParams):

    pce_4SR = PolyChaos(order=my_pceParams['pce_order'], distrib=my_pceParams['distribution'], param=my_pceParams['parameters'])
    sr_rough = symbolic_model()
    input_samples_ = pce_4SR.create_latin_hypercube_samples(sp_num=my_pceParams['Laint_sample_num'], rd_state=my_pceParams['random_state'])
    y_samples = sr_rough.SrModel_4PCE(input_samples_)
    pce_4SR.regression_fit(input_samples_, y_samples)
    pce_4SR.norm_fit(plot='n')
    calc_ = pce_4SR.compare_on_myData(input_samples_,verbose = 'y',thres = 0.35)

    # Po Ex Sk kavg Sx --- 1 2 3 4 5
    sobol_idx = pce_4SR.sobol([[1],[2],[3],[4],[5],[2,3]])
    print(f'Sobol indices for 5 features: \nPo Ex Sk kavg Sx Ex*Sk\n{np.array(sobol_idx)}\nToal is {np.sum(sobol_idx)}\n')
    return sobol_idx,calc_
