In [3]:
import copy
import datetime
import math
import os
import random
import time
from collections import namedtuple

import numpy as np
import matplotlib.pyplot as plt
import autograd, autograd.core, autograd.extend, autograd.tracer
import autograd.numpy as anp
import scipy, scipy.ndimage, scipy.sparse, scipy.sparse.linalg

import gymnasium as gym
from gymnasium import spaces

import torch
from torch import nn as nn
import torch.nn.functional as F

from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.results_plotter import load_results, ts2xy
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common import results_plotter
from stable_baselines3.common.policies import ActorCriticPolicy

# added Imports For Multiprocessing
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.vec_env import SubprocVecEnv
from stable_baselines3.common.vec_env import VecMonitor

# added Imports for Tensorboard Analysis
%load_ext tensorboard
import tensorflow as tf

from FEA_SOLVER_GENERAL import *
from Nodes import *
from opts import *
from TopOpt_Env_Functions import *

The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [6]:
class TopOpt_Gen(gym.Env):
    """Gymnasium environment in which each action removes one element from an EX-by-EY grid.

    The occupancy of the grid is kept in ``self.VoidCheck`` (flat array of length
    ``EX*EY``; 1 = element present, 0 = element removed).  Observations are
    ``(EX, EY, 3)`` float32 arrays whose channels are:

    * 0 -- ``Stress_state`` taken from ``FEASolve(...)[3]``
    * 1 -- ``BC_state``: 1 at the elements listed in ``BC_Elements``
    * 2 -- ``LC_state``: 1 at the elements listed in ``LC_Elements``

    NOTE(review): ``reset`` does not restore ``VoidCheck`` or ``self.reward``; it
    re-solves whatever occupancy is currently stored.  Callers that need a fresh
    domain have to call ``reset_conditions`` (or set ``VoidCheck`` and call
    ``primer_cond``) before ``reset`` -- confirm this is intended when the
    environment is driven by an algorithm that only ever calls ``reset``.
    """

    def __init__(self,Elements_X,Elements_Y,Vol_Frac,SC,opts):
        """Store the problem settings and draw an initial random load case.

        Args:
            Elements_X: number of elements along the first grid axis (``EX``).
            Elements_Y: number of elements along the second grid axis (``EY``).
            Vol_Frac: volume fraction; an episode can end once
                ``ceil(EX*EY*(1-Vol_Frac))`` elements have been removed.
            SC: stress-constraint value; compared in ``step`` as
                ``P_Norm / current_p_norm < 1 - SC``.
            opts: options object; ``P_Norm``, ``Lx``, ``Ly`` and ``Eta`` are read
                here and the whole object is handed to ``Reward_Surface``.
        """
        self.EX=Elements_X
        self.EY=Elements_Y
        self.Lx=opts.Lx
        self.Ly=opts.Ly
        self.p=opts.P_Norm
        self.eta=opts.Eta
        self.SC=SC
        self.Vol_Frac=Vol_Frac

        # Reward_Surface is evaluated once; element 0 is indexed with two
        # indices in step(), element 1 with a single index.
        surfaces=Reward_Surface(opts)
        self.RS=surfaces[0]
        self.RV=surfaces[1]

        # Gymnasium reads the observation layout from `observation_space`.
        # The Box used to be stored in `self.state`, which left
        # `observation_space` undefined.
        # NOTE(review): bounds 0..1 are kept from the original Box; confirm
        # that the stress channel returned by FEASolve stays inside them.
        self.observation_space=spaces.Box(low=0,high=1,shape=(self.EX,self.EY,3),dtype=np.float32)
        # Actions we can take: remove any one of the EX*EY blocks.
        self.action_space=spaces.Discrete(self.EX*self.EY)
        self.state=np.zeros((self.EX,self.EY,3))

        self.reward=0
        self.Counter=0

        self.reset_conditions()

        self.VoidCheck=np.ones(self.EX*self.EY)

        self.FEA_Skip=2              # FEA is re-run on step 1 and on every FEA_Skip-th step
        self.PR=True
        self.load_checkpoint=True

    # ------------------------------------------------------------------ helpers
    def _run_fea(self,voids):
        """Call FEASolve for the given occupancy with the current loads and supports."""
        return FEASolve(voids,self.Lx,self.Ly,self.EX,self.EY,self.LC_Nodes,self.Load_Directions,self.BC_Nodes,Stress=True)

    def _p_norm(self,stresses):
        """Return ``(sum_i s_i**p)**(1/p)`` over the ``EX*EY`` entries of ``stresses``."""
        row=np.reshape(stresses,(1,self.EX*self.EY))[0]
        return sum(row**self.p)**(1/self.p)

    @staticmethod
    def _element_mask(elements,n_x,n_y):
        """Return an ``(n_x, n_y)`` array that is 1 at every flat index in ``elements``, else 0."""
        mask=np.zeros(n_x*n_y)
        for element in elements:
            mask[int(element)]=1
        return np.reshape(mask,(n_x,n_y))

    def _build_observation(self):
        """Stack the stress, support and load channels into an ``(EX, EY, 3)`` array."""
        observation=np.zeros((self.EX,self.EY,3))
        observation[:,:,0]=self.Stress_state
        observation[:,:,1]=self.BC_state
        observation[:,:,2]=self.LC_state
        return observation

    # --------------------------------------------------------------------- API
    def step(self,action):
        """Remove element ``action`` and return ``(obs, reward, terminated, truncated, info)``.

        A removal is accepted when the element was still present, is not listed
        in ``self.BC`` and ``isolate_largest_group_original(...)[1]`` is True.
        Otherwise the reward is -1, the episode ends, and the element is put
        back if it had been present.

        The returned reward is the raw reward plus 1e-4.  A shaped value derived
        from it and from the previous shaped value is stored in ``self.reward``
        for use by the next call.
        """
        self.Counter+=1
        Last_Reward=self.reward
        idx=int(action)
        was_solid=self.VoidCheck[idx]
        self.VoidCheck[idx]=0
        ElementMat=np.reshape(self.VoidCheck,(self.EX,self.EY))
        SingleCheck=isolate_largest_group_original(ElementMat)
        It=list(self.VoidCheck).count(0)       # number of removed elements so far
        n_elements=self.EX*self.EY
        # Same schedule as before: first step, then every FEA_Skip-th step.
        run_fea=self.Counter==1 or self.Counter%self.FEA_Skip==0

        if was_solid==1 and action not in self.BC and SingleCheck[1]==True:
            done=False
            void_target=math.ceil(n_elements*(1-self.Vol_Frac))
            if It>=void_target and (self.load_checkpoint or self.PR):
                done=True
            if run_fea:
                Run_Results=self._run_fea(list(self.VoidCheck))
                self.Max_SE_Ep=np.max(Run_Results[1])
                if (self.P_Norm/self._p_norm(Run_Results[2]))<(1-float(self.SC)):
                    done=True
                    print('STRESS CONSTRAINT HIT!')
            else:
                # No FEA this step: only zero the stress entry of the removed element.
                self.Stress_state=np.reshape(self.Stress_state,(1,n_elements))
                self.Stress_state[0][action]=0
                self.Stress_state=np.reshape(self.Stress_state,(self.EX,self.EY))

            # The zero checks come first so the ratio is never evaluated with a
            # zero denominator.
            if (self.Max_SE_Tot==0 or self.Max_SE_Ep==0
                    or It/n_elements>=1
                    or abs(self.Max_SE_Tot/self.Max_SE_Ep)>=1):
                reward=-1
                done=True
            else:
                # Clamp at 0: a ratio below 1/1000 would otherwise give index -1,
                # which wraps around to the opposite end of the reward surface.
                index1=max(int((self.Max_SE_Tot/self.Max_SE_Ep)*1000)-1,0)
                index2=max(int((It/n_elements)*1000)-1,0)
                reward=self.RS[index1][index2]
                if not self.load_checkpoint:
                    # NOTE(review): this evaluates int(1 - stress*1000) - 1; if
                    # int((1 - stress)*1000) - 1 was meant the parentheses are
                    # misplaced.  Left unchanged; only reachable when
                    # load_checkpoint is False.
                    reward2=self.RV[int(1-(np.reshape(self.Stress_state,(self.EX*self.EY,1))[action])*1000)-1]
                    reward=np.mean([reward,reward2])
            if run_fea:
                # The stress channel is refreshed only after the reward has been
                # computed from the previous Stress_state.
                self.Stress_state=np.reshape(Run_Results[3],(self.EX,self.EY))
            self.state=self._build_observation()
        else:
            # If the removed block has already been removed, leads to a
            # non-singular body or is one of the boundary-condition blocks, the
            # agent is severely punished (-1) and the episode ends.
            Run_Results=self._run_fea(list(self.VoidCheck))
            self.Max_SE_Ep=np.max(Run_Results[1])
            self.Stress_state=np.reshape(Run_Results[3],(self.EX,self.EY))
            self.state=self._build_observation()
            reward=-1
            done=True
            if was_solid==1:
                self.VoidCheck[idx]=1          # undo the rejected removal

        reward+=1e-4
        Last_Reward+=1e-4
        rho=(reward-Last_Reward)/min([reward,Last_Reward])
        llambda=1 if reward>Last_Reward else -1
        x=rho+llambda
        f_x=math.atan(x*(math.pi/2)*(1/self.eta))
        self.reward=reward+(f_x-llambda)*abs(reward)

        return self.state.astype(np.float32), float(reward), done, False, self._get_info()

    def render(self,mode='human'):
        """Print the grid flipped along axis 0: 0/1 occupancy, 2 = support element, 4 = load element."""
        RenderMat=copy.deepcopy(self.VoidCheck)
        for element in self.BC_Elements:
            RenderMat[int(element)]=2
        for element in self.LC_Elements:
            RenderMat[int(element)]=4
        RenderMat=np.reshape(RenderMat,(self.EX,self.EY))
        print(np.flip(RenderMat,0))
        print('')
        return

    def reset(self,seed=None,options=None):
        """Re-solve the current ``VoidCheck`` and return ``(observation, info)``.

        Also stores the reference p-norm in ``self.P_Norm`` and sets the step
        counter to 0.  ``seed`` is forwarded to ``gym.Env.reset``; ``options``
        is accepted for Gymnasium API compatibility and is not used.
        """
        super().reset(seed=seed)

        self.Results=self._run_fea(self.VoidCheck)
        self.P_Norm=self._p_norm(self.Results[2])
        self.Stress_state=np.reshape(self.Results[3],(self.EX,self.EY))
        self.state=self._build_observation()
        self.Counter=0

        return self.state.astype(np.float32), self._get_info()

    def reset_conditions(self):
        """Refill the grid and draw random supports and a load until ``0 < Max_SE_Tot <= 5000``.

        Two distinct support elements and one load element (not a support) are
        drawn with ``random.choice`` from ``Element_Lists(EX, EY)[1]``; the load
        type (0 or 1) and the load direction (-1 or 1) are drawn the same way.
        """
        self.Max_SE_Tot=0
        self.VoidCheck=np.ones(self.EX*self.EY)
        candidates=list(Element_Lists(self.EX,self.EY)[1])
        node_offset=(self.EX+1)*(self.EY+1)

        while self.Max_SE_Tot<=0 or self.Max_SE_Tot>5000:
            self.BC_Nodes=[]
            self.LC_Nodes=[]
            self.Load_Types=[]
            self.Load_Directions=[]

            # Two different support elements.
            first_support=int(random.choice(candidates))
            second_support=int(random.choice(candidates))
            while second_support==first_support:
                second_support=int(random.choice(candidates))
            self.BC_Elements=np.array([first_support,second_support],dtype=float)
            for support in self.BC_Elements:
                support_nodes=BC_Nodes(int(support),self.Lx,self.Ly,self.EX,self.EY)
                self.BC_Nodes=np.append(self.BC_Nodes,support_nodes[0])
                self.BC_Nodes=np.append(self.BC_Nodes,support_nodes[1])

            # One load element that is not a support element.
            load_element=int(random.choice(candidates))
            while load_element in self.BC_Elements:
                load_element=int(random.choice(candidates))
            self.LC_Elements=np.array([load_element],dtype=float)

            self.BC_set=np.append(self.BC_Elements,self.LC_Elements)
            self.LC_state=self._element_mask(self.LC_Elements,self.EX,self.EY)
            self.Load_Types=np.append(self.Load_Types,random.choice([0,1]))
            # Index of the last load element; this is the value the former
            # loop variable carried into the LC_Nodes call.
            load_index=len(self.LC_Elements)-1
            # Kept as two separate calls: LC_Nodes is defined outside this file
            # and receives mutable arguments, so merging them is not known to be safe.
            self.LC_Nodes=np.append(self.LC_Nodes,LC_Nodes(int(self.LC_Elements[0]),self.Load_Types,self.Load_Directions,self.Lx,self.Ly,self.EX,self.EY,load_index,Node_Location=False)[0])
            self.LC_Nodes=np.append(self.LC_Nodes,LC_Nodes(int(self.LC_Elements[0]),self.Load_Types,self.Load_Directions,self.Lx,self.Ly,self.EX,self.EY,load_index,Node_Location=False)[1])
            if self.Load_Types[0]==0: #Load will be applied vertically
                self.LC_Nodes[0]+=node_offset
                self.LC_Nodes[1]+=node_offset
            self.Load_Directions=np.append(self.Load_Directions,random.choice([-1,1])) #1 for Compressive Load, -1 for tensile load
            self.BC=np.append(self.BC_Elements,self.LC_Elements)
            self.BC_state=self._element_mask(self.BC_Elements,self.EX,self.EY)
            self.Results=self._run_fea(self.VoidCheck)
            # np.max matches step() and primer_cond(), and keeps the loop
            # condition scalar even if FEASolve returns an array here.
            self.Max_SE_Tot=np.max(self.Results[1])

    def primer_cond(self,EX,EY):
        """Rebuild ``BC``, the support/load masks and ``Max_SE_Tot`` from the current elements.

        ``EX`` and ``EY`` give the shape of the two masks; the FEA call itself
        uses ``self.EX`` and ``self.EY``.
        """
        self.BC=np.append(np.append([],self.BC_Elements),self.LC_Elements)
        self.BC_state=self._element_mask(self.BC_Elements,EX,EY)
        self.LC_state=self._element_mask(self.LC_Elements,EX,EY)
        self.Results=self._run_fea(self.VoidCheck)
        self.Max_SE_Tot=np.max(self.Results[1])

    def _get_info(self):
        """Return the info dict: current step counter and ``Max_SE_Tot``."""
        return {"num_steps :" : self.Counter, "Max Stress" : self.Max_SE_Tot}

NameError: name 'gym' is not defined

In [2]:
def get_opts():
    """Return the default options as an ``Option`` namedtuple.

    Field names and their default values are declared together in one ordered
    mapping; the namedtuple's field order is the mapping's insertion order.
    """
    defaults = {
        # grid sizes for the main run and the two coarser refinement levels
        'Main_EX': 24, 'Main_EY': 24,
        'PR2_EX': 12, 'PR2_EY': 12,
        'PR_EX': 6, 'PR_EY': 6,
        'Lx': 1, 'Ly': 1,
        'Eta': 2, 'a': 5, 'b': 5,
        # training hyper-parameters
        'replace': 100,
        'epsilon_dec': 3.5e-4, 'eps_end': 0.01,
        'mem_size': 30000, 'n_games': 50000, 'batch_size': 128,
        'lr': 5e-3, 'gamma': 0.1,
        # volume fractions and stress settings
        'Vol_Frac_1': 0.7, 'Vol_Frac_2': 0.5, 'Vol_Frac_3': 0.25,
        'SC': 10, 'P_Norm': 10,
        # checkpoint file names
        'filename_save': 'DDQN_TopOpt_Generalized_CNN_4L_',
        'filename_load': 'DDQN_TopOpt_Generalized_CNN_4L_6by6',
        # run-mode switches
        'Progressive_Refinement': True, 'LC': False,
        'Load_Checkpoints': True,
        'VF_S': 0, 'Min_Dist': 0,
        'Time_Trial': True,
        'configfile': 'config.json',
        'From_App': True,
        'base_folder': ".",
    }

    Option = namedtuple('Option', list(defaults))
    return Option(**defaults)

In [4]:
# Pick the torch backend in order of preference: CUDA, then MPS, otherwise CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

Using cuda device


In [3]:
class DuelingDeepQNetwork(nn.Module):
    """Four 3x3 convolutions (3 -> 16 -> 8 -> 4 -> 1 channels), each followed by ReLU, then a flatten.

    ``n_actions`` and ``Increase`` are accepted by the constructor but are not
    used when building the layers.  There are no separate value/advantage
    heads: the flattened output of the last convolution is returned as Q.
    """

    def __init__(self, n_actions, Increase):
        super().__init__()
        # kernel_size=3 with padding=1 leaves the two spatial dimensions unchanged.
        conv_kwargs = dict(kernel_size=3, padding=1)
        self.conv1 = nn.Conv2d(3, 16, **conv_kwargs)
        self.conv2 = nn.Conv2d(16, 8, **conv_kwargs)
        self.conv3 = nn.Conv2d(8, 4, **conv_kwargs)
        self.conv4 = nn.Conv2d(4, 1, **conv_kwargs)
        self.flatten = nn.Flatten()

    def forward(self, state):
        """Run ``state`` through conv1..conv4 with ReLU after each, then flatten and return."""
        hidden = state
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            hidden = F.relu(conv(hidden))
        return self.flatten(hidden)
    


class CustomPolicy(ActorCriticPolicy):
    """Actor-critic policy whose ``mlp_extractor`` is replaced by ``DuelingDeepQNetwork``."""

    def __init__(self, *args, **kwargs):
        # net_arch is given as a plain dict (the list-of-dict form is the
        # deprecated spelling) and only as a default: a caller-supplied
        # net_arch used to collide with the hard-coded keyword and raise a
        # duplicate-keyword TypeError.
        kwargs.setdefault("net_arch", dict(pi=[128, 128, 128], vf=[128, 128, 128]))
        super().__init__(*args, **kwargs)

    def _build_mlp_extractor(self) -> None:
        """Install DuelingDeepQNetwork as the policy's ``mlp_extractor``."""
        # NOTE(review): `latent_dim` is not assigned anywhere in this file;
        # confirm the base class provides it, otherwise this line raises
        # AttributeError when the policy is built.
        # NOTE(review): the network in this file returns a single flattened
        # tensor and expects a 4-D (batch, 3, H, W) input -- verify that this
        # matches what the base policy passes to / expects from its
        # mlp_extractor before training with it.
        self.mlp_extractor = DuelingDeepQNetwork(self.features_dim, self.latent_dim)

NameError: name 'nn' is not defined

In [7]:
# Build a 6x6 environment with the default options, validate it against the
# stable-baselines3 environment checker, then create the PPO model.
opts = get_opts()
env = TopOpt_Gen(Elements_X=6, Elements_Y=6, Vol_Frac=0.5, SC=0.5, opts=opts)

check_env(env)

model = PPO(CustomPolicy, env, verbose=1)
print(model)

NameError: name 'namedtuple' is not defined