
Fix writer initialization bug affecting horovod TF #68

Merged · Dec 19, 2019 · 32 commits

Commits
d8bbfe9
Fix bug affecting horovod TF runs wrt initialization of writers. Also…
rahul003 Dec 1, 2019
5514ea5
util function refactor
rahul003 Dec 1, 2019
70c38ae
Fix lack of support for PS
rahul003 Dec 4, 2019
577dbcf
load json once
rahul003 Dec 4, 2019
05ddc62
rename var
rahul003 Dec 4, 2019
d50b90d
merge lack of ps support branch
rahul003 Dec 4, 2019
d9d4cb5
Remove unnecessary set num workers in constructor, it is always set i…
rahul003 Dec 4, 2019
5ef155d
Merge branch 'fix_lack_of_support' into horovod
rahul003 Dec 4, 2019
2328197
Restructure order of calls of distribution strategy setting
rahul003 Dec 4, 2019
7cf37d2
Fix mirrored strategy CPU
rahul003 Dec 4, 2019
72f16f9
Fix config tests
rahul003 Dec 5, 2019
95c5522
Fix return of all writers for mirrored
rahul003 Dec 5, 2019
d833f6c
cleanup
rahul003 Dec 5, 2019
90b2c92
Save all workers bug handled for mirrored strategy
rahul003 Dec 5, 2019
5eb6451
Fix mirrored strategy all workers bug
rahul003 Dec 5, 2019
e1287d0
Remove prints from tests
rahul003 Dec 5, 2019
2c6fb1b
Fix None being returned for writers
rahul003 Dec 5, 2019
d797a0a
Merge branch 'master' of https://github.com/awslabs/sagemaker-debugge…
rahul003 Dec 5, 2019
101a167
Add horovod examples
rahul003 Dec 5, 2019
1b35974
Add horovod ZCC tests
rahul003 Dec 5, 2019
2a6d25a
Fix model dir path for estimator and keras examples. Also fix estimat…
rahul003 Dec 5, 2019
e20a7f5
Change num tensors saved
rahul003 Dec 5, 2019
aefa9ba
change model dir for keras test
rahul003 Dec 5, 2019
c81e7dd
Address review comments
rahul003 Dec 6, 2019
9fb4197
Switch to horovodrun
rahul003 Dec 6, 2019
ac48e45
Place config file in outdir
rahul003 Dec 6, 2019
73e9724
Fix horovodrun command
rahul003 Dec 6, 2019
4cb6425
copy env dict so next job is not affected
rahul003 Dec 6, 2019
7ed5296
Add comments to scripts explaining modifications to enable SMD
rahul003 Dec 7, 2019
74d5575
Added asserts for prepared tensors when needed. Refactored distributi…
rahul003 Dec 10, 2019
ffa1d2d
revert to get_distribution_strategy
rahul003 Dec 11, 2019
01bb276
Merge master
rahul003 Dec 17, 2019
158 changes: 158 additions & 0 deletions examples/tensorflow/local/horovod_keras_mnist.py
@@ -0,0 +1,158 @@
"""
This script is a simple MNIST training script which uses Horovod and TensorFlow's Keras interface.
It has been orchestrated with a SageMaker Debugger hook to allow saving tensors during training.
Here, the hook is created through its constructor so that you can run this script locally for experimentation.
When you run this script in SageMaker, it is recommended to create the hook from a JSON configuration file instead.
Please see the scripts in the 'sagemaker_byoc' or 'sagemaker_official_container' folder, depending on your use case.

This script has been adapted from an example in the Horovod repository: https://github.com/uber/horovod
"""

# Standard Library
import argparse
import math
import os

# Third Party
import horovod.tensorflow.keras as hvd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import backend as K
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Conv2D, Dense, Dropout, Flatten, MaxPooling2D
from tensorflow.keras.models import Sequential

# First Party
import smdebug.tensorflow as smd


def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ("yes", "true", "t", "y", "1"):
return True
elif v.lower() in ("no", "false", "f", "n", "0"):
return False
else:
raise argparse.ArgumentTypeError("Boolean value expected.")


def main(args):
# Horovod: initialize Horovod.
hvd.init()

if not args.use_only_cpu:
# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
else:
config = None

K.set_session(tf.Session(config=config))

batch_size = 128
num_classes = 10

# Horovod: adjust number of epochs based on number of GPUs.
epochs = int(math.ceil(args.num_epochs / hvd.size()))

# Input image dimensions
img_rows, img_cols = 28, 28

# The data, shuffled and split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

if K.image_data_format() == "channels_first":
x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
input_shape = (1, img_rows, img_cols)
else:
x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
input_shape = (img_rows, img_cols, 1)

x_train = x_train.astype("float32")
x_test = x_test.astype("float32")
x_train /= 255
x_test /= 255
print("x_train shape:", x_train.shape)
print(x_train.shape[0], "train samples")
print(x_test.shape[0], "test samples")

# Convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=input_shape))
model.add(Conv2D(64, (3, 3), activation="relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation="relu"))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation="softmax"))

# Horovod: adjust learning rate based on number of GPUs.
opt = keras.optimizers.Adadelta(1.0 * hvd.size())

# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt)

##### Enabling SageMaker Debugger ###########
# creating hook
smd_hook = smd.KerasHook(
out_dir=args.out_dir,
save_config=smd.SaveConfig(save_interval=args.save_interval),
include_collections=["weights", "gradients"],
include_workers=args.include_workers,
)

##### Enabling SageMaker Debugger ###########
# wrapping optimizer so hook can identify gradients
opt = smd_hook.wrap_optimizer(opt)

model.compile(loss=keras.losses.categorical_crossentropy, optimizer=opt, metrics=["accuracy"])

callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
##### Enabling SageMaker Debugger ###########
# adding smd hook as a callback
smd_hook,
]

# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
callbacks.append(
keras.callbacks.ModelCheckpoint(os.path.join(args.model_dir, "checkpoint-{epoch}.h5"))
)

model.fit(
x_train,
y_train,
batch_size=batch_size,
callbacks=callbacks,
epochs=epochs,
verbose=1 if hvd.rank() == 0 else 0,
validation_data=(x_test, y_test),
)
score = model.evaluate(x_test, y_test, verbose=0)
print("Test loss:", score[0])
print("Test accuracy:", score[1])
Contributor @jarednielsen commented on Dec 5, 2019:

Add some code at the end demonstrating creating/reading from a trial? Unless this is meant to be used in SageMaker with builtin rules and no custom analysis.

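In response to the review comment above, a minimal sketch of what such trial-reading code could look like, assuming the smdebug analysis API (create_trial from smdebug.trials; the collection keyword on tensor_names is an assumption here). It would read back whatever the hook wrote to --out_dir:

# Hedged sketch: read back saved tensors with the smdebug trial API.
from smdebug.trials import create_trial

trial = create_trial(args.out_dir)
print("Saved tensors:", trial.tensor_names())
print("Saved steps:", trial.steps())
# Inspect one gradient at the first saved step (assumes the collection
# keyword is supported on tensor_names).
name = trial.tensor_names(collection="gradients")[0]
print(name, trial.tensor(name).value(trial.steps()[0]))
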
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--use_only_cpu", type=str2bool, default=False)
parser.add_argument("--num_epochs", type=int, default=5, help="Number of epochs to train for")
parser.add_argument("--out_dir", type=str)
parser.add_argument("--save_interval", type=int, default=500)
parser.add_argument("--include_workers", type=str, default="one")
parser.add_argument("--model_dir", type=str, default="/tmp/mnist_model")
args = parser.parse_args()

main(args)
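
Several commits above ("Switch to horovodrun", "copy env dict so next job is not affected") hint at how the accompanying ZCC tests launch this script. A hedged sketch of such a launcher, under the assumption that the tests shell out to horovodrun with a copied environment; run_horovod_job is a hypothetical helper, not part of this PR:

# Hypothetical launcher for the example above. The environment is copied
# so one job's settings do not leak into the next.
import os
import subprocess

def run_horovod_job(script, out_dir, num_procs=2):
    env = dict(os.environ)  # copy env dict so the next job is not affected
    subprocess.check_call(
        ["horovodrun", "-np", str(num_procs),
         "python", script, "--out_dir", out_dir, "--use_only_cpu", "true"],
        env=env,
    )

run_horovod_job("examples/tensorflow/local/horovod_keras_mnist.py", "/tmp/smd_outputs")
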
15 changes: 13 additions & 2 deletions examples/tensorflow/local/mnist.py
@@ -31,7 +31,7 @@ def main():
parser.add_argument(
"--num_steps",
type=int,
help="Number of steps to train for. If this" "is passed, it overrides num_epochs",
help="Number of steps to train for. If this is passed, it overrides num_epochs",
)
parser.add_argument(
"--num_eval_steps",
@@ -47,6 +47,8 @@ def main():
np.random.seed(2)
random.seed(12)

##### Enabling SageMaker Debugger ###########
# creating hook
hook = smd.EstimatorHook(
out_dir=args.out_dir,
include_collections=["weights", "gradients"],
@@ -104,7 +106,8 @@ def cnn_model_fn(features, labels, mode):
if mode == tf.estimator.ModeKeys.TRAIN:
optimizer = tf.train.GradientDescentOptimizer(learning_rate=args.lr)

# SMD: Wrap your optimizer as follows to help SageMaker Debugger identify gradients
##### Enabling SageMaker Debugger ###########
# Wrap your optimizer as follows to help SageMaker Debugger identify gradients
# This does not change your optimization logic, it returns back the same optimizer
optimizer = hook.wrap_optimizer(optimizer)

@@ -140,12 +143,20 @@ def cnn_model_fn(features, labels, mode):
x={"x": eval_data}, y=eval_labels, num_epochs=1, shuffle=False
)

##### Enabling SageMaker Debugger ###########
# Set training mode so SMDebug can classify the steps into training mode
hook.set_mode(smd.modes.TRAIN)

##### Enabling SageMaker Debugger ###########
# pass hook to hooks parameter of train method
mnist_classifier.train(input_fn=train_input_fn, steps=args.num_steps, hooks=[hook])

##### Enabling SageMaker Debugger ###########
# Set eval mode so SMDebug can classify the steps into eval mode
hook.set_mode(smd.modes.EVAL)

##### Enabling SageMaker Debugger ###########
# pass hook to hooks parameter of evaluate method
mnist_classifier.evaluate(input_fn=eval_input_fn, steps=args.num_eval_steps, hooks=[hook])


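The TRAIN/EVAL mode annotations above become queryable during analysis. A short hedged sketch, assuming trial.steps() accepts a mode filter as described in the smdebug analysis docs:

# Hedged sketch: verify steps were recorded under both modes
# (assumes the mode keyword on trial.steps()).
from smdebug import modes
from smdebug.trials import create_trial

trial = create_trial(args.out_dir)
assert len(trial.steps(mode=modes.TRAIN)) > 0
assert len(trial.steps(mode=modes.EVAL)) > 0
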
6 changes: 6 additions & 0 deletions examples/tensorflow/local/tf_keras_resnet.py
@@ -42,6 +42,8 @@ def train(batch_size, epoch, model, hook):
epochs=epoch,
validation_data=(X_valid, Y_valid),
shuffle=True,
##### Enabling SageMaker Debugger ###########
# adding hook as a callback
callbacks=[hook],
)

@@ -57,13 +59,17 @@ def main():

model = ResNet50(weights=None, input_shape=(32, 32, 3), classes=10)

##### Enabling SageMaker Debugger ###########
# creating hook
hook = smd.KerasHook(
out_dir=opt.out_dir,
include_collections=["weights", "gradients", "losses"],
save_config=smd.SaveConfig(save_interval=opt.save_interval),
)

optimizer = tf.keras.optimizers.Adam()

##### Enabling SageMaker Debugger ###########
# wrap the optimizer so the hook can identify the gradients
optimizer = hook.wrap_optimizer(optimizer)
model.compile(loss="categorical_crossentropy", optimizer=optimizer, metrics=["accuracy"])
152 changes: 152 additions & 0 deletions examples/tensorflow/sagemaker_byoc/horovod_keras_mnist.py
@@ -0,0 +1,152 @@
"""
This script is a simple MNIST training script which uses Horovod and TensorFlow's Keras interface.
It has been orchestrated with a SageMaker Debugger hook to allow saving tensors during training.
The hook has been instrumented to read from the JSON configuration that SageMaker places in the training container.
Configuration provided to the SageMaker Python SDK when creating a job is passed on to the hook.
This allows you to use the same script with different configurations across runs.
If you use an official SageMaker framework container (i.e., an AWS Deep Learning Container),
you do not have to orchestrate your script as below; hooks are added automatically in those environments.
For more information, please refer to https://github.com/awslabs/sagemaker-debugger/blob/master/docs/sagemaker.md

This script has been adapted from an example in the Horovod repository: https://github.com/uber/horovod
"""
# Standard Library
import argparse
import math
import os

# Third Party
import horovod.tensorflow.keras as hvd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import backend as K
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Conv2D, Dense, Dropout, Flatten, MaxPooling2D
from tensorflow.keras.models import Sequential

# First Party
import smdebug.tensorflow as smd


def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ("yes", "true", "t", "y", "1"):
return True
elif v.lower() in ("no", "false", "f", "n", "0"):
return False
else:
raise argparse.ArgumentTypeError("Boolean value expected.")


def main(args):
# Horovod: initialize Horovod.
hvd.init()

if not args.use_only_cpu:
# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
else:
config = None

K.set_session(tf.Session(config=config))

batch_size = 128
num_classes = 10

# Horovod: adjust number of epochs based on number of GPUs.
epochs = int(math.ceil(args.num_epochs / hvd.size()))

# Input image dimensions
img_rows, img_cols = 28, 28

# The data, shuffled and split between train and test sets
(x_train, y_train), (x_test, y_test) = mnist.load_data()

if K.image_data_format() == "channels_first":
x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols)
x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols)
input_shape = (1, img_rows, img_cols)
else:
x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1)
x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1)
input_shape = (img_rows, img_cols, 1)

x_train = x_train.astype("float32")
x_test = x_test.astype("float32")
x_train /= 255
x_test /= 255
print("x_train shape:", x_train.shape)
print(x_train.shape[0], "train samples")
print(x_test.shape[0], "test samples")

# Convert class vectors to binary class matrices
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)

model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=input_shape))
model.add(Conv2D(64, (3, 3), activation="relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation="relu"))
model.add(Dropout(0.5))
model.add(Dense(num_classes, activation="softmax"))

# Horovod: adjust learning rate based on number of GPUs.
opt = keras.optimizers.Adadelta(1.0 * hvd.size())

# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt)

##### Enabling SageMaker Debugger ###########
# Create hook from the configuration provided through sagemaker python sdk
smd_hook = smd.KerasHook.create_from_json_file()

##### Enabling SageMaker Debugger ###########
# wrap the optimizer so the hook can identify the gradients
opt = smd_hook.wrap_optimizer(opt)

model.compile(loss=keras.losses.categorical_crossentropy, optimizer=opt, metrics=["accuracy"])

callbacks = [
# Horovod: broadcast initial variable states from rank 0 to all other processes.
# This is necessary to ensure consistent initialization of all workers when
# training is started with random weights or restored from a checkpoint.
hvd.callbacks.BroadcastGlobalVariablesCallback(0),
##### Enabling SageMaker Debugger ###########
# pass smd_hook as a callback
smd_hook,
]

# Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them.
if hvd.rank() == 0:
callbacks.append(
keras.callbacks.ModelCheckpoint(os.path.join(args.model_dir, "checkpoint-{epoch}.h5"))
)

model.fit(
x_train,
y_train,
batch_size=batch_size,
callbacks=callbacks,
epochs=epochs,
verbose=1 if hvd.rank() == 0 else 0,
validation_data=(x_test, y_test),
)
score = model.evaluate(x_test, y_test, verbose=0)
print("Test loss:", score[0])
print("Test accuracy:", score[1])


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--use_only_cpu", type=str2bool, default=False)
parser.add_argument("--num_epochs", type=int, default=5, help="Number of epochs to train for")
parser.add_argument("--model_dir", type=str, default="/tmp/mnist_model")
args = parser.parse_args()

main(args)
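
Because this version of the script relies on create_from_json_file(), the hook configuration has to come from the job definition. A hedged sketch of how it might be supplied through the SageMaker Python SDK (bucket, role, and instance type are placeholders; DebuggerHookConfig and CollectionConfig are assumed from the sagemaker.debugger module):

# Hedged sketch: supplying the hook configuration from the SageMaker Python
# SDK; the hook inside the container picks it up via create_from_json_file().
from sagemaker.debugger import CollectionConfig, DebuggerHookConfig
from sagemaker.tensorflow import TensorFlow

hook_config = DebuggerHookConfig(
    s3_output_path="s3://my-bucket/smdebug-outputs",  # placeholder bucket
    collection_configs=[
        CollectionConfig(name="weights"),
        CollectionConfig(name="gradients"),
    ],
)

estimator = TensorFlow(
    entry_point="horovod_keras_mnist.py",
    role="SageMakerRole",  # placeholder IAM role
    train_instance_count=1,
    train_instance_type="ml.p3.8xlarge",  # placeholder instance type
    framework_version="1.15",
    py_version="py3",
    debugger_hook_config=hook_config,
)
estimator.fit()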