diff --git a/.azure-pipelines/ipu-tests.yml b/.azure-pipelines/ipu-tests.yml
index 065c6983a2abe..42cee6b040ba3 100644
--- a/.azure-pipelines/ipu-tests.yml
+++ b/.azure-pipelines/ipu-tests.yml
@@ -81,7 +81,7 @@ jobs:
   - bash: |
       source ${{ variables.poplar_sdk }}/poplar-ubuntu*/enable.sh
       source ${{ variables.poplar_sdk }}/popart-ubuntu*/enable.sh
-
+      export POPTORCH_WAIT_FOR_IPU=1
       python -m coverage run --source pytorch_lightning -m pytest pytorch_lightning tests -v --junitxml=$(Build.StagingDirectory)/test-results.xml --durations=50
     env:
       MKL_THREADING_LAYER: "GNU"
@@ -90,6 +90,7 @@ jobs:
   - bash: |
       source ${{ variables.poplar_sdk }}/poplar-ubuntu*/enable.sh
       source ${{ variables.poplar_sdk }}/popart-ubuntu*/enable.sh
+      export POPTORCH_WAIT_FOR_IPU=1
       bash tests/special_tests.sh
     env:
       MKL_THREADING_LAYER: "GNU"
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f2c9bcb726a79..febab75d5ec5e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -362,6 +362,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 - Fixed a bug where `truncated_bptt_steps` would throw an AttributeError when the target RNN has multiple hidden states ([#8145](https://github.com/PyTorchLightning/pytorch-lightning/pull/8145))
 
+- Fixed passing a custom `DDPPlugin` when choosing `accelerator="ddp_cpu"` ([#6208](https://github.com/PyTorchLightning/pytorch-lightning/pull/6208))
+
+
 
 ## [1.3.7] - 2021-06-22
 
 - Fixed a bug where skipping an optimizer while using amp causes amp to trigger an assertion error ([#7975](https://github.com/PyTorchLightning/pytorch-lightning/pull/7975))
diff --git a/pytorch_lightning/plugins/training_type/ddp.py b/pytorch_lightning/plugins/training_type/ddp.py
index aba2bf242b790..a882390b78b0d 100644
--- a/pytorch_lightning/plugins/training_type/ddp.py
+++ b/pytorch_lightning/plugins/training_type/ddp.py
@@ -209,11 +209,9 @@ def _call_children_scripts(self):
         if self.parallel_devices is None:
             raise MisconfigurationException("you selected (distribute_backend = ddp) but did not set Trainer(gpus=?)")
 
-        os.environ["PL_TRAINER_GPUS"] = ",".join([str(device.index) for device in self.parallel_devices])
         os.environ["PL_IN_DDP_SUBPROCESS"] = "1"
 
-        num_gpus = len(self.parallel_devices)
-        os.environ["WORLD_SIZE"] = f"{num_gpus * self.num_nodes}"
+        os.environ["WORLD_SIZE"] = f"{self.num_processes * self.num_nodes}"
 
         self.interactive_ddp_procs = []
 
diff --git a/tests/accelerators/test_accelerator_connector.py b/tests/accelerators/test_accelerator_connector.py
index 9c3bccb4d3283..4a9b01281f784 100644
--- a/tests/accelerators/test_accelerator_connector.py
+++ b/tests/accelerators/test_accelerator_connector.py
@@ -18,6 +18,7 @@
 
 import pytest
 import torch
+import torch.distributed
 
 from pytorch_lightning import Trainer
 from pytorch_lightning.accelerators.accelerator import Accelerator
@@ -385,6 +386,35 @@ def on_fit_start(self, trainer, pl_module):
     trainer.fit(model)
 
 
+@RunIf(special=True)
+def test_accelerator_choice_ddp_cpu_and_plugin(tmpdir):
+    """ Test that accelerator="ddp_cpu" can work together with an instance of DDPPlugin. """
+    _test_accelerator_choice_ddp_cpu_and_plugin(tmpdir, ddp_plugin_class=DDPPlugin)
+
+
+@RunIf(special=True)
+def test_accelerator_choice_ddp_cpu_and_plugin_spawn(tmpdir):
+    """ Test that accelerator="ddp_cpu" can work together with an instance of DDPSpawnPlugin. """
""" + _test_accelerator_choice_ddp_cpu_and_plugin(tmpdir, ddp_plugin_class=DDPSpawnPlugin) + + +def _test_accelerator_choice_ddp_cpu_and_plugin(tmpdir, ddp_plugin_class): + + model = BoringModel() + trainer = Trainer( + default_root_dir=tmpdir, + plugins=[ddp_plugin_class(find_unused_parameters=True)], + fast_dev_run=True, + accelerator='ddp_cpu', + num_processes=2, + ) + assert isinstance(trainer.training_type_plugin, ddp_plugin_class) + assert isinstance(trainer.accelerator, CPUAccelerator) + assert trainer.training_type_plugin.num_processes == 2 + assert trainer.training_type_plugin.parallel_devices == [torch.device("cpu")] * 2 + trainer.fit(model) + + @mock.patch.dict( os.environ, { "SLURM_NTASKS": "2", @@ -396,11 +426,8 @@ def on_fit_start(self, trainer, pl_module): } ) @mock.patch('torch.cuda.device_count', return_value=0) -@mock.patch('pytorch_lightning.plugins.DDPPlugin.setup_distributed', autospec=True) -def test_accelerator_choice_ddp_cpu_custom_cluster(device_count_mock, setup_distributed_mock): - """ - Test that we choose the custom cluster even when SLURM or TE flags are around - """ +def test_accelerator_choice_ddp_cpu_custom_cluster(_, tmpdir): + """ Test that we choose the custom cluster even when SLURM or TE flags are around """ class CustomCluster(LightningEnvironment): @@ -410,25 +437,16 @@ def master_address(self): def creates_children(self) -> bool: return True - class CB(Callback): - - def on_fit_start(self, trainer, pl_module): - assert isinstance(trainer.accelerator, CPUAccelerator) - assert isinstance(trainer.training_type_plugin, DDPPlugin) - assert isinstance(trainer.training_type_plugin.cluster_environment, CustomCluster) - raise SystemExit() - - model = BoringModel() trainer = Trainer( + default_root_dir=tmpdir, plugins=[CustomCluster()], fast_dev_run=True, accelerator='ddp_cpu', num_processes=2, - callbacks=[CB()], ) - - with pytest.raises(SystemExit): - trainer.fit(model) + assert isinstance(trainer.accelerator, CPUAccelerator) + assert isinstance(trainer.training_type_plugin, DDPPlugin) + assert isinstance(trainer.training_type_plugin.cluster_environment, CustomCluster) @mock.patch.dict( diff --git a/tests/accelerators/test_ddp.py b/tests/accelerators/test_ddp.py index 31f6cd1b0687f..f38d08df3daf9 100644 --- a/tests/accelerators/test_ddp.py +++ b/tests/accelerators/test_ddp.py @@ -126,8 +126,16 @@ def setup(self, stage: Optional[str] = None) -> None: @RunIf(min_gpus=2, min_torch="1.8.1", special=True) -@pytest.mark.parametrize("precision", [16, 32]) -def test_ddp_wrapper(tmpdir, precision): +def test_ddp_wrapper_16(tmpdir): + _test_ddp_wrapper(tmpdir, precision=16) + + +@RunIf(min_gpus=2, min_torch="1.8.1", special=True) +def test_ddp_wrapper_32(tmpdir): + _test_ddp_wrapper(tmpdir, precision=32) + + +def _test_ddp_wrapper(tmpdir, precision): """ Test parameters to ignore are carried over for DDP. 
""" diff --git a/tests/callbacks/test_progress_bar.py b/tests/callbacks/test_progress_bar.py index 2eb1bdf690c30..aafb29d51b161 100644 --- a/tests/callbacks/test_progress_bar.py +++ b/tests/callbacks/test_progress_bar.py @@ -543,22 +543,33 @@ def test_progress_bar_can_be_pickled(): @RunIf(min_gpus=2, special=True) -@pytest.mark.parametrize([ - "total_train_samples", - "train_batch_size", - "total_val_samples", - "val_batch_size", - "val_check_interval", -], [ - (8, 4, 2, 1, 0.2), - (8, 4, 2, 1, 0.5), -]) -def test_progress_bar_max_val_check_interval( - total_train_samples, train_batch_size, total_val_samples, val_batch_size, val_check_interval, tmpdir -): +def test_progress_bar_max_val_check_interval_0(tmpdir): + _test_progress_bar_max_val_check_interval( + tmpdir, + total_train_samples=8, + train_batch_size=4, + total_val_samples=2, + val_batch_size=1, + val_check_interval=0.2 + ) + + +@RunIf(min_gpus=2, special=True) +def test_progress_bar_max_val_check_interval_1(tmpdir): + _test_progress_bar_max_val_check_interval( + tmpdir, + total_train_samples=8, + train_batch_size=4, + total_val_samples=2, + val_batch_size=1, + val_check_interval=0.5 + ) - world_size = 2 +def _test_progress_bar_max_val_check_interval( + tmpdir, total_train_samples, train_batch_size, total_val_samples, val_batch_size, val_check_interval +): + world_size = 2 train_data = DataLoader(RandomDataset(32, total_train_samples), batch_size=train_batch_size) val_data = DataLoader(RandomDataset(32, total_val_samples), batch_size=val_batch_size) diff --git a/tests/callbacks/test_pruning.py b/tests/callbacks/test_pruning.py index f198b29d24e84..1a5ddad64106e 100644 --- a/tests/callbacks/test_pruning.py +++ b/tests/callbacks/test_pruning.py @@ -162,13 +162,44 @@ def test_pruning_callback( @RunIf(special=True, min_gpus=2) -@pytest.mark.parametrize("parameters_to_prune", [False, True]) -@pytest.mark.parametrize("use_global_unstructured", [False, True]) -def test_pruning_callback_ddp(tmpdir, use_global_unstructured: bool, parameters_to_prune: bool): +def test_pruning_callback_ddp_0(tmpdir): train_with_pruning_callback( tmpdir, - parameters_to_prune=parameters_to_prune, - use_global_unstructured=use_global_unstructured, + parameters_to_prune=False, + use_global_unstructured=False, + accelerator="ddp", + gpus=2, + ) + + +@RunIf(special=True, min_gpus=2) +def test_pruning_callback_ddp_1(tmpdir): + train_with_pruning_callback( + tmpdir, + parameters_to_prune=False, + use_global_unstructured=True, + accelerator="ddp", + gpus=2, + ) + + +@RunIf(special=True, min_gpus=2) +def test_pruning_callback_ddp_2(tmpdir): + train_with_pruning_callback( + tmpdir, + parameters_to_prune=True, + use_global_unstructured=False, + accelerator="ddp", + gpus=2, + ) + + +@RunIf(special=True, min_gpus=2) +def test_pruning_callback_ddp_3(tmpdir): + train_with_pruning_callback( + tmpdir, + parameters_to_prune=True, + use_global_unstructured=True, accelerator="ddp", gpus=2, ) diff --git a/tests/checkpointing/test_checkpoint_callback_frequency.py b/tests/checkpointing/test_checkpoint_callback_frequency.py index 0073676a77eec..8617a9f8f7050 100644 --- a/tests/checkpointing/test_checkpoint_callback_frequency.py +++ b/tests/checkpointing/test_checkpoint_callback_frequency.py @@ -107,8 +107,17 @@ def training_step(self, batch, batch_idx): @mock.patch('torch.save') @RunIf(special=True, min_gpus=2) -@pytest.mark.parametrize(['k', 'epochs', 'val_check_interval', 'expected'], [(1, 1, 1.0, 1), (2, 2, 0.3, 5)]) -def test_top_k_ddp(save_mock, tmpdir, k, epochs, 
+def test_top_k_ddp_0(save_mock, tmpdir):
+    _top_k_ddp(save_mock, tmpdir, k=1, epochs=1, val_check_interval=1.0, expected=1)
+
+
+@mock.patch('torch.save')
+@RunIf(special=True, min_gpus=2)
+def test_top_k_ddp_1(save_mock, tmpdir):
+    _top_k_ddp(save_mock, tmpdir, k=2, epochs=2, val_check_interval=0.3, expected=5)
+
+
+def _top_k_ddp(save_mock, tmpdir, k, epochs, val_check_interval, expected):
 
     class TestModel(BoringModel):
diff --git a/tests/conftest.py b/tests/conftest.py
index 7f6407ecfd82b..3f767d8b6fad2 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -18,6 +18,7 @@
 from http.server import SimpleHTTPRequestHandler
 
 import pytest
+import torch.distributed
 import torch.multiprocessing as mp
 
 
@@ -41,6 +42,14 @@ def restore_env_variables():
         os.environ.update(env_backup)
 
 
+@pytest.fixture(scope="function", autouse=True)
+def teardown_process_group():
+    """ Ensures that the distributed process group gets closed before the next test runs. """
+    yield
+    if torch.distributed.is_available() and torch.distributed.is_initialized():
+        torch.distributed.destroy_process_group()
+
+
 def pytest_configure(config):
     config.addinivalue_line("markers", "spawn: spawn test in a separate process using torch.multiprocessing.spawn")
 
diff --git a/tests/plugins/test_deepspeed_plugin.py b/tests/plugins/test_deepspeed_plugin.py
index c5eaadd1e5985..efe8da981c9eb 100644
--- a/tests/plugins/test_deepspeed_plugin.py
+++ b/tests/plugins/test_deepspeed_plugin.py
@@ -640,8 +640,16 @@ def test_deepspeed_multigpu_stage_3_checkpointing_full_weights_manual(tmpdir):
 
 
 @RunIf(min_gpus=2, deepspeed=True, special=True)
-@pytest.mark.parametrize('offload_optimizer', [True, False])
-def test_deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, offload_optimizer):
+def test_deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir):
+    _deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, offload_optimizer=False)
+
+
+@RunIf(min_gpus=2, deepspeed=True, special=True)
+def test_deepspeed_multigpu_stage_2_accumulated_grad_batches_offload_optimizer(tmpdir):
+    _deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, offload_optimizer=True)
+
+
+def _deepspeed_multigpu_stage_2_accumulated_grad_batches(tmpdir, offload_optimizer):
     """
     Test to ensure with Stage 2 and multiple GPUs, accumulated grad batches works.
     """
diff --git a/tests/trainer/test_data_loading.py b/tests/trainer/test_data_loading.py
index 831fc474336b6..5d4da1be7ddbe 100644
--- a/tests/trainer/test_data_loading.py
+++ b/tests/trainer/test_data_loading.py
@@ -98,9 +98,13 @@ def check_replace_distributed_sampler(tmpdir, save_preds_on_dl_idx, accelerator,
 
 
 @RunIf(min_gpus=2, special=True)
-@pytest.mark.parametrize("mode", [1, 2])
-def test_replace_distributed_sampler_custom_dataloader_custom_batch_sampler(tmpdir, mode):
-    check_replace_distributed_sampler(tmpdir, True, "ddp", 2, 2, mode)
+def test_replace_distributed_sampler_custom_dataloader_custom_batch_sampler_0(tmpdir):
+    check_replace_distributed_sampler(tmpdir, True, "ddp", 2, 2, mode=1)
+
+
+@RunIf(min_gpus=2, special=True)
+def test_replace_distributed_sampler_custom_dataloader_custom_batch_sampler_1(tmpdir):
+    check_replace_distributed_sampler(tmpdir, True, "ddp", 2, 2, mode=2)
 
 
 @pytest.mark.parametrize("num_workers", [0, 1])