Skip to content

Commit

Permalink
Merge pull request #69 from Bluefog-Lib/interactive_bluefog
Browse files Browse the repository at this point in the history
Interactive bluefog
  • Loading branch information
Bluefog-Lib committed Jan 18, 2021
2 parents c171454 + 6ca9963 commit 27dee39
Show file tree
Hide file tree
Showing 3 changed files with 1,785 additions and 46 deletions.
98 changes: 97 additions & 1 deletion bluefog/run/interactive_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
# ==============================================================================

import argparse
import ipyparallel as ipp
import json
import os
import multiprocessing
import signal
import subprocess
import time
Expand Down Expand Up @@ -136,6 +139,36 @@ def _get_ipcontroller_pid(profile):
return pid


# This function must be called after the ipengine is up.
# Note if the ipengine is on multiple machines, the pid
# is the id at the remote machines.
def _get_ipengine_pid(profile):
rc = ipp.Client(profile=profile)
engine_pids = rc[:].apply(os.getpid).get_dict()
return engine_pids


def _write_ipengine_pid(profile):
engine_pids = _get_ipengine_pid(profile)
path = _get_ip_file_dir(profile)
with open(os.path.join(path, "engine_pids.json"), 'w+') as f:
json.dump(engine_pids, f)


def _get_ipengine_pid_from_file(profile):
path = _get_ip_file_dir(profile)
with open(os.path.join(path, "engine_pids.json"), 'r') as f:
engine_pids = json.load(f)
return engine_pids


def _delete_ipengine_pid(profile):
path = _get_ip_file_dir(profile)
engine_pid_file = os.path.join(path, "engine_pids.json")
if os.path.exists(engine_pid_file):
os.remove(engine_pid_file)


def _maybe_kill_ipcontroller_process(profile):
"Try to kill the ipcontroller process through read the pid file."
"Return True if it found process and killed it successfully."
Expand All @@ -151,6 +184,58 @@ def _maybe_kill_ipcontroller_process(profile):
return False


def _maybe_kill_ipengine_processes(profile):
"Try to kill the ipengine processes through read the pid file."
"It only works for local machine case."
engine_pids = _get_ipengine_pid_from_file(profile)
if engine_pids is None:
return
for _, pid in engine_pids.items():
try:
os.kill(pid, signal.SIGINT)
except:
pass
_delete_ipengine_pid(profile)


def interrupt_hanged_processes(profile="bluefog"):
""" Send the interrupt signal to all hanged workers.
Args:
profile (str): The profile name for ipython environment, i.e.
the --ipython-profile you specified in `ibfrun`. By default,
this value should be 'bluefog'.
Note: This function is supported under localhost mode.
"""
engine_pids = _get_ipengine_pid_from_file(profile)
if engine_pids is None:
raise FileNotFoundError("Cannot find pids to interrupt the engines. Note this"
"function is supported under localhost mode only")
timeout = 0.2

def send_request_to_rc(i):
rc = ipp.Client(profile=profile)
rc[i].apply_sync(lambda: 0)

# Send an empty function to the workers. If it cannot be finished within the
# {timeout} second, we assume the worker is hanged then send the interrupt
# signal to it. If finished, do nothing.
p_list = []
for i in range(len(engine_pids)):
p = multiprocessing.Process(target=send_request_to_rc, args=(i,))
p.start()
p_list.append(p)
for i, p in enumerate(p_list):
p.join(timeout)
if p.exitcode is None:
try:
os.kill(engine_pids[str(i)], signal.SIGINT)
print(f"send signal to {engine_pids[i]}")
except:
pass


def local_machine_launch(args, env: Dict[str, str]):
ipcontroller_command = "ipcontroller --profile {profile}".format(
profile=args.profile)
Expand All @@ -172,6 +257,14 @@ def local_machine_launch(args, env: Dict[str, str]):
_wait_engine_file_ready(args.profile)
print("Starting the engines.")
p_engine = subprocess.Popen(ipengine_command, shell=True, env=env)
engine_pid_done = False
while not engine_pid_done:
try:
time.sleep(2)
_write_ipengine_pid(args.profile)
engine_pid_done = True
except ipp.NoEnginesRegistered as e:
pass
while not p_controller.poll() and not p_engine.poll():
time.sleep(600)

Expand Down Expand Up @@ -314,10 +407,13 @@ def handler(signum, frame):
multiple_machines_launch(args, env, all_host_names=all_host_names,
hosts_arg=hosts_arg,
remote_host_names=remote_host_names)
_maybe_kill_ipcontroller_process(args.profile)
except Exception as e:
print("Fail to launch ibfrun. Error: ", e)
finally:
_maybe_kill_ipcontroller_process(args.profile)
if not remote_host_names:
time.sleep(1.0)
_maybe_kill_ipengine_processes(args.profile)


if __name__ == "__main__":
Expand Down

0 comments on commit 27dee39

Please sign in to comment.