Learning a sequence of numbres: 1, 2, 3, 4. Using the full nn.Transformer model.

In [1]:
import math
from tempfile import TemporaryDirectory
from typing import Tuple
import torch
from torch import nn, Tensor
from torch.nn import TransformerEncoder, TransformerEncoderLayer
      
class TransformerModel(nn.Module):
    def __init__(self, ntoken: int, d_model: int, nhead: int, d_hid: int, nlayers: int, dropout: float = 0.5):
        super().__init__()
        self.model_type = 'Transformer'
        self.tgt_mask = None
        self.pos_encoder = PositionalEncoding(d_model, dropout)
        self.transformer = nn.Transformer(d_model, nhead, nlayers, nlayers, d_hid, dropout)
        self.encoder = nn.Embedding(ntoken, d_model)
        self.d_model = d_model
        self.decoder = nn.Linear(d_model, ntoken)
        self.init_weights()

    def _generate_square_subsequent_mask(self, sz: int):
        return torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)

    def init_weights(self):
        initrange = 0.1
        self.encoder.weight.data.uniform_(-initrange, initrange)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-initrange, initrange)

    def forward(self, src: Tensor, tgt: Tensor, use_mask: bool = True):
        if self.tgt_mask is None or self.tgt_mask.size(0) != len(tgt):
            device = tgt.device
            mask = self._generate_square_subsequent_mask(len(tgt)).to(device)
            self.tgt_mask = mask

        src = self.encoder(src) * math.sqrt(self.d_model)
        src = self.pos_encoder(src)
        
        tgt = self.encoder(tgt) * math.sqrt(self.d_model)
        tgt = self.pos_encoder(tgt)
        
        if (use_mask):
          mask = self.tgt_mask
        else:
          mask = None
    
        output = self.transformer(src, tgt, tgt_mask=mask)
        output = self.decoder(output)
        return output
  
  
class PositionalEncoding(nn.Module):
    def __init__(self, d_model: int, dropout: float = 0.1, max_len: int = 5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(max_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x: Tensor) -> Tensor:
        """
        Arguments:
            x: Tensor, shape ``[seq_len, batch_size, embedding_dim]``
        """
        x = x + self.pe[:x.size(0)]
        return self.dropout(x)

In [2]:
train_data = torch.tensor(range(100))

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def batchify(data: Tensor, bsz: int) -> Tensor:
    """Divides the data into ``bsz`` separate sequences, removing extra elements
    that wouldn't cleanly fit.

    Arguments:
        data: Tensor, shape ``[N]``
        bsz: int, batch size

    Returns:
        Tensor of shape ``[N // bsz, bsz]``
    """
    seq_len = data.size(0) // bsz
    data = data[:seq_len * bsz]
    data = data.view(bsz, seq_len).t().contiguous()
    return data.to(device)

batch_size = 1
eval_batch_size = 1
train_data = batchify(train_data, batch_size)  # shape ``[seq_len, batch_size]``

In [3]:
bptt = 10

def get_batch(source: Tensor, i: int) -> Tuple[Tensor, Tensor]:
    """
    Args:
        source: Tensor, shape ``[full_seq_len, batch_size]``
        i: int

    Returns:
        tuple (data, target), where data has shape ``[seq_len, batch_size]`` and
        target has shape ``[seq_len * batch_size]``
    """
    seq_len = min(bptt, len(source) - 1 - i)
    data = source[i:i+seq_len]
    target = source[i+1:i+1+seq_len]
    return data, target

In [4]:
# ntokens = len(vocab)  # size of vocabulary
ntokens = len(set(train_data.reshape(-1)))  # size of vocabulary
emsize = 200  # embedding dimension
d_hid = 200  # dimension of the feedforward network model in ``nn.TransformerEncoder``
nlayers = 2  # number of ``nn.TransformerEncoderLayer`` in ``nn.TransformerEncoder``
nhead = 2  # number of heads in ``nn.MultiheadAttention``
dropout = 0.2  # dropout probability
model = TransformerModel(ntokens, emsize, nhead, d_hid, nlayers, dropout).to(device)

In [5]:
import copy
import time

criterion = nn.CrossEntropyLoss()
lr = 0.0002  # learning rate
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

def train(model: nn.Module) -> None:
    model.train()  # turn on train mode
    total_loss = 0

    for batch, i in enumerate(range(0, train_data.size(0) - 1, bptt)):
        optimizer.zero_grad()
        data, targets = get_batch(train_data, i)
        output = model(data, targets)
        loss = criterion(output.view(-1, ntokens), targets.reshape(-1)) 
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.5)
        optimizer.step()
        total_loss += loss.item()
        
    return total_loss / (batch + 1)

In [6]:
epochs = 100
start_time = time.time()

with TemporaryDirectory() as tempdir:
  for epoch in range(1, epochs + 1):
    epoch_start_time = time.time()
    train_loss = train(model)
    print(f'Epoch {epoch:3d}: {train_loss:5.4f}')


print(f"Training time: {round(time.time() - start_time)} seconds")

Epoch   1: 5.0555
Epoch   2: 4.6728
Epoch   3: 4.5123
Epoch   4: 4.3737
Epoch   5: 4.1867
Epoch   6: 3.9796
Epoch   7: 3.7331
Epoch   8: 3.4582
Epoch   9: 3.2765
Epoch  10: 3.1079
Epoch  11: 2.9396
Epoch  12: 2.8198
Epoch  13: 2.6412
Epoch  14: 2.3778
Epoch  15: 2.1301
Epoch  16: 2.1219
Epoch  17: 2.0660
Epoch  18: 1.9102
Epoch  19: 1.7953
Epoch  20: 1.7614
Epoch  21: 1.5952
Epoch  22: 1.4810
Epoch  23: 1.4233
Epoch  24: 1.3357
Epoch  25: 1.1823
Epoch  26: 1.1851
Epoch  27: 1.0851
Epoch  28: 1.0348
Epoch  29: 0.9250
Epoch  30: 0.8532
Epoch  31: 0.8646
Epoch  32: 0.7108
Epoch  33: 0.6444
Epoch  34: 0.6378
Epoch  35: 0.6272
Epoch  36: 0.5722
Epoch  37: 0.5143
Epoch  38: 0.5079
Epoch  39: 0.4251
Epoch  40: 0.4218
Epoch  41: 0.3649
Epoch  42: 0.3237
Epoch  43: 0.3065
Epoch  44: 0.2834
Epoch  45: 0.3064
Epoch  46: 0.2383
Epoch  47: 0.2280
Epoch  48: 0.2277
Epoch  49: 0.1886
Epoch  50: 0.2011
Epoch  51: 0.1686
Epoch  52: 0.1563
Epoch  53: 0.1387
Epoch  54: 0.1283
Epoch  55: 0.1248
Epoch  56:

In [7]:
def generate_sequence(model, initial_sequence, n_elements, temperature=1.0):
  model.eval()  # Set the model to evaluation mode
  
  generated_sequence = initial_sequence[:-1]
  
  src_tensor = torch.tensor(initial_sequence[-bptt:]).unsqueeze(1).to(device)
  
  # Use torch.no_grad() to prevent gradient calculations during text generation
  with torch.no_grad():
    # Generate new elements
    for _ in range(n_elements):
      # Convert the input_sequence to a tensor and add batch dimension
      tgt_tensor = torch.tensor(generated_sequence[-bptt:]).unsqueeze(1).to(device)
      
      # print(tgt_tensor)
          
      # Evaluate the model
      # output = model(src_tensor, tgt_tensor, use_mask=False)
      output = model(
        torch.tensor([0, 1, 2]).unsqueeze(1).to(device), 
        torch.tensor([0, 1]).unsqueeze(1).to(device), use_mask=False)
                    
      print(output.shape)
      # Apply temperature scaling to the output logits to control the randomness of the generated text
      output = output[-1, 0, :] / temperature
            
      # Convert the output logits into probabilities using softmax
      probabilities = torch.softmax(output, dim=-1)
                  
      # Sample the next element using the probabilities
      next_element = torch.multinomial(probabilities, num_samples=1).item()

      print(next_element)
      # Append the element
      generated_sequence += [next_element]

  return generated_sequence

generate_sequence(model, [1, 2, 3], 10, 0.8)

torch.Size([2, 1, 100])
1
torch.Size([2, 1, 100])
1
torch.Size([2, 1, 100])
1
torch.Size([2, 1, 100])
1
torch.Size([2, 1, 100])
1
torch.Size([2, 1, 100])
1
torch.Size([2, 1, 100])
1
torch.Size([2, 1, 100])
1
torch.Size([2, 1, 100])
1
torch.Size([2, 1, 100])
1


[1, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

In [8]:
torch.tensor([1,2,3,4,5])[:-1]

tensor([1, 2, 3, 4])