Fixed Elastic tf.keras and added additional integration tests #2289

Merged · 9 commits · Sep 21, 2020
Changes from 5 commits
2 changes: 1 addition & 1 deletion .buildkite/gen-pipeline.sh
@@ -274,7 +274,7 @@ run_gloo_integration() {
"horovodrun -np 2 -H localhost:2 --gloo python /horovod/examples/mxnet_mnist.py"

# Elastic
local elastic_tensorflow="test_elastic_tensorflow.py"
local elastic_tensorflow="test_elastic_tensorflow.py test_elastic_tensorflow_keras.py"
local elastic_spark_tensorflow="test_elastic_spark_tensorflow.py"
if [[ ${test} == *"tf2_"* ]] || [[ ${test} == *"tfhead"* ]]; then
elastic_tensorflow="test_elastic_tensorflow2.py"
5 changes: 3 additions & 2 deletions examples/elastic/tensorflow_keras_mnist_elastic.py
@@ -30,6 +30,7 @@
 )
 dataset = dataset.repeat().shuffle(10000).batch(batch_size)

+# NOTE: `input_shape` is required to ensure the model is built before training
 model = Sequential()
 model.add(Conv2D(32, kernel_size=(3, 3),
                  activation='relu',
@@ -62,9 +63,9 @@ def on_state_reset():

 callbacks = [
     # Horovod: elastic training callbacks to update and commit state.
-    hvd.elastic.CommitStateCallback(state),
-    hvd.elastic.UpdateBatchStateCallback(state),
     hvd.elastic.UpdateEpochStateCallback(state),
+    hvd.elastic.UpdateBatchStateCallback(state),
+    hvd.elastic.CommitStateCallback(state),
 ]

 # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
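Keras invokes callbacks in list order, so the reordering above matters: the `Update*StateCallback`s must mutate the state before `CommitStateCallback` snapshots it, otherwise each commit would save stale `batch`/`epoch` values. A minimal sketch of that ordering assumption, using stand-in callbacks rather than the Horovod classes:

```python
import tensorflow as tf


class State(object):
    """Stand-in for hvd.elastic.KerasState (illustrative only)."""
    batch = 0


class UpdateBatch(tf.keras.callbacks.Callback):
    def __init__(self, state):
        super(UpdateBatch, self).__init__()
        self.state = state

    def on_batch_end(self, batch, logs=None):
        self.state.batch = batch + 1  # record progress first


class Commit(tf.keras.callbacks.Callback):
    def __init__(self, state):
        super(Commit, self).__init__()
        self.state = state

    def on_batch_end(self, batch, logs=None):
        # Sees the up-to-date batch only because UpdateBatch runs first.
        print('committing at batch', self.state.batch)


state = State()
# Correct order: update the state, then commit it.
callbacks = [UpdateBatch(state), Commit(state)]
```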
48 changes: 10 additions & 38 deletions horovod/_keras/__init__.py
@@ -29,12 +29,15 @@ class _DistributedOptimizer(keras.optimizers.Optimizer):

     def __init__(self, **kwargs):
         self._name = name or "Distributed%s" % self.__class__.__base__.__name__
-        self._device_dense = device_dense
-        self._device_sparse = device_sparse
-        self._compression = compression
-        self._sparse_as_dense = sparse_as_dense
         self._aggregated_gradients = False
-        self._gradient_predivide_factor = gradient_predivide_factor
+        self._allreduce_grads = hvd._make_allreduce_grads_fn(
+            self._name,
+            device_dense,
+            device_sparse,
+            compression,
+            sparse_as_dense,
+            hvd.Average,
+            gradient_predivide_factor)
         super(self.__class__, self).__init__(**kwargs)

     def get_gradients(self, loss, params):
@@ -58,40 +61,9 @@ def _aggregate_gradients(self, grads_and_vars):
             return aggregated_grads
         return list(zip(aggregated_grads, vars))

-    def _allreduce(self, gradients):
+    def _allreduce(self, grads):
         self._aggregated_gradients = True
-        if hvd.size() > 1:
-            if self._gradient_predivide_factor != 1.0:
-                # Perform averaging via pre/postscaling factors.
-                # Split average operation across pre/postscale factors
-                prescale_factor = 1.0 / gradient_predivide_factor
-                postscale_factor = gradient_predivide_factor / hvd.size()
-                do_average = False
-            else:
-                prescale_factor = 1.0
-                postscale_factor = 1.0
-                do_average = True
-
-            averaged_gradients = []
-            with tf.name_scope(self._name + "_Allreduce"):
-                for grad in gradients:
-                    if grad is not None:
-                        if self._sparse_as_dense and \
-                                isinstance(grad, tf.IndexedSlices):
-                            grad = tf.convert_to_tensor(grad)
-                        avg_grad = hvd.allreduce(grad,
-                                                 average=do_average,
-                                                 device_dense=self._device_dense,
-                                                 device_sparse=self._device_sparse,
-                                                 compression=self._compression,
-                                                 prescale_factor=prescale_factor,
-                                                 postscale_factor=postscale_factor)
-                        averaged_gradients.append(avg_grad)
-                    else:
-                        averaged_gradients.append(None)
-            return averaged_gradients
-        else:
-            return gradients
+        return self._allreduce_grads(grads)
Comment from a collaborator (the PR author):

@romerojosh this part may be relevant to you since you recently did some work on this part of the code.

Reply from a collaborator:

Excellent! Glad to see this duplicate code being removed.


     def apply_gradients(self, *args, **kwargs):
         results = super(self.__class__, self).apply_gradients(*args, **kwargs)
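For reference, the deleted `_allreduce` branch averaged gradients with a `gradient_predivide_factor` f by splitting the division across the allreduce: it prescaled by 1/f before a summing allreduce and postscaled by f/n afterwards, so that (1/f) * sum(g_i) * (f/n) = mean(g_i). A NumPy sketch of that identity (illustrative only; the shared logic now lives behind `hvd._make_allreduce_grads_fn`):

```python
import numpy as np


def allreduce_average(grads_per_rank, predivide_factor):
    """Simulate the removed pre/postscale averaging path."""
    prescale = 1.0 / predivide_factor
    postscale = predivide_factor / len(grads_per_rank)
    # A summing allreduce over prescaled gradients...
    summed = sum(g * prescale for g in grads_per_rank)
    # ...followed by postscaling yields the plain average.
    return summed * postscale


grads = [np.array([1.0, 2.0]), np.array([3.0, 4.0]), np.array([5.0, 6.0])]
assert np.allclose(allreduce_average(grads, predivide_factor=2.0),
                   np.mean(grads, axis=0))
```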
14 changes: 12 additions & 2 deletions horovod/_keras/elastic.py
@@ -25,9 +25,15 @@ def __init__(self, backend, state, batches_per_commit, *args):
     def on_batch_end(self, batch, logs=None):
         self.batches_remaining -= 1
         if self.batches_remaining == 0:
-            self.state.commit()
+            self.commit()
             self.batches_remaining = self.batches_per_commit

+    def on_epoch_end(self, epoch, logs=None):
+        self.commit()
+
+    def commit(self):
+        self.state.commit()
+

 class UpdateBatchStateCallbackImpl(object):
     def __init__(self, backend, state, *args):
@@ -54,6 +60,10 @@ def __init__(self, backend, state, *args):
         super(UpdateEpochStateCallbackImpl, self).__init__(*args)
         self.backend = backend
         self.state = state
+        self.initial_epoch = self.state.epoch
+
+    def on_train_begin(self, logs=None):
+        self.initial_epoch = self.state.epoch

     def on_epoch_end(self, epoch, logs=None):
-        self.state.epoch = epoch
+        self.state.epoch = self.initial_epoch + epoch + 1
Comment from a collaborator:

Is `epoch` the epoch that ended? Why `self.state.epoch = self.initial_epoch + epoch + 1`?

Reply from the author:

Yes, `epoch` is the epoch that just ended. The idea is that at the end of the epoch, we want to commit to save our progress. We could avoid the `+ 1` here by doing this in `on_epoch_begin`, but if we committed before incrementing the epoch number (though after resetting the batch number), we would end up repeating the last epoch. This way we ensure that when we commit, the state is fully up to date in case an error occurs.

The `initial_epoch` is needed because the `epoch` provided by the callback is relative: Keras gives us no way to offset it by an initial epoch. So if we complete some number of epochs and then reset, we do not want to lose our progress.

I will add comments to explain this in the code.
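A small worked trace of the bookkeeping described above, assuming a reset after two completed epochs (the `epoch` Keras passes to callbacks restarts at 0 after each rendezvous, while `state.epoch` persists):

```python
# Illustrative trace, not Horovod code.
state_epoch = 0                               # absolute epoch stored in the elastic state

# First run: epochs 0 and 1 complete, then a worker fails.
initial_epoch = state_epoch                   # 0, captured in on_train_begin
for epoch in range(2):                        # Keras-relative epochs 0, 1
    state_epoch = initial_epoch + epoch + 1   # committed value is fully up to date
assert state_epoch == 2                       # two epochs recorded

# After rendezvous, fit() restarts and relative epochs begin at 0 again.
initial_epoch = state_epoch                   # 2, re-captured in on_train_begin
for epoch in range(1):                        # one more relative epoch
    state_epoch = initial_epoch + epoch + 1
assert state_epoch == 3                       # progress neither lost nor repeated
```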

2 changes: 1 addition & 1 deletion horovod/common/stall_inspector.cc
@@ -73,7 +73,7 @@ bool StallInspector::CheckForStalledTensors(int global_size) {
"submit different tensors or that only subset of ranks is "
"submitting tensors, which will cause deadlock. "
<< std::endl
<< "Stalled ranks:";
<< "Missing ranks:";
for (auto& kv : missing_ranks) {
message << std::endl << kv.first;
if (shutdown_ranks.find(kv.first) != shutdown_ranks.end()) {
12 changes: 6 additions & 6 deletions horovod/tensorflow/__init__.py
@@ -260,12 +260,12 @@ def allreduce_grads(grads):
                          else grad for grad in grads]

             return [_allreduce_cond(grad,
-                                device_dense=device_dense,
-                                device_sparse=device_sparse,
-                                compression=compression,
-                                op=op,
-                                prescale_factor=prescale_factor,
-                                postscale_factor=postscale_factor)
+                                    device_dense=device_dense,
+                                    device_sparse=device_sparse,
+                                    compression=compression,
+                                    op=op,
+                                    prescale_factor=prescale_factor,
+                                    postscale_factor=postscale_factor)
                     if grad is not None else grad
                     for grad in grads]

16 changes: 10 additions & 6 deletions horovod/tensorflow/elastic.py
@@ -108,12 +108,16 @@ def __init__(self, model, optimizer=None, backend=None, **kwargs):
         self.backend = backend
         self._save_model()

-        def broadcast_object_with_session(obj):
-            return broadcast_object(obj, session=backend.get_session())
-
-        broadcast_object_fn = broadcast_object if not backend or _executing_eagerly() else broadcast_object_with_session
+        if not backend or _executing_eagerly():
+            self._bcast_model = lambda: _broadcast_model(self.model, self.optimizer, backend=self.backend)
+            bcast_object = broadcast_object
+        else:
+            # For TensorFlow v1, we need to reuse the broadcast op to prevent incrementing the uids
+            bcast_op = broadcast_variables(_global_variables(), root_rank=0)
+            self._bcast_model = lambda: self.backend.get_session().run(bcast_op)
+            bcast_object = broadcast_object_fn(session=self.backend.get_session())

-        super(TensorFlowKerasState, self).__init__(bcast_object=broadcast_object_fn,
+        super(TensorFlowKerasState, self).__init__(bcast_object=bcast_object,
                                                    get_rank=rank,
                                                    **kwargs)

@@ -126,7 +130,7 @@ def restore(self):
         super(TensorFlowKerasState, self).restore()

     def sync(self):
-        _broadcast_model(self.model, self.optimizer, backend=self.backend)
+        self._bcast_model()
         self._save_model()
         super(TensorFlowKerasState, self).sync()

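The TF1 branch above builds the broadcast op once in `__init__` and has `sync()` rerun it through the session, because constructing a fresh `broadcast_variables(...)` op on every sync would keep adding nodes to the graph (and incrementing Horovod's internal op uids). A minimal sketch of the pattern, assuming TF1-style graph mode with a stand-in op:

```python
import tensorflow as tf

# Build the op once, outside the sync path (tf.no_op stands in for
# the real hvd.broadcast_variables(...) op here).
bcast_op = tf.no_op(name='stand_in_for_broadcast_variables')


def sync(session):
    # Rerunning the cached op adds no new nodes to the graph, unlike
    # rebuilding the broadcast on every elastic reset.
    session.run(bcast_op)
```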
2 changes: 1 addition & 1 deletion test/data/expected_buildkite_pipeline.yaml
@@ -653,7 +653,7 @@ steps:
     agents:
       queue: cpu
   - label: ':factory: Elastic Tests (test-cpu-gloo-py3_6-tf1_15_0-keras2_2_4-torch1_2_0-mxnet1_4_1-pyspark2_3_2)'
-    command: bash -c "cd /horovod/test/integration && HOROVOD_LOG_LEVEL=DEBUG pytest --forked -v --log-cli-level 10 --log-cli-format '[%(asctime)-15s %(levelname)s %(filename)s:%(lineno)d %(funcName)s()] %(message)s' --capture=no --continue-on-collection-errors --junit-xml=/artifacts/junit.gloo.elastic.xml test_elastic_torch.py test_elastic_tensorflow.py"
+    command: bash -c "cd /horovod/test/integration && HOROVOD_LOG_LEVEL=DEBUG pytest --forked -v --log-cli-level 10 --log-cli-format '[%(asctime)-15s %(levelname)s %(filename)s:%(lineno)d %(funcName)s()] %(message)s' --capture=no --continue-on-collection-errors --junit-xml=/artifacts/junit.gloo.elastic.xml test_elastic_torch.py test_elastic_tensorflow.py test_elastic_tensorflow_keras.py"
     artifact_paths: "artifacts/**"
     plugins:
     - docker-compose#v3.5.0:
171 changes: 171 additions & 0 deletions test/integration/data/elastic_tensorflow_keras_main.py
@@ -0,0 +1,171 @@
# Copyright 2020 Uber Technologies, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

import argparse
import json
import os
import psutil
import time

import tensorflow as tf

import horovod.tensorflow.keras as hvd

parser = argparse.ArgumentParser(description='TensorFlow Keras Elastic Test',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)

parser.add_argument('--batches-per-epoch', type=int, default=10,
                    help='number of batches per epoch')
parser.add_argument('--batches-per-commit', type=int, default=1,
                    help='number of batches per commit of the elastic state object')
parser.add_argument('--epochs', type=int, default=3,
                    help='number of epochs')
parser.add_argument('--epoch-wait', type=int, default=0,
                    help='number of seconds each epoch takes')
parser.add_argument('--logfile', default='/tmp/logfile.txt',
                    help='log file to record results (one line per epoch)')
parser.add_argument('--discovery-schedule', default='[]',
                    help='JSON string specifying schedule of host updates each epoch')
parser.add_argument('--discovery-wait', type=int, default=3,
                    help='number of seconds the worker waits for an expected host discovery')
parser.add_argument('--exit-schedule', default='{}',
                    help='JSON string mapping from (epoch, batch) to list of ranks to exit at that time')
parser.add_argument('--exit-mode', default='exception',
                    help='means used to cause a worker to exit [exception | kill]')

args = parser.parse_args()

config = tf.ConfigProto()
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
config.gpu_options.allow_growth = False
config.gpu_options.visible_device_list = ''
tf.keras.backend.set_session(tf.Session(config=config))

hvd.init()

lr = 0.01
model = tf.keras.Sequential([tf.keras.layers.Dense(2, activation='softmax', input_shape=(2,))])
optimizer = tf.keras.optimizers.SGD(lr * hvd.size())
optimizer = hvd.DistributedOptimizer(optimizer)

model.compile(loss=tf.keras.losses.sparse_categorical_crossentropy,
              optimizer=optimizer,
              metrics=['accuracy'])

batch_size = 32
data = tf.random.uniform([batch_size, 2])
target = tf.random.uniform([batch_size, 1], minval=0, maxval=2, dtype=tf.int64)
dataset = tf.data.Dataset.from_tensor_slices((data, target)).cache().repeat().batch(batch_size)

hostname = os.environ.get('HOROVOD_HOSTNAME')
start_rank = int(os.environ.get('HOROVOD_RANK', 0))

discovery_schedule = json.loads(args.discovery_schedule)
epoch_to_hosts = {epoch: hosts for epoch, hosts in discovery_schedule if epoch is not None}
default_hosts = discovery_schedule[-1][1] if discovery_schedule else []

exit_schedule = json.loads(args.exit_schedule) if args.exit_schedule else {}


def check_exit(epoch, batch):
    key = str((epoch, batch))
    if key in exit_schedule:
        ranks_to_exit = exit_schedule[key]
        if start_rank in ranks_to_exit:
            if args.exit_mode == 'exception':
                raise RuntimeError('check_rank and exit epoch={} batch={} start_rank={} rank={}'
                                   .format(epoch, batch, start_rank, hvd.rank()))
            else:
                psutil.Process(os.getpid()).kill()


def log_state(state):
    state_dict = {
        'epoch': state.epoch,
        'batch': state.batch,
        'commits': state.commits,
        'hostname': hostname,
        'start_rank': start_rank,
        'rank': hvd.rank(),
        'size': hvd.size(),
        'rendezvous': state.rendezvous}
    with open(args.logfile, 'a') as f:
        f.write(json.dumps(state_dict) + os.linesep)


def on_state_reset():
    tf.keras.backend.set_value(model.optimizer.lr, lr * hvd.size())


state = hvd.elastic.KerasState(model, batch=0, epoch=0, commits=0, rendezvous=0)
state.register_reset_callbacks([on_state_reset])


# Handles all the test logic that surrounds the training loop
class TestCallback(tf.keras.callbacks.Callback):
    def __init__(self, state):
        self.state = state

    def on_batch_begin(self, batch, logs=None):
        check_exit(self.state.epoch, self.state.batch)

    def on_epoch_begin(self, epoch, logs=None):
        print('epoch {} batch {}'.format(self.state.epoch, self.state.batch))

    def on_epoch_end(self, epoch, logs=None):
        if hvd.rank() == 0:
            log_state(self.state)

        current_hosts = epoch_to_hosts.get(self.state.epoch, default_hosts)
        next_hosts = epoch_to_hosts.get(self.state.epoch + 1, default_hosts)
        if args.discovery_wait > 0 and current_hosts != next_hosts:
            print('host changes: {} -> {}'.format(current_hosts, next_hosts))
            start = int(time.time())
            while state._host_messages.empty():
                if int(time.time()) - start > args.discovery_wait:
                    raise TimeoutError('Timed out waiting for notifications from driver.')
                time.sleep(0.1)

        if args.epoch_wait > 0:
            time.sleep(args.epoch_wait)


# Special callback for testing that allows us to record how many times we have committed
class TrackingCommitCallback(hvd.elastic.CommitStateCallback):
    def commit(self):
        self.state.commits += 1
        super().commit()


callbacks = [
    TestCallback(state),
    hvd.elastic.UpdateEpochStateCallback(state),
    hvd.elastic.UpdateBatchStateCallback(state),
    TrackingCommitCallback(state, args.batches_per_commit),
]


@hvd.elastic.run
def train(state):
    state.rendezvous += 1
    # Horovod: adjust number of steps based on number of GPUs.
    state.model.fit(dataset,
                    steps_per_epoch=args.batches_per_epoch,
                    callbacks=callbacks,
                    epochs=args.epochs - state.epoch,
                    verbose=1 if hvd.rank() == 0 else 0)


train(state)
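For illustration, the JSON schedules this script parses might look like the following; the host names and slot counts here are made up, and the real values come from the integration test harness:

```python
import json

# --discovery-schedule: a list of [epoch, hosts] pairs. Entries whose epoch
# is None are skipped when building the per-epoch map, and the last entry's
# hosts act as the default once the schedule is exhausted.
discovery_schedule = json.dumps([
    [0, ['host-1:2', 'host-2:2']],   # two hosts during epoch 0
    [1, ['host-1:2']],               # host-2 drops out at epoch 1
])

# --exit-schedule: maps a "(epoch, batch)" string to the start ranks that
# should fail at that point, e.g. rank 1 exits at epoch 1, batch 0.
exit_schedule = json.dumps({str((1, 0)): [1]})
```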