In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import os
import scipy.io
from typing import Tuple, List
from enum import Enum, auto
from dataclasses import dataclass

import sys
sys.path.insert(0, '../')
sys.path.insert(0, '../vae')
sys.path.insert(0, '../fluid_TO')
sys.path.insert(0, '../dataset')
import dataset.supershape as supershape
import yaml
import material_constants
import fluid_mesher
import fluid_bcs
import fluid_material
import fluid_fe
import projection
import neural_network
import loss
import utils
import plot
import opt_constraints
import torch.optim as optim

import vae.network as vae_network
import vae.data_preprocess as vae_data_prep
import vae.train_vae as train_vae

import time
start_time = time.time()


# Read Config File

In [None]:
with open("config_diffuser.yaml", "r") as file:
    config_data = yaml.safe_load(file)

# Fluid Mesh

In [None]:
bounding_box = fluid_mesher.BoundingBox(x_min= config_data["BOUNDING_BOX"]["x_min"],
                                    x_max= config_data["BOUNDING_BOX"]["x_max"],
                                    y_min= config_data["BOUNDING_BOX"]["y_min"],
                                    y_max= config_data["BOUNDING_BOX"]["y_max"])

fluid_mesh = fluid_mesher.fluid_mesher(nelx = config_data["MESH"]["nelx"],
                                        nely = config_data["MESH"]["nely"], 
                                        bounding_box=bounding_box)

# Boundary Conditions

In [None]:
char_velocity = config_data["BOUNDARY_CONDITIONS"]["char_velocity"]
fluid_bc = fluid_bcs.get_dirichlet_bc_and_fixed_dofs(
    fluid_mesh, char_velocity, fluid_bcs.FluidSampleProblems[
      config_data["BOUNDARY_CONDITIONS"]["example"]])

# Fluid Material

In [None]:
fluid_mat_constants = material_constants.MaterialConstants(
                                        kinematic_viscosity =  config_data[
                                          "MATERIAL_CONSTANTS"][
                                          "kinematic_viscosity"])
                                            
fluid_material = fluid_material.FluidMaterial(fluid_mesh.elem_dx,
            fluid_mesh.elem_dy,
            fluid_mat_constants)

# Fluid Solver

In [None]:
fluid_solver = fluid_fe.FluidSolver(fluid_mesh, fluid_bc,
                                    fixture_const = 
                                    config_data["BOUNDARY_CONDITIONS"]
                                    ["fixture_const"])

# Initialize Neural Network

In [None]:
fourier_map = projection.FourierMap(fluid_mesh,
  fourier_map_activation = ( projection.FourierActivation
                            [config_data["FOURIER_MAP_PARAMS"]
                              ["fourier_map_activation"]]),
  num_fourier_terms = config_data["FOURIER_MAP_PARAMS"][
                                      "num_fourier_terms"],
  max_radius = config_data["FOURIER_MAP_PARAMS"]["max_radius"],
  min_radius = config_data["FOURIER_MAP_PARAMS"]["min_radius"])

symmetry_activation_x_axis = projection.SymmetryActivation.SYM_X_AXIS_ON
symmetry_activation_y_axis = projection.SymmetryActivation.SYM_Y_AXIS_OFF

constraint_type = opt_constraints.ConstraintType[config_data["OPTIMIZATION"]
                                                   ["constraint_type"]]
nn_settings = neural_network.NeuralNetworkParameters(input_dim = 2*config_data["FOURIER_MAP_PARAMS"]
                                                      ["num_fourier_terms"],
                                                      output_dim = config_data["NEURAL_NETWORK_PARAMS"]
                                                      ["output_dim"],
                                                      num_layers = config_data["NEURAL_NETWORK_PARAMS"]
                                                      ["num_layers"],
                                                      num_neurons_per_layer=config_data["NEURAL_NETWORK_PARAMS"]
                                                      ["num_neurons_per_layer"])

neural_net = neural_network.TopOptNet(nn_params=nn_settings)



# Load VAE

In [None]:
folder_path = "../vae"
file_name = "vae_net.pt"
file_path = os.path.join(folder_path, file_name)
with open("vae_config.yaml", "r") as file:
  vae_config_data = yaml.safe_load(file)
vae_yaml = vae_config_data['NETWORK']
vae_yaml = vae_config_data['NETWORK']
vae_params = vae_network.VAE_Params(input_dim=12,
                                encoder_hidden_dim=vae_yaml['encoder_hidden_dim'],
                                latent_dim=vae_yaml['latent_dim'],
                                decoder_hidden_dim=vae_yaml['decoder_hidden_dim'])
vae_net = vae_network.VariationalAutoencoder(vae_params=vae_params)
vae_net.encoder.is_training = False
vae_net.load_state_dict(torch.load(file_path))
vae_net.eval()
nomalization = torch.load('../vae/nomalization.pt')
max_feature = nomalization['max_feature']
min_feature = nomalization['min_feature']
print("Loading VAE")
normalization_types = [vae_data_prep.NomalizationType.LINEAR] * 8 + [vae_data_prep.NomalizationType.LOG] * 2 + [vae_data_prep.NomalizationType.LINEAR] * 2


# Project Coordinates

In [None]:
def calculate_projected_coordinates( fluid_sym_map, res: int)->torch.Tensor:
  """
  Calculates the projected coordinates of fluid elements based on the
  specified resolution, reflection and fourier map.
  
  Args:
    res: Resolution parameter indicating the desired resolution in the sampled points.
  
  Returns:
    fluid_xy_f: Tensor of size (num_elems, 2) representing the projected coordinates
                of fluid elements.
  """
  if (res == 1):
    fluid_xy = torch.tensor(fluid_solver.mesh.elem_centers, 
                        requires_grad=True).double() 
    
  else:
    elem_centers = utils.generate_points_in_domain(fluid_mesher.nelx,
                                    fluid_mesher.nely,
                                    fluid_mesher.elem_dx,
                                    fluid_mesher.elem_dy, 
                                    fluid_mesher.num_dim,
                                    res)
    fluid_xy = torch.tensor(elem_centers, 
                          requires_grad=True).double() 
  
  fluid_xy_r, fluid_signs_reflection = projection.apply_reflection(fluid_xy,
                                                                  symmetry_activation_y_axis,
                                                                  symmetry_activation_x_axis,
                                                                  fluid_sym_map)
  
  
  if(fourier_map.fourier_map_activation == projection.FourierActivation.FOURIER_ON):
    fluid_xy_f = fourier_map.apply_fourier_map(fluid_xy_r)
  
  return fluid_xy_f, fluid_signs_reflection 

# Optimize Design

In [None]:
class Optimizer(Enum):
  ADAM = auto()
  LBFGS = auto()
  
class GradClipActivation(Enum):
  GRAD_CLIP_ON = auto()
  GRAD_CLIP_OFF = auto()

@dataclass
class OptimizationSettings:
  plot_interval: int
  lr: float
  num_epochs: int
  method : int
  grad_clip_activation: int
  grad_clip_norm: float

@dataclass 
class OptimizationInitials:
  J0 : float
  constraints : np.ndarray

def optimize_design(opt_params: OptimizationSettings, loss_type:loss.LossTypes,
                    loss_params:loss.AugmentedLagrangianParameters,
                    opt_initials:OptimizationInitials,
                    constraint_type: opt_constraints.ConstraintType, sym_params: projection.SymParams,
                     desired_vol_frac:float, desired_perimeter: float, perim_scale = 2.):
  """
  Optimize the design using the specified optimization parameters.

  Args:
      opt_params: Object representing the optimization settings.
      loss_type: Enum representing the type of loss function to be used.
      loss_params: Parameters for the augmented Lagrangian loss function.
      opt_initials: Initial values for optimization parameters.
      penalty_constants: Constants for the penalization.

  Returns:
      convergence_history: Dictionary containing lists of convergence values for each epoch.

  """
  epoch = 0
  convergence_history = {'epoch':[], 'perimeter':[], 'fluid_loss':[]}
  
  fluid_xy_f, fluid_signs_reflection = calculate_projected_coordinates(sym_params, res = 1.)
  
  if(opt_params.method == Optimizer.ADAM):
      print('Adam working')
      optimizer = optim.Adam(neural_net.parameters(), amsgrad=True, 
                              lr=opt_params.lr)  
      
  else:
      print('LBFGS working')
      optimizer = optim.LBFGS(neural_net.parameters(),
                                    line_search_fn = 'strong_wolfe')
        
  def loss_wrapper( eps = 1e-6)-> Tuple[torch.Tensor, torch.Tensor, List, torch.Tensor,
                              supershape.SuperShapes]:
    """
    Calculates the loss and related parameters for the optimization process.
    While data generation the fluid has density 1. and solid has 0, 
    but while getting the calculating the area it gives solid area.
    Therefore the decoder gives the supershape's solid area and we substract it from 1.
    to get the fluid volume fraction.

    Returns:
        net_loss: Tensor of size (1,) representing the combined loss value.
        fluid_loss: Tensor of size (1,) representing the fluid loss value.
        constraints: List of size (num_constraints,) constraint values. Currently it only contains fluid
                    volume constraint.
        fluid_vol_frac: Tensor of size (num_elems) and representing the fluid volume fraction.
        mstr_params: Object containing arrays of number num_geometric_params
                      where each array is of size (num_elems)
                      representing the renormalized geometric parameters
                      of the microstructures.
  """
    optimizer.zero_grad()
    latent_space, theta = neural_net(fluid_xy_f)
    theta = torch.einsum('i,i->i', theta, fluid_signs_reflection['X'])
    theta = torch.einsum('i,i->i', theta, fluid_signs_reflection['Y'])
    decoded_latent_pt = vae_net.decoder(latent_space)
    vae_output_renormalized = vae_data_prep.stack_vae_output(decoded_latent_pt, max_feature, min_feature, normalization_types)

    if(constraint_type == opt_constraints.ConstraintType.VOLUME):
      constraint_field = vae_output_renormalized[:, vae_data_prep.VAE_Fields.shape_area.value]
    elif(constraint_type == opt_constraints.ConstraintType.PERIMETER):
      constraint_field = fluid_solver.mesh.elem_dx*perim_scale*vae_output_renormalized[:, vae_data_prep.VAE_Fields.shape_perim.value]

    C_00 = vae_output_renormalized[:, vae_data_prep.VAE_Fields.homog_c00.value] + eps
    C_11 = vae_output_renormalized[:, vae_data_prep.VAE_Fields.homog_c11.value] + eps
    mstr_data = vae_output_renormalized[:, :vae_data_prep.VAE_Fields.homog_c00.value].clone().detach().numpy()
    mstr_params  = supershape.SuperShapes(mstr_data[:, 0], mstr_data[:, 1], mstr_data[:, 2], mstr_data[:, 3],
                   mstr_data[:, 4], mstr_data[:, 5], mstr_data[:, 6], mstr_data[:, 7])
    
    fluid_loss, velocity_pressure_field = fluid_solver.fluid_objective_function( fluid_material, C_00, C_11, theta)
    
    field_cons_value = opt_constraints.constraint_function(constraint_type, constraint_field, desired_vol_frac, desired_perimeter) 
    if(epoch == 0 or epoch == 20):
      opt_initials.J0 = (fluid_loss.item()) 
      
  
    opt_params.constraints = [field_cons_value]
    net_loss = loss.combined_loss(
      objective = fluid_loss/opt_initials.J0,              
      constraints = opt_params.constraints,
      loss_type = loss_type,
      loss_params = loss_params,
      epoch = epoch)
    return (net_loss, fluid_loss, opt_params.constraints, constraint_field,
            mstr_params, C_00, C_11, theta)
  
  
  def closure() -> torch.Tensor:
    """
      Closure function for the optimizer.
  
      This function performs a forward pass to calculate the total loss,
      backward pass to compute gradients, and applies gradient clipping.
      It returns the total loss.
  
      Returns:
          net_loss: Tensor of size (1,)representing the total loss.
    """
    
    optimizer.zero_grad()
    (net_loss, fluid_loss, constraint, constraint_field, mstr_params,
    C_00, C_11, theta)= loss_wrapper()
    net_loss.backward(retain_graph = True)
    
    torch.nn.utils.clip_grad_norm_(neural_net.parameters(),
                                    opt_params.grad_clip_norm)
    return net_loss 
  
  
  for epoch in range(opt_params.num_epochs):
    net_loss = closure()
    if(opt_params.method == 'adam'):
        
      optimizer.step()
        
    else:

      optimizer.step(closure)
      
    loss.update_loss_parameters(epoch, loss_type, loss_params, opt_params.constraints)  
    (net_loss, fluid_loss, constraint, constraint_field,
      mstr_params, C_00, C_11, theta) = loss_wrapper()
    net_loss.backward(retain_graph = True)
    convergence_history['epoch'].append(epoch)
    convergence_history['perimeter'].append(torch.sum(constraint_field).item())
    convergence_history['fluid_loss'].append(fluid_loss.item())
    
    status = "Iter {:3d} J: {:.2E}; perimeter : {:.2F} "\
      .format(epoch, fluid_loss.item(), torch.sum(constraint_field).item())
    print(status)
    
    if(epoch % opt_params.plot_interval == 0):

      theta = theta.clone().detach().numpy()
      plot.plot_microstructures_in_macro_mesh(mstr_params,
                                              theta,
                                              fluid_solver.mesh.nelx,
                                              fluid_solver.mesh.nely, 
                                              epoch)
  
  
  return convergence_history

# Optimization Settings

In [None]:
sym_params = projection.SymParams(sym_x_axis_mid_pt = 0.5*config_data["BOUNDING_BOX"]["y_max"],
                                  sym_y_axis_mid_pt = 0.5*config_data["BOUNDING_BOX"]["x_max"])

num_constraints = config_data["OPTIMIZATION"]["num_constraints"]
desired_vol_frac = config_data["OPTIMIZATION"]["desired_vol_frac"]
desired_perimeter = config_data["OPTIMIZATION"]["desired_perimeter"]

loss_type = loss.LossTypes[config_data["LOSS"]["method"]]


loss_params = loss.PenaltyLossParameters(
              alpha0 = config_data["LOSS"]["alpha0"],
              del_alpha = config_data["LOSS"]["del_alpha"])

opt_initials = OptimizationInitials( 
                J0 = config_data["OPTIMIZATION"]["init_objective"], 
  constraints = np.zeros((num_constraints)))

opt_params = OptimizationSettings(
              plot_interval =  config_data["OPTIMIZATION"]["plot_interval"],
              lr = config_data["OPTIMIZATION"]["learning_rate"],
              num_epochs= config_data["OPTIMIZATION"]["num_epochs"], 
              method = Optimizer[config_data["OPTIMIZATION"]["method"]],
              grad_clip_activation = GradClipActivation[
                                      config_data["OPTIMIZATION"]
                                      ["grad_clip_activation"]],
              grad_clip_norm = config_data["OPTIMIZATION"]
                                ["grad_clip_norm"]
  )


In [None]:
plt.close('all')
gpu_status = utils.Gpu.OVERRIDE
convg_history = optimize_design( opt_params, loss_type, loss_params, opt_initials, constraint_type, sym_params, desired_vol_frac, desired_perimeter)