Commit 6358ba9

Update the arg for dist optimizer choosing

bichengying committed May 31, 2020
1 parent da31272 commit 6358ba9

Showing 3 changed files with 70 additions and 98 deletions.
47 changes: 23 additions & 24 deletions examples/pytorch_benchmark.py
@@ -51,20 +51,18 @@
help='disables profiler')
parser.add_argument('--partition', type=int, default=None,
help='partition size')
parser.add_argument("--no-bluefog", action="store_true",
default=False, help="disables bluefog library")
parser.add_argument("--no-rma", action="store_true",
default=False, help="Do no use remote memory access(no window ops).")
parser.add_argument("--enable-dynamic-topology", action="store_true",
default=False, help=("Enable each iteration to transmit one neighbor " +
"per iteration dynamically."))
parser.add_argument('--dist-optimizer', type=str, default='win_put',
help='The type of distributed optimizer. Supporting options are '+
'[win_put, neighbor_allreduce, allreduce, push_sum, horovod]')
parser.add_argument('--enable-dynamic-topology', action='store_true',
default=False, help=('Enable each iteration to transmit one neighbor ' +
'per iteration dynamically.'))


args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()
args.bluefog = not args.no_bluefog

if not args.bluefog:
if args.dist_optimizer == 'horovod':
print("importing horovod")
import horovod.torch as bf
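
Note: the new --dist-optimizer argument added above accepts five string values but is only validated later, in the optimizer-selection branch. As a sketch only (not part of this commit), the argument definition could instead let argparse enforce the same set at parse time with choices=:

# Sketch, not part of this commit: validate --dist-optimizer at parse time.
parser.add_argument('--dist-optimizer', type=str, default='win_put',
                    choices=['win_put', 'neighbor_allreduce', 'allreduce',
                             'push_sum', 'horovod'],
                    help='The type of distributed optimizer.')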

@@ -106,23 +104,24 @@ def forward(self, x):
optimizer = optim.SGD(model.parameters(), lr=0.01)

# Bluefog: wrap optimizer with DistributedOptimizer.
if args.bluefog:
if args.no_rma:
print("Use neighbor collective")
# This distributed optimizer uses neighbor communication.
optimizer = bf.DistributedNeighborAllreduceOptimizer(
optimizer, model=model
)
else:
# This distributed optimizer uses one-sided communication
print("Use win_put ops.")
optimizer = bf.DistributedBluefogOptimizer(
optimizer, model=model
)
else:
optimizer = bf.DistributedOptimizer(
if args.dist_optimizer == 'win_put':
optimizer = bf.DistributedBluefogOptimizer(optimizer, model=model)
elif args.dist_optimizer == 'neighbor_allreduce':
optimizer = bf.DistributedNeighborAllreduceOptimizer(
optimizer, model=model)
elif args.dist_optimizer == 'allreduce':
optimizer = bf.DistributedAllreduceOptimizer(
optimizer, model=model)
elif args.dist_optimizer == 'push_sum':
optimizer = bf.DistributedPushSumOptimizer(optimizer, model=model)
elif args.dist_optimizer == 'horovod':
optimizer = bf.DistributedOptimizer(
optimizer, named_parameters=model.named_parameters()
)
else:
raise ValueError('Unknown args.dist-optimizer type -- ' + args.dist_optimizer + '\n' +
'Please set the argument to be one of ' +
'[win_put, neighbor_allreduce, allreduce, push_sum, horovod]')
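
This if/elif chain replaces the old --no-bluefog/--no-rma branches shown above. Reconstructed from those deleted branches (an assumption, sketch only), the removed flags map onto the new --dist-optimizer values roughly as follows:

# Sketch: assumed mapping from the removed flags to the new --dist-optimizer
# values, inferred from the branches deleted in this commit.
def legacy_flags_to_dist_optimizer(no_bluefog, no_rma):
    if no_bluefog:
        return 'horovod'             # old fallback: horovod.torch DistributedOptimizer
    if no_rma:
        return 'neighbor_allreduce'  # old --no-rma: neighbor collective, no window ops
    return 'win_put'                 # old default: one-sided (window) communication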

bf.broadcast_parameters(model.state_dict(), root_rank=0)
bf.broadcast_optimizer_state(optimizer, root_rank=0)
61 changes: 24 additions & 37 deletions examples/pytorch_cifar10_resnet.py
@@ -16,6 +16,8 @@

from __future__ import print_function

from bluefog.common import topology_util
import bluefog.torch as bf
import argparse
import os
import sys
@@ -86,10 +88,9 @@
"--no-cuda", action="store_true", default=False, help="disables CUDA training"
)
parser.add_argument("--seed", type=int, default=42, help="random seed")
parser.add_argument("--no-bluefog", action="store_true",
default=False, help="disables bluefog library")
parser.add_argument("--no-rma", action="store_true",
default=False, help="Do no use remote memory access(no window ops).")
parser.add_argument('--dist-optimizer', type=str, default='win_put',
help='The type of distributed optimizer. Supporting options are ' +
'[win_put, neighbor_allreduce, allreduce, push_sum, horovod]')
parser.add_argument("--average-test-result", action="store_true",
default=False,
help=("Allreduce called to average test result. Warning this will " +
@@ -100,24 +101,15 @@

args = parser.parse_args()
args.cuda = (not args.no_cuda) and (torch.cuda.is_available())
args.bluefog = not args.no_bluefog

allreduce_batch_size = args.batch_size * args.batches_per_allreduce

# Bluefog: initialize library.
if args.bluefog:
print("importing bluefog")
import bluefog.torch as bf
from bluefog.common import topology_util
else:

if args.dist_optimizer == 'horovod':
print("importing horovod")
import horovod.torch as bf

# Bluefog: initialize library.
bf.init()

if args.bluefog:
bf.set_topology(topology=topology_util.PowerTwoRingGraph(bf.size()))
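
The PowerTwoRingGraph topology call above is still guarded by args.bluefog, which this commit removes. The visible hunk does not show the final guard; one possibility, purely as an assumption, is to key it off the new flag:

# Assumption only; the final guard is not visible in this hunk.
if args.dist_optimizer != 'horovod':
    bf.set_topology(topology=topology_util.PowerTwoRingGraph(bf.size()))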

torch.manual_seed(args.seed)

if args.cuda:
@@ -214,29 +206,24 @@
)

# Bluefog: wrap optimizer with DistributedOptimizer.
if args.bluefog:
if args.no_rma:
print("Use neighbor collective")
# This distributed optimizer uses neighbor communication.
optimizer = bf.DistributedNeighborAllreduceOptimizer(
optimizer, model=model
)
if os.environ.get("BLUEFOG_TIMELINE"):
print("Timeline for optimizer is enabled")
optimizer.turn_on_timeline()
else:
# This distributed optimizer uses one-sided communication
print("Use win_put ops.")
optimizer = bf.DistributedBluefogOptimizer(
optimizer, model=model
)
if os.environ.get("BLUEFOG_TIMELINE"):
print("Timeline for optimizer is enabled")
optimizer.turn_on_timeline()
else:
optimizer = bf.DistributedOptimizer(
if args.dist_optimizer == 'win_put':
optimizer = bf.DistributedBluefogOptimizer(optimizer, model=model)
elif args.dist_optimizer == 'neighbor_allreduce':
optimizer = bf.DistributedNeighborAllreduceOptimizer(
optimizer, model=model)
elif args.dist_optimizer == 'allreduce':
optimizer = bf.DistributedAllreduceOptimizer(
optimizer, model=model)
elif args.dist_optimizer == 'push_sum':
optimizer = bf.DistributedPushSumOptimizer(optimizer, model=model)
elif args.dist_optimizer == 'horovod':
optimizer = bf.DistributedOptimizer(
optimizer, named_parameters=model.named_parameters()
)
else:
raise ValueError('Unknown args.dist-optimizer type -- ' + args.dist_optimizer + '\n' +
'Please set the argument to be one of ' +
'[win_put, neighbor_allreduce, allreduce, push_sum, horovod]')
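
The deleted branches above also checked the BLUEFOG_TIMELINE environment variable and called optimizer.turn_on_timeline(); the new selection chain drops that check. A sketch of restoring it once after the dispatch (assuming turn_on_timeline() is only available on the Bluefog wrappers) could be:

# Sketch, not part of this commit: re-enable the timeline hook the old branches had.
if args.dist_optimizer != 'horovod' and os.environ.get("BLUEFOG_TIMELINE"):
    print("Timeline for optimizer is enabled")
    optimizer.turn_on_timeline()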

print("resume_from_epoch: ", resume_from_epoch)
# Restore from a previous checkpoint, if initial_epoch is specified.
60 changes: 23 additions & 37 deletions examples/pytorch_mnist.py
@@ -16,6 +16,8 @@

from __future__ import print_function

from bluefog.common import topology_util
import bluefog.torch as bf
import argparse
import os
import sys
@@ -67,10 +69,9 @@
parser.add_argument(
"--no-cuda", action="store_true", default=False, help="disables CUDA training"
)
parser.add_argument("--no-bluefog", action="store_true",
default=False, help="disables bluefog library. Use horovod instead.")
parser.add_argument("--no-rma", action="store_true",
default=False, help="Do no use remote memory access(no window ops).")
parser.add_argument('--dist-optimizer', type=str, default='win_put',
help='The type of distributed optimizer. Supporting options are '+
'[win_put, neighbor_allreduce, allreduce, push_sum, horovod]')
parser.add_argument("--average-test-result", action="store_true",
default=False,
help=("Allreduce called to average test result. Warning this will " +
@@ -92,23 +93,14 @@

args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()
args.bluefog = not args.no_bluefog
torch.manual_seed(args.seed)

# Bluefog: initialize library.
if args.bluefog:
print("importing bluefog")
import bluefog.torch as bf
from bluefog.common import topology_util
else:
if args.dist_optimizer == 'horovod':
print("importing horovod")
import horovod.torch as bf

bf.init()

if args.bluefog:
bf.set_topology(topology=topology_util.RingGraph(bf.size()))

torch.manual_seed(args.seed)

if args.cuda:
# Bluefog: pin GPU to local rank.
@@ -189,30 +181,24 @@ def forward(self, x):
bf.broadcast_optimizer_state(optimizer, root_rank=0)

# Bluefog: wrap optimizer with DistributedOptimizer.
if args.bluefog:
# optimizer = bf.DistributedAllreduceOptimizer(
# optimizer, named_parameters=model.named_parameters()
# )
if args.no_rma:
print("Use neighbor collective")
optimizer = bf.DistributedNeighborAllreduceOptimizer(
optimizer, model=model
)
if os.environ.get("BLUEFOG_TIMELINE"):
print("Timeline for optimizer is enabled")
optimizer.turn_on_timeline()
else:
print("Use win_put ops.")
optimizer = bf.DistributedBluefogOptimizer(
optimizer, model=model
)
if os.environ.get("BLUEFOG_TIMELINE"):
print("Timeline for optimizer is enabled")
optimizer.turn_on_timeline()
else:
optimizer = bf.DistributedOptimizer(
if args.dist_optimizer == 'win_put':
optimizer = bf.DistributedBluefogOptimizer(optimizer, model=model)
elif args.dist_optimizer == 'neighbor_allreduce':
optimizer = bf.DistributedNeighborAllreduceOptimizer(
optimizer, model=model)
elif args.dist_optimizer == 'allreduce':
optimizer = bf.DistributedAllreduceOptimizer(
optimizer, model=model)
elif args.dist_optimizer == 'push_sum':
optimizer = bf.DistributedPushSumOptimizer(optimizer, model=model)
elif args.dist_optimizer == 'horovod':
optimizer = bf.DistributedOptimizer(
optimizer, named_parameters=model.named_parameters()
)
else:
raise ValueError('Unknown args.dist-optimizer type -- ' + args.dist_optimizer + '\n' +
'Please set the argument to be one of ' +
'[win_put, neighbor_allreduce, allreduce, push_sum, horovod]')
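
The same if/elif selection block now appears in all three example scripts. As a hypothetical refactoring (not part of this commit), it could be factored into one table-driven helper shared by the scripts; the constructor calls below are the ones used in the diffs above:

# Hypothetical shared helper, sketch only.
def make_dist_optimizer(name, optimizer, model):
    builders = {
        'win_put': lambda: bf.DistributedBluefogOptimizer(optimizer, model=model),
        'neighbor_allreduce': lambda: bf.DistributedNeighborAllreduceOptimizer(
            optimizer, model=model),
        'allreduce': lambda: bf.DistributedAllreduceOptimizer(optimizer, model=model),
        'push_sum': lambda: bf.DistributedPushSumOptimizer(optimizer, model=model),
        'horovod': lambda: bf.DistributedOptimizer(
            optimizer, named_parameters=model.named_parameters()),
    }
    if name not in builders:
        raise ValueError('Unknown dist optimizer: ' + name + '. ' +
                         'Expected one of: ' + ', '.join(sorted(builders)))
    return builders[name]()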


def dynamic_topology_update(epoch, batch_idx):
