#author: Jedrzej Chmiel

In [None]:
# Load IPython's autoreload extension so that edits to the imported project
# modules are picked up without restarting the kernel.
%load_ext autoreload
# Select autoreload mode 2.
%autoreload 2

In [36]:
import torch
from torch.utils.data import DataLoader
import torch.optim as optim
import torch.nn as nn
from words_batch_dataset import WordsBatchDataset
from one_item_dataset import OneItemDataset
from corpus import Corpus
from embedding import Embedding
from words_batch import WordsBatch
from tqdm import tqdm
import os
import pickle

In [12]:
# Root directory that every data path in this notebook is built from.
MAIN_DATA_DIR = 'data'

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"

In [13]:
# Build the corpus from the pickled dictionary stored under the data directory.
corpus = Corpus(
    MAIN_DATA_DIR+'/dictionary/second_dictionary_30_04.pickle'
)

# Dataset sized by the number of entries in the corpus; used by the
# encoding training/testing cells further down.
one_item_dataset = OneItemDataset(
    len(corpus)
)


In [14]:
# One dataset per prepared book file: harry_potter_1 ... harry_potter_7.
words_batch_datasets = [
    WordsBatchDataset(
        MAIN_DATA_DIR+'/harry_potter_books/prepared_txt/harry_potter_'+str(book_number)+'_prepared.txt',
        corpus.dictionary,
        sequence_length=6,
    )
    for book_number in range(1, 8)
]

In [72]:
# Stand-alone dataset built from the first prepared book only.
words_batch_dataset = WordsBatchDataset(
    MAIN_DATA_DIR+'/harry_potter_books/prepared_txt/harry_potter_1_prepared.txt',
    corpus.dictionary,
    sequence_length=6,
)

In [18]:
# Embedding model over the whole corpus, placed on the selected device.
embedding = Embedding(
    corpus_size=len(corpus),
    embedding_size=128,
    dropout_factor=0.1,
).to(DEVICE)

# Sequence model that wraps the embedding above; sequence_length matches the
# value used when the datasets were built.
words_batch = WordsBatch(
    embedding,
    hidden_state_size=256,
    dropout_factor=0.1,
    sequence_length=6,
).to(DEVICE)

In [19]:
# Separate Adam optimizers (default hyper-parameters) for the two models.
optimizer_encoding = optim.Adam(params=embedding.parameters())
optimizer_word_batch = optim.Adam(params=words_batch.parameters())

In [26]:
# Histories filled in by the training/testing cells below:
# MSE values for the words-batch model and accuracy fractions for the encoding.
mses, acurate_factors = [], []

In [24]:
def train_embedding(model:WordsBatch, datasets: list[OneItemDataset], batch_size: int, epochs: int,
                    optimizer: optim.Optimizer, saves_dir:str = None) -> list[float]:
    """
    Train a WordsBatch model with an MSE loss against dense target embeddings.

    Parameters:
        model:
            Model to be trained. Object of WordsBatch class. It must expose
            ``embedding.to_dense`` (used to build the targets) and ``save``.
        datasets:
            List of datasets yielding ``(X, y)`` pairs (the caller in this file
            passes WordsBatchDataset objects). Within one epoch the function
            consumes every batch of the first dataset, then of the second, ...,
            then of the last one.
        batch_size: batch size used by every DataLoader.
        epochs: number of passes over all datasets.
        optimizer: optimizer used to update weights and biases.
        saves_dir:
            If not None, the model is saved into this directory after each
            epoch as words_batch_epoch_0.pth, words_batch_epoch_1.pth, ...
            (numbering follows the zero-based epoch index). Existing files with
            the same name are overwritten. The directory is created when it
            does not exist. If None, nothing is saved.
    Returns:
        List with one entry per epoch: the mean per-sample MSE accumulated
        while training during that epoch.
    Raises:
        ZeroDivisionError: if the datasets contain no samples and epochs > 0.
    """
    model.train()
    loss_function = nn.MSELoss()
    total_length = float(sum(len(dataset) for dataset in datasets))
    mses = []
    loaders = [DataLoader(dataset, batch_size, shuffle=True) for dataset in datasets]
    if saves_dir is not None:
        # os.makedirs also creates missing parents and tolerates an existing directory.
        os.makedirs(saves_dir, exist_ok=True)

    for epoch in range(epochs):
        mse = 0.0
        for i, loader in enumerate(loaders):
            print(f"Epoch {epoch}/{epochs}, dataset {i+1}/{len(loaders)}")
            for X, y in tqdm(loader, desc="batch: "):
                X = X.to(DEVICE)
                # Targets are produced by the embedding itself; no gradient
                # should flow through their construction.
                with torch.no_grad():
                    y = model.embedding.to_dense(y).to(DEVICE)
                pred = model(X)
                loss = loss_function(pred, y)
                # MSELoss averages over the batch, so weight by the batch size
                # before dividing by the total number of samples below;
                # otherwise the reported value shrinks with batch_size and is
                # not comparable with test_embedding.
                mse += loss.item() * len(X)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
        mse /= total_length
        print(f"MSE is: {mse}")
        mses.append(mse)
        # Saving is optional: skip it when no directory was requested.
        if saves_dir is not None:
            model.save(os.path.join(saves_dir, f"words_batch_epoch_{epoch}.pth"))
    return mses

In [29]:
def test_embedding(model:WordsBatch, datasets: list[OneItemDataset]) -> float:
    """
    Evaluate a words-batch model on every sample of every dataset.

    Each sample is fed on its own (a leading batch dimension of size one is
    added), its MSE against the dense target produced by
    ``model.embedding.to_dense`` is accumulated, and the sum is divided by the
    total number of samples. The model is switched to eval mode and left there.

    Parameters:
        model:
            Object of WordsBatch class to be tested.
        datasets:
            List of datasets yielding ``(X, y)`` pairs on which the model is tested.
    Returns:
        The mean MSE over all samples (also printed).
    """
    criterion = nn.MSELoss()
    sample_count = float(sum(len(dataset) for dataset in datasets))
    summed_loss = 0.0

    model.eval()
    with torch.no_grad():
        for dataset in tqdm(datasets):
            for X, y in dataset:
                batch_input = X.unsqueeze(0).to(DEVICE)
                batch_target = model.embedding.to_dense(y.unsqueeze(0)).to(DEVICE)
                summed_loss += criterion(model(batch_input), batch_target).item()

    mse = summed_loss / sample_count
    print(f"MSE is {mse}")
    return mse

In [30]:
def train_encoding(model: Embedding, dataset: OneItemDataset, batch_size: int, epochs: int, optimizer: optim.Optimizer, saves_dir: str = None) -> list[float]:
    """
    Train the encoding part of an Embedding model with CrossEntropyLoss.

    For every batch of labels ``y`` the model's own ``to_dense`` builds the
    input, the model predicts class scores from it, and the scores are
    compared against ``y``.

    Parameters:
        model:
            Model to be trained. Object of Embedding class. It must expose
            ``to_dense`` and ``save``.
        dataset:
            Object of OneItemDataset class. Data on which to train.
        batch_size: batch size used by the DataLoader.
        epochs: number of passes over the dataset.
        optimizer: optimizer used to update weights and biases.
        saves_dir:
            If not None, the model is saved into this directory after each
            epoch as embedding_epoch_0.pth, embedding_epoch_1.pth, ...
            (numbering follows the zero-based epoch index). Existing files with
            the same name are overwritten. The directory is created when it
            does not exist. If None, nothing is saved.
    Returns:
        List with one entry per epoch: the fraction of samples classified
        correctly while training during that epoch.
    """
    model.train()
    loss_function = nn.CrossEntropyLoss()
    acurates = []
    loader = DataLoader(dataset, batch_size, shuffle=True)
    if saves_dir is not None:
        # os.makedirs also creates missing parents and tolerates an existing directory.
        os.makedirs(saves_dir, exist_ok=True)
    for epoch in range(epochs):
        print(f"Epoch {epoch}/{epochs}")
        # Count correct predictions per epoch, not cumulatively over all epochs.
        acurate = 0
        for y in tqdm(loader, desc="batch: "):
            # Same device handling as test_encoding: move the labels first,
            # then derive the inputs from them.
            y = y.to(DEVICE)
            with torch.no_grad():
                # Use the model being trained, not the notebook-level global.
                X = model.to_dense(y)
            pred = model(X)
            loss = loss_function(pred, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # CrossEntropyLoss treats pred as logits, so the predicted class
            # is the one with the highest score.
            acurate += torch.count_nonzero(torch.argmax(pred, dim=1) == y).item()

        acurate_factor = acurate / len(dataset)
        print(f"Acurate: {acurate_factor}")
        acurates.append(acurate_factor)
        # Saving is optional: skip it when no directory was requested.
        if saves_dir is not None:
            model.save(os.path.join(saves_dir, f"embedding_epoch_{epoch}.pth"))
    return acurates

In [31]:
def test_encoding(model, dataset) -> float:
    """
    Test an embedding model on a dataset and return the accuracy fraction.

    Each label ``y`` is evaluated on its own: a leading batch dimension of
    size one is added, ``model.to_dense`` builds the input from it, and the
    prediction counts as correct when the highest-scoring class equals ``y``.
    The model is switched to eval mode and left there.

    Parameters:
        model:
            Object of Embedding class to be tested.
        dataset:
            Object of OneItemDataset class. Data on which to test.
    Return:
        Fraction of samples classified correctly (float); also printed.
    """
    acurate = 0
    model.eval()
    with torch.no_grad():
        for y in tqdm(dataset):
            y = torch.unsqueeze(y, dim=0).to(DEVICE)
            X = model.to_dense(y)
            pred = model(X)
            # The model is trained with CrossEntropyLoss in this file, so its
            # outputs are logits and the predicted class is the argmax
            # (argmin would select the least likely class).
            acurate += torch.count_nonzero(torch.argmax(pred, dim=1) == y).item()
    result = acurate / len(dataset)
    print(f"Acurate fraction: {result}")
    return result

In [None]:
# Train the words-batch model on all books and record the per-epoch MSEs.
new_mses = train_embedding(
    model=words_batch,
    datasets=words_batch_datasets,
    batch_size=16,
    epochs=10,
    optimizer=optimizer_word_batch,
)
mses.extend(new_mses)

In [None]:
# Evaluate the words-batch model on all books and append the result to the
# MSE history.
new_mse = test_embedding(
    model=words_batch,
    datasets=words_batch_datasets,
)
mses.append(new_mse)

In [None]:
# Train the encoding part of the embedding and record the returned accuracies.
# The optimizer must be the one built from embedding.parameters()
# (optimizer_encoding), not the one created for the words-batch model.
new_acurates = train_encoding(embedding, one_item_dataset, batch_size=16, epochs=10, optimizer=optimizer_encoding)
acurate_factors += new_acurates

In [None]:
# Evaluate the embedding on the one-item dataset and append the accuracy
# fraction to the history.
new_acurate = test_encoding(
    model=embedding,
    dataset=one_item_dataset,
)
acurate_factors.append(new_acurate)

In [38]:
# Persist the collected MSE history as a pickle under the results directory.
with open(MAIN_DATA_DIR+'/results/mses.pickle', mode='wb') as mses_file:
    pickle.dump(mses, mses_file)

In [39]:
# Persist the collected accuracy history as a pickle under the results directory.
with open(MAIN_DATA_DIR+'/results/acurate_factors.pickle', mode='wb') as acurate_factors_file:
    pickle.dump(acurate_factors, acurate_factors_file)

#author: Jedrzej Chmiel