In [1]:
import numpy as np 
import torch 
import os
import sys  

In [2]:
# Leftover environment check: show where debugpy was imported from, if at all.
# `.get` avoids a KeyError on kernels that never loaded debugpy (e.g. a plain
# `jupyter nbconvert --execute` run); in that case the cell displays nothing.
debugpy_module = sys.modules.get('debugpy')
getattr(debugpy_module, '__file__', None)

'/home/yashunin/venvs/dl_venv/lib/python3.10/site-packages/debugpy/__init__.py'

In [3]:
# Load the whole book into memory as one string.
with open('data/mysterious_island.txt', 'r', encoding='utf8') as book_file:
    text = book_file.read()

In [4]:
# Total number of characters read from the file.
len(text)

1131711

In [5]:
# Locate the first occurrence of the two marker strings that delimit the part
# of the file we want to keep.
start_idx = text.find('THE MYSTERIOUS ISLAND')
end_idx = text.find('End of the Project Gutenberg')
# str.find returns -1 when the marker is absent; slicing with -1 would silently
# produce a wrong span, so fail loudly instead (this also catches the case
# where `text` has already been trimmed by a previous run).
if start_idx == -1 or end_idx == -1 or end_idx <= start_idx:
    raise ValueError(
        f"could not locate the text boundaries (start_idx={start_idx}, end_idx={end_idx}); "
        "re-read the source file before running this cell"
    )

In [6]:
# Keep only the span between the two markers found above.
# NOTE: this overwrites `text`, so the cell is not idempotent: running it a
# second time without re-reading the file slices the already-trimmed string.
text = text[start_idx:end_idx]

In [7]:
# Number of characters left after trimming to the marker span.
len(text) 

1112350

In [8]:
from collections import Counter 
# Occurrence count of every distinct character in the corpus.
counter = Counter(text)
# Vocabulary size: the counter holds exactly one key per distinct character.
print(len(counter))
# The five rarest characters together with their counts (ascending by count).
sorted(counter.items(), key=lambda item: item[1])[:5]

80


[('&', 1), ('/', 1), ('=', 2), ('*', 3), ('(', 4)]

In [9]:
# Sorted vocabulary; a character's position in `chars` is its integer id.
chars = sorted(set(text))
# Forward lookup (character -> id) and reverse lookup (id -> character).
char2int = dict(zip(chars, range(len(chars))))
int2char = np.array(chars)
# The whole corpus as a 1-D array of int32 ids.
text_encoded = np.fromiter(
    (char2int[symbol] for symbol in text),
    dtype=np.int32,
    count=len(text),
)

In [10]:
# Spot check: the same eight positions shown as characters and as integer ids.
print(f"{text[30:38]} <==> {text_encoded[30:38]}")

Produced <==> [40 67 64 53 70 52 54 53]


In [11]:
from torch.utils.data import Dataset, DataLoader 
seq_size = 40
# One extra character per chunk: inputs and targets are the same chunk shifted
# by one position.
chunk_size = seq_size + 1
# All windows of `chunk_size` consecutive ids, one per start position. This
# yields len(text_encoded) - chunk_size + 1 rows, i.e. it also includes the
# final window that `range(len(text_encoded) - chunk_size)` used to drop.
# sliding_window_view creates the windows as a strided (read-only) view instead
# of building ~1M small slices in a Python loop; ascontiguousarray materialises
# them into a writable array that torch can wrap without another copy.
chunks = torch.from_numpy(
    np.ascontiguousarray(
        np.lib.stride_tricks.sliding_window_view(text_encoded, chunk_size)
    )
)

In [12]:
class TextDataset(Dataset):
    """Dataset of character-id chunks for next-character prediction.

    Each item is a pair ``(inputs, targets)`` cut from a single chunk:
    ``inputs`` is the chunk without its last element and ``targets`` is the
    chunk without its first element, cast to ``torch.int64``.
    """

    def __init__(self, text_chunks):
        # Tensor whose first dimension indexes the chunks.
        self.text_chunks = text_chunks

    def __len__(self):
        """Return the number of chunks."""
        return self.text_chunks.shape[0]

    def __getitem__(self, idx):
        """Return the ``(inputs, targets)`` pair for chunk ``idx``."""
        sample = self.text_chunks[idx]
        inputs = sample[:-1]
        targets = sample[1:].to(torch.int64)
        return inputs, targets

In [14]:
chunk_dataset = TextDataset(text_chunks=chunks)
batch_size = 256
# Seed before building the loader so the shuffling order is reproducible.
torch.manual_seed(1)
train_dl = DataLoader(
    dataset=chunk_dataset,
    batch_size=batch_size,
    shuffle=True,
    # os.cpu_count() returns None when the CPU count cannot be determined,
    # which DataLoader rejects; fall back to loading in the main process.
    num_workers=os.cpu_count() or 0,
    pin_memory=True,
    # Drop the final short batch so every batch has exactly `batch_size` rows.
    drop_last=True,
)

In [14]:
# Inspect the first (inputs, targets) pair: the targets are the same chunk
# shifted by one position and cast to int64.
chunk_dataset[0]

(tensor([44, 32, 29,  1, 37, 48, 43, 44, 29, 42, 33, 39, 45, 43,  1, 33, 43, 36,
         25, 38, 28,  1,  6,  6,  6,  0,  0,  0,  0,  0, 40, 67, 64, 53, 70, 52,
         54, 53,  1, 51], dtype=torch.int32),
 tensor([32, 29,  1, 37, 48, 43, 44, 29, 42, 33, 39, 45, 43,  1, 33, 43, 36, 25,
         38, 28,  1,  6,  6,  6,  0,  0,  0,  0,  0, 40, 67, 64, 53, 70, 52, 54,
         53,  1, 51, 74]))

In [15]:
from torch import nn, optim 
from torch.nn import functional as F 
from torchmetrics import Accuracy


class CharacterLevelModel(nn.Module):
    """Embedding -> single-layer LSTM -> linear classifier, advanced one step at a time.

    ``forward`` consumes one character id per batch element together with the
    current LSTM state and returns next-character logits plus the new state.
    """

    def __init__(self, vocab_size, emb_dim, hidden_size):
        """Build the layers.

        Args:
            vocab_size: number of distinct character ids (embedding rows and
                classifier outputs).
            emb_dim: size of each embedding vector.
            hidden_size: size of the LSTM hidden and cell states.
        """
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(vocab_size, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hidden_size, batch_first=True)
        self.classifier = nn.Linear(hidden_size, vocab_size)

    def forward(self, inputs, hidden, cell):
        """Run one time step.

        Args:
            inputs: tensor of character ids, one per batch element.
            hidden: LSTM hidden state, as produced by ``init_hidden_cell`` or a
                previous call.
            cell: LSTM cell state, same origin as ``hidden``.

        Returns:
            ``(logits, (hidden, cell))`` where ``logits`` has one row per batch
            element.
        """
        # Insert a length-1 sequence axis because the LSTM is batch_first.
        step_input = self.embedding(inputs).unsqueeze(1)
        lstm_out, state = self.lstm(step_input, (hidden, cell))
        n_rows = lstm_out.size(0)
        # Flatten everything after the batch axis into a single logits row.
        logits = self.classifier(lstm_out).view(n_rows, -1)
        return logits, state

    def init_hidden_cell(self, batch_size):
        """Return zero-filled ``(hidden, cell)`` states on the model's device."""
        device = next(self.parameters()).device
        state_shape = (1, batch_size, self.hidden_size)
        init_hidden = torch.zeros(state_shape, device=device)
        init_cell = torch.zeros(state_shape, device=device)
        return init_hidden, init_cell
        

In [16]:
# Model hyper-parameters.
vocab_size = len(chars)
emb_dim, hidden_size = 256, 512

# Train on the GPU when one is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

model = CharacterLevelModel(vocab_size, emb_dim=emb_dim, hidden_size=hidden_size)
model = model.to(device)

In [17]:
# Cross-entropy over the next-character logits.
loss_fn = nn.CrossEntropyLoss()
# Adam over all model parameters.
optimizer = optim.Adam(model.parameters(), lr=1e-3)
# Multiclass accuracy, kept on the same device as the model outputs.
metric = Accuracy(num_classes=vocab_size, task='multiclass')
metric = metric.to(device)

In [19]:
from tqdm import tqdm 


# Number of optimisation steps; each "epoch" below processes one mini-batch.
epochs = 2000
torch.manual_seed(1)
model.train()

# Keep one long-lived iterator. Calling iter(train_dl) on every step would
# rebuild the DataLoader iterator (and restart its worker processes) just to
# draw a single batch and throw the rest of the prefetched work away.
batch_iter = iter(train_dl)

train_loop = tqdm(range(1, epochs + 1), desc='[Train]', leave=False)
for epoch in train_loop:
    optimizer.zero_grad()

    try:
        x_batch, y_batch = next(batch_iter)
    except StopIteration:
        # The dataset is exhausted: start another shuffled pass.
        batch_iter = iter(train_dl)
        x_batch, y_batch = next(batch_iter)
    x_batch, y_batch = x_batch.to(device), y_batch.to(device)

    # Fresh zero state for every batch, sized from the batch actually drawn.
    hidden, cell = model.init_hidden_cell(x_batch.size(0))

    epoch_loss = 0
    epoch_metric = 0
    # Feed the sequence one character at a time, carrying the LSTM state, and
    # accumulate the per-position loss and accuracy.
    for index in range(seq_size):
        y_pred, (hidden, cell) = model(x_batch[:, index], hidden, cell)
        epoch_loss += loss_fn(y_pred, y_batch[:, index])
        epoch_metric += metric(y_pred, y_batch[:, index])

    # Average over sequence positions before back-propagating.
    epoch_loss /= seq_size
    epoch_loss.backward()
    optimizer.step()

    epoch_metric /= seq_size

    train_loop.set_description(f"[Train, epoch {epoch}]: loss={epoch_loss:.3f}, metric={epoch_metric:.3f}")
 

                                                                                                  

In [32]:
# Display the module structure of the trained model.
model 

CharacterLevelModel(
  (embedding): Embedding(80, 256)
  (lstm): LSTM(256, 512, batch_first=True)
  (classifier): Linear(in_features=512, out_features=80, bias=True)
)

In [25]:
from torch.distributions.categorical import Categorical 
# Toy example: three classes with identical logits.
torch.manual_seed(1)
logits = torch.ones(1, 3)
# Softmax over the class axis turns the logits into probabilities.
print(torch.softmax(logits, dim=1))

tensor([[0.3333, 0.3333, 0.3333]])


In [28]:
# Categorical distribution parameterised by the (unnormalised) logits.
sampler = Categorical(logits=logits)

In [31]:
# Draw one class index from the distribution.
sampler.sample()

tensor([2])

In [34]:
# Move the model to the CPU before text generation.
model = model.to('cpu')

In [41]:
# Sanity check: three character ids map to three embedding vectors.
model.embedding(torch.tensor([1, 2, 3])).shape

torch.Size([3, 256])

In [42]:
def generate_text(model, input_string, len_generated_text=1000, temperature=1.):
    """Sample a continuation of ``input_string`` one character at a time.

    The prompt is first fed through the model to warm up the LSTM state, then
    ``len_generated_text`` characters are sampled, each one being fed back as
    the next input. Relies on the notebook-level ``char2int`` / ``int2char``
    lookups and on ``Categorical``.

    Args:
        model: object exposing ``init_hidden_cell(batch_size)`` and
            ``model(char_ids, hidden, cell) -> (logits, (hidden, cell))``.
        input_string: non-empty prompt; every character must be a key of
            ``char2int`` (an unknown character raises ``KeyError``).
        len_generated_text: number of characters to sample after the prompt.
        temperature: factor the logits are *multiplied* by before sampling.
            NOTE: this is the inverse of the usual "divide by temperature"
            convention: values above 1 sharpen the distribution (more
            repetitive text), values below 1 flatten it. Kept as-is so
            existing calls keep their behaviour.

    Returns:
        The prompt followed by the sampled characters, as a plain ``str``.

    Raises:
        ValueError: if ``input_string`` is empty.
    """
    if not input_string:
        raise ValueError("input_string must contain at least one character")

    # Batch size is 1. The initial state lives on the model's device, so build
    # the prompt tensor there as well.
    hidden, cell = model.init_hidden_cell(1)
    prompt_ids = torch.tensor(
        [char2int[char] for char in input_string],
        device=hidden.device,
    ).view(1, -1)

    # Collect pieces and join once instead of growing a string in the loop.
    pieces = [input_string]

    # Pure inference: no autograd graph is needed while sampling.
    with torch.no_grad():
        # Warm up the state on every prompt character except the last one ...
        for index in range(prompt_ids.size(1) - 1):
            _, (hidden, cell) = model(prompt_ids[:, index], hidden, cell)

        # ... which becomes the first input of the sampling loop.
        last_char = prompt_ids[:, -1]

        for _ in range(len_generated_text):
            logits, (hidden, cell) = model(last_char, hidden, cell)
            sampler = Categorical(logits=temperature * logits)
            last_char = sampler.sample()
            # .item() yields a plain int index regardless of the tensor device.
            pieces.append(str(int2char[last_char.item()]))

    return ''.join(pieces)

In [44]:
# Fix the RNG seed so the sampled text is reproducible; the trailing ';'
# suppresses the cell output.
torch.manual_seed(1);


In [48]:
# generate_text multiplies the logits by `temperature`, so a value of 10
# sharpens the sampling distribution rather than flattening it.
print(generate_text(model, input_string='The island', temperature=10))

The island was also that the sailor was a man who had been able to descend the point of the island was also that the colonists were the colonists were the colonists had been able to prove the sailor and the sailor and the sailor had not been able to say, the sailor was an apparatus which were then to be found the sailor and the sailor had not been able to prove the prisoners of the sailor’s hands.

“Well, we will soon be seen that the colonists had not been a sort of six miles in the morning the colonists were the colonists were the colonists were the colonists were then to be found the colonists were the colonists were so much to the sailor and the sailor and the sailor had not been a sort of some day the sailor and the sailor was a man who was a man who had been seen that the sailor was always to be surprised the sailor and the sailor and the reporter and the reporter and the sailor and the reporter had not the sailor had not to be to be able to say the colonists were the colonists w