In [134]:
import os
from os.path import exists
import torch
from torch.nn.functional import log_softmax, pad
import time
from torch.optim.lr_scheduler import LambdaLR
from torchtext.data.functional import to_map_style_dataset
from torch.utils.data import DataLoader
from torchtext.vocab import build_vocab_from_iterator
import torchtext.datasets as datasets
import spacy
import GPUtil
from torch.utils.data.distributed import DistributedSampler
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

In [135]:
from make_model import make_model
from Processing.batch import Batch
from Processing.labelsmoothing import LabelSmoothing
from config import Config

In [136]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

In [137]:
class TrainState:
    """Track number of steps, examples, and tokens processed"""

    step: int = 0  # Steps in the current epoch
    accum_step: int = 0  # Number of gradient accumulation steps
    samples: int = 0  # total # of examples used
    tokens: int = 0  # total # of tokens processed

In [138]:
def run_epoch(
    data_iter,
    model,
    loss_compute,
    optimizer,
    scheduler,
    mode="train",
    accum_iter=1,
    train_state=TrainState(),
):
    """Train a single epoch"""
    start = time.time()
    total_tokens = 0
    total_loss = 0
    tokens = 0
    n_accum = 0
    for i, batch in enumerate(data_iter):
        out = model(batch.src, batch.tgt, batch.src_mask, batch.tgt_mask)
        loss, loss_node = loss_compute(out, batch.tgt_y, batch.ntokens)
        # loss_node = loss_node / accum_iter
        
        if mode == "train" or mode == "train+log":
            loss_node.backward()
            train_state.step += 1
            train_state.samples += batch.src.shape[0]
            train_state.tokens += batch.ntokens
            if i % accum_iter == 0:
                optimizer.step()
                optimizer.zero_grad(set_to_none=True)
                n_accum += 1
                train_state.accum_step += 1
            scheduler.step()

        total_loss += loss
        total_tokens += batch.ntokens
        tokens += batch.ntokens
        
        if i % 40 == 1 and (mode == "train" or mode == "train+log"):
            lr = optimizer.param_groups[0]["lr"]
            elapsed = time.time() - start
            print(("Epoch Step: %6d | Accumulation Step: %3d | Loss: %6.2f | Tokens / Sec: %7.1f | Learning Rate: %6.1e")
                % (i, n_accum, loss / batch.ntokens, tokens / elapsed, lr))
            start = time.time()
            tokens = 0
        del loss
        del loss_node
    return (total_loss / total_tokens), train_state

In [139]:
def rate(step, model_size, factor, warmup):
    """
    we have to default the step to 1 for LambdaLR function
    to avoid zero raising to negative power.
    """
    if step == 0:
        step = 1
    return factor * (model_size ** (-0.5) * min(step ** (-0.5), step * warmup ** (-1.5)))

In [140]:
class SimpleLossCompute:
    "A simple loss compute and train function."

    def __init__(self, projection, criterion):
        self.proj = projection
        self.criterion = criterion

    def __call__(self, x, y, norm):
        x = log_softmax(self.proj(x), dim=-1)
        sloss = (self.criterion(x.contiguous().view(-1, x.size(-1)), y.contiguous().view(-1)) / norm)
        return sloss.data * norm, sloss

In [141]:
class DummyOptimizer(torch.optim.Optimizer):
    def __init__(self):
        self.param_groups = [{"lr": 0}]
        None

    def step(self):
        None

    def zero_grad(self, set_to_none=False):
        None
        
class DummyScheduler:
    def step(self):
        None

In [142]:
def load_tokenizers():
    try:
        spacy_de = spacy.load("de_core_news_sm")
    except IOError:
        os.system("python -m spacy download de_core_news_sm")
        spacy_de = spacy.load("de_core_news_sm")

    try:
        spacy_en = spacy.load("en_core_web_sm")
    except IOError:
        os.system("python -m spacy download en_core_web_sm")
        spacy_en = spacy.load("en_core_web_sm")

    return spacy_de, spacy_en

In [143]:
spacy_de, spacy_en = load_tokenizers()

In [144]:
def tokenize(text, tokenizer):
    return [tok.text for tok in tokenizer.tokenizer(text)]


def yield_tokens(data_iter, tokenizer, index):
    for from_to_tuple in data_iter:
        yield tokenizer(from_to_tuple[index])

In [145]:
def tokenize_de(text):
    return tokenize(text, spacy_de)

def tokenize_en(text):
    return tokenize(text, spacy_en)

In [146]:
def build_vocabulary():
    print("Building German Vocabulary ...")
    train, val, test = datasets.Multi30k(language_pair=("de", "en"))
    vocab_src = build_vocab_from_iterator(
        yield_tokens(train + val, tokenize_de, index=0),
        min_freq=2,
        specials=["<s>", "</s>", "<blank>", "<unk>"],
    )

    print("Building English Vocabulary ...")
    train, val, test = datasets.Multi30k(language_pair=("de", "en"))
    vocab_tgt = build_vocab_from_iterator(
        yield_tokens(train + val, tokenize_en, index=1),
        min_freq=2,
        specials=["<s>", "</s>", "<blank>", "<unk>"],
    )

    vocab_src.set_default_index(vocab_src["<unk>"])
    vocab_tgt.set_default_index(vocab_tgt["<unk>"])
    
    del train
    del val
    del test
    
    return vocab_src, vocab_tgt

def load_vocab():
    if not exists("vocab.pt"):
        vocab_src, vocab_tgt = build_vocabulary()
        torch.save((vocab_src, vocab_tgt), "vocab.pt")
    else:
        vocab_src, vocab_tgt = torch.load("vocab.pt")
    print("Finished.\nVocabulary sizes:")
    print(len(vocab_src))
    print(len(vocab_tgt))
    return vocab_src, vocab_tgt


In [147]:
vocab_src, vocab_tgt = load_vocab()

Finished.
Vocabulary sizes:
8185
6291


In [148]:
def collate_batch(
    batch,
    src_pipeline,
    tgt_pipeline,
    src_vocab,
    tgt_vocab,
    device='cuda',
    max_padding=128,
    pad_id=2,
):
    bs_id = torch.tensor([0], device=device)  # <s> token id
    eos_id = torch.tensor([1], device=device)  # </s> token id
    src_list, tgt_list = [], []
    for (_src, _tgt) in batch:
        processed_src = torch.cat(
            [
                bs_id,
                torch.tensor(
                    src_vocab(src_pipeline(_src)),
                    dtype=torch.int64,
                    device=device,
                ),
                eos_id,
            ],
            0,
        )
        processed_tgt = torch.cat(
            [
                bs_id,
                torch.tensor(
                    tgt_vocab(tgt_pipeline(_tgt)),
                    dtype=torch.int64,
                    device=device,
                ),
                eos_id,
            ],
            0,
        )
        src_list.append(
            # warning - overwrites values for negative values of padding - len
            pad(
                processed_src,
                (
                    0,
                    max_padding - len(processed_src),
                ),
                value=pad_id,
            )
        )
        tgt_list.append(
            pad(
                processed_tgt,
                (0, max_padding - len(processed_tgt)),
                value=pad_id,
            )
        )

    src = torch.stack(src_list,)
    tgt = torch.stack(tgt_list)
    return src, tgt

In [149]:
def create_dataloaders(
    vocab_src,
    vocab_tgt,
    batch_size=12000,
    max_padding=128,
    is_distributed=True,
    device='cuda'
):
    # def create_dataloaders(batch_size=12000):
    def collate_fn(batch):
        return collate_batch(
            batch,
            tokenize_de,
            tokenize_en,
            vocab_src,
            vocab_tgt,
            device,
            max_padding=max_padding,
            pad_id=vocab_src.get_stoi()["<blank>"],
        )

    train_iter, valid_iter, _ = datasets.Multi30k(language_pair=("de", "en"))

    train_iter_map = to_map_style_dataset(train_iter)  # DistributedSampler needs a dataset len()
    train_sampler = (DistributedSampler(train_iter_map) if is_distributed else None)
    valid_iter_map = to_map_style_dataset(valid_iter)
    valid_sampler = (DistributedSampler(valid_iter_map) if is_distributed else None)

    train_dataloader = DataLoader(
        train_iter_map,
        batch_size=batch_size,
        shuffle=(train_sampler is None),
        sampler=train_sampler,
        collate_fn=collate_fn,
        generator=torch.Generator('cpu')
    )
    valid_dataloader = DataLoader(
        valid_iter_map,
        batch_size=batch_size,
        shuffle=(valid_sampler is None),
        sampler=valid_sampler,
        collate_fn=collate_fn,
        generator=torch.Generator('cpu')
    )
    return train_dataloader, valid_dataloader

In [150]:
def train_worker(
    gpu,
    ngpus_per_node,
    vocab_src,
    vocab_tgt,
    config,
    is_distributed=False,
):
    print(f"Train worker process using cuda:{gpu} for training", flush=True)
    pad_idx = vocab_tgt["<blank>"]
    d_model = 512
    model = make_model(config)
    is_main_process = True
    
    # if is_distributed:
    #     dist.init_process_group("nccl", init_method="env://", rank=gpu, world_size=ngpus_per_node)
    #     model = DDP(model, device_ids=[gpu])
    #     module = model.module
    #     is_main_process = gpu == 0

    criterion = LabelSmoothing(size=len(vocab_tgt), padding_idx=pad_idx, smoothing=0.1)
    criterion.to(device)

    train_dataloader, valid_dataloader = create_dataloaders(
        vocab_src,
        vocab_tgt,
        batch_size=config.batch_size // ngpus_per_node,
        max_padding=config.max_padding,
        is_distributed=is_distributed,
        device='cuda'
    )

    optimizer = torch.optim.Adam(model.parameters(), lr=config.base_lr, betas=(0.9, 0.98), eps=1e-9)
    lr_scheduler = LambdaLR(
        optimizer=optimizer,
        lr_lambda=lambda step: rate(
            step, d_model, factor=1, warmup=config.warmup
        ),
    )
    train_state = TrainState()

    def gen_batch(dataloader):
        for b0, b1 in dataloader:
            yield Batch(b0.to(device), b1.to(device), pad_idx)

    for epoch in range(config.num_epochs):
        # if is_distributed:
        #     train_dataloader.sampler.set_epoch(epoch)
        #     valid_dataloader.sampler.set_epoch(epoch)

        model.train()
        print(f"Epoch {epoch} Training ====", flush=True)
        run_epoch(
            gen_batch(train_dataloader),
            model,
            SimpleLossCompute(model.proj, criterion),
            optimizer,
            lr_scheduler,
            mode="train+log",
            accum_iter=config.accum_iter,
            train_state=train_state,
        )

        GPUtil.showUtilization()
        if is_main_process:
            file_path = "%s%.2d.pt" % (config.file_prefix, epoch)
            torch.save(model.state_dict(), file_path)
        torch.cuda.empty_cache()

        print(f"Epoch {epoch} Validation ====", flush=True)
        model.eval()
        sloss = run_epoch(
            gen_batch(valid_dataloader),
            model,
            SimpleLossCompute(model.proj, criterion),
            DummyOptimizer(),
            DummyScheduler(),
            mode="eval",
        )
        print(sloss)
        torch.cuda.empty_cache()

    if is_main_process:
        file_path = "%sfinal.pt" % config.file_prefix
        torch.save(model.state_dict(), file_path)

# def train_distributed_model(vocab_src, vocab_tgt, config):
#     ngpus = torch.cuda.device_count()
#     os.environ["MASTER_ADDR"] = "localhost"
#     os.environ["MASTER_PORT"] = "12356"
#     print(f"Number of GPUs detected: {ngpus}")
#     print("Spawning training processes ...")
#     mp.spawn(train_worker, nprocs=ngpus, args=(ngpus, vocab_src, vocab_tgt, config, True))

def train_model(vocab_src, vocab_tgt, config):
    # if config.distributed:
    #     train_distributed_model(vocab_src, vocab_tgt, config)
    # else:
        train_worker(0, 1, vocab_src, vocab_tgt, config, False)

In [151]:
def load_trained_model():
    config = Config()
    config.batch_size = 32
    config.src_vocab = len(vocab_src)
    config.tgt_vocab = len(vocab_tgt)
    config.N = 6
    model_path = "multi30k_model_final.pt"
    if not exists(model_path):
        train_model(vocab_src, vocab_tgt, config)
    model = make_model(config)
    model.load_state_dict(torch.load("multi30k_model_final.pt"))
    return model

In [152]:
model = load_trained_model()

Train worker process using cuda:0 for training
Epoch 0 Training ====


RuntimeError: Expected a 'cuda' device type for generator but found 'cpu'