In [1]:
# Process the data
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from process_data import TextPreprocess


radonpy_data = pd.read_csv('./radonpy.csv')
# Remove the space of column name
radonpy_data.columns = radonpy_data.columns.str.replace(' ', '')

smiles_list = radonpy_data['smiles'].tolist()

smiles_process = TextPreprocess()
word2index, index2word, word_count = smiles_process.create_vocabulary(smiles_list)


x = smiles_process.text_to_index(smiles_list, padding=True)
x = torch.tensor(x, dtype=torch.int64)  # Convert to torch.long data type
# Only use the first 10 samples
x = x[:120]
x.shape


In [2]:
class test_mlp(nn.Module):
    def __init__(self):
        super(test_mlp, self).__init__()
        self.fc1 = nn.Linear(5, 3)
        self.fc2 = nn.Linear(3, 2)
        self.softmax = nn.Softmax(dim=1)
        
    def forward(self, x):
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.softmax(x)
        return x
    
test_x = torch.randn(5, 5)
mlp = test_mlp()
y = mlp(test_x)
pred = y.argmax(dim=1)
print(y.shape, pred.shape)


torch.Size([5, 2]) torch.Size([5])


In [3]:
import torch
import torch.nn as nn

class VAE_Encoder(nn.Module):
    def __init__(self, word_count, d_model, latent_dim, nhead=1):
        super(VAE_Encoder, self).__init__()
        # Embedding layer
        self.embedding = nn.Embedding(word_count, d_model)
        
        # Transformer encoder layer
        self.encoder = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, batch_first=True)
        
        # Fully connected layers for mean and log variance of latent space
        self.fc_mu = nn.Sequential(
            nn.Linear(d_model, latent_dim),
            nn.ReLU()
        )
        self.fc_logvar = nn.Sequential(
            nn.Linear(d_model, latent_dim),
            nn.ReLU()
        )
        
    def reparameter(self, mu, logvar):
        """Reparameterization trick: sample from N(mu, sigma^2)"""
        std = torch.exp(0.5 * logvar)  # Compute standard deviation
        eps = torch.randn_like(std)    # Sample epsilon from standard normal
        return mu + eps * std          # Return reparameterized z

    def forward(self, x):
        # x has shape (batch_size, seq_length)
        
        # Embedding the input sequence of tokens
        x = self.embedding(x)  # Shape: (batch_size, seq_length, d_model)
        
        # Pass through Transformer encoder
        x = self.encoder(x)    # Shape: (batch_size, seq_length, d_model)
        
        # Apply mean pooling over the sequence length dimension
        # Compress sequence dimension to get sentence representation
        x = torch.mean(x, dim=1)  # Shape: (batch_size, d_model)
        
        # Compute mean and log variance for the latent distribution
        mu = self.fc_mu(x)        # Shape: (batch_size, latent_dim)
        logvar = self.fc_logvar(x)  # Shape: (batch_size, latent_dim)
        
        # Reparameterization to sample z from the latent space
        z = self.reparameter(mu, logvar)  # Shape: (batch_size, latent_dim)
        
        return z, mu, logvar


In [4]:
encoder = VAE_Encoder(word_count=39, d_model=64, latent_dim=32)
x = torch.randint(0, 39, (10, 12), dtype=torch.int64)
z, mu, logvar = encoder(x)
z.shape, mu.shape, logvar.shape

(torch.Size([10, 32]), torch.Size([10, 32]), torch.Size([10, 32]))

In [5]:
class VAE_Decoder(nn.Module):
    def __init__(self, word_count, latent_dim, d_model, hidden_dim ,output_dim, num_layers=1):
        super(VAE_Decoder, self).__init__()
        self.word_count = word_count
        self.latent_dim = latent_dim
        self.d_model = d_model
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.num_layers = num_layers

        self.embedding = nn.Embedding(word_count, d_model)
        self.decoder = nn.GRU(input_size=d_model, hidden_size=hidden_dim, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.softmax = nn.Softmax(dim=2)

    def forward(self, x, hidden):
        x = self.embedding(x)
        hidden = self.linear_transform(x, hidden)
        x, hidden = self.decoder(x, hidden)
        x = self.fc(x)
        x = self.softmax(x)
        return x, hidden

    def get_linear_layer(self, number_layer=1, **kwargs):
        layer_dims = kwargs.get('layer_dims', None)

        if layer_dims is None:
            raise ValueError("Layer dimensions must be provided")
        
        if len(layer_dims) >3:
            raise ValueError("Only 3 layer are supported")
        
        layers = []
        for i in range(len(layer_dims)-1):
            layer = nn.Linear(layer_dims[i], layer_dims[i+1])
            layers.append(layer)
            layers.append(nn.ReLU())

        return nn.Sequential(*layers)

    def linear_transform(self, x, hidden):
        """
        The hidden size should be (num_layers, batch_size, hidden_dim).
        The x size should be (batch_size, seq_len, d_model).
        For the first time step, the x should be the <start> token.

        x = torch.randn(10, 100, 64)        # Batch size 10, sequence length 12, input size 64
        hidden = torch.randn(1, 10, 128)    # num_layers 1, batch size 10, hidden size 128
        """
        
        if hidden.shape[1] == x.shape[0] and hidden.shape[2] == self.hidden_dim:
            print('The hidden satisfies the requirement')
            return hidden


        # If latent dimension is not the same as embedding dimension, we need to do the linear transformation
        if hidden.shape[2] != self.hidden_dim:
            print('Warning: The latent dimension is not the same as the hidden dimension, we do the linear transformation')
            new_linear = self.get_linear_layer(layer_dims=[self.latent_dim, self.hidden_dim])
            hidden = new_linear(hidden)
            print('After linear transformation, the hidden shape is: ', hidden.shape)
        

        return hidden


In [6]:
decoder = VAE_Decoder(latent_dim=32, word_count=39, d_model=64, hidden_dim=128, output_dim=39, num_layers=1)
trg_input = torch.randint(0, 39, (10, 12), dtype=torch.int64)
hidden = z.unsqueeze(0)

decoder_out, hidden = decoder(trg_input, hidden)
print(decoder_out.shape, hidden.shape)


After linear transformation, the hidden shape is:  torch.Size([1, 10, 128])
torch.Size([10, 12, 39]) torch.Size([1, 10, 128])


In [7]:
class VAE(nn.Module):
    def __init__(self, encoder, decoder):
        super(VAE, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        
    def forward(self, 
                src: torch.Tensor,
                trg: torch.Tensor,
                teacher_forcing: bool = False,
                teacher_forcing_ratio: float = 0.5):
        
        batch_size = trg.shape[0]
        seq_len = trg.shape[1]
        trg_vocab_size = self.decoder.output_dim

        output_seq = torch.zeros(batch_size, seq_len, trg_vocab_size).to(src.device)

        z, mu, logvar = self.encoder(src)

        hidden = z.unsqueeze(0)
        trg_input = trg[:, 0].unsqueeze(1)

        for t in range(1, seq_len):
            out, hidden = self.decoder(trg_input, hidden)
            # Shape of out is (batch_size, 1, trg_vocab_size)
            output_seq[:, t] = out.squeeze(1)

            if teacher_forcing:
                p_teacher_forcing = torch.rand(1).item()
                if p_teacher_forcing < teacher_forcing_ratio:
                    trg_input = trg[:, t].unsqueeze(1)

                else:
                    trg_input = out.argmax(dim=2)
            else:
                trg_input = out.argmax(dim=2)

        return z, mu, logvar, output_seq, hidden


In [8]:
vae = VAE(encoder, decoder)
vae_x = torch.randint(0, 39, (10, 12), dtype=torch.int64) # 10 samples, 12 sequence length
vae_y = torch.randint(0, 39, (10, 12), dtype=torch.int64) # 10 samples, 12 sequence length

z, mu, logvar = vae.encoder(vae_x)
print('z shape is ', z.shape, 'mu shape is ', mu.shape, 'logvar shape is ', logvar.shape)


z shape is  torch.Size([10, 32]) mu shape is  torch.Size([10, 32]) logvar shape is  torch.Size([10, 32])


In [9]:
decoder_out, hidden = vae.decoder(vae_y, z.unsqueeze(0))
print('Decoder out shape is ', decoder_out.shape, 'hidden shape is ', hidden.shape)

After linear transformation, the hidden shape is:  torch.Size([1, 10, 128])
Decoder out shape is  torch.Size([10, 12, 39]) hidden shape is  torch.Size([1, 10, 128])


In [10]:
decoder_out = decoder_out[:, 4, :].unsqueeze(1)
decoder_out.shape   

torch.Size([10, 1, 39])

In [11]:
z, mu, logvar, output_seq, hidden = vae(vae_x, vae_y)
print('z shape is ', z.shape, 'mu shape is ', mu.shape, 'logvar shape is ', logvar.shape)
print('Output seq shape is ', output_seq.shape, 'hidden shape is ', hidden.shape)


After linear transformation, the hidden shape is:  torch.Size([1, 10, 128])
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
z shape is  torch.Size([10, 32]) mu shape is  torch.Size([10, 32]) logvar shape is  torch.Size([10, 32])
Output seq shape is  torch.Size([10, 12, 39]) hidden shape is  torch.Size([1, 10, 128])


In [12]:
output_seq.sum(dim=2)

tensor([[0.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000],
        [0.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000],
        [0.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000],
        [0.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000],
        [0.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000],
        [0.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000],
        [0.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000],
        [0.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,
         1.0000, 1.0000, 1.0000],
        [0.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000, 1.0000,

RNN demo,

In [13]:
rnn = nn.RNN(input_size=64, hidden_size=128, num_layers=1, batch_first=True)
x = torch.randn(10, 100, 64) # Batch size 10, sequence length 12, input size 64
hidden = torch.randn(1, 10, 128) # num_layers 1, batch size 10, hidden size 128
out, hidden = rnn(x, hidden)
out.shape, hidden.shape

(torch.Size([10, 100, 128]), torch.Size([1, 10, 128]))

import from the generative.py

In [19]:
from generative_model import VAE_Encoder, VAE_Decoder, VAE
import torch

encoder = VAE_Encoder(word_count=39, d_model=64, latent_dim=32)
decoder = VAE_Decoder(latent_dim=32, word_count=39, d_model=64, hidden_dim=128, output_dim=39, num_layers=1)
vae = VAE(encoder, decoder)

vae_x = torch.randint(0, 39, (10, 12), dtype=torch.int64) # 10 samples, 12 sequence length
vae_y = torch.randint(0, 39, (10, 12), dtype=torch.int64) # 10 samples, 12 sequence length

z, mu, logvar = vae.encoder(vae_x)
print('z shape is ', z.shape, 'mu shape is ', mu.shape, 'logvar shape is ', logvar.shape)

decoder_out, hidden = vae.decoder(vae_y, z.unsqueeze(0))
print('Decoder out shape is ', decoder_out.shape, 'hidden shape is ', hidden.shape)


z shape is  torch.Size([10, 32]) mu shape is  torch.Size([10, 32]) logvar shape is  torch.Size([10, 32])
After linear transformation, the hidden shape is:  torch.Size([1, 10, 128])
Decoder out shape is  torch.Size([10, 12, 39]) hidden shape is  torch.Size([1, 10, 128])


In [20]:
z, mu, logvar, output_seq, hidden = vae(vae_x, vae_y)
print('z shape is ', z.shape, 'mu shape is ', mu.shape, 'logvar shape is ', logvar.shape)
print('Output seq shape is ', output_seq.shape, 'hidden shape is ', hidden.shape)

After linear transformation, the hidden shape is:  torch.Size([1, 10, 128])
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
The hidden satisfies the requirement
z shape is  torch.Size([10, 32]) mu shape is  torch.Size([10, 32]) logvar shape is  torch.Size([10, 32])
Output seq shape is  torch.Size([10, 12, 39]) hidden shape is  torch.Size([1, 10, 128])


In [21]:
output_seq.argmax(dim=2)

tensor([[ 0, 35, 26, 18, 22,  6,  0, 14, 38, 21, 38,  6],
        [ 0, 11, 38,  6,  3,  7, 24, 12, 22,  6,  0, 14],
        [ 0, 22, 12, 22,  6,  0, 14, 38, 34, 12, 32,  6],
        [ 0,  5,  0, 10, 38, 34, 14, 26, 22, 12, 22,  6],
        [ 0, 27,  1,  1, 16, 14,  6,  3,  4, 19, 19, 12],
        [ 0, 17, 38,  1,  1,  1,  1,  1,  1,  1,  1,  1],
        [ 0, 22,  6,  0, 14, 38, 34, 12, 32,  6,  0, 14],
        [ 0, 10, 38, 34, 14, 26, 22, 12, 22,  6,  0, 14],
        [ 0,  6,  3,  4, 19, 20,  5,  0,  5, 35,  5, 10],
        [ 0, 22,  6,  0, 14, 38, 21, 38,  6,  3,  7, 24]])

For Process the data debug


In [229]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

class Vocabulary:
    def __init__(self):
        self.word2index = {}
        self.index2word = {}
        self.word_counts = 0

    def add_word(self, word):
        word = str(word)
        if word not in self.word2index:
            index = len(self.word2index)
            self.word2index[word] = index
            self.index2word[index] = word
            self.word_counts += 1
        return self.word2index[word]  # Return the index directly

    def create_vocabulary(self, text):
        for sentence in text:
            for word in sentence:
                self.add_word(word)

        for token in ['<start>', '<end>', '<pad>']:
            self.add_word(token)

        return self.word2index, self.index2word, self.word_counts
    
    def get_index(self, word):
        return self.word2index.get(word, None)  # Use .get() to avoid KeyError
    
    def get_word(self, index):
        return self.index2word.get(index, None)  # Use .get() for safety
    
    def __len__(self):
        return len(self.index2word)

class TextPreprocessor:
    def __init__(self, vocab):
        self.vocab = vocab

    def pad_sequences(self, text_index):
        max_length = max(len(sentence) for sentence in text_index)
        return [sentence + [self.vocab.get_index('<pad>')] * (max_length - len(sentence)) for sentence in text_index]

    def text_to_index(self, text, padding=False, start=False, end=False):
        text_index = []
        for sentence in text:
            sentence_index = []
            if start:
                sentence_index.append(self.vocab.get_index('<start>'))
            sentence_index.extend(self.vocab.get_index(word) for word in sentence)
            if end:
                sentence_index.append(self.vocab.get_index('<end>'))

            text_index.append(torch.tensor(sentence_index, dtype=torch.int64))

        if padding:
            text_index = self.pad_sequences(text_index)

        return text_index

class RadonpyDataset(Dataset):
    def __init__(self, x, y):
        self.x = x
        self.y = y

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return self.x[idx], self.y[idx]

class RadonpyDataLoader:
    def __init__(self, x, y, batch_size, shuffle, vocab):
        self.x = x
        self.y = y
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.vocab = vocab

    def collate_fn(self, batch):
        x, y = zip(*batch)
        x = pad_sequence(x, batch_first=True, padding_value=self.vocab.get_index('<pad>'))
        y = pad_sequence(y, batch_first=True, padding_value=self.vocab.get_index('<pad>'))
        return x, y
        
    def get_dataloader(self):
        dataset = RadonpyDataset(self.x, self.y)
        return DataLoader(dataset, batch_size=self.batch_size, shuffle=self.shuffle, collate_fn=self.collate_fn)

# Load and preprocess data
radonpy_data = pd.read_csv('./radonpy.csv')
radonpy_data.columns = radonpy_data.columns.str.replace(' ', '')

smiles_list = radonpy_data['smiles'].tolist()[:100]

smiles_vocabulary = Vocabulary()
smiles_vocabulary.create_vocabulary(smiles_list)

smiles_processor = TextPreprocessor(smiles_vocabulary)
x = smiles_processor.text_to_index(smiles_list, start=False, end=True, padding=False)
y = smiles_processor.text_to_index(smiles_list, start=True, end=True, padding=False)

dataloader = RadonpyDataLoader(x, y, batch_size=2, shuffle=True, vocab=smiles_vocabulary).get_dataloader()

for i, (data,label) in enumerate(dataloader):
    print('Batch ', i)
    print('data is', data)
    print('label is', label)
    break


Batch  0
data is tensor([[ 0,  1,  7,  1,  1,  1,  0, 12, 13, 13, 13, 13, 13, 13],
        [ 0,  1,  2,  1,  1,  1,  1,  1,  1,  1,  0,  3,  1, 12]])
label is tensor([[11,  0,  1,  7,  1,  1,  1,  0, 12, 13, 13, 13, 13, 13, 13],
        [11,  0,  1,  2,  1,  1,  1,  1,  1,  1,  1,  0,  3,  1, 12]])


In [227]:
import torch
from torch.utils.data import Dataset, DataLoader
import random

# A simple function to generate a random sentence with length between 5 and 15 words
def generate_random_sentence(min_len=5, max_len=15):
    words = ["word{}".format(i) for i in range(1, 101)]  # A list of possible words: ['word1', 'word2', ..., 'word100']
    sentence_length = random.randint(min_len, max_len)  # Randomly select sentence length between 5 and 15
    return random.sample(words, sentence_length)  # Randomly select words based on the generated length

# Custom dataset class for sentences and labels
class SentenceDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sentence = self.data[idx]
        label = self.labels[idx]
        return sentence, label

# Create dummy sentences as data and labels that are one word longer than the data
data = [generate_random_sentence() for _ in range(10)]  # 10 sentences, each with random lengths (5 to 15 words)
labels = [sentence + ['label_extra_word'] for sentence in data]  # Label is one word longer than the sentence

# Function to pad sentences or labels to the maximum length in a batch
def pad_sequence(sequence, max_length, pad_token='<PAD>'):
    return sequence + [pad_token] * (max_length - len(sequence))  # Pads with <PAD> token

# Custom collate function to handle variable-length sentence data
def custom_collate_fn(batch):
    # `batch` is a list of tuples where each tuple is (sentence, label)
    sentences, labels = zip(*batch)

    # Find the maximum length of sentences and labels in the current batch
    max_sentence_length = max(len(sentence) for sentence in sentences)
    max_label_length = max(len(label) for label in labels)

    # Pad sentences and labels to the maximum lengths in the batch
    padded_sentences = [pad_sequence(sentence, max_sentence_length) for sentence in sentences]
    padded_labels = [pad_sequence(label, max_label_length) for label in labels]

    return padded_sentences, padded_labels

# Instantiate the dataset
dataset = SentenceDataset(data, labels)

# Create a DataLoader using the custom collate function
dataloader = DataLoader(dataset, batch_size=2, shuffle=True, collate_fn=custom_collate_fn)

# Iterate through the DataLoader
for batch_sentences, batch_labels in dataloader:
    print("Batch Sentences (Data):", batch_sentences)
    print("Batch Labels:", batch_labels)
    print('-' * 50)


Batch Sentences (Data): [['word92', 'word59', 'word47', 'word85', 'word55', 'word72', 'word83', 'word75', 'word29', 'word65', '<PAD>', '<PAD>'], ['word97', 'word48', 'word21', 'word8', 'word99', 'word52', 'word20', 'word7', 'word23', 'word66', 'word46', 'word89']]
Batch Labels: [['word92', 'word59', 'word47', 'word85', 'word55', 'word72', 'word83', 'word75', 'word29', 'word65', 'label_extra_word', '<PAD>', '<PAD>'], ['word97', 'word48', 'word21', 'word8', 'word99', 'word52', 'word20', 'word7', 'word23', 'word66', 'word46', 'word89', 'label_extra_word']]
--------------------------------------------------
Batch Sentences (Data): [['word37', 'word55', 'word100', 'word67', 'word48', 'word50', 'word77', 'word18', 'word88', 'word33', 'word61', 'word78', 'word76', 'word86'], ['word56', 'word77', 'word27', 'word48', 'word36', 'word55', 'word13', 'word60', 'word12', 'word92', 'word91', 'word78', '<PAD>', '<PAD>']]
Batch Labels: [['word37', 'word55', 'word100', 'word67', 'word48', 'word50', 'wor

for Training debug


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

from typing import Union
import os


class CalculateMetrics:
    def __init__(self, y_true, y_pred):
        self.y_true = y_true
        self.y_pred = y_pred

    def accuracy(self):
        return accuracy_score(self.y_true, self.y_pred)
    
    def precision(self):
        return precision_score(self.y_true, self.y_pred)
    
    def recall(self):
        return recall_score(self.y_true, self.y_pred)
    
    def f1(self):
        return f1_score(self.y_true, self.y_pred)
    
    def get_metrics(self):
        return [self.accuracy(), self.precision(), self.recall(), self.f1()]



class Training:
    def __init__(self, 
                 *,
                 model : nn.Module = None,
                 optimizer : optim = None,
                 loss_function : nn.Module = None,
                 epochs : int = 10,
                 lr_scheduler = None,
                 clip = None,
                 check_point_path : str = None,
                 device = 'cpu'):
        
        self.model = model
        self.optimizer = optimizer
        self.loss_function = loss_function
        self.epochs = epochs
        self.lr_scheduler = lr_scheduler
        self.clip = clip
        self.device = device

        if check_point_path:
            self.check_point_path = check_point_path
        else:
            self.check_point_path = os.getcwd() + '/checkpoints'

    def save_checkpoint(self, idx):
        if not os.path.exists(self.check_point_path):
            os.makedirs(self.check_point_path)

        torch.save(self.model.state_dict(), self.check_point_path + f'/checkpoint_{idx}.pth')

        return self.check_point_path + f'/checkpoint_{idx}.pth'

    def _get_index_checkpoint(self, checkpoint_path, checkpoint_name='checkpoint'):
        index_list = []
        for file in os.listdir(checkpoint_path):
            if file.startswith(checkpoint_name):
                index_list.append(int(file.split('_')[-1].split('.')[0]))
        return index_list

    def load_checkpoint(self, 
                        checkpoint: Union[bool, int] = False):
        
        if checkpoint:
            if isinstance(checkpoint, bool):
                # Load the last checkpoint
                index_list = self._get_index_checkpoint(self.check_point_path)
                idx = max(index_list)
                self.model.load_state_dict(torch.load(self.check_point_path + f'/checkpoint_{idx}.pth'))
                checkpoint_path = self.check_point_path + f'/checkpoint_{idx}.pth'
                return checkpoint_path
            
            elif isinstance(checkpoint, int):
                self.model.load_state_dict(torch.load(self.check_point_path + f'/checkpoint_{checkpoint}.pth'))
                checkpoint_path = self.check_point_path + f'/checkpoint_{checkpoint}.pth'
                return checkpoint_path

            else:
                raise ValueError("checkpoint must be a boolean or an integer")
                   

    def train_loop(self, 
                   train_loader : DataLoader, 
                   teacher_forcing : bool = False):
        # Only do the task to predict one sentence !!!
        self.model.train()
        total_loss = 0

        for seq_input, seq_output in train_loader:
            seq_input, seq_output = seq_input.to(self.device), seq_output.to(self.device)
            self.optimizer.zero_grad()
            output = self.model(seq_input, seq_output)

        return total_loss / len(train_loader)
    
    def test_loop(self, test_loader):
        # Same with above
        self.model.eval()
        total_loss = 0

        with torch.no_grad():

            for data, target in test_loader:
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                total_loss += self.loss_function(output, target).item()

        return total_loss / len(test_loader)
    

    
    def get_loss_mask(self, data, mask_value=0):
        return torch.where(data != mask_value, torch.tensor(1).to(self.device), torch.tensor(0).to(self.device))
    
    def fit(self, train_loader, test_loader,
                checkpoint_save: Union[bool, int] = False,
                checkpoint_load: Union[bool, str] = False,
                epochs : int = None):
        # Get the dataloader here

        """
        checkpoint: bool or int
            If False, no checkpoint will be saved. If True, checkpoint will be saved after each epoch.
            If int, checkpoint will be saved after each epoch modulo the int value.
        
        checkpoint_load: bool or int   
            If False, no checkpoint will be loaded. If True, the last checkpoint will be loaded.
            If int, the checkpoint will be loaded the epoch modulo the int value.
        """
        
        self.train_losses = []
        self.test_losses = []

        if epochs is None:
            epochs = self.epochs

        if checkpoint_load:
            checkpoint_path = self.load_checkpoint(checkpoint_load)
            state_dict = torch.load(checkpoint_path)
            self.model.load_state_dict(state_dict)
            print(f'Now You Are Training From Checkpoint {checkpoint_path}')

        for epoch in range(epochs):
            train_loss = self.train_loop(train_loader)
            test_loss = self.test_loop(test_loader)
            self.train_losses.append(train_loss)
            self.test_losses.append(test_loss)

            if checkpoint_save:
                if isinstance(checkpoint_save, bool):
                    self.save_checkpoint(epoch)
                    print(f"Each epoch checkpoint saved at {self.check_point_path}")
                elif isinstance(checkpoint_save, int) and isinstance(checkpoint_load, bool):
                    print("No checkpoint loaded, so the checkpoint will be saved at each epoch modulo the int value")
                    if epoch % checkpoint_save == 0:
                        self.save_checkpoint(epoch)
                elif isinstance(checkpoint_save, int) and isinstance(checkpoint_load, int):
                    print(f'{checkpoint_path} is loaded, so the checkpoint will be saved at each epoch modulo the int value')
                    if epoch % checkpoint_save == 0:
                        self.save_checkpoint(epoch + checkpoint_load)
        
        return self.train_losses, self.test_losses

    def predict(self, data):
        self.model.eval()
        with torch.no_grad():
            data = data.to(self.device)
            output = self.model(data)
            return output
        
    
        

In [210]:
loss_fn = torch.nn.CrossEntropyLoss(reduction='none')  # Keep individual losses

predictions = torch.randn(10, 12, 39)  # (batch_size, seq_len, vocab_size)
targets = torch.randint(0, 5, (10, 39))  # (batch_size, seq_len)

# Compute loss (batch_size, seq_len)
loss = loss_fn(predictions, targets)

# Create a mask to ignore <PAD> tokens (assuming <PAD> is token index 0)
mask = (targets != 0).float()

# Apply the mask to the loss
loss = loss * mask

# Optionally, sum the losses over valid tokens and take the average
loss = loss.sum() / mask.sum()  # Averaging only over non-padded tokens
mask


tensor([[1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 0., 0., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 0., 1., 1., 0., 0., 1., 1., 0., 1., 1., 1., 1.,
         0., 1., 1.],
        [0., 1., 1., 1., 0., 1., 0., 1., 1., 1., 0., 0., 1., 0., 1., 1., 1., 1.,
         1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
         1., 1., 0.],
        [0., 1., 0., 1., 1., 1., 1., 0., 1., 0., 1., 1., 1., 1., 0., 0., 1., 0.,
         1., 0., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 0., 1., 1., 1., 1.,
         1., 0., 1.],
        [1., 0., 1., 1., 1., 1., 1., 1., 1., 1., 0., 1., 1., 1., 0., 0., 1., 0.,
         1., 0., 0., 1., 1., 0., 1., 0., 1., 1., 1., 1., 1., 0., 1., 1., 1., 1.,
         1., 1., 1.],
        [0., 0., 1., 0., 1., 0., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 0.,
         1., 0., 0., 1., 1., 0., 1., 1., 1., 1., 1., 1., 0., 1., 1., 1., 1., 1.,
         0., 0., 1.],
        [1., 1., 1., 1., 1., 1., 1., 1., 1., 0., 1., 0., 1., 1., 1., 1., 1., 0.,

chatgpt demo

In [17]:
import torch
import torch.nn as nn

class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hidden_dim, num_layers=1):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)  # Embedding layer
        self.rnn = nn.RNN(emb_dim, hidden_dim, num_layers=num_layers, batch_first=True)  # RNN layer
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

    def forward(self, x):
        # x: (batch_size, seq_len)
        embedded = self.embedding(x)  # Shape: (batch_size, seq_len, emb_dim)
        outputs, hidden = self.rnn(embedded)  # outputs: (batch_size, seq_len, hidden_dim), hidden: (num_layers, batch_size, hidden_dim)
        return hidden  # We return the final hidden state to initialize the decoder


class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim, num_layers=1):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)  # Embedding layer
        self.rnn = nn.RNN(emb_dim, hidden_dim, num_layers=num_layers, batch_first=True)  # RNN layer
        self.fc_out = nn.Linear(hidden_dim, output_dim)  # Output layer to project RNN hidden state to output tokens
        self.softmax = nn.Softmax(dim=2)
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim

    def forward(self, x, hidden):
        # x: (batch_size) contains the index of the current input token
        x = x.unsqueeze(1)  # Add the time dimension: (batch_size) -> (batch_size, 1)
        embedded = self.embedding(x)  # Shape: (batch_size, 1, emb_dim)
        output, hidden = self.rnn(embedded, hidden)  # output: (batch_size, 1, hidden_dim), hidden: (num_layers, batch_size, hidden_dim)
        prediction = self.fc_out(output)  # Shape: (batch_size, 1, output_dim)
        prediction = self.softmax(prediction)  # Apply softmax to get probabilities over the vocabulary
        return prediction.squeeze(1), hidden  # Remove the time dimension for prediction


class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """
        src: source sentence (batch_size, src_len)
        trg: target sentence (batch_size, trg_len)
        teacher_forcing_ratio: the probability of using teacher forcing
        """
        batch_size = trg.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.fc_out.out_features

        # Initialize an empty tensor to store the decoder's predictions
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)

        # Encode the source sequence
        hidden = self.encoder(src)  # Shape: (num_layers, batch_size, hidden_dim)

        # First input to the decoder is the <START> token (usually tokenized as 0)
        input = trg[:, 0]  # Shape: (batch_size)

        # Loop through the target sequence length
        for t in range(1, trg_len):
            # Get the prediction from the decoder
            output, hidden = self.decoder(input, hidden)  # output: (batch_size, output_dim)
            outputs[:, t, :] = output

            # Determine if we will use teacher forcing or not
            teacher_force = torch.rand(1).item() < teacher_forcing_ratio
            top1 = output.argmax(1)  # Get the predicted token with the highest probability

            # If teacher forcing, use the actual next token as input; else, use the predicted token
            input = trg[:, t] if teacher_force else top1

        return outputs


# Example usage

# Hyperparameters
INPUT_DIM = 1000  # Vocabulary size of the input
OUTPUT_DIM = 1000  # Vocabulary size of the output (should match target vocab size)
EMB_DIM = 256  # Embedding size for both encoder and decoder
HIDDEN_DIM = 512  # Hidden state dimension
NUM_LAYERS = 1  # Number of layers in the RNN
BATCH_SIZE = 32
SRC_SEQ_LEN = 10  # Source sentence length
TRG_SEQ_LEN = 12  # Target sentence length

# Define encoder and decoder
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
encoder = Encoder(INPUT_DIM, EMB_DIM, HIDDEN_DIM, NUM_LAYERS)
decoder = Decoder(OUTPUT_DIM, EMB_DIM, HIDDEN_DIM, NUM_LAYERS)

# Create Seq2Seq model
model = Seq2Seq(encoder, decoder, device).to(device)

# Example batch of source sentences (batch_size, src_seq_len) and target sentences (batch_size, trg_seq_len)
src = torch.randint(0, INPUT_DIM, (BATCH_SIZE, SRC_SEQ_LEN)).to(device)
trg = torch.randint(0, OUTPUT_DIM, (BATCH_SIZE, TRG_SEQ_LEN)).to(device)

# Forward pass (with teacher forcing)
outputs = model(src, trg, teacher_forcing_ratio=0.75)  # Shape: (batch_size, trg_seq_len, output_dim)
print(outputs.shape)  # (32, 12, 1000) - Batch size, target sequence length, vocabulary size


torch.Size([32, 12, 1000])


In [18]:
# Loss function
criterion = nn.CrossEntropyLoss()

# Example batch of target sentences (ground truth)
# trg has shape (batch_size, trg_len) where each value is the index of the token
trg = torch.randint(0, OUTPUT_DIM, (BATCH_SIZE, TRG_SEQ_LEN)).to(device)

print('at first trg shape:', trg.shape)
# Forward pass through the model
outputs = model(src, trg, teacher_forcing_ratio=0.75)  # Shape: (batch_size, trg_len, output_dim)
print('output shape:', outputs.shape)

# Reshape the outputs and target to match the requirements of nn.CrossEntropyLoss
# The loss function expects inputs of shape (batch_size * trg_len, vocab_size)
# and target of shape (batch_size * trg_len)
outputs = outputs[:, 1:].reshape(-1, OUTPUT_DIM)  # Skip <START> token and flatten to (batch_size * trg_len, output_dim)
print('After reshape output shape:', outputs.shape)
trg = trg[:, 1:].reshape(-1)  # Skip <START> token and flatten to (batch_size * trg_len)
print('After reshape trg shape:', trg.shape)

# Compute the loss
loss = criterion(outputs, trg)

at first trg shape: torch.Size([32, 12])
output shape: torch.Size([32, 12, 1000])
After reshape output shape: torch.Size([352, 1000])
After reshape trg shape: torch.Size([352])
