TF 2.x BroadcastGlobalVariablesHook Fix #1265

Merged
merged 21 commits on Aug 13, 2019
Changes from 10 commits

@@ -19,6 +19,7 @@
from __future__ import division
from __future__ import print_function

+from distutils.version import LooseVersion

alsrgv (Member) commented on Aug 13, 2019:

Remove?

DEKHTIARJonathan (Author, Contributor) commented on Aug 13, 2019:

Indeed, unused


from horovod.common.util import check_extension

@@ -111,7 +112,15 @@ def broadcast_variables(variables, root_rank):
    return broadcast_group(variables, root_rank)


-if hasattr(tf, 'global_variables'):
+try:
+    _global_variables = tf.global_variables
+except AttributeError:
+    try:
+        _global_variables = tf.compat.v1.global_variables
+    except AttributeError:
+        _global_variables = None
+
+if _global_variables is not None:
    def broadcast_global_variables(root_rank):
        """Broadcasts all global variables from root rank to all other processes.
@@ -121,14 +130,31 @@ def broadcast_global_variables(root_rank):
            root_rank: rank of the process from which global variables will be broadcasted
                       to all other processes.
        """
-        return broadcast_variables(tf.global_variables(), root_rank)


-if hasattr(tf, 'train') and hasattr(tf.train, 'SessionRunHook'):
-    if hasattr(tf, 'estimator') and hasattr(tf.estimator, 'SessionRunHook'):
-        _SessionRunHook = tf.estimator.SessionRunHook
-    else:
+        if _executing_eagerly():

alsrgv (Member) commented on Aug 7, 2019:

Great! Can you add a test for this functionality (verify that it fails in eager mode)?

+            raise RuntimeError(
+                "Eager Execution is not supported by `hvd.BroadcastGlobalVariablesHook`\n"
+                "We recommend using `hvd.DistributedGradientTape` instead"
+            )
+
+        return broadcast_variables(_global_variables(), root_rank)
This conversation was marked as resolved by DEKHTIARJonathan

alsrgv (Member) commented on Aug 6, 2019:

We should fail this call in the eager mode.
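As an aside, the kind of test being requested might look roughly like the sketch below. This is illustrative only, not the test actually added in the PR; it assumes a TF 2.x build where eager execution is on by default and a single-process hvd.init().

# Illustrative sketch of the requested check: under eager execution,
# broadcasting the graph-mode global variables should raise the new RuntimeError.
import tensorflow as tf
import horovod.tensorflow as hvd


class BroadcastEagerTests(tf.test.TestCase):
    def test_broadcast_global_variables_rejects_eager_mode(self):
        hvd.init()
        if not tf.executing_eagerly():
            self.skipTest('only meaningful when eager execution is enabled')
        with self.assertRaises(RuntimeError):
            hvd.broadcast_global_variables(0)


if __name__ == '__main__':
    tf.test.main()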


+try:
+    _get_default_graph = tf.get_default_graph
+except AttributeError:
+    try:
+        _get_default_graph = tf.compat.v1.get_default_graph
+    except AttributeError:
+        _get_default_graph = None
+
+try:
+    _SessionRunHook = tf.estimator.SessionRunHook
+except AttributeError:
+    try:
+        _SessionRunHook = tf.train.SessionRunHook
+    except AttributeError:
+        _SessionRunHook = None
+
+if _SessionRunHook is not None and _get_default_graph is not None:

    class BroadcastGlobalVariablesHook(_SessionRunHook):
        """
@@ -158,7 +184,9 @@ def __init__(self, root_rank, device=''):
        self.device = device

    def begin(self):
-        if not self.bcast_op or self.bcast_op.graph != tf.get_default_graph():
+
+        if not self.bcast_op or self.bcast_op.graph != _get_default_graph():
+
alsrgv (Member) commented on Aug 6, 2019:

Nit: remove space

            with tf.device(self.device):
                self.bcast_op = broadcast_global_variables(self.root_rank)
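For readers unfamiliar with the hook, a typical graph-mode usage (TF 1.x, or tf.compat.v1 in TF 2.x, which is what the begin() change above keeps working) looks roughly like the sketch below; the toy regression model is an illustrative assumption, not part of the PR.

# Hedged sketch of graph-mode usage of BroadcastGlobalVariablesHook.
import tensorflow.compat.v1 as tf
import horovod.tensorflow as hvd

tf.disable_eager_execution()
hvd.init()

features = tf.random_normal([32, 4])
labels = tf.random_normal([32, 1])
predictions = tf.layers.dense(features, 1)
loss = tf.losses.mean_squared_error(labels, predictions)

# Wrap any legacy optimizer so gradients are averaged across ranks.
opt = hvd.DistributedOptimizer(tf.train.GradientDescentOptimizer(0.01 * hvd.size()))
train_op = opt.minimize(loss)

# The hook broadcasts rank 0's initial variable values to every worker
# when the session is created; per this PR it is graph-mode only.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]
with tf.train.MonitoredTrainingSession(hooks=hooks) as sess:
    for _ in range(10):
        sess.run(train_op)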

@@ -189,19 +217,18 @@ def allreduce_grads(grads):
    return allreduce_grads


-if hasattr(tf, 'compat') and hasattr(tf.compat, 'v1') and \
-        hasattr(tf.compat.v1, 'train') and hasattr(tf.compat.v1.train, 'Optimizer'):
+try:
    # TensorFlow 2.x
    _LegacyOptimizer = tf.compat.v1.train.Optimizer
-elif hasattr(tf, 'train') and hasattr(tf.train, 'Optimizer'):
-    # TensorFlow 1.x
-    _LegacyOptimizer = tf.train.Optimizer
-else:
-    # Future TensorFlow versions
-    _LegacyOptimizer = None
-
-
-if _LegacyOptimizer:
+except AttributeError:
+    try:
+        # TensorFlow 1.x
+        _LegacyOptimizer = tf.train.Optimizer
+    except AttributeError:
+        # Future TensorFlow versions
+        _LegacyOptimizer = None
+
+if _LegacyOptimizer is not None:
    class _DistributedOptimizer(_LegacyOptimizer):
        """An optimizer that wraps another tf.Optimizer, using an allreduce to
        average gradient values before applying gradients to model weights."""
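The change above swaps nested hasattr() chains for try/except AttributeError lookups, which read more directly when the attribute lives several modules deep and also cover future releases that drop a path. Purely as an illustration (not code from this PR), the same lookup can be factored into a small helper:

# Not part of the PR: a generic form of the lookup pattern used above,
# returning the first dotted attribute path that resolves on the tf module.
import functools

import tensorflow as tf


def _resolve_first(*paths):
    for path in paths:
        try:
            return functools.reduce(getattr, path.split('.'), tf)
        except AttributeError:
            continue
    return None


# Equivalent to the try/except cascade in the diff above.
_LegacyOptimizer = _resolve_first('compat.v1.train.Optimizer', 'train.Optimizer')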
@@ -17,7 +17,7 @@
import tensorflow as tf


if LooseVersion(tf.__version__) >= LooseVersion("1.9.0"):
if LooseVersion(tf.__version__) >= LooseVersion('1.7.0'): # Eager Mode has been introduced in TF 1.7.0
from tensorflow.python.eager import context
_has_eager = True
else:
@@ -26,7 +26,7 @@

def _executing_eagerly():
    """Returns true if eager execution is supported and enabled."""
-    return _has_eager and context.in_eager_mode()
+    return _has_eager and context.executing_eagerly()()

alsrgv (Member) commented on Aug 7, 2019:

Too many parentheses?

DEKHTIARJonathan (Author, Contributor) commented on Aug 7, 2019:

Oops. Sent a patch.
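Presumably the follow-up patch simply drops the extra call pair, since context.executing_eagerly is itself the predicate; the corrected helper would read roughly as follows (a sketch, with _has_eager and context taken from the surrounding module as in the diff above):

# Corrected helper: executing_eagerly() already returns a bool,
# so its result must not be called a second time.
def _executing_eagerly():
    """Returns true if eager execution is supported and enabled."""
    return _has_eager and context.executing_eagerly()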



def _make_subgraph(f):