In [1]:
import matplotlib.pyplot as plt
import fenics as fe
import mshr
import numpy as np
from scipy.linalg import eigh

In [2]:
# Constants
CENTER = fe.Point(0, 0)
RADIUS = 1
DOMAIN = mshr.Circle(CENTER, RADIUS)
DIRICHLET_BC = fe.Constant(0)

In [3]:
def cov1_1(x, y):
    return 5.0/100.0 * np.exp(-4.0 * ((x[0] - y[0])**2 + (x[1] - y[1])**2))
def cov1_2(x, y):
    return 1.0/100.0 * np.exp(-0.1 * ((2*x[0] - y[0])**2 + (2*x[1] - y[1])**2))
def cov2_1(x, y):
    return 1.0/100.0 * np.exp(-0.1 * ((x[0] - 2*y[0])**2 + (x[1] - 2*y[1])**2))
def cov2_2(x, y):
    return 5.0/100.0 * np.exp(-1.0 * ((x[0] - y[0])**2 + (x[1] - y[1])**2))

In [4]:
# Helpers eigenpair calculation
class BasisFunction():
    def __init__(self, basis_function: fe.Function, coordinates: np.array):
        self.function = basis_function
        self.coordinates = coordinates

def get_C_entry(mc_samples_for_c_entries, f, basis_function_i: BasisFunction, basis_function_j: BasisFunction, max_edge_length: float):
    def integrand(x, y):
        return f(x, y) * basis_function_i.function(x) * basis_function_j.function(y)
    # generate MC samples
    angles_x = np.random.uniform(0, 2 * np.pi, mc_samples_for_c_entries)
    radii_x = np.sqrt(np.random.uniform(0, max_edge_length, mc_samples_for_c_entries))
    x1_samples = basis_function_i.coordinates[0] + radii_x * np.cos(angles_x)
    x2_samples = basis_function_i.coordinates[1] + radii_x * np.sin(angles_x)
    angles_y = np.random.uniform(0, 2 * np.pi, mc_samples_for_c_entries)
    radii_y = np.sqrt(np.random.uniform(0, max_edge_length, mc_samples_for_c_entries))
    y1_samples = basis_function_j.coordinates[0] + radii_y * np.cos(angles_y)
    y2_samples = basis_function_j.coordinates[1] + radii_y * np.sin(angles_y)
    C_entry = 0
    for (x1, x2) in zip(x1_samples, x2_samples):
        for (y1, y2) in zip(y1_samples, y2_samples):
            C_entry += integrand([x1, x2], [y1, y2])
    return C_entry / (mc_samples_for_c_entries**2) * np.pi * max_edge_length**2

def calculate_vector_field_eigenpairs(mesh_resolution_c_entries, mc_samples_for_c_entries):  
    mesh = mshr.generate_mesh(DOMAIN, mesh_resolution_c_entries)
    max_edge_length = mesh.hmax()
    V = fe.FunctionSpace(mesh, "CG", 1)
    V_Vector = fe.VectorFunctionSpace(mesh, "CG", 1)
    N = V.dim()
    dof_coordinates = V.tabulate_dof_coordinates().reshape((-1, 2))

    basis_functions = []
    basis_functions_grads = []
    for i in range(N):
        basis_function = fe.Function(V)
        basis_function.vector()[i] = 1.0
        basis_function.set_allow_extrapolation(True)
        basis_functions.append(BasisFunction(basis_function, dof_coordinates[i]))
        grad = fe.project(fe.grad(basis_function), V_Vector)
        grad.set_allow_extrapolation(True)
        basis_functions_grads.append(BasisFunction(grad, dof_coordinates[i]))

    C = np.zeros((2 * N, 2 * N))
    for i, basis_function_i in enumerate(basis_functions):
        for j, basis_function_j in enumerate(basis_functions):
            if j <= i:
                # Here we use that each block is symmetric because of the symmetry of the covariance functions
                C[i, j] = C[j, i] = get_C_entry(mc_samples_for_c_entries, cov1_1, basis_function_i, basis_function_j, max_edge_length)
                C[i, N + j] = C[j, N + i] = get_C_entry(mc_samples_for_c_entries, cov1_2, basis_function_i, basis_function_j, max_edge_length)
                C[N + i, j] = C[N + j, i] = get_C_entry(mc_samples_for_c_entries, cov2_1, basis_function_i, basis_function_j, max_edge_length)
                C[N + i, N + j] = C[N + j, N + i] = get_C_entry(mc_samples_for_c_entries, cov2_2, basis_function_i, basis_function_j, max_edge_length)
    # print(f"C: {C}")

    M = np.zeros((2 * N, 2 * N))
    for i, basis_function_i in enumerate(basis_functions):
        for j, basis_function_j in enumerate(basis_functions):
            if i >= j:
                integrand = basis_function_j.function * basis_function_i.function * fe.dx
                M[i, j] = M[j, i] = M[N + i, N + j] = M[N + j, N + i] = fe.assemble(integrand)
    # print(f"M: {M}")


    J = N # Number of eigenvectors -> J = N is maximum
    eigenvalues, eigenvectors = eigh(C, M, subset_by_index=[0, J-1])
    sorted_indices = np.argsort(eigenvalues)[::-1]
    sorted_eigenvalues = eigenvalues[sorted_indices]
    sorted_eigenvectors = eigenvectors[:, sorted_indices]

    # Print the eigenvalues and eigenvectors -> important to test if MC-sample size is large enough
    # print(f"Eigenvalues: {sorted_eigenvalues}")
    # print(f"Eigenvectors: {sorted_eigenvectors}")

    # Eliminate negative eigenvalues
    for index, sorted_eigenvalue in enumerate(sorted_eigenvalues):
        if sorted_eigenvalue < 0:
            sorted_eigenvalues[index] = 0
    return sorted_eigenvalues, sorted_eigenvectors, basis_functions_grads, N, J

In [5]:
###### Section 1 ######
###### Calculation of the eigenpairs for the random field ###### 

# Inputs
# 3, 100 takes approximately 5 minutes
mesh_resolution_c_entries = 3
mc_samples_for_c_entries = 100

# Calculate the eigenpairs
eigenvalues, eigenvectors, basis_functions_grads, N, J = calculate_vector_field_eigenpairs(mesh_resolution_c_entries, mc_samples_for_c_entries)

Building point search tree to accelerate distance queries.
Computed bounding box tree with 63 nodes for 32 points.


In [6]:
# Helpers sampling
def random_field(x, eigenvalues, eigenvectors, xi, basis_functions, N, J):
    return x[0] + sum([np.sqrt(eigenvalues[j]) * sum([eigenvectors[k, j] * basis_functions[k].function(x) for k in range(N)]) * xi[j] for j in range(J)]), \
           x[1] + sum([np.sqrt(eigenvalues[j]) * sum([eigenvectors[N + k, j] * basis_functions[k].function(x) for k in range(N)]) * xi[j] for j in range(J)])

def jacobian(x, eigenvalues, eigenvectors, xi, basis_functions_grads, N, J):
    jacobian_output = np.zeros((2, 2))
    jacobian_output[0, 0] = 1 + sum([np.sqrt(eigenvalues[j]) * sum([eigenvectors[k, j] * basis_functions_grads[k].function(x)[0] for k in range(N)]) * xi[j] for j in range(J)])
    jacobian_output[0, 1] = sum([np.sqrt(eigenvalues[j]) * sum([eigenvectors[N + k, j] * basis_functions_grads[k].function(x)[0] for k in range(N)]) * xi[j] for j in range(J)])
    jacobian_output[1, 0] = sum([np.sqrt(eigenvalues[j]) * sum([eigenvectors[k, j] * basis_functions_grads[k].function(x)[1] for k in range(N)]) * xi[j] for j in range(J)])
    jacobian_output[1, 1] = 1 + sum([np.sqrt(eigenvalues[j]) * sum([eigenvectors[N + k, j] * basis_functions_grads[k].function(x)[0] for k in range(N)]) * xi[j] for j in range(J)])
    return jacobian_output

class AExpression(fe.UserExpression):
    def __init__(self, eigenvalues, eigenvectors, xi, basis_functions_grads, N, J, **kwargs):
        super().__init__(**kwargs)
        self.eigenvalues = eigenvalues
        self.eigenvectors = eigenvectors
        self.xi = xi
        self.basis_functions_grads = basis_functions_grads
        self.N = N
        self.J = J

    def eval(self, values, x):
        J_x = jacobian(x, self.eigenvalues, self.eigenvectors, self.xi, self.basis_functions_grads, self.N, self.J)
        inv_JTJ = np.linalg.inv(J_x.T @ J_x)
        det_J = np.linalg.det(J_x)
        A_x = inv_JTJ * det_J
        values[0] = A_x[0, 0]
        values[1] = A_x[0, 1]
        values[2] = A_x[1, 0]
        values[3] = A_x[1, 1]

    def value_shape(self):
        return (2, 2)

class detJExpression(fe.UserExpression):
    def __init__(self, eigenvalues, eigenvectors, xi, basis_functions_grads, N, J, **kwargs):
        super().__init__(**kwargs)
        self.eigenvalues = eigenvalues
        self.eigenvectors = eigenvectors
        self.xi = xi
        self.basis_functions_grads = basis_functions_grads
        self.N = N
        self.J = J
        
    def eval(self, values, x):
        J_x = jacobian(x, self.eigenvalues, self.eigenvectors, self.xi, self.basis_functions_grads, self.N, self.J)
        det_J = np.linalg.det(J_x)
        values[0] = det_J

    def value_shape(self):
        return ()
    
def solve_poisson_for_given_sample(mesh_resolution_fem_single, eigenvalues, eigenvectors, xi, basis_functions_grads, N, J, f):
    mesh = mshr.generate_mesh(DOMAIN, mesh_resolution_fem_single)
    V = fe.FunctionSpace(mesh, "CG", 3) #! degree 3
    u = fe.TrialFunction(V)
    v = fe.TestFunction(V)
    A_expr = AExpression(eigenvalues, eigenvectors, xi, basis_functions_grads, N, J, degree=2)
    a = fe.inner(fe.dot(A_expr, fe.grad(u)), fe.grad(v)) * fe.dx
    det_J_expr = detJExpression(eigenvalues, eigenvectors, xi, basis_functions_grads, N, J, degree=2)
    L = f * det_J_expr * v * fe.dx
    bc = fe.DirichletBC(V, DIRICHLET_BC, 'on_boundary')
    u_sol = fe.Function(V)
    fe.solve(a == L, u_sol, bc)
    return u_sol


def double_loop_mc(mc_sample_size, mesh_resolution_fem, size_of_xi, P, indices):
    # Translate math indices to CS indices
    indices = [index - 1 for index in indices]

    # Implementation of the double loop MC estimation of specific Sobol Index
    for i in range(mc_sample_size):
        print(f"First loop: Outer MC loop iteration {i+1} / {mc_sample_size}")
        mean_sols_point_P = np.zeros(mc_sample_size)
        sols_point_P = []
        sample = np.zeros(size_of_xi + 1)
        for index in indices:
            if index < size_of_xi:
                sample[index] = np.random.uniform(-np.sqrt(3), np.sqrt(3))
            else:
                sample[index] = np.random.normal(0, 1)
        missing_indices = [i for i in range(size_of_xi + 1) if i not in indices]
        for k in range(mc_sample_size):
            for index in missing_indices:
                if index < size_of_xi:
                    sample[index] = np.random.uniform(-np.sqrt(3), np.sqrt(3))
                else:
                    sample[index] = np.random.normal(0, 1)
            sols_point_P.append(solve_poisson_for_given_sample(mesh_resolution_fem, eigenvalues, eigenvectors, sample[0:-1], basis_functions_grads, N, size_of_xi, sample[-1])(P))
        mean_sols_point_P[i] = np.mean(sols_point_P)
    var_A = np.var(mean_sols_point_P)
    sols_point_P = []
    for i in range(mc_sample_size):
        print(f"Second loop: {i+1} / {mc_sample_size}")
        xi = np.random.uniform(-np.sqrt(3), np.sqrt(3), size_of_xi)
        f = np.random.normal(0, 1)
        sols_point_P.append(solve_poisson_for_given_sample(mesh_resolution_fem, eigenvalues, eigenvectors, xi, basis_functions_grads, N, size_of_xi, f)(P))
    var = np.var(sols_point_P)
    return var_A / var


def pick_freeze(mc_sample_size, mesh_resolution_fem, size_of_xi, P, indices):
    # Translate math indices to CS indices
    indices = [index - 1 for index in indices]

    y = []
    y_tilde = []
    for i in range(mc_sample_size):
        print(f"First loop: {i+1} / {mc_sample_size}")
        x = np.zeros(size_of_xi + 1)
        x_prime = np.zeros(size_of_xi + 1)
        for index in indices:
            if index < size_of_xi:
                sample = np.random.uniform(-np.sqrt(3), np.sqrt(3))
                x[index] = sample
                x_prime[index] = sample
            else:
                sample = np.random.normal(0, 1)
                x[index] = sample
                x_prime[index] = sample
        for index in [i for i in range(size_of_xi + 1) if i not in indices]:
            if index < size_of_xi:
                x[index] = np.random.uniform(-np.sqrt(3), np.sqrt(3))
                x_prime[index] = np.random.uniform(-np.sqrt(3), np.sqrt(3))
            else:
                x[index] = np.random.normal(0, 1)
                x_prime[index] = np.random.normal(0, 1)
        y.append(solve_poisson_for_given_sample(mesh_resolution_fem, eigenvalues, eigenvectors, x[0:-1], basis_functions_grads, N, size_of_xi, x[-1])(P))
        y_tilde.append(solve_poisson_for_given_sample(mesh_resolution_fem, eigenvalues, eigenvectors, x_prime[0:-1], basis_functions_grads, N, size_of_xi, x_prime[-1])(P))
    y_bar = np.mean(y)
    y_tilde_bar = np.mean(y_tilde)
    if mc_sample_size > 1:
        var_A = 1/(mc_sample_size - 1) * sum([(y[i] - y_bar) * (y_tilde[i] - y_tilde_bar) for i in range(mc_sample_size)])
    else:
        var_A = 0

    sols_point_P = []
    for i in range(mc_sample_size):
        print(f"Second loop: {i+1} / {mc_sample_size}")
        xi = np.random.uniform(-np.sqrt(3), np.sqrt(3), size_of_xi)
        f = np.random.normal(0, 1)
        sols_point_P.append(solve_poisson_for_given_sample(mesh_resolution_fem, eigenvalues, eigenvectors, xi, basis_functions_grads, N, size_of_xi, f)(P))
    var = np.var(sols_point_P)
    return var_A / var

def rank(i, sorted_samples, samples):
    return sorted_samples.index(samples[i])
def rank_inv(i, sorted_samples, samples):
    return samples.index(sorted_samples[i])
def P_j(i, sorted_samples, samples):
    if rank(i, sorted_samples, samples) + 2 <= len(samples):
        return rank_inv(rank(i, sorted_samples, samples) + 1, sorted_samples, samples)
    else:
        return rank_inv(0, sorted_samples, samples)

def rank_statistics_permute(samples, y):
    sorted_samples = sorted(samples)
    helper = np.zeros((len(y), 3))
    for i in range(len(y)):
        helper[i, 0] = y[i]
        helper[i, 1] = P_j(i, sorted_samples, samples)
        helper[i, 2] = samples[P_j(i, sorted_samples, samples)]
    sorted_helper = helper[helper[:, 2].argsort()]
    return sorted_helper[:, 0]
    
def rank_statistics(mc_sample_size, mesh_resolution_fem, size_of_xi, P, index):
    # Translate math index to CS index
    index -= 1

    y = []
    samples = np.zeros((mc_sample_size, size_of_xi + 1))
    for i in range(mc_sample_size):
        print(f"Loop: {i+1} / {mc_sample_size}")
        samples[i, 0:-1] = np.random.uniform(-np.sqrt(3), np.sqrt(3), size_of_xi)
        samples[i, -1] = np.random.normal(0, 1)
        y.append(solve_poisson_for_given_sample(mesh_resolution_fem, eigenvalues, eigenvectors, samples[i, 0:-1], basis_functions_grads, N, size_of_xi, samples[i, -1])(P))
    y = rank_statistics_permute(samples[:, index].tolist(), y)
    y_bar = np.mean(y)
    V_hat_j = 1/mc_sample_size * sum([y[i] * y[P_j(i, sorted(samples[:, index].tolist()), samples[:, index].tolist())] for i in range(mc_sample_size)]) - y_bar**2
    V_hat = 1/mc_sample_size * sum([y[i]**2 for i in range(mc_sample_size)]) - y_bar**2
    return V_hat_j / V_hat


In [7]:
###### Section 2 ######
###### Sobol Index Estimates ######

# Inputs

# Times for only double loop MC:
#   3, 4, 3 takes approximately 2,5 minutes
#   8, 4, 3 takes approximately 14 minutes -> result index123: 0.3, index4: 0.07

# Times for double loop MC and pick freeze:
#   6, 4, 3 takes approximately 32 minutes
mc_sample_size = 6
mesh_resolution_fem = 4
size_of_xi = 3
P = fe.Point(0, 0)


indices = [1, 2, 3] # [1, 2, 3] are all xi
double_loop_mc_closed_sobol_index_123 = double_loop_mc(mc_sample_size, mesh_resolution_fem, size_of_xi, P, indices)

indices = [4] # [4] is f
double_loop_mc_closed_sobol_index_4 = double_loop_mc(mc_sample_size, mesh_resolution_fem, size_of_xi, P, indices)

indices = [1, 2, 3] # [1, 2, 3] are all xi
pick_freeze_closed_sobol_index_123 = pick_freeze(mc_sample_size, mesh_resolution_fem, size_of_xi, P, indices)

indices = [4] # [4] is f
pick_freeze_closed_sobol_index_4 = pick_freeze(mc_sample_size, mesh_resolution_fem, size_of_xi, P, indices)

print(f"Double loop MC closed sobol index 123: {double_loop_mc_closed_sobol_index_123}")
print(f"Double loop MC closed sobol index 4: {double_loop_mc_closed_sobol_index_4}")
print(f"Pick freeze closed sobol index 123: {pick_freeze_closed_sobol_index_123}")
print(f"Pick freeze closed sobol index 4: {pick_freeze_closed_sobol_index_4}")

First loop: Outer MC loop iteration 1 / 6
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
First loop: Outer MC loop iteration 2 / 6
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem

In [8]:
# Compare sobol indices for the samples of xi

# Inputs
# 10, 4, 3 takes approximately 35 minutes
mc_sample_size = 10
mesh_resolution_fem = 4
size_of_xi = 3
P = fe.Point(0, 0)

s_1 = double_loop_mc(mc_sample_size, mesh_resolution_fem, size_of_xi, P, [1])
s_2 = double_loop_mc(mc_sample_size, mesh_resolution_fem, size_of_xi, P, [2])
s_3 = double_loop_mc(mc_sample_size, mesh_resolution_fem, size_of_xi, P, [3])

print(f"S_1: {s_1}, S_2: {s_2}, S_3: {s_3}")

First loop: Outer MC loop iteration 1 / 10
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Calling FFC just-in-time (JIT) compiler

In [9]:
# Sobol Index estimation by rank statistics

# Inputs
# 5, 4, 3 takes approximately 2 minutes
mc_sample_size = 5
mesh_resolution_fem = 4
size_of_xi = 3
P = fe.Point(0, 0)

index = 1
s_1 = rank_statistics(mc_sample_size, mesh_resolution_fem, size_of_xi, P, index)

index = 4
s_4 = rank_statistics(mc_sample_size, mesh_resolution_fem, size_of_xi, P, index)

print(f"Rank statistics estimates: S_1: {s_1}, S_4: {s_4}")

Loop: 1 / 5
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Loop: 2 / 5
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Loop: 3 / 5
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Loop: 4 / 5
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Loop: 5 / 5
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Loop: 1 / 5
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Loop: 2 / 5
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Loop: 3 / 5
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving linear variational problem.
Loop: 4 / 5
Calling FFC just-in-time (JIT) compiler, this may take some time.
Solving li