Custom Transformer Training
-------------------------------

In this notebook we train the custom transformer on multiple GPUs, if they are available, within a single machine. In the [multi-instance notebook](_custom_transformer_train_multiple.ipynb), we use SageMaker to distribute the training of the model over multiple instances. 

We will pursue the following steps:

- Load the libraries
- Get the tokenizer (arguments: The version (v3, v5, v6, v7, ~~v8~~))
- Create a function to retrieve the datasets (arguments: char_p, word_p, max_len, end_mark, corpus_1, corpus_2, data_directory)
- Training (The model is automatically saved)(arguments: config dictionary initialized before)
- Predictions

➡️ Import the libraries.

In [13]:
from libraries import *

# Fix the global random seed to 0 so that repeated runs of the notebook are comparable
lt.seed_everything(0)

Global seed set to 0


0

➡️ Load the tokenizer

In [6]:
def get_tokenizer(version):
    """Build a `T5TokenizerFast` from the tokenizer model file of the given version tag (e.g. 'v5')."""
    model_file = f'wolof-translate/wolof_translate/tokenizers/t5_tokenizers/tokenizer_{version}.model'
    return T5TokenizerFast(model_file)


version = 'v5' # indicate here the version
tokenizer = get_tokenizer(version)

➡️ Function to recuperate datasets

In [9]:
%%writefile wolof-translate/wolof_translate/utils/recuperate_datasets.py
from wolof_translate.utils.improvements.end_marks import add_end_mark
from wolof_translate.utils.sent_corrections import *

def recuperate_datasets(char_p: float, word_p: float, max_len: int, end_mark: int,
                        corpus_1: str = 'french', corpus_2: str = 'wolof', 
                        data_directory: str = 'data/extractions/new_data/',
                        tokenizer = None):
  """Build the training and validation `SentenceDataset`s.

  Args:
    char_p: Passed to `nac.KeyboardAug` as `aug_char_p`.
    word_p: Passed to `nac.KeyboardAug` as `aug_word_p`.
    max_len: Passed to `nac.KeyboardAug` as `aug_word_max`.
    end_mark: End-mark option. 1 adds no end-mark step; 2, 3 and 4 append
      `add_end_mark` with different keyword arguments (see the table below).
    corpus_1: Name of the first corpus, forwarded to `SentenceDataset`.
    corpus_2: Name of the second corpus, forwarded to `SentenceDataset`.
    data_directory: Prefix prepended as-is to 'train_set.csv' and 'valid_set.csv'
      (so it must end with a path separator).
    tokenizer: Tokenizer given to both datasets. When None, the module-level
      name `tokenizer` is used, which is what the function relied on before
      this parameter existed.

  Returns:
    A `(train_dataset, valid_dataset)` tuple. The keyboard augmentation is only
    applied to the first corpus of the training set.

  Raises:
    ValueError: If `end_mark` is not one of 1, 2, 3 or 4.
    NameError: If `tokenizer` is None and no module-level `tokenizer` exists.
  """

  # Keyword arguments forwarded to `add_end_mark` for each option;
  # None means that no end-mark step is appended to the pipeline.
  end_mark_options = {
      1: None,
      2: {'end_mark_to_remove': '!', 'replace': True},
      3: {},
      4: {'end_mark_to_remove': '!'},
  }

  # Validate first so that a bad option fails before anything is built
  if end_mark not in end_mark_options:
    raise ValueError(f'No end mark number {end_mark}')

  # Fall back on the module-level tokenizer for callers that do not pass one
  if tokenizer is None:
    try:
      tokenizer = globals()['tokenizer']
    except KeyError:
      raise NameError("name 'tokenizer' is not defined; pass `tokenizer=` explicitly") from None

  # Text corrections shared by every pipeline
  correction_steps = [remove_mark_space, delete_guillemet_space, add_mark_space]

  options = end_mark_options[end_mark]
  if options is not None:
    correction_steps.append(partial(add_end_mark, **options))

  # Pipeline with keyboard-typo augmentation (first corpus of the train set only)
  fr_augmentation_1 = TransformerSequences(nac.KeyboardAug(aug_char_p=char_p, aug_word_p=word_p,
                                                           aug_word_max = max_len),
                                           *correction_steps)

  # Pipeline with the corrections only
  fr_augmentation_2 = TransformerSequences(*correction_steps)

  # Recuperate the train dataset
  train_dataset_aug = SentenceDataset(f"{data_directory}train_set.csv",
                                        tokenizer,
                                        truncation = False,
                                        cp1_transformer = fr_augmentation_1,
                                        cp2_transformer = fr_augmentation_2,
                                        corpus_1=corpus_1,
                                        corpus_2=corpus_2
                                        )

  # Recuperate the valid dataset (no keyboard augmentation on either corpus)
  valid_dataset = SentenceDataset(f"{data_directory}valid_set.csv",
                                        tokenizer,
                                        cp1_transformer = fr_augmentation_2,
                                        cp2_transformer = fr_augmentation_2,
                                        corpus_1=corpus_1,
                                        corpus_2=corpus_2,
                                        truncation = False)

  # Return the datasets
  return train_dataset_aug, valid_dataset

Overwriting wolof-translate/wolof_translate/utils/recuperate_datasets.py


In [10]:
# Execute the file written by the previous cell so that `recuperate_datasets` is defined in this session
%run wolof-translate/wolof_translate/utils/recuperate_datasets.py

➡️ Training

In [22]:
# initialize the configurations read by `train` and by the prediction cell
config = {
    # --- training loop: `train` runs `epochs - trainer.current_epoch` epochs
    'epochs': 100,
    'log_step': 5,
    # --- corpora names forwarded to the datasets
    'corpus_1': 'french',
    'corpus_2': 'wolof',
    # --- model: values given to the encoder/decoder layers and stacks
    'drop_out_rate': None,
    'd_model': 512,
    'n_head': 8,
    'dim_ff': None,
    'n_encoders': 6,
    'n_decoders': 6,
    # --- optimizer: passed as `lr` and `weight_decay`
    'learning_rate': None,
    'weight_decay': 0.0,
    # --- data preparation: keyboard augmentation probabilities and end-mark option
    'char_p': None,
    'word_p': None,
    'end_mark': 3,
    'label_smoothing': 0.1,
    'max_len': 20,
    'random_state': 0,
    # --- batching: given to `SequenceLengthBatchSampler`
    'boundaries': [2, 31, 59, 87, 115, 143, 171],
    'batch_sizes': [256, 128, 64, 32, 16, 8, 4, 2],
    'batch_size': None, # only referenced by the commented-out `BucketSampler` alternative
    # --- optimizer flags: passed as `warmup_init` and `relative_step`
    'warmup_init': True,
    'relative_step': True,
    # --- data loaders
    'num_workers': 4,
    'pin_memory': True,
    # --------------------> Must be changed when continuing a training
    'model_dir': None,
    'new_model_dir': None,
    'continue': False, # --------------------------> Must be changed when continuing training
    'logging_dir': None,
    'save_best': True,
    'tokenizer': tokenizer,
    'data_directory': 'data/extractions/new_data/',
    'data_file': 'corpora_v6.csv',
    'version': 6,
    # in the case of a distributed training
    'hosts': None,
    'current_host': None,
    # `train` reads this key to initialize the process group; it was missing from the dictionary
    'backend': None,
    'num_gpus': None,
    'max_epoch': None,
    'logger': None,
    # `train` returns the trainer only when this flag is True
    'return_trainer': False
}

In [25]:
# %%writefile wolof-translate/wolof_translate/utils/training.py
from libraries import *

def train(config: dict):
    """Train the custom transformer described by `config`.

    The function splits the data, builds the datasets, the model, the optimizer
    arguments and the bucketed data loaders, loads a previous checkpoint through
    `trainer.load` and then trains for `config['epochs'] - trainer.current_epoch`
    epochs.

    Args:
        config: Dictionary of settings. Keys read here:
            'logger', 'hosts', 'current_host', 'backend', 'num_gpus' (distribution;
            'hosts', 'backend' and 'num_gpus' may be missing or None),
            'random_state', 'data_directory', 'data_file' (data split),
            'char_p', 'word_p', 'max_len', 'end_mark', 'corpus_1', 'corpus_2' (datasets),
            'tokenizer', 'd_model', 'n_head', 'dim_ff', 'drop_out_rate',
            'n_encoders', 'n_decoders', 'label_smoothing' (model),
            'learning_rate', 'weight_decay', 'warmup_init', 'relative_step' (optimizer),
            'boundaries', 'batch_sizes', 'num_workers', 'pin_memory' (loaders),
            'logging_dir', 'model_dir', 'continue', 'epochs', 'log_step',
            'new_model_dir', 'save_best', 'return_trainer' (training loop).

    Returns:
        The trainer when `config['return_trainer']` is truthy, otherwise None.

    Note:
        When 'num_gpus' is greater than zero, `config` is updated in place with
        `num_workers = 1` and `pin_memory = True`.
    """

    # ---------------------------------------
    # add distribution if necessary (https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-python-sdk/pytorch_mnist/mnist.py)

    logger = config['logger']

    # The distribution settings are optional: a missing key or a None value
    # means "single host, GPU count not provided" instead of raising
    hosts = config.get('hosts') or []

    backend = config.get('backend')

    num_gpus = config.get('num_gpus') or 0

    is_distributed = len(hosts) > 1 and backend is not None

    use_cuda = num_gpus > 0

    # same loader settings as the SageMaker example when GPUs are reported
    config.update({"num_workers": 1, "pin_memory": True} if use_cuda else {})

    if logger is not None:

        logger.debug("Distributed training - {}".format(is_distributed))

        logger.debug("Number of gpus available - {}".format(num_gpus))

    if is_distributed:
        # Initialize the distributed environment.
        world_size = len(hosts)

        os.environ["WORLD_SIZE"] = str(world_size)

        # the rank of this process is the position of its host in the host list
        host_rank = hosts.index(config['current_host'])

        os.environ["RANK"] = str(host_rank)

        dist.init_process_group(backend=backend, rank=host_rank, world_size=world_size)

        if logger is not None:
            logger.info(
                "Initialized the distributed environment: '{}' backend on {} nodes. ".format(
                    backend, dist.get_world_size()
                )
                + "Current host rank is {}. Number of gpus: {}".format(dist.get_rank(), num_gpus)
            )
    # ---------------------------------------

    # split the data
    split_data(config['random_state'], config['data_directory'], config['data_file'])

    # recuperate train and test set; the corpora and the data directory come
    # from the configuration so that they cannot drift from the other steps
    train_dataset, test_dataset = recuperate_datasets(config['char_p'],
                                                        config['word_p'], config['max_len'],
                                                        config['end_mark'],
                                                        corpus_1 = config['corpus_1'],
                                                        corpus_2 = config['corpus_2'],
                                                        data_directory = config['data_directory'])
    # initialize the evaluation object
    evaluation = TranslationEvaluation(config['tokenizer'], train_dataset.decode)

    # let us initialize the trainer
    trainer = ModelRunner(model = Transformer, seed = 0, evaluation = evaluation, optimizer = Adafactor)

    # initialize the encoder and the decoder layers; an unset (None) feed-forward
    # size or dropout rate falls back on the PyTorch default of the layer
    layer_kwargs = {'batch_first': True}

    if config['dim_ff'] is not None:
        layer_kwargs['dim_feedforward'] = config['dim_ff']

    if config['drop_out_rate'] is not None:
        layer_kwargs['dropout'] = config['drop_out_rate']

    encoder_layer = nn.TransformerEncoderLayer(config['d_model'],
                                                config['n_head'],
                                                **layer_kwargs)

    decoder_layer = nn.TransformerDecoderLayer(config['d_model'],
                                                config['n_head'],
                                                **layer_kwargs)

    # let us initialize the encoder and the decoder
    encoder = nn.TransformerEncoder(encoder_layer, config['n_encoders'])

    decoder = nn.TransformerDecoder(decoder_layer, config['n_decoders'])

    #-------------------------------------
    # in the case when the linear learning rate scheduler with warmup is used

    # let us calculate the appropriate warmup steps (let us take a max epoch of 100)
    # length = len(train_dataset)

    # n_steps = length // config['batch_size']

    # num_steps = config['max_epoch'] * n_steps

    # warmup_steps = (config['max_epoch'] * n_steps) * config['warmup_ratio']

    # Initialize the scheduler parameters
    # scheduler_args = {'num_warmup_steps': warmup_steps, 'num_training_steps': num_steps}
    #-------------------------------------

    # Initialize the transformer parameters; the vocabulary size comes from the
    # tokenizer of the configuration, not from a notebook-level variable
    model_args = {
        'vocab_size': len(config['tokenizer']),
        'encoder': encoder,
        'decoder': decoder,
        'class_criterion': nn.CrossEntropyLoss(label_smoothing = config['label_smoothing']),
        'max_len': config['max_len']
    }

    # Initialize the optimizer parameters
    optimizer_args = {
        'lr': config['learning_rate'],
        'weight_decay': config['weight_decay'],
        # 'betas': (0.9, 0.98),
        'warmup_init': config['warmup_init'],
        'relative_step': config['relative_step']
    }

    # ----------------------------
    # initialize the bucket samplers for distributed environment
    boundaries = config['boundaries']
    batch_sizes = config['batch_sizes']

    train_sampler = SequenceLengthBatchSampler(train_dataset,
                                                boundaries = boundaries,
                                                batch_sizes = batch_sizes)

    test_sampler = SequenceLengthBatchSampler(test_dataset,
                                                boundaries = boundaries,
                                                batch_sizes = batch_sizes)

    # ------------------------------
    # initialize a bucket sampler with fixed batch size in the case of single machine
    # with parallelization on multiple gpus
    # train_sampler = BucketSampler(train_dataset, config['batch_size'])

    # test_sampler = BucketSampler(test_dataset, config['batch_size'])

    # ------------------------------

    # Initialize the loaders parameters
    train_loader_args = {'batch_sampler': train_sampler, 'collate_fn': collate_fn,
                        'num_workers': config['num_workers'], 'pin_memory': config['pin_memory']}

    test_loader_args = {'batch_sampler': test_sampler, 'collate_fn': collate_fn,
                        'num_workers': config['num_workers'], 'pin_memory': config['pin_memory']}

    # Add the datasets and hyperparameters to trainer
    trainer.compile(train_dataset, test_dataset, config['tokenizer'], train_loader_args,
                    test_loader_args, optimizer_kwargs = optimizer_args,
                    model_kwargs = model_args,
                    # lr_scheduler=get_linear_schedule_with_warmup,
                    # lr_scheduler_kwargs=scheduler_args,
                    predict_with_generate = True,
                    is_distributed=is_distributed,
                    logging_dir=config['logging_dir'],
                    dist=dist
                    )

    # load the model (the best checkpoint unless a training is being continued)
    trainer.load(config['model_dir'], load_best = not config['continue'])

    # Train the model for the epochs that remain to reach `config['epochs']`
    trainer.train(config['epochs'] - trainer.current_epoch, auto_save = True, log_step = config['log_step'], saving_directory=config['new_model_dir'], save_best = config['save_best'])

    return trainer if config['return_trainer'] else None


Overwriting wolof-translate/wolof_translate/utils/training.py


Below, we train the model and, if desired, save it.

In [None]:
from wolof_translate.utils.training import train

In [None]:
# `train`, as defined in this notebook, returns the trainer only when
# config['return_trainer'] is True; otherwise `trainer` is None here.
trainer = train(config)

# save if necessary

➡️ Predictions


In [None]:
# Evaluate on the test set only when `train` returned a trainer
if trainer is not None:

    # recuperate the test dataset
    # initialize the transformation sequence with the same end-mark option as
    # the one used by `recuperate_datasets` for the training and validation sets
    # (keyword arguments of `add_end_mark` per option; None = no end-mark step)
    end_mark_options = {
        1: None,
        2: {'end_mark_to_remove': '!', 'replace': True},
        3: {},
        4: {'end_mark_to_remove': '!'},
    }

    if config['end_mark'] not in end_mark_options:

        raise ValueError(f"No end mark number {config['end_mark']}")

    transformations = [remove_mark_space, delete_guillemet_space, add_mark_space]

    end_mark_fn = None

    if end_mark_options[config['end_mark']] is not None:

        end_mark_fn = partial(add_end_mark, **end_mark_options[config['end_mark']])

        transformations.append(end_mark_fn)

    augmentation = TransformerSequences(*transformations)


    # let us get the test set
    test_dataset = SentenceDataset(f"{config['data_directory']}test_set.csv",
                                            tokenizer = tokenizer,
                                            cp1_transformer = augmentation,
                                            cp2_transformer = augmentation,
                                            corpus_1=config['corpus_1'],
                                            corpus_2=config['corpus_2'],
                                            truncation = True)

    # initialize the bucket samplers for distributed environment
    boundaries = config['boundaries']
    batch_sizes = config['batch_sizes']

    test_sampler = SequenceLengthBatchSampler(test_dataset,
                                                boundaries = boundaries,
                                                batch_sizes = batch_sizes)

    test_loader_args = {'batch_sampler': test_sampler, 'collate_fn': collate_fn,
                            'num_workers': config['num_workers'], 'pin_memory': config['pin_memory']}

    # compute the metrics and the predictions on the test set
    metrics, prediction = trainer.evaluate(test_dataset, test_loader_args)
