Merge pull request #63 from Bluefog-Lib/atc
Atc
bichengying committed Dec 20, 2020
2 parents ce5df1a + a0496fc commit 9e4967e
Showing 11 changed files with 763 additions and 192 deletions.
111 changes: 109 additions & 2 deletions bluefog/common/basics.py
@@ -40,8 +40,10 @@ class BlueFogBasics(object):
def __init__(self, pkg_path, *args):
full_path = util.get_extension_full_path(pkg_path, *args)
self._topology = None
self._machine_topology = None
self._MPI_LIB_CTYPES = ctypes.CDLL(full_path, mode=ctypes.RTLD_GLOBAL)
self._is_topo_weighted = False
self._is_machine_topo_weighted = False
self.warn_timeline = False

def init(self, topology_fn: Optional[Callable[[int], networkx.DiGraph]] = None,
@@ -67,7 +69,8 @@ def init(self, topology_fn: Optional[Callable[[int], networkx.DiGraph]] = None,
def shutdown(self) -> None:
"""A function that shuts BlueFog down."""
self._MPI_LIB_CTYPES.bluefog_shutdown()
self.topology = None
self._topology = None
self._machine_topology = None

def size(self) -> int:
"""A function that returns the number of BlueFog processes.
@@ -116,6 +119,28 @@ def local_rank(self) -> int:
raise ValueError("BlueFog has not been initialized; use bf.init().")
return local_rank

def machine_rank(self) -> int:
"""A function that returns the BlueFog rank of the machine.
Returns:
An integer scalar with the BlueFog rank of the machine.
"""
# TODO(hhb) This only supports the homogeneous environment now. Currently it assumes all
# machines share the same local_size().
assert self.is_homogeneous(), "Only supports homogeneous environment now"
return self.rank() // self.local_size()

def machine_size(self) -> int:
"""A function that returns the BlueFog size of the machine.
Returns:
An integer scalar with the BlueFog size of the machine.
"""
# TODO(hhb) This only supports the homogenous environment now. Currently it assumes all
# machines share the same local_size()
assert self.is_homogeneous(), "Only supports homogeneous environment now"
return self.size()//self.local_size()
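
Note: under the homogeneity assumption above, both machine_rank() and machine_size() reduce to integer division of the global rank and size. A minimal doctest-style sketch (assuming a multi-machine launch via mpirun so that bf.init() succeeds):

>>> import bluefog.torch as bf
>>> bf.init()
>>> assert bf.machine_rank() == bf.rank() // bf.local_size()
>>> assert bf.machine_size() == bf.size() // bf.local_size()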

def unified_mpi_window_model_supported(self) -> bool:
"""Returns a boolean value to indicate the MPI_Win model is unified or not.
Unfornuately, it is a collective call. We have to create a fake win to get
Expand Down Expand Up @@ -149,6 +174,22 @@ def is_topo_weighted(self) -> bool:
"""
return self._is_topo_weighted

def is_machine_topo_weighted(self) -> bool:
"""A function that returns if the virtual machine topology weights are used
Returns:
A boolean value indicating if the machine topology weights are used.
"""
return self._is_machine_topo_weighted

def load_machine_topology(self) -> networkx.DiGraph:
"""A function that returns the virtual topology for the machine.
Returns:
machine_topology: networkx.DiGraph.
"""
return self._machine_topology
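
Note: the two getters above simply expose state recorded by set_machine_topology (shown further down in this diff). A hedged sketch of how they pair up, under a hypothetical multi-machine run:

>>> import bluefog.torch as bf
>>> from bluefog.common import topology_util
>>> bf.init()
>>> bf.set_machine_topology(topology_util.RingGraph(bf.machine_size()))
>>> bf.load_machine_topology()       # the networkx.DiGraph passed in above
>>> bf.is_machine_topo_weighted()    # False: is_weighted defaulted to False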

def load_topology(self) -> networkx.DiGraph:
"""A funnction that returns the virtual topology MPI used.
@@ -157,6 +198,21 @@ def load_topology(self) -> networkx.DiGraph:
"""
return self._topology

def in_neighbor_machine_ranks(self) -> List[int]:
"""Return the machine ranks of all in-neighbors.
Notice: Whether or not a self-loop is present, the self machine rank will not be included.
Returns:
in_neighbor_machine_ranks
"""
if self._machine_topology is None:
return []
_machine_rank = self.machine_rank()
in_neighbor_machine_ranks = [r for r in
self._machine_topology.predecessors(self.machine_rank())
if r != _machine_rank]
return in_neighbor_machine_ranks
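
Note: for intuition, on a directed ring over four machines, machine 1 only receives from machine 0; the helper is a thin wrapper around networkx predecessors. The same query, self-contained (hypothetical 4-machine ring):

>>> import networkx
>>> ring = networkx.DiGraph([(0, 1), (1, 2), (2, 3), (3, 0)])
>>> [r for r in ring.predecessors(1) if r != 1]   # in-neighbors of machine 1
[0]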

def in_neighbor_ranks(self) -> List[int]:
"""Return the ranks of all in-neighbors.
Notice: Whether or not a self-loop is present, the self rank will not be included.
@@ -171,6 +227,21 @@ def in_neighbor_ranks(self) -> List[int]:
if r != _rank]
return in_neighbor_ranks

def out_neighbor_machine_ranks(self) -> List[int]:
"""Return the machine ranks of all out-neighbors.
Notice: Whether or not a self-loop is present, the self machine rank will not be included.
Returns:
out_neighbor_machine_ranks
"""
if self._machine_topology is None:
return []
_machine_rank = self.machine_rank()
out_neighbor_machine_ranks = [r for r in
self._machine_topology.successors(self.machine_rank())
if r != _machine_rank]
return out_neighbor_machine_ranks
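
Note: the out-neighbor query is the mirror image, using networkx successors instead of predecessors. Continuing the same hypothetical ring:

>>> import networkx
>>> ring = networkx.DiGraph([(0, 1), (1, 2), (2, 3), (3, 0)])
>>> [r for r in ring.successors(1) if r != 1]     # out-neighbors of machine 1
[2]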

def out_neighbor_ranks(self) -> List[int]:
"""Return the ranks of all out-neighbors.
Notice: Whether or not a self-loop is present, the self rank will not be included.
@@ -185,9 +256,45 @@ def out_neighbor_ranks(self) -> List[int]:
if r != _rank]
return out_neighbor_ranks

def set_machine_topology(self, topology: Optional[networkx.DiGraph],
is_weighted: bool = False) -> bool:
"""A function that sets the virtual machine topology.
Args:
topology: A networkx.DiGraph object that determines the machine topology. It shall
not be None.
is_weighted: If set to True, hierarchical_neighbor_allreduce will execute a
weighted average instead, where the weights are the values used in the machine
topology matrix (including the self weight).
Returns:
A boolean value indicating whether the machine topology was set correctly.
Example:
>>> import bluefog.torch as bf
>>> from bluefog.common import topology_util
>>> bf.init()
>>> bf.set_machine_topology(topology_util.RingGraph(bf.machine_size()))
"""
if topology is None:
raise ValueError("Machine topology shall not be None.")

if not isinstance(topology, networkx.DiGraph):
raise TypeError("Machine topology must be a networkx.DiGraph obejct.")

assert self.is_homogeneous(), "Only supports homogeneous environment now"

if topology_util.IsTopologyEquivalent(topology, self._machine_topology):
if self.local_rank() == 0:
logger.debug("Machine topology to set is the same as old one. Skip the setting.")
return True

self._machine_topology = topology
self._is_machine_topo_weighted = is_weighted
return True
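
Note: a hedged sketch of the weighted path, assuming (as the docstring above does) a multi-machine run and that the graph's edge values are the weights hierarchical_neighbor_allreduce will average with:

>>> import bluefog.torch as bf
>>> from bluefog.common import topology_util
>>> bf.init()
>>> topo = topology_util.RingGraph(bf.machine_size())
>>> bf.set_machine_topology(topo, is_weighted=True)
>>> bf.is_machine_topo_weighted()    # now True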

def set_topology(self, topology: Optional[networkx.DiGraph] = None,
is_weighted: bool = False) -> bool:
"""A funnction that sets the virtual topology MPI used.
"""A function that sets the virtual topology MPI used.
Args:
topology: A networkx.DiGraph object that determines the topology. If not provided
8 changes: 4 additions & 4 deletions bluefog/common/topology_util.py
@@ -312,7 +312,7 @@ def IsRegularGraph(topo: nx.DiGraph) -> bool:
return True


def GetDynamicSendRecvRanks(
def GetDynamicOnePeerSendRecvRanks(
topo: nx.DiGraph, self_rank: int) -> Iterator[Tuple[List[int], List[int]]]:
"""A utility function to generate 1-outoging send rank and corresponding recieving rank(s).
@@ -327,7 +327,7 @@ def GetDynamicSendRecvRanks(
>>> from bluefog.common import topology_util
>>> topo = topology_util.PowerTwoRingGraph(10)
>>> gen = topology_util.GetDynamicSendRecvRanks(topo, 0)
>>> gen = topology_util.GetDynamicOnePeerSendRecvRanks(topo, 0)
>>> for _ in range(10):
>>> print(next(gen))
"""
@@ -423,7 +423,7 @@ def GetInnerOuterRingDynamicSendRecvRanks(
nodes_per_machine = local_size
assert world_size % local_size == 0, "It should be used under homogeneous environment only."
assert local_size > 2, "Do no support the case where nodes_per_machine is equal or " \
"less than 2. Consider use hierarchical_neighbor_allreduce or GetDynamicSendRecvRanks."
"less than 2. Consider use hierarchical_neighbor_allreduce or GetDynamicOnePeerSendRecvRanks."

index = 0
while True:
@@ -490,7 +490,7 @@ def GetInnerOuterExpo2DynamicSendRecvRanks(
nodes_per_machine = local_size
assert world_size % local_size == 0, "It should be used under homogeneous environment only."
assert local_size > 2, "Do not support the case where nodes_per_machine is equal to or " \
"less than 2. Consider use hierarchical_neighbor_allreduce or GetDynamicSendRecvRanks."
"less than 2. Consider use hierarchical_neighbor_allreduce or GetDynamicOnePeerSendRecvRanks."

exp_2_out_size = int(np.log2(num_machines-1))
if nodes_per_machine == 2:
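
Note: both inner-outer generators require the homogeneous layout enforced by the asserts above. A hedged sketch, assuming the (world_size, local_size, self_rank) parameters implied by this hunk, with a hypothetical 4 machines x 4 processes layout:

>>> from bluefog.common import topology_util
>>> gen = topology_util.GetInnerOuterRingDynamicSendRecvRanks(
...     world_size=16, local_size=4, self_rank=0)
>>> send_ranks, recv_ranks = next(gen)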
11 changes: 9 additions & 2 deletions bluefog/torch/__init__.py
@@ -23,18 +23,25 @@
import torch
from bluefog.common.util import check_extension
from bluefog.torch.optimizers import (
CommunicationType,
DistributedAdaptThenCombineOptimizer,
DistributedAdaptWithCombineOptimizer,
DistributedGradientAllreduceOptimizer,
DistributedWinPutOptimizer,
DistributedAllreduceOptimizer,
DistributedNeighborAllreduceOptimizer,
DistributedHierarchicalNeighborAllreduceOptimizer,
DistributedWinPutOptimizer)
DistributedHierarchicalNeighborAllreduceOptimizer
)

check_extension('bluefog.torch', __file__, 'mpi_lib')

from bluefog.torch.mpi_ops import init, shutdown
from bluefog.torch.mpi_ops import size, local_size, rank, local_rank
from bluefog.torch.mpi_ops import machine_size, machine_rank
from bluefog.torch.mpi_ops import load_topology, set_topology
from bluefog.torch.mpi_ops import load_machine_topology, set_machine_topology
from bluefog.torch.mpi_ops import in_neighbor_ranks, out_neighbor_ranks
from bluefog.torch.mpi_ops import in_neighbor_machine_ranks, out_neighbor_machine_ranks
from bluefog.torch.mpi_ops import mpi_threads_supported
from bluefog.torch.mpi_ops import unified_mpi_window_model_supported
from bluefog.torch.mpi_ops import nccl_built, is_homogeneous
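
Note: taken together, the new exports give a machine-level mirror of the existing per-process API. A minimal sketch of the surface this diff adds, under a hypothetical multi-machine launch:

>>> import bluefog.torch as bf
>>> from bluefog.common import topology_util
>>> bf.init()
>>> bf.set_machine_topology(topology_util.RingGraph(bf.machine_size()))
>>> bf.machine_rank(), bf.machine_size()
>>> bf.in_neighbor_machine_ranks(), bf.out_neighbor_machine_ranks()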
