## **Pomocné moduly**

In [322]:
import itertools
import math
import numpy as np
from scipy.special import comb
from UQpy.distributions import Uniform, JointIndependent
from UQpy.surrogates.polynomial_chaos import PolynomialChaosExpansion, TotalDegreeBasis, LeastSquareRegression
from UQpy.sensitivity import PceSensitivity
from sklearn.metrics import mean_squared_error, r2_score

Generování multiindexových množin

In [325]:
def setsize(N, w):
    """Return the binomial coefficient ``C(N + w - 1, N - 1)`` as an ``int``.

    This is the number of N-dimensional multi-indices with non-negative
    integer entries whose entries sum to exactly ``w`` (stars and bars).

    Parameters
    ----------
    N : int
        Number of dimensions (entries per multi-index).
    w : int
        Required sum of the entries (total degree).

    Returns
    -------
    int
        ``C(N + w - 1, N - 1)``.
    """
    # exact=True keeps the computation in integer arithmetic. The default
    # floating-point result can land slightly below the true value, in which
    # case int() truncation would under-count.
    return int(comb(N + w - 1, N - 1, exact=True))

def td_set_recursive(N, w, rows):
    """Build the block of N-dimensional multi-indices whose entries sum to ``w``.

    Parameters
    ----------
    N : int
        Number of dimensions (columns of the result).
    w : int
        Total degree shared by every row of the block.
    rows : int
        Number of rows to allocate for the block; callers in this file pass
        ``setsize(N, w)``.

    Returns
    -------
    numpy.ndarray
        Float array with one multi-index per row.
    """
    # One dimension: the only entry has to carry the whole degree.
    if N == 1:
        return w * np.ones([rows, 1])
    # Degree zero: all-zero multi-index.
    if w == 0:
        return np.zeros([rows, N])
    # Degree one: the unit vectors. ``rows`` is not used in this branch, so
    # the caller is expected to pass rows == N.
    if w == 1:
        return np.eye(N)

    block = np.empty([rows, N])
    offset = 0
    # Fix the first entry to ``lead`` and fill the remaining N - 1 columns
    # with every multi-index of degree ``w - lead``.
    for lead in range(w + 1):
        count = setsize(N - 1, w - lead)
        stop = offset + count
        block[offset:stop, 0] = np.full(count, lead)
        block[offset:stop, 1:] = td_set_recursive(N - 1, w - lead, count)
        offset = stop
    return block

def td_multiindex_set(N, w):
    """Return all N-dimensional multi-indices with total degree at most ``w``.

    Rows are grouped by total degree in ascending order (0, 1, ..., w); each
    group is produced by ``td_set_recursive``.

    Parameters
    ----------
    N : int
        Number of dimensions (columns of the result).
    w : int
        Maximum total degree.

    Returns
    -------
    numpy.ndarray
        Integer array of shape ``(C(N + w, N), N)``.
    """
    # exact=True yields a Python int directly; the default float result would
    # have to be truncated with int(), which can under-count when the float
    # lands just below the true value.
    td_size = comb(N + w, N, exact=True)
    midx_set = np.empty([td_size, N])
    row_start = 0
    for degree in range(w + 1):
        rows = setsize(N, degree)
        midx_set[row_start:row_start + rows, :] = td_set_recursive(N, degree, rows)
        row_start += rows
    return midx_set.astype(int)

def tp_multiindex_set(N, w):
    """Return the tensor-product multi-index set with per-entry degree 0..w.

    Every combination of the degrees ``0, 1, ..., w`` over ``N`` dimensions is
    generated; rows are ordered by ascending entry sum, and rows with equal
    sum keep the order in which ``itertools.product`` produced them.

    Parameters
    ----------
    N : int
        Number of dimensions (columns of the result).
    w : int
        Maximum degree of each individual entry.

    Returns
    -------
    numpy.ndarray
        Integer array with one multi-index per row.
    """
    degrees = np.arange(0, w + 1, 1).tolist()
    if N == 1:
        return np.array([[degree] for degree in degrees]).astype(int)

    combos = [list(combo) for combo in itertools.product(degrees, repeat=N)]
    # list.sort is stable, so ties keep the product order.
    combos.sort(key=lambda combo: int(math.fsum(combo)))
    return np.array(combos).astype(int)

In [327]:
import numpy as np

def admissible_neighbors(index, index_set):
    """Return the forward neighbours of ``index`` that are admissible.

    A forward neighbour is kept when ``is_admissible`` accepts it with respect
    to ``index_set``.

    Parameters
    ----------
    index : list of int
        Multi-index whose forward neighbours are examined.
    index_set : collection of multi-indices
        Set against which admissibility is checked.

    Returns
    -------
    list of list
        The admissible forward neighbours, in the order produced by
        ``forward_neighbors``.
    """
    return [
        candidate
        for candidate in forward_neighbors(index)
        if is_admissible(candidate, index_set)
    ]

def is_admissible(index, index_set):
    """Tell whether every backward neighbour of ``index`` is in ``index_set``.

    Parameters
    ----------
    index : list of int
        Candidate multi-index.
    index_set : collection of multi-indices
        Collection searched with the ``in`` operator.

    Returns
    -------
    bool
        ``True`` when all backward neighbours are members of ``index_set``
        (also when there are no backward neighbours), ``False`` otherwise.
    """
    return all(neighbor in index_set for neighbor in backward_neighbors(index))

def forward_neighbors(index):
    """Return the multi-indices obtained by adding 1 to a single entry of ``index``.

    Parameters
    ----------
    index : sequence of int
        Source multi-index; it is never modified.

    Returns
    -------
    list of list
        One neighbour per position, in position order.
    """
    neighbors = []
    for position in range(len(index)):
        # list(index) always creates an independent copy. ``index[:]`` is only
        # a view for NumPy arrays, so incrementing it in place would modify
        # the caller's array and make every neighbour share one buffer.
        neighbor = list(index)
        neighbor[position] = neighbor[position] + 1
        neighbors.append(neighbor)
    return neighbors

def backward_neighbors(index):
    """Return the multi-indices obtained by subtracting 1 from a single positive entry.

    Entries that are not greater than zero produce no neighbour.

    Parameters
    ----------
    index : sequence of int
        Source multi-index; it is never modified.

    Returns
    -------
    list of list
        One neighbour per positive entry, in position order.
    """
    neighbors = []
    for position in range(len(index)):
        if index[position] > 0:
            # list(index) always creates an independent copy. ``index[:]`` is
            # only a view for NumPy arrays, so decrementing it in place would
            # modify the caller's array and alias all neighbours.
            neighbor = list(index)
            neighbor[position] = neighbor[position] - 1
            neighbors.append(neighbor)
    return neighbors


In [329]:
!pip install UQpy
!pip install --upgrade UQpy
!pip install torch



## **PCE**

In [344]:
class PolynomialChaosExpansionUQPY:
    """Multi-output wrapper that keeps one UQpy ``PolynomialChaosExpansion`` per output column.

    All per-output expansions share a single ``TotalDegreeBasis`` and a single
    ``LeastSquareRegression`` instance created in ``__init__``.
    """

    def __init__(self, pdf, exp_design_in, exp_design_out, degree=4):
        """Create and fit one PCE for every column of ``exp_design_out``.

        Parameters
        ----------
        pdf : joint distribution or None
            Input distribution handed to ``TotalDegreeBasis``. When ``None``,
            independent ``Uniform(0, 1)`` marginals are used, one per input
            column.
        exp_design_in : numpy.ndarray
            Input samples; ``shape[1]`` is taken as the number of inputs.
        exp_design_out : numpy.ndarray
            2-D output samples; ``shape[0]`` is the number of samples and
            ``shape[1]`` the number of outputs.
        degree : int, optional
            Degree passed to ``TotalDegreeBasis`` (default 4).
        """
        self.pdf = pdf if pdf is not None else JointIndependent([Uniform(0, 1) for _ in range(exp_design_in.shape[1])])
        self.num_inputs = exp_design_in.shape[1]
        self.num_outputs = exp_design_out.shape[1]
        self.num_samples = exp_design_out.shape[0]
        self.exp_design_inputs = exp_design_in
        self.exp_design_outputs = exp_design_out

        self.polynomial_basis = TotalDegreeBasis(self.pdf, degree)
        self.regression_method = LeastSquareRegression()

        # One PCE per output column, each fitted on that column only.
        self.pce_list = []
        for i in range(self.num_outputs):
            pce = PolynomialChaosExpansion(
                polynomial_basis=self.polynomial_basis,
                regression_method=self.regression_method
            )
            pce.fit(exp_design_in, exp_design_out[:, i].reshape(-1, 1))
            self.pce_list.append(pce)

    def fit(self, inputs, outputs):
        """Refit every per-output PCE on new data.

        Parameters
        ----------
        inputs : numpy.ndarray
            Input samples passed unchanged to each PCE.
        outputs : numpy.ndarray
            2-D output samples; column ``i`` is used for the ``i``-th PCE.
        """
        for i, pce in enumerate(self.pce_list):
            pce.fit(inputs, outputs[:, i].reshape(-1, 1))
        # Keep the stored design in sync with the data the expansions were
        # fitted on; otherwise get_condition_number() would still be computed
        # from the design given to __init__.
        self.exp_design_inputs = inputs
        self.exp_design_outputs = outputs
        self.num_samples = outputs.shape[0]

    def predict(self, design_in):
        """Evaluate all PCEs and return an array of shape ``(n_points, num_outputs)``."""
        predictions = np.zeros((design_in.shape[0], self.num_outputs))
        for i, pce in enumerate(self.pce_list):
            predictions[:, i] = pce.predict(design_in).flatten()
        return predictions

    def get_moments(self):
        """Return ``(mean_values, variance_values)``, each of length ``num_outputs``.

        The values come from ``get_moments()`` of each per-output PCE.
        """
        mean_values = np.zeros(self.num_outputs)
        variance_values = np.zeros(self.num_outputs)
        for i, pce in enumerate(self.pce_list):
            # NOTE(review): assumes pce.get_moments() yields a (mean, variance)
            # pair whose items fit into scalar slots - confirm against UQpy.
            mean_values[i], variance_values[i] = pce.get_moments()
        return mean_values, variance_values

    def compute_mean(self):
        """Return only the mean values from ``get_moments()``."""
        mean_values, _ = self.get_moments()
        return mean_values

    def compute_variance(self):
        """Return only the variance values from ``get_moments()``."""
        _, variance_values = self.get_moments()
        return variance_values

    def _sobol_indices(self, attribute):
        """Run ``PceSensitivity`` on every PCE and stack the flattened ``attribute`` arrays."""
        rows = []
        for pce in self.pce_list:
            sensitivity = PceSensitivity(pce)
            sensitivity.run()
            # flatten() makes each per-output result a 1-D row.
            rows.append(getattr(sensitivity, attribute).flatten())
        return np.array(rows)

    def compute_sobol_first(self):
        """Return the first-order Sobol indices, one row per output."""
        return self._sobol_indices("first_order_indices")

    def compute_sobol_total(self):
        """Return the total-order Sobol indices, one row per output."""
        return self._sobol_indices("total_order_indices")

    def get_condition_number(self):
        """Return a list with the condition number of each PCE's design matrix.

        The design matrix is evaluated on the stored experimental design
        inputs.
        """
        condition_numbers = []
        for pce in self.pce_list:
            design_matrix = self._compute_design_matrix(self.exp_design_inputs, pce)
            condition_numbers.append(np.linalg.cond(design_matrix))
        return condition_numbers

    def _compute_design_matrix(self, inputs, pce):
        """Evaluate every basis polynomial of ``pce`` at every row of ``inputs``.

        Returns an array of shape ``(n_samples, n_basis_polynomials)``.
        """
        num_samples = inputs.shape[0]
        num_basis = len(pce.polynomial_basis.polynomials)
        design_matrix = np.zeros((num_samples, num_basis))

        for i in range(num_samples):
            for j in range(num_basis):
                # NOTE(review): each polynomial is evaluated on a single 1-D
                # sample; confirm that UQpy's ``evaluate`` accepts this shape.
                design_matrix[i, j] = pce.polynomial_basis.polynomials[j].evaluate(inputs[i])

        return design_matrix

    @property
    def coefficients(self):
        """Flattened coefficients of every per-output PCE, one row per output."""
        return np.array([pce.coefficients.flatten() for pce in self.pce_list])

    @property
    def inputs_number(self):
        """Number of input variables (dimensions) of the model."""
        return self.num_inputs

    @property
    def multi_index_set(self):
        """Array built from ``polynomial_basis.polynomials`` of every per-output PCE.

        NOTE(review): despite the name, this collects the basis polynomial
        objects rather than integer multi-indices - confirm that this is what
        callers expect.
        """
        return np.array([pce.polynomial_basis.polynomials for pce in self.pce_list])

In [332]:
# Standalone smoke test of the PCE wrapper. It is disabled: the code below is
# wrapped in a string literal, so running this cell only echoes the string.
"""
num_samples = 500
marginals = [Uniform(0, 5), Uniform(0, 2)] 
input_distribution = JointIndependent(marginals)
input_sample = input_distribution.rvs(num_samples)
print("Generovaná vstupní data:")
print(input_sample)

def test_function(X):
    x1, x2 = X[:, 0], X[:, 1]
    return np.expand_dims(2*x1 + 3*x2**2, axis=1)

output_sample = test_function(input_sample)

pce_uqpy = PolynomialChaosExpansionUQPY(input_distribution, input_sample, output_sample)

test_inputs = input_distribution.rvs(10)
predictions = pce_uqpy.predict(test_inputs)

mean_value = pce_uqpy.compute_mean()
variance_value = pce_uqpy.compute_variance()

sobol_first = pce_uqpy.compute_sobol_first()
sobol_total = pce_uqpy.compute_sobol_total()

print("První řád Sobolových indexů:")
print(sobol_first)
print("Celkové Sobolovy indexy:")
print(sobol_total)
print("Střední hodnota modelu:")
print(mean_value)
print("Rozptyl modelu:")
print(variance_value)

true_outputs = test_function(test_inputs)
print("Skutečné hodnoty vs Predikované hodnoty:")
for i in range(len(test_inputs)):
    print(f"Vstupy: {test_inputs[i]}, Skutečná hodnota: {true_outputs[i][0]}, Predikovaná hodnota: {predictions[i][0]}")
    
"""

'\nnum_samples = 500\nmarginals = [Uniform(0, 5), Uniform(0, 2)] \ninput_distribution = JointIndependent(marginals)\ninput_sample = input_distribution.rvs(num_samples)\nprint("Generovaná vstupní data:")\nprint(input_sample)\n\ndef test_function(X):\n    x1, x2 = X[:, 0], X[:, 1]\n    return np.expand_dims(2*x1 + 3*x2**2, axis=1)\n\noutput_sample = test_function(input_sample)\n\npce_uqpy = PolynomialChaosExpansionUQPY(input_distribution, input_sample, output_sample)\n\ntest_inputs = input_distribution.rvs(10)\npredictions = pce_uqpy.predict(test_inputs)\n\nmean_value = pce_uqpy.compute_mean()\nvariance_value = pce_uqpy.compute_variance()\n\nsobol_first = pce_uqpy.compute_sobol_first()\nsobol_total = pce_uqpy.compute_sobol_total()\n\nprint("První řád Sobolových indexů:")\nprint(sobol_first)\nprint("Celkové Sobolovy indexy:")\nprint(sobol_total)\nprint("Střední hodnota modelu:")\nprint(mean_value)\nprint("Rozptyl modelu:")\nprint(variance_value)\n\ntrue_outputs = test_function(test_inputs

## **SAPCE**

In [334]:
class SensitivityAdaptivePCE:
    """Per-output PCE surrogates together with bookkeeping of multi-index sets.

    For every output column the class keeps a ``PolynomialChaosExpansionUQPY``
    surrogate, a list of "active" multi-indices and a list of "admissible"
    candidate multi-indices. ``adapt()`` moves candidates into the active list
    based on values returned by ``PceSensitivity`` and refits the surrogate.
    """

    def __init__(self, pdf, exp_design_in, exp_design_out, max_partial_degree=10, tolerance=1e-3):
        """Store the design and build one surrogate per output column.

        Parameters
        ----------
        pdf : joint distribution or None
            Input distribution; when ``None``, independent ``Uniform(0, 1)``
            marginals are created, one per input column.
        exp_design_in : numpy.ndarray
            Input samples.
        exp_design_out : numpy.ndarray
            2-D output samples, one column per output.
        max_partial_degree : int, optional
            Passed as ``degree`` to every surrogate (default 10).
        tolerance : float, optional
            Threshold used by ``adapt()`` (default 1e-3).
        """
        self.pdf = pdf if pdf is not None else JointIndependent([Uniform(0, 1) for _ in range(exp_design_in.shape[1])])
        self.exp_design_in = exp_design_in
        self.exp_design_out = exp_design_out
        self.max_partial_degree = max_partial_degree
        self.tolerance = tolerance

        self.pce_list = []  # one surrogate per output
        self.active_multi_indices = []  # active multi-indices per output
        self.admissible_multi_indices = []  # candidate multi-indices per output

        for out_col in range(exp_design_out.shape[1]):
            # Start from the total-degree-1 set (a small initial index set).
            start_set = td_multiindex_set(len(self.pdf.marginals), 1).tolist()
            self.active_multi_indices.append(start_set)
            self.admissible_multi_indices.append(self._generate_admissible_indices(start_set))

            self.pce_list.append(
                PolynomialChaosExpansionUQPY(
                    pdf=self.pdf,
                    exp_design_in=exp_design_in,
                    exp_design_out=exp_design_out[:, out_col].reshape(-1, 1),
                    degree=max_partial_degree
                )
            )

    def _generate_admissible_indices(self, active_indices):
        """Return the forward neighbours of ``active_indices`` that are not yet active.

        Each active multi-index is incremented by 1 in every position; results
        already present in ``active_indices`` or already collected are skipped.
        """
        candidates = []
        for base in active_indices:
            for axis in range(len(base)):
                bumped = list(base)
                bumped[axis] += 1
                if bumped in active_indices or bumped in candidates:
                    continue
                candidates.append(bumped)
        return candidates

    def adapt(self):
        """Extend the active multi-indices of every output and refit its surrogate."""
        for out_col, surrogate in enumerate(self.pce_list):
            # NOTE(review): ``surrogate`` is the PolynomialChaosExpansionUQPY
            # wrapper, not a UQpy PolynomialChaosExpansion - confirm that
            # PceSensitivity is meant to be run on the wrapper.
            analysis = PceSensitivity(surrogate)
            analysis.run()
            total_indices = analysis.total_order_indices.flatten()

            # Keep the candidates whose positionally paired value exceeds the
            # tolerance.
            # NOTE(review): the k-th candidate multi-index is paired with the
            # k-th flattened total-order value - confirm this pairing is
            # intended.
            selected = []
            for candidate, score in zip(self.admissible_multi_indices[out_col], total_indices):
                if score > self.tolerance:
                    selected.append(candidate)
            self.active_multi_indices[out_col].extend(selected)
            self.admissible_multi_indices[out_col] = self._generate_admissible_indices(self.active_multi_indices[out_col])

            # Refit the surrogate. The basis assigned here is again a full
            # total-degree basis (no multi_index_set is passed).
            # NOTE(review): the active multi-indices are therefore not used to
            # build the refitted basis - verify.
            surrogate.polynomial_basis = TotalDegreeBasis(self.pdf, self.max_partial_degree)
            surrogate.fit(self.exp_design_in, self.exp_design_out[:, out_col].reshape(-1, 1))

    def get_active_multi_indices(self):
        """Return the per-output lists of active multi-indices."""
        return self.active_multi_indices

    def get_admissible_multi_indices(self):
        """Return the per-output lists of admissible multi-indices."""
        return self.admissible_multi_indices

    def get_coefficients(self):
        """Return a list with the flattened coefficients of each surrogate."""
        flattened = []
        for surrogate in self.pce_list:
            flattened.append(surrogate.coefficients.flatten())
        return flattened

    def get_multi_index_set(self):
        """Return the per-output lists of active multi-indices."""
        return self.active_multi_indices

    def predict(self, x):
        """Evaluate every surrogate at ``x``; result has shape ``(n_points, n_outputs)``."""
        result = np.zeros((x.shape[0], len(self.pce_list)))
        for out_col, surrogate in enumerate(self.pce_list):
            result[:, out_col] = surrogate.predict(x).flatten()
        return result

In [347]:
def test_SensitivityAdaptivePCE():
    """Fit ``SensitivityAdaptivePCE`` on a 4-output toy model and print diagnostics.

    Samples a 3-dimensional uniform input design, evaluates the toy model,
    builds the surrogate, runs one ``adapt()`` step and prints predictions
    next to the true values for the first samples, followed by the MSE and
    R² of every output.

    Predictions are evaluated on the same points the surrogate was fitted on,
    so the printed metrics describe the training fit only. The input design
    is drawn without an explicit seed.
    """
    dim = 3  # number of input variables
    size = 100  # number of samples

    distribution = JointIndependent([Uniform(0, 1) for _ in range(dim)])  # input distribution
    exp_design_in = distribution.rvs(size)

    def model(x):
        """Toy model mapping 3 inputs to 4 outputs (one column per output)."""
        return np.column_stack((
            x[:, 0] + 2 * x[:, 1] + 3 * x[:, 2],  # output 1
            x[:, 0] * x[:, 1] + x[:, 2],  # output 2
            np.sin(x[:, 0]) + np.cos(x[:, 1]),  # output 3
            np.exp(x[:, 2])  # output 4
        ))

    exp_design_out = model(exp_design_in)

    sapce = SensitivityAdaptivePCE(
        pdf=distribution,
        exp_design_in=exp_design_in,
        exp_design_out=exp_design_out,
        max_partial_degree=10,
        tolerance=1e-3
    )

    # Adaptive selection of multi-indices
    sapce.adapt()

    # Prediction on the training design
    predicted_output = sapce.predict(exp_design_in)

    print("\nPorovnání predikovaných a skutečných hodnot:")
    # Show at most the first five samples (fewer if the design is smaller).
    for i in range(min(5, size)):
        print(f"Vzorek {i + 1}:")
        for j in range(exp_design_out.shape[1]):
            print(f"  Výstup {j + 1}: Skutečná hodnota = {exp_design_out[i, j]:.4f}, Predikovaná hodnota = {predicted_output[i, j]:.4f}")

    mse = mean_squared_error(exp_design_out, predicted_output, multioutput='raw_values')
    r2 = r2_score(exp_design_out, predicted_output, multioutput='raw_values')
    print("\nMetriky výkonu modelu:")
    for j in range(exp_design_out.shape[1]):
        print(f"Výstup {j + 1}:")
        # The MSE was computed but never reported before.
        print(f"  MSE: {mse[j]:.4e}")
        print(f"  R² skore: {r2[j]:.4f}")

# Run the test
test_SensitivityAdaptivePCE()


Porovnání predikovaných a skutečných hodnot:
Vzorek 1:
  Výstup 1: Skutečná hodnota = 3.6715, Predikovaná hodnota = 3.6715
  Výstup 2: Skutečná hodnota = 1.0405, Predikovaná hodnota = 1.0405
  Výstup 3: Skutečná hodnota = 1.2041, Predikovaná hodnota = 1.2041
  Výstup 4: Skutečná hodnota = 2.6729, Predikovaná hodnota = 2.6729
Vzorek 2:
  Výstup 1: Skutečná hodnota = 3.5733, Predikovaná hodnota = 3.5733
  Výstup 2: Skutečná hodnota = 1.0707, Predikovaná hodnota = 1.0707
  Výstup 3: Skutečná hodnota = 1.5974, Predikovaná hodnota = 1.5974
  Výstup 4: Skutečná hodnota = 1.5185, Predikovaná hodnota = 1.5185
Vzorek 3:
  Výstup 1: Skutečná hodnota = 2.9946, Predikovaná hodnota = 2.9946
  Výstup 2: Skutečná hodnota = 0.7508, Predikovaná hodnota = 0.7508
  Výstup 3: Skutečná hodnota = 1.3470, Predikovaná hodnota = 1.3470
  Výstup 4: Skutečná hodnota = 1.4382, Predikovaná hodnota = 1.4382
Vzorek 4:
  Výstup 1: Skutečná hodnota = 1.4416, Predikovaná hodnota = 1.4416
  Výstup 2: Skutečná hodnota =