In [81]:
import glob
import os
import shutil
import subprocess
import math
import json
import re
import threading
import time

from langchain_openai import ChatOpenAI,OpenAI
from langchain_core.prompts import PromptTemplate,ChatPromptTemplate,MessagesPlaceholder
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate

In [76]:
def plain_parser(Message) -> str:
    """Unwrap a model reply object and hand back its ``content`` attribute.

    Passed as the ``parser`` stage when the agents are built, so callers of
    the chain receive ``Message.content`` instead of the message object.
    """
    reply_content = Message.content
    return reply_content

# Prompt that asks the model for a one-paragraph, plain-language description of
# an abbreviated CalculiX deck. Single template variable: {code}.
summary_code_prompt = PromptTemplate.from_template(
    '''
    {code}
    The above code is calculix code, in which the long and repetitive parts are omitted.
    Task: Please describe the function of this code in plain language. 
    I hope to be able to reproduce this code in this language, so some parameters in the code need to be retained in this description.
    Please describe it in natural, human-like language, not in a way that resembles code. Try to write in one paragraph
    '''
)

# Prompt that asks the model to complete a partial CalculiX .inp file from a
# textual description. Template variables: {description} and {code}.
generate_code_prompt = PromptTemplate.from_template(
    '''
    {description}
    The above description is about FEA simulation. 
    I hope to write a calculix inp file based on this description. 
    I will provide a part of the code, and you can help me complete the following code. 
    Please note that part of the code I give you is omitted and repeated. 
    This is not the part you need to complete. You only need to return the remaining part.
    {code}
    '''
)
# Prompt that asks the model to split a CalculiX deck into steps itself and to
# answer in JSON with "Description", "Keywords" and "Code blocks" per step.
# Single template variable: {code}. The doubled braces are template escapes
# that render as literal braces in the JSON example.
step_plan_gen_prompt = PromptTemplate.from_template(
    '''
    Code:
    {code}
    The above the Code of Calculix FEA simulation. 
    I now want to divide this code into multiple steps. Then give a description of each step.
    Please note that lines at the beginning of the code marked with ** generally provide a simple description of the code, and we consider this as the first step. Then, code blocks starting with an asterisk followed by a keyword, such as *NODE, indicate a segment of code. Typically, a step might contain several code segments. 
    For example, 
    *MATERIAL, NAME=EL
    *ELASTIC
    210000.0, .3
    *DENSITY
    7.8E-9 is considered one step that includes the *MATERIAL, *ELASTIC, and *DENSITY segments. 
    Specifically, sections from *STEP to *END STEP are generally regarded as one step.
    Using JSON format to answer. Do not output anything else except json.
    Output Format:
    {{
        "Step 1":{{
            "Description":<1-3 sentances>,
            "Keywords":["<keyword>","<keyword>","<keyword>",...,],
            "Code blocks": [*MATERIAL,*ELASTIC,*DENSITY]
            }},
        "Step 2":{{
        ...
        }}, 
        ....
    }}
    Note: 1. In the 'Code blocks' field, include all lines starting with an asterisk followed by a keyword from each step. Only include the keyword part before any comma, such as '*MATERIAL' from '*MATERIAL, NAME=EL'.
    2. I hope the description includes many detailed pieces of information from the code in this step, so that I can reconstruct the code through the description.
    3. If multiple code blocks of the same type, like *ELEMENT, are present in one step, the 'Code blocks' field should list each *ELEMENT to reflect their respective occurrences. 
    4. For each step, several keywords need to be summarized to reflect the specific functions of this step. These are not the keywords at the beginning of the code blocks, but the keywords extracted from the description. 
    '''
)
# Prompt that asks the model to describe steps that were already split by the
# notebook; the expected answer is JSON with one "Description" per "Step N".
# Single template variable: {code}. The doubled braces are template escapes
# that render as literal braces in the JSON example.
step_plan_gen_prompt2 = PromptTemplate.from_template(
    '''
    Code:
    {code}
    The above the Code of Calculix FEA simulation. 
    I have omitted part of the code block if it was too long.
    I have divided the code into multiple steps. The current task is to write a description for each step.
    Using JSON format to answer. Do not output anything else except json.
    Output Format:
    {{
        "Step 1":{{
            "Description":<1-3 sentances>
            }},
        "Step 2":{{
        ...
        }}, 
        ....
    }}
    '''
)

class llm_agent:
    """Small wrapper that chains a prompt, an OpenAI chat model and a parser.

    The chain is built once as ``prompt | ChatOpenAI(...) | parser`` and is
    run through :meth:`get_result`.
    """

    def __init__(self, prompt, parser, api_key=None, model="gpt-4o", temperature=0):
        """Build the chain.

        Args:
            prompt: Prompt template placed at the head of the chain.
            parser: Final stage applied to the model reply.
            api_key: OpenAI API key. When omitted, the key is read from the
                ``OPENAI_API_KEY`` environment variable so that no credential
                has to be written into the notebook.
            model: Chat model name; defaults to the previously hard-coded
                ``"gpt-4o"``.
            temperature: Sampling temperature; defaults to the previously
                hard-coded ``0``.

        Raises:
            ValueError: If no key is passed and ``OPENAI_API_KEY`` is unset
                or empty.
        """
        gpt_key = api_key or os.environ.get("OPENAI_API_KEY", "")
        if not gpt_key:
            # Fail here with a clear message instead of on the first request.
            raise ValueError(
                "No OpenAI API key: pass api_key=... or set the "
                "OPENAI_API_KEY environment variable."
            )
        self.agent = (
            prompt
            | ChatOpenAI(temperature=temperature, api_key=gpt_key, model=model)
            | parser
        )

    def get_result(self, *args, **kwargs):
        """Invoke the chain with ``kwargs`` as the prompt variables.

        Positional arguments are accepted for backward compatibility but are
        not forwarded to the chain; only keyword arguments are used.

        Returns:
            Whatever the final ``parser`` stage of the chain returns.
        """
        return self.agent.invoke(kwargs)

In [3]:
def compare_float_files(file1, file2, rel_tol=1e-4, abs_tol=1e-6):
    """Compare two whitespace-separated text files with a numeric tolerance.

    Lines are compared pairwise and split on whitespace. Tokens that are
    textually identical are equal; otherwise both tokens must parse as floats
    and satisfy ``math.isclose`` with the given tolerances.

    Args:
        file1: Path of the first file.
        file2: Path of the second file.
        rel_tol: Relative tolerance passed to ``math.isclose``.
        abs_tol: Absolute tolerance passed to ``math.isclose``.

    Returns:
        ``(True, number_of_lines)`` when the files match,
        ``(False, -1)`` when the line counts differ, otherwise
        ``(False, line_index)`` with the 0-based index of the first
        differing line.
    """
    # Each file is read once; the line lists serve both the length check and
    # the comparison.
    with open(file1, 'r') as f1:
        lines1 = f1.readlines()
    with open(file2, 'r') as f2:
        lines2 = f2.readlines()
    if len(lines1) != len(lines2):
        return False, -1

    for line_num, (line1, line2) in enumerate(zip(lines1, lines2)):
        values1 = line1.split()
        values2 = line2.split()
        # A different token count (including blank vs. non-blank) is a
        # difference; zip() alone would silently ignore the extra tokens.
        if len(values1) != len(values2):
            print(f'Difference found at line {line_num}: '
                  f'{len(values1)} values != {len(values2)} values')
            return False, line_num
        for v1, v2 in zip(values1, values2):
            if v1 == v2:
                # Identical text is equal, even for tokens such as 'nan'
                # that never compare close numerically.
                continue
            try:
                float_v1 = float(v1)
                float_v2 = float(v2)
            except ValueError:
                # At least one token is not numeric and the texts differ.
                print(f'Difference found at line {line_num}: {v1} != {v2}')
                return False, line_num
            if not math.isclose(float_v1, float_v2, rel_tol=rel_tol, abs_tol=abs_tol):
                print(f'Difference found at line {line_num}: {float_v1} != {float_v2}')
                return False, line_num
    return True, len(lines1)

In [111]:
starts = {}
# Keyword cards that stay in the same step as a preceding *MATERIAL card.
material_key_words = ["*DAMPING","*DENSITY","*DEPVAR","*EXPANSION","*ACOUSTIC MEDIUM",
                      "*DEFORMATION PLASTICITY","*USER MATERIAL","*ELASTIC","*EOS","*HYPERELASTIC",
                      "*HYPERFOAM","*HYPOELASTIC","*POROUS ELASTIC","*EOS SHEAR","*FAIL STRAIN",
                      "*FAIL STRESS","*HYSTERESIS","*MULLINS EFFECT","*NO COMPRESSION","*NO TENSION",
                      "*VISCOELASTIC","*BRITTLE CRACKING","*CAP PLASTICITY","*CAST IRON PLASTICITY","*CLAY PLASTICITY",
                      "*CONCRETE","*CONCRETE DAMAGED PLASTICITY","*CRUSHABLE FOAM","*DRUCKER PRAGER","*EOS COMPACTION",
                      "*JOINTED MATERIAL","*MOHR COULOMB","*PLASTIC","*CAP CREEP","*CREEP",
                      "*DRUCKER PRAGER CREEP","*PLASTIC","*RATE DEPENDENT","*SWELLING","*VISCOUS",
                      "*ANNEAL TEMPERATURE","*BRITTLE FAILURE","*CYCLIC HARDENING","*INELASTIC HEAT FRACTION","*ORNL",
                      "*POROUS FAILURE CRITERIA","*POROUS METAL PLASTICITY","*POTENTIAL","*SHEAR FAILURE","*TENSILE FAILURE",
                      "*DAMAGE INITIATION","*DAMAGE EVOLUTION","*DAMAGE STABILIZATION","*CONDUCTIVITY","*HEAT GENERATION",
                      "*LATENT HEAT","*SPECIFIC HEAT","*USER MATERIAL","*GEL","*MOISTURE SWELLING",
                      "*PERMEABILITY","*POROUS BULK MODULI","*SORPTION","*DIELECTRIC","*ELECTRICAL CONDUCTIVITY",
                      "*JOULE HEAT FRACTION","*PIEZOELECTRIC","*DIFFUSIVITY","*SOLUBILITY","*FLUID BULK MODULUS","*FLUID DENSITY",
                      "*FLUID EXPANSION"]
# Parameter names on a keyword line that are never recorded as step keywords.
not_keywords =['STEP','END STEP','TYPE']
class calculixInp:
    """Split a CalculiX ``.inp`` deck into steps made of keyword-card pieces.

    A *piece* starts at a line beginning with a single ``*`` (lines starting
    with ``**`` are comments and stay inside the current piece) and runs up to
    the next such line. Consecutive pieces are grouped into *steps*:

    * everything from ``*STEP`` to ``*END STEP`` is one step;
    * ``*MATERIAL`` together with the following cards listed in
      ``material_key_words`` is one step;
    * outside ``*STEP`` blocks, consecutive cards with the same keyword as the
      step's first card share one step;
    * any other card opens a new step.

    ``self.sections`` holds one tuple per step:
    ``(start_keyword, pieces, abbreviated_pieces, keywords)`` where
    ``keywords`` is a dict whose keys are the upper-cased names found on the
    keyword lines (minus ``not_keywords``). The text in front of the first
    keyword card is stored as the first piece of the first step, whose start
    keyword is ``'*HEADING'``; that piece may be an empty string.
    """

    # A piece longer than _OMIT_LIMIT lines is abbreviated to its first
    # _OMIT_HEAD lines, a '...' marker and its last _OMIT_TAIL lines.
    _OMIT_LIMIT = 7
    _OMIT_HEAD = 5
    _OMIT_TAIL = 2

    def __init__(self,name,inp_code) -> None:
        """Parse ``inp_code`` (the deck text) and populate ``self.sections``.

        Args:
            name: Label stored on the instance as ``self.name``.
            inp_code: Full text of the input deck.
        """
        self.name = name
        self.inp_code = inp_code
        self.sections = []
        # Not written again anywhere in this class; kept as attributes.
        self.structure_code = ''
        self.other_code =''
        self.omit_code = ''

        section_code = ''          # text of the piece being collected
        section_codes = []         # pieces of the step being collected
        omit_codes = []            # abbreviated form of each piece
        section_keywords = {}      # keyword names seen in the current step
        section_start = '*HEADING' # keyword that opened the current step
        flag_in_step = False
        flag_in_material = False

        for line in inp_code.splitlines():
            if not line.startswith('*') or line.startswith('**'):
                # Data or comment line: belongs to the current piece.
                section_code += (line+'\n')
                continue

            start = line.split(',')[0].upper().strip()

            # A keyword line closes the piece collected so far ...
            section_codes.append(section_code)
            omit_codes.append(self._abbreviate(section_code))
            # ... and opens the next one.
            section_code = line+'\n'

            # By default a card opens a new step unless we are inside
            # *STEP ... *END STEP.
            flag_new_step = not flag_in_step

            if start == '*STEP':
                flag_new_step = True
                flag_in_step = True

            if start == '*END STEP':
                flag_new_step = False
                flag_in_step = False

            if flag_in_material:
                if start in material_key_words:
                    # Material property card: stays with its *MATERIAL.
                    flag_new_step = False
                else:
                    flag_new_step = True
                    flag_in_material = False

            if start == '*MATERIAL':
                flag_in_material = True
                flag_new_step = True

            # Repeated cards of the same kind are merged into one step.
            if not flag_in_step and start == section_start:
                flag_new_step = False

            if flag_new_step:
                self.sections.append((section_start,section_codes,omit_codes,section_keywords))
                section_start = start
                section_codes = []
                omit_codes = []
                section_keywords = {}

            # Record keyword and parameter names of this card for its step.
            for keyword in line[1:].split(','):
                key = keyword.upper().split('=')[0].strip()
                if key not in section_keywords and key not in not_keywords:
                    section_keywords[key] = 0

        # Flush the last piece with ITS OWN abbreviation. The previous code
        # reused the abbreviation of the preceding piece (or raised when the
        # deck had no keyword line) and dropped the last piece when its text
        # repeated an earlier piece of the same step.
        if section_code:
            section_codes.append(section_code)
            omit_codes.append(self._abbreviate(section_code))
        if section_codes:
            self.sections.append((section_start,section_codes,omit_codes,section_keywords))

    @classmethod
    def _abbreviate(cls, section_code):
        """Return ``section_code`` with the middle of a long piece replaced by '...'."""
        sec_lines = section_code.splitlines()
        if len(sec_lines) > cls._OMIT_LIMIT:
            sec_lines = sec_lines[:cls._OMIT_HEAD] + ['...'] + sec_lines[-cls._OMIT_TAIL:]
        return '\n'.join(sec_lines) + '\n'

    def get_len(self):
        """Return the total number of pieces over all steps."""
        return sum(len(step[2]) for step in self.sections)

    def get_omit_code(self):
        """Return the whole deck as text, built from the abbreviated pieces."""
        return ''.join(code for step in self.sections for code in step[2])

    def get_steps_json(self):
        """Return a JSON string mapping ``"Step N"`` to that step's abbreviated code."""
        result = {
            'Step '+str(i): ''.join(step[2])
            for i, step in enumerate(self.sections, start=1)
        }
        return json.dumps(result,indent=4)

    def get_json(self):
        """Return a dict mapping ``"Step N"`` to its full code, keywords and start card."""
        result = {}
        for i, step in enumerate(self.sections, start=1):
            result['Step '+str(i)] = {
                'Code': ''.join(step[1]),
                'Keywords': list(step[3]),
                'Start': step[0],
            }
        return result



In [112]:
def extract_json_from_string(s):
    """Parse the JSON object embedded in ``s``.

    The candidate text runs from the first ``{`` to the last ``}`` in ``s``;
    anything around it (for example a Markdown code fence) is ignored.

    Args:
        s: Text that may contain a JSON object.

    Returns:
        The decoded object, or ``None`` when ``s`` is not a string, contains
        no ``{...}`` span, or the span is not valid JSON.
    """
    if not isinstance(s, str):
        print("No JSON object found in the string.")
        return None

    # find/rfind locate the span in one pass instead of re-slicing the string
    # one character at a time.
    first = s.find('{')
    last = s.rfind('}')
    if first == -1 or last < first:
        print("No JSON object found in the string.")
        return None

    try:
        return json.loads(s[first:last + 1])
    except json.JSONDecodeError as err:
        # Report the failure so an unusable reply is distinguishable from a
        # reply without any JSON.
        print(f"JSON object could not be parsed: {err}")
        return None
def thread_task(output_dir,name,sample,plan_agent,sum_agent,max_sections=30):
    """Generate and store the summary and the step plan for one input deck.

    Writes two files into ``output_dir``, named after ``name`` with its
    extension replaced: ``<stem>.plan`` (JSON from ``sample.get_json()``
    enriched with the model's step descriptions) and ``<stem>.des`` (the
    summary text). Nothing is written when the deck is too large or the plan
    reply contains no usable JSON object.

    Args:
        output_dir: Directory receiving the two files; created if missing.
        name: File name of the deck, e.g. ``'beam.inp'``.
        sample: Parsed deck providing ``get_len``, ``get_omit_code``,
            ``get_steps_json`` and ``get_json``.
        plan_agent: Agent whose ``get_result(code=...)`` returns the plan text.
        sum_agent: Agent whose ``get_result(code=...)`` returns the summary.
        max_sections: Decks with more pieces than this are skipped
            (default 30, the previously hard-coded limit).
    """
    if sample.get_len() > max_sections:
        return

    description = sum_agent.get_result(code = sample.get_omit_code())
    plan = plan_agent.get_result(code  = sample.get_steps_json())
    js_plan = extract_json_from_string(plan)
    if not isinstance(js_plan, dict):
        return

    js_step = sample.get_json()
    for step, step_info in js_step.items():
        planned = js_plan.get(step)
        # The reply is free-form model output: only copy a description that
        # is really there instead of raising on a missing key.
        if isinstance(planned, dict) and 'Description' in planned:
            step_info['Description'] = planned['Description']

    if output_dir:
        # exist_ok keeps this safe when several worker threads race here.
        os.makedirs(output_dir, exist_ok=True)
    stem = os.path.splitext(name)[0]
    with open(os.path.join(output_dir, stem + '.plan'),'w') as fout:
        fout.write(json.dumps(js_step,indent=4))
    with open(os.path.join(output_dir, stem + '.des'),'w') as fout:
        fout.write(description)
    
        

In [113]:
# Build the summary and step plan for every deck in `folder_name`, one worker
# thread per deck, waiting for the running workers after every batch.
output_dir = 'ccx_dataset'
folder_name = 'ccx'
BATCH_SIZE = 25     # wait for the running workers every BATCH_SIZE decks
JOIN_TIMEOUT = 50   # seconds to wait for each worker before moving on

sum_agent = llm_agent(prompt= summary_code_prompt,parser= plain_parser)
plan_agent = llm_agent(step_plan_gen_prompt2,plain_parser)
# The workers write into output_dir, so make sure it exists up front.
os.makedirs(output_dir, exist_ok=True)
inps = glob.glob(os.path.join(folder_name,'*.inp'))
lens =[]  # not used in this cell; NOTE(review): remove if no later cell reads it
# Must start empty: the loop appends to it, and it was previously only defined
# by leftover kernel state (NameError on a fresh kernel).
threads = []
for i,inp in enumerate(inps):
    # Read the deck and close the file before any worker is started.
    with open(inp,'r') as fin:
        inp_code = fin.read()
    sample = calculixInp(name = os.path.basename(inp), inp_code=inp_code)

    thread = threading.Thread(
        target=thread_task,
        args=(output_dir,os.path.basename(inp),sample, plan_agent,sum_agent),
    )
    threads.append(thread)
    thread.start()
    if i!=0 and i%BATCH_SIZE ==0:
        print(i)
        # join() with a timeout returns even if the worker is still running;
        # such a worker is no longer tracked after the list is reset.
        for pending in threads:
            pending.join(timeout=JOIN_TIMEOUT)
        threads = []
for pending in threads:
    pending.join(timeout=JOIN_TIMEOUT)
        

25
50
75
100
125
150
175
200
225
250
275
300
325
350
375
400
425
450
475
500
525
550
575


In [122]:
# For every generated plan, copy the matching input deck and its reference
# result file from `folder_name` next to the plan in `output_dir`.
output_dir = 'ccx_dataset'
folder_name = 'ccx'
inps = glob.glob(os.path.join(output_dir,'*.plan'))
print(len(inps))

for inp in inps:
    print(inp)
    stem = os.path.splitext(os.path.basename(inp))[0]
    for suffix in ('.inp', '.dat.ref'):
        # Byte-exact copy: no text decoding or newline translation involved.
        shutil.copyfile(
            os.path.join(folder_name, stem + suffix),
            os.path.join(output_dir, stem + suffix),
        )
        

528
ccx_dataset/artery2.plan
ccx_dataset/cou2d.plan
ccx_dataset/beamnlptp.plan
ccx_dataset/disconnect.plan
ccx_dataset/beamimpdy1nodirect.plan
ccx_dataset/beamd3.plan
ccx_dataset/contact11.plan
ccx_dataset/segdyn.plan
ccx_dataset/networkmpc2.plan
ccx_dataset/beamb.plan
ccx_dataset/beamog.plan
ccx_dataset/beamdy6.plan
ccx_dataset/beamfsuper.plan
ccx_dataset/channeljoint1.plan
ccx_dataset/gaspipe-cfd-pressure.plan
ccx_dataset/sens_freq_orien.plan
ccx_dataset/solidshell2.plan
ccx_dataset/substructure.plan
ccx_dataset/pipempc1.plan
ccx_dataset/beamptied1noadjust.plan
ccx_dataset/beamf.plan
ccx_dataset/sens3d.plan
ccx_dataset/membrane2.plan
ccx_dataset/beamdy4.plan
ccx_dataset/beamprand.plan
ccx_dataset/acou1.plan
ccx_dataset/lin_stat_th_gradient.plan
ccx_dataset/acou4.plan
ccx_dataset/beamdy5.plan
ccx_dataset/beamdy12.plan
ccx_dataset/oneel20cf.plan
ccx_dataset/ring4.plan
ccx_dataset/channel14a.plan
ccx_dataset/cubenewt.plan
ccx_dataset/potied.plan
ccx_dataset/contact9.plan
ccx_dataset/bea