In [10]:
from jflegDataset import JflegDataset
import pandas as pd
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from transformers import BertTokenizer, GPT2Tokenizer
import torch
from torch import nn

In [11]:
custom_tokens = ["[CLS]", "[SEP]", "[PAD]"]
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
tokenizer.add_tokens(custom_tokens)
TRAIN_PATH = "src/dataset/train.csv"
VAL_PATH = "src/dataset/eval.csv"


In [12]:
tokenizer.padding_side = "left"
tokenizer.pad_token = "[PAD]"
tokenizer.bos_token = "[CLS]"
tokenizer.eos_token = "[SEP]"


sentences = ["It will rain in the",
            "I want to eat a big bowl of",
            "My dog is"]
a = tokenizer(sentences, return_tensors="pt", padding=True)

target = ["It will",
            "I want to",
            "My"]

b = tokenizer(target, return_tensors="pt", padding=True)

b

{'input_ids': tensor([[50259,  1026,   481],
        [   40,   765,   284],
        [50259, 50259,  3666]]), 'attention_mask': tensor([[0, 1, 1],
        [1, 1, 1],
        [0, 0, 1]])}

In [13]:
def custom_collate_fn(batch):
    inputs, targets = zip(*batch)

    inputs = {key: torch.cat([item[key] for item in inputs], dim=0)
              for key in inputs[0].keys()}
    targets = [{key: torch.cat([item[key] for item in targets_batch], dim=0)
                for key in targets_batch[0].keys()} for targets_batch in zip(*targets)]

    return inputs, targets

ds_train = JflegDataset(TRAIN_PATH, tokenizer)
ds_eval = JflegDataset(VAL_PATH, tokenizer)

dl_train = DataLoader(ds_train, batch_size=2)
dl_eval = DataLoader(ds_eval, batch_size=2)

tokenizer

GPT2Tokenizer(name_or_path='gpt2', vocab_size=50257, model_max_length=1024, is_fast=False, padding_side='left', truncation_side='right', special_tokens={'bos_token': '[CLS]', 'eos_token': '[SEP]', 'unk_token': '<|endoftext|>', 'pad_token': '[PAD]'}, clean_up_tokenization_spaces=True),  added_tokens_decoder={
	50256: AddedToken("<|endoftext|>", rstrip=False, lstrip=False, single_word=False, normalized=True, special=True),
	50257: AddedToken("[CLS]", rstrip=False, lstrip=False, single_word=False, normalized=True, special=False),
	50258: AddedToken("[SEP]", rstrip=False, lstrip=False, single_word=False, normalized=True, special=False),
	50259: AddedToken("[PAD]", rstrip=False, lstrip=False, single_word=False, normalized=True, special=False),
}

In [14]:
it = iter(dl_train)
next(it)
x,y = next(it)
print(x["input_ids"].squeeze())
ds_train.decode(x["input_ids"][0].squeeze())

tensor([[50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259,
         50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259,
         50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259,
         50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259,
         50259, 50259, 50259, 50259, 50259, 50259, 50259, 50257,   604,    25,
          9930,   423,   257,  1263,  2863,   284,  8335,   329,   511,  2003,
          1204,   220,   220, 50258],
        [50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259,
         50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259,
         50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259, 50259,
         50257,   317,  2818,  1672,   286,  6416,  7793,  1134,   373,   618,
           347,  1042,   283,   694, 25036,   262,  3850,  3194,   416,   262,
          5822,   290,  1908,   340,   284,   262,  4141, 23129, 28283,  6711,
           764

' 4:they have a big chance to prepare for their future life  '

In [15]:
from torch import nn, Tensor
import math

# https://pytorch.org/tutorials/beginner/transformer_tutorial.html
class PositionalEncoding(nn.Module):

    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, emb_size):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: torch.Tensor):
        return self.embedding(tokens.long()) * math.sqrt(self.emb_size)

In [16]:
import torch
from torch import nn, Tensor

class TransformerModel(nn.Module):

    def __init__(self, ntoken: int, d_model: int):
        super().__init__()
        self.model_type = 'Transformer'
        self.pos_encoder = PositionalEncoding(d_model, 0.1)
        self.embedding = TokenEmbedding(ntoken, d_model)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=4,
            num_encoder_layers=6,
            num_decoder_layers=6,
            dim_feedforward=1024,
            dropout=0.1,
            batch_first=True
        )

        self.head = nn.Linear(d_model, ntoken)
        self.sm = nn.Softmax(dim=0)

    def forward(self, input: Tensor, target: Tensor, input_mask: Tensor, target_mask: Tensor) -> Tensor:
        input_mask = input_mask.bool()
        target_mask = target_mask.bool()

        src = self.embedding(input) 
        src = self.pos_encoder(src)

        target = self.embedding(target)
        target = self.pos_encoder(target)

        encoded = self.transformer.encoder(src, src_key_padding_mask=input_mask)
        decoded = self.transformer.decoder(target, memory=encoded, tgt_key_padding_mask=target_mask)

        out = self.head(decoded)

        return self.sm(out)

In [17]:
model = TransformerModel(tokenizer.vocab_size+len(custom_tokens), 768)

for x,y in dl_train:
    src = x["input_ids"]
    target = y[0]["input_ids"]
    src_mask = x["attention_mask"]
    target_mask = y[0]["attention_mask"]

    print(f"{src.shape=}")
    print(f"{target.shape=}")
    out = model(src,target,src_mask,target_mask)
    print(out.shape)
    break


src.shape=torch.Size([2, 64])
target.shape=torch.Size([2, 64])
torch.Size([2, 64, 50260])


In [18]:

if __name__ == "__main__":
    n_classes = 100

    source = torch.randint(low=0, high=n_classes, size=(20, 16))
    target = torch.randint(low=0, high=n_classes, size=(20, 32))

    s2s = TransformerModel(n_classes,700)

    out = s2s(source, target)
    print(out.size())
    print(out)

TypeError: TransformerModel.forward() missing 2 required positional arguments: 'input_mask' and 'target_mask'