"""Distributed hyperparameter search with DeepHyper on ThetaGPU.

Each evaluation dequeues a set of nodes from the evaluator's queue,
launches the training script (learning_2.py) on them with mpirun, and
parses the validation accuracy back out of the job's stdout.
"""
import os
import subprocess

import parse
from deephyper.evaluator import SubprocessEvaluator, queued
from deephyper.evaluator.callback import TqdmCallback, SearchEarlyStopping
from deephyper.problem import HpProblem
from deephyper.search.hps import CBO

# Global definitions for functions
nodes_per_task = 1
n_ranks_per_node = 8


def run_mpi(config, dequed):
    """Run one training job on the dequeued nodes and return val_accuracy."""
    job_id = config.get("job_id", "none")
    # Entries in `dequed` have the form "<hostname>:<slots>", so joining them
    # yields a valid Open MPI --host argument.
    submit_nodes = ",".join(dequed)
    n_gpus = len(dequed) * n_ranks_per_node
    # Build the argument vector element by element; strings containing spaces
    # would otherwise be passed to mpirun as single malformed arguments.
    runner = (
        f"mpirun -x LD_LIBRARY_PATH -x PATH -x PYTHONPATH "
        f"-np {n_gpus} -npernode {n_ranks_per_node}".split()
        + ["--host", submit_nodes]  # restrict the launch to the dequeued nodes
        + ["python", "learning_2.py"]
    )
    print(f"{runner=}")
    with open(f"hyperparameter_train_{job_id}.out", "w") as out:
        subprocess.run(runner, stdout=out)
    with open(f"hyperparameter_train_{job_id}.out", "r") as out:
        match = parse.search("val_accuracy = {:f}", out.read())
    # Guard against failed runs: parse.search() returns None when the pattern
    # is absent, so report a very poor objective instead of crashing.
    return match[0] if match is not None else float("-inf")


def get_thetagpu_nodelist():
    """Return the nodes of the current Cobalt allocation, one hostname per line."""
    with open(os.environ["COBALT_NODEFILE"]) as nodefile:
        return [line.rstrip() for line in nodefile]


if __name__ == "__main__":
    # Define the hyperparameter search space.
    problem = HpProblem()
    # problem.add_hyperparameter((8, 256, "log-uniform"), "batch_size", default_value=32)
    problem.add_hyperparameter((1e-4, 0.01, "log-uniform"), "learning_rate", default_value=0.001)
    # problem.add_hyperparameter((10, 100), "num_epochs", default_value=50)

    # Advertise each node with its GPU slot count, e.g. "thetagpu01:8".
    nodes = [f"{n}:{n_ranks_per_node}" for n in get_thetagpu_nodelist()]
    print("Nodes: ", nodes)

    # Each worker pops `nodes_per_task` nodes from the queue for its job, so
    # the number of concurrent workers is len(nodes) // nodes_per_task.
    evaluator = queued(SubprocessEvaluator)(
        run_mpi,
        num_workers=len(nodes) // nodes_per_task,
        queue=nodes,
        queue_pop_per_task=nodes_per_task,
        callbacks=[TqdmCallback(), SearchEarlyStopping()],
    )
    print("Evaluator queue: ", evaluator.queue)
    print("Evaluator queue_pop: ", evaluator.queue_pop_per_task)
    # print("Evaluator json: ", evaluator.to_json())

    # Centralized Bayesian Optimization over the problem, driven by the evaluator.
    search = CBO(problem, evaluator)
    # print("Search json:", search.to_json())
    results = search.search()
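# ---------------------------------------------------------------------------
# For reference: run_mpi() recovers its objective by scanning the job's
# stdout for a line matching "val_accuracy = {:f}". A minimal sketch of the
# end of learning_2.py (hypothetical contents; the real script is not shown
# here) only needs to print that line once training finishes, e.g.:
#
#     val_loss, val_accuracy = model.evaluate(x_val, y_val)  # assumed Keras-style model
#     print(f"val_accuracy = {val_accuracy:f}")
#
# Since mpirun starts n_ranks_per_node ranks per node, only one rank (e.g.
# rank 0) should print this line so that parse.search() finds a single,
# unambiguous match in the output file.
# ---------------------------------------------------------------------------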