Implemented _aggregate_gradients in DistributedOptimizer to support TensorFlow Keras 2.2 (#1777)

Signed-off-by: Travis Addair <taddair@uber.com>
tgaddair committed Mar 10, 2020
1 parent 167aa00 commit 6e4683acb4aea339c447c438ed277d0c6dec0c29
Showing 4 changed files with 25 additions and 12 deletions.
  1. +2 −2 docker-compose.test.yml
  2. +15 −6 horovod/_keras/__init__.py
  3. +7 −4 horovod/spark/keras/util.py
  4. +1 −0 test/test_spark_keras.py
docker-compose.test.yml

@@ -162,7 +162,7 @@ services:
       UBUNTU_VERSION: 18.04
       MPI_KIND: OpenMPI
       PYTHON_VERSION: 3.6
-      TENSORFLOW_PACKAGE: tensorflow==2.1.0 tensorflow-estimator==2.1.0
+      TENSORFLOW_PACKAGE: tf-nightly
       KERAS_PACKAGE: git+https://github.com/keras-team/keras.git
       PYTORCH_PACKAGE: torch-nightly
       TORCHVISION_PACKAGE: torchvision
@@ -266,7 +266,7 @@ services:
       NCCL_VERSION_OVERRIDE: 2.4.8-1+cuda10.1
       MPI_KIND: OpenMPI
       PYTHON_VERSION: 3.6
-      TENSORFLOW_PACKAGE: tensorflow-gpu==2.1.0 tensorflow-gpu-estimator==2.1.0
+      TENSORFLOW_PACKAGE: tf-nightly-gpu
       KERAS_PACKAGE: git+https://github.com/keras-team/keras.git
       PYTORCH_PACKAGE: torch-nightly
       TORCHVISION_PACKAGE: torchvision
horovod/_keras/__init__.py

@@ -20,13 +20,15 @@
 def create_distributed_optimizer(keras, optimizer, name, device_dense, device_sparse,
                                  compression, sparse_as_dense):
     class _DistributedOptimizer(keras.optimizers.Optimizer):
+        _HAS_ALL_REDUCE_SUM_GRAD = True
+
         def __init__(self, **kwargs):
             self._name = name or "Distributed%s" % self.__class__.__base__.__name__
             self._device_dense = device_dense
             self._device_sparse = device_sparse
             self._compression = compression
             self._sparse_as_dense = sparse_as_dense
-            self._get_gradients_used = False
+            self._aggregated_gradients = False
             super(self.__class__, self).__init__(**kwargs)

         def get_gradients(self, loss, params):
@@ -38,8 +40,15 @@ def get_gradients(self, loss, params):
             In DistributedOptimizer, get_gradients() is overridden to also
             allreduce the gradients before returning them.
             """
-            self._get_gradients_used = True
             gradients = super(self.__class__, self).get_gradients(loss, params)
+            return self._allreduce(gradients)
+
+        def _aggregate_gradients(self, grads_and_vars):
+            gradients = [grad for grad, var in grads_and_vars]
+            return self._allreduce(gradients)
+
+        def _allreduce(self, gradients):
+            self._aggregated_gradients = True
             if hvd.size() > 1:
                 averaged_gradients = []
                 with tf.name_scope(self._name + "_Allreduce"):
@@ -60,11 +69,11 @@ def get_gradients(self, loss, params):
             return gradients

         def apply_gradients(self, *args, **kwargs):
-            if not self._get_gradients_used:
+            if not self._aggregated_gradients:
                 raise Exception('`apply_gradients()` was called without a call to '
-                                '`get_gradients()`. If you\'re using TensorFlow 2.0, '
-                                'please specify `experimental_run_tf_function=False` in '
-                                '`compile()`.')
+                                '`get_gradients()` or `_aggregate_gradients`. If you\'re '
+                                'using TensorFlow 2.0, please specify '
+                                '`experimental_run_tf_function=False` in `compile()`.')
             return super(self.__class__, self).apply_gradients(*args, **kwargs)

     # We dynamically create a new class that inherits from the optimizer that was passed in.
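
Note on the motivation (not part of the diff): as of tf.keras 2.2, the default train step aggregates gradients through the optimizer's _aggregate_gradients(grads_and_vars) hook rather than calling get_gradients(), which is why the wrapper overrides that hook and routes it through the shared _allreduce() path, and why _HAS_ALL_REDUCE_SUM_GRAD = True signals to Keras that the optimizer performs its own cross-replica reduction. A minimal usage sketch follows; the model, data, and learning-rate scaling are illustrative assumptions, not part of this commit:

    import numpy as np
    import tensorflow as tf
    import horovod.tensorflow.keras as hvd

    hvd.init()

    model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(4,))])
    # Horovod convention: scale the learning rate by the number of workers.
    opt = hvd.DistributedOptimizer(tf.keras.optimizers.SGD(0.01 * hvd.size()))
    model.compile(loss='mse', optimizer=opt)

    # Hypothetical data; in practice each worker reads its own shard.
    x, y = np.random.rand(32, 4), np.random.rand(32, 1)
    model.fit(x, y, batch_size=8, epochs=1,
              callbacks=[hvd.callbacks.BroadcastGlobalVariablesCallback(0)])

With the new hook in place, this path should no longer require passing experimental_run_tf_function=False to compile(), since _aggregate_gradients() is reached from inside the default tf.function-based train step.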
horovod/spark/keras/util.py

@@ -166,6 +166,9 @@ def _get_from_named_tuple(row, col):
     num_inputs = len(feature_columns)
     num_labels = len(label_columns)

+    def as_tuple(v):
+        return tuple(v) if len(v) > 1 else v[0]
+
     def prep(row):
         if sample_weight_col:
             sample_weight = get_col_from_row_fn(row, sample_weight_col)
@@ -174,10 +177,10 @@ def prep(row):
                     tf.reshape(get_col_from_row_fn(row, feature_columns[i]), input_shapes[i])
                     for i
                     in range(num_inputs)),
-                tuple(
+                as_tuple([
                     tf.reshape(get_col_from_row_fn(row, label_columns[j]), output_shapes[j]) for
                     j
-                    in range(num_labels)),
+                    in range(num_labels)]),
                 {name: tf.reshape(sample_weight, [-1]) for name in output_names}
             )
         else:
@@ -186,10 +189,10 @@ def prep(row):
                     tf.reshape(get_col_from_row_fn(row, feature_columns[i]), input_shapes[i])
                     for i
                     in range(num_inputs)),
-                tuple(
+                as_tuple([
                     tf.reshape(get_col_from_row_fn(row, label_columns[j]), output_shapes[j]) for
                     j
-                    in range(num_labels))
+                    in range(num_labels)])
             )

     return prep
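
The as_tuple helper added above unwraps single-element label lists, so a model with one output receives a bare tensor while a multi-output model still receives a tuple. A standalone sketch of the behavior, using plain strings in place of the reshaped label tensors:

    def as_tuple(v):
        return tuple(v) if len(v) > 1 else v[0]

    as_tuple(['y'])          # -> 'y'           (single-output model)
    as_tuple(['y1', 'y2'])   # -> ('y1', 'y2')  (multi-output model)

This lines up with tf.keras expecting a bare label tensor, rather than a 1-tuple, for single-output models.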
test/test_spark_keras.py

@@ -114,6 +114,7 @@ def test_fit_model_multiclass(self):
             model=model,
             optimizer=optimizer,
             loss=loss,
+            metrics=['accuracy'],
             feature_cols=['features'],
             label_cols=['label_vec'],
             batch_size=2,
