Skip to content

Commit

Permalink
Merge pull request #68 from Bluefog-Lib/interactive_bluefog
Browse files Browse the repository at this point in the history
Interactive bluefog
  • Loading branch information
bichengying committed Jan 3, 2021
2 parents 1779321 + b29b8a4 commit 1c2b675
Show file tree
Hide file tree
Showing 16 changed files with 1,273 additions and 105 deletions.
20 changes: 20 additions & 0 deletions bluefog/common/basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,3 +493,23 @@ def timeline_context(self, tensor_name: str, activity_name: str):
yield
finally:
self.timeline_end_activity(tensor_name)

def suspend(self):
    """Suspend the background thread of BlueFog.

    It should be used under an interactive (ipython) environment only.

    Raises:
        EnvironmentError: if not running inside an ipython environment.
    """
    # Bug fix: is_running_from_ipython is a function and must be CALLED.
    # The bare attribute reference is always truthy, so the guard below
    # previously never raised.
    if not util.is_running_from_ipython():
        raise EnvironmentError(
            "This function should be used only when you are under ipython environment.")
    self._MPI_LIB_CTYPES.bluefog_suspend()

def resume(self):
    """Resume the background thread of BlueFog.

    It should be used under an interactive (ipython) environment only.

    Raises:
        EnvironmentError: if not running inside an ipython environment.
    """
    # Bug fix: is_running_from_ipython is a function and must be CALLED.
    # The bare attribute reference is always truthy, so the guard below
    # previously never raised.
    if not util.is_running_from_ipython():
        raise EnvironmentError(
            "This function should be used only when you are under ipython environment.")
    self._MPI_LIB_CTYPES.bluefog_resume()
4 changes: 4 additions & 0 deletions bluefog/common/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ const Status SHUT_DOWN_ERROR = Status::UnknownError(
"an exception, you should see the exception in the log before the first "
"shutdown message.");

// Returned by enqueue and window operations while the background thread is
// suspended (see bluefog_suspend / bluefog_resume in operations.cc); the
// caller must call bf.resume() before retrying the operation.
const Status SUSPEND_ERROR = Status::PreconditionError(
    "Bluefog background thread is suspended. use bf.resume() first."
);

const Status DUPLICATE_NAME_ERROR = Status::InvalidArgument(
"Requested to collective operation like (allreduce, neighbor_allreduce) or "
"window ops like (win_put, win_create) a tensor with the same "
Expand Down
85 changes: 85 additions & 0 deletions bluefog/common/operations.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ NCCLContext nccl_context;
// If set, win_ops will execute the same ops on associated p as well.
static bool global_with_associated_p_state = false;
static bool global_skip_negotiate_stage = false;
static bool global_background_thread_suspend = false;

const auto SUSPEND_BACKGROUND_WAITTING_DURATION = std::chrono::microseconds(10);

Expand Down Expand Up @@ -1105,6 +1106,10 @@ bool RunLoopOnce(BluefogGlobalState& state) {
std::this_thread::sleep_for(sleep_duration);
}
state.last_cycle_start = std::chrono::steady_clock::now();
if (global_background_thread_suspend) {
BFLOG(ERROR) << "Suspending";
std::this_thread::sleep_for(std::chrono::seconds(3));
}

std::deque<Request> message_queue_buffer;
state.tensor_queue.PopMessagesFromQueue(message_queue_buffer);
Expand Down Expand Up @@ -1145,6 +1150,7 @@ bool RunLoopOnce(BluefogGlobalState& state) {
should_shut_down);
}
// Separate the setting topology and negotiate communication.
// TODO(ybc) Use conditional variable and mutex to re-implement this.
if (should_change_topo) {
bluefog_global.ready_to_setting_topology = true;
while (!bluefog_global.setting_topology_done) {
Expand Down Expand Up @@ -1379,6 +1385,16 @@ int bluefog_get_skip_negotiate_stage() {
return GetSkipNegotiateStageState();
}

// Suspends the background communication loop: sets the global suspend flag,
// which makes RunLoopOnce sleep each cycle and makes the Enqueue*/Window*
// entry points return SUSPEND_ERROR. Always returns 1.
// NOTE(review): global_background_thread_suspend is a plain (non-atomic) bool
// written here and read from the background thread — consider
// std::atomic<bool> to avoid a data race; confirm intended memory semantics.
int bluefog_suspend() {
  global_background_thread_suspend = true;
  return 1;
}

// Resumes a previously suspended background communication loop by clearing
// the global suspend flag. Always returns 1.
// NOTE(review): same non-atomic flag concern as bluefog_suspend — consider
// std::atomic<bool> for cross-thread visibility.
int bluefog_resume() {
  global_background_thread_suspend = false;
  return 1;
}

} // extern "C"

Status EnqueueTensorAllreduce(std::shared_ptr<Tensor> tensor,
Expand Down Expand Up @@ -1413,6 +1429,9 @@ Status EnqueueTensorAllreduce(std::shared_ptr<Tensor> tensor,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
return status;
}
Expand Down Expand Up @@ -1445,6 +1464,9 @@ Status EnqueueTensorBroadcast(std::shared_ptr<Tensor> tensor,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
return status;
}
Expand Down Expand Up @@ -1476,6 +1498,9 @@ Status EnqueueTensorAllgather(std::shared_ptr<Tensor> tensor,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
return status;
}
Expand Down Expand Up @@ -1507,6 +1532,9 @@ Status EnqueueTensorNeighborAllgather(std::shared_ptr<Tensor> tensor,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
return status;
}
Expand Down Expand Up @@ -1551,6 +1579,9 @@ Status EnqueueTensorNeighborAllreduce(std::shared_ptr<Tensor> tensor,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
return status;
}
Expand Down Expand Up @@ -1588,6 +1619,9 @@ Status EnqueueTensorPairGossip(std::shared_ptr<Tensor> tensor,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
return status;
}
Expand Down Expand Up @@ -1617,6 +1651,9 @@ Status EnqueueTensorWindowCreate(
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
return status;
}
Expand All @@ -1638,6 +1675,9 @@ Status EnqueueTensorWindowFree(const std::string& name, int device,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
return status;
}
Expand Down Expand Up @@ -1668,6 +1708,9 @@ Status EnqueueTensorWindowPut(std::shared_ptr<Tensor> tensor,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
return status;
}
Expand Down Expand Up @@ -1696,6 +1739,9 @@ Status EnqueueTensorWindowAccumulate(
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
return status;
}
Expand All @@ -1720,6 +1766,9 @@ Status EnqueueTensorWindowGet(const std::string& name,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message);
return status;
}
Expand All @@ -1733,6 +1782,9 @@ Status ExecuteBarrier(StatusCallback callback) {
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
bluefog_global.controller->Barrier(e);
return Status::OK();
}
Expand All @@ -1741,6 +1793,9 @@ Status WindowSync(const std::string& name, int device) {
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Vendor vendor = DetermineController(MPIOpsType::WIN_SYNC, device);
Status status;
#if HAVE_NCCL
Expand All @@ -1767,6 +1822,9 @@ Status WindowMutexAcquire(const std::string& name,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
// Because mutex is always associated with each win_create ops, it is safe to
// use the vendor of win_create for window mutex.
Vendor vendor = DetermineController(MPIOpsType::WIN_CREATE, device);
Expand Down Expand Up @@ -1795,6 +1853,9 @@ Status WindowMutexRelease(const std::string& name,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
// Because mutex is always associated with each win_create ops, it is safe to
// use the vendor of win_create for window mutex.
Vendor vendor = DetermineController(MPIOpsType::WIN_CREATE, device);
Expand Down Expand Up @@ -1822,6 +1883,9 @@ Status GetBluefogTimeline(Timeline*& timeline) {
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
if (!bluefog_global.timeline_enabled) {
return Status::Aborted("timeline is not enabled.");
}
Expand All @@ -1833,6 +1897,9 @@ Status GetBluefogFusionBuffer(FusionBufferManager*& fusion_buffer) {
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
return Status::OK();
}

Expand All @@ -1841,6 +1908,9 @@ Status WindowFence(const std::string& name) {
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.controller->WinFence(name);

if (!status.ok()) {
Expand All @@ -1854,6 +1924,9 @@ Status WindowLock(const std::string& name) {
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.controller->WinLock(name);

if (!status.ok()) {
Expand All @@ -1867,6 +1940,9 @@ Status WindowUnlock(const std::string& name) {
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
Status status = bluefog_global.controller->WinUnlock(name);

if (!status.ok()) {
Expand All @@ -1880,6 +1956,9 @@ Status GetWindowVersion(const std::string& name, std::vector<int>& versions) {
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}

Status status =
bluefog_global.controller->GetWindowVersionValue(name, versions);
Expand All @@ -1897,6 +1976,9 @@ Status GetWinAssociatedPByNameAndRank(const std::string& name,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
return bluefog_global.controller->GetWinAssociatedPByNameAndRank(
name, rank, weight);
}
Expand All @@ -1906,6 +1988,9 @@ Status SetWinAssociatedPByNameAndRank(const std::string& name,
if (bluefog_global.shut_down) {
return SHUT_DOWN_ERROR;
}
if (global_background_thread_suspend) {
return SUSPEND_ERROR;
}
return bluefog_global.controller->SetWinAssociatedPByNameAndRank(
name, rank, weight);
}
Expand Down
4 changes: 4 additions & 0 deletions bluefog/common/operations.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ void bluefog_set_skip_negotiate_stage(bool value);

int bluefog_get_skip_negotiate_stage();

// Suspend the background communication thread (interactive/ipython use);
// subsequent collective/window ops fail with SUSPEND_ERROR. Returns 1.
int bluefog_suspend();

// Resume a background thread previously stopped by bluefog_suspend().
// Returns 1.
int bluefog_resume();

}

Status EnqueueTensorAllreduce(std::shared_ptr<Tensor> tensor,
Expand Down
3 changes: 3 additions & 0 deletions bluefog/common/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@

EXTENSIONS = ['tensorflow', 'torch']

def is_running_from_ipython():
    """Return True when executing inside an interactive IPython session.

    Returns:
        bool: True only if IPython is importable AND an active shell exists.
        Previously this raised ImportError when IPython was not installed;
        "not installed" now correctly reports False.
    """
    try:
        from IPython import get_ipython
    except ImportError:
        return False
    # get_ipython() returns None when IPython is installed but we are not
    # inside an IPython shell (e.g. a plain `python` process).
    return get_ipython() is not None

def get_ext_suffix():
"""Determine library extension for various versions of Python."""
Expand Down
52 changes: 51 additions & 1 deletion bluefog/run/env_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,61 @@
# ==============================================================================

import re

import shlex
import subprocess
import sys
import traceback

# List of regular expressions to ignore environment variables by.
# Raw string for the regex: '\(' in a non-raw literal triggers a
# DeprecationWarning (invalid escape sequence) on Python 3.6+; the runtime
# value is unchanged.
IGNORE_REGEXES = {r'BASH_FUNC_.*\(\)', 'OLDPWD'}

# Names of the environment variables used to configure BlueFog workers.
BLUEFOG_TIMELINE = 'BLUEFOG_TIMELINE'
BLUEFOG_LOG_LEVEL = 'BLUEFOG_LOG_LEVEL'


def is_exportable(v):
    """Return True unless the variable name matches any ignore pattern."""
    for pattern in IGNORE_REGEXES:
        if re.match(pattern, v):
            return False
    return True


def is_open_mpi_installed():
    """Return True if `mpirun` is runnable and provided by Open MPI.

    Prints a diagnostic to stderr and returns False when `mpirun` cannot be
    executed or when its version banner does not mention Open MPI.
    """
    command = 'mpirun --version'
    try:
        output_msg = str(subprocess.check_output(
            shlex.split(command), universal_newlines=True))
    except Exception:  # pylint: disable=broad-except
        # Bug fix: output_msg is unbound when check_output raises, so the
        # previous error message crashed with UnboundLocalError. Report the
        # command and traceback only.
        print("Was not able to run %s" % command, file=sys.stderr)
        print(traceback.format_exc(), file=sys.stderr)
        return False

    if 'Open MPI' not in output_msg:
        print('Open MPI not found in output of mpirun --version.',
              file=sys.stderr)
        return False
    return True


def is_ipyparallel_installed():
    """Return True when the ipyparallel package can be imported."""
    try:
        import ipyparallel  # noqa: F401  pylint: disable=unused-import
    except ImportError:
        return False
    return True


def _add_arg_to_env(env, env_key, arg_value, transform_fn=None):
if arg_value is not None:
value = arg_value
if transform_fn:
value = transform_fn(value)
env[env_key] = str(value)

def set_env_from_args(env, args):
    """Populate env with BlueFog variables derived from parsed args.

    Returns the (mutated) env mapping for convenience.
    """
    # Timeline output file, when one was requested.
    timeline = args.timeline_filename
    if timeline:
        _add_arg_to_env(env, BLUEFOG_TIMELINE, timeline)

    # The verbose flag maps to debug-level logging.
    if args.verbose:
        _add_arg_to_env(env, BLUEFOG_LOG_LEVEL, "debug")

    return env

0 comments on commit 1c2b675

Please sign in to comment.