diff --git a/examples/components/CCFRAUD/traininsilo/conda.yaml b/examples/components/CCFRAUD/traininsilo/conda.yaml
index 341acc18..3c80bc96 100644
--- a/examples/components/CCFRAUD/traininsilo/conda.yaml
+++ b/examples/components/CCFRAUD/traininsilo/conda.yaml
@@ -2,12 +2,12 @@ name: ccfraud_train_conda_env
 channels:
   - defaults
   - pytorch
+  - nvidia
 dependencies:
   - python=3.8
   - pip=22.3.1
   - pytorch=1.13.1
-  - torchvision=0.14.1
-  - cudatoolkit=11.3
+  - pytorch-cuda=11.6
   - pip:
     - azureml-mlflow==1.48.0
     - pandas==1.5.2
diff --git a/examples/components/CCFRAUD/traininsilo/run.py b/examples/components/CCFRAUD/traininsilo/run.py
index 4927ed55..87169053 100644
--- a/examples/components/CCFRAUD/traininsilo/run.py
+++ b/examples/components/CCFRAUD/traininsilo/run.py
@@ -569,13 +569,13 @@ def run(args):
     logger.info(f"Distributed process rank: {os.environ['RANK']}")
     logger.info(f"Distributed world size: {os.environ['WORLD_SIZE']}")

-    if int(os.environ["WORLD_SIZE"]) > 1 and torch.cuda.is_available():
+    if int(os.environ.get("WORLD_SIZE", "1")) > 1 and torch.cuda.is_available():
         dist.init_process_group(
             "nccl",
             rank=int(os.environ["RANK"]),
-            world_size=int(os.environ["WORLD_SIZE"]),
+            world_size=int(os.environ.get("WORLD_SIZE", "1")),
         )
-    elif int(os.environ["WORLD_SIZE"]) > 1:
+    elif int(os.environ.get("WORLD_SIZE", "1")) > 1:
         dist.init_process_group("gloo")

     trainer = CCFraudTrainer(
@@ -594,11 +594,12 @@
         experiment_name=args.metrics_prefix,
         iteration_name=args.iteration_name,
         device_id=int(os.environ["RANK"]),
-        distributed=int(os.environ["WORLD_SIZE"]) > 1 and torch.cuda.is_available(),
+        distributed=int(os.environ.get("WORLD_SIZE", "1")) > 1
+        and torch.cuda.is_available(),
     )

     trainer.execute(args.checkpoint)

-    if torch.cuda.is_available() or int(os.environ["WORLD_SIZE"]) > 1:
+    if int(os.environ.get("WORLD_SIZE", "1")) > 1:
         dist.destroy_process_group()

diff --git a/examples/components/MNIST/traininsilo/conda.yaml b/examples/components/MNIST/traininsilo/conda.yaml
index 3f94ec4f..823fe491 100644
--- a/examples/components/MNIST/traininsilo/conda.yaml
+++ b/examples/components/MNIST/traininsilo/conda.yaml
@@ -2,12 +2,13 @@ name: mnist_conda_env
 channels:
   - defaults
   - pytorch
+  - nvidia
 dependencies:
   - python=3.8
   - pip=22.3.1
   - pytorch=1.13.1
   - torchvision=0.14.1
-  - cudatoolkit=11.3
+  - pytorch-cuda=11.6
   - pip:
     - azureml-mlflow==1.48.0
     - pandas==1.5.2
diff --git a/examples/components/NER/traininsilo/conda.yaml b/examples/components/NER/traininsilo/conda.yaml
index d17df465..4a44e2be 100644
--- a/examples/components/NER/traininsilo/conda.yaml
+++ b/examples/components/NER/traininsilo/conda.yaml
@@ -2,11 +2,12 @@ name: ner_train_conda_env
 channels:
   - defaults
   - pytorch
+  - nvidia
 dependencies:
   - python=3.8
   - pip=22.3.1
   - pytorch=1.13.1
-  - cudatoolkit=11.3
+  - pytorch-cuda=11.6
   - pip:
     - azureml-mlflow==1.48.0
     - pandas==1.5.2
diff --git a/examples/components/NER/traininsilo/run.py b/examples/components/NER/traininsilo/run.py
index 835d86d0..b898d801 100644
--- a/examples/components/NER/traininsilo/run.py
+++ b/examples/components/NER/traininsilo/run.py
@@ -140,15 +140,15 @@ def __init__(

         if self._distributed:
             logger.info("Setting up distributed samplers.")
-            self.train_sampler_ = DistributedSampler(self.train_dataset_)
-            self.test_sampler_ = DistributedSampler(self.test_dataset_)
+            self.train_sampler_ = DistributedSampler(train_dataset)
+            self.test_sampler_ = DistributedSampler(test_dataset)
         else:
             self.train_sampler_ = None
             self.test_sampler_ = None

         self.train_loader_ = DataLoader(
             train_dataset,
-            shuffle=True,
+            shuffle=(not self._distributed),
             collate_fn=data_collator,
             batch_size=self._batch_size,
             sampler=self.train_sampler_,
@@ -157,7 +157,7 @@ def __init__(
             test_dataset,
             collate_fn=data_collator,
             batch_size=self._batch_size,
-            sampler=self.train_sampler_,
+            sampler=self.test_sampler_,
         )

         logger.info(f"Train loader steps: {len(self.train_loader_)}")
@@ -179,19 +179,19 @@ def __init__(
                 trainable_params += p.numel()
         logger.info(f"Trainable parameters: {trainable_params}")

-        self.model_.train()
+        self.model_.to(self.device_)
         if self._distributed:
             self.model_ = DDP(
                 self.model_,
                 device_ids=[self._rank] if self._rank is not None else None,
                 output_device=self._rank,
             )
-        self.model_.to(self.device_)
         self.metric_ = evaluate.load("seqeval")

         # DP
         logger.info(f"DP: {dp}")
         if dp:
+            self.model_.train()
             if not ModuleValidator.is_valid(self.model_):
                 self.model_ = ModuleValidator.fix(self.model_)

@@ -625,13 +625,13 @@ def run(args):
     logger.info(f"Distributed process rank: {os.environ['RANK']}")
     logger.info(f"Distributed world size: {os.environ['WORLD_SIZE']}")

-    if int(os.environ["WORLD_SIZE"]) > 1 and torch.cuda.is_available():
+    if int(os.environ.get("WORLD_SIZE", "1")) > 1 and torch.cuda.is_available():
         dist.init_process_group(
             "nccl",
             rank=int(os.environ["RANK"]),
-            world_size=int(os.environ["WORLD_SIZE"]),
+            world_size=int(os.environ.get("WORLD_SIZE", "1")),
         )
-    elif int(os.environ["WORLD_SIZE"]) > 1:
+    elif int(os.environ.get("WORLD_SIZE", "1")) > 1:
         dist.init_process_group("gloo")

     trainer = NERTrainer(
@@ -651,11 +651,12 @@
         iteration_num=args.iteration_num,
         batch_size=args.batch_size,
         device_id=int(os.environ["RANK"]),
-        distributed=int(os.environ["WORLD_SIZE"]) > 1 and torch.cuda.is_available(),
+        distributed=int(os.environ.get("WORLD_SIZE", "1")) > 1
+        and torch.cuda.is_available(),
     )

     trainer.execute(args.checkpoint)

-    if torch.cuda.is_available() or int(os.environ["WORLD_SIZE"]) > 1:
+    if int(os.environ.get("WORLD_SIZE", "1")) > 1:
         dist.destroy_process_group()

diff --git a/examples/components/PNEUMONIA/traininsilo/conda.yaml b/examples/components/PNEUMONIA/traininsilo/conda.yaml
index a0fce113..a6093e05 100644
--- a/examples/components/PNEUMONIA/traininsilo/conda.yaml
+++ b/examples/components/PNEUMONIA/traininsilo/conda.yaml
@@ -2,12 +2,13 @@ name: pneumonia_train_conda_env
 channels:
   - defaults
   - pytorch
+  - nvidia
 dependencies:
   - python=3.8
   - pip=22.3.1
   - pytorch=1.13.1
   - torchvision=0.13.1
-  - cudatoolkit=11.3
+  - pytorch-cuda=11.6
   - pip:
     - azureml-mlflow==1.48.0
     - opacus==1.3.0
diff --git a/examples/components/PNEUMONIA/traininsilo/run.py b/examples/components/PNEUMONIA/traininsilo/run.py
index 9a022b00..1afe8457 100644
--- a/examples/components/PNEUMONIA/traininsilo/run.py
+++ b/examples/components/PNEUMONIA/traininsilo/run.py
@@ -94,8 +94,13 @@ def __init__(

         # Training setup
         self.model_ = PneumoniaNetwork()
-        self.device_ = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
         self.model_.to(self.device_)
+        if self._distributed:
+            self.model_ = DDP(
+                self.model_,
+                device_ids=[self._rank] if self._rank is not None else None,
+                output_device=self._rank,
+            )
         self.loss_ = nn.CrossEntropyLoss()

         # Data setup
@@ -125,7 +130,7 @@ def __init__(
         self.train_loader_ = DataLoader(
             dataset=self.train_dataset_,
             batch_size=32,
-            shuffle=True,
+            shuffle=(not self._distributed),
             drop_last=True,
             sampler=self.train_sampler_,
         )
@@ -245,9 +250,15 @@ def local_train(self, checkpoint):
             checkpoint: Previous model checkpoint from where training has to be started.
""" if checkpoint: - self.model_.load_state_dict( - torch.load(checkpoint + "/model.pt", map_location=self.device_) - ) + if self._distributed: + # DDP comes with "module." prefix: https://pytorch.org/docs/stable/generated/torch.nn.parallel.DistributedDataParallel.html + self.model_.module.load_state_dict( + torch.load(checkpoint + "/model.pt", map_location=self.device_) + ) + else: + self.model_.load_state_dict( + torch.load(checkpoint + "/model.pt", map_location=self.device_) + ) with mlflow.start_run() as mlflow_run: # get Mlflow client and root run id @@ -449,13 +460,13 @@ def run(args): logger.info(f"Distributed process rank: {os.environ['RANK']}") logger.info(f"Distributed world size: {os.environ['WORLD_SIZE']}") - if int(os.environ["WORLD_SIZE"]) > 1 and torch.cuda.is_available(): + if int(os.environ.get("WORLD_SIZE", "1")) > 1 and torch.cuda.is_available(): dist.init_process_group( "nccl", rank=int(os.environ["RANK"]), - world_size=int(os.environ["WORLD_SIZE"]), + world_size=int(os.environ.get("WORLD_SIZE", "1")), ) - elif int(os.environ["WORLD_SIZE"]) > 1: + elif int(os.environ.get("WORLD_SIZE", "1")) > 1: dist.init_process_group("gloo") trainer = PTLearner( @@ -471,12 +482,13 @@ def run(args): iteration_num=args.iteration_num, model_path=args.model + "/model.pt", device_id=int(os.environ["RANK"]), - distributed=int(os.environ["WORLD_SIZE"]) > 1 and torch.cuda.is_available(), + distributed=int(os.environ.get("WORLD_SIZE", "1")) > 1 + and torch.cuda.is_available(), ) trainer.execute(args.checkpoint) - if torch.cuda.is_available() or int(os.environ["WORLD_SIZE"]) > 1: + if int(os.environ.get("WORLD_SIZE", "1")) > 1: dist.destroy_process_group()