diff --git a/CHANGELOG.md b/CHANGELOG.md
index e5d266c77dfd8..f7b251939ce14 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -157,6 +157,9 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
 - Use only a single instance of `rich.console.Console` throughout codebase ([#12886](https://github.com/PyTorchLightning/pytorch-lightning/pull/12886))
 
 
+- Fixed an issue to ensure all the checkpoint states are saved in a common filepath with `DeepspeedStrategy` ([#12887](https://github.com/PyTorchLightning/pytorch-lightning/pull/12887))
+
+
 ## [1.6.1] - 2022-04-13
 
 ### Changed
diff --git a/pytorch_lightning/strategies/deepspeed.py b/pytorch_lightning/strategies/deepspeed.py
index 7874322a4d74a..b6a353ae15e7b 100644
--- a/pytorch_lightning/strategies/deepspeed.py
+++ b/pytorch_lightning/strategies/deepspeed.py
@@ -762,6 +762,9 @@ def save_checkpoint(self, checkpoint: Dict, filepath: _PATH, storage_options: Op
             TypeError:
                 If ``storage_options`` arg is passed in
         """
+        # broadcast the filepath from rank 0 to ensure all the states are saved in a common filepath
+        filepath = self.broadcast(filepath)
+
         if storage_options is not None:
             raise TypeError(
                 "`Trainer.save_checkpoint(..., storage_options=...)` with `storage_options` arg"
diff --git a/tests/strategies/test_deepspeed_strategy.py b/tests/strategies/test_deepspeed_strategy.py
index 319289d200f4f..dd9bfcea236d2 100644
--- a/tests/strategies/test_deepspeed_strategy.py
+++ b/tests/strategies/test_deepspeed_strategy.py
@@ -1269,13 +1269,19 @@ def test_deepspeed_with_meta_device(tmpdir):
 def test_deepspeed_multi_save_same_filepath(tmpdir):
     """Test that verifies that deepspeed saves only latest checkpoint in the specified path and deletes the old
     sharded checkpoints."""
-    model = BoringModel()
+
+    class CustomModel(BoringModel):
+        def training_step(self, *args, **kwargs):
+            self.log("grank", self.global_rank)
+            return super().training_step(*args, **kwargs)
+
+    model = CustomModel()
     trainer = Trainer(
         default_root_dir=tmpdir,
         strategy="deepspeed",
         accelerator="gpu",
         devices=2,
-        callbacks=[ModelCheckpoint(save_top_k=1, save_last=True)],
+        callbacks=[ModelCheckpoint(filename="{epoch}_{step}_{grank}", save_top_k=1)],
         limit_train_batches=1,
         limit_val_batches=0,
         num_sanity_val_steps=0,
@@ -1284,9 +1290,14 @@ def test_deepspeed_multi_save_same_filepath(tmpdir):
         enable_model_summary=False,
     )
     trainer.fit(model)
-    ckpt_path = os.path.join(trainer.checkpoint_callback.dirpath, "last.ckpt")
-    expected = ["latest", "zero_to_fp32.py", "checkpoint"]
-    assert set(expected) == set(os.listdir(ckpt_path))
+
+    filepath = "epoch=1_step=2_grank=0.0.ckpt"
+    expected = {filepath}
+    assert expected == set(os.listdir(trainer.checkpoint_callback.dirpath))
+
+    ckpt_path = os.path.join(trainer.checkpoint_callback.dirpath, filepath)
+    expected = {"latest", "zero_to_fp32.py", "checkpoint"}
+    assert expected == set(os.listdir(ckpt_path))
 
 
 @RunIf(min_gpus=2, standalone=True, deepspeed=True)
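
For context on why the one-line `save_checkpoint` change matters: with DeepSpeed, every rank writes its own shard under the checkpoint directory, so all ranks must agree on that directory. If the `ModelCheckpoint` filename template contains a value that differs per rank (as the test forces with the logged `grank` metric), each rank would otherwise format a different path and the shards would be scattered. The snippet below is a minimal, self-contained simulation of that failure mode and of the rank-0 broadcast the patch applies; it is not the actual Lightning or DeepSpeed code, and the helper names (`format_ckpt_name`, `fake_broadcast`) are made up for illustration.

```python
# Illustrative simulation only: shows why DeepSpeedStrategy.save_checkpoint
# broadcasts the filepath from rank 0 before each rank writes its shard.
# The helpers below are hypothetical stand-ins, not Lightning APIs.

WORLD_SIZE = 2


def format_ckpt_name(template: str, metrics: dict) -> str:
    """Mimic filling a checkpoint filename template from logged metrics."""
    name = template
    for key, value in metrics.items():
        name = name.replace("{" + key + "}", f"{key}={value}")
    return name + ".ckpt"


def fake_broadcast(values: list, src: int = 0) -> list:
    """Stand-in for a collective broadcast: every rank adopts rank `src`'s value."""
    return [values[src]] * len(values)


# Each rank logs a metric that differs across ranks (like `grank` in the test).
per_rank_metrics = [{"epoch": 1, "step": 2, "grank": float(rank)} for rank in range(WORLD_SIZE)]

# Without the fix: ranks format different checkpoint paths, so shards get scattered.
paths = [format_ckpt_name("{epoch}_{step}_{grank}", m) for m in per_rank_metrics]
assert paths == ["epoch=1_step=2_grank=0.0.ckpt", "epoch=1_step=2_grank=1.0.ckpt"]

# With the fix: rank 0's path is broadcast, so every rank writes into one directory.
paths = fake_broadcast(paths, src=0)
assert len(set(paths)) == 1
```

The updated test exercises exactly this scenario on two GPUs and then checks that only the single rank-0 path (`epoch=1_step=2_grank=0.0.ckpt`) exists and contains the usual DeepSpeed checkpoint contents.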