In [10]:
import numpy as np
from pc_kriging import PC_Kriging
from datetime import datetime
# import matplotlib.pyplot as plt
from scipy.stats import norm
from doepy import build
from scipy import optimize
import pandas as pd
from numpy import genfromtxt
import pickle

class Active_Training():
    """Active-learning driver for a surrogate-based failure-probability estimate.

    An initial (passive) design of experiments is evaluated on the target
    limit-state function, a surrogate is trained on it, and the design is then
    enriched one point at a time using a learning function evaluated on a
    Monte Carlo pool.

    Parameters
    ----------
    model : dict, optional
        Must contain the key ``"metamodel"``. Only ``'PCK'`` instantiates a
        surrogate (``PC_Kriging``); defaults to ``{"metamodel": 'PCK'}``.
    settings : dict
        Read as follows:

        * ``settings["marginals_R"][0]`` -- dict keyed ``'x1'``, ``'x2'``, ...;
          each value is a 3-item sequence whose last item is the polynomial
          family. For ``'hermite'`` the first two items are handed to
          ``scalehermite`` (named mean/var in this file), otherwise to
          ``scalelegendre`` (named min/max).
        * ``settings["passive_sampling"]`` -- ``[n_initial, strategy]``.
        * ``settings["active_sampling"]`` --
          ``[n_active, target_function, learning_function]`` with the learning
          function one of ``'U'``, ``'EFF'``, ``'ULOO'``, ``'EFFLOO'``.
        * ``settings["model"]`` -- ``[matern_coefficient, p_max]``; polynomial
          degrees ``1 .. p_max - 1`` are tried.

    Raises
    ------
    AssertionError
        If ``model`` lacks the ``"metamodel"`` key.
    TypeError
        If ``settings`` is ``None``.
    """

    def __init__(self, model=None, settings=None):
        if model is None:
            model = {"metamodel": 'PCK'}
        assert "metamodel" in model, "Missing model type"

        self.date_record = datetime.now().strftime("%Y_%m_%d_%H%M%S")

        if settings is None:
            raise TypeError(
                "settings is required: expected a dict with the keys "
                "'marginals_R', 'passive_sampling', 'active_sampling' and 'model'"
            )

        #----------------------------------------------------------------------
        self.doe_limits = {}              # per-input limits used for the initial sampling
        self.config, poly_type = {}, []
        self.marginals = settings["marginals_R"][0]

        self.dim = len(self.marginals)                  # input dimensions
        self.n_act = settings["active_sampling"][0]     # number of active samples
        self.passive_strat = settings["passive_sampling"][1]
        self.n_o = settings["passive_sampling"][0]

        self.MaternCoef = settings["model"][0]
        self.p_max = settings["model"][1]
        self.ModelType = model["metamodel"]
        self.targetF = settings["active_sampling"][1]
        self.learningF = settings["active_sampling"][2]
        #----------------------------------------------------------------------
        for margin in range(self.dim):
            key = 'x' + str(margin + 1)
            # The initial design is always drawn in the normalized box [-1, 1].
            self.doe_limits[key] = [-1, 1]
            poly_type.append(self.marginals[key][2])

        self.config["pol_type"] = poly_type

        #----------------------------------------------------------------------
        if self.ModelType == 'PCK':
            self.surrogate = PC_Kriging(self.config)
        # Other model types ('PCE', 'Kriging') are not implemented yet; for
        # them no ``surrogate`` attribute is created.

    def passive_sampling(self, n):
        """Draw and evaluate the initial design of experiments.

        Parameters
        ----------
        n : int
            Number of initial samples.

        Returns
        -------
        tuple
            ``(xn_o, xr_o, yn_o)``: normalized samples of shape ``(n, dim)``,
            the same samples after the per-marginal scaling, and
            ``self.targetF(xr_o)``.

        Raises
        ------
        ValueError
            If the configured passive sampling strategy is not ``'LHS'``.
        """
        if self.passive_strat != 'LHS':
            # Only Latin hypercube sampling exists so far; random sampling on
            # the marginals ('RND') is still to be added.
            raise ValueError(
                "Unsupported passive sampling strategy: %r" % (self.passive_strat,)
            )

        xn_o = np.zeros((n, self.dim))      # normalized samples
        xr_o = np.zeros((n, self.dim))      # scaled samples

        Xdoe_N = build.space_filling_lhs(self.doe_limits, num_samples=n)

        #----------------------------------------------------------------------
        # evaluation and isotransformation
        for margin in range(self.dim):
            key = 'x' + str(margin + 1)
            xn_o[:, margin] = Xdoe_N[key]

            first, second = self.marginals[key][0], self.marginals[key][1]
            if self.config["pol_type"][margin] == 'hermite':
                # first/second are treated as mean/variance by this file
                xr_o[:, margin] = self.surrogate.scalehermite(xn_o[:, margin], first, second)
            else:
                # first/second are treated as min/max by this file
                xr_o[:, margin] = self.surrogate.scalelegendre(xn_o[:, margin], first, second)

        yn_o = self.targetF(xr_o)

        return xn_o, xr_o, yn_o

    def active_sampling(self, MCpool, GroundT):
        """Run the active-learning loop and return per-iteration results.

        Each of the ``n_act`` iterations selects the polynomial degree with the
        smallest leave-one-out error, retrains on the full design, estimates
        the failure probability on a fresh Monte Carlo pool, and adds the pool
        point favoured by the learning function to the design.

        Parameters
        ----------
        MCpool : int or float
            Monte Carlo pool size (converted with ``int``).
        GroundT : bool or str
            ``True`` or ``'True'`` additionally evaluates the target function
            on the whole pool to obtain a reference failure probability.

        Returns
        -------
        dict
            Keyed ``'<n>points'`` (``n`` = design size of that iteration); each
            value is a dict with ``'degree'``, ``'Pf'``, ``'CoV_Pf'``,
            ``'eLoo'`` and ``'Pf_ref'``.
        """
        n_mc = int(MCpool)
        use_reference = GroundT in (True, 'True')
        experiment_results = {}

        xn, xr, yn = self.passive_sampling(self.n_o)

        for _ in range(self.n_act):
            n_pts = len(xn)
            n_deg = self.p_max - 1

            opt_length_it = np.zeros(n_deg)
            eloo_results = np.zeros(n_deg)
            mean_loo = np.zeros(n_pts)
            var_loo = np.zeros(n_pts)
            # Per-point ratio (LOO squared error / LOO variance), one column per degree.
            sumat = np.zeros((n_pts, n_deg))
            # NOTE(review): assumes one observation per design point -- confirm.
            y_flat = np.asarray(yn).reshape(-1)

            # OPTIMAL SURROGATE MODEL -----------------------------------
            for p in range(1, self.p_max):
                d = self.surrogate.distance(xn, xn)
                lmin = np.min(d[d != 0])

                self.surrogate.train(xn, yn, p, lmin, self.MaternCoef)
                opt_length = self.surrogate.optimize('shgo')

                if self.ModelType == 'PCK':
                    # Separate model for LOO-CV, same config as the main one.
                    self.surrogate_loo = PC_Kriging(self.config)

                for out in range(n_pts):
                    # Drop sample ``out`` (a whole row of xn, whatever ``dim`` is).
                    yn_loo = np.delete(yn, out)
                    xn_loo = np.delete(xn, out, axis=0)

                    self.surrogate_loo.train(xn_loo, yn_loo, p, opt_length, self.MaternCoef)

                    m_out, v_out = self.surrogate_loo.predict_fast(xn[out].reshape(1, -1))
                    mean_loo[out] = np.ravel(m_out)[0]
                    var_loo[out] = np.ravel(v_out)[0]

                # Squared error per left-out point; its mean is the LOO error.
                sq_err = (y_flat - mean_loo) ** 2
                sumat[:, p - 1] = np.divide(sq_err, var_loo)
                eloo_results[p - 1] = np.mean(sq_err)
                opt_length_it[p - 1] = opt_length

            ## training optimal model ----------------------------
            opt = int(np.argmin(eloo_results))    # selected on the LOO error
            self.surrogate.train(xn, yn, opt + 1, opt_length_it[opt], self.MaternCoef)

            ## Generating pool of samples - MCS -----------------------------------
            MCinputs_norm = np.zeros((n_mc, self.dim))
            MCinputs = np.zeros((n_mc, self.dim))

            for margin in range(self.dim):
                key = 'x' + str(margin + 1)
                first, second = self.marginals[key][0], self.marginals[key][1]

                if self.config["pol_type"][margin] == 'hermite':
                    MCinputs_norm[:, margin] = np.random.normal(0, 1, size=n_mc)
                    MCinputs[:, margin] = self.surrogate.scalehermite(
                        MCinputs_norm[:, margin], first, second)
                else:
                    MCinputs_norm[:, margin] = np.random.uniform(-1, 1, size=n_mc)
                    MCinputs[:, margin] = self.surrogate.scalelegendre(
                        MCinputs_norm[:, margin], first, second)

            # Pf estimation ----------------------------------------------
            meanMC, varMC = self.surrogate.predict_fast(MCinputs_norm)
            meanMC = np.asarray(meanMC).reshape(-1)
            varMC = np.asarray(varMC).reshape(-1)
            Pf_SUMO = np.sum(meanMC < 0) / n_mc

            if use_reference:
                yMC_ref = np.asarray(self.targetF(MCinputs))
                Pf_ref = np.sum(yMC_ref < 0) / n_mc
            else:
                Pf_ref = 'NoRef_Pf'

            # The CoV of the estimator is undefined when no failure was sampled.
            if Pf_SUMO > 0:
                cov_pf = np.sqrt((1 - Pf_SUMO) / (Pf_SUMO * n_mc))
            else:
                cov_pf = np.inf

            print('LS: ', 'Degree', opt + 1, 'e_LOO', np.min(eloo_results), 'Pf_ref',
                  Pf_ref, 'Pf_SuMo', Pf_SUMO, 'CoV_SuMo', "%.5f" % round(cov_pf, 4))

            # Saving results ----------------------------
            experiment_results[str(n_pts) + 'points'] = {
                'degree': opt + 1,
                'Pf': Pf_SUMO,
                'CoV_Pf': cov_pf,
                'eLoo': np.min(eloo_results),
                'Pf_ref': Pf_ref,
            }

            # Learning Function ----------------------------
            best = None
            if self.learningF == 'U':
                best = np.argmin(self.U_function(meanMC, varMC))

            elif self.learningF == 'EFF':
                best = np.argmax(self.EFF(meanMC, varMC, 0))

            # Variants with the variance inflated by the LOO-CV error of the
            # Voronoi cell (closest design point) each pool sample falls in.
            elif self.learningF == 'ULOO':
                LOOCV = self._loo_scaled_variance(MCinputs, xr, varMC, sumat[:, opt])
                best = np.argmin(self.U_function(meanMC, LOOCV))

            elif self.learningF == 'EFFLOO':
                LOOCV = self._loo_scaled_variance(MCinputs, xr, varMC, sumat[:, opt])
                best = np.argmax(self.EFF(meanMC, LOOCV, 0))

            else:
                print ('No learning metric speficied')

            if best is not None:
                # Enrich the design; the observation vector must grow with it,
                # otherwise the next training call gets mismatched inputs.
                # NOTE(review): assumes targetF returns one value per input row.
                xr = np.vstack([xr, MCinputs[best]])
                xn = np.vstack([xn, MCinputs_norm[best]])
                yn = np.append(yn, self.targetF(MCinputs[best].reshape(1, -1)))

        return experiment_results

    def _loo_scaled_variance(self, MCinputs, xr, varMC, loo_ratio):
        """Return ``varMC * (1 + loo_ratio[cell])`` with ``cell`` the closest design point."""
        cells = np.array(
            [self.VoronoiCell(MCinputs[k], xr) for k in range(len(MCinputs))],
            dtype=int,
        )
        return varMC * (1 + loo_ratio[cells])

    def EFF(self, u, v, z):
        """Expected feasibility function around the threshold ``z``.

        Parameters
        ----------
        u : float or ndarray
            Surrogate mean prediction(s).
        v : float or ndarray
            Spread of the prediction; it is used both as the divisor of the
            standardized distances and to build the band ``z -/+ 2*v``.
            NOTE(review): callers in this file pass the second output of
            ``predict_fast`` (named variance) -- confirm whether a standard
            deviation is intended.
        z : float
            Limit-state threshold (the callers in this file pass 0).

        Returns
        -------
        float or ndarray
            Expected feasibility, broadcast over ``u`` and ``v``.
        """
        eps = 2 * v
        # The band is centred on the threshold, not on zero.
        zl = z - eps
        zh = z + eps
        t, tl, th = (z - u) / v, (zl - u) / v, (zh - u) / v
        return ((u - z) * (2 * norm.cdf(t) - norm.cdf(tl) - norm.cdf(th))
                - v * (2 * norm.pdf(t) - norm.pdf(tl) - norm.pdf(th))
                + eps * (norm.cdf(th) - norm.cdf(tl)))

    def U_function(self, u, v):
        """Return ``|u| / v`` (small values flag points close to the limit state
        relative to their predicted spread)."""
        return np.abs(u) / v

    def VoronoiCell(self, x, xn):
        """Return the index of the entry of ``xn`` closest to the single point ``x``.

        The distance is computed by ``self.surrogate.distance`` between ``x``
        reshaped to one row and ``xn``.
        """
        dist = self.surrogate.distance(x.reshape(1, -1), xn)
        return np.argmin(dist)
    

In [12]:
def LS1(x):
    """Target limit-state function: ``x * sin(x)``, evaluated element-wise."""
    sine = np.sin(x)
    return x * sine

# Surrogate type; 'PCK' is the only value for which Active_Training builds a surrogate.
model = dict(metamodel='PCK')

training_settings = dict(
    # [number of initial samples, initial sampling strategy]
    passive_sampling=[10, 'LHS'],
    # One entry per input 'x<i>': [min, max, 'legendre'] or [mean, var, 'hermite']
    marginals_R=[{'x1': [0, 15, 'legendre'], 'x2': [3, 1.5, 'hermite']}],
    # [number of active samples, target function, learning function]
    active_sampling=[2, LS1, 'U'],
    # [Matern coefficient, p_max]; polynomial degrees 1 .. p_max - 1 are tried
    model=[2.5, 5],
)

PCK_batches = Active_Training(model, training_settings)

In [14]:
# PCK_batches.active_sampling(1000, True)

In [None]:
# Ten 1-D sample locations, stored as a column vector of shape (10, 1).
xr1 = np.reshape(
    np.array([
        9.60173493460726,
        14.0942931914357,
        13.3422784718819,
        6.92708483258534,
        4.44528591511115,
        7.66438265483689,
        0.0286187144815328,
        5.91008701677294,
        10.6641810829072,
        2.13464775089995,
    ]),
    (-1, 1),
)

# NOTE(review): PCK1 is not defined in the cells shown here -- confirm it exists
# in the session (or whether PCK_batches was meant). LinearNorm presumably maps
# [0, 15] onto [-1, 1]; verify against PC_Kriging.
xn1 = PCK1.surrogate.LinearNorm(xr1, 0.0, 15.0, -1.0, 1.0)

# Target function evaluated at the sample locations.
y1 = LS1(xr1)