In [None]:
# default_exp FE

# FE

> This module contains functions to generate parametric LS-Dyna simulations.

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#export
from pyDOE import lhs
import numpy as np
from scipy.stats.distributions import norm
from scipy.stats import uniform
import yaml
from qd.cae.dyna import KeyFile
import os
import sys
import pandas as pd
import subprocess 
import shlex
from diversipy.hycusampling import maximin_reconstruction as maxmin
import csv

class FE():
    """
    This Class contains set of methods which performs reading of the .yaml file and replaces values of the input parameters 
    with newly generated sample data sets. And then, new key files are generated for simulation. 
    
    -----------
       INPUTS  
    -----------
            settigs : Input file for FE simulations to get the user input                

    """

    def __init__(self, settings):
 
        self.settings = settings
        self.folders_count=0
        self._get_user_input()
     
    def _get_user_input(self): 
        """ gets the user input details from the settings.yaml file.
        
        Returns
        -------
        fin_dir         :   Final path of the created directory
        self.Run        :   Number of runs
        self.para_list  :   A .yaml file containing the parameters/ features/ variables for sampling with appropriate
                            values as subkeys in the same file.
        self.key        :   .key file containg the initial simulation details. 
        """

        """ gets the user input details from the settings.yaml file.
        
        Returns
        -------
        fin_dir         :   Final path of the created directory
        self.Run        :   Number of runs
        self.para_list  :   A .yaml file containing the parameters/ features/ variables for sampling with appropriate
                            values as subkeys in the same file.
        self.key        :   .key file containg the initial simulation details. 
        """

        with open(self.settings,'r') as file:
            inp = yaml.load(file, Loader=yaml.FullLoader) 
        inp_vals=[*inp.values()]
        inp_keys=[*inp.keys()]
        
        req=['project_path','simulations','key','FE_parameters'] 
        
        for names in req:
            if names not in inp_keys:
                raise Exception(names +" not in settings.yaml file")
            if inp[names] == None:
                raise Exception(names +" value not in settings.yaml file")
                
        if isinstance(inp['simulations'], int) == True:
            self.Run=inp['simulations']
            self.int='yes'
            self.Flag=1
        elif isinstance(inp['simulations'], str) == True:
            self.DOE=pd.read_csv(inp['simulations'])
            self.int='no'
            self.Run=len(self.DOE)
            self.Flag=1
        else:
            print('Enter either a Integer or a .csv Input')
               
        self.key=inp['key']
        self.fin_dir=inp['project_path']
        self.basename=inp['basefile']
        self.para_list=inp['FE_parameters']
        self.ncpu = inp['NCPU']
        self.ls_run_exe = inp['LS_Dyna_executable']
        self.outputs=inp['program_name']
        self.cwd=os.getcwd()
        
        self.dyna_dir = os.path.join(self.fin_dir,'.dynakit')
        self.basepath=os.path.join(self.fin_dir,self.basename)
        self.fol_name=self.basename.split('_')[0]

        try:
            os.mkdir(self.dyna_dir)
        except OSError as err:
            print('Adding new samples to the existing directory')
            self.Flag=0
                
        return self.fin_dir , self.Run , self.key , self.para_list
   

    def Read_config(self):
        """ converts the .yaml file to a dictionary
        
        Parameters
        ----------
        self.para_list : the config.yaml file  with the user inputs
        
        Returns
        -------
        z : the .yaml file in dictionary format
        
        """  
        os.chdir(self.cwd)
        with open(self.para_list,'r') as file:
            parameter_list  = yaml.load(file, Loader=yaml.FullLoader) 
        dynParams = {k: v for k, v in parameter_list['parameters'].items() if parameter_list['parameters'][k]['type'] == 'dynaParameter'}
        self.dynaParameters = pd.DataFrame.from_dict(dynParams)
        
        onparams = {k: v for k, v in dynParams.items() if dynParams[k]['status'] == True }
        self.new_par=pd.DataFrame.from_dict(onparams)             
        on=self.new_par.loc['parameter']
        self.on_params=on.to_list()
        
        return self.dynaParameters
    

    def get_samples(self): 
        """ samples the data based on the .yaml file using normal / uniform  distribution and lhs library
        
        Parameters
        ----------
        vars      : values assigned to the sub keys in the .yaml file
        self.Run  : Number of samples required 
        
        Returns
        -------
        Data   : samples matrix in a list
        
        """ 
        os.chdir(self.dyna_dir)
        if self.int=='yes':
            self.col_names=self.dynaParameters.loc['parameter']
        elif self.int=='no':
            self.col_names=self.DOE.columns

        if self.int =='yes':
            
            DOE_s = lhs(len(self.new_par.loc['parameter']),samples = self.Run)
            j=0
            self.DOE=np.zeros((self.Run,len(self.dynaParameters.loc['parameter'])))
            for i in range(0,len(self.dynaParameters.loc['parameter'])):
                if self.dynaParameters.loc['parameter'][i] in self.on_params:
                    self.DOE[:,i]=DOE_s[:,j]
                    j+=1
                else:
                    self.DOE[:,i]=1
            
            save_file=pd.DataFrame(self.DOE)
            os.chdir(self.dyna_dir)
            save_file.to_csv('DOE.csv', index=False)
            minimum_val = self.dynaParameters.loc['min']
            maximum_val = self.dynaParameters.loc['max']
            
            for j in range(0,len(self.dynaParameters.loc['parameter'])):
                if self.dynaParameters.loc['parameter'][j] in self.on_params:
                    if self.dynaParameters.loc['distribution'][j]=='Uniform':
                        self.DOE[:,j]=uniform(self.dynaParameters.loc['min'][j], self.dynaParameters.loc['max'][j] - self.dynaParameters.loc['min'][j]).ppf(self.DOE[:, j])
                    elif self.dynaParameters.loc['distribution'][j]=='Normal':
                        self.DOE[:, j] = norm(loc=self.dynaParameters.loc['mean'][j], scale=self.dynaParameters.loc['SD'][j]).ppf(self.DOE[:, j])
                else:
                    self.DOE[:,j]=self.dynaParameters.loc['default'][j]

        elif self.int=='no':
            os.chdir(self.dyna_dir)
            df=self.DOE
            df.to_csv('DOE.csv', index=False)
            self.DOE=np.array(self.DOE)
            
        return self.DOE

    def add_samples(self):
        """ adds samples of the data based on the .yaml file using normal / uniform distribution and lhs library
        
        Parameters
        ----------
        vars      : values assigned to the sub keys in the .yaml file
        self.Run  : Number of samples required 
        self.fin_dir     : final path of the created directory
        
        Returns
        -------
        Data   : samples matrix in a list
        
        """ 
        os.chdir(self.fin_dir)
        self.folders_count =len([name for name in os.listdir(os.getcwd()) if name.startswith(self.fol_name)])-1
        os.chdir(self.dyna_dir)
        
        if os.path.isfile('DOE.csv'):
            old_DOE_s=pd.read_csv('DOE.csv')
        else:
            print('No preexisting DOE found!')
        
        if self.int=='yes':
            self.col_names=self.dynaParameters.loc['parameter']
        elif self.int=='no':
            self.col_names=self.DOE.columns
        if self.int=='yes':
            old_DOE=np.zeros((self.folders_count,len(self.new_par.loc['parameter'])))
            old=old_DOE_s.values
            j=0
            for i in range(0,len(self.dynaParameters.loc['parameter'])):
                if self.dynaParameters.loc['parameter'][i] in self.on_params:
                    old_DOE[:,j]=old[:,i]
                    j+=1
            data_add=lhs(len(self.new_par.loc['parameter']),samples = self.Run)
            DOE_new_add= maxmin(self.Run,len(self.new_par.loc['parameter']), num_steps=None, initial_points=data_add, existing_points=old_DOE, use_reflection_edge_correction=None, dist_matrix_function=None, callback=None)

            new_DOE=np.zeros((self.Run,len(self.dynaParameters.loc['parameter'])))
            j=0
            for i in range(0,len(self.dynaParameters.loc['parameter'])):
                if self.dynaParameters.loc['parameter'][i] in self.on_params:
                    new_DOE[:,i]=DOE_new_add[:,j]
                    j+=1
                else:
                    new_DOE[:,i]=1
            df=pd.DataFrame(new_DOE)    
           
            os.chdir(self.dyna_dir)
            df.to_csv('DOE.csv', mode='a', header=False, index=False)
            
            self.DOE= pd.read_csv('DOE.csv')
        
            for j in range(0,len(self.dynaParameters.loc['parameter'])):
                if self.dynaParameters.loc['parameter'][j] in self.on_params:
                    if self.dynaParameters.loc['distribution'][j]=='Uniform':
                        self.DOE.values[:,j]=uniform(self.dynaParameters.loc['min'][j], self.dynaParameters.loc['max'][j] - self.dynaParameters.loc['min'][j]).ppf(self.DOE.values[:, j])
                    elif self.dynaParameters.loc['distribution'][j]=='Normal':
                        self.DOE.values[:, j] = norm(loc=self.dynaParameters.loc['mean'][j], scale=self.dynaParameters.loc['SD'][j]).ppf(self.DOE.values[:, j])
                else:
                    self.DOE.values[:,j]=self.dynaParameters.loc['default'][j]
                    
            self.DOE=self.DOE.values
            
        elif self.int=='no':
            os.chdir(self.dyna_dir)
            df=self.DOE
            df.to_csv('DOE.csv', mode='a', header=False, index=False)
            self.DOE=np.array(self.DOE)

        return self.DOE
    
    def generate_key_file(self): 
        """ Generate the new updated .key file and a FE_Parameters.yaml file containing respective sampled values 
        for each parameters in new folders.
        
        Parameters
        ----------
        self.newkey      : a new key file with an updated file path.
        self.fin_dir     : final path of the created directory
        self.Run         : Number of samples required 
        self.para_num    : number of parameters/variables/features
        self.para_names  : Names of parameters/variables/features
        self.DOE         : samples matrix in a list
        
        Returns
        -------
        fldolder in the directory
        
        """
        os.chdir(self.basepath)
        kf=KeyFile(self.key)
        os.chdir(self.fin_dir)  
        key_parameters=kf["*PARAMETER"][0]
        key_parameters_array=np.array(kf["*PARAMETER"][0])
        
        # Creating a dictionary with key and it's values:
        key_dict={}
        R_index=[]
        for i in range(0,len(key_parameters_array)):
            if key_parameters_array[i].startswith('R'):
                R_index.append(i)
                f=key_parameters_array[i].split(' ')
                key_dict[f[1]]=f[-1]
        par_lis=[*key_dict.keys()]
        os.chdir(self.fin_dir)
        self.folders_count =len([name for name in os.listdir(os.getcwd()) if name.startswith(self.fol_name)])

        
        for run in range(0,self.Run):
            
            os.mkdir('{}_{:03}'.format(self.fol_name,(run+self.folders_count)))
            os.chdir('{}_{:03}'.format(self.fol_name,(run+self.folders_count)))
            FE_Parameters = {}
            
            for para in range(0,len(self.col_names)):
                
                for i in range(0,len(R_index)):
                    
                    if par_lis[i] == self.col_names[para]:
                        
                        key_parameters[i+1,1] = self.DOE[run+self.folders_count-1,para]                 
                        kf.save("run_main_{:03}.key".format((run+self.folders_count)))
                        FE_Parameters[par_lis[i]] =  key_parameters[i+1,1]
                    with open('simulation_Parameters.yaml','w') as FE_file:
                        yaml.dump(FE_Parameters,FE_file,default_flow_style = False)
            os.chdir(self.fin_dir)        
        
    def get_simulation_files(self):
        """ 
        Runs all the methods of pre-process class
        
        """
        self.Read_config()
        if self.Flag==1:
            self.get_samples()
        elif self.Flag==0:
            self.add_samples()
        self.generate_key_file()
    

### Ls_Run method
add the functionality to execute lsDyna using subprocess

### Post process
generalised script to combine the results extracted by the

# Test

In [None]:
import os
cur_path=os.getcwd()
path=os.path.join(cur_path,'example')
pro_path=os.path.join(path,'test_project')
base_path=os.path.join(path,'main')
os.chdir(path)
with open('dynakit_FE.yaml') as file:
    a  = yaml.load(file, Loader=yaml.FullLoader) 
a['project_path']=pro_path
name=a['basefile']
with open('dynakit_FE.yaml','w') as file:
    yaml.dump(a,file) 

file_name=name.split('_')[0]
k=FE('dynakit_FE.yaml')

### 1. Test the object 

In [None]:
# path_cur=os.getcwd()
def test_object(k,pro_path):
    os.chdir(pro_path)
    assert os.path.exists('.dynakit') == True
#     assert os.path.exists('Test_file') == True
    
test_object(k,pro_path)

### 2. Test `Read_config`

In [None]:
def test_Read_config(k,path):
    df=k.Read_config()
    os.chdir(path)
    with open('FE_parameters.yaml','r') as file:
        para_list  = yaml.load(file, Loader=yaml.FullLoader) 
    a={k: v for k, v in para_list['parameters'].items() if para_list['parameters'][k]['type'] == 'dynaParameter'}
    assert len([*a.keys()])==len(df.columns)
    assert len([*a.values()][0])==len(df)
    assert [*a.values()][0]['max'] == df.iloc[:,0]['max']
    
test_Read_config(k,path)

### 3. Test `get_samples`

In [None]:
def test_get_samples(k):
    df=k.Read_config()
    test_array=k.get_samples()
    
    assert df.iloc[:,0]['min']<min(test_array[:,0])
    assert df.iloc[:,0]['max']>max(test_array[:,0])
    
test_get_samples(k)

### 4. Test `generate_key_file`

In [None]:
os.chdir(pro_path)
def test_generate_key_file(k):
#     path=os.path.join(os.getcwd(),file_name)
    k.Read_config()
    test_array=k.get_samples()
    k.generate_key_file()
    indx=len([name for name in os.listdir(path) if name.startswith(file_name)])
    
    assert len(test_array)==indx
    
test_generate_key_file(k)

### 5. Test `add samples`

In [None]:
os.chdir(pro_path)
doe_o=len(pd.read_csv(".dynakit/DOE.csv"))
def test_add_samples(k):
    with open('settings.yaml','r') as file:
        set_list  = yaml.load(file, Loader=yaml.FullLoader) 
    k_1=FE('settings.yaml')
    k_1.Read_config()
    k_1.add_samples()
    doe_n=pd.read_csv("DOE.csv")
    assert doe_o+set_list['Runs'] == len(doe_n)
test_add_samples(k)