-
Notifications
You must be signed in to change notification settings - Fork 0
/
command_launchers.py
69 lines (60 loc) · 2.19 KB
/
command_launchers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
A command launcher launches a list of commands on a cluster; implement your own
launcher to add support for your cluster. We've provided an example launcher
which runs all commands serially on the local machine.
"""
import subprocess
import time
import torch
import os
def local_launcher(commands):
    """
    Run every command on this machine, one after another.

    Each entry of `commands` is handed to the shell as a single command
    string, and the next entry only starts once the previous one has
    exited. Exit codes are not inspected, so a failing command does not
    stop the remaining ones from running.
    """
    for shell_command in commands:
        # Blocks until the shell running `shell_command` has terminated.
        subprocess.call(shell_command, shell=True)
def dummy_launcher(commands):
    """
    Print each command, prefixed with 'Dummy launcher: ', without running it.

    Useful for testing: it shows exactly what a real launcher would be
    asked to execute.
    """
    for command in commands:
        print('Dummy launcher: {}'.format(command))
def multi_gpu_launcher(commands):
    """
    Launch commands on the local machine, using all GPUs in parallel.

    At most one command runs per GPU at a time. Each command is started
    through the shell with `CUDA_VISIBLE_DEVICES` set to a single GPU id,
    and the function returns only after every launched command has exited.
    Exit codes are not inspected.

    Args:
        commands: iterable of shell command strings. It is copied, so the
            caller's list is left untouched.

    Raises:
        RuntimeError: if there are commands to run but no GPU is available
            (previously this case looped forever without launching anything).
    """
    from collections import deque

    print('WARNING: using experimental multi_gpu_launcher.')
    try:
        # Get list of GPUs from env, split by ',' and drop empty entries.
        # This handles an extra comma (`CUDA_VISIBLE_DEVICES=0,1,2,3, python3 ...`)
        # as well as stray whitespace around the ids (`0, 1`).
        available_gpus = [
            gpu.strip()
            for gpu in os.environ['CUDA_VISIBLE_DEVICES'].split(',')
            if gpu.strip()
        ]
    except KeyError:
        # If the env variable is not set, we use all GPUs
        available_gpus = [str(x) for x in range(torch.cuda.device_count())]

    # Private FIFO queue: keeps launch order, avoids mutating the argument,
    # and makes taking the next command O(1).
    pending = deque(commands)
    if pending and not available_gpus:
        raise RuntimeError(
            'multi_gpu_launcher: no GPUs available '
            '(CUDA_VISIBLE_DEVICES is empty or no CUDA device was found).')

    # procs_by_gpu[i] is the most recent process started on available_gpus[i].
    procs_by_gpu = [None] * len(available_gpus)

    while pending:
        for idx, gpu_idx in enumerate(available_gpus):
            proc = procs_by_gpu[idx]
            if proc is None or proc.poll() is not None:
                # Nothing is running on this GPU; launch a command.
                cmd = pending.popleft()
                procs_by_gpu[idx] = subprocess.Popen(
                    f'CUDA_VISIBLE_DEVICES={gpu_idx} {cmd}', shell=True)
                break
        # One scan per second: at most one launch per second, and busy GPUs
        # are polled once per second instead of in a tight loop.
        time.sleep(1)

    # Wait for the last few tasks to finish before returning
    for proc in procs_by_gpu:
        if proc is not None:
            proc.wait()
# Maps a launcher name to the function that implements it.
REGISTRY = dict(
    local=local_launcher,
    dummy=dummy_launcher,
    multi_gpu=multi_gpu_launcher,
)

# Optional hook: when `domainbed.facebook` can be imported, it is handed
# REGISTRY (presumably to add further launchers; confirm against that module).
# When the import fails, the registry keeps only the three launchers above.
try:
    from domainbed import facebook
    facebook.register_command_launchers(REGISTRY)
except ImportError:
    pass