In [None]:
MOMCS - DOF

In [None]:
import numpy as np
import os
import sys
import random
import time
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from scipy.special import gamma

# Setup DNN paths and imports
dnn_dir = os.path.abspath(os.path.join('..', 'DNN_CFD'))
sys.path.append(dnn_dir)

from airfoil_generator import generate_airfoil_coordinates
from area_calculator import calculate_area
from DNN_Only import ResidualBlock, ChannelSpecificDecoder, EncoderDecoderCNN, workflow


In [None]:
# Function to sample a new set of PARSEC parameters within the bounds
def sample_parsec_parameters(baseline, bounds):
    return [np.random.uniform(low, high) for (low, high) in bounds]


In [None]:
def dominates_pair(obj_a, obj_b):
    """Returns True if obj_a dominates obj_b.
        Maximizing obj_a[0] (Column 1) and minimizing obj_a[1] (Column 2)."""
    return (obj_a[0] > obj_b[0] and obj_a[1] <= obj_b[1]) or \
           (obj_a[0] >= obj_b[0] and obj_a[1] < obj_b[1])


In [None]:
def calculate_crowding_distance(front):
    """Calculate crowding distance for each individual in a Pareto front."""
    if len(front) == 0:
        return []
    
    num_objectives = len(front[0])
    distances = np.zeros(len(front))
    front_size = len(front)
    
    front_array = np.array(front)
    
    for i in range(num_objectives):
        # Sort indices based on the current objective
        sorted_indices = np.argsort(front_array[:, i])
        distances[sorted_indices[0]] = distances[sorted_indices[-1]] = np.inf
        min_obj = front_array[sorted_indices[0], i]
        max_obj = front_array[sorted_indices[-1], i]
        
        if max_obj == min_obj:
            # If all values are the same for this objective, skip distance calculation
            continue
        
        for j in range(1, front_size - 1):
            distances[sorted_indices[j]] += (front_array[sorted_indices[j + 1], i] - front_array[sorted_indices[j - 1], i]) / (max_obj - min_obj)
    
    return distances


In [None]:
def levy_flight(size, beta=1.5):
    """Generate Lévy flight step."""
    sigma_u = (gamma(1 + beta) * np.sin(np.pi * beta / 2) / (gamma((1 + beta) / 2) * beta * 2 ** ((beta - 1) / 2))) ** (1 / beta)
    u = np.random.normal(0, sigma_u, size=size)
    v = np.random.normal(0, 1, size=size)
    step = u / (np.abs(v) ** (1 / beta))
    return step


In [None]:
def efficient_non_dominated_sort(objective_values):
    num_individuals = len(objective_values)
    dominated_count = np.zeros(num_individuals, dtype=int)
    dominates = [set() for _ in range(num_individuals)]
    
    for i in range(num_individuals):
        for j in range(i + 1, num_individuals):
            if dominates_pair(objective_values[i], objective_values[j]):
                dominates[i].add(j)
                dominated_count[j] += 1
            elif dominates_pair(objective_values[j], objective_values[i]):
                dominates[j].add(i)
                dominated_count[i] += 1
    
    fronts = []
    current_front = np.where(dominated_count == 0)[0].tolist()
    
    while current_front:
        fronts.append(current_front)
        next_front = []
        for i in current_front:
            for j in dominates[i]:
                dominated_count[j] -= 1
                if dominated_count[j] == 0:
                    next_front.append(j)
        current_front = next_front
    
    return fronts


In [None]:
def plot_pareto_front(best_fitness, title="Pareto Front"):
    """Plot the Pareto front for a set of objective values."""
    best_fitness = np.array(best_fitness)
    
    plt.figure(figsize=(10, 6))
    plt.scatter(best_fitness[:, 0], best_fitness[:, 1], color='blue', s=50, alpha=0.7, label='MOMCS Solutions')
    plt.title(title)
    plt.xlabel('CL')
    plt.ylabel('Area')
    plt.legend()
    #plt.ylim(-1,1)
    #plt.xlim(0,1)
    plt.grid(True)
    plt.show()
    

In [None]:
# Function to evaluate the fitness of a nest (airfoil)
def fitness_function(airfoil_number, mach, aoa):
    # Paths for storing generated data
    sdf_path = './SDF'
    geo_path = './Airfoils'
    dnn_output_path = './Inference'
    
    C_L, C_D = workflow(airfoil_number, mach, aoa, sdf_path, geo_path, dnn_output_path)
    #function to calculate  area of the airfoil
    Area = calculate_area(airfoil_number, geo_path)
    return [C_L, Area]

In [None]:
def run_momcs(baseline_parsec, num_nests, max_evaluations, A, phi, bounds, mach, aoa):
    num_objectives = 2  
    num_dimensions = bounds.shape[0]
    
    # Initialize the population and evaluate the initial fitness
    population = []
    objective_values = []
    
    start_total_p = time.time()
    
    for i in range(num_nests):
        # Sample a new set of PARSEC parameters
        sampled_parsec = sample_parsec_parameters(baseline_parsec, bounds)
        generate_airfoil_coordinates(sampled_parsec, i + 1)  # Save airfoil as i+1.txt
        # Compute fitness
        obj_values = fitness_function(i + 1, mach, aoa)  # Using DNN for lift coefficient as fitness

        population.append(sampled_parsec)
        objective_values.append(obj_values)
    
    num_evaluations = num_nests
    generation = 1
    # Total execution time
    end_total_p = time.time()
    elapsed_total_p = end_total_p - start_total_p
    
    print(objective_values)
    print(f'Total time taken to initialise population: {elapsed_total_p:.2f} seconds')
#.....................................................................................................................................................
    pareto_optimal = []
    pareto_front = []
    optimisation_archive = []
    
    while num_evaluations < max_evaluations:
        # Non-dominated sorting and crowding distance calculation
        fronts = efficient_non_dominated_sort(objective_values)
        
        pareto_optimal.append((population[i], objective_values[i]) for i in fronts[0])
        pareto_front.extend(np.array(objective_values[i]) for i in fronts[0])
        
        sorted_nests = []
        sorted_objectives = []
        
        for front in fronts:
            front_nests = [population[i] for i in front]
            front_objectives = [objective_values[i] for i in front]
            distances = calculate_crowding_distance(front_objectives)
            
            # Sort the front by crowding distance in descending order
            sorted_indices = np.argsort(distances)[::-1]
            sorted_nests.extend(np.array(front_nests)[sorted_indices])
            sorted_objectives.extend(np.array(front_objectives)[sorted_indices])
        
        optimisation_archive.append((sorted_nests, sorted_objectives))
        
        # Abandon bottom 20% of nests
        n_abandon = int(0.2 * num_nests)
        for i in range(n_abandon):
            xi = sorted_nests[-(i + 1)]  # Select the nest to be abandoned
            alpha = A / np.cbrt(generation)  # Lévy flight step size
            step = levy_flight(num_dimensions)
            xk = xi + alpha * step  # New egg via Lévy flight
            xk = np.clip(xk, [low for (low, high) in bounds], [high for (low, high) in bounds])
            generate_airfoil_coordinates(xk, i + 1 + num_nests)  # Save as new airfoil
            obj_values = fitness_function(i + 1 + num_nests, mach, aoa)
            
            sorted_nests[-(i + 1)] = xk
            sorted_objectives[-(i + 1)] = obj_values
            num_evaluations += 1
        
        # Refinement of top nests
        n_top = num_nests - n_abandon
        for i in range(n_top):
            xi = sorted_nests[i]
            xj_index = np.random.choice(n_top)
            xj = sorted_nests[xj_index]
            
            if np.array_equal(xi, xj):
                alpha = A / (generation**2)  # Lévy flight step size for same nest
                step = levy_flight(num_dimensions)
                xk = xi + alpha * step
                xk = np.clip(xk, [low for (low, high) in bounds], [high for (low, high) in bounds])
            else:
                dx = np.abs(np.array(xi) - np.array(xj)) / phi
                objective_xi = sorted_objectives[i]
                objective_xj = sorted_objectives[xj_index]
                
                if dominates_pair(objective_xi, objective_xj):
                    xk = np.array(xj) + dx * np.sign(np.array(xi) - np.array(xj))
                elif dominates_pair(objective_xj, objective_xi):
                    xk = np.array(xi) + dx * np.sign(np.array(xj) - np.array(xi))
                else:
                    alpha = A / np.sqrt(generation) 
                    step = levy_flight(num_dimensions)
                    xk = xi + alpha * step
                
                xk = np.clip(xk, [low for (low, high) in bounds], [high for (low, high) in bounds])
            
            # Generate airfoil for new solution xk and evaluate its fitness
            generate_airfoil_coordinates(xk, num_nests + num_evaluations + 1)  # Save as new airfoil
            obj_values = fitness_function(num_nests + num_evaluations + 1, mach, aoa)
            
            l = np.random.choice(num_nests)
            objective_l = sorted_objectives[l]
            
            if dominates_pair(obj_values, objective_l):
                sorted_nests[l] = xk
                sorted_objectives[l] = obj_values
            
            num_evaluations += 1
        
        # Update the population and objective values
        population = sorted_nests
        objective_values = sorted_objectives
        generation += 1
        
    return sorted_nests, sorted_objectives, generation, pareto_optimal, pareto_front, optimisation_archive


In [None]:
start_total = time.time()

# Baseline PARSEC parameters for NACA0012
baseline_parsec = [0.015, 0.3, 0.06, -0.45, 0.3, -0.06, 0.45, 0, 0, 0.02, 19.5]
# ±20% bounds for each parameter
bounds = np.array([(value * 0.8, value * 1.2) for value in baseline_parsec])

num_nests = 30
max_evaluations = 600
A = 1
phi = (1 + np.sqrt(5)) / 2

#Flow Parameters
mach = 0.5
aoa = 4

best_nests, best_fitness, generation, pareto_optimal, pareto_front, optimisation_archive = run_momcs(baseline_parsec, num_nests, max_evaluations, A, phi, bounds, mach, aoa)

# Total execution time
end_total = time.time()
elapsed_total = end_total - start_total


In [None]:
print(generation)
print(f'Total time taken for optimisation: {elapsed_total:.2f} seconds')


In [None]:
def efficient_non_dominated_sort_first_front(objective_values):
    num_individuals = len(objective_values)
    dominated_count = np.zeros(num_individuals, dtype=int)
    dominates = [set() for _ in range(num_individuals)]

    # Loop to find all dominances
    for i in range(num_individuals):
        for j in range(i + 1, num_individuals):
            if dominates_pair(objective_values[i], objective_values[j]):
                dominates[i].add(j)
                dominated_count[j] += 1
            elif dominates_pair(objective_values[j], objective_values[i]):
                dominates[j].add(i)
                dominated_count[i] += 1

    # Identify the first front
    first_front = np.where(dominated_count == 0)[0].tolist()
    
    # Return only the first front
    return [first_front]


In [None]:
best_fitness_front = []
best_fitness_front_population = []
pareto_front_front = []
# Non-dominated sorting and crowding distance calculation
fronts = efficient_non_dominated_sort_first_front(best_fitness)
best_fitness_front.extend(np.array(best_fitness[i]) for i in fronts[0])
best_fitness_front_population.extend(np.array(best_nests[i]) for i in fronts[0])

#fronts = efficient_non_dominated_sort_first_front(pareto_front)
#pareto_front_front.extend(np.array(pareto_front[i]) for i in fronts[0])


In [None]:
# Plot the results
plot_pareto_front(pareto_front)


In [None]:
# Plot the results
plot_pareto_front(best_fitness)


In [None]:
# Plot the results
plot_pareto_front(best_fitness_front)


In [None]:
# Extract all objective values
all_objective_values = []
for _, objectives in optimisation_archive:
    all_objective_values.extend(objectives)

# Convert to numpy array for convenience
all_objective_values = np.array(all_objective_values)

# Separate objectives into x and y
objective1 = all_objective_values[:, 0]
objective2 = all_objective_values[:, 1]

# Plot
plt.figure(figsize=(10, 6))
plt.scatter(objective1, objective2, c='b', marker='o', alpha=0.4)
plt.xlabel('Objective 1')
plt.ylabel('Objective 2')
plt.title('Optimisation History')
plt.grid(True)
plt.show()


In [None]:
sv = np.array(best_fitness_front)
sv.shape
np.savetxt('pareto_front_dof.csv', sv, delimiter=',', header='CL,Area', comments='', fmt='%.5f,%.5f')

In [None]:
"""Validation"""

In [None]:
# Function to evaluate the fitness of a nest (airfoil)
def fitness_function_val(airfoil_number, mach, aoa):
    # Paths for storing generated data
    sdf_path = './SDF'
    geo_path = './Airfoils'
    dnn_output_path = './Inference'
    
    C_L, C_D =  FLITE2DPY(airfoil_number, mach, aoa, geo_path)
    #function to calculate  area of the airfoil
    Area = calculate_area(airfoil_number, geo_path)
    if np.isnan(C_L) or np.isnan(C_D):
        return [0, Area]  # If any component or ratio is NaN, return a fitness of 0
    return [C_L, Area]

In [None]:
flite_dir = os.path.abspath(os.path.join('..', 'FLITEonPy'))
sys.path.append(flite_dir)
from FLITE2DPY_OPT import FLITE2DPY

print(len(best_fitness_front_population))

obj_val = []
counter = 0
for i in best_fitness_front_population:
    generate_airfoil_coordinates(i, 100000 + counter)  # Save as new airfoil
    obj = fitness_function_val(100000 + counter, mach, aoa)
    obj_val.append(obj)
    counter += 1

In [None]:
svv = np.array(obj_val)
svv.shape
np.savetxt('pareto_front_dof_validation.csv', svv, delimiter=',', header='CL,Area', comments='', fmt='%.5f,%.5f')

In [None]:
# Plot the results
plot_pareto_front(obj_val)

In [None]:
import pickle

# Step 1: Check and convert generators to lists
def convert_to_list(obj):
    """Convert a generator to a list, or return the object if it's not a generator."""
    if isinstance(obj, (list, tuple, set)):
        return type(obj)(convert_to_list(item) for item in obj)
    elif isinstance(obj, dict):
        return {key: convert_to_list(val) for key, val in obj.items()}
    elif hasattr(obj, '__iter__') and not isinstance(obj, (str, bytes)):
        return list(obj)
    else:
        return obj

# Step 2: Apply the conversion and assign to new variables
best_nests_n = convert_to_list(best_nests)
best_fitness_n = convert_to_list(best_fitness)
pareto_optimal_n = convert_to_list(pareto_optimal)
pareto_front_n = convert_to_list(pareto_front)
optimisation_archive_n = convert_to_list(optimisation_archive)
obj_val_n = convert_to_list(obj_val)

# Step 3: Serialize the new variables to a file using pickle
with open('mdof.pkl', 'wb') as file:
    pickle.dump({
        'best_nests': best_nests_n,
        'best_fitness': best_fitness_n,
        'pareto_optimal': pareto_optimal_n,
        'pareto_front': pareto_front_n,
        'optimisation_archive': optimisation_archive_n,
        'obj_val': obj_val_n
    }, file)
