In [1]:
import torch
import torch.nn as nn
import re
from sklearn.preprocessing import OneHotEncoder
import numpy as np
from torch.utils.data import DataLoader, Dataset

In [2]:
with open("adele.txt", "r") as f:
    text = f.read()

# Split the text (I consider whitespaces, points and commas as word here) + remove the empty string ""
# data = np.array(list(filter(lambda a: a != "", re.split('(\W)', text))))
data = np.array(list(filter(lambda a: a != "", map(lambda x: x.lower(), re.split('[^a-zA-Z\.]', text)))))

# Check that the empty string is correctly removed
if len(data[data == ""]) != 0:
    raise Exception("The empty string wasn't proprely removed from the data")

n = int(0.8 * len(data))

# Create the encoder and set the categories on the training set
encoder = OneHotEncoder().fit(data.reshape(-1,1))

# Check the number of categories of the encoder is the same than the different words in the corpus
if len(encoder.categories_[0]) != len(set(data)):
    raise Exception(f"Encoder categories counts {len(encoder.categories_[0])} don't match the value of differents words {len(set(data))}")

vocab_size = len(set(data))

train_data = torch.Tensor(encoder.transform(data[:n].reshape(-1,1)).toarray())
val_data = torch.Tensor(encoder.transform(data[n:].reshape(-1,1)).toarray())

print(f"Example of train data sample: {train_data[0:5]}")
print(f"Dimention of train_data : {train_data.shape}")
print(f"Example of the inverted encoding: {encoder.inverse_transform(train_data[0:5])}")

Example of train data sample: tensor([[0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.],
        [0., 0., 0.,  ..., 0., 0., 0.]])
Dimention of train_data : torch.Size([16124, 1382])
Example of the inverted encoding: [['looking']
 ['for']
 ['some']
 ['education']
 ['made']]


In [22]:
# It is arbitrary values
EMBEDDING_DIM = 100
HIDDEN_DIM = 256
N_LAYERS = 1
DROPOUT = 0.5
N_EPOCHS = 5
LR = 3e-4
BATCH_SIZE = 32
SEQ_LEN = 30

In [4]:
def create_sequence(data, seq_len):
    n = len(data)
    X = []
    y = []
    for i in range(n - seq_len):
        X.append(data[i:i+seq_len])
        y.append(data[i+1:i+seq_len+1])
    return X, y

train_X, train_y = create_sequence(train_data, SEQ_LEN)

class Text(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
    
train_dataset = Text(train_X, train_y)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)

In [31]:
class NLP(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, n_layers, dropout):
        super(NLP, self).__init__()
        self.vocab_size = vocab_size
        self.state_dim = hidden_dim
        self.lstm = nn.LSTM(vocab_size, hidden_dim, n_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)
        
    def forward(self, x):
        # x = [batch_size, seq_len, vocab_size]
        # print(x.shape)
        x, _ = self.lstm(x)
        # x = [batch_size, seq_len, state_dim]
        # print(x.shape)
        x = self.fc(x)
        # x = [batch_size, seq_len, vocab_size]
        # print(x.shape)
        return x

In [35]:
def train(model, dataloader, n_epochs, lr, batch_size, seq_len):
    # Setup GPU related variables
    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"device = {device}")
    torch.cuda.empty_cache()
    model.to(device)
    
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    model.train()

    for epoch in range(n_epochs):
        train_losses = []
        for i, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)

            optimizer.zero_grad()
            output = model(X)

            # print(output.shape, y.shape)
            output = output.view(-1, vocab_size)
            y = y.view(-1, vocab_size)

            # print(output.shape, y.shape)
            loss = criterion(output, y)

            loss.backward()
            optimizer.step()

            train_losses.append(loss.item())
            if i % 100 == 0:
                print(f"Epoch {epoch}, step {i}, loss {loss.item()}")
        
        print(f"Epoch {epoch} finished. Train loss: {np.array(train_losses).mean()}, Perplexity: {np.exp(np.array(train_losses).mean())}")

model = NLP(vocab_size, EMBEDDING_DIM, HIDDEN_DIM, N_LAYERS, DROPOUT)

train(model, train_loader, N_EPOCHS, LR, BATCH_SIZE, SEQ_LEN)

device = cpu
Epoch 0, step 0, loss 7.234655857086182
Epoch 0, step 100, loss 5.61555814743042
Epoch 0, step 200, loss 5.565292835235596
Epoch 0, step 300, loss 5.515486717224121
Epoch 0, step 400, loss 5.595889568328857
Epoch 0, step 500, loss 5.36691427230835
Epoch 0 finished. Train loss: 5.658745363739802, Perplexity: 286.7886013646327
Epoch 1, step 0, loss 5.325730323791504
Epoch 1, step 100, loss 5.438483238220215
Epoch 1, step 200, loss 5.254950046539307
Epoch 1, step 300, loss 5.218662738800049
Epoch 1, step 400, loss 5.281633377075195
Epoch 1, step 500, loss 5.16405725479126
Epoch 1 finished. Train loss: 5.301364306190142, Perplexity: 200.61031725616814
Epoch 2, step 0, loss 5.28195858001709
Epoch 2, step 100, loss 4.91642951965332
Epoch 2, step 200, loss 4.980137348175049
Epoch 2, step 300, loss 4.832734107971191
Epoch 2, step 400, loss 4.7653679847717285
Epoch 2, step 500, loss 4.786016464233398
Epoch 2 finished. Train loss: 4.909591751591586, Perplexity: 135.58405113516778
Ep

In [36]:
def generate_text(model, encoder, start_word, num_words=10, random_sample=False):
    """
    Generate text based on the trained model output.
    
    Parameters:
    - model: The trained PyTorch model.
    - encoder: The OneHotEncoder used for encoding the words.
    - start_word: The initial word to start generating text.
    - num_words: Number of words to generate.
    - random_sample: If True, sample from the distribution instead of taking the max probability.
    
    Returns:
    - generated_text: The generated sequence of words.
    """
    model.eval()

    start_word = start_word.lower()
    # Initialize the generated text with the start word
    generated_words = [start_word]
    
    # Convert the start word to its one-hot encoded representation
    input_word = encoder.transform(np.array([[start_word]])).toarray()
    input_tensor = torch.Tensor(input_word)

    # Generate the specified number of words
    for _ in range(num_words):
        # Get the model output
        with torch.no_grad():
            output = model(input_tensor)
        
        # Apply softmax to get probabilities
        probabilities = torch.softmax(output, dim=1).numpy().flatten()
        
        # Determine the next word
        if random_sample:
            next_index = np.random.choice(len(probabilities), p=probabilities)
        else:
            next_index = np.argmax(probabilities)
        
        # Get the corresponding word from the encoder
        next_word = encoder.inverse_transform(np.eye(len(probabilities))[next_index].reshape(1, -1))[0][0]
        
        # Append the generated word to the list
        generated_words.append(next_word)
        
        # Update the input tensor with the new word
        input_tensor = encoder.transform(np.array([[next_word]])).toarray()
        input_tensor = torch.Tensor(input_tensor)
    
    # Join the generated words into a single string
    generated_text = ' '.join(generated_words)
    return generated_text

# Exemple d'utilisation
generated_text = generate_text(model, encoder, start_word='Baby', num_words=20, random_sample=False)
print(generated_text)

generated_text_random = generate_text(model, encoder, start_word='Baby', num_words=20, random_sample=True)
print(generated_text_random)

baby i m the sky fall when i m the sky fall when i m the sky fall when i m
baby chance stars we far every wish i spin. version however so guess haunted walk with burn thank as sometimes free
