Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/ybc1991/bluefog
Browse files Browse the repository at this point in the history
  • Loading branch information
Hanbin Hu committed Feb 17, 2021
2 parents a738b59 + 6fcb669 commit fb30a9c
Showing 1 changed file with 27 additions and 4 deletions.
31 changes: 27 additions & 4 deletions bluefog/run/interactive_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ def parse_args():
parser_start.add_argument('--ipython-profile', action="store", dest="profile",
type=str, default="bluefog",
help="The profile name for ipython environment.")
parser_stop.add_argument('--ipython-profile', action="store", dest="profile",
type=str, default="bluefog",
help="The profile name for ipython environment.")


parser_start.add_argument('--disable-heartbeat', action="store_true", dest="disable_heartbeat",
help='Disable the heartbeat checking service between ipcontroller and ipengines.')
group_hosts_parent = parser_start.add_argument_group('host arguments')
group_hosts = group_hosts_parent.add_mutually_exclusive_group()
group_hosts.add_argument('-H', '--hosts', action='store', dest='hosts',
Expand Down Expand Up @@ -94,6 +94,22 @@ def _get_ip_file_dir(profile):
profile=profile)
return os.path.expanduser(ip_file_dir)

def _disable_heart_beatcheck(profile):
config_file = os.path.join(_get_ip_file_dir(
profile), "..", "ipengine_config.py")
try:
with open(config_file, 'w') as f:
f.write("c.EngineFactory.max_heartbeat_misses = 0")
return True
except:
return False

def _delete_ipengine_config(profile):
config_file = os.path.join(_get_ip_file_dir(
profile), "..", "ipengine_config.py")
if os.path.exists(config_file):
os.remove(config_file)
print("removed ipengine_config file")

def _wait_engine_file_ready(profile, trial=10):
engine_file = os.path.join(_get_ip_file_dir(
Expand Down Expand Up @@ -195,6 +211,7 @@ def _maybe_kill_ipengine_processes(profile):
os.kill(pid, signal.SIGINT)
except:
pass
_delete_ipengine_config(profile)
_delete_ipengine_pid(profile)


Expand Down Expand Up @@ -256,6 +273,10 @@ def local_machine_launch(args, env: Dict[str, str]):
stderr=subprocess.STDOUT)
_wait_engine_file_ready(args.profile)
print("Starting the engines.")
if args.disable_heartbeat:
disabled = _disable_heart_beatcheck(args.profile)
print(f"Heartbeat Service Disabled: {disabled}")

p_engine = subprocess.Popen(ipengine_command, shell=True, env=env)
engine_pid_done = False
while not engine_pid_done:
Expand Down Expand Up @@ -315,6 +336,8 @@ def multiple_machines_launch(args, env: Dict[str, str],
subprocess.run('ipcluster nbextension enable --user',
shell=True, env=env)
print("Starting the controller.")
if args.disable_heartbeat:
_disable_heart_beatcheck(args.profile)
stdout = None if args.verbose else subprocess.PIPE
p_controller = subprocess.Popen(ipcontroller_command, shell=True, env=env, stdout=stdout,
stderr=subprocess.STDOUT)
Expand Down

0 comments on commit fb30a9c

Please sign in to comment.