Skip to content

Commit

Permalink
Disable the 2 local size for inner outer and remove its static graphs
Browse files Browse the repository at this point in the history
  • Loading branch information
bichengying committed Nov 6, 2020
1 parent 0f23999 commit 87fe958
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 145 deletions.
178 changes: 44 additions & 134 deletions bluefog/common/topology_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,89 +303,6 @@ def FullyConnectedGraph(size: int) -> nx.DiGraph:
return G


def InnerOuterRingGraph(world_size: int, local_size: int) -> nx.DiGraph:
"""Generate Inner Ring and Outer Ring (Unilateral) Static Graph for dynamic usage.
Within one machine all inner rank/processes is fully connected and all
nodes with same local size across all machines is connected through another ring.
.. note::
Currently, our implementation has requirement that dyanmic graph has to be the
subgraph of static graph. Hence, the inner connection here is fully connected.
.. plot::
>>> import networkx as nx
>>> from bluefog.common import topology_util
>>> G = topology_util.InnerOuterRingGraph(12, 3)
>>> nx.draw_circular(G)
"""
# TODO(hhb) remove this statis topology requirement.
total_nodes = world_size
num_machines = world_size // local_size
nodes_per_machine = local_size
assert world_size % local_size == 0, "It should be used under homogeneous environment only."

topo = np.zeros([total_nodes, total_nodes])

for i in range(total_nodes):
for j in range(total_nodes):
machine_i, local_rank_i = i // nodes_per_machine, i % nodes_per_machine
machine_j, local_rank_j = j // nodes_per_machine, j % nodes_per_machine
if machine_i == machine_j:
topo[i, j] = 1
elif ((machine_i + 1) % num_machines) == machine_j:
topo[i, j] = 1 if local_rank_i == local_rank_j else 0
else:
topo[i, j] = 0

topo = topo / topo.sum(axis=1)
G = nx.from_numpy_array(topo, create_using=nx.DiGraph)
return G


def InnerOuterExpo2Graph(world_size: int, local_size: int) -> nx.DiGraph:
"""Generate Inner Ring and Outer Exponential-2 Graph.
Within one machine all inner rank/processes is fully-connected and all
nodes with same local size across all machines forms exponential-2 graph.
.. note::
Currently, our implementation has requirement that dyanmic graph has to be the
subgraph of static graph. Hence, the inner connection here is fully connected.
.. plot::
>>> import networkx as nx
>>> from bluefog.common import topology_util
>>> G = topology_util.InnerOuterExpo2Graph(12, 3)
>>> nx.draw_circular(G)
"""
# TODO(hhb) remove this statis topology requirement.
total_nodes = world_size
num_machines = world_size // local_size
nodes_per_machine = local_size
assert world_size % local_size == 0, "It should be used under homogeneous environment only."

topo = np.zeros([total_nodes, total_nodes])

for i in range(total_nodes):
for j in range(total_nodes):
machine_i, local_rank_i = i // nodes_per_machine, i % nodes_per_machine
machine_j, local_rank_j = j // nodes_per_machine, j % nodes_per_machine
machine_dist = (machine_j - machine_i) % num_machines
if machine_i == machine_j:
topo[i, j] = 1
elif isPowerOf(machine_dist, 2):
topo[i, j] = 1 if local_rank_i == local_rank_j else 0
else:
topo[i, j] = 0

topo = topo / topo.sum(axis=1)
G = nx.from_numpy_array(topo, create_using=nx.DiGraph)
return G


def IsRegularGraph(topo: nx.DiGraph) -> bool:
"""Dtermine a graph is regular or not, i.e. all nodes have the same degree."""
degree = topo.degree(0)
Expand Down Expand Up @@ -505,7 +422,8 @@ def GetInnerOuterRingDynamicSendRecvRanks(
num_machines = world_size//local_size
nodes_per_machine = local_size
assert world_size % local_size == 0, "It should be used under homogeneous environment only."
assert local_size > 1, "Use GetDynamicSendRecvRanks for Expo2 in 1 node per machine case."
assert local_size > 2, "Do no support the case where nodes_per_machine is equal or less than 2 " \
"Consider use hierarchical_neighbor_allreduce or simple GetDynamicSendRecvRanks instead."

index = 0
while True:
Expand All @@ -525,26 +443,21 @@ def GetInnerOuterRingDynamicSendRecvRanks(
recv_rank = source_rank_id

else:
if nodes_per_machine == 2:
yield [], []
index += 1
continue
else:
# find send_rank
target_local_rank_id = (local_rank_id + 1) % nodes_per_machine
if target_local_rank_id == local_rank_to_go_outside_id:
target_local_rank_id = (
target_local_rank_id + 1) % nodes_per_machine
target_rank_id = target_local_rank_id + machine_id * nodes_per_machine
send_rank = target_rank_id

# find recv_rank
source_local_rank_id = (local_rank_id - 1) % nodes_per_machine
if source_local_rank_id == local_rank_to_go_outside_id:
source_local_rank_id = (
source_local_rank_id - 1) % nodes_per_machine
source_rank_id = source_local_rank_id + machine_id * nodes_per_machine
recv_rank = source_rank_id
# find send_rank
target_local_rank_id = (local_rank_id + 1) % nodes_per_machine
if target_local_rank_id == local_rank_to_go_outside_id:
target_local_rank_id = (
target_local_rank_id + 1) % nodes_per_machine
target_rank_id = target_local_rank_id + machine_id * nodes_per_machine
send_rank = target_rank_id

# find recv_rank
source_local_rank_id = (local_rank_id - 1) % nodes_per_machine
if source_local_rank_id == local_rank_to_go_outside_id:
source_local_rank_id = (
source_local_rank_id - 1) % nodes_per_machine
source_rank_id = source_local_rank_id + machine_id * nodes_per_machine
recv_rank = source_rank_id

yield [send_rank], [recv_rank]
index += 1
Expand Down Expand Up @@ -576,7 +489,9 @@ def GetInnerOuterExpo2DynamicSendRecvRanks(
num_machines = world_size//local_size
nodes_per_machine = local_size
assert world_size % local_size == 0, "It should be used under homogeneous environment only."
assert local_size > 1, "Use GetDynamicSendRecvRanks for Expo2 in 1 node per machine case."
assert local_size > 2, "Do no support the case where nodes_per_machine is equal or less than 2 " \
"Consider use hierarchical_neighbor_allreduce or simple GetDynamicSendRecvRanks instead."

exp_2_out_size = int(np.log2(num_machines-1))
if nodes_per_machine == 2:
exp_2_in_size = 0
Expand Down Expand Up @@ -610,35 +525,30 @@ def GetInnerOuterExpo2DynamicSendRecvRanks(
recv_rank = source_rank_id

else:
if nodes_per_machine == 2:
yield [], []
index += 1
continue
else:
# Distance from self to out-rank:
dist_to_out = (local_rank_to_go_outside_id -
local_rank_id) % nodes_per_machine
next_inner_dist = 2**(index % (exp_2_in_size + 1))
if next_inner_dist >= dist_to_out:
next_inner_dist += 1

# find send_rank
target_local_rank_id = (local_rank_id +
next_inner_dist) % nodes_per_machine
target_rank_id = target_local_rank_id + machine_id * nodes_per_machine
send_rank = target_rank_id

reverse_inner_dist = 2**(index % (exp_2_in_size + 1))
reverse_dist_to_out = (
local_rank_id - local_rank_to_go_outside_id) % nodes_per_machine
if reverse_inner_dist >= reverse_dist_to_out:
reverse_inner_dist += 1

# find recv_rank
source_local_rank_id = (local_rank_id -
reverse_inner_dist) % nodes_per_machine
source_rank_id = source_local_rank_id + machine_id * nodes_per_machine
recv_rank = source_rank_id
# Distance from self to out-rank:
dist_to_out = (local_rank_to_go_outside_id -
local_rank_id) % nodes_per_machine
next_inner_dist = 2**(index % (exp_2_in_size + 1))
if next_inner_dist >= dist_to_out:
next_inner_dist += 1

# find send_rank
target_local_rank_id = (local_rank_id +
next_inner_dist) % nodes_per_machine
target_rank_id = target_local_rank_id + machine_id * nodes_per_machine
send_rank = target_rank_id

reverse_inner_dist = 2**(index % (exp_2_in_size + 1))
reverse_dist_to_out = (
local_rank_id - local_rank_to_go_outside_id) % nodes_per_machine
if reverse_inner_dist >= reverse_dist_to_out:
reverse_inner_dist += 1

# find recv_rank
source_local_rank_id = (local_rank_id -
reverse_inner_dist) % nodes_per_machine
source_rank_id = source_local_rank_id + machine_id * nodes_per_machine
recv_rank = source_rank_id

yield [send_rank], [recv_rank]
index += 1
5 changes: 4 additions & 1 deletion bluefog/torch/mpi_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,11 @@ def _neighbor_allreduce_nonblocking(tensor, output, self_weight, neighbor_weight
elif len(set(send_neighbors)) != len(send_neighbors):
raise ValueError("Argument send_neighbors should only contain the unique ranks.")
elif self_weight is None or neighbor_weights is None:
raise ValueError("Arguments self_weight and neighbor_weights should be presented if"
raise ValueError("Arguments self_weight and neighbor_weights should be presented if "
"enabling dynamic topology.")
elif not send_neighbors:
raise ValueError("Argument send_neighbors cannot be empty list but we plan to support "
"it in future.")
else:
dynamic_neighbors_enabled = True
if self_weight is None and neighbor_weights is None:
Expand Down
8 changes: 2 additions & 6 deletions examples/pytorch_average_consensus.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
help='the size of data.')
parser.add_argument('--max-iters', type=int, default=200,
help='maximum iterations')
parser.add_argument('--local-size', type=int, default=4,
help='number of nodes per machine')
parser.add_argument('--virtual-topology', type=str, default="expo2",
help='The underlying virtual topology. Supporting options are ' +
'[expo2(Default), ring, mesh, star, InnerOuterExpo2].')
Expand Down Expand Up @@ -71,8 +69,6 @@
bf.size(), connect_style=0), is_weighted=True)
elif args.virtual_topology == "star":
bf.set_topology(topology_util.StarGraph(bf.size()), is_weighted=True)
elif args.virtual_topology == "InnerOuterExpo2":
bf.set_topology(topology_util.InnerOuterExpo2Graph(bf.size(), local_size=args.local_size))
elif args.virtual_topology == "full":
bf.set_topology(topology_util.FullyConnectedGraph(bf.size()))
else:
Expand All @@ -88,9 +84,9 @@
send_neighbors = None

if args.enable_dynamic_topology:
if args.virtual_topology == "InnerOuterRing":
if args.virtual_topology == "InnerOuterExpo2":
dynamic_neighbor_allreduce_gen = topology_util.GetInnerOuterExpo2DynamicSendRecvRanks(
bf.size(), local_size=args.local_size, self_rank=bf.rank())
bf.size(), local_size=bf.local_size(), self_rank=bf.rank())
else:
dynamic_neighbor_allreduce_gen = topology_util.GetDynamicSendRecvRanks(
bf.load_topology(), bf.rank())
Expand Down
2 changes: 0 additions & 2 deletions examples/pytorch_benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
help='model to benchmark')
parser.add_argument('--batch-size', type=int, default=32,
help='input batch size')
parser.add_argument('--local-size', type=int, default=-1,
help='number of nodes per machine')
parser.add_argument('--num-warmup-batches', type=int, default=10,
help='number of warm-up batches that don\'t count towards benchmark')
parser.add_argument('--num-batches-per-iter', type=int, default=10,
Expand Down
2 changes: 0 additions & 2 deletions examples/pytorch_mnist.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
parser.add_argument(
"--batch-size", type=int, default=64,
metavar="N", help="input batch size for training (default: 64)")
parser.add_argument('--local-size', type=int, default=-1,
help='number of nodes per machine. Only used in test.')
parser.add_argument(
"--test-batch-size", type=int, default=1000,
metavar="N", help="input batch size for testing (default: 1000)")
Expand Down

0 comments on commit 87fe958

Please sign in to comment.