From b825042a29d75fe58e30636c738113fabffe41bd Mon Sep 17 00:00:00 2001 From: jiangyimin Date: Wed, 10 Jul 2019 17:56:31 +0800 Subject: [PATCH] examples: some cleaning --- example/keras/keras_imagenet_resnet50.py | 6 ++--- example/pytorch/benchmark_byteps.py | 9 +++----- example/pytorch/microbenchmark-byteps.py | 2 -- .../pytorch/train_imagenet_resnet50_byteps.py | 22 +++++++++---------- example/pytorch/train_mnist_byteps.py | 6 ++--- example/tensorflow/synthetic_benchmark.py | 6 ++--- 6 files changed, 23 insertions(+), 28 deletions(-) diff --git a/example/keras/keras_imagenet_resnet50.py b/example/keras/keras_imagenet_resnet50.py index 4c28b38c9..81359d667 100644 --- a/example/keras/keras_imagenet_resnet50.py +++ b/example/keras/keras_imagenet_resnet50.py @@ -33,8 +33,8 @@ help='tensorboard log directory') parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.h5', help='checkpoint file format') -parser.add_argument('--fp16-allreduce', action='store_true', default=False, - help='use fp16 compression during allreduce') +parser.add_argument('--fp16-pushpull', action='store_true', default=False, + help='use fp16 compression during pushpull') # Default settings from https://arxiv.org/abs/1706.02677. parser.add_argument('--batch-size', type=int, default=32, @@ -96,7 +96,7 @@ model = keras.applications.resnet50.ResNet50(weights=None) # BytePS: (optional) compression algorithm. -compression = bps.Compression.fp16 if args.fp16_allreduce else bps.Compression.none +compression = bps.Compression.fp16 if args.fp16_pushpull else bps.Compression.none # Restore from a previous checkpoint, if initial_epoch is specified. # BytePS: restore on the first worker which will broadcast both model and optimizer weights diff --git a/example/pytorch/benchmark_byteps.py b/example/pytorch/benchmark_byteps.py index fcd0df3bc..efd61ff65 100644 --- a/example/pytorch/benchmark_byteps.py +++ b/example/pytorch/benchmark_byteps.py @@ -14,8 +14,8 @@ # Benchmark settings parser = argparse.ArgumentParser(description='PyTorch Synthetic Benchmark', formatter_class=argparse.ArgumentDefaultsHelpFormatter) -parser.add_argument('--fp16-allreduce', action='store_true', default=False, - help='use fp16 compression during allreduce') +parser.add_argument('--fp16-pushpull', action='store_true', default=False, + help='use fp16 compression during byteps pushpull') parser.add_argument('--model', type=str, default='resnet50', help='model to benchmark') @@ -60,7 +60,7 @@ optimizer = optim.SGD(model.parameters(), lr=0.01) # BytePS: (optional) compression algorithm. -compression = bps.Compression.fp16 if args.fp16_allreduce else bps.Compression.none +compression = bps.Compression.fp16 if args.fp16_pushpull else bps.Compression.none # BytePS: wrap optimizer with DistributedOptimizer. optimizer = bps.DistributedOptimizer(optimizer, @@ -127,6 +127,3 @@ def log(s, nl=True): log('Img/sec per %s: %.1f +-%.1f' % (device, img_sec_mean, img_sec_conf)) log('Total img/sec on %d %s(s): %.1f +-%.1f' % (bps.size(), device, bps.size() * img_sec_mean, bps.size() * img_sec_conf)) - -# to let arnold shutdown all tasks -raise Exception diff --git a/example/pytorch/microbenchmark-byteps.py b/example/pytorch/microbenchmark-byteps.py index d88f15ddc..6892eb613 100644 --- a/example/pytorch/microbenchmark-byteps.py +++ b/example/pytorch/microbenchmark-byteps.py @@ -44,10 +44,8 @@ def log(s, nl=True): def benchmark(tensor, average, name): if not args.no_wait and bps.rank() == 0: - # let other workers submit allreduce request first time.sleep(0.01) start = time.time() - # do not use allreduce_() as it polls every 1ms handle = push_pull_async_inplace(tensor, average, name) while True: if poll(handle): diff --git a/example/pytorch/train_imagenet_resnet50_byteps.py b/example/pytorch/train_imagenet_resnet50_byteps.py index 73145d7fd..360a36fbc 100644 --- a/example/pytorch/train_imagenet_resnet50_byteps.py +++ b/example/pytorch/train_imagenet_resnet50_byteps.py @@ -24,11 +24,11 @@ help='tensorboard log directory') parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.pth.tar', help='checkpoint file format') -parser.add_argument('--fp16-allreduce', action='store_true', default=False, - help='use fp16 compression during allreduce') -parser.add_argument('--batches-per-allreduce', type=int, default=1, +parser.add_argument('--fp16-pushpull', action='store_true', default=False, + help='use fp16 compression during pushpull') +parser.add_argument('--batches-per-pushpull', type=int, default=1, help='number of batches processed locally before ' - 'executing allreduce across workers; it multiplies ' + 'executing pushpull across workers; it multiplies ' 'total batch size.') # Default settings from https://arxiv.org/abs/1706.02677. @@ -55,7 +55,7 @@ args = parser.parse_args() args.cuda = not args.no_cuda and torch.cuda.is_available() -allreduce_batch_size = args.batch_size * args.batches_per_allreduce +pushpull_batch_size = args.batch_size * args.batches_per_pushpull bps.init() torch.manual_seed(args.seed) @@ -101,7 +101,7 @@ train_sampler = torch.utils.data.distributed.DistributedSampler( train_dataset, num_replicas=bps.size(), rank=bps.rank()) train_loader = torch.utils.data.DataLoader( - train_dataset, batch_size=allreduce_batch_size, + train_dataset, batch_size=pushpull_batch_size, sampler=train_sampler, **kwargs) val_dataset = \ @@ -127,20 +127,20 @@ model.cuda() # BytePS: scale learning rate by the number of GPUs. -# Gradient Accumulation: scale learning rate by batches_per_allreduce +# Gradient Accumulation: scale learning rate by batches_per_pushpull optimizer = optim.SGD(model.parameters(), lr=(args.base_lr * - args.batches_per_allreduce * bps.size()), + args.batches_per_pushpull * bps.size()), momentum=args.momentum, weight_decay=args.wd) # BytePS: (optional) compression algorithm. -compression = bps.Compression.fp16 if args.fp16_allreduce else bps.Compression.none +compression = bps.Compression.fp16 if args.fp16_pushpull else bps.Compression.none # BytePS: wrap optimizer with DistributedOptimizer. optimizer = bps.DistributedOptimizer( optimizer, named_parameters=model.named_parameters(), compression=compression, - backward_passes_per_step=args.batches_per_allreduce) + backward_passes_per_step=args.batches_per_pushpull) # Restore from a previous checkpoint, if initial_epoch is specified. # BytePS: restore on the first worker which will broadcast weights to other workers. @@ -233,7 +233,7 @@ def adjust_learning_rate(epoch, batch_idx): else: lr_adj = 1e-3 for param_group in optimizer.param_groups: - param_group['lr'] = args.base_lr * bps.size() * args.batches_per_allreduce * lr_adj + param_group['lr'] = args.base_lr * bps.size() * args.batches_per_pushpull * lr_adj def accuracy(output, target): diff --git a/example/pytorch/train_mnist_byteps.py b/example/pytorch/train_mnist_byteps.py index 82dc27b8d..a0faec0cf 100644 --- a/example/pytorch/train_mnist_byteps.py +++ b/example/pytorch/train_mnist_byteps.py @@ -26,8 +26,8 @@ help='random seed (default: 42)') parser.add_argument('--log-interval', type=int, default=10, metavar='N', help='how many batches to wait before logging training status') -parser.add_argument('--fp16-allreduce', action='store_true', default=False, - help='use fp16 compression during allreduce') +parser.add_argument('--fp16-pushpull', action='store_true', default=False, + help='use fp16 compression during pushpull') args = parser.parse_args() args.cuda = not args.no_cuda and torch.cuda.is_available() @@ -96,7 +96,7 @@ def forward(self, x): momentum=args.momentum) # BytePS: (optional) compression algorithm. -compression = bps.Compression.fp16 if args.fp16_allreduce else bps.Compression.none +compression = bps.Compression.fp16 if args.fp16_pushpull else bps.Compression.none # BytePS: wrap optimizer with DistributedOptimizer. optimizer = bps.DistributedOptimizer(optimizer, diff --git a/example/tensorflow/synthetic_benchmark.py b/example/tensorflow/synthetic_benchmark.py index 5de511ca1..9ad43eddb 100644 --- a/example/tensorflow/synthetic_benchmark.py +++ b/example/tensorflow/synthetic_benchmark.py @@ -13,8 +13,8 @@ # Benchmark settings parser = argparse.ArgumentParser(description='TensorFlow Synthetic Benchmark', formatter_class=argparse.ArgumentDefaultsHelpFormatter) -parser.add_argument('--fp16-allreduce', action='store_true', default=False, - help='use fp16 compression during allreduce') +parser.add_argument('--fp16-pushpull', action='store_true', default=False, + help='use fp16 compression during pushpull') parser.add_argument('--model', type=str, default='ResNet50', help='model to benchmark') @@ -58,7 +58,7 @@ opt = tf.train.GradientDescentOptimizer(0.01) # BytePS: (optional) compression algorithm. -compression = bps.Compression.fp16 if args.fp16_allreduce else bps.Compression.none +compression = bps.Compression.fp16 if args.fp16_pushpull else bps.Compression.none # BytePS: wrap optimizer with DistributedOptimizer. opt = bps.DistributedOptimizer(opt, compression=compression)