
Trying to get deephyper to work with a script that distributes training using horovod #146

Closed
athish-thiru opened this issue Jul 1, 2022 · 7 comments
Labels
question Further information is requested

Comments

@athish-thiru

I've been working on ThetaGPU using the 2021-11-30 version of Anaconda.

My task is to find hyperparameters for a deep learning model built with TensorFlow Keras that distributes training across multiple GPUs using Horovod. I have attached the training file, which contains the run function that I have been using.

I have been running into the following error message while trying to execute this script:
INVALID_ARGUMENT: Requested to allreduce, allgather, or broadcast a tensor with the same name as another tensor that is currently being processed. If you want to request another tensor, use a different tensor name.

I know this isn't an issue with Horovod or the model itself, because I was able to run the script before trying to integrate hyperparameter optimization with deephyper. I was hoping you would have some insight into why the problem arises and how to go about fixing it.

train.txt

@athish-thiru athish-thiru added the question Further information is requested label Jul 1, 2022
@Deathn0t
Member

Deathn0t commented Jul 2, 2022

Hello @athish-thiru ,

Here is what has to happen: deephyper runs the search in one process, and this search process spawns sub-processes that launch mpirun commands for Horovod. Alternatively, TensorFlow distributed can be used if you want to do everything in the same process (also possible).

A tutorial was recently published for this type of use case (i.e., when deephyper needs to execute MPI functions for the black-box). Click here to check this tutorial.

Basically, for your use case, you will edit the run_mpi function by adapting your runner variable.

The nodes variable is also important: it defines the nodes on which the functions will be executed:

nodes_per_task = 2
n_ranks_per_node = 8
nodes = [f'{n}:{n_ranks_per_node}' for n in get_thetagpu_nodelist()]
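For illustration, here is a small self-contained sketch of what that queue ends up containing. The two-node list is hypothetical (on ThetaGPU the node list would come from the job's allocation); the point is that each queue entry is a "host:slots" string, which is the format mpirun accepts for its host list:

```python
n_ranks_per_node = 8

def get_thetagpu_nodelist():
    # hypothetical stand-in; on ThetaGPU this would be read from the job's nodefile
    return ["thetagpu01", "thetagpu02"]

# each entry is "<host>:<slots>"; the queued evaluator pops nodes_per_task of these per task
nodes = [f"{n}:{n_ranks_per_node}" for n in get_thetagpu_nodelist()]
print(nodes)
```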

Let me know if this helps and you manage to figure it out. If not, I will try to provide more explicit examples for your use case.

@athish-thiru
Author

I've gone through the tutorial. What I've done now is essentially reconfigure the run function around the mpirun command that I had been using to run the model. I pass in the config dictionary as an argument, which is read and used by the training script. I then parse the output file to find the validation accuracy, which is returned. I've attached the new script that I'm using to run deephyper.
deephyper_search.txt

I'm now running into a JSON decoder error with the following traceback:

File "/home/athish/gw_forecasting/transformer/src/deephyper_search.py", line 63, in <module>
results = search.search(timeout=25*60)
File "/home/athish/.local/lib/python3.8/site-packages/deephyper/search/_search.py", line 131, in search
self._search(max_evals, timeout)
File "/home/athish/.local/lib/python3.8/site-packages/deephyper/search/hps/_cbo.py", line 221, in _search
new_results = self._evaluator.gather(self._gather_type, size=1)
File "/home/athish/.local/lib/python3.8/site-packages/deephyper/evaluator/_evaluator.py", line 264, in gather
job = task.result()
File "/home/athish/.local/lib/python3.8/site-packages/deephyper/evaluator/_evaluator.py", line 205, in _execute
job = await self.execute(job)
File "/home/athish/.local/lib/python3.8/site-packages/deephyper/evaluator/_queue.py", line 25, in execute
job = await evaluator_class.execute(self, job)
File "/home/athish/.local/lib/python3.8/site-packages/deephyper/evaluator/_subprocess.py", line 80, in execute
sol = json.loads(retval)
File "/lus/theta-fs0/software/thetagpu/conda/2021-11-30/mconda3/lib/python3.8/json/__init__.py", line 357, in loads
return _default_decoder.decode(s)
File "/lus/theta-fs0/software/thetagpu/conda/2021-11-30/mconda3/lib/python3.8/json/decoder.py", line 337, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/lus/theta-fs0/software/thetagpu/conda/2021-11-30/mconda3/lib/python3.8/json/decoder.py", line 355, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

I printed out the search JSON to see what could be causing the issue, but I can't see any problematic initializations:
Search json: {'search': {'type': 'CBO', 'num_workers': 2, 'acq_func': 'UCB', 'acq_optimizer': 'auto', 'evaluator': {'type': 'QueuedSubprocessEvaluator'}, 'filter_duplicated': True, 'filter_failures': 'mean', 'initial_points': [], 'kappa': 1.96, 'log_dir': '/lus/swift/home/athish/gw_forecasting/transformer', 'multi_point_strategy': 'cl_min', 'n_initial_points': 10, 'n_jobs': 1, 'n_points': 10000, 'problem': 'HpProblem', 'random_state': 'RandomState', 'surrogate_model': 'RF', 'sync_communication': False, 'update_prior': False, 'verbose': 0, 'xi': 0.001}, 'calls': []}

I have very little experience dealing with JSON, so any help as to why this error may be arising and how to go about fixing it would be greatly appreciated!

@Deathn0t
Member

Deathn0t commented Jul 9, 2022

Hello @athish-thiru ,

The result returned by parse.search is of type Result, which is not a JSON-serializable type. You can use this line instead to retrieve the float value of val_accuracy, with the [0] at the end:
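For reference, the error message in your traceback is exactly what json.loads raises when it receives input it cannot parse, e.g. empty or non-JSON output from the subprocess:

```python
import json

try:
    # what the evaluator effectively attempts when the return value was not serialized
    json.loads("")
except json.JSONDecodeError as e:
    print(e)  # Expecting value: line 1 column 1 (char 0)
```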

res = parse.search("val_accuracy = {:f}", out.read())[0]
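If you prefer to avoid the parse dependency, the same extraction can be sketched with the standard-library re module (the log line below is a made-up example, not your actual training output):

```python
import re

# hypothetical line from the training log
output = "epoch 10: val_accuracy = 0.875000"

# capture the floating-point value after "val_accuracy = "
match = re.search(r"val_accuracy = ([0-9.]+)", output)
res = float(match.group(1))
print(res)  # 0.875
```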

The .to_json() has nothing to do with the above problem. Also, I advise the following to get easier-to-read file names:

job_id = config.get("job_id", "none")
with open(f"hyperparameter_train_{job_id}.out", "w") as out:
        result = subprocess.run(runner, stdout=out)

Here is the full simplified script (adapted from yours), which runs for me locally on a single node:

import subprocess
import parse

from deephyper.evaluator.callback import TqdmCallback, SearchEarlyStopping
from deephyper.problem import HpProblem
from deephyper.search.hps import CBO
from deephyper.evaluator import SubprocessEvaluator, queued

#Global definitions for functions
nodes_per_task = 1
n_ranks_per_node = 8

def run_mpi(config, dequed=["localhost:1"]):
    job_id = config.get("job_id", "none")
    nodes = dequed
    runner = "mpirun -n 1 echo".split() + ["val_accuracy = 0.0"]
    print(f"{runner=}")

    with open(f"hyperparameter_train_{job_id}.out", "w") as out:
        result = subprocess.run(runner, stdout=out)

    with open(f"hyperparameter_train_{job_id}.out", "r") as out:
        res = parse.search("val_accuracy = {:f}", out.read())[0]

    return res

def get_thetagpu_nodelist():
    # mock nodelist for local testing; on ThetaGPU this would return the actual nodes
    return [f"{i}" for i in range(10)]

if __name__ == "__main__":
    #Add HyperParameters
    problem = HpProblem()
    #problem.add_hyperparameter((8, 256, "log-uniform"), "batch_size", default_value=32)
    problem.add_hyperparameter((1e-4, 0.01, "log-uniform"), "learning_rate", default_value=0.001)
    #problem.add_hyperparameter((10, 100), "num_epochs", default_value=50)

    nodes = [f"{n}:{n_ranks_per_node}" for n in get_thetagpu_nodelist()]
    print("Nodes: ", nodes)

    evaluator = queued(SubprocessEvaluator)(run_mpi, num_workers=1, queue=nodes, queue_pop_per_task=nodes_per_task, callbacks=[TqdmCallback(), SearchEarlyStopping()])
    print("Evaluator queue: ", evaluator.queue)
    print("Evaluator queue_pop: ", evaluator.queue_pop_per_task)
    # print("Evaluator json: ", evaluator.to_json())

    search = CBO(problem, evaluator)
    # print("Search json:", search.to_json())

    results = search.search(max_evals=1) #timeout=25*60)

This creates:

  • hyperparameter_train_1.out containing val_accuracy = 0.0
  • context.yaml
  • results.csv

I hope it helps. Let me know if you are still having issues.

@athish-thiru
Author

I modified the run function to run a simple Python script. In this Python script, I initialize Horovod, print out the rank and size, and then print out the validation accuracy. I've attached the files I used here.

learning.txt
learning_2.txt

When I run this on a single GPU it works well enough, but when I try to run it on multiple GPUs I get the following error message:

[thetagpu01:1789064:0] mm_posix.c:206 UCX ERROR open(file_name=/proc/1789065/fd/64 flags=0x0) failed: No such file or directory
[thetagpu01:1789064:0] mm_ep.c:158 UCX ERROR mm ep failed to connect to remote FIFO id 0xc0000010001b4c89: Shared memory error

@Deathn0t
Member

Deathn0t commented Jul 20, 2022

Hello, here is a running example on ThetaGPU.

Here is what I did. Connect to a single-GPU node:

qsub-gpu -n 1 -A $PROJECT_NAME -q single-gpu -t 60 -I

Activate the environment:

source activate-dhenv.sh

Run the search:

python search.py

I hope this example will help you.

example-horovod.zip
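As a side note on the UCX shared-memory error reported above (the mm_posix open(/proc/.../fd/...) failure): one workaround often suggested for this class of error is disabling UCX's /proc-link shared-memory mechanism before launching the nested mpirun. UCX_POSIX_USE_PROC_LINK is a real UCX environment variable, but whether it resolves this particular ThetaGPU setup is an assumption I have not verified:

```shell
# Assumption: disabling UCX's proc-link shm mechanism avoids the
# "mm_posix.c ... open(file_name=/proc/.../fd/...) failed" error.
export UCX_POSIX_USE_PROC_LINK=n

# then launch the training as before, e.g.:
# mpirun -n 2 python learning.py

echo "$UCX_POSIX_USE_PROC_LINK"
```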

@Deathn0t
Member

@athish-thiru any news about this?

@Deathn0t
Member

As there is no answer on this thread, I consider the issue stale and am closing it.
