In [4]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import random

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using {device} device")

Using cpu device


# Data preparation

In [5]:
class TextDataset(Dataset):
    # TODO: Methods: __init__, __len__, __genitem__, str_to_vec, vec_to_str
    # TODO: Attributes: chars(list), seq_size(int), char_to_idx(dict), idx_to_char(dict), voca_size(int), data_size(int) 

    # input: data, seq_size
    def __init__(self, data: str, seq_size: int = 25) -> None:
        self.chars = sorted(list(set(data)))
        self.seq_size = seq_size
        self.char_to_idx = {c: i for i, c in enumerate(self.chars)}
        self.idx_to_char = {i: c for i, c in enumerate(self.chars)}
        self.voca_size = len(self.chars)
        self.data_size = len(data)
        self.X = self.str_to_vec(data)

    def __len__(self) -> int:
        return int(len(self.X) / self.seq_size - 1)

    def __getitem__(self, index: int) -> tuple[torch.Tensor, torch.Tensor]:
        start = index * self.seq_size
        end = (index + 1) * self.seq_size

        X = torch.tensor(self.X[start: end]).float()
        y = torch.tensor(self.X[start+1 : end+1]).float()

        return X, y

    def str_to_vec(self, line: str) -> list[int]:
        return [self.char_to_idx[l] for l in line]
    
    def vec_to_str(self, vec: list[int]) -> str:
        line = ""
        for v in vec:
            line += self.idx_to_char[v]
        return line

# Model definition

In [6]:
class RNN(nn.Module):
    # Attributes: input_size, hidden_size, output_size, W_hx, W_hh, batch_size
    def __init__(self, input_size: int, hidden_size: int, output_size: int, batch_size: int) -> None:
        super(RNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.batch_size = batch_size

        self.W_hx = nn.Linear(self.input_size, self.hidden_size, bias=False)
        self.W_hh = nn.Linear(self.hidden_size, self.hidden_size)
        self.W_oh = nn.Linear(self.hidden_size, self.output_size)


    # def forward(self, X: torch.Tensor, hidden: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor]:
    def forward(self, X: torch.Tensor, hidden: torch.Tensor):
        W_hx_x = self.W_hx(X)
        hidden = self.W_hh(hidden)
        hidden = torch.tanh(W_hx_x + hidden)
        output = self.W_oh(hidden)
        
        return output, hidden

    def init_hidden(self, batch_size: int = 1) -> torch.Tensor:
        return torch.zeros([batch_size, self.hidden_size], requires_grad=False).float()

In [7]:
class LSTM(nn.Module):
    def __init__(self, input_size: int, hidden_size: int, output_size: int, batch_size: int) -> None:
        super(LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.batch_size = batch_size

        self.W_f = nn.Linear(hidden_size, hidden_size)
        self.U_f = nn.Linear(input_size, hidden_size)
        self.W_i = nn.Linear(hidden_size, hidden_size)
        self.U_i = nn.Linear(input_size, hidden_size)
        self.W_o = nn.Linear(hidden_size, hidden_size)
        self.U_o = nn.Linear(input_size, hidden_size)
        self.W_c = nn.Linear(hidden_size, hidden_size)
        self.U_c = nn.Linear(input_size, hidden_size)
        self.W_oh = nn.Linear(hidden_size, output_size)

        self.sigmoid = nn.Sigmoid()
        self.tanh = nn.Tanh()
        
    def forward(self, X: torch.Tensor, hidden: torch.Tensor, cell: torch.Tensor) -> tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
        forget_gate = self.sigmoid(self.W_f(hidden) + self.U_f(X))
        input_gate = self.sigmoid(self.W_i(hidden) + self.U_i(X))
        output_gate = self.sigmoid(self.W_o(hidden) + self.U_o(X))

        new_content = self.tanh(self.W_c(hidden) + self.U_c(X))
        cell = torch.mul(forget_gate, cell) + torch.mul(input_gate, new_content)
        hidden = self.tanh(torch.mul(output_gate, cell))
        output = self.W_oh(hidden)
        return output, hidden, cell
        
    def init_states(self, batch_size: int = 1) -> tuple[torch.Tensor, torch.Tensor]:
        return torch.zeros([batch_size, self.hidden_size], requires_grad=False).float(), torch.zeros([batch_size, self.hidden_size], requires_grad=False).float()


# Set up training 

In [8]:
# generate text
# input: model, dataset, prediction_length
def generate_random_text(model: LSTM, dataset: TextDataset, prediction_length: int = 100) -> str:
    model.eval()
    prediction = dataset.vec_to_str([random.randint(0, len(dataset.chars)-1)])
    hidden, cell = model.init_states()

    for _ in range(prediction_length):
        last_char = torch.tensor([dataset.char_to_idx[prediction[-1]]]).float()
        X, hidden, cell = last_char.to(device), hidden.to(device), cell.to(device)
        output, hidden, cell = model(X, hidden, cell)
        result = torch.multinomial(nn.functional.softmax(output, 1), 1).item()
        prediction += dataset.idx_to_char[result]
    return prediction

In [11]:
# train
def train(model: RNN, optimizer: torch.optim, criterion: nn.Module, epochs: int, dataloader: DataLoader) -> None:
    train_losses = {}
    model.to(device)
    model.train()

    print("Traning Start!!")
    for epoch in range(epochs):
        epoch_losses = list()
        for X, Y in dataloader:
            # skip batch if it doesnt match with the batch_size
            if X.shape[0] != model.batch_size:
                continue
            hidden, cell = model.init_states(batch_size=model.batch_size)

            # send tensors to device
            X, Y, hidden, cell = X.to(device), Y.to(device), hidden.to(device), cell.to(device)

            # 2. clear gradients
            model.zero_grad()

            loss = 0
            for c in range(X.shape[1]):
                out, hidden, cell = model(X[:, c].reshape(X.shape[0], 1), hidden, cell)
                l = criterion(out, Y[:, c].long())
                loss += l

            # 4. Compte gradients gradients
            loss.backward()

            # 5. Adjust learnable parameters
            # clip as well to avoid vanishing and exploding gradients
            nn.utils.clip_grad_norm_(model.parameters(), 3)
            optimizer.step()

            epoch_losses.append(loss.detach().item() / X.shape[1])
        train_losses[epoch] = torch.tensor(epoch_losses).mean()
        print(f'=> Epoch: {epoch+1}, loss: {train_losses[epoch]}')
        print(generate_random_text(model, dataloader.dataset))

In [12]:
# training
data = open('dataset/Book1.txt', 'r').read()
data = data.lower()

seq_size = 30
batch_size = 64
text_data = TextDataset(data, seq_size)
dataloader = DataLoader(text_data, batch_size)

hidden_size = 256
LSTM_model = LSTM(1, hidden_size, output_size=text_data.voca_size, batch_size=batch_size)

epochs = 1000
criterion = nn.CrossEntropyLoss()
optimizer = optim.RMSprop(LSTM_model.parameters(), lr=0.001)
train(LSTM_model, optimizer, criterion, epochs=epochs, dataloader=dataloader)

Traning Start!!
=> Epoch: 1, loss: 2.6857969760894775


TypeError: forward() missing 1 required positional argument: 'cell'