TF 2.x BroadcastGlobalVariablesHook Fix #1265

Merged · 21 commits, merged on Aug 13, 2019
Changes shown are from 20 commits

@@ -1,13 +1,22 @@
*.pyc
*.so
.idea
.eggs

.vscode/
.idea/

horovod.egg-info
dist
build
docs/_build
.vscode/

env
venv/

examples/checkpoint*

examples/checkpoints/
horovod/tensorflow/mpi_lib.so
horovod/torch/test_cuda/

@@ -19,7 +19,6 @@
from __future__ import division
from __future__ import print_function


from horovod.common.util import check_extension

check_extension('horovod.tensorflow', 'HOROVOD_WITH_TENSORFLOW', __file__, 'mpi_lib')
@@ -111,7 +110,15 @@ def broadcast_variables(variables, root_rank):
return broadcast_group(variables, root_rank)


if hasattr(tf, 'global_variables'):
try:
_global_variables = tf.global_variables
except AttributeError:
try:
_global_variables = tf.compat.v1.global_variables
except AttributeError:
_global_variables = None

if _global_variables is not None:
def broadcast_global_variables(root_rank):
"""Broadcasts all global variables from root rank to all other processes.
@@ -121,14 +128,31 @@ def broadcast_global_variables(root_rank):
root_rank: rank of the process from which global variables will be broadcasted
to all other processes.
"""
return broadcast_variables(tf.global_variables(), root_rank)


if hasattr(tf, 'train') and hasattr(tf.train, 'SessionRunHook'):
if hasattr(tf, 'estimator') and hasattr(tf.estimator, 'SessionRunHook'):
_SessionRunHook = tf.estimator.SessionRunHook
else:
if _executing_eagerly():

alsrgv (Member) commented on Aug 7, 2019:
Great! Can you add a test for this functionality (verify that it fails in eager mode)?

raise RuntimeError(
"Eager Execution is not supported by `hvd.BroadcastGlobalVariablesHook`\n"
"We recommend using `hvd.DistributedGradientTape` instead"
)

return broadcast_variables(_global_variables(), root_rank)
This conversation was marked as resolved by DEKHTIARJonathan

alsrgv (Member) commented on Aug 6, 2019:
We should fail this call in eager mode.
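For context, the error message above steers eager-mode users toward `hvd.DistributedGradientTape`. A minimal sketch of that pattern, assuming a Keras-style `model`, an optimizer `opt`, a `loss_fn`, and a `batch` that are placeholders rather than part of this PR:

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

# One eager training step: wrap the tape so gradients are averaged across workers.
with tf.GradientTape() as tape:
    loss = loss_fn(model(batch))
tape = hvd.DistributedGradientTape(tape)
grads = tape.gradient(loss, model.trainable_variables)
opt.apply_gradients(zip(grads, model.trainable_variables))

# In place of BroadcastGlobalVariablesHook, broadcast the initial state once so
# every worker starts from rank 0's weights.
hvd.broadcast_variables(model.variables, root_rank=0)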


try:
_get_default_graph = tf.get_default_graph
except AttributeError:
try:
_get_default_graph = tf.compat.v1.get_default_graph
except AttributeError:
_get_default_graph = None

try:
_SessionRunHook = tf.estimator.SessionRunHook
except AttributeError:
try:
_SessionRunHook = tf.train.SessionRunHook
except AttributeError:
_SessionRunHook = None

if _SessionRunHook is not None and _get_default_graph is not None:

class BroadcastGlobalVariablesHook(_SessionRunHook):
"""
@@ -158,7 +182,9 @@ def __init__(self, root_rank, device=''):
self.device = device

def begin(self):
if not self.bcast_op or self.bcast_op.graph != tf.get_default_graph():

if not self.bcast_op or self.bcast_op.graph != _get_default_graph():

alsrgv (Member) commented on Aug 6, 2019:
Nit: remove space

with tf.device(self.device):
self.bcast_op = broadcast_global_variables(self.root_rank)
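For reference, in TF 1.x-style graph mode this hook is used roughly as the Horovod README describes, typically with `tf.train.MonitoredTrainingSession`; a minimal sketch in which `train_op` is a placeholder for the user's training op:

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

# Rank 0 broadcasts its initial variable values to all other workers.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Let only rank 0 write checkpoints to avoid concurrent writers.
checkpoint_dir = './checkpoints' if hvd.rank() == 0 else None

with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       hooks=hooks) as mon_sess:
    while not mon_sess.should_stop():
        mon_sess.run(train_op)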

@@ -189,19 +215,18 @@ def allreduce_grads(grads):
return allreduce_grads


if hasattr(tf, 'compat') and hasattr(tf.compat, 'v1') and \
        hasattr(tf.compat.v1, 'train') and hasattr(tf.compat.v1.train, 'Optimizer'):
    # TensorFlow 2.x
    _LegacyOptimizer = tf.compat.v1.train.Optimizer
elif hasattr(tf, 'train') and hasattr(tf.train, 'Optimizer'):
    # TensorFlow 1.x
    _LegacyOptimizer = tf.train.Optimizer
else:
    # Future TensorFlow versions
    _LegacyOptimizer = None


if _LegacyOptimizer:

try:
    # TensorFlow 2.x
    _LegacyOptimizer = tf.compat.v1.train.Optimizer
except AttributeError:
    try:
        # TensorFlow 1.x
        _LegacyOptimizer = tf.train.Optimizer
    except AttributeError:
        # Future TensorFlow versions
        _LegacyOptimizer = None

if _LegacyOptimizer is not None:
class _DistributedOptimizer(_LegacyOptimizer):
"""An optimizer that wraps another tf.Optimizer, using an allreduce to
average gradient values before applying gradients to model weights."""
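For context, `_LegacyOptimizer` is the TF1-style optimizer base class that the `_DistributedOptimizer` above subclasses: it resolves to `tf.compat.v1.train.Optimizer` on TensorFlow 2.x and `tf.train.Optimizer` on TensorFlow 1.x. A minimal usage sketch under that assumption, where `loss` is a placeholder and not part of this PR:

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

# Scale the learning rate by the number of workers, as in the Horovod examples.
opt = tf.compat.v1.train.AdagradOptimizer(0.01 * hvd.size())

# Wrap the legacy optimizer so gradients are allreduce-averaged across workers
# before they are applied.
opt = hvd.DistributedOptimizer(opt)
train_op = opt.minimize(loss)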
@@ -17,7 +17,7 @@
import tensorflow as tf


if LooseVersion(tf.__version__) >= LooseVersion("1.9.0"):
if LooseVersion(tf.__version__) >= LooseVersion('1.7.0'): # Eager Mode has been introduced in TF 1.7.0
from tensorflow.python.eager import context
_has_eager = True
else:
@@ -26,7 +26,7 @@

def _executing_eagerly():
"""Returns true if eager execution is supported and enabled."""
return _has_eager and context.in_eager_mode()
return _has_eager and context.executing_eagerly()


def _make_subgraph(f):
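As an aside, user code can make the same eager-vs-graph check through TensorFlow's public API rather than this internal helper; a minimal sketch in reasonably recent TF releases, with the surrounding training logic omitted:

import tensorflow as tf

if tf.executing_eagerly():
    # Eager path: e.g. hvd.DistributedGradientTape + hvd.broadcast_variables.
    pass
else:
    # Graph path: e.g. hvd.BroadcastGlobalVariablesHook with a session.
    pass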
@@ -975,6 +975,25 @@ def test_horovod_broadcast_grad_gpu(self):
"gradient %s differs from expected %s, "
"error: %s" % (grad_out, expected, str(err)))

def test_horovod_broadcast_eager_mode_error(self):
"""Test that tries to broadcast tensorflow global variables
in eager execution mode. This call should raise a RuntimeError."""

if not hvd.util._executing_eagerly():
return

with self.assertRaises(RuntimeError):
hvd.broadcast_global_variables(root_rank=0)

def test_horovod_broadcast_graph_mode(self):
"""Test that tries to broadcast tensorflow global variables
in graph execution mode. This call should not raise any exception."""

if hvd.util._executing_eagerly():
return

hvd.broadcast_global_variables(root_rank=0)
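Like the rest of this test suite, these new cases would normally be run across multiple processes, for example with something along the lines of the command below; the test file name and process count are illustrative, not taken from this PR:

horovodrun -np 2 python test_tensorflow.py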

def test_compression_fp16(self):
valid_dtypes = [tf.float16, tf.float32, tf.float64]
invalid_dtypes = [tf.uint8, tf.int8, tf.uint16, tf.int16,