In [1]:
import random
import torch
from jinja2 import optimizer
from torch import nn, optim
import pandas as pd
import numpy as np
import spacy
import matplotlib.pyplot as plt
import torch.nn.functional as F
from processamento.preparar_texttorch import tokenize, construir_vocabulario, tokens_to_ids, pad_sequences, preparar_dataloader, dividir_treino_validacao



In [2]:
df_training = pd.read_csv('twitter_training.csv')
df_training = df_training.dropna(subset=[df_training.columns[2], df_training.columns[3]])
dados_treinamento = [
    (row[df_training.columns[2]], tokenize(row[df_training.columns[3]]))
    for _, row in df_training.iterrows()
    if isinstance(row[df_training.columns[3]], str)
]

df_test = pd.read_csv('twitter_validation.csv')
df_test = df_test.dropna(subset=[df_test.columns[2], df_test.columns[3]])
dados_teste = [
    (row[df_test.columns[2]], tokenize(row[df_test.columns[3]]))
    for _, row in df_test.iterrows()
    if isinstance(row[df_test.columns[3]], str)
]

In [3]:
print(dados_treinamento[0])

('Positive', ['i', 'am', 'coming', 'to', 'the', 'borders', 'and', 'i', 'will', 'kill', 'you', 'all,'])


In [4]:
vocabulario = construir_vocabulario(dados_treinamento)
print(f"Tamanho do vocabulário: {len(vocabulario)}")
print(f"Exemplo de vocabulário: {list(vocabulario.items())[:10]}")

Tamanho do vocabulário: 71147
Exemplo de vocabulário: [('i', 0), ('am', 1), ('coming', 2), ('to', 3), ('the', 4), ('borders', 5), ('and', 6), ('will', 7), ('kill', 8), ('you', 9)]


In [5]:
dados_treinamento_ids = tokens_to_ids(dados_treinamento, vocabulario)
dados_teste_ids = tokens_to_ids(dados_teste, vocabulario)

print(dados_treinamento_ids[0])

(0, [0, 1, 2, 3, 4, 5, 6, 0, 7, 8, 9, 10])


In [6]:
max_len = 64
dados_treinamento_padded = pad_sequences(dados_treinamento_ids, max_len)
dados_teste_padded = pad_sequences(dados_teste_ids, max_len)

print(dados_treinamento_padded[0])

(0, [0, 1, 2, 3, 4, 5, 6, 0, 7, 8, 9, 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])


In [7]:
dados_treinamento_final, dados_validacao = dividir_treino_validacao(dados_teste_padded)

print(f"Treino: {len(dados_treinamento_final)}, Validação: {len(dados_validacao)}")

Treino: 799, Validação: 200


In [8]:
batch_size = 64
train_loader = preparar_dataloader(dados_treinamento_final, batch_size, drop_last=True)
valid_loader = preparar_dataloader(dados_validacao, batch_size, drop_last=True)
test_loader = preparar_dataloader(dados_teste_padded, batch_size)

for batch in train_loader:
    print(batch)
    break

[tensor([[  120,   356,   115,  ...,     0,     0,     0],
        [ 8565,   782, 10113,  ...,     0,     0,     0],
        [    0,   464,   108,  ...,     0,     0,     0],
        ...,
        [   40,  4358,   846,  ...,     0,     0,     0],
        [  196,   643,    38,  ...,     0,     0,     0],
        [ 6272, 14296, 16274,  ...,     0,     0,     0]]), tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])]


In [9]:
class RNN(nn.Module):
    
    def __init__(self, tam_vocab, tam_embedding, embed_vectors, 
                   ind_unk, ind_pad, hidden_size, output_size):
        super(RNN, self).__init__()
    
        # Inicializaremos a camada de embedding
        self.embedding = nn.Embedding(tam_vocab, tam_embedding)
        self.embedding.weight.data.copy_(embed_vectors)
        self.embedding.weight.data[ind_unk] = torch.zeros(tam_embedding)
        self.embedding.weight.data[ind_pad] = torch.zeros(tam_embedding)
        #######################################
    
        self.hidden_size = hidden_size
        self.rnn = nn.GRU(tam_embedding, hidden_size)
        self.linear = nn.Linear(hidden_size, output_size)
        
    def forward(self, X, tamanhos):
        embed = self.embedding(X)
        
        # Inicializar a memória
        hidden = torch.zeros(1, X.size(1), self.hidden_size)
        
        # Empacotar a sequencia antes de alimentar a uniadde recorrente
        packed_input = nn.utils.rnn.pack_padded_sequence(embed, tamanhos)
        
        # Forward recorrente
        packed_output, hidden = self.rnn(packed_input, hidden)
        
        # Desempacotar a sequência para continuar o fluxo na rede
        output, output_lenghts = nn.utils.rnn.pad_packed_sequence(packed_output)
        
        output = self.linear(hidden.squeeze())
        
        return output
        
tam_vocab = len(vocabulario)
tam_embedding = 100  # tamanho do embedding
embed_vectors = torch.rand(tam_vocab, tam_embedding)  # inicialização
ind_unk = vocabulario.get('<unk>', 0)  # Índice para <unk>
ind_pad = vocabulario.get('<pad>', 0)  # Índice para <pad>
hidden_size = 256  # tamanho do estado oculto (Neurônios ocultos)
output_size = 3  # Exemplo de número de classes

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = RNN(tam_vocab, tam_embedding, embed_vectors, ind_unk, ind_pad, hidden_size, output_size)
model.to(device)
print(model)

RNN(
  (embedding): Embedding(71147, 100)
  (rnn): GRU(100, 256)
  (linear): Linear(in_features=256, out_features=3, bias=True)
)


In [10]:
criterio = nn.CrossEntropyLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=5e-5)

In [11]:
def forward(iterator, num_amostra, etapa):
    if etapa == 'treino': model.train()
    else: model.eval()
    
    acuracia = 0.
    loss_epoca = []
    for k, amostra in enumerate(iterator):
        texto, rotulo = amostra  # Descompactar diretamente
        tamanhos = [len(t) for t in texto]  # Exemplo para obter tamanhos das sequências
        
        saida = model(texto, tamanhos)
        
        loss = criterio(saida, rotulo)
        loss_epoca.append(loss.detach().cpu().numpy())
        
        _, pred = torch.max(saida, axis=-1)
        acuracia += (pred.cpu().data == rotulo.cpu().data).sum()
        
        if etapa == 'treino':
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
    loss_epoca = np.asarray(loss_epoca).ravel()
    acuracia = acuracia / float(num_amostra)
    print('\n', '*'*15 + etapa + '*'*15)
    print('Epoca: {:}, Loss: {:.4f} +/- {:.4f}, Acurácia: {:.4f}'.format(
        epoca, loss_epoca.mean(), loss_epoca.std(), acuracia
    ))
    
    return loss_epoca.mean(), acuracia



In [12]:
loss_treino, loss_test = [], []
acc_treino, acc_test = [], []

for epoca in range(25):
    loss, acuracia = forward(train_loader, len(dados_treinamento_final), 'treino')
    loss_treino.append(loss)
    acc_treino.append(acuracia)
    
    loss, acuracia = forward(valid_loader, len(dados_validacao), 'teste')
    loss_test.append(loss)
    acc_test.append(acuracia)


 ***************treino***************
Epoca: 0, Loss: 0.9823 +/- 0.0676, Acurácia: 0.7447

 ***************teste***************
Epoca: 0, Loss: 0.8670 +/- 0.0922, Acurácia: 0.9600

 ***************treino***************
Epoca: 1, Loss: 0.7776 +/- 0.0996, Acurácia: 0.9612

 ***************teste***************
Epoca: 1, Loss: 0.8051 +/- 0.0892, Acurácia: 0.9600

 ***************treino***************
Epoca: 2, Loss: 0.6478 +/- 0.1031, Acurácia: 0.9612

 ***************teste***************
Epoca: 2, Loss: 0.6302 +/- 0.0766, Acurácia: 0.9600

 ***************treino***************
Epoca: 3, Loss: 0.4696 +/- 0.0884, Acurácia: 0.9612

 ***************teste***************
Epoca: 3, Loss: 0.3639 +/- 0.0364, Acurácia: 0.9600

 ***************treino***************
Epoca: 4, Loss: 0.2803 +/- 0.0712, Acurácia: 0.9612

 ***************teste***************
Epoca: 4, Loss: 0.1486 +/- 0.0305, Acurácia: 0.9600

 ***************treino***************
Epoca: 5, Loss: 0.0495 +/- 0.0600, Acurácia: 0.9612

 **

In [13]:
nlp = spacy.load('en_core_web_sm')  # Certifique-se de ter esse modelo instalado

def predict_sentiment(sentence):
    model.eval()
    
    # Tokenizar a frase usando spaCy
    tokenized = [str(tok) for tok in nlp(sentence)]
    print("Tokenized:", tokenized)

    # Converter tokens em IDs usando o vocabulário
    indexed = [vocabulario.get(t, vocabulario['<unk>']) for t in tokenized]  # Usar <unk> para palavras desconhecidas
    if len(indexed) == 0:
        raise ValueError("A frase não contém palavras do vocabulário.")
    
    # Comprimento da sequência
    length = [len(indexed)]
    print("Indexed:", indexed)

    # Criar tensor e adicionar dimensão para batch
    tensor = torch.LongTensor(indexed).unsqueeze(0).to(device)  # Adicionando dimensão para batch_size = 1
    length_tensor = torch.LongTensor(length).to(device)

    # Fazer a predição
    with torch.no_grad():
        prediction = model(tensor, length_tensor)

    return F.softmax(prediction, dim=-1).cpu().numpy()  # Retornar como numpy

# Testar a função com frases de exemplo
exemplos = [
    "I love this product!",
    "This is the worst experience I've ever had.",
    "It's okay, not great but not bad either.",
    "Absolutely fantastic service!",
    "I'm very disappointed with the quality."
]

# Gerar previsões e plotar os resultados
for frase in exemplos:
    pred = predict_sentiment(frase)

    # Plotar os resultados
    plt.bar(0, pred[0], color='darkred', label='Negativo', width=0.5)
    plt.bar(1, pred[1], color='dodgerblue', label='Positivo', width=0.5)
    plt.title(f"Sentiment Prediction for: '{frase}'")
    plt.legend()
    plt.show()

OSError: [E050] Can't find model 'en_core_web_sm'. It doesn't seem to be a Python package or a valid path to a data directory.

In [ ]:
batch_size = 16
train_loader = preparar_dataloader(dados_treinamento_final, batch_size)
valid_loader = preparar_dataloader(dados_validacao, batch_size)
test_loader = preparar_dataloader(dados_teste_padded, batch_size)

for batch in train_loader:
    print(batch)
    break