Support on-demand debug for multigpu training
For multigpu training, all the training workers are spawned as subprocesses.
But a pdb session cannot be started in a subprocess because it lacks stdin.
So we change how the DDP processes are launched so that the main process becomes
the rank-0 worker, which lets us start a pdb session with `kill -10`.
emailweixu committed May 17, 2024
1 parent f3978fa commit 27cb722
Showing 2 changed files with 27 additions and 20 deletions.
29 changes: 22 additions & 7 deletions alf/bin/train.py
@@ -146,6 +146,13 @@ def _train(root_dir, rank=0, world_size=1):
     trainer.train()


+def _training_worker_helper(rank: int, *args, **kwargs):
+    # Helper to start the training worker with the correct rank
+    # so that rank 0 is from the main process and the rest are
+    # from the spawned processes.
+    training_worker(rank + 1, *args, **kwargs)
+
+
 def training_worker(rank: int,
                     world_size: int,
                     conf_file: str,
@@ -169,9 +176,10 @@ def training_worker(rank: int,
         # Specialization for distributed mode
         dist.init_process_group('nccl', rank=rank, world_size=world_size)
         # Recover the flags when spawned as a sub process
-        _define_flags()
-        FLAGS(sys.argv, known_only=True)
-        FLAGS.mark_as_parsed()
+        if rank > 0:
+            _define_flags()
+            FLAGS(sys.argv, known_only=True)
+            FLAGS.mark_as_parsed()
         # Set the rank and total number of processes for distributed training.
         PerProcessContext().set_distributed(
             rank=rank, num_processes=world_size)
@@ -235,12 +243,19 @@ def main(_):
             # The other process will communicate with the authoritative
             # process via network protocol on localhost:port.
             os.environ['MASTER_PORT'] = str(port)
-            processes = mp.spawn(
-                training_worker,
+            # We spawn the processes for rank-1 and above and use the main
+            # process for rank-0 so that we can request debug session
+            # for the main process. We need to do this because the debug
+            # session cannot be started in a subprocess.
+            context = mp.spawn(
+                _training_worker_helper,
                 args=(world_size, conf_file, root_dir, paras_queue),
-                join=True,
-                nprocs=world_size,
+                join=False,
+                nprocs=world_size - 1,
                 start_method='spawn')
+            training_worker(0, world_size, conf_file, root_dir,
+                            paras_queue)
+            context.join()
         except KeyboardInterrupt:
             pass
         except Exception as e:
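The launch pattern in this hunk can be read in isolation. Below is a minimal, self-contained sketch of the same idea; it is an illustration, not ALF's actual code: `_worker` and `_worker_helper` are stand-in names, and only the documented `nprocs`/`join` arguments of `torch.multiprocessing.spawn` are relied on.

# Minimal sketch (not ALF code): keep rank 0 in the launching process and
# spawn only the remaining ranks, so signal-driven pdb works for rank 0.
import torch.multiprocessing as mp


def _worker(rank: int, world_size: int):
    # Stand-in for the per-rank training work (DDP setup + train loop).
    print(f"worker rank={rank} of {world_size}")


def _worker_helper(rank: int, world_size: int):
    # mp.spawn numbers its children 0..nprocs-1; shift by one so the
    # children cover ranks 1..world_size-1 and rank 0 stays in this process.
    _worker(rank + 1, world_size)


if __name__ == '__main__':
    world_size = 4  # illustrative value
    # join=False returns immediately so this process can act as rank 0.
    context = mp.spawn(
        _worker_helper,
        args=(world_size, ),
        nprocs=world_size - 1,
        join=False)
    _worker(0, world_size)  # rank 0 runs here, with a usable stdin
    context.join()          # wait for the spawned ranks to finish

Because rank 0 never leaves the launching process, it keeps the terminal's stdin, which is what makes an interactive pdb session possible.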
18 changes: 5 additions & 13 deletions alf/trainers/policy_trainer.py
@@ -316,8 +316,10 @@ def train(self):
             "Use `kill -%s %s` to request checkpoint during training." %
             (int(signal.SIGUSR2), self._pid))

-        self._debug_requested = False
-        if threading.current_thread() == threading.main_thread():
+        if (threading.current_thread() == threading.main_thread()
+                and PerProcessContext().ddp_rank <= 0):
+            # Debugging in subprocesses is not supported because they don't have
+            # stdin.
             # kill -10 PID
             signal.signal(signal.SIGUSR1, self._request_debug)
             logging.info("Use `kill -%s %s` to request debugging." % (int(
@@ -470,7 +472,7 @@ def _request_checkpoint(self, signum, frame):
         self._checkpoint_requested = True

     def _request_debug(self, signum, frame):
-        self._debug_requested = True
+        breakpoint()

     def _save_checkpoint(self):
         # Saving checkpoint is only enabled when running single process training
@@ -700,11 +702,6 @@ def _train(self):
                 self._save_checkpoint()
                 self._checkpoint_requested = False

-            if self._debug_requested:
-                self._debug_requested = False
-                import pdb
-                pdb.set_trace()
-
     def _need_to_evaluate(self, iter_num):
         if not self._evaluate:
             return False
@@ -882,11 +879,6 @@ def _train(self):
                 self._save_checkpoint()
                 self._checkpoint_requested = False

-            if self._debug_requested:
-                self._debug_requested = False
-                import pdb
-                pdb.set_trace()
-
     def _restore_checkpoint(self):
         checkpointer = Checkpointer(
             ckpt_dir=os.path.join(self._train_dir, 'algorithm'),
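The trainer changes above replace the polled `_debug_requested` flag with a direct `breakpoint()` call from the SIGUSR1 handler. For reference, a minimal standalone sketch of that signal-to-pdb wiring; it is illustrative only, with the handler name and the busy loop as stand-ins for the trainer:

# Minimal sketch (not ALF code): wire SIGUSR1 to Python's builtin breakpoint()
# so that `kill -10 <pid>` drops the main process into a pdb session.
import os
import signal
import time


def _request_debug(signum, frame):
    # Opens pdb at the point where the signal interrupted execution;
    # this only works in a process that still owns a terminal/stdin.
    breakpoint()


if __name__ == '__main__':
    signal.signal(signal.SIGUSR1, _request_debug)
    print(f"Use `kill -{int(signal.SIGUSR1)} {os.getpid()}` to request debugging.")
    while True:          # stand-in for the training loop
        time.sleep(1)

On Linux, signal.SIGUSR1 has the numeric value 10, which is why the commit message refers to `kill -10`; the value can differ on other platforms.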
