forked from horovod/horovod
/
tensorflow2_mnist_data_service.py
76 lines (60 loc) · 3.56 KB
/
tensorflow2_mnist_data_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# Copyright 2022 G-Research. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
import argparse
import os
import sys
from horovod.runner.common.util import env
from horovod.tensorflow.data.compute_service import TfDataServiceConfig
from tensorflow2_mnist_data_service_train_fn_compute_side_dispatcher import train_fn as train_fn_compute_side
from tensorflow2_mnist_data_service_train_fn_training_side_dispatcher import train_fn as train_fn_training_side
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
# This exemplifies how to use the Tensorflow Compute Service with Horovod.
# The Tensorflow Dispatcher can reside with the training script, or the compute service.
# If you use only one of these options, you can ignore the respective code of the other option in this example.
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("configfile", type=str,
help=f"The path to the compute service config file.")
parser.add_argument("--training-tasks", required=False, type=int,
help=f"The number of training tasks when there is only one dispatcher. "
f"Otherwise there are as many training tasks as there are dispatchers.",
dest="training_tasks")
parser.add_argument("--reuse-dataset", required=False, action="store_true", default=False,
help=f"Reusing the dataset allows the training tasks to reads from a single dispatcher "
f"in a first-come-first-serve manner.",
dest="reuse_dataset")
parser.add_argument("--round-robin", required=False, action="store_true", default=False,
help=f"Reusing the dataset can be done round-robin instead first-come-first-serve.",
dest="round_robin")
parsed_args = parser.parse_args()
compute_config = TfDataServiceConfig.read(parsed_args.configfile, wait_for_file_creation=True)
rank, size = env.get_env_rank_and_size()
if compute_config.dispatchers > 1 and compute_config.dispatchers != size:
print(f'Unless there is only one dispatcher, the number of training tasks ({size}) must match '
f'the number of dispatchers ({compute_config.dispatchers}) configured in the '
f'data service config file ({parsed_args.compute_service_config_file}).', file=sys.stderr)
sys.exit(1)
# pick the right train_fn depending on the dispatcher side
if compute_config.dispatcher_side == 'training':
train_fn = train_fn_training_side
elif compute_config.dispatcher_side == 'compute':
train_fn = train_fn_compute_side
else:
raise ValueError(f'Unsupported dispatcher side: {compute_config.dispatcher_side}')
# run the distributed training
train_fn(compute_config, reuse_dataset=parsed_args.reuse_dataset, round_robin=parsed_args.round_robin)
if rank == 0:
compute = compute_config.compute_client(verbose=2)
compute.shutdown()