diff --git a/examples/cfx_static_mixer/StaticMixer_001.cfx b/examples/cfx_static_mixer/StaticMixer_001.cfx new file mode 100644 index 000000000..c2f1c3335 Binary files /dev/null and b/examples/cfx_static_mixer/StaticMixer_001.cfx differ diff --git a/examples/cfx_static_mixer/StaticMixer_001.def b/examples/cfx_static_mixer/StaticMixer_001.def new file mode 100644 index 000000000..3456e766d Binary files /dev/null and b/examples/cfx_static_mixer/StaticMixer_001.def differ diff --git a/examples/cfx_static_mixer/project_setup.py b/examples/cfx_static_mixer/project_setup.py new file mode 100644 index 000000000..8fa5d0f45 --- /dev/null +++ b/examples/cfx_static_mixer/project_setup.py @@ -0,0 +1,179 @@ +""" +Example script to setup a simple CFX project in pyrep. +""" +import argparse +import logging +import os + +from ansys.rep.client import Client, REPError +from ansys.rep.client import __external_version__ as ansys_version +from ansys.rep.client.jms import ( + File, + JmsApi, + Job, + JobDefinition, + Licensing, + Project, + ProjectApi, + ResourceRequirements, + Software, + SuccessCriteria, + TaskDefinition, +) + +log = logging.getLogger(__name__) + + +def create_project(client, name, num_jobs=20, use_exec_script=False): + """ + Create a REP project consisting of an ANSYS CFX model. + """ + jms_api = JmsApi(client) + log.debug("=== Project") + proj = Project(name=name, priority=1, active=True) + proj = jms_api.create_project(proj, replace=True) + + project_api = ProjectApi(client, proj.id) + + log.debug("=== Files") + cwd = os.path.dirname(__file__) + files = [] + files.append( + File( + name="ccl", + evaluation_path="runInput.ccl", + type="text/plain", + src=os.path.join(cwd, "runInput.ccl"), + ) + ) + files.append( + File( + name="inp", + evaluation_path="StaticMixer_001.cfx", + type="application/octet-stream", + src=os.path.join(cwd, "StaticMixer_001.cfx"), + ) + ) + files.append( + File( + name="def", + evaluation_path="StaticMixer_001.def", + type="application/octet-stream", + src=os.path.join(cwd, "StaticMixer_001.def"), + ) + ) + + if use_exec_script: + files.append( + File( + name="exec_cfx", + evaluation_path="exec_cfx.py", + type="application/x-python-code", + src=os.path.join(cwd, "..", "exec_scripts", "exec_cfx.py"), + ) + ) + + files.append( + File( + name="out", + evaluation_path="StaticMixer_*.out", + type="text/plain", + collect=True, + monitor=True, + ) + ) + files.append( + File( + name="res", + evaluation_path="StaticMixer_*.res", + type="text/plain", + collect=True, + monitor=False, + ) + ) + + files = project_api.create_files(files) + file_ids = {f.name: f.id for f in files} + + log.debug("=== JobDefinition with simulation workflow and parameters") + job_def = JobDefinition(name="JobDefinition.1", active=True) + + # Task definition + num_input_files = 4 if use_exec_script else 3 + task_def = TaskDefinition( + name="CFX_run", + software_requirements=[ + Software(name="Ansys CFX", version=ansys_version), + ], + execution_command=None, # only execution script supported initially + resource_requirements=ResourceRequirements( + cpu_core_usage=1.0, + memory=250, + disk_space=5, + ), + execution_level=0, + execution_context={"cfx_cclFile": "runInput.ccl", "cfx_runName": "StaticMixer"}, + max_execution_time=50.0, + num_trials=1, + input_file_ids=[f.id for f in files[:num_input_files]], + output_file_ids=[f.id for f in files[num_input_files:]], + success_criteria=SuccessCriteria( + return_code=0, + required_output_file_ids=[file_ids["out"]], + require_all_output_files=False, + ), + licensing=Licensing(enable_shared_licensing=False), # Shared licensing disabled by default + ) + + if use_exec_script: + task_def.use_execution_script = True + task_def.execution_command = None + task_def.execution_script_id = file_ids["exec_cfx"] + + task_defs = [task_def] + task_defs = project_api.create_task_definitions(task_defs) + + job_def.task_definition_ids = [td.id for td in task_defs] + + # Create job_definition in project + job_def = project_api.create_job_definitions([job_def])[0] + + job_def = project_api.get_job_definitions()[0] + + log.debug(f"=== Create {num_jobs} jobs") + jobs = [] + for i in range(num_jobs): + jobs.append(Job(name=f"Job.{i}", eval_status="pending", job_definition_id=job_def.id)) + jobs = project_api.create_jobs(jobs) + + log.info(f"Created project '{proj.name}', ID='{proj.id}'") + + return proj + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-n", "--name", type=str, default="cfx_static_mixer") + parser.add_argument("-j", "--num-jobs", type=int, default=1) + parser.add_argument("-es", "--use-exec-script", default=True, action="store_true") + parser.add_argument("-U", "--url", default="https://127.0.0.1:8443/rep") + parser.add_argument("-u", "--username", default="repadmin") + parser.add_argument("-p", "--password", default="repadmin") + args = parser.parse_args() + + logger = logging.getLogger() + logging.basicConfig(format="%(message)s", level=logging.DEBUG) + + try: + log.info("Connect to REP JMS") + client = Client(rep_url=args.url, username=args.username, password=args.password) + log.info(f"REP URL: {client.rep_url}") + proj = create_project( + client=client, + name=args.name, + num_jobs=args.num_jobs, + use_exec_script=args.use_exec_script, + ) + + except REPError as e: + log.error(str(e)) diff --git a/examples/cfx_static_mixer/runInput.ccl b/examples/cfx_static_mixer/runInput.ccl new file mode 100644 index 000000000..e6256b677 --- /dev/null +++ b/examples/cfx_static_mixer/runInput.ccl @@ -0,0 +1,7 @@ +SIMULATION CONTROL: + EXECUTION CONTROL: + RUN DEFINITION: + Solver Input File = StaticMixer_001.def + END + END +END diff --git a/examples/exec_scripts/exec_cfx.py b/examples/exec_scripts/exec_cfx.py new file mode 100644 index 000000000..891bd1b35 --- /dev/null +++ b/examples/exec_scripts/exec_cfx.py @@ -0,0 +1,267 @@ +""" +Copyright (C) 2021 ANSYS, Inc. and its subsidiaries. All Rights Reserved. +""" +import _thread +import json +import logging +import os +from os import path +import platform +import re +import shlex +import subprocess +import time +import traceback + +from ansys.rep.common.logging import log +from ansys.rep.evaluator.task_manager import ApplicationExecution +from ansys.rep.evaluator.task_manager.context import SubmitContext + + +class CfxExecution(ApplicationExecution): + isLinux = platform.platform().startswith("Linux") + + def __init__(self, context): + self.active_run_name = None + self.putative_run_name = None + self.withSoftInterrupt = True + ApplicationExecution.__init__(self, context) + + def publish_to_default_log(self, msg): + log.info(msg) + + def publish_to_debug_log(self, msg): + log.debug(msg) + + def execute(self): + log.info("Start CFX execution script") + + try: + log.info("Evaluator Platform: " + platform.platform()) + + num_cores = self.context.resource_requirements["num_cores"] + log.info(f"Requested cores: {num_cores}") + + # create defaults for inputs not provided + inputs = { + "cfx_additionalArgs": self.context.execution_context.get("cfx_additionalArgs", "") + } + inputs["cfx_solverFile"] = self.context.execution_context.get("cfx_solverFile", None) + inputs["cfx_definitionFile"] = self.context.execution_context.get( + "cfx_definitionFile", None + ) + inputs["cfx_iniFile"] = self.context.execution_context.get("cfx_iniFile", None) + inputs["cfx_cclFile"] = self.context.execution_context.get("cfx_cclFile", None) + inputs["cfx_contFile"] = self.context.execution_context.get("cfx_contFile", None) + inputs["cfx_mcontFile"] = self.context.execution_context.get("cfx_mcontFile", None) + inputs["cfx_mdefFile"] = self.context.execution_context.get("cfx_mdefFile", None) + inputs["cfx_parFile"] = self.context.execution_context.get("cfx_parFile", None) + inputs["cfx_indirectPath"] = self.context.execution_context.get( + "cfx_indirectPath", None + ) + inputs["cfx_version"] = self.context.execution_context.get("cfx_version", None) + inputs["cfx_useAAS"] = self.context.execution_context.get("cfx_useAAS", False) + inputs["cfx_startMethod"] = self.context.execution_context.get("cfx_startMethod", None) + inputs["cfx_runName"] = self.context.execution_context.get("cfx_runName", None) + + self.publish_to_default_log( + "Task inputs after applying default values to missing inputs:" + ) + for name in inputs.keys(): + if inputs[name] == None: + continue + self.publish_to_default_log("\t-" + name + ":<" + str(inputs[name]) + ">") + + # Check existence of files which must exist if specified + inputs_existchk = [ + "cclFile", + "contFile", + "definitionFile", + "iniFile", + "mcontFile", + "mdefFile", + "parFile", + "solverFile", + ] + + self.publish_to_default_log("Checking if provided files exist in the storage...") + for i in inputs_existchk: + k = "cfx_" + i + if not inputs[k] == None: + if not os.path.isfile(inputs[k]): + raise Exception("Required file does not exist!\n" + inputs[k]) + + if not inputs["cfx_indirectPath"] == None: + # Special check for indirect startup and set active name for later use + rundir = inputs["cfx_indirectPath"] + ".dir" + if not os.path.isdir(rundir): + raise Exception("Required directory does not exist!\n" + rundir) + startup_ccl = rundir + "/startup.ccl" + if not os.path.isfile(startup_ccl): + raise Exception(startup_ccl) + self.active_run_name = inputs["cfx_indirectPath"] + else: + # Set putative run name from input file + for i in ["definitionFile", "mdefFile", "contFile", "iniFile", "mcontFile"]: + k = "cfx_" + i + if not inputs[k] == None: + probname = re.sub("(_\d{3})?\.[^\.]+$", "", inputs[k]) + self.set_putative_run_name(probname) + break + + if self.putative_run_name == None and inputs["cfx_runName"] != None: + self.set_putative_run_name(inputs["cfx_runName"]) + + # Set putative run name from -eg or -name value (-name always wins) + if ( + not inputs["cfx_additionalArgs"] == "" + and not inputs["cfx_additionalArgs"] == None + ): + for opt in ["-eg", "-example", "-name"]: + m = re.search(opt + "\s+([^\s-]+)", inputs["cfx_additionalArgs"]) + if m: + self.set_putative_run_name(m.group(1)) + + # Identify application + app_name = "Ansys CFX" + app = next((a for a in self.context.software if a["name"] == app_name), None) + assert app, f"{app_name} is required for execution" + + log.info("Using " + app["name"] + " " + app["version"]) + log.info("Current directory: " + os.getcwd()) + + files = [f for f in os.listdir(".") if os.path.isfile(f)] + for f in files: + log.info(" " + f) + + # Determine CFX root directory, solver command and hosts + self.publish_to_default_log("CFX Root directory = " + app["install_path"]) + + exe = app["executable"] # should already be platform specific + self.publish_to_default_log("CFX Solver command: " + exe) + + # Create command line + # Add parallel options + cmd = [exe] + cmd.extend(["-fullname", self.active_run_name]) + cmd.append("-batch") + cmd.append("-serial") + + # Add options requiring an argument + options_arg = { + "-ccl": "cclFile", + "-continue-from-file": "contFile", + "-def": "definitionFile", + "-indirect-startup-path": "indirectPath", + "-initial-file": "iniFile", + "-mcont": "mcontFile", + "-mdef": "mdefFile", + "-parfile-read": "parFile", + "-solver": "solverFile", + } + for opt, i in sorted(options_arg.items()): + k = "cfx_" + i + if not inputs[k] == None: + cmd.extend([opt, inputs[k]]) + + # Add additional options + if not inputs["cfx_additionalArgs"] == "": + cmd.extend(shlex.split(inputs["cfx_additionalArgs"])) + + # Start the solver + self.publish_to_default_log("CFX solver command line = " + str(cmd)) + + rc = None + self.CFXOutputFile = None + self.CFXMonFile = None + with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as self.proc: + self.publish_to_default_log("CFX solver started\npid:" + format(self.proc.pid)) + t1 = _thread.start_new_thread(self.process_output, (self.proc,)) + t2 = _thread.start_new_thread(self.process_error, (self.proc,)) + + while rc is None: + rc = self.proc.poll() + time.sleep(1) + + # Post solution actions + for msg in ["Finished CFX solve"]: + self.publish_to_default_log(msg) + + if rc != 0: + self.publish_to_default_log(f"Error: Solver exited with errors ({rc}).") + raise Exception("Solver exited with errors.") + + return + + except Exception as e: + self.publish_to_debug_log(traceback.print_exc()) + self.publish_to_default_log(str(e)) + raise e + + # Set putative run name from problem name (to be called BEFORE the run is started) + def set_putative_run_name(self, probname): + if self.active_run_name != None: + return + imax = 0 + for dI in os.listdir(os.getcwd()): + m = re.match("^" + probname + "_(\d+)(\.(ansys|dir|out|res|mres|trn|cfx))?$", dI) + if m: + i = int(m.group(1)) + if i > imax: + imax = i + prob_ext = str(imax + 1) + self.putative_run_name = probname + "_" + prob_ext.zfill(3) + self.active_run_name = self.putative_run_name + self.publish_to_default_log("Set putative run name = " + self.putative_run_name) + + # Find active run name from putative run name (to be called AFTER the run is started) + def find_active_run_name(self): + # Putative run name set: Wait for output or run directory or output file to exist + if self.active_run_name == None: + if self.putative_run_name == None: + raise Exception("Unable to find active run name. Putative run name not set.") + outdir = path.join(os.getcwd(), self.putative_run_name) + rundir = outdir + ".dir" + outfile = outdir + ".out" + while self.active_run_name == None: + if path.isdir(outdir) or path.isdir(rundir) or path.isfile(outfile): + self.active_run_name = self.putative_run_name + else: + time.sleep(1) + return self.active_run_name + + # Monitor the stdout of the main process. If present, create log and log data. + def process_output(self, proc): + for line in iter(proc.stdout.readline, b""): + msg = line.decode("utf-8").rstrip() + self.publish_to_default_log(msg) + proc.stdout.close() + + # Monitor the stderr of the main process. If present, create log and log data. + def process_error(self, proc): + for line in iter(proc.stderr.readline, b""): + msg = line.decode("utf-8").rstrip() + self.publish_to_default_log(msg) + proc.stderr.close() + + +# EXAMPLE: this function will only be called if this script is run at the command line. +if __name__ == "__main__": + log = logging.getLogger() + logging.basicConfig(format="%(message)s", level=logging.DEBUG) + + try: + log.info("Loading sample CFX context...") + + with open("cfx_context.json", "r") as f: + context = json.load(f) + print(context) + + submit_context = SubmitContext(**context) + + log.info("Executing...") + ex = CfxExecution(submit_context).execute() + log.info("Execution ended.") + + except Exception as e: + log.error(str(e)) diff --git a/examples/exec_scripts/exec_fluent.py b/examples/exec_scripts/exec_fluent.py index 1a6546b80..074cd4782 100644 --- a/examples/exec_scripts/exec_fluent.py +++ b/examples/exec_scripts/exec_fluent.py @@ -1,156 +1,186 @@ """ Copyright (C) 2021 ANSYS, Inc. and its subsidiaries. All Rights Reserved. """ -import subprocess -import time import _thread -import os -import traceback import json -import psutil -import platform import logging +import os +import platform +import subprocess +import time +import traceback from ansys.rep.common.logging import log from ansys.rep.evaluator.task_manager import ApplicationExecution from ansys.rep.evaluator.task_manager.context import SubmitContext +import psutil + class FluentExecution(ApplicationExecution): isLinux = platform.platform().startswith("Linux") - + def __init__(self, context): self.CleanupScript = None self.FluentTranscript = None self.error_detected = False - self.fluent_children=[] + self.fluent_children = [] ApplicationExecution.__init__(self, context) def execute(self): try: log.info("Start FLUENT execution script") - pythoncode_version="0.1" - log.info("python code version "+pythoncode_version) - + pythoncode_version = "0.1" + log.info("python code version " + pythoncode_version) + log.info("Evaluator Platform: " + platform.platform()) - + num_cores = self.context.resource_requirements["num_cores"] - log.info(f'Requested cores: {num_cores}') + log.info(f"Requested cores: {num_cores}") - #self.environmentInfo.defaultMpi + # self.environmentInfo.defaultMpi defaultMpi = "intel" - #create defaults for inputs not provided - inputs = { "fluent_dimension" : self.context.execution_context.get("fluent_dimension","2d") } - inputs["fluent_precision"] = self.context.execution_context.get("fluent_precision", "dp") + # create defaults for inputs not provided + inputs = { + "fluent_dimension": self.context.execution_context.get("fluent_dimension", "2d") + } + inputs["fluent_precision"] = self.context.execution_context.get( + "fluent_precision", "dp" + ) inputs["fluent_meshing"] = self.context.execution_context.get("fluent_meshing", False) - inputs["fluent_numGPGPUsPerMachine"] = self.context.execution_context.get("fluent_numGPGPUsPerMachine", 0) - inputs["fluent_defaultFluentVersion"] = self.context.execution_context.get("fluent_defaultFluentVersion", None) - inputs["fluent_MPIType"] = self.context.execution_context.get("fluent_MPIType", defaultMpi) - inputs["fluent_otherEnvironment"] = self.context.execution_context.get("fluent_otherEnvironment", "{}") + inputs["fluent_numGPGPUsPerMachine"] = self.context.execution_context.get( + "fluent_numGPGPUsPerMachine", 0 + ) + inputs["fluent_defaultFluentVersion"] = self.context.execution_context.get( + "fluent_defaultFluentVersion", None + ) + inputs["fluent_MPIType"] = self.context.execution_context.get( + "fluent_MPIType", defaultMpi + ) + inputs["fluent_otherEnvironment"] = self.context.execution_context.get( + "fluent_otherEnvironment", "{}" + ) inputs["fluent_UDFBat"] = self.context.execution_context.get("fluent_UDFBat", None) inputs["fluent_jouFile"] = self.context.execution_context.get("fluent_jouFile", None) inputs["fluent_useGUI"] = self.context.execution_context.get("fluent_useGUI", False) - inputs["fluent_additionalArgs"] = self.context.execution_context.get("fluent_additionalArgs", "") + inputs["fluent_additionalArgs"] = self.context.execution_context.get( + "fluent_additionalArgs", "" + ) log.info("Task inputs ") for name in inputs.keys(): - if inputs[name]==None:continue - log.info("\t-"+name+":<"+str(inputs[name])+">") - + if inputs[name] == None: + continue + log.info("\t-" + name + ":<" + str(inputs[name]) + ">") + log.info("Checking if required inputs are provided...") - - valid_launcher_dimensions=['2d','3d'] + + valid_launcher_dimensions = ["2d", "3d"] if not inputs["fluent_dimension"] in valid_launcher_dimensions: - raise Exception("Required Input is invalid! fluent_dimension("+inputs["fluent_dimension"]+")\nValid values are "+format(valid_launcher_dimensions)) - - valid_launcher_precisions=['sp','dp'] + raise Exception( + "Required Input is invalid! fluent_dimension(" + + inputs["fluent_dimension"] + + ")\nValid values are " + + format(valid_launcher_dimensions) + ) + + valid_launcher_precisions = ["sp", "dp"] if not inputs["fluent_precision"] in valid_launcher_precisions: - raise Exception("Required Input is invalid! fluent_precision("+inputs["fluent_precision"]+")\nValid values are "+format(valid_launcher_precisions)) + raise Exception( + "Required Input is invalid! fluent_precision(" + + inputs["fluent_precision"] + + ")\nValid values are " + + format(valid_launcher_precisions) + ) # Identify application - app_name = "ANSYS Fluent" + app_name = "Ansys Fluent" app = next((a for a in self.context.software if a["name"] == app_name), None) assert app, f"{app_name} is required for execution" - - log.info("Using "+app["name"]+" "+app["version"]) + + log.info("Using " + app["name"] + " " + app["version"]) log.info("Current directory: " + os.getcwd()) - files = [f for f in os.listdir('.') if os.path.isfile(f)] + files = [f for f in os.listdir(".") if os.path.isfile(f)] for f in files: log.info(" " + f) if not os.path.isfile(inputs["fluent_jouFile"]): - raise Exception("File "+inputs["fluent_jouFile"]+" does not exist!") + raise Exception("File " + inputs["fluent_jouFile"] + " does not exist!") # Add " around exe if needed for Windows exe = app["executable"] # should already be platform specific log.info("Fluent executable: " + exe) - - if inputs["fluent_UDFBat"]==None: + + if inputs["fluent_UDFBat"] == None: if self.isLinux: - pass #no need in Linux, None is OK + pass # no need in Linux, None is OK else: - inputs["fluent_UDFBat"]=os.path.join(os.path.dirname(exe), "udf.bat") - log.info("Setting fluent_UDFBat to "+inputs["fluent_UDFBat"]) - - otherEnvironment=json.loads(inputs['fluent_otherEnvironment']) - noGuiOptions=None + inputs["fluent_UDFBat"] = os.path.join(os.path.dirname(exe), "udf.bat") + log.info("Setting fluent_UDFBat to " + inputs["fluent_UDFBat"]) + + otherEnvironment = json.loads(inputs["fluent_otherEnvironment"]) + noGuiOptions = None if not inputs["fluent_useGUI"]: if self.isLinux: noGuiOptions = " -gu -driver null" else: noGuiOptions = " -hidden -driver null" - - log.debug(f'exe: {exe}') - args=inputs['fluent_dimension'] - args+= (inputs["fluent_precision"] if inputs["fluent_precision"]=="dp" else "") - args+= (" -meshing" if inputs["fluent_meshing"] else "") - args+= (" -t"+format(num_cores)) - if inputs["fluent_MPIType"]!=None and inputs["fluent_MPIType"]!= "": - args+= (" -mpi="+format(inputs["fluent_MPIType"])) - if inputs['fluent_numGPGPUsPerMachine'] > 0: - args+= (" -gpgp="+format(inputs['fluent_numGPGPUsPerMachine'])) - args+= (" -i "+ inputs['fluent_jouFile']) - #args+= cnf - if not noGuiOptions==None:args+=noGuiOptions - args+=(" "+inputs["fluent_additionalArgs"]+" ") - + + log.debug(f"exe: {exe}") + args = inputs["fluent_dimension"] + args += inputs["fluent_precision"] if inputs["fluent_precision"] == "dp" else "" + args += " -meshing" if inputs["fluent_meshing"] else "" + args += " -t" + format(num_cores) + if inputs["fluent_MPIType"] != None and inputs["fluent_MPIType"] != "": + args += " -mpi=" + format(inputs["fluent_MPIType"]) + if inputs["fluent_numGPGPUsPerMachine"] > 0: + args += " -gpgp=" + format(inputs["fluent_numGPGPUsPerMachine"]) + args += " -i " + inputs["fluent_jouFile"] + # args+= cnf + if not noGuiOptions == None: + args += noGuiOptions + args += " " + inputs["fluent_additionalArgs"] + " " + cmd = [os.path.basename(exe)] - cmd.extend(args.split(' ')) - + cmd.extend(args.split(" ")) + rc = None - firstchild=None + firstchild = None fluent_env = os.environ.copy() - + for oenv in otherEnvironment: - if "FLUENT_GUI"==oenv['Name']:continue - #if "FLUENT_AAS"==oenv['Name']:continue - fluent_env[oenv['Name']]=oenv['Value'] + if "FLUENT_GUI" == oenv["Name"]: + continue + # if "FLUENT_AAS"==oenv['Name']:continue + fluent_env[oenv["Name"]] = oenv["Value"] log.info("Fluent environment:") for k in fluent_env: try: - log.info("\t- "+k+"\n\t\t "+fluent_env[k]) - except: - log.info("\t- error while printing "+k) - - log.info(' '.join(cmd)) - + log.info("\t- " + k + "\n\t\t " + fluent_env[k]) + except: + log.info("\t- error while printing " + k) + + log.info(" ".join(cmd)) + max_wait_time = 120 tried_time = 0 self.error_detected = False - with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=fluent_env, executable=exe) as self.proc: - log.info("Fluent started\npid:"+format(self.proc.pid)) + with subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=fluent_env, executable=exe + ) as self.proc: + log.info("Fluent started\npid:" + format(self.proc.pid)) log.info("TODO: start new thread to monitor process children") - t3=_thread.start_new_thread(self.monitor_children, (self.proc,)) + t3 = _thread.start_new_thread(self.monitor_children, (self.proc,)) log.info("Fluent started a new thread to monitor its children") - t4=_thread.start_new_thread(self.monitor_transcript, (self.proc,)) + t4 = _thread.start_new_thread(self.monitor_transcript, (self.proc,)) log.info("Fluent started a new thread to monitor its transcript") - t1=_thread.start_new_thread(self.process_output, (self.proc,)) + t1 = _thread.start_new_thread(self.process_output, (self.proc,)) log.info("Fluent started a new thread for stdout log") - t2=_thread.start_new_thread(self.process_error, (self.proc,)) + t2 = _thread.start_new_thread(self.process_error, (self.proc,)) log.info("Fluent started a new thread for stderr log") while True: if self.error_detected: @@ -165,64 +195,76 @@ def execute(self): elif firstchild is None: time.sleep(3) tried_time = tried_time + 3 - if len(self.fluent_children)==0: - if (tried_time < max_wait_time): - log.info("\t- no fluent children process found, continue") + if len(self.fluent_children) == 0: + if tried_time < max_wait_time: + log.info("\t- no fluent children process found, continue") continue else: - log.info("\t- can not start fluent in " + format(max_wait_time) + "seconds, quit the process") + log.info( + "\t- can not start fluent in " + + format(max_wait_time) + + "seconds, quit the process" + ) break - firstchild=self.fluent_children[0] - log.info("rc:"+format(rc)+" ,firstchild:"+format(firstchild)) + firstchild = self.fluent_children[0] + log.info("rc:" + format(rc) + " ,firstchild:" + format(firstchild)) elif not psutil.pid_exists(firstchild): log.info("\t- fluent exits normally") break - + log.info("Finished Fluent solve") - if rc!=0: - log.info(f'Error: Solver exited with errors ({rc}).') + if rc != 0: + log.info(f"Error: Solver exited with errors ({rc}).") raise Exception("Solver exited with errors.") - + except Exception as e: log.info("====== error in execute =========") log.debug(traceback.print_exc()) log.info(str(e)) log.info("====== error in execute =========") raise e - - #monitor the children of the main process - def monitor_children(self,proc): - starting_process=psutil.Process(proc.pid) + + # monitor the children of the main process + def monitor_children(self, proc): + starting_process = psutil.Process(proc.pid) try: while True: for child in starting_process.children(): - if not child.pid in self.fluent_children:self.fluent_children.append(child.pid) + if not child.pid in self.fluent_children: + self.fluent_children.append(child.pid) time.sleep(0.001) except Exception as e: - if not 'psutil.NoSuchProcess' in format(e): - errormessage=traceback.format_exc() + if not "psutil.NoSuchProcess" in format(e): + errormessage = traceback.format_exc() log.info(errormessage) - log.info("<"+format(e)+">") - - #monitor creation and content of transcript files and record content to corresponding logs - def monitor_transcript(self,proc): + log.info("<" + format(e) + ">") + + # monitor creation and content of transcript files and record content to corresponding logs + def monitor_transcript(self, proc): try: while True: log.info("Looking for fluent automatically generated transcript file...") - if not self.FluentTranscript==None:break + if not self.FluentTranscript == None: + break time.sleep(1) - for fn in os.listdir('.'): - if not fn.endswith(".trn"):continue - if fn.endswith(format(self.proc.pid)+".trn"): - self.FluentTranscript=fn + for fn in os.listdir("."): + if not fn.endswith(".trn"): + continue + if fn.endswith(format(self.proc.pid) + ".trn"): + self.FluentTranscript = fn for childpid in self.fluent_children: - if fn.endswith(format(childpid)+".trn"): - log.info("Warning: a fluent child process generated transcript <" + format(fn) + "> is found!") - self.FluentTranscript=fn - if not self.FluentTranscript==None:break - log.info("Fluent transcript detected: <"+format(self.FluentTranscript)+">") - - current_line=0 + if fn.endswith(format(childpid) + ".trn"): + log.info( + "Warning: a fluent child process generated transcript <" + + format(fn) + + "> is found!" + ) + self.FluentTranscript = fn + if not self.FluentTranscript == None: + break + log.info("Fluent transcript detected: <" + format(self.FluentTranscript) + ">") + + current_line = 0 while True: time.sleep(1) with open(self.FluentTranscript) as f: @@ -230,60 +272,71 @@ def monitor_transcript(self,proc): next(f) for line in f: log.info(line.rstrip()) - current_line=current_line+1 - msg=line.rstrip() - if msg.startswith('ANSYS LICENSE STDOUT ERROR'): - self.error_detected=True + current_line = current_line + 1 + msg = line.rstrip() + if msg.startswith("ANSYS LICENSE STDOUT ERROR"): + self.error_detected = True log.info("License error detected in fluent") - if msg.startswith('Unexpected license problem'): - self.error_detected=True + if msg.startswith("Unexpected license problem"): + self.error_detected = True log.info("Unexpected license error detected in fluent") - if msg.startswith('Warning: An error or interrupt occurred while reading the journal file'): - self.error_detected=True + if msg.startswith( + "Warning: An error or interrupt occurred while reading the journal file" + ): + self.error_detected = True log.info("An error detected in fluent, killing fluent...") - if msg.startswith('Error:'): - self.error_detected=True - log.info("An error detected in fluent, killing fluent...") - if msg.startswith('Cleanup script file is'): - self.CleanupScript=msg.replace('Cleanup script file is ','') - log.debug("Execute kills script is : "+ self.CleanupScript) + if msg.startswith("Error:"): + self.error_detected = True + log.info("An error detected in fluent, killing fluent...") + if msg.startswith("Cleanup script file is"): + self.CleanupScript = msg.replace("Cleanup script file is ", "") + log.debug("Execute kills script is : " + self.CleanupScript) if msg.startswith('Opening input/output transcript to file "'): - self.FluentTranscript=msg.replace('Opening input/output transcript to file "','').replace('".','') - log.debug("Fluent transcript is : "+ self.FluentTranscript) + self.FluentTranscript = msg.replace( + 'Opening input/output transcript to file "', "" + ).replace('".', "") + log.debug("Fluent transcript is : " + self.FluentTranscript) except Exception as e: - errormessage=traceback.format_exc() + errormessage = traceback.format_exc() log.info(errormessage) - log.info("<"+format(e)+">") + log.info("<" + format(e) + ">") - #monitor the stdout of the main process and log information to corresponding logs + # monitor the stdout of the main process and log information to corresponding logs def process_output(self, proc): - for line in iter(proc.stdout.readline, b''): - msg=line.decode("utf-8").rstrip() + for line in iter(proc.stdout.readline, b""): + msg = line.decode("utf-8").rstrip() log.info(msg) - if msg.startswith('ANSYS LICENSE MANAGER ERROR'):self.error_detected=True - if msg.startswith('Cleanup script file is'): - self.CleanupScript=msg.replace('Cleanup script file is ','') - log.debug("Execute kills script is : "+ self.CleanupScript) + if msg.startswith("ANSYS LICENSE MANAGER ERROR"): + self.error_detected = True + if msg.startswith("Cleanup script file is"): + self.CleanupScript = msg.replace("Cleanup script file is ", "") + log.debug("Execute kills script is : " + self.CleanupScript) if msg.startswith('Opening input/output transcript to file "'): - self.FluentTranscript=msg.replace('Opening input/output transcript to file "','').replace('".','') - log.debug("Fluent transcript is : "+ self.FluentTranscript) - #log.info(msg) - if self.error_detected:log.debug(msg) + self.FluentTranscript = msg.replace( + 'Opening input/output transcript to file "', "" + ).replace('".', "") + log.debug("Fluent transcript is : " + self.FluentTranscript) + # log.info(msg) + if self.error_detected: + log.debug(msg) proc.stdout.close() - - #monitor the stderr of the main process and log information to corresponding logs + + # monitor the stderr of the main process and log information to corresponding logs def process_error(self, proc): - for line in iter(proc.stderr.readline, b''): - msg=line.decode("utf-8").rstrip() + for line in iter(proc.stderr.readline, b""): + msg = line.decode("utf-8").rstrip() log.error(msg) - if msg.startswith('Fatal error in MPI_Init: Internal MPI error!'): - if self.CleanupScript==None: + if msg.startswith("Fatal error in MPI_Init: Internal MPI error!"): + if self.CleanupScript == None: self.proc.kill() else: - p = subprocess.Popen(self.CleanupScript,stdout=subprocess.PIPE, stderr=subprocess.PIPE) + p = subprocess.Popen( + self.CleanupScript, stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) stdout, stderr = p.communicate() proc.stderr.close() + # EXAMPLE: this function will only be called if this script is run at the command line. if __name__ == "__main__": log = logging.getLogger() @@ -295,7 +348,7 @@ def process_error(self, proc): with open("fluent_context.json", "r") as f: context = json.load(f) print(context) - + submit_context = SubmitContext(**context) log.info("Executing...") @@ -304,4 +357,3 @@ def process_error(self, proc): except Exception as e: log.error(str(e)) - diff --git a/examples/exec_scripts/exec_mapdl.py b/examples/exec_scripts/exec_mapdl.py index 8cf83b6be..a1bf9483e 100644 --- a/examples/exec_scripts/exec_mapdl.py +++ b/examples/exec_scripts/exec_mapdl.py @@ -10,10 +10,11 @@ from ansys.rep.common.logging import log from ansys.rep.evaluator.task_manager import ApplicationExecution + class MAPDLExecution(ApplicationExecution): def execute(self): - log.info("Start MAPDL execution script") + log.info("Starting MAPDL execution script") # Identify files inp_file = next((f for f in self.context.input_files if f["name"] == "inp"), None) diff --git a/examples/fluent_nozzle/project_setup.py b/examples/fluent_nozzle/project_setup.py index 7b20ce58c..f198d469c 100644 --- a/examples/fluent_nozzle/project_setup.py +++ b/examples/fluent_nozzle/project_setup.py @@ -18,14 +18,15 @@ ResourceRequirements, Software, SuccessCriteria, - TaskDefinition + TaskDefinition, ) log = logging.getLogger(__name__) + def create_project(client, name, num_jobs=20, use_exec_script=False): """ - Create a REP project consisting of an ANSYS Fluent model. + Create a REP project consisting of an Ansys Fluent model. """ jms_api = JmsApi(client) log.debug("=== Project") @@ -38,10 +39,20 @@ def create_project(client, name, num_jobs=20, use_exec_script=False): cwd = os.path.dirname(__file__) files = [] files.append( - File(name="inp", evaluation_path="nozzle.cas", type="text/plain", src=os.path.join(cwd, "nozzle.cas") ) + File( + name="inp", + evaluation_path="nozzle.cas", + type="text/plain", + src=os.path.join(cwd, "nozzle.cas"), + ) ) files.append( - File(name="jou", evaluation_path="solve.jou", type="text/plain", src=os.path.join(cwd, "solve.jou") ) + File( + name="jou", + evaluation_path="solve.jou", + type="text/plain", + src=os.path.join(cwd, "solve.jou"), + ) ) if use_exec_script: @@ -55,22 +66,50 @@ def create_project(client, name, num_jobs=20, use_exec_script=False): ) files.append( - File(name="trn", evaluation_path="fluent*.trn", type="text/plain", collect=True, monitor=True) + File( + name="trn", evaluation_path="fluent*.trn", type="text/plain", collect=True, monitor=True + ) ) files.append( - File(name="surf_out", evaluation_path="surf*.out", type="text/plain", collect=True, monitor=True) + File( + name="surf_out", + evaluation_path="surf*.out", + type="text/plain", + collect=True, + monitor=True, + ) ) files.append( - File(name="vol_out", evaluation_path="vol*.out", type="text/plain", collect=True, monitor=True) + File( + name="vol_out", + evaluation_path="vol*.out", + type="text/plain", + collect=True, + monitor=True, + ) ) files.append( - File(name="err", evaluation_path="*error.log", type="text/plain", collect=True, monitor=True) + File( + name="err", evaluation_path="*error.log", type="text/plain", collect=True, monitor=True + ) ) files.append( - File(name="output_cas", evaluation_path="nozzle.cas.h5", type="application/octet-stream", collect=True, monitor=False) + File( + name="output_cas", + evaluation_path="nozzle.cas.h5", + type="application/octet-stream", + collect=True, + monitor=False, + ) ) files.append( - File(name="output_data", evaluation_path="nozzle.dat.h5", type="application/octet-stream", collect=True, monitor=False) + File( + name="output_data", + evaluation_path="nozzle.dat.h5", + type="application/octet-stream", + collect=True, + monitor=False, + ) ) files = project_api.create_files(files) @@ -84,24 +123,24 @@ def create_project(client, name, num_jobs=20, use_exec_script=False): task_def = TaskDefinition( name="Fluent_run", software_requirements=[ - Software(name="ANSYS Fluent", version=ansys_version), + Software(name="Ansys Fluent", version=ansys_version), ], - execution_command=None, # Only execution currently supported + execution_command=None, # Only execution currently supported resource_requirements=ResourceRequirements( cpu_core_usage=1.0, memory=250, disk_space=5, ), execution_level=0, - execution_context = { - "fluent_dimension" : "3d", - "fluent_precision" : "dp", - "fluent_meshing" : False, - "fluent_numGPGPUsPerMachine" : 0, - "fluent_MPIType" : "intel", - "fluent_otherEnvironment" : "{}", - "fluent_jouFile" : "solve.jou", - "fluent_useGUI" : False + execution_context={ + "fluent_dimension": "3d", + "fluent_precision": "dp", + "fluent_meshing": False, + "fluent_numGPGPUsPerMachine": 0, + "fluent_MPIType": "intel", + "fluent_otherEnvironment": "{}", + "fluent_jouFile": "solve.jou", + "fluent_useGUI": False, }, max_execution_time=50.0, num_trials=1, @@ -109,15 +148,19 @@ def create_project(client, name, num_jobs=20, use_exec_script=False): output_file_ids=[f.id for f in files[num_input_files:]], success_criteria=SuccessCriteria( return_code=0, - required_output_file_ids=[file_ids["output_cas"], file_ids["surf_out"], file_ids["vol_out"] ], - require_all_output_files=False + required_output_file_ids=[ + file_ids["output_cas"], + file_ids["surf_out"], + file_ids["vol_out"], + ], + require_all_output_files=False, ), licensing=Licensing(enable_shared_licensing=False), # Shared licensing disabled by default ) if use_exec_script: task_def.use_execution_script = True - task_def.execution_command=None + task_def.execution_command = None task_def.execution_script_id = file_ids["exec_fluent"] task_defs = [task_def] @@ -133,14 +176,13 @@ def create_project(client, name, num_jobs=20, use_exec_script=False): log.debug(f"=== Create {num_jobs} jobs") jobs = [] for i in range(num_jobs): - jobs.append( - Job(name=f"Job.{i}", eval_status="pending", job_definition_id=job_def.id) - ) + jobs.append(Job(name=f"Job.{i}", eval_status="pending", job_definition_id=job_def.id)) jobs = project_api.create_jobs(jobs) log.info(f"Created project '{proj.name}', ID='{proj.id}'") return proj + if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-n", "--name", type=str, default="fluent_nozzle") diff --git a/tests/test_examples.py b/tests/test_examples.py index 384f3707e..f07ae8869 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -122,6 +122,40 @@ def test_fluent_2d_heat_exchanger(self): jms_api.delete_project(project) + def test_fluent_nozzle(self): + + from examples.fluent_nozzle.project_setup import create_project + + project = create_project( + self.client(), name="Fluent Nozzle Test", num_jobs=1, use_exec_script=True + ) + self.assertIsNotNone(project) + + jms_api = JmsApi(self.client()) + project_api = ProjectApi(self.client(), project.id) + + self.assertEqual(len(project_api.get_jobs()), 1) + self.assertEqual(jms_api.get_project(id=project.id).name, "Fluent Nozzle Test") + + jms_api.delete_project(project) + + def test_cfx_static_mixer(self): + + from examples.cfx_static_mixer.project_setup import create_project + + project = create_project( + self.client(), name="CFX Static Mixer Test", num_jobs=1, use_exec_script=True + ) + self.assertIsNotNone(project) + + jms_api = JmsApi(self.client()) + project_api = ProjectApi(self.client(), project.id) + + self.assertEqual(len(project_api.get_jobs()), 1) + self.assertEqual(jms_api.get_project(id=project.id).name, "CFX Static Mixer Test") + + jms_api.delete_project(project) + if __name__ == "__main__": unittest.main()