In [1]:
import pandas as pd
import numpy as np
import time

In [2]:
# Genre name -> integer class id, numbered in the order the genres are listed.
LABELS = {
    genre: class_id
    for class_id, genre in enumerate(
        ['Historia', 'Administracao', 'Geografia', 'Biologia',
         'Literatura', 'Artes', 'Matematica']
    )
}

# Training split with an extra 'label' column derived from 'Genero';
# genres that are not in LABELS map to NaN.
x_train_og = pd.read_csv('trainLivraria.csv').assign(
    label=lambda frame: frame['Genero'].map(LABELS)
)

In [3]:
# Test split, labelled with the same genre -> class id mapping as the training split.
x_test = pd.read_csv('testLivraria.csv').assign(
    label=lambda frame: frame['Genero'].map(LABELS)
)

In [4]:
from torch.utils.data import DataLoader
import torch
from torchdata.datapipes.iter import IterableWrapper, ShardingFilter

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

def collate_batch(batch):
    """Collate (label, text) samples into flat tensors plus per-sample offsets.

    Each text is converted to token ids with ``text_pipeline`` and all ids are
    concatenated into one 1-D tensor; ``offsets`` holds the position in that
    tensor where each sample starts.

    Args:
        batch: iterable of ``(label, text)`` pairs.

    Returns:
        tuple: ``(labels, flat_token_ids, offsets)``, each moved to ``device``.
    """
    labels, token_chunks, boundaries = [], [], [0]
    for raw_label, raw_text in batch:
        token_ids = torch.tensor(text_pipeline(raw_text), dtype=torch.int64)
        labels.append(label_pipeline(raw_label))
        token_chunks.append(token_ids)
        boundaries.append(token_ids.size(0))
    labels = torch.tensor(labels, dtype=torch.int64).to(device)
    # Cumulative sum of [0, len_0, ..., len_{n-2}] gives each sample's start index;
    # the last length is dropped because nothing starts after the final sample.
    starts = torch.tensor(boundaries[:-1]).cumsum(dim=0).to(device)
    flat_tokens = torch.cat(token_chunks).to(device)
    return labels, flat_tokens, starts

# Materialize the (label, title) pairs as lists. A bare iter(...) can be consumed
# only once, so a loader built on it yields nothing after its first pass; that
# breaks every epoch after the first and any accuracy pass over the same loader.
train_samples = list(x_train_og[['label', 'Titulo']].itertuples(index=False, name=None))
test_samples = list(x_test[['label', 'Titulo']].itertuples(index=False, name=None))

# IterableWrapper turns each list into a datapipe that can be iterated repeatedly.
train_iter = ShardingFilter(IterableWrapper(train_samples))
test_iter = ShardingFilter(IterableWrapper(test_samples))

# No batch_size is passed, so the DataLoader default of one sample per batch applies.
train_loader = DataLoader(train_iter, shuffle=False, collate_fn=collate_batch)
test_loader = DataLoader(test_iter, shuffle=False, collate_fn=collate_batch)

In [6]:
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator


# spaCy-backed tokenizer using the 'pt_core_news_sm' pipeline.
tokenizer = get_tokenizer("spacy", language='pt_core_news_sm')
train_iter = iter(list(x_train_og[['label', 'Titulo']].itertuples(index=False, name=None)))


def yield_tokens(data_iter):
    """Yield the token list of the second item of every pair in ``data_iter``."""
    yield from (tokenizer(title) for _label, title in data_iter)


# Vocabulary built from the training titles; unseen tokens map to "[UNK]".
vocab = build_vocab_from_iterator(yield_tokens(train_iter), specials=["[UNK]"])
vocab.set_default_index(vocab["[UNK]"])


def text_pipeline(word):
    """Tokenize a string and map every token to its vocabulary index."""
    return vocab(tokenizer(word))


def label_pipeline(idx):
    """Cast a label to a plain Python int."""
    return int(idx)


text_pipeline('Ola, tudo bem?')

[0, 0, 0, 0, 66]

In [None]:
# Disabled code: this cell is a single string literal, so the EmbeddingBag-based
# classifier below is never defined or executed. Its forward(text, offsets)
# takes the flat token tensor and offsets that collate_batch returns.
# NOTE(review): presumably kept for reference -- delete the cell if it is no longer needed.
"""from torch import nn


class TextClassificationModel(nn.Module):
    def __init__(self, vocab_size, embed_dim, num_class):
        super(TextClassificationModel, self).__init__()
        self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=False)
        self.fc = nn.Linear(embed_dim, num_class)
        self.init_weights()

    def init_weights(self):
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc.weight.data.uniform_(-initrange, initrange)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets):
        embedded = self.embedding(text, offsets)
        return self.fc(embedded)"""

In [None]:
# General settings

# Seed PyTorch's random number generator so weight initialization is repeatable.
RANDOM_SEED = 123
torch.manual_seed(RANDOM_SEED)

LEARNING_RATE = 0.005
# NOTE(review): BATCH_SIZE is not passed to the DataLoaders created earlier in
# this notebook -- confirm whether batching was intended.
BATCH_SIZE = 100
NUM_EPOCHS = 20

# Use the default CUDA device rather than a hard-coded 'cuda:1': the model and
# the batches are placed on torch.device("cuda"), and index 1 does not exist on
# a single-GPU machine.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

EMBEDDING_DIM = 100
HIDDEN_DIM = 256
NUM_CLASSES = 7  # one class per entry of the LABELS mapping

In [None]:
# LSTM model definition

class RNN(torch.nn.Module):
    """Single-layer LSTM classifier over sequences of token ids.

    Token ids are embedded, run through an LSTM, and the final hidden state is
    projected to ``output_dim`` scores.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim):
        """Create the embedding table, the LSTM and the output layer.

        Args:
            input_dim: number of rows in the embedding table.
            embedding_dim: size of each embedding vector.
            hidden_dim: size of the LSTM hidden state.
            output_dim: number of scores produced per sequence.
        """
        super().__init__()
        self.embedding = torch.nn.Embedding(num_embeddings=input_dim,
                                            embedding_dim=embedding_dim)
        self.rnn = torch.nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim)
        self.fc = torch.nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, text):
        """Return scores of shape [batch size, output_dim].

        Args:
            text: token ids laid out as [sentence length, batch size].
        """
        # [sentence length, batch size] -> [sentence length, batch size, embedding dim]
        token_vectors = self.embedding(text)
        # final_hidden is [1, batch size, hidden dim]; the per-step outputs and
        # the cell state are not used.
        _, (final_hidden, _) = self.rnn(token_vectors)
        # Drop the leading axis -> [batch size, hidden dim], then project to scores.
        return self.fc(final_hidden.squeeze(0))

In [None]:
vocab_size = len(vocab)
# Reuse the configured embedding width instead of repeating the literal 100.
emsize = EMBEDDING_DIM
# The model lives on `device`; inputs must be moved to the same device.
model = RNN(vocab_size, emsize, HIDDEN_DIM, NUM_CLASSES).to(device)

In [None]:
# Adam over every model parameter, stepping at the configured learning rate.
optimizer = torch.optim.Adam(params=model.parameters(), lr=LEARNING_RATE)

In [None]:
def _forward_batch(model, batch, device):
    """Run ``model`` on one batch and return ``(logits, targets)`` on ``device``."""
    if len(batch) == 3:
        # (labels, flat_token_ids, offsets): slice the flat tensor back into one
        # sentence per sample and feed each as a [sentence length, 1] batch.
        targets, text, offsets = batch
        starts = offsets.tolist()
        ends = starts[1:] + [text.size(0)]
        text = text.to(device)
        logits = torch.cat([model(text[lo:hi].unsqueeze(1))
                            for lo, hi in zip(starts, ends)])
    else:
        features, targets = batch
        logits = model(features.to(device))
    return logits, targets.to(device)


def compute_accuracy(model, data_loader, device):
    """Return the percentage of correctly classified examples in ``data_loader``.

    Each batch may be either ``(features, targets)``, where ``features`` is
    already in the layout ``model`` expects, or a ``(labels, text, offsets)``
    triple in which ``text`` is a flat 1-D tensor of token ids and ``offsets``
    marks where each sample starts.

    Args:
        model: callable mapping a token tensor to logits of shape
            [batch size, num classes].
        data_loader: iterable of batches in one of the layouts above.
        device: device the tensors are moved to before the forward pass.

    Returns:
        0-dim float tensor (on the CPU) holding the accuracy in percent, or NaN
        when ``data_loader`` yields no examples.
    """
    correct_pred, num_examples = 0, 0

    with torch.no_grad():
        for batch in data_loader:
            logits, targets = _forward_batch(model, batch, device)
            predicted_labels = logits.argmax(dim=1)
            num_examples += targets.size(0)
            correct_pred += (predicted_labels == targets).sum().item()

    if num_examples == 0:
        # Accuracy is undefined without examples; avoid dividing by zero.
        return torch.tensor(float('nan'))
    return torch.tensor(100.0 * correct_pred / num_examples)

In [None]:
# Training loop

start_time = time.time()

treinamento = []  # training-set accuracy (%) recorded after every epoch

# An iterable-style dataset may not define a length; fall back to a placeholder
# instead of failing inside the progress message.
try:
    num_batches = f'{len(train_loader):03d}'
except TypeError:
    num_batches = '???'

for epoch in range(NUM_EPOCHS):
    model.train()
    # Each batch is the (labels, flat_token_ids, offsets) triple built by collate_batch.
    for batch_idx, (labels, text, offsets) in enumerate(train_loader):
        # FORWARD AND BACK PROP
        # The model expects [sentence length, batch size], so slice the flat token
        # tensor back into one sentence per sample and feed each as a batch of one.
        starts = offsets.tolist()
        ends = starts[1:] + [text.size(0)]
        logits = torch.cat([model(text[lo:hi].unsqueeze(1))
                            for lo, hi in zip(starts, ends)])
        loss = torch.nn.functional.cross_entropy(logits, labels)

        optimizer.zero_grad()
        loss.backward()

        # Update the model parameters
        optimizer.step()

        # Report epoch / batch / loss
        print(f'Época: {epoch+1}/{NUM_EPOCHS} | '
              f'Batch {batch_idx:03d}/{num_batches} | '
              f'Loss: {loss.item():.4f}')

    # Evaluate on `device`, where the model and the collated batches live.
    resultado = compute_accuracy(model, train_loader, device)
    print(f'Acurácia Treinamento: 'f'{resultado:.2f}%')
    treinamento.append(resultado.item())

    print(f'Tempo decorrido: {(time.time() - start_time)/60:.2f} min')

print(f'Tempo total decorrido: {(time.time() - start_time)/60:.2f} min')

# After training for all epochs, report the accuracy on the test set
print(f'Acurácia de Teste: {compute_accuracy(model, test_loader, device):.2f}%')

In [None]:
def predict(text, text_pipeline):
    """Return the predicted class index for a single raw text.

    Args:
        text: raw string to classify.
        text_pipeline: callable mapping a string to a list of token ids.

    Returns:
        int: index of the highest-scoring class.

    Raises:
        ValueError: if ``text_pipeline`` yields no tokens for ``text``.
    """
    token_ids = text_pipeline(text)
    if not token_ids:
        raise ValueError('text produced no tokens to classify')
    # Build the input on the same device as the model's weights.
    model_device = next(model.parameters()).device
    with torch.no_grad():
        # The model takes a single [sentence length, batch size] tensor;
        # one text is a batch of one.
        tokens = torch.tensor(token_ids, dtype=torch.int64,
                              device=model_device).unsqueeze(1)
        output = model(tokens)
        return output.argmax(1).item()

predict('biologia', text_pipeline)