# Efficient Global Optimization employing Deep Gaussian Process Regression

See DGPR implementations in:

https://docs.gpytorch.ai/en/latest/examples/01_Exact_GPs/Simple_GP_Regression.html
https://docs.gpytorch.ai/en/stable/examples/06_PyTorch_NN_Integration_DKL/KISSGP_Deep_Kernel_Regression_CUDA.html

In [None]:
import numpy as np

from scipy.stats import qmc
from scipy.special import erf
import scipy.io as sio

import torch
from torch.nn import Sequential, Linear, ReLU, init
from torch.nn.utils import spectral_norm
from torch.optim import Adam

from gpytorch.distributions import MultivariateNormal
from gpytorch.models import ExactGP
from gpytorch.means import ConstantMean
from gpytorch.kernels import ScaleKernel, MaternKernel, RBFKernel
from gpytorch.utils.grid import ScaleToBounds
from gpytorch.likelihoods import GaussianLikelihood
from gpytorch.mlls import ExactMarginalLogLikelihood
from gpytorch.settings import use_toeplitz, fast_pred_var
from gpytorch.constraints.constraints import GreaterThan

from sklearn.model_selection import KFold
from sklearn.model_selection import train_test_split

import matplotlib.pyplot as plt
import random
import torch
from bsa import bsa
import time

In [None]:
from ansys.mapdl.core import launch_mapdl
def launch_mapdl_on_available_port(starting_port=50052, max_attempts=10):
    for i in range(max_attempts):
        port = starting_port + i
        try:
            mapdl = launch_mapdl(port=port)
            print(f"MAPDL launched successfully on port {port}")
            return mapdl
        except Exception as e:
            print(f"Failed to launch MAPDL on port {port}: {e}")
    raise RuntimeError("Could not launch MAPDL on any available port")

# Use a função para iniciar uma instância do MAPDL
mapdl = launch_mapdl_on_available_port()

## *GPR Model*

### Gaussian Process Regression

In [None]:
class GPRegressionModel(ExactGP):
    def __init__(self, train_x, train_y, likelihood):
        super(GPRegressionModel, self).__init__(train_x, train_y, likelihood)
        self.mean_module = ConstantMean()
        self.covar_module = ScaleKernel(RBFKernel())
        self.scale_to_bounds = ScaleToBounds(-1., 1.)
        
        # Store train_y for later use
        self.train_outputs = train_y

    def forward(self, x):
        projected_x = self.scale_to_bounds(x)
        mean_x = self.mean_module(projected_x)
        covar_x = self.covar_module(projected_x)
        return MultivariateNormal(mean_x, covar_x)


### Train model

In [None]:
def train_model(train_x, train_g, val_x, val_g, training_iterations):
    # Initialize the models and likelihood
    likelihood = GaussianLikelihood()
    model = GPRegressionModel(train_x=train_x, train_y=train_g, likelihood=likelihood)

    # if torch.cuda.is_available():
    #     model = model.cuda()
    #     likelihood = likelihood.cuda()

    # Find optimal model hyperparameters
    model.train()
    likelihood.train()

    # Use the adam optimizer
    optimizer = Adam([
        {'params': model.covar_module.parameters()},
        {'params': model.mean_module.parameters()},
        {'params': model.likelihood.parameters()},
    ], lr=0.005)

    # "Loss" for GPs - the marginal log likelihood
    mll = ExactMarginalLogLikelihood(likelihood, model)

    # To track loss values
    training_losses = []
    validation_losses = []

    # Training loop with validation
    def train():
        best_loss, best_val_loss, best_train_loss = 1e4, 1e4, 1e4
        patience = int(training_iterations * 0.01)
        wait = 0

        for epoch in range(training_iterations):
            model.train()
            likelihood.train()

            # Zero backprop gradients
            optimizer.zero_grad()

            # Forward pass and calculate loss on training set
            output = model(train_x)
            loss = -mll(output, train_g)
            loss.backward()
            optimizer.step()

            # Validation step
            model.eval()
            likelihood.eval()
            with torch.no_grad():
                val_output = model(val_x)
                val_loss = -mll(val_output, val_g).item()

            # Save the best model based on validation and training loss
            training_loss = loss.item()
            final_loss = val_loss # val_loss*0.5 + training_loss*0.5
 
            if final_loss < best_loss:
                best_loss = final_loss
                best_val_loss = val_loss
                best_train_loss = training_loss
                best_epoch = epoch
                torch.save(model.state_dict(), 'best_model_GPR.pth')
                wait = 0  # Reset patience counter when improvement is found
            else:
                wait += 1  # Increment patience counter if no improvement

            # Early stopping
            if wait > patience:
                print(f'Early stopping at epoch {epoch + 1}')
                break

            # Track losses for plotting
            training_losses.append(loss.item())
            validation_losses.append(val_loss)

            # print(f'Epoch {epoch + 1} - Training Loss: {loss.item()} - Validation Loss: {val_loss}')

        print(f'Best Loss: {best_loss} at epoch {best_epoch}. Training loss: {best_train_loss} and val. loss: {best_val_loss}')
        return best_val_loss, best_train_loss
    best_loss = train()

    # Load the best model state
    model.load_state_dict(torch.load('best_model_GPR.pth'))

    # Set model and likelihood to eval mode for further evaluation
    model.eval()
    likelihood.eval()

    # Plot training and validation loss
    plt.figure(figsize=(10, 5))
    plt.plot(training_losses, label='Training Loss')
    plt.plot(validation_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss Over Epochs')
    plt.legend()
    plt.grid(True)
    plt.show()

    return model, likelihood, best_loss


## Initial sampling plan (Latin Hypercube sampling)

In [None]:
def LHS_sample(num_points, DIM):
    # Number of variables and sampling points
    size = int(num_points)

    # Generate LHS samples
    sampler = qmc.LatinHypercube(d=DIM,
                                 optimization="random-cd")
    lhs_sample = sampler.random(n=size)

    return lhs_sample

## Expected Improvement

In [None]:
def expected_improvement(x, model):
    """
    Function to calculate the Expected Improvement using Monte Carlo Integration.
    
    Parameters:
        x (array): Individual under evaluation.
        y (array): Valor mínimo obtido até o momento.
    
    Returns:
        array: The value of the Expected Improvement.
    """
    # Get the minimum value obtained so far
    ymin = torch.min(model.train_outputs)
    # ymin = torch.Tensor(ymin)

    # Calculate the prediction value and the variance (Ssqr)
    with torch.no_grad(), use_toeplitz(False), fast_pred_var():
        preds = model(x)
    f = preds.mean
    s = preds.variance

    # Check for any errors that are less than zero (due to numerical error)
    s[s < 0] = 0  # Set negative variances to zero

    # Calculate the RMSE (Root Mean Square Error)
    s = torch.sqrt(s)

    # Calculation of Expected Improvement
    term1 = (ymin - f) * (0.5 + 0.5 * erf((ymin - f) / (s * torch.sqrt( torch.from_numpy(np.array([2])) ))))
    term2 = (1 / torch.sqrt(2 * torch.from_numpy(np.array([np.pi])))) * s * torch.exp(-((ymin - f) ** 2) / (2 * s ** 2))
    
    return -(term1 + term2)

## Define the problem

### Objective function

Your objective function must go inside obj_fun. The input is *xx*, your design variables vector, and the output is the objective function value (your own cost function).

You can either import your objective function from a *.py* file using `import obj_fun from my_file` (where *my_file.py* contains your objective function that is called *obj_fun*, already organized to receive a design variable vector (or matrix) as input and output an objective function value) or just replace the *obj_fun* code below by your entire code.

In [None]:
import math
import numpy as np 
import pandas as pd

# Defining variables only once
pile_radius = 0.5
minimum_distance = pile_radius * 2.5  # 5 times the pile radius
width = 9
height = 3.5
n_piles = 3
n_central_piles = round(2)
pile_length = 35

# AUXILIARY FUNCTIONS
def round_to_nearest(value, targets=[0, 0.333, 0.666, 1]): 
    # Rounds vertical angle to the nearest target value to work within equipment construction limits 
    # and reduce sample space
    return min(targets, key=lambda t: abs(value - t))

def distance_between_points(p1, p2): 
    # Measures the distance between two points to ensure geometric structure limits
    return math.sqrt((p2[0] - p1[0]) ** 2 + (p2[1] - p1[1]) ** 2)

def mirror_points(coordinates, n_central_piles): 
    # Mirrors points for symmetrical pile placement
    mirror = [(x, -y, z) for x, y, z in coordinates[:-int(n_central_piles)]]
    return mirror

def convert_x(normalized_x): 
    # Converts normalized values to actual X coordinates
    return pile_radius + normalized_x * (width - 2 * pile_radius)

def convert_y(normalized_y): 
    # Converts normalized values to actual Y coordinates
    return pile_radius + normalized_y * (height - 2 * pile_radius)

def convert_vertical_angles(vertical_angles_array): 
    # Converts normalized values to vertical angles in degrees
    return [np.round(75 + 15 * ang_v, 2) for ang_v in vertical_angles_array]

def convert_horizontal_angles(horizontal_angles_array, n_piles): 
    # Converts normalized values to horizontal angles in degrees
    result = [
        np.round(225 - 270 * ang_h, 2) if i < n_piles else np.round(180 * ang_h, 2)
        for i, ang_h in enumerate(horizontal_angles_array)
    ]
    return result

def round_to_binary(value):
    return 1 if value >= 0.5 else 0

def spherical_to_cartesian(x0, y0, z0, pile_length, vertical_angle_deg, horizontal_angle_deg): 
    # Converts spherical coordinates to cartesian coordinates
    theta_v = np.deg2rad(vertical_angle_deg)
    theta_h = np.deg2rad(horizontal_angle_deg)
    
    dx = pile_length * np.cos(theta_v) * np.cos(theta_h)
    dy = pile_length * np.cos(theta_v) * np.sin(theta_h)
    dz = pile_length * np.sin(theta_v)
    
    x_final = x0 + dx
    y_final = y0 + dy
    z_final = z0 + dz

    return x_final, y_final, z_final

def generate_end_points(start_points, vertical_angles, horizontal_angles): 
    # Generates end points based on spherical directions
    final_coordinates = []
    for i in range(len(start_points)):
        x, y, z = start_points[i]
        theta_v, theta_h = vertical_angles[i], horizontal_angles[i]
        x_f, y_f, z_f = spherical_to_cartesian(x, y, z, pile_length, theta_v, theta_h)
        final_coordinates.append((round(x_f, 2), round(y_f, 2), -1 * round(z_f, 2)))
    return final_coordinates

def mirror_end_points(points, k):
    # Mirrors the end points of piles along Y axis
    mirror = []
    for (x, y, z) in points[:-k]:
        mirror.append((x, -y, z))
    return mirror

def distance_between_points_r3(p1, p2): 
    # Ensures geometric/normative limits of the structure
    return round(np.linalg.norm(p1 - p2), 2)

def find_shortest_distance_between_vectors(start_points, end_points, k, l, length):
    # Computes the minimum distance between two pile vectors
    vector_k = end_points[k] - start_points[k]
    vector_l = end_points[l] - start_points[l]

    # Normalize to get direction vectors
    vector_k_normalized = np.round(vector_k / length, 2)
    vector_l_normalized = np.round(vector_l / length, 2)

    min_distance = float('inf')
    point_v1_min = None
    point_v2_min = None
    N1_min = 0
    N2_min = 0
    distance = 0

    for N1 in np.arange(1, length - 1, 1):
        for N2 in np.arange(1, length - 1, 1):
            point_v1 = start_points[k] + N1 * vector_k_normalized
            point_v2 = start_points[l] + N2 * vector_l_normalized

            distance = round(distance_between_points_r3(point_v1, point_v2), 2)

            if distance < min_distance:
                min_distance = distance
                point_v1_min = point_v1
                point_v2_min = point_v2
                N1_min = N1
                N2_min = N2

    return min_distance, point_v1_min, point_v2_min, N1_min, N2_min


# Ensures geometric/normative limits of the structure. Attempts to briefly optimize it;
# if not feasible, applies a penalty.
def check_and_adjust_points(points, pile_length, min_distance, final_points, vertical_angles_converted, horizontal_angles_converted, horizontal_angles, penalty):
    max_iterations = 1 * (n_piles + n_central_piles + n_piles)
    iteration = 0
    final_points = np.array(final_points)
    increment = 0.05
    trial = 5

    for k in np.arange(0, 5, 1):  # Replace with percentage of piles in the future
        while iteration <= max_iterations:
            point_adjusted = False

            for l in np.arange(0, 5, 1):  # Replace with percentage of piles in the future
                if k == l:
                    continue

                point1 = final_points[k]
                point2 = final_points[l]
                dist = distance_between_points_r3(point1, point2)

                min_dist, _, _, _, _ = find_shortest_distance_between_vectors(points, final_points, k, l, pile_length)

                if min_dist < 2.1 * pile_radius or dist < 5 * pile_radius:
                    point_adjusted = True
                    if k <= n_piles:
                        horizontal_angles_converted[k] = np.round(horizontal_angles_converted[k] + increment, 2)
                        horizontal_angles[k] = np.round(horizontal_angles[k] + increment, 2)
                    else:
                        # Alternate between 0/180 and 0/1 using (-1)**trial
                        if (-1) ** trial == 1:
                            horizontal_angles_converted[k] = 180
                            horizontal_angles[k] = 1
                        else:
                            horizontal_angles_converted[k] = 0
                            horizontal_angles[k] = 0

                    x_new, y_new, z_new = spherical_to_cartesian(
                        points[k][0], points[k][1], points[k][2],
                        pile_length, vertical_angles_converted[k], horizontal_angles_converted[k]
                    )
                    x = np.round(x_new, 2)
                    y = np.round(y_new, 2)
                    z = -1 * np.round(z_new, 2)

                    if k < n_piles:
                        final_points[k, :] = [x, y, z]
                        final_points[k + n_piles + n_central_piles, :] = [x, -y, z]
                    else:
                        final_points[k, :] = [x, y, z]
                    break

            if not point_adjusted:
                break
            trial += 1
            
            if trial >= max_iterations:
                penalty += ((((3 * pile_radius)/(0.7 + min_dist) + (5 * pile_radius)/(0.7 + dist))) / 2)
                k += 1
                iteration += 1
                break

    return final_points, horizontal_angles_converted, horizontal_angles, penalty
def analysis(initial_points, adjusted_final_points, horizontal_angles_array, vertical_angles_array):  # Structural analysis in ANSYS
    mapdl.clear('NOSTART')
    mapdl.prep7()

    # Title
    mapdl.title('Pile and Shell Analysis')

    # Define element type (BEAM188) and its properties
    mapdl.et(1, 'BEAM188')

    # Material properties
    elasticity_modulus = 0.9 * 5600 * (40 ** 0.5) * 1e6  # N/m²
    mapdl.mp('EX', 1, elasticity_modulus)
    mapdl.mp('PRXY', 1, 0.3)  # Poisson's ratio
    mapdl.mp('DENS', 1, 2500)  # Density

    # Beam section properties
    mapdl.sectype(1, 'BEAM', 'CSOLID')
    mapdl.secoffset('CENT')
    mapdl.secdata(0.5)

    # Number of intermediate nodes
    num_intermediate_nodes = 9

    # Add nodes
    node_id = 1
    for i in range(2 * n_piles + n_central_piles):
        x_start, y_start, z_start = initial_points[i]
        x_end, y_end, z_end = adjusted_final_points[i]

        mapdl.n(node_id, x_start, y_start, z_start)
        node_id_final = node_id + num_intermediate_nodes + 1
        mapdl.n(node_id_final, x_end, y_end, z_end)

        mapdl.d(node_id_final, 'ALL', 0)
        mapdl.fill(node_id, node_id_final, num_intermediate_nodes)
        node_id = node_id_final + 1

    # Generate elements
    element_id = 1
    for i in range(2 * n_piles + n_central_piles):
        for j in range(1, num_intermediate_nodes + 2):
            N1 = j + (num_intermediate_nodes + 2) * i
            N2 = N1 + 1
            mapdl.en(element_id, N1, N2)
            element_id += 1

    # Select BEAM188 elements
    mapdl.esel('S', 'TYPE', '', 1)

    # Count selected elements
    num_elem = mapdl.get('num_elem', 'ELEM', 0, 'COUNT')
    k = num_elem

    # HEADPIECE
    mapdl.n(1000, width / 2, 0, 0)
    mapdl.n(1001, width / 2, 0, 1.5)
    mapdl.en(k + 1, 1000, 1001)

    # FENDER
    mapdl.n(1002, width, 0, 0)
    mapdl.n(1003, width, 0, -1)
    mapdl.en(k + 2, 1002, 1003)

    # Define SHELL181 element type
    mapdl.et(2, 'SHELL181')
    mapdl.keyopt(2, 8, 2)  # Elastoplastic
    mapdl.keyopt(2, 3, 2)  # Stress accuracy

    # Material properties for SHELL181
    shell_elasticity_modulus = 0.9 * 5600 * (20 ** 0.5) * 1e6
    mapdl.mp('EX', 2, shell_elasticity_modulus)
    mapdl.mp('PRXY', 2, 0.2)
    mapdl.mp('DENS', 2, 2500)

    # Define shell section
    mapdl.sectype(2, 'SHELL')
    mapdl.secdata(1.5)

    # Create rectangle and mesh it
    mapdl.rectng(0, width, -3, 3)
    mapdl.esize(0.1)
    mapdl.amesh('ALL')

    # Select SHELL181 elements and assign section
    mapdl.esel('S', 'TYPE', '', 2)
    mapdl.emodif('ALL', 'SECNUM', 2)

    # Merge nodes
    mapdl.nsel('S', 'LOC', 'Z', 0, 1e5)
    mapdl.nummrg('NODE', 1e-5)
    mapdl.nsel('ALL')

    # Apply gravity
    mapdl.acel(0, 0, -9.81)
    mapdl.finish()

    # Solution phase
    mapdl.slashsolu()
    mapdl.antype(0)

    # Load application
    force = 1_000_000  # 1 MN
    load_cases = 12

    for i in range(1, load_cases + 2):
        rad = ((((i - 1) * (90 / load_cases)) - 90) * math.pi / 180)
        FX = math.cos(rad) * force
        FY = math.sin(rad) * force

        mapdl.allsel('ALL')
        mapdl.f(1001, 'FX', FX)
        mapdl.f(1001, 'FY', FY)
        mapdl.solve()
        mapdl.save(f'load_step_{i}')

    # Load step 14 (point 1003, FX = -1500 kN)
    mapdl.allsel('ALL')
    mapdl.f(1003, 'FX', -1_500_000)
    mapdl.solve()
    mapdl.save('load_step_14')

    # Load step 15 (point 1003, FY = 30% of 1500 kN)
    mapdl.allsel('ALL')
    mapdl.f(1003, 'FY', -1_500_000 * 0.3)
    mapdl.solve()
    mapdl.save('load_step_15')

    mapdl.finish()

    # Post-processing
    mapdl.post1()
    for i in range(1, load_cases + 4):
        mapdl.lcdef(i, i, 1)

    mapdl.lczero()
    mapdl.lcoper("ADD", 14)
    mapdl.lcoper("ADD", 15)
    mapdl.lcwrite(16)
    
    n_loadcases = 16
    mapdl.lcase(1)

    # MAX evaluation
    for R in range(2, load_cases + 5):
        mapdl.lcoper('MAX', R)
        n_loadcases += 1
        mapdl.lcwrite(n_loadcases)
    mapdl.lcase(n_loadcases)
    mapdl.etable('Fx_MAX', 'SMISC', 1, 'MAX')

    # MIN evaluation
    mapdl.lcase(1)
    for R in range(2, load_cases + 5):
        mapdl.lcoper('MIN', R)
        n_loadcases += 1
        mapdl.lcwrite(n_loadcases)
    mapdl.lcase(n_loadcases)
    mapdl.etable('Fx_MIN', 'SMISC', 1, 'MIN')

    # ABSMAX evaluation
    mapdl.lcase(1)
    for R in range(2, 17):
        n_loadcases += 1
        mapdl.lcoper('ABMX', R)
        mapdl.lcwrite(n_loadcases)
    mapdl.lcase(n_loadcases)
    mapdl.etable('My_ABMX', 'SMISC', 2, 'ABMX')
    mapdl.etable('Mz_ABMX', 'SMISC', 3, 'ABMX')
    mapdl.etable('UX', 'U', 'X')
    mapdl.etable('UY', 'U', 'Y')
    mapdl.etable('UZ', 'U', 'Z')

    # Element sequence
    element_sequence = []
    j = 1
    for i in range(0, (2 * n_piles) + n_central_piles):
        y = num_intermediate_nodes * i + j
        element_sequence.append(y)
        j += 1

    # Result storage
    data = {
        "Fx_MAX": [],
        "Fx_MIN": [],
        "Utot": [],
        "My": [],
        "Mz": [],
        "r_max": [],
        "load_case_max": []
    }

    for elem in element_sequence:
        fx_max = mapdl.get_value('ELEM', elem, 'ETABLE', 'Fx_MAX') / 1000
        fx_min = mapdl.get_value('ELEM', elem, 'ETABLE', 'Fx_MIN') / 1000
        ux = mapdl.get_value('ELEM', elem, 'ETABLE', 'UX') * 1000
        uy = mapdl.get_value('ELEM', elem, 'ETABLE', 'UY') * 1000
        uz = mapdl.get_value('ELEM', elem, 'ETABLE', 'UZ') * 1000
        Utot = math.sqrt(ux**2 + uy**2 + uz**2)

        data["Fx_MAX"].append(fx_max)
        data["Fx_MIN"].append(fx_min)
        data["Utot"].append(Utot)

    for elem in element_sequence:
        r_max_elem = 0
        my_max_elem = 0
        mz_max_elem = 0
        load_case_max = 0

        for load_case in range(1, load_cases + 2):
            mapdl.lcase(load_case)
            mapdl.etable('My', 'SMISC', 2)
            mapdl.etable('Mz', 'SMISC', 3)

            my_val = mapdl.get_value('ELEM', elem, 'ETABLE', 'My') / 1000
            mz_val = mapdl.get_value('ELEM', elem, 'ETABLE', 'Mz') / 1000
            r_val = math.sqrt(my_val**2 + mz_val**2)

            if r_val > r_max_elem:
                r_max_elem = r_val
                my_max_elem = my_val
                mz_max_elem = mz_val
                load_case_max = load_case

        data["My"].append(my_max_elem)
        data["Mz"].append(mz_max_elem)
        data["r_max"].append(r_max_elem)
        data["load_case_max"].append(load_case_max)

    df = pd.DataFrame(data).round(1)

    def wp(x):
        if x <= 0:
            return -x**2
        elif x > 0:
            return x**2
        return np.nan

    delta1, delta2, delta3, delta4 = 5500, 2000, 2500, 125

    def calculate_Pt(df):
        df['Pt_Fx_MAX'] = 0.0
        df['Pt_Fx_MIN'] = 0.0
        df['Pt_r_max'] = 0.0
        df['Pt_Utot'] = 0.0

        p_total = 0
        p_array = []

        for index, row in df.iterrows():
            df.at[index, 'Pt_Fx_MAX'] = wp((abs(row['Fx_MAX']) - delta1) / delta1 if row['Fx_MAX'] > 0 else (abs(row['Fx_MAX']) - delta2) / delta2)
            df.at[index, 'Pt_Fx_MIN'] = wp((abs(row['Fx_MIN']) - delta1) / delta1 if row['Fx_MIN'] > 0 else (abs(row['Fx_MIN']) - delta2) / delta2)
            df.at[index, 'Pt_r_max'] = wp((abs(row['r_max']) - delta3) / delta3)
            df.at[index, 'Pt_Utot'] = wp((abs(row['Utot']) - delta4) / delta4)

            p_current = max(df.at[index, 'Pt_Fx_MAX'], df.at[index, 'Pt_Fx_MIN'], df.at[index, 'Pt_r_max'], df.at[index, 'Pt_Utot'])
            p_array.append(p_current)
            p_total = max(p_array)

        return p_total

    p_total = calculate_Pt(df)

    return p_total, initial_points, adjusted_final_points, horizontal_angles_array, vertical_angles_array
# Updated version of obj_fun with float conversion to avoid tensor-related errors
# Now using `.item()` to safely extract values from PyTorch tensors
# Supports both single and batch evaluations

def obj_fun(xx):
    p = 0
    results = []  # Accumulate evaluation values

    # If input is 0-D tensor, convert to 1-D
    if xx.dim() == 0:
        xx = xx.unsqueeze(0)

    # Iterate through each row (for 2D tensors) or handle single input
    for i in range(xx.shape[0]):
        p = 0
        line = xx[i] if xx.dim() > 1 else xx

        # Extract normalized input variables
        x_aba_1, x_aba_2, x_aba_3, x_central_1, x_central_2 = (
            round(float(line[0].item()), 1), round(float(line[1].item()), 1),
            round(float(line[2].item()), 1), round(float(line[3].item()), 1),
            round(float(line[4].item()), 1)
        )

        y_aba_1, y_aba_2, y_aba_3 = (
            round(float(line[5].item()), 1), round(float(line[6].item()), 1),
            round(float(line[7].item()), 1)
        )

        # Define initial coordinates
        coordinates = [
            [convert_x(x_aba_1), convert_y(y_aba_1), 0], 
            [convert_x(x_aba_2), convert_y(y_aba_2), 0],
            [convert_x(x_aba_3), convert_y(y_aba_3), 0],
            [convert_x(x_central_1), 0.0, 0.0], 
            [convert_x(x_central_2), 0.0, 0.0]
        ]

        # Process vertical and horizontal angles
        ang_v1, ang_v2, ang_v3, ang_v4, ang_v5 = (
            round_to_nearest(float(line[9].item())),
            round_to_nearest(float(line[10].item())),
            round_to_nearest(float(line[11].item())),
            round_to_nearest(float(line[12].item())),
            round_to_nearest(float(line[13].item()))
        )

        ang_h1, ang_h2, ang_h3, ang_h4, ang_h5 = (
            float(line[14].item()),
            float(line[15].item()),
            float(line[16].item()),
            round_to_binary(float(line[17].item())),
            round_to_binary(float(line[18].item()))
        )

        # Store angles
        vertical_angles = [ang_v1, ang_v2, ang_v3, ang_v4, ang_v5]
        horizontal_angles = [ang_h1, ang_h2, ang_h3, ang_h4, ang_h5]

        # Convert angles to degrees
        vertical_angles_deg = convert_vertical_angles(vertical_angles)
        horizontal_angles_deg = convert_horizontal_angles(horizontal_angles, n_piles)

        # Mirror base coordinates
        mirror = mirror_points(coordinates, n_central_piles)

        # Penalize for points that are too close
        for i in range(len(coordinates)):
            for j in range(i + 1, len(coordinates)):
                dist = distance_between_points(coordinates[i], coordinates[j])
                if dist < minimum_distance:
                    p += (minimum_distance / (0.5 + dist))

        # Generate final points and mirrored final points
        final_coords = generate_end_points(coordinates, vertical_angles_deg, horizontal_angles_deg)
        mirrored_final_coords = mirror_end_points(final_coords, int(n_central_piles))

        # Prepare input/output points for analysis
        initial_points = coordinates + mirror
        final_points = np.array(final_coords + mirrored_final_coords)

        # Adjust points if geometric constraints are violated
        adjusted_final_points, horizontal_angles_deg, horizontal_angles, p = check_and_adjust_points(
            initial_points, pile_length, minimum_distance, final_points,
            vertical_angles_deg, horizontal_angles_deg, horizontal_angles, p
        )

        # Perform structural analysis only if no penalty
        if p <= 0:
            p_analysis, initial_points, adjusted_final_points, horizontal_angles, vertical_angles = analysis(
                initial_points, adjusted_final_points, horizontal_angles, vertical_angles
            )
            results.append(p_analysis)
        else:
            results.append(p)


    return torch.tensor(results)

# TESTER
values = [0, 0.333, 0.5, 0.666, 1] + [0.5] * 1 + [0.1] + [0.6] * 2 + [0.3] + [0.333, 0.666, 1, 0, 0.5] + [0.2] * 5
xx = torch.tensor(values).unsqueeze(0)
obj_fun(xx)



In [None]:
def calculate_BSA_parameters(N_OFEs):
    suggested_BSA_popsize = int(np.floor(np.log(N_OFEs**2)))
    suggested_BSA_epochs = int(np.ceil(N_OFEs/np.log(N_OFEs**2)))
    return suggested_BSA_popsize, suggested_BSA_epochs

### Parameters of the optimization for EGO

In [None]:
TRAINING_ITERATIONS = 20000 # epochs for training the GPR
N_INITIAL = 250#initial number of points    
N_INFILL = 600#number of infill points  
N_EI_POINTS = 100000  # 1 million takes 40 seconds
BSA_POPSIZE, BSA_EPOCH = calculate_BSA_parameters(N_EI_POINTS)

### Set variables

In [None]:
DIM = 19  # dimension of your problem (number of design variables)
BOUNDS_BSA = (
    tuple((0, 1) for _ in range(DIM))  # the number of (0, 1) tuples has to be equal to DIM
    )

## Optimization using BSA

In [None]:
POPSIZE, EPOCHS = calculate_BSA_parameters(N_INITIAL + N_INFILL)

In [None]:
t0 = time.time()

In [None]:
results_bsa = bsa(obj_fun, bounds=BOUNDS_BSA,
                    popsize=POPSIZE, epoch=EPOCHS, data=[])

In [None]:
t1 = time.time()
time_BSA = t1 - t0

In [None]:
print(f"f* = {results_bsa.y:.5f}; x* = {results_bsa.x}")
print(f"Total BSA time: {time_BSA:.1f} seconds")

In [None]:
# Plotting the cost history

print(results_bsa)
plt.figure(figsize=(8, 6))  # Optional: Set figure size
plt.plot(np.arange(EPOCHS), results_bsa.convergence, marker='o', linestyle='-', color='b', label='Cost History')

# Adding labels and title
plt.xlabel('Epoch')
plt.ylim([-0.5, 0])
plt.xlim([0,N_INITIAL + N_INFILL])
plt.ylabel('Cost')
plt.title('BSA Cost History per Epoch')
plt.legend()

# Display the plot
plt.grid(True)  # Optional: Add grid for better visualization
plt.show()


### Generate initial sampling plan using LHS

In [None]:
x = LHS_sample(N_INITIAL, DIM)
x = torch.from_numpy(x)
x = x.to(torch.float32)

In [None]:
time_EGO = {
    'LHS': 0,
    'training': [],
    'obj_fun': [],
    'EI': []
    }

In [None]:
import os.path

name_f, name_x = 'f.mat', 'x.mat'
path = './'
pathname_f = path + name_f
pathname_x = path + name_x
check_file = os.path.isfile(pathname_f)
if check_file:
    mat_contents = sio.loadmat(pathname_f)
    f = mat_contents['f']
    f = torch.Tensor(f)
    f = f.view(N_INITIAL)
    
    mat_contents = sio.loadmat(pathname_x)
    x = mat_contents['x']
    x = torch.Tensor(x)
    
else:
    t0 = time.time()
    f = obj_fun(x)
    f = f.view(N_INITIAL)
    sio.savemat(name_f, {'f': f.numpy()})
    sio.savemat(name_x, {'x': x.numpy()})
    t1 = time.time()
    time_EGO['LHS'] = t1 - t0

In [None]:
train_x, val_x, train_g, val_g = train_test_split(x, f, test_size=0.20)
# Converter os dados de treino e validação para o tipo float32
train_x = train_x.float()
train_g = train_g.float()
val_x = val_x.float()
val_g = val_g.float()

## Efficient Global Optimization

In [None]:
t0 = time.time()
model, likelihood, best_loss = train_model(train_x, train_g, val_x, val_g, TRAINING_ITERATIONS)
model.eval()
likelihood.eval()
it = 0
t1 = time.time()
time_EGO['training'].append(t1 - t0)
print(f'\nIteration {it}. Best of the OFEs: {torch.min(f)}. GPR model: Train loss = {best_loss[0]}; Val. loss = {best_loss[-1]}')

In [None]:
while it < N_INFILL:
    it += 1
    
    # Search for the maximum expected improvement
    t0 = time.time()    
    new_point = bsa(expected_improvement, bounds=BOUNDS_BSA,
                    popsize=BSA_POPSIZE, epoch=BSA_EPOCH, data=model)
    x_new = torch.from_numpy(new_point.x).float()  # Garantir que x_new esteja em float
    EI = new_point.y
    t1 = time.time()
    time_EGO['EI'].append(t1 - t0)

    # Objective function at the new point
    t0 = time.time()
    f_new = obj_fun(x_new.unsqueeze(0).float())  # Garante que x_new é float32 e 1D
    f_new = f_new.view(-1)                       # Ajusta f_new para a concatenação com f

    print(f'Iteration {it} of {N_INFILL}')
    if f_new.item() < torch.min(f).item():       # Converte f_new e torch.min(f) para escalares
        print(f'New best: f* = {float(f_new):.5f}; x* = {x_new.unsqueeze(0).float()}; at position {it}')
    
    # Add new values to the initial sampling
    x = torch.cat((x, x_new.unsqueeze(0)), 0).float()
    f = torch.cat((f, f_new), 0).float()
    t1 = time.time()
    time_EGO['obj_fun'].append(t1 - t0)
    
    # Update model
    t0 = time.time()
    train_x, val_x, train_g, val_g = train_test_split(x.float(), f.float(), test_size=0.20)
    model, likelihood, best_loss = train_model(train_x, train_g, val_x, val_g, TRAINING_ITERATIONS)
    model = model.to(torch.float32)
    likelihood = likelihood.to(torch.float32)
    model.eval()
    likelihood.eval()
    t1 = time.time()
    time_EGO['training'].append(t1 - t0)
    
    print(f'\nIteration {it}. Best of the OFEs: {torch.min(f)}. GPR model: Train loss = {best_loss[0]}; Val. loss = {best_loss[-1]}')
    
    # if abs(EI) < TOL_MIN_EI:
    #     print('Optimization finished. Minimum tolerance achieved.')
    #     break


In [None]:
# Plotting the cost history
plt.figure(figsize=(8, 6))  # Optional: Set figure size
plt.ylim([-0.5, 0])
plt.xlim([0,N_INITIAL+N_INFILL])
# plt.plot(np.arange(N_INITIAL), torch.min(f[:N_INITIAL]) * torch.ones(N_INITIAL), marker='o', linestyle='-', color='b', label='Cost History')

f_best = torch.Tensor([torch.min(f[:N_INITIAL]), torch.min(f[:N_INITIAL])])
idx_best = torch.Tensor([0, N_INITIAL])

for i in range(N_INITIAL, N_INITIAL+N_INFILL):
    if f[i] < f_best[-1]:
        f_best = torch.cat((f_best, torch.tensor([f[i]])), dim=0)
        idx_best = torch.cat((idx_best, torch.tensor([i])), dim=0)

if idx_best[-1] < N_INITIAL + N_INFILL - 1:
    idx_best = torch.cat((idx_best, torch.tensor([N_INITIAL + N_INFILL - 1])), dim=0)
    f_best = torch.cat((f_best, torch.tensor([torch.min(f)])), dim=0)

plt.plot(idx_best, f_best, marker='o', linestyle='-', color='k', label='Cost History')

# Adding labels and title
plt.xlabel('Epoch')
plt.ylabel('Cost')
plt.title('EGO Cost History per Epoch')
plt.legend(['Cost'])

# Display the plot
plt.grid(True)  # Optional: Add grid for better visualization
plt.show()

In [None]:
print(f'f*: {torch.min(f):.5f}; x*: {x[torch.argmin(f), :]}')

In [None]:
if time_EGO['LHS'] == 0:
    time_EGO['LHS'] = np.mean(time_EGO['obj_fun'])*N_INITIAL
time_EGO['total'] = np.sum(time_EGO['training']) + np.sum(time_EGO['obj_fun']) + np.sum(time_EGO['LHS']) + np.sum(time_EGO['EI'])

In [None]:
time_EGO['EI_total'] = np.sum(time_EGO['EI'])
print(f"Total EI time: {time_EGO['EI_total']:.1f} seconds ({100*time_EGO['EI_total']/time_EGO['total']:.1f}%)")

print(f"Total Initial Sampling Plan time: {np.sum(time_EGO['LHS']):.1f} seconds ({100*time_EGO['LHS']/time_EGO['total']:.1f}%), for {N_INITIAL} sampling points")

time_EGO['obj_fun_total'] = np.sum(time_EGO['obj_fun'])
print(f"Total EGO OFEs time: {time_EGO['obj_fun_total']:.1f} seconds ({100*time_EGO['obj_fun_total']/time_EGO['total']:.1f}%)")

time_EGO['training_total'] = np.sum(time_EGO['training'])
print(f"Total training time: {time_EGO['training_total']:.1f} seconds ({100*time_EGO['training_total']/time_EGO['total']:.1f}%)")

print(f"Total EGO time: {time_EGO['total']:.1f} seconds")

In [None]:
print(f"EGO OFEs vs BSA OFEs: {N_INITIAL + N_INFILL} vs {EPOCHS * POPSIZE}")
print(f"EGO time vs BSA time: {time_EGO['total']} vs {time_BSA} seconds")
suggested_BSA_OFEs = int(np.round((N_INITIAL + N_INFILL) * time_EGO['total'] / time_BSA))
suggested_BSA_popsize, suggested_BSA_epochs = calculate_BSA_parameters(suggested_BSA_OFEs)
print(f"Suggested BSA OFEs for equalling EGO time: {suggested_BSA_OFEs}")
print(f"Suggested BSA popsize: {suggested_BSA_popsize} and suggested BSA epochs: {suggested_BSA_epochs}")

In [None]:
print("EGO vs BSA")
print(f'EGO ---> f*: {torch.min(f):.5f}; x*: {x[torch.argmin(f), :]}')
print(f"BSA ---> f* = {results_bsa.y:.5f}; x* = {results_bsa.x}")

### Save results

In [None]:
results = dict()
results["x"] = x
results["f"] = f
results["n_initial"] = N_INITIAL
results["n_infill"] = N_INFILL
results["OFEs"] = N_INITIAL + it
results["time"] = time_EGO

In [None]:
name_f, name_x = 'f.mat', 'x.mat'
path = './'
pathname_f = path + name_f
pathname_x = path + name_x

sio.savemat(name_f, {'f': f.numpy()})
sio.savemat(name_x, {'x': x.numpy()})

### BSA optimization with suggested OFEs

In [None]:
t0 = time.time()

In [None]:
results_suggested_bsa = bsa(obj_fun, bounds=BOUNDS_BSA,
                    popsize=suggested_BSA_popsize, epoch=suggested_BSA_epochs, data=[], tempo=time_EGO["total"])

In [None]:
t1 = time.time()
time_suggested_BSA = t1 - t0

In [None]:
print(f"f* = {results_suggested_bsa.y:.2f}; x* = {results_suggested_bsa.x}")
print(f"Total suggested BSA time: {time_suggested_BSA:.1f} seconds")
print(f"EGO time vs suggested BSA time: {time_EGO['total']} vs {time_suggested_BSA} seconds")

In [None]:
print("EGO vs BSA")
print(f'EGO ---> f*: {torch.min(f):.5f}; x*: {x[torch.argmin(f), :]}')
print(f"BSA ---> f* = {results_suggested_bsa.y:.5f}; x* = {results_suggested_bsa.x}")

### COMPARISON BETWEEN BOTH METHODS CONSIDERING SAME OFE COST

In [None]:
EGO_best = float(torch.min(f))
BSA_best = float(results_bsa.y)
a = abs(EGO_best)
b = abs(BSA_best)
if EGO_best < BSA_best:
    print(f"EGO wins! {EGO_best} vs {BSA_best}")
else:
    print(f"BSA wins! {BSA_best} vs {EGO_best}")

if a > b:
    reduction = abs((a-b)/a)
else:
    reduction = abs((a-b)/b)

print(f"Reduction of {reduction*100:.1f}%")

### COMPARISON BETWEEN BOTH METHODS CONSIDERING SAME TIME COST

In [None]:
BSA_best = float(results_suggested_bsa.y)
a = abs(EGO_best)
b = abs(BSA_best)
if EGO_best < BSA_best:

    print(f"EGO wins! {EGO_best} vs {BSA_best}")
else:
    print(f"Suggested BSA wins! {BSA_best} vs {EGO_best}")

if a > b:
    reduction = abs((a-b)/a)
else:

    reduction = abs((a-b)/b)

print(f"Reduction of {reduction*100:.1f}%")


In [None]:
# Plotting the cost history
plt.figure(figsize=(8, 6))  # Optional: Set figure size
plt.plot(np.arange(results_suggested_bsa.convergence.shape[0]), results_suggested_bsa.convergence, marker='o', linestyle='-', color='b', label='Cost History')

# Adding labels and title
plt.xlabel('Epoch')
plt.ylabel('Cost')
plt.ylim([-0.5, 0])
plt.xlim([0,N_INITIAL+N_INFILL])
plt.title('BSA Cost History per Epoch')
plt.legend()

# Display the plot
plt.grid(True)  # Optional: Add grid for better visualization
plt.show()


In [None]:
#Sair da port do mapdl e fechar o processo
mapdl.exit()