From bace0ba3061950fefe2d2565e2d17870d23cdb2b Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Mon, 25 Apr 2022 16:34:37 -0400 Subject: [PATCH 1/6] Fix ddp_comm_hook tests --- .../test_ddp_strategy_with_comm_hook.py | 42 +++++++++++++------ 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/tests/strategies/test_ddp_strategy_with_comm_hook.py b/tests/strategies/test_ddp_strategy_with_comm_hook.py index 9445565ec9c3b..a4d0de70b45de 100644 --- a/tests/strategies/test_ddp_strategy_with_comm_hook.py +++ b/tests/strategies/test_ddp_strategy_with_comm_hook.py @@ -33,8 +33,16 @@ @RunIf(min_gpus=2, min_torch="1.9.0", skip_windows=True, standalone=True) def test_ddp_fp16_compress_comm_hook(tmpdir): """Test for DDP FP16 compress hook.""" + class TestDDPStrategy(DDPStrategy): + def teardown(self): + # check here before unwrapping DistributedDataParallel in self.teardown + trainer_comm_hook = trainer.strategy.model._get_ddp_logging_data()["comm_hook"] + expected_comm_hook = default.fp16_compress_hook.__qualname__ + assert trainer_comm_hook == expected_comm_hook + return super().teardown() + model = BoringModel() - strategy = DDPStrategy(ddp_comm_hook=default.fp16_compress_hook) + strategy = TestDDPStrategy(ddp_comm_hook=default.fp16_compress_hook) trainer = Trainer( max_epochs=1, accelerator="gpu", @@ -47,17 +55,22 @@ def test_ddp_fp16_compress_comm_hook(tmpdir): enable_model_summary=False, ) trainer.fit(model) - trainer_comm_hook = trainer.strategy.model.get_ddp_logging_data().comm_hook - expected_comm_hook = default.fp16_compress_hook.__qualname__ - assert trainer_comm_hook == expected_comm_hook assert trainer.state.finished, f"Training failed with {trainer.state}" @RunIf(min_gpus=2, min_torch="1.9.0", skip_windows=True, standalone=True) def test_ddp_sgd_comm_hook(tmpdir): """Test for DDP FP16 compress hook.""" + class TestDDPStrategy(DDPStrategy): + def teardown(self): + # check here before unwrapping DistributedDataParallel in self.teardown + trainer_comm_hook = trainer.strategy.model._get_ddp_logging_data()["comm_hook"] + expected_comm_hook = powerSGD.powerSGD_hook.__qualname__ + assert trainer_comm_hook == expected_comm_hook + return super().teardown() + model = BoringModel() - strategy = DDPStrategy( + strategy = TestDDPStrategy( ddp_comm_state=powerSGD.PowerSGDState(process_group=None), ddp_comm_hook=powerSGD.powerSGD_hook, ) @@ -73,17 +86,22 @@ def test_ddp_sgd_comm_hook(tmpdir): enable_model_summary=False, ) trainer.fit(model) - trainer_comm_hook = trainer.strategy.model.get_ddp_logging_data().comm_hook - expected_comm_hook = powerSGD.powerSGD_hook.__qualname__ - assert trainer_comm_hook == expected_comm_hook assert trainer.state.finished, f"Training failed with {trainer.state}" @RunIf(min_gpus=2, min_torch="1.9.0", skip_windows=True, standalone=True) def test_ddp_fp16_compress_wrap_sgd_comm_hook(tmpdir): """Test for DDP FP16 compress wrapper for SGD hook.""" + class TestDDPStrategy(DDPStrategy): + def teardown(self): + # check here before unwrapping DistributedDataParallel in self.teardown + trainer_comm_hook = trainer.strategy.model._get_ddp_logging_data()["comm_hook"] + expected_comm_hook = default.fp16_compress_wrapper(powerSGD.powerSGD_hook).__qualname__ + assert trainer_comm_hook == expected_comm_hook + return super().teardown() + model = BoringModel() - strategy = DDPStrategy( + strategy = TestDDPStrategy( ddp_comm_state=powerSGD.PowerSGDState(process_group=None), ddp_comm_hook=powerSGD.powerSGD_hook, ddp_comm_wrapper=default.fp16_compress_wrapper, @@ -100,12 +118,10 @@ def test_ddp_fp16_compress_wrap_sgd_comm_hook(tmpdir): enable_model_summary=False, ) trainer.fit(model) - trainer_comm_hook = trainer.strategy.model.get_ddp_logging_data().comm_hook - expected_comm_hook = default.fp16_compress_wrapper(powerSGD.powerSGD_hook).__qualname__ - assert trainer_comm_hook == expected_comm_hook assert trainer.state.finished, f"Training failed with {trainer.state}" + @RunIf(min_gpus=2, min_torch="1.9.0", skip_windows=True, standalone=True) def test_ddp_spawn_fp16_compress_comm_hook(tmpdir): """Test for DDP Spawn FP16 compress hook.""" @@ -151,7 +167,7 @@ def test_ddp_post_local_sgd_comm_hook(tmpdir): enable_model_summary=False, ) trainer.fit(model) - trainer_comm_hook = trainer.strategy.model.get_ddp_logging_data().comm_hook + trainer_comm_hook = trainer.strategy.model._get_ddp_logging_data()["comm_hook"] expected_comm_hook = post_localSGD.post_localSGD_hook.__qualname__ assert trainer_comm_hook == expected_comm_hook assert trainer.state.finished, f"Training failed with {trainer.state}" From f5a333fcb6417dcaa2269ae578d305ebd6beeaa0 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Mon, 25 Apr 2022 16:59:53 -0400 Subject: [PATCH 2/6] Fix ddp_comm_hook tests --- .../test_ddp_strategy_with_comm_hook.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/tests/strategies/test_ddp_strategy_with_comm_hook.py b/tests/strategies/test_ddp_strategy_with_comm_hook.py index a4d0de70b45de..9b0ce725ff96d 100644 --- a/tests/strategies/test_ddp_strategy_with_comm_hook.py +++ b/tests/strategies/test_ddp_strategy_with_comm_hook.py @@ -36,7 +36,7 @@ def test_ddp_fp16_compress_comm_hook(tmpdir): class TestDDPStrategy(DDPStrategy): def teardown(self): # check here before unwrapping DistributedDataParallel in self.teardown - trainer_comm_hook = trainer.strategy.model._get_ddp_logging_data()["comm_hook"] + trainer_comm_hook = self.model._get_ddp_logging_data()["comm_hook"] expected_comm_hook = default.fp16_compress_hook.__qualname__ assert trainer_comm_hook == expected_comm_hook return super().teardown() @@ -64,7 +64,7 @@ def test_ddp_sgd_comm_hook(tmpdir): class TestDDPStrategy(DDPStrategy): def teardown(self): # check here before unwrapping DistributedDataParallel in self.teardown - trainer_comm_hook = trainer.strategy.model._get_ddp_logging_data()["comm_hook"] + trainer_comm_hook = self.model._get_ddp_logging_data()["comm_hook"] expected_comm_hook = powerSGD.powerSGD_hook.__qualname__ assert trainer_comm_hook == expected_comm_hook return super().teardown() @@ -95,7 +95,7 @@ def test_ddp_fp16_compress_wrap_sgd_comm_hook(tmpdir): class TestDDPStrategy(DDPStrategy): def teardown(self): # check here before unwrapping DistributedDataParallel in self.teardown - trainer_comm_hook = trainer.strategy.model._get_ddp_logging_data()["comm_hook"] + trainer_comm_hook = self.model._get_ddp_logging_data()["comm_hook"] expected_comm_hook = default.fp16_compress_wrapper(powerSGD.powerSGD_hook).__qualname__ assert trainer_comm_hook == expected_comm_hook return super().teardown() @@ -145,6 +145,14 @@ def test_ddp_spawn_fp16_compress_comm_hook(tmpdir): @RunIf(min_gpus=2, min_torch="1.10.0", skip_windows=True, standalone=True) def test_ddp_post_local_sgd_comm_hook(tmpdir): """Test for DDP post-localSGD hook.""" + class TestDDPStrategy(DDPStrategy): + def teardown(self): + # check here before unwrapping DistributedDataParallel in self.teardown + trainer_comm_hook = self.strategy.model._get_ddp_logging_data()["comm_hook"] + expected_comm_hook = post_localSGD.post_localSGD_hook.__qualname__ + assert trainer_comm_hook == expected_comm_hook + return super().teardown() + model = BoringModel() strategy = DDPStrategy( @@ -167,9 +175,6 @@ def test_ddp_post_local_sgd_comm_hook(tmpdir): enable_model_summary=False, ) trainer.fit(model) - trainer_comm_hook = trainer.strategy.model._get_ddp_logging_data()["comm_hook"] - expected_comm_hook = post_localSGD.post_localSGD_hook.__qualname__ - assert trainer_comm_hook == expected_comm_hook assert trainer.state.finished, f"Training failed with {trainer.state}" From f5fbebbd8d3422a53bf2b2762e704a4fdc58fea6 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Tue, 26 Apr 2022 06:46:38 +0900 Subject: [PATCH 3/6] Fix ddp_comm_hook tests --- tests/strategies/test_ddp_strategy_with_comm_hook.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/strategies/test_ddp_strategy_with_comm_hook.py b/tests/strategies/test_ddp_strategy_with_comm_hook.py index 9b0ce725ff96d..c511caafc2d2e 100644 --- a/tests/strategies/test_ddp_strategy_with_comm_hook.py +++ b/tests/strategies/test_ddp_strategy_with_comm_hook.py @@ -155,7 +155,7 @@ def teardown(self): model = BoringModel() - strategy = DDPStrategy( + strategy = TestDDPStrategy( ddp_comm_state=post_localSGD.PostLocalSGDState( process_group=None, subgroup=None, From af6290b61eee0317d74d7fab4e5700eba3cd16fc Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Tue, 26 Apr 2022 06:46:38 +0900 Subject: [PATCH 4/6] Fix ddp_comm_hook tests --- tests/strategies/test_ddp_strategy_with_comm_hook.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/strategies/test_ddp_strategy_with_comm_hook.py b/tests/strategies/test_ddp_strategy_with_comm_hook.py index c511caafc2d2e..a0da9db9297e7 100644 --- a/tests/strategies/test_ddp_strategy_with_comm_hook.py +++ b/tests/strategies/test_ddp_strategy_with_comm_hook.py @@ -148,7 +148,7 @@ def test_ddp_post_local_sgd_comm_hook(tmpdir): class TestDDPStrategy(DDPStrategy): def teardown(self): # check here before unwrapping DistributedDataParallel in self.teardown - trainer_comm_hook = self.strategy.model._get_ddp_logging_data()["comm_hook"] + trainer_comm_hook = self.model._get_ddp_logging_data()["comm_hook"] expected_comm_hook = post_localSGD.post_localSGD_hook.__qualname__ assert trainer_comm_hook == expected_comm_hook return super().teardown() From 664ede277703f72283bb70414cb2892429bf6e4d Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 26 Apr 2022 01:47:16 +0000 Subject: [PATCH 5/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/strategies/test_ddp_strategy_with_comm_hook.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/strategies/test_ddp_strategy_with_comm_hook.py b/tests/strategies/test_ddp_strategy_with_comm_hook.py index a0da9db9297e7..d10e3c5e43b9a 100644 --- a/tests/strategies/test_ddp_strategy_with_comm_hook.py +++ b/tests/strategies/test_ddp_strategy_with_comm_hook.py @@ -33,6 +33,7 @@ @RunIf(min_gpus=2, min_torch="1.9.0", skip_windows=True, standalone=True) def test_ddp_fp16_compress_comm_hook(tmpdir): """Test for DDP FP16 compress hook.""" + class TestDDPStrategy(DDPStrategy): def teardown(self): # check here before unwrapping DistributedDataParallel in self.teardown @@ -61,6 +62,7 @@ def teardown(self): @RunIf(min_gpus=2, min_torch="1.9.0", skip_windows=True, standalone=True) def test_ddp_sgd_comm_hook(tmpdir): """Test for DDP FP16 compress hook.""" + class TestDDPStrategy(DDPStrategy): def teardown(self): # check here before unwrapping DistributedDataParallel in self.teardown @@ -92,6 +94,7 @@ def teardown(self): @RunIf(min_gpus=2, min_torch="1.9.0", skip_windows=True, standalone=True) def test_ddp_fp16_compress_wrap_sgd_comm_hook(tmpdir): """Test for DDP FP16 compress wrapper for SGD hook.""" + class TestDDPStrategy(DDPStrategy): def teardown(self): # check here before unwrapping DistributedDataParallel in self.teardown @@ -121,7 +124,6 @@ def teardown(self): assert trainer.state.finished, f"Training failed with {trainer.state}" - @RunIf(min_gpus=2, min_torch="1.9.0", skip_windows=True, standalone=True) def test_ddp_spawn_fp16_compress_comm_hook(tmpdir): """Test for DDP Spawn FP16 compress hook.""" @@ -145,6 +147,7 @@ def test_ddp_spawn_fp16_compress_comm_hook(tmpdir): @RunIf(min_gpus=2, min_torch="1.10.0", skip_windows=True, standalone=True) def test_ddp_post_local_sgd_comm_hook(tmpdir): """Test for DDP post-localSGD hook.""" + class TestDDPStrategy(DDPStrategy): def teardown(self): # check here before unwrapping DistributedDataParallel in self.teardown From 37609d2158c3b25915bf99c389889470a60a51e2 Mon Sep 17 00:00:00 2001 From: Akihiro Nitta Date: Tue, 26 Apr 2022 09:52:02 +0900 Subject: [PATCH 6/6] Refactor ddp_comm_hook tests --- .../test_ddp_strategy_with_comm_hook.py | 57 +++++++------------ 1 file changed, 19 insertions(+), 38 deletions(-) diff --git a/tests/strategies/test_ddp_strategy_with_comm_hook.py b/tests/strategies/test_ddp_strategy_with_comm_hook.py index d10e3c5e43b9a..dada03e83a5a4 100644 --- a/tests/strategies/test_ddp_strategy_with_comm_hook.py +++ b/tests/strategies/test_ddp_strategy_with_comm_hook.py @@ -30,20 +30,26 @@ import torch.distributed.algorithms.ddp_comm_hooks.post_localSGD_hook as post_localSGD +class TestDDPStrategy(DDPStrategy): + def __init__(self, expected_ddp_comm_hook_name, *args, **kwargs): + self.expected_ddp_comm_hook_name = expected_ddp_comm_hook_name + super().__init__(*args, **kwargs) + + def teardown(self): + # check here before unwrapping DistributedDataParallel in self.teardown + attached_ddp_comm_hook_name = self.model._get_ddp_logging_data()["comm_hook"] + assert attached_ddp_comm_hook_name == self.expected_ddp_comm_hook_name + return super().teardown() + + @RunIf(min_gpus=2, min_torch="1.9.0", skip_windows=True, standalone=True) def test_ddp_fp16_compress_comm_hook(tmpdir): """Test for DDP FP16 compress hook.""" - - class TestDDPStrategy(DDPStrategy): - def teardown(self): - # check here before unwrapping DistributedDataParallel in self.teardown - trainer_comm_hook = self.model._get_ddp_logging_data()["comm_hook"] - expected_comm_hook = default.fp16_compress_hook.__qualname__ - assert trainer_comm_hook == expected_comm_hook - return super().teardown() - model = BoringModel() - strategy = TestDDPStrategy(ddp_comm_hook=default.fp16_compress_hook) + strategy = TestDDPStrategy( + expected_ddp_comm_hook_name=default.fp16_compress_hook.__qualname__, + ddp_comm_hook=default.fp16_compress_hook, + ) trainer = Trainer( max_epochs=1, accelerator="gpu", @@ -62,17 +68,9 @@ def teardown(self): @RunIf(min_gpus=2, min_torch="1.9.0", skip_windows=True, standalone=True) def test_ddp_sgd_comm_hook(tmpdir): """Test for DDP FP16 compress hook.""" - - class TestDDPStrategy(DDPStrategy): - def teardown(self): - # check here before unwrapping DistributedDataParallel in self.teardown - trainer_comm_hook = self.model._get_ddp_logging_data()["comm_hook"] - expected_comm_hook = powerSGD.powerSGD_hook.__qualname__ - assert trainer_comm_hook == expected_comm_hook - return super().teardown() - model = BoringModel() strategy = TestDDPStrategy( + expected_ddp_comm_hook_name=powerSGD.powerSGD_hook.__qualname__, ddp_comm_state=powerSGD.PowerSGDState(process_group=None), ddp_comm_hook=powerSGD.powerSGD_hook, ) @@ -94,17 +92,9 @@ def teardown(self): @RunIf(min_gpus=2, min_torch="1.9.0", skip_windows=True, standalone=True) def test_ddp_fp16_compress_wrap_sgd_comm_hook(tmpdir): """Test for DDP FP16 compress wrapper for SGD hook.""" - - class TestDDPStrategy(DDPStrategy): - def teardown(self): - # check here before unwrapping DistributedDataParallel in self.teardown - trainer_comm_hook = self.model._get_ddp_logging_data()["comm_hook"] - expected_comm_hook = default.fp16_compress_wrapper(powerSGD.powerSGD_hook).__qualname__ - assert trainer_comm_hook == expected_comm_hook - return super().teardown() - model = BoringModel() strategy = TestDDPStrategy( + expected_ddp_comm_hook_name=default.fp16_compress_wrapper(powerSGD.powerSGD_hook).__qualname__, ddp_comm_state=powerSGD.PowerSGDState(process_group=None), ddp_comm_hook=powerSGD.powerSGD_hook, ddp_comm_wrapper=default.fp16_compress_wrapper, @@ -147,18 +137,9 @@ def test_ddp_spawn_fp16_compress_comm_hook(tmpdir): @RunIf(min_gpus=2, min_torch="1.10.0", skip_windows=True, standalone=True) def test_ddp_post_local_sgd_comm_hook(tmpdir): """Test for DDP post-localSGD hook.""" - - class TestDDPStrategy(DDPStrategy): - def teardown(self): - # check here before unwrapping DistributedDataParallel in self.teardown - trainer_comm_hook = self.model._get_ddp_logging_data()["comm_hook"] - expected_comm_hook = post_localSGD.post_localSGD_hook.__qualname__ - assert trainer_comm_hook == expected_comm_hook - return super().teardown() - model = BoringModel() - strategy = TestDDPStrategy( + expected_ddp_comm_hook_name=post_localSGD.post_localSGD_hook.__qualname__, ddp_comm_state=post_localSGD.PostLocalSGDState( process_group=None, subgroup=None,