diff --git a/horovod/mxnet/mpi_ops.py b/horovod/mxnet/mpi_ops.py
index c4b6d9cd2b..6994a26556 100644
--- a/horovod/mxnet/mpi_ops.py
+++ b/horovod/mxnet/mpi_ops.py
@@ -39,7 +39,6 @@
 check_installed_version('mxnet', mx.__version__)
 
 # import basic methods
-init = _basics.init
 shutdown = _basics.shutdown
 is_initialized = _basics.is_initialized
 start_timeline = _basics.start_timeline
@@ -61,6 +60,11 @@
 cuda_built = _basics.cuda_built
 rocm_built = _basics.rocm_built
 
+def init(*args, **kwargs):
+    _basics.init(*args, **kwargs)
+    # Call set up again to make sure the basics is in sync
+    _setup_process_sets(_basics)
+
 dll_path = os.path.join(os.path.dirname(__file__),
                         'mpi_lib' + get_ext_suffix())
 MPI_MXNET_LIB_CTYPES = ctypes.CDLL(dll_path, ctypes.RTLD_GLOBAL)
diff --git a/horovod/spark/keras/remote.py b/horovod/spark/keras/remote.py
index 5d9c74c412..879e80e12b 100644
--- a/horovod/spark/keras/remote.py
+++ b/horovod/spark/keras/remote.py
@@ -109,6 +109,7 @@ def train(serialized_model, train_rows, val_rows, avg_row_size):
 
         hvd = get_horovod()
         hvd.init()
+
         pin_gpu(hvd, tf, k)
 
         if not user_shuffle_buffer_size:
@@ -129,6 +130,9 @@ def train(serialized_model, train_rows, val_rows, avg_row_size):
         # Verbose mode 1 will print a progress bar
         verbose = user_verbose if hvd.rank() == 0 else 0
 
+        if verbose:
+            print(f"Shared lib path is pointing to: {_horovod.common.process_sets._basics.MPI_LIB_CTYPES}")
+
         transform_spec = None
         if transformation:
             transform_spec = TransformSpec(transformation)
@@ -227,12 +231,6 @@ def train(serialized_model, train_rows, val_rows, avg_row_size):
             reader_factory = make_batch_reader
             is_batch_reader = True
 
-        # Call _setup again in process set module to point shared lib to tensorflow's module
-        # since the lib path might be overwritten in remote trainer.
-        _horovod.common.process_sets._setup(_horovod.tensorflow.mpi_ops._basics)
-        if verbose:
-            print(f"Set shared lib path to: {_horovod.common.process_sets._basics.MPI_LIB_CTYPES}")
-
         with reader_factory(remote_store.train_data_path,
                             num_epochs=1,
                             cur_shard=hvd.rank(),
diff --git a/horovod/spark/lightning/remote.py b/horovod/spark/lightning/remote.py
index a1099ad571..b048f2b2b9 100644
--- a/horovod/spark/lightning/remote.py
+++ b/horovod/spark/lightning/remote.py
@@ -97,10 +97,14 @@ def RemoteTrainer(estimator, metadata, ckpt_bytes, run_id, dataset_idx, train_ro
 
     def train(serialized_model):
         import horovod.torch as hvd
-        import horovod as _horovod
 
         # Horovod: initialize library.
         hvd.init()
+
+        if verbose:
+            import horovod as _horovod
+            print(f"Shared lib path is pointing to: {_horovod.common.process_sets._basics.MPI_LIB_CTYPES}")
+
         _checkpoint_callback = None
         require_checkpoint = False
 
@@ -218,12 +222,6 @@ def on_epoch_end(self, trainer: "pl.Trainer", pl_module: "pl.LightningModule") -
 
         print(f"pytorch_lightning version={pl.__version__}")
 
-        # Call _setup again in process set module to point shared lib to torch's module
-        # since the lib path might be overwritten in remote trainer.
-        _horovod.common.process_sets._setup(_horovod.torch.mpi_ops._basics)
-        if verbose:
-            print(f"Set shared lib path to: {_horovod.common.process_sets._basics.MPI_LIB_CTYPES}")
-
         dataset = data_module(train_dir=remote_store.train_data_path,
                               val_dir=remote_store.val_data_path,
                               num_train_epochs=epochs,
diff --git a/horovod/spark/torch/remote.py b/horovod/spark/torch/remote.py
index 9697131e40..6b2f325caa 100644
--- a/horovod/spark/torch/remote.py
+++ b/horovod/spark/torch/remote.py
@@ -103,7 +103,6 @@ def train(serialized_model, optimizer_cls, model_opt_state_serialized,
         from petastorm.pytorch import BatchedDataLoader, InMemBatchedDataLoader
         import torch
         import horovod.torch as hvd
-        import horovod as _horovod
 
         # Deserializing objects
         model_opt_state = torch.load(model_opt_state_serialized)
@@ -118,6 +117,10 @@ def train(serialized_model, optimizer_cls, model_opt_state_serialized,
         # Horovod: initialize library.
         hvd.init()
 
+        if user_verbose:
+            import horovod as _horovod
+            print(f"Shared lib path is pointing to: {_horovod.common.process_sets._basics.MPI_LIB_CTYPES}")
+
         if not user_shuffle_buffer_size:
             shuffle_buffer_size = \
                 calculate_shuffle_buffer_size(hvd, avg_row_size, train_rows / hvd.size())
@@ -228,12 +231,6 @@ def save_checkpoint():
         else:
             reader_factory = make_batch_reader
 
-        # Call _setup again in process set module to point shared lib to torch's module
-        # since the lib path might be overwritten in remote trainer.
-        _horovod.common.process_sets._setup(_horovod.torch.mpi_ops._basics)
-        if user_verbose:
-            print(f"Set shared lib path to: {_horovod.common.process_sets._basics.MPI_LIB_CTYPES}")
-
         # Petastorm: read data from the store with the correct shard for this rank
         # setting num_epochs=None will cause an infinite iterator
         # and enables ranks to perform training and validation with
diff --git a/horovod/tensorflow/mpi_ops.py b/horovod/tensorflow/mpi_ops.py
index ccb9f34705..8865e5843d 100644
--- a/horovod/tensorflow/mpi_ops.py
+++ b/horovod/tensorflow/mpi_ops.py
@@ -57,7 +57,6 @@ def _load_library(name):
 _basics = _HorovodBasics(__file__, 'mpi_lib')
 
 # import basic methods
-init = _basics.init
 shutdown = _basics.shutdown
 is_initialized = _basics.is_initialized
 start_timeline = _basics.start_timeline
@@ -84,6 +83,11 @@ def _load_library(name):
 Sum = _basics.Sum
 Adasum = _basics.Adasum
 
+def init(*args, **kwargs):
+    _basics.init(*args, **kwargs)
+    # Call set up again to make sure the basics is in sync
+    _setup_process_sets(_basics)
+
 is_homogeneous = _basics.is_homogeneous
 
 handle_average_backwards_compatibility = get_average_backwards_compatibility_fun(_basics)
diff --git a/horovod/torch/mpi_ops.py b/horovod/torch/mpi_ops.py
index ba798a2d59..1fbcc3bc3d 100644
--- a/horovod/torch/mpi_ops.py
+++ b/horovod/torch/mpi_ops.py
@@ -69,7 +69,9 @@ def shutdown(*args, **kwargs):
 def init(*args, **kwargs):
     global _handle_map
     _handle_map = {}
-    return _basics.init(*args, **kwargs)
+    _basics.init(*args, **kwargs)
+    # Call set up again to make sure the basics is in sync
+    _setup_process_sets(_basics)
 
 # import reduction op values
 Average = _basics.Average
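For context on the pattern this patch applies: previously each framework's mpi_ops.py exported `init = _basics.init` directly, so `horovod.common.process_sets` could be left pointing at a stale basics object when a Spark remote trainer re-imported the module, and the estimators worked around it by calling `_setup` by hand after `hvd.init()`. The sketch below distills the new approach; only `init` and `_setup_process_sets` mirror names from the diff, while `_StubBasics` and `_process_set_basics` are hypothetical stand-ins for Horovod's `HorovodBasics` object and the module-level state inside `horovod.common.process_sets`:

# A minimal, self-contained sketch of the init/_setup_process_sets sync
# pattern. The stub names are illustrative, not Horovod's real internals.

class _StubBasics:
    """Stand-in for horovod.common.basics.HorovodBasics."""
    def __init__(self):
        self.MPI_LIB_CTYPES = None  # set once the shared library is loaded

    def init(self, *args, **kwargs):
        self.MPI_LIB_CTYPES = object()  # pretend we loaded mpi_lib here


_basics = _StubBasics()
_process_set_basics = None  # what horovod.common.process_sets points at


def _setup_process_sets(basics):
    # Re-point the process-sets module at the given basics object, so
    # process-set operations use the same shared library as everything else.
    global _process_set_basics
    _process_set_basics = basics


def init(*args, **kwargs):
    # Wrapping init (instead of exporting _basics.init directly) keeps the
    # process-sets module in sync with the freshly initialized basics on
    # every call, which removes the manual _setup calls in the remote trainers.
    _basics.init(*args, **kwargs)
    _setup_process_sets(_basics)


init()
assert _process_set_basics is _basics  # process sets see the initialized lib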