diff --git a/allennlp/commands/find_learning_rate.py b/allennlp/commands/find_learning_rate.py index c778ac733a1..c2af08f16f9 100644 --- a/allennlp/commands/find_learning_rate.py +++ b/allennlp/commands/find_learning_rate.py @@ -54,7 +54,7 @@ from allennlp.commands.subcommand import Subcommand from allennlp.common.checks import ConfigurationError, check_for_gpu from allennlp.common import Params, Tqdm -from allennlp.common.util import prepare_environment, lazy_groups_of +from allennlp.common.util import prepare_environment from allennlp.data import Vocabulary, DataIterator from allennlp.models import Model from allennlp.training import Trainer @@ -223,6 +223,7 @@ def find_learning_rate_model( train_data = all_datasets["train"] trainer_params = params.pop("trainer") + no_grad_regexes = trainer_params.pop("no_grad", ()) for name, parameter in model.named_parameters(): if any(re.search(regex, name) for regex in no_grad_regexes): @@ -296,10 +297,7 @@ def search_learning_rate( trainer.model.train() - num_gpus = len(trainer._cuda_devices) - - raw_train_generator = trainer.iterator(trainer.train_data, shuffle=trainer.shuffle) - train_generator = lazy_groups_of(raw_train_generator, num_gpus) + train_generator = trainer.iterator(trainer.train_data, shuffle=trainer.shuffle) train_generator_tqdm = Tqdm.tqdm(train_generator, total=num_batches) learning_rates = [] @@ -310,7 +308,7 @@ def search_learning_rate( else: lr_update_factor = (end_lr / start_lr) ** (1.0 / num_batches) - for i, batch_group in enumerate(train_generator_tqdm): + for i, batch in enumerate(train_generator_tqdm): if linear_steps: current_lr = start_lr + (lr_update_factor * i) @@ -321,7 +319,7 @@ def search_learning_rate( param_group["lr"] = current_lr trainer.optimizer.zero_grad() - loss = trainer.batch_loss(batch_group, for_training=True) + loss = trainer.batch_loss(batch, for_training=True) loss.backward() loss = loss.detach().cpu().item() diff --git a/allennlp/commands/fine_tune.py b/allennlp/commands/fine_tune.py index 46ff0d56d3f..11882c14f55 100644 --- a/allennlp/commands/fine_tune.py +++ b/allennlp/commands/fine_tune.py @@ -382,7 +382,7 @@ def fine_tune_model( model, test_data, validation_iterator or iterator, - cuda_device=trainer._cuda_devices[0], + cuda_device=trainer.cuda_device, batch_weight_key=batch_weight_key, ) diff --git a/allennlp/commands/train.py b/allennlp/commands/train.py index e846012e9da..105b205e098 100644 --- a/allennlp/commands/train.py +++ b/allennlp/commands/train.py @@ -56,7 +56,7 @@ from allennlp.commands.make_vocab import make_vocab_from_params from allennlp.commands.subcommand import Subcommand from allennlp.common import Params -from allennlp.common.checks import ConfigurationError, check_for_gpu, parse_cuda_device +from allennlp.common.checks import ConfigurationError, check_for_gpu from allennlp.common.util import ( prepare_environment, prepare_global_logging, @@ -269,11 +269,10 @@ def train_model( create_serialization_dir(params, serialization_dir, recover, force) params.to_file(os.path.join(serialization_dir, CONFIG_NAME)) - cuda_device = params.params.get("trainer").get("cuda_device", -1) - check_for_gpu(cuda_device) - - distributed = params.params.get("trainer").get("distributed", False) - if not distributed: + distributed_params = params.params.pop("distributed", None) + # If distributed isn't in the config and the config contains strictly + # one cuda device, we just run a single training process. 
+ if distributed_params is None: model = _train_worker( process_rank=0, params=params, @@ -286,18 +285,24 @@ def train_model( ) archive_model(serialization_dir, files_to_archive=params.files_to_archive) return model + + # Otherwise, we are running multiple processes for training. else: - device_id = parse_cuda_device(cuda_device) + # We are careful here so that we can raise a good error if someone + # passed the wrong thing - cuda_devices are required. + device_ids = distributed_params.pop("cuda_devices", None) + multi_device = isinstance(device_ids, list) and len(device_ids) > 1 - if not isinstance(device_id, list): + if not multi_device: raise ConfigurationError( "Multiple cuda devices need to be configured to run distributed training." ) + check_for_gpu(device_ids) - master_addr = params.params.get("trainer").pop("master_address", "127.0.0.1") - master_port = params.params.get("trainer").pop("master_port", 29500) - num_procs = len(device_id) - num_nodes = params.params.get("trainer").pop("num_nodes", 1) + master_addr = distributed_params.pop("master_address", "127.0.0.1") + master_port = distributed_params.pop("master_port", 29500) + num_procs = len(device_ids) + num_nodes = distributed_params.pop("num_nodes", 1) world_size = num_nodes * num_procs os.environ["MASTER_ADDR"] = master_addr @@ -332,10 +337,10 @@ def train_model( cache_prefix, include_package, node_rank, - num_procs, master_addr, master_port, world_size, + device_ids, ), nprocs=num_procs, ) @@ -353,10 +358,10 @@ def _train_worker( cache_prefix: str = None, include_package: List[str] = None, node_rank: int = 0, - num_procs_per_node: int = 0, master_addr: str = "127.0.0.1", master_port: int = 29500, world_size: int = 1, + distributed_device_ids: List[str] = None, ) -> Optional[Model]: """ Helper to train the configured model/experiment. In distributed mode, this is spawned as a @@ -415,18 +420,22 @@ def _train_worker( for package_name in include_package: import_submodules(package_name) + num_procs_per_node = len(distributed_device_ids) # The Unique identifier of the worker process among all the processes in the # distributed training group is computed here. This is used while initializing # the process group using `init_process_group` global_rank = node_rank * num_procs_per_node + process_rank - cuda_device = params.params.get("trainer").get("cuda_device", -1) - device_list = parse_cuda_device(cuda_device) - # In distributed training, the configured device is always going to be a list. # The corresponding gpu id for the particular worker is obtained by picking the id # from the device list with the rank as index - gpu_id = device_list[process_rank] # type: ignore + gpu_id = distributed_device_ids[process_rank] # type: ignore + + # Till now, "cuda_device" might not be set in the trainer params. + # But a worker trainer needs to only know about its specific GPU id. + params["trainer"]["cuda_device"] = gpu_id + params["trainer"]["world_size"] = world_size + params["trainer"]["distributed"] = True torch.cuda.set_device(gpu_id) dist.init_process_group( @@ -440,12 +449,6 @@ def _train_worker( f"for distributed training in worker {global_rank}" ) - # Till now, "cuda_device" will be a list of ids as configured originally - # in params. But a worker trainer needs to only know about its specific - # GPU id. 
- params["trainer"]["cuda_device"] = gpu_id - params["trainer"]["world_size"] = world_size - trainer_type = params.get("trainer", {}).get("type", "default") if trainer_type == "default": @@ -504,7 +507,7 @@ def _train_worker( trainer.model, evaluation_dataset, evaluation_iterator, - cuda_device=trainer._cuda_devices[0], # pylint: disable=protected-access, + cuda_device=trainer.cuda_device, # TODO(brendanr): Pass in an arg following Joel's trainer refactor. batch_weight_key="", ) diff --git a/allennlp/common/checks.py b/allennlp/common/checks.py index 6ef6450aeb9..c15fe37c12e 100644 --- a/allennlp/common/checks.py +++ b/allennlp/common/checks.py @@ -52,14 +52,36 @@ def check_dimensions_match( ) -def parse_cuda_device(cuda_device: Union[str, int, List[int]]) -> Union[int, List[int]]: +def parse_cuda_device(cuda_device: Union[str, int, List[int]]) -> int: """ Disambiguates single GPU and multiple GPU settings for cuda_device param. """ + message = """ + In allennlp 1.0, the Trainer cannot be passed multiple cuda devices. + Instead, use the faster Distributed Data Parallel. For instance, if you previously had config like: + { + "trainer": { + "cuda_device": [0, 1, 2, 3], + "num_epochs": 20, + ... + } + } + simply change it to: + { + "distributed": { + "cuda_devices": [0, 1, 2, 3], + }, + "trainer": { + "num_epochs": 20, + ... + } + } + """ + def from_list(strings): if len(strings) > 1: - return [int(d) for d in strings] + raise ConfigurationError(message) elif len(strings) == 1: return int(strings[0]) else: @@ -76,8 +98,7 @@ def from_list(strings): return int(cuda_device) # type: ignore -def check_for_gpu(device_id: Union[int, list]): - device_id = parse_cuda_device(device_id) +def check_for_gpu(device_id: Union[int, List[int]]): if isinstance(device_id, list): for did in device_id: check_for_gpu(did) diff --git a/allennlp/tests/commands/train_test.py b/allennlp/tests/commands/train_test.py index 428a6fdc802..934e79ad91f 100644 --- a/allennlp/tests/commands/train_test.py +++ b/allennlp/tests/commands/train_test.py @@ -99,12 +99,8 @@ def test_train_model_distributed(self): "train_data_path": SEQUENCE_TAGGING_DATA_PATH, "validation_data_path": SEQUENCE_TAGGING_DATA_PATH, "iterator": {"type": "basic", "batch_size": 2}, - "trainer": { - "num_epochs": 2, - "optimizer": "adam", - "distributed": True, - "cuda_device": [0, 1], - }, + "trainer": {"num_epochs": 2, "optimizer": "adam"}, + "distributed": {"cuda_devices": [0, 1]}, } ) @@ -136,7 +132,8 @@ def test_distributed_raises_error_with_no_gpus(self): "train_data_path": SEQUENCE_TAGGING_DATA_PATH, "validation_data_path": SEQUENCE_TAGGING_DATA_PATH, "iterator": {"type": "basic", "batch_size": 2}, - "trainer": {"num_epochs": 2, "optimizer": "adam", "distributed": True}, + "trainer": {"num_epochs": 2, "optimizer": "adam"}, + "distributed": {}, } ) with pytest.raises(ConfigurationError): @@ -183,8 +180,8 @@ def test_error_is_throw_when_cuda_device_is_not_available(self): "encoder": {"type": "lstm", "input_size": 5, "hidden_size": 7, "num_layers": 2}, }, "dataset_reader": {"type": "sequence_tagging"}, - "train_data_path": "tests/fixtures/data/sequence_tagging.tsv", - "validation_data_path": "tests/fixtures/data/sequence_tagging.tsv", + "train_data_path": "allennlp/tests/fixtures/data/sequence_tagging.tsv", + "validation_data_path": "allennlp/tests/fixtures/data/sequence_tagging.tsv", "iterator": {"type": "basic", "batch_size": 2}, "trainer": { "num_epochs": 2, diff --git a/allennlp/tests/models/simple_tagger_test.py 
b/allennlp/tests/models/simple_tagger_test.py index 8f3d103ae9d..eae4590e627 100644 --- a/allennlp/tests/models/simple_tagger_test.py +++ b/allennlp/tests/models/simple_tagger_test.py @@ -63,8 +63,8 @@ def test_regularization(self): training_batch = next(iterator(self.instances, num_epochs=1)) validation_batch = next(iterator(self.instances, num_epochs=1)) - training_loss = trainer.batch_loss([training_batch], for_training=True).item() - validation_loss = trainer.batch_loss([validation_batch], for_training=False).item() + training_loss = trainer.batch_loss(training_batch, for_training=True).item() + validation_loss = trainer.batch_loss(validation_batch, for_training=False).item() # Training loss should have the regularization penalty, but validation loss should not. numpy.testing.assert_almost_equal(training_loss, validation_loss) @@ -124,8 +124,8 @@ def test_regularization(self): training_batch = next(self.iterator(self.instances, num_epochs=1)) validation_batch = next(self.iterator(self.instances, num_epochs=1)) - training_loss = self.trainer.batch_loss([training_batch], for_training=True).data - validation_loss = self.trainer.batch_loss([validation_batch], for_training=False).data + training_loss = self.trainer.batch_loss(training_batch, for_training=True).data + validation_loss = self.trainer.batch_loss(validation_batch, for_training=False).data # Training loss should have the regularization penalty, but validation loss should not. assert (training_loss != validation_loss).all() diff --git a/allennlp/tests/training/callback_trainer_test.py b/allennlp/tests/training/callback_trainer_test.py index 5578d2ffc5a..60af37a4a81 100644 --- a/allennlp/tests/training/callback_trainer_test.py +++ b/allennlp/tests/training/callback_trainer_test.py @@ -262,52 +262,29 @@ def test_trainer_can_run_cuda(self): callbacks=self.default_callbacks(), cuda_device=0, ) - trainer.train() - - @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Need multiple GPUs.") - def test_trainer_can_run_multiple_gpu(self): - self.model.cuda() - - class MetaDataCheckWrapper(Model): - """ - Checks that the metadata field has been correctly split across the batch dimension - when running on multiple gpus. - """ - - def __init__(self, model): - super().__init__(model.vocab) - self.model = model - - def forward(self, **kwargs) -> Dict[str, torch.Tensor]: # type: ignore - assert ( - "metadata" in kwargs and "tags" in kwargs - ), f"tokens and metadata must be provided. Got {kwargs.keys()} instead." - batch_size = kwargs["tokens"]["tokens"].size()[0] - assert len(kwargs["metadata"]) == batch_size, ( - f"metadata must be split appropriately. Expected {batch_size} elements, " - f"got {len(kwargs['metadata'])} elements." 
- ) - return self.model.forward(**kwargs) - - multigpu_iterator = BasicIterator(batch_size=4) - multigpu_iterator.index_with(self.vocab) - trainer = CallbackTrainer( - MetaDataCheckWrapper(self.model), - training_data=self.instances, - iterator=multigpu_iterator, - optimizer=self.optimizer, - num_epochs=2, - callbacks=self.default_callbacks(), - cuda_device=[0, 1], - ) metrics = trainer.train() assert "peak_cpu_memory_MB" in metrics assert isinstance(metrics["peak_cpu_memory_MB"], float) assert metrics["peak_cpu_memory_MB"] > 0 assert "peak_gpu_0_memory_MB" in metrics assert isinstance(metrics["peak_gpu_0_memory_MB"], int) - assert "peak_gpu_1_memory_MB" in metrics - assert isinstance(metrics["peak_gpu_1_memory_MB"], int) + + @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="2 or more GPUs required.") + def test_passing_trainer_multiple_gpus_raises_error(self): + self.model.cuda() + + multigpu_iterator = BasicIterator(batch_size=4) + multigpu_iterator.index_with(self.vocab) + with pytest.raises(ConfigurationError): + CallbackTrainer( + self.model, + training_data=self.instances, + iterator=multigpu_iterator, + optimizer=self.optimizer, + num_epochs=2, + callbacks=self.default_callbacks(), + cuda_device=[0, 1], + ) def test_trainer_can_resume_training(self): trainer = CallbackTrainer( diff --git a/allennlp/tests/training/gan_callback_trainer_test.py b/allennlp/tests/training/gan_callback_trainer_test.py index 05af33b5a87..c68a19f6530 100644 --- a/allennlp/tests/training/gan_callback_trainer_test.py +++ b/allennlp/tests/training/gan_callback_trainer_test.py @@ -207,7 +207,7 @@ def __init__( num_epochs: int = 20, shuffle: bool = False, serialization_dir: Optional[str] = None, - cuda_device: Union[int, List] = -1, + cuda_device: int = -1, callbacks: List[Callback] = None, distributed: bool = False, rank: int = 0, @@ -235,11 +235,9 @@ def _reset_counters(self) -> None: self.fake_stdev = 0.0 self.count = 0 - def train_one_batch_group(self, batch_group): - # Each batch_group should have only one batch - batch, = batch_group - array = batch["array"] + def train_one_batch(self, batch): + array = batch["array"] # We should not have mixed batches: if len(set(batch["stage"])) != 1: raise ValueError("mixed batch") @@ -290,7 +288,7 @@ def train_one_epoch(self) -> None: # Reset epoch counters self._reset_counters() - # Will call `self.train_one_batch_group` + # Will call `self.train_one_batch` super().train_one_epoch() diff --git a/allennlp/tests/training/trainer_test.py b/allennlp/tests/training/trainer_test.py index c1774d9a770..f479c9cf62f 100644 --- a/allennlp/tests/training/trainer_test.py +++ b/allennlp/tests/training/trainer_test.py @@ -107,51 +107,28 @@ def test_trainer_can_run_cuda(self): trainer = Trainer( self.model, self.optimizer, self.iterator, self.instances, num_epochs=2, cuda_device=0 ) - trainer.train() - - @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="Need multiple GPUs.") - def test_trainer_can_run_multiple_gpu(self): - self.model.cuda() - - class MetaDataCheckWrapper(Model): - """ - Checks that the metadata field has been correctly split across the batch dimension - when running on multiple gpus. - """ - - def __init__(self, model): - super().__init__(model.vocab) - self.model = model - - def forward(self, **kwargs) -> Dict[str, torch.Tensor]: # type: ignore - assert ( - "metadata" in kwargs and "tags" in kwargs - ), f"tokens and metadata must be provided. Got {kwargs.keys()} instead." 
- batch_size = kwargs["tokens"]["tokens"].size()[0] - assert len(kwargs["metadata"]) == batch_size, ( - f"metadata must be split appropriately. Expected {batch_size} elements, " - f"got {len(kwargs['metadata'])} elements." - ) - return self.model.forward(**kwargs) - - multigpu_iterator = BasicIterator(batch_size=4) - multigpu_iterator.index_with(self.vocab) - trainer = Trainer( - MetaDataCheckWrapper(self.model), - self.optimizer, - multigpu_iterator, - self.instances, - num_epochs=2, - cuda_device=[0, 1], - ) metrics = trainer.train() assert "peak_cpu_memory_MB" in metrics assert isinstance(metrics["peak_cpu_memory_MB"], float) assert metrics["peak_cpu_memory_MB"] > 0 assert "peak_gpu_0_memory_MB" in metrics assert isinstance(metrics["peak_gpu_0_memory_MB"], int) - assert "peak_gpu_1_memory_MB" in metrics - assert isinstance(metrics["peak_gpu_1_memory_MB"], int) + + @pytest.mark.skipif(torch.cuda.device_count() < 2, reason="2 or more GPUs required.") + def test_passing_trainer_multiple_gpus_raises_error(self): + self.model.cuda() + + multigpu_iterator = BasicIterator(batch_size=4) + multigpu_iterator.index_with(self.vocab) + with pytest.raises(ConfigurationError): + Trainer( + self.model, + self.optimizer, + multigpu_iterator, + self.instances, + num_epochs=2, + cuda_device=[0, 1], + ) def test_trainer_can_resume_training(self): trainer = Trainer( diff --git a/allennlp/training/callback_trainer.py b/allennlp/training/callback_trainer.py index 2226a6f90bf..8f302e4c5d6 100644 --- a/allennlp/training/callback_trainer.py +++ b/allennlp/training/callback_trainer.py @@ -6,15 +6,13 @@ import time import datetime import functools -import math -from typing import Dict, Optional, List, Union, Any, Iterable +from typing import Dict, Optional, List, Any, Iterable import torch from allennlp.common import Params -from allennlp.common.checks import parse_cuda_device +from allennlp.common.checks import parse_cuda_device, check_for_gpu from allennlp.common.tqdm import Tqdm -from allennlp.common.util import lazy_groups_of -from allennlp.data.instance import Instance +from allennlp.data import Instance from allennlp.data.iterators.data_iterator import DataIterator, TensorDict from allennlp.models.model import Model from allennlp.nn import util as nn_util @@ -55,7 +53,7 @@ def __init__( num_epochs: int = 20, shuffle: bool = True, serialization_dir: Optional[str] = None, - cuda_device: Union[int, List] = -1, + cuda_device: int = -1, callbacks: List[Callback] = None, distributed: bool = False, rank: int = 0, @@ -96,8 +94,10 @@ def __init__( serialization_dir : str, optional (default=None) Path to directory for saving and loading model files. Models will not be saved if this parameter is not passed. - cuda_device : ``Union[int, List[int]]``, optional (default=-1) + cuda_device : ``int``, optional (default=-1) An integer or list of integers specifying the CUDA device(s) to use. If -1, the CPU is used. + Data parallelism is controlled at the allennlp train level, so each trainer will have a single + GPU. callbacks : ``List[Callback]``, optional (default=None) A list of callbacks that will be called based on training events. """ @@ -125,7 +125,7 @@ def __init__( self.metrics: Dict[str, Any] = {} self.batch_num_total = 0 - self.batch_group: List[TensorDict] = [] + self.batch: TensorDict = None self.batches_this_epoch = 0 self.training_batches: Iterable[List[TensorDict]] = () @@ -165,15 +165,11 @@ def generate_training_batches(self): Generates one epoch worth of training data. 
Stores it in trainer instance variables so that callbacks can access it. """ - num_gpus = len(self._cuda_devices) + train_generator = self.iterator(self.training_data, num_epochs=1, shuffle=self.shuffle) + self.training_batches = train_generator + self.num_training_batches = self.iterator.get_num_batches(self.training_data) - raw_train_generator = self.iterator(self.training_data, num_epochs=1, shuffle=self.shuffle) - self.training_batches = lazy_groups_of(raw_train_generator, num_gpus) - self.num_training_batches = math.ceil( - self.iterator.get_num_batches(self.training_data) / num_gpus - ) - - def batch_loss(self, batch_group: List[TensorDict], for_training: bool) -> torch.Tensor: + def batch_loss(self, batch: TensorDict, for_training: bool) -> torch.Tensor: """ Does a forward pass on the given batches and returns the ``loss`` value in the result. If ``for_training`` is `True` also applies regularization penalty. @@ -181,13 +177,8 @@ def batch_loss(self, batch_group: List[TensorDict], for_training: bool) -> torch This is a method on the trainer so that it can be used both in training and validation (which are handled separately). """ - if self._multiple_gpu: - output_dict = training_util.data_parallel(batch_group, self.model, self._cuda_devices) - else: - assert len(batch_group) == 1 - batch = batch_group[0] - batch = nn_util.move_to_device(batch, self._cuda_devices[0]) - output_dict = self._pytorch_model(**batch) + batch = nn_util.move_to_device(batch, self.cuda_device) + output_dict = self._pytorch_model(**batch) try: loss = output_dict["loss"] @@ -203,7 +194,7 @@ def batch_loss(self, batch_group: List[TensorDict], for_training: bool) -> torch return loss - def train_one_batch_group(self, batch_group: List[TensorDict]) -> str: + def train_one_batch(self, batch: TensorDict) -> str: """ Handles the training for a single batch group. Fires off the events BATCH_START, FORWARD, BACKWARD, and BATCH_END. @@ -215,7 +206,7 @@ def train_one_batch_group(self, batch_group: List[TensorDict]) -> str: self.batch_num_total += 1 self.handler.fire_event(Events.FORWARD) - loss = self.batch_loss(batch_group, for_training=True) + loss = self.batch_loss(batch, for_training=True) if torch.isnan(loss): raise ValueError("nan loss encountered") @@ -240,7 +231,7 @@ def train_one_epoch(self) -> None: """ Trains the model for a single epoch. Fires off the events EPOCH_START and EPOCH_END, - and repeatedly calls self.train_one_batch_group(). + and repeatedly calls self.train_one_batch(). 
""" self.handler.fire_event(Events.EPOCH_START) @@ -253,11 +244,11 @@ def train_one_epoch(self) -> None: logger.info("Training") self.batches_this_epoch = 0 - batch_groups_tqdm = Tqdm.tqdm(self.training_batches, total=self.num_training_batches) + batches_tqdm = Tqdm.tqdm(self.training_batches, total=self.num_training_batches) - for self.batch_group in batch_groups_tqdm: - description = self.train_one_batch_group(self.batch_group) - batch_groups_tqdm.set_description(description, refresh=False) + for self.batch in batches_tqdm: + description = self.train_one_batch(self.batch) + batches_tqdm.set_description(description, refresh=False) self.handler.fire_event(Events.VALIDATE) self.handler.fire_event(Events.EPOCH_END) @@ -323,14 +314,11 @@ def from_params( # type: ignore num_epochs = params.pop_int("num_epochs", 20) cuda_device = parse_cuda_device(params.pop("cuda_device", -1)) - if isinstance(cuda_device, list): - model_device = cuda_device[0] - else: - model_device = cuda_device - if model_device >= 0: + check_for_gpu(cuda_device) + if cuda_device >= 0: # Moving model to GPU here so that the optimizer state gets constructed on # the right device. - model = model.cuda(model_device) + model = model.cuda(cuda_device) parameters = [[n, p] for n, p in model.named_parameters() if p.requires_grad] optimizer = Optimizer.from_params(parameters, params.pop("optimizer")) @@ -355,7 +343,7 @@ def from_params( # type: ignore world_size = params.pop_int("world_size", 1) if distributed: - rank = model_device + rank = cuda_device else: rank = 0 diff --git a/allennlp/training/callbacks/log_to_tensorboard.py b/allennlp/training/callbacks/log_to_tensorboard.py index c263eac0667..752ab496124 100644 --- a/allennlp/training/callbacks/log_to_tensorboard.py +++ b/allennlp/training/callbacks/log_to_tensorboard.py @@ -82,7 +82,7 @@ def batch_end_logging(self, trainer: "CallbackTrainer"): ) if self.log_batch_size_period: - cur_batch = sum([training_util.get_batch_size(batch) for batch in trainer.batch_group]) + cur_batch = training_util.get_batch_size(trainer.batch) self.cumulative_batch_size += cur_batch if (trainer.batches_this_epoch - 1) % self.log_batch_size_period == 0: average = self.cumulative_batch_size / trainer.batches_this_epoch diff --git a/allennlp/training/callbacks/validate.py b/allennlp/training/callbacks/validate.py index 564cffd4bdc..32f22e655f6 100644 --- a/allennlp/training/callbacks/validate.py +++ b/allennlp/training/callbacks/validate.py @@ -1,11 +1,9 @@ from typing import Iterable, List, TYPE_CHECKING import logging -import math import torch from allennlp.common.tqdm import Tqdm -from allennlp.common.util import lazy_groups_of from allennlp.data.instance import Instance from allennlp.data.iterators import DataIterator from allennlp.training import util as training_util @@ -67,20 +65,15 @@ def validate(self, trainer: "CallbackTrainer"): trainer.model.eval() - num_gpus = len(trainer._cuda_devices) - - raw_val_generator = self.iterator(self.instances, num_epochs=1, shuffle=False) - val_generator = lazy_groups_of(raw_val_generator, num_gpus) - num_validation_batches = math.ceil( - self.iterator.get_num_batches(self.instances) / num_gpus - ) + val_generator = self.iterator(self.instances, num_epochs=1, shuffle=False) + num_validation_batches = self.iterator.get_num_batches(self.instances) val_generator_tqdm = Tqdm.tqdm(val_generator, total=num_validation_batches) batches_this_epoch = 0 val_loss = 0 - for batch_group in val_generator_tqdm: + for batch in val_generator_tqdm: - loss = 
trainer.batch_loss(batch_group, for_training=False) + loss = trainer.batch_loss(batch, for_training=False) if loss is not None: # You shouldn't necessarily have to compute a loss for validation, so we allow for # `loss` to be None. We need to be careful, though - `batches_this_epoch` is diff --git a/allennlp/training/trainer.py b/allennlp/training/trainer.py index 900e66b6aa1..40eaac7d604 100644 --- a/allennlp/training/trainer.py +++ b/allennlp/training/trainer.py @@ -1,10 +1,9 @@ import datetime import logging -import math import os import time import traceback -from typing import Dict, Optional, List, Tuple, Union, Iterable, Any +from typing import Dict, Optional, Tuple, Union, Iterable, Any import torch import torch.distributed as dist @@ -12,9 +11,9 @@ from torch.nn.parallel import DistributedDataParallel from allennlp.common import Params -from allennlp.common.checks import ConfigurationError, parse_cuda_device +from allennlp.common.checks import ConfigurationError, parse_cuda_device, check_for_gpu from allennlp.common.tqdm import Tqdm -from allennlp.common.util import dump_metrics, gpu_memory_mb, peak_memory_mb, lazy_groups_of +from allennlp.common.util import dump_metrics, gpu_memory_mb, peak_memory_mb from allennlp.data.instance import Instance from allennlp.data.iterators.data_iterator import DataIterator, TensorDict from allennlp.models.model import Model @@ -51,7 +50,7 @@ def __init__( keep_serialized_model_every_num_seconds: int = None, checkpointer: Checkpointer = None, model_save_interval: float = None, - cuda_device: Union[int, List] = -1, + cuda_device: int = -1, grad_norm: Optional[float] = None, grad_clipping: Optional[float] = None, learning_rate_scheduler: Optional[LearningRateScheduler] = None, @@ -128,8 +127,10 @@ def __init__( If provided, then serialize models every ``model_save_interval`` seconds within single epochs. In all cases, models are also saved at the end of every epoch if ``serialization_dir`` is provided. - cuda_device : ``Union[int, List[int]]``, optional (default = -1) - An integer or list of integers specifying the CUDA device(s) to use. If -1, the CPU is used. + cuda_device : ``int``, optional (default = -1) + An integer specifying the CUDA device(s) to use for this process. If -1, the CPU is used. + Data parallelism is controlled at the allennlp train level, so each trainer will have a single + GPU. grad_norm : ``float``, optional, (default = None). If provided, gradient norms will be rescaled to have a maximum of this value. grad_clipping : ``float``, optional (default = ``None``). @@ -275,25 +276,20 @@ def __init__( # normal case, reference to `Model` is retained. This reference is only used in # these places: `model.__call__`, `model.train` and `model.eval`. if self._distributed: - self._pytorch_model = DistributedDataParallel(self.model, device_ids=self._cuda_devices) + self._pytorch_model = DistributedDataParallel(self.model, device_ids=[self.cuda_device]) else: self._pytorch_model = self.model def rescale_gradients(self) -> Optional[float]: return training_util.rescale_gradients(self.model, self._grad_norm) - def batch_loss(self, batch_group: List[TensorDict], for_training: bool) -> torch.Tensor: + def batch_loss(self, batch: TensorDict, for_training: bool) -> torch.Tensor: """ Does a forward pass on the given batches and returns the ``loss`` value in the result. If ``for_training`` is `True` also applies regularization penalty. 
""" - if self._multiple_gpu: - output_dict = training_util.data_parallel(batch_group, self.model, self._cuda_devices) - else: - assert len(batch_group) == 1 - batch = batch_group[0] - batch = nn_util.move_to_device(batch, self._cuda_devices[0]) - output_dict = self._pytorch_model(**batch) + batch = nn_util.move_to_device(batch, self.cuda_device) + output_dict = self._pytorch_model(**batch) try: loss = output_dict["loss"] @@ -325,12 +321,9 @@ def _train_epoch(self, epoch: int) -> Dict[str, float]: # Set the model to "train" mode. self._pytorch_model.train() - num_gpus = len(self._cuda_devices) - # Get tqdm for the training batches - raw_train_generator = self.iterator(self.train_data, num_epochs=1, shuffle=self.shuffle) - train_generator = lazy_groups_of(raw_train_generator, num_gpus) - num_training_batches = math.ceil(self.iterator.get_num_batches(self.train_data) / num_gpus) + train_generator = self.iterator(self.train_data, num_epochs=1, shuffle=self.shuffle) + num_training_batches = self.iterator.get_num_batches(self.train_data) self._last_log = time.time() last_save_time = time.time() @@ -350,14 +343,14 @@ def _train_epoch(self, epoch: int) -> Dict[str, float]: train_generator_tqdm = train_generator cumulative_batch_size = 0 - for batch_group in train_generator_tqdm: + for batch in train_generator_tqdm: batches_this_epoch += 1 self._batch_num_total += 1 batch_num_total = self._batch_num_total self.optimizer.zero_grad() - loss = self.batch_loss(batch_group, for_training=True) + loss = self.batch_loss(batch, for_training=True) if torch.isnan(loss): raise ValueError("nan loss encountered") @@ -404,7 +397,7 @@ def _train_epoch(self, epoch: int) -> Dict[str, float]: train_loss, batches_this_epoch, world_size=self._world_size, - cuda_device=self._cuda_devices, + cuda_device=[self.cuda_device], ) # Updating tqdm only for the master as the trainers wouldn't have one @@ -424,7 +417,7 @@ def _train_epoch(self, epoch: int) -> Dict[str, float]: self._tensorboard.log_histograms(self.model, histogram_parameters) if self._log_batch_size_period: - cur_batch = sum([training_util.get_batch_size(batch) for batch in batch_group]) + cur_batch = training_util.get_batch_size(batch) cumulative_batch_size += cur_batch if (batches_this_epoch - 1) % self._log_batch_size_period == 0: average = cumulative_batch_size / batches_this_epoch @@ -448,7 +441,7 @@ def _train_epoch(self, epoch: int) -> Dict[str, float]: batches_this_epoch, reset=True, world_size=self._world_size, - cuda_device=self._cuda_devices, + cuda_device=[self.cuda_device], ) metrics["cpu_memory_MB"] = peak_cpu_usage for (gpu_num, memory) in gpu_usage: @@ -472,19 +465,14 @@ def _validation_loss(self) -> Tuple[float, int]: else: val_iterator = self.iterator - num_gpus = len(self._cuda_devices) - - raw_val_generator = val_iterator(self._validation_data, num_epochs=1, shuffle=False) - val_generator = lazy_groups_of(raw_val_generator, num_gpus) - num_validation_batches = math.ceil( - val_iterator.get_num_batches(self._validation_data) / num_gpus - ) + val_generator = val_iterator(self._validation_data, num_epochs=1, shuffle=False) + num_validation_batches = val_iterator.get_num_batches(self._validation_data) val_generator_tqdm = Tqdm.tqdm(val_generator, total=num_validation_batches) batches_this_epoch = 0 val_loss = 0 - for batch_group in val_generator_tqdm: + for batch in val_generator_tqdm: - loss = self.batch_loss(batch_group, for_training=False) + loss = self.batch_loss(batch, for_training=False) if loss is not None: # You shouldn't necessarily have 
to compute a loss for validation, so we allow for # `loss` to be None. We need to be careful, though - `batches_this_epoch` is @@ -500,7 +488,7 @@ def _validation_loss(self) -> Tuple[float, int]: val_loss, batches_this_epoch, world_size=self._world_size, - cuda_device=self._cuda_devices, + cuda_device=[self.cuda_device], ) description = training_util.description_from_metrics(val_metrics) val_generator_tqdm.set_description(description, refresh=False) @@ -573,7 +561,7 @@ def train(self) -> Dict[str, Any]: num_batches, reset=True, world_size=self._world_size, - cuda_device=self._cuda_devices, + cuda_device=[self.cuda_device], ) # Check validation metric for early stopping @@ -775,14 +763,11 @@ def from_params( # type: ignore lr_scheduler_params = params.pop("learning_rate_scheduler", None) momentum_scheduler_params = params.pop("momentum_scheduler", None) - if isinstance(cuda_device, list): - model_device = cuda_device[0] - else: - model_device = cuda_device - if model_device >= 0: + check_for_gpu(cuda_device) + if cuda_device >= 0: # Moving model to GPU here so that the optimizer state gets constructed on # the right device. - model = model.cuda(model_device) + model = model.cuda(cuda_device) parameters = [[n, p] for n, p in model.named_parameters() if p.requires_grad] optimizer = Optimizer.from_params(parameters, params.pop("optimizer")) diff --git a/allennlp/training/trainer_base.py b/allennlp/training/trainer_base.py index 6a3c9b2c467..d3530c936ab 100644 --- a/allennlp/training/trainer_base.py +++ b/allennlp/training/trainer_base.py @@ -9,7 +9,7 @@ import logging -from typing import Dict, List, Union, Any +from typing import Dict, Any from allennlp.common import Params, Registrable from allennlp.common.util import is_master @@ -31,39 +31,32 @@ class TrainerBase(Registrable): def __init__( self, serialization_dir: str, - cuda_device: Union[int, List] = -1, + cuda_device: int = -1, distributed: bool = False, rank: int = 0, world_size: int = 1, ) -> None: - check_for_gpu(cuda_device) + check_for_gpu(cuda_device) self._serialization_dir = serialization_dir - # Configure GPUs: - if not isinstance(cuda_device, int) and not isinstance(cuda_device, list): + if isinstance(cuda_device, list): raise ConfigurationError( - "Expected an int or list for cuda_device, got {}".format(cuda_device) + "In allennlp 1.0, the Trainer can only be assigned a single `cuda_device`. " + "Instead, we use torch's DistributedDataParallel at the command level, meaning " + "our Trainer always uses a single GPU per process." ) + if not isinstance(cuda_device, int): + raise ConfigurationError("Expected an int for cuda_device, got {}".format(cuda_device)) + if distributed and world_size <= 1: raise ConfigurationError( "Distributed training can be performed only with more than 1 GPU device. Check " "`cuda_device` key in the experiment configuration." ) - if isinstance(cuda_device, list): - # For distributed training, every trainer worker is only assigned with a single GPU - if distributed: - raise ConfigurationError( - "Distributed worker can only be assigned a single `cuda_device`." 
- ) - - self._multiple_gpu = True - self._cuda_devices = cuda_device - else: - self._multiple_gpu = False - self._cuda_devices = [cuda_device] + self.cuda_device = cuda_device self._distributed = distributed self._rank = rank @@ -71,8 +64,8 @@ def __init__( self._world_size = world_size def _move_to_gpu(self, model: Model) -> Model: - if self._cuda_devices[0] != -1: - return model.cuda(self._cuda_devices[0]) + if self.cuda_device != -1: + return model.cuda(self.cuda_device) else: return model diff --git a/allennlp/training/util.py b/allennlp/training/util.py index 4ebc52a23fa..65283b87ae3 100644 --- a/allennlp/training/util.py +++ b/allennlp/training/util.py @@ -11,8 +11,6 @@ import shutil import torch -from torch.nn.parallel import replicate, parallel_apply -from torch.nn.parallel.scatter_gather import gather from allennlp.common.checks import ConfigurationError, check_for_gpu from allennlp.common.params import Params @@ -20,7 +18,6 @@ from allennlp.data.dataset_readers import DatasetReader from allennlp.data import Instance from allennlp.data.iterators import DataIterator -from allennlp.data.iterators.data_iterator import TensorDict from allennlp.models.model import Model from allennlp.models.archival import CONFIG_NAME from allennlp.nn import util as nn_util @@ -328,36 +325,6 @@ def create_serialization_dir( os.makedirs(serialization_dir, exist_ok=True) -def data_parallel( - batch_group: List[TensorDict], model: Model, cuda_devices: List -) -> Dict[str, torch.Tensor]: - """ - Performs a forward pass using multiple GPUs. This is a simplification - of torch.nn.parallel.data_parallel to support the allennlp model - interface. - """ - assert len(batch_group) <= len(cuda_devices) - - moved = [ - nn_util.move_to_device(batch, device) for batch, device in zip(batch_group, cuda_devices) - ] - - used_device_ids = cuda_devices[: len(moved)] - # Counterintuitively, it appears replicate expects the source device id to be the first element - # in the device id list. See torch.cuda.comm.broadcast_coalesced, which is called indirectly. - replicas = replicate(model, used_device_ids) - - # We pass all our arguments as kwargs. Create a list of empty tuples of the - # correct shape to serve as (non-existent) positional arguments. - inputs = [()] * len(batch_group) - outputs = parallel_apply(replicas, inputs, moved, used_device_ids) - - # Only the 'loss' is needed. - # a (num_gpu, ) tensor with loss on each GPU - losses = gather([output["loss"].unsqueeze(0) for output in outputs], used_device_ids[0], 0) - return {"loss": losses.mean()} - - def enable_gradient_clipping(model: Model, grad_clipping: Optional[float]) -> None: if grad_clipping is not None: for parameter in model.parameters():
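
For orientation, the pattern this patch standardizes on (one training process per GPU, each Trainer bound to a single integer cuda_device, with the workers spawned by torch.multiprocessing at the `allennlp train` level) can be sketched in plain PyTorch as follows. This is a minimal sketch, not part of the patch: helper names such as `run_worker` and `build_model` are hypothetical, the "nccl" backend choice is illustrative, and the real worker logic lives in `_train_worker` in `allennlp/commands/train.py` above. The matching config migration, moving the device list from the `"trainer"` section to a top-level `"distributed"` block, is spelled out in the new error message added to `allennlp/common/checks.py`.

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel


def build_model() -> torch.nn.Module:
    # Stand-in for the model each worker would construct from its own params.
    return torch.nn.Linear(10, 2)


def run_worker(process_rank: int, device_ids: list, world_size: int) -> None:
    # Each worker sees exactly one GPU, mirroring the new single-int cuda_device.
    gpu_id = device_ids[process_rank]
    torch.cuda.set_device(gpu_id)
    # init_process_group reads MASTER_ADDR / MASTER_PORT from the environment
    # (single-node case; "nccl" is the usual backend for GPU training).
    dist.init_process_group(backend="nccl", world_size=world_size, rank=process_rank)

    model = build_model().cuda(gpu_id)
    ddp_model = DistributedDataParallel(model, device_ids=[gpu_id])

    # One dummy forward/backward step to show the DDP wrapper in use; a real
    # worker would instead loop over its own shard of the training data.
    loss = ddp_model(torch.randn(8, 10, device=gpu_id)).sum()
    loss.backward()

    dist.destroy_process_group()


if __name__ == "__main__":
    device_ids = [0, 1]  # analogous to {"distributed": {"cuda_devices": [0, 1]}}
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    mp.spawn(run_worker, args=(device_ids, len(device_ids)), nprocs=len(device_ids))

Because device placement and gradient synchronization are handled per process, each Trainer in the patch above only ever moves batches to its own `self.cuda_device`, which is why `data_parallel`, `lazy_groups_of` batch grouping, and the list-valued `cuda_device` paths could all be deleted.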