In [None]:
import ax
import os
import time 
from typing import Any, Mapping
import subprocess


import numpy as np 
from ax.api.client import Client
from ax.api.configs import RangeParameterConfig
from ax.api.protocols.metric import IMetric
from ax.api.protocols.runner import IRunner, TrialStatus
from ax.api.types import TParameterization 
import json
from ax import Client, RangeParameterConfig, ChoiceParameterConfig


<font size= 6> Main Function

In [None]:
def blackbox_function(x1,x2, x3, x4, x5, x6): 
    #This is where the experiment task happens
    #delegate the task of printing to the external system (computer and the AJP)
    #return the set of values (for now, line width) to the system

<FONT SIZE = 6> Interaction (Deploying and Fetching) with an external system 

In [None]:
INPUT_DIR = "C://AJP/input_parameters"
OUTPUT_DIR = "C://AJP/output_parameters"
POWER_AUTOMATE_FLOW_ID = "AUTOMATE_FLOW_ID"


class Runner(IRunner):

    #run_trial deploys a trial to the external system with the given parameters. 
    # Maybe my deploying would be to give a save a JSON file in a particular location or pass it on to an agent  in the workflow

    def run_trial(
        self, trial_index: int, parameterization: TParameterization ) -> dict[str, Any]:
        
        #create directories for input nd output values 
        os.makedirs(INPUT_DIR, exist_ok= True)
        os.makedirs(OUTPUT_DIR, exist_ok = True)

        input_file_path = os.path.join(INPUT_DIR, f"params_{trial_index}.json")
        with open(input_file_path, "w") as f: 
            json.dump(parameterization, f, indent=4)

        output_file_path = os.path.join(OUTPUT_DIR, f"results_{trial_index}.json")
       
        command = [
            "C:\\Program Files (x86)\\Power Automate Desktop\\PAD.Console.Host.exe", # Default path, verify on your system
            "run",
            "--flowId", POWER_AUTOMATE_FLOW_ID,
            "--input", f"InputFilePath={input_file_path}"
        ]
        print(f"Launching Power Automate flow for trial{trial_index}...")


        process = subprocess.Popen(command, #commands to execute
                                   stdout=subprocess.PIPE, #standard output of the external command will be captured by a pipe
                                   stderr= subprocess.PIPE, #standard error output of the external command will be captured by the pipe
                                   text= True
            
        )

        print(f"Power Automate flow for trial {trial_index} launched with PID:{process.pid}")

        return_code = process.wait()

        if return_code !=0: 
            print(f"Power Automte flow for trial{trial_index} exited with non-zero code :{return_code}")
        else: 
            print(f"Power Automate flow for trial {trial_index} completed normally.")


        return {
            "trial_index": trial_index, 
            "input_file_path": input_file_path, 
            "output_file_path": output_file_path
        }
         
    #poll_trial queries the external system to see if the trial has completed, failed, or if it's still runing. There is room to explore how this can be
    # done or if this is needed at all. 
    
    def poll_trial(self, trial_index :int, trial_metadata: Mapping[str, Any]) -> TrialStatus: 

        output_file_path = trial_metadata["output_file_path"]
        if not os.path.exists(output_file_path): 
            print(f"Error: Output file not found for trial{trial_index} after P flow completed")
            return TrialStatus.FAILED
        
        try: 
            with open(output_file_path, "r") as f: 
                results_data = json.load(f)
        
            status_from_file = results_data.get("trial_status")

            if status_from_file =="COMPLETED": 
                return TrialStatus.COMPLETED
            elif status_from_file == "FAILED": 
                return TrialStatus.FAILED
            else: 
                #does not return a status in this case 
                print(f"Warning: Unexpectec status '{status_from_file}' in output file for trial {trial_index}.")

        except json.JSONDecodeError: 
            print (f"Error: Invlid JSON in {output_file_path} for trial {trial_index}.")
            return TrialStatus.FAILED
        
        # general error handler, designed to handle any type of error that mihgt occus within the try block, that wasn't specifically caught
        # e is a variable that holds the exception object tht was raiseed       
        except Exception as e: 
            print(f"Error accessing output file for {trial_index}: {e}")
            return TrialStatus.FAILED

In [None]:
class Metric(IMetric): 
    def fetch(
        self, 
        trial_index: int, 
        trial_metadata: Mapping[str, Any], ) -> tuple[int, float | tuple[float, float] ]:

        output_file_path = trial_metadata["output_file_path"]

        try: 
            with open (output_file_path,  'r') as file: 
                results_data  = json.load(file)
            value =results_data.get("line_width")
            return (trial_index, value)
        except Exception as e: 
            print("")

In [None]:
client = Client()

carrier_gas_flow = RangeParameterConfig(name="layer_height", parameter_type="float", bounds=(1, 10))
sheath_gas_flow = RangeParameterConfig(name="sheath_gas_flow", parameter_type="str", bounds = (50, 200))
number_of_loops = RangeParameterConfig(name = "number_of_loops", parameter_type ="int", bounds=(1,5) )
speed = RangeParameterConfig(name ="speed", parameter_type = "float", bound = (50, 300))

parameters = [sheath_gas_flow, carrier_gas_flow, speed, number_of_loops]

client.configure_experiment(parameters=parameters, 
                            name = "AJP_Optimization")
client.configure_optimization(objective= "-line_width", outcome_constraints=["line_width >=200"])

<font size =6> Optimization Loop

In [None]:
for _ in range(10): # Run 10 rounds of trials
    
    
    # We will request three trials at a time in this example
    trials = client.get_next_trials(max_trials= 3) 

    
    for trial_index, parameters in trials.items():
        x1 = parameters["x1"]
        x2 = parameters["x2"]
        x3 = parameters["x3"]
        x4 = parameters["x4"]
        x5 = parameters["x5"]
        x6 = parameters["x6"]


        #This is where the main experimentation part happens: Where I delegate the task to the outer system
        result = hartmann6(x1, x2, x3, x4, x5, x6)

        # Set raw_data as a dictionary with metric names as keys and results as values
        raw_data = {metric_name: result}


        
        # Complete the trial with the result 
        
        #[client.complete_trial(parameters, result]
        #This is kept Ax in the database to generate data for the nextstep        
        client.complete_trial(trial_index=trial_index, raw_data=raw_data)

        print(f"Completed trial {trial_index} with {raw_data=}")

In [None]:
runner = Runner()
line_width_metric = Metric(name = "line_width")
client.configure_runner(runner=runner) 
client.configure_metrics(metrics=[line_width_metric]) 


