In [1]:
# Importação de bibliotecas

import os
import sys
import pandas as pd
import numpy as np
import polars as pl
import re
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score
from bertopic import BERTopic
from transformers import BertTokenizer, BertForSequenceClassification, AdamW, get_linear_schedule_with_warmup
import torch
from torch.utils.data import DataLoader, Dataset

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Definicao da raiz do projeto

PROJECT_ROOT = 'G:/Csouza/nlp/topic_modeling'

os.chdir(PROJECT_ROOT)

sys.path.insert(0, PROJECT_ROOT)

In [None]:
def extract(extract_path, file_name='all_process.xlsx', sheet_name='Sheet1'):
    
    return pl.read_excel(f'{extract_path}/{file_name}', sheet_name=sheet_name)

In [7]:
data_path = os.path.join(PROJECT_ROOT, 'data', 'internal', 'fapesp_projects')

full_data = extract(data_path)

variables = {
'N. Processo_B.V': 'n_processo',
'Data de Início': 'data',
'Título (Português)': 'titulo',
'Grande Área do Conhecimento': 'grande_area',
'Área do Conhecimento': 'area',
'Subárea do Conhecimento': 'subarea',
'Palavras-Chave do Processo': 'palavras_chave',
'Assuntos': 'assuntos',
'Resumo (Português)': 'resumo'}

full_data = (
    full_data
    .lazy()
    .rename(variables)
    .select(variables.values())
    .filter(
        pl.col('n_processo').is_not_null(),
        pl.col('resumo').is_not_null(),
        pl.col('resumo') != '')
    .with_columns(
        pl.col('data').str.to_datetime('%m-%d-%y').dt.year().alias('ano'),
        pl.col('data').str.to_datetime('%m-%d-%y').dt.month().alias('mes'))
    .select(pl.exclude('data'))
).collect()

full_data.head(3)

n_processo,titulo,grande_area,area,subarea,palavras_chave,assuntos,resumo,ano,mes
str,str,str,str,str,str,str,str,i32,i8
"""95/04916-0""","""Estudo sistemático de campos h…","""Ciências Exatas e da Terra""","""Física""","""Física da Matéria Condensada""","""CORRELACAO ANGULAR, ESTUDO SIS…",,"""Este projeto está vinculado ao…",1995,12
"""95/05064-7""","""Cultura, ideologia e represent…","""Ciências Humanas""","""Sociologia""","""Outras Sociologias Específicas""","""BRASIL, IDENTIDADE, PENSAMENTO…","""Brasil:Identidade social""","""Participar do Seminário """"Soci…",1995,12
"""95/09836-4""","""Bernard Schmitt | Université d…","""Ciências Exatas e da Terra""","""Probabilidade e Estatística""","""Probabilidade""","""COMPRESSOR, ENTROPIA, ESTADO D…","""Entropia (matemática aplicada)…","""O principal objetivo da visita…",1995,12


In [8]:
data_train_test = full_data.filter(pl.col('assuntos').is_not_null())

data_train_test.shape

(234228, 10)

In [None]:
def clean_text(text, special_char_remover=re.compile(r'[^A-Za-z\s]')):
    if not isinstance(text, str):
        raise ValueError("O argumento 'text' deve ser uma string.")
    
    text = special_char_remover.sub('', text)
    text = ' '.join(text.split())
    text = text.lower()

    return text

In [None]:
# Carregar os dados
data = data_train_test.to_pandas()

data['titulo'] = data['titulo'].astype(str)
data['palavras_chave'] = data['palavras_chave'].astype(str)

data['cleaned_text'] = data['resumo'].apply(clean_text)
data['cleaned_text'] += ' Título: ' + data['titulo'].apply(clean_text) + ' Palavras-chave: ' + data['palavras_chave'].apply(clean_text)

# Dividir os dados em treino e teste
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)

# Tokenização com BERT
tokenizer = BertTokenizer.from_pretrained('bert-base-multilingual-cased')

train_tokenized_texts = tokenizer(train_data['cleaned_text'].tolist(), padding=True, truncation=True, return_tensors="pt")
test_tokenized_texts = tokenizer(test_data['cleaned_text'].tolist(), padding=True, truncation=True, return_tensors="pt")

# Converte os assuntos em listas
train_data['assuntos_list'] = train_data['assuntos'].apply(lambda x: x.split(':'))
test_data['assuntos_list'] = test_data['assuntos'].apply(lambda x: x.split(':'))

# Binariza os rótulos
mlb = MultiLabelBinarizer()

train_binary_labels = mlb.fit_transform(train_data['assuntos_list'])
test_binary_labels = mlb.transform(test_data['assuntos_list'])


In [None]:
# Modelo BERT pré-treinado
model = BertForSequenceClassification.from_pretrained('bert-base-multilingual-cased', num_labels=len(mlb.classes_))

# Ajuste da camada de classificação para multi-rótulo
model.classifier = torch.nn.Sequential(
    torch.nn.Linear(model.config.hidden_size, model.config.hidden_size),
    torch.nn.ReLU(),
    torch.nn.Linear(model.config.hidden_size, len(mlb.classes_)),
    torch.nn.Sigmoid()
)

In [None]:
# Função de perda e otimizador
criterion = torch.nn.BCEWithLogitsLoss()
optimizer = AdamW(model.parameters(), lr=2e-5, eps=1e-8)

num_epochs = 3

# Scheduler de taxa de aprendizado
total_steps = len(train_tokenized_texts['input_ids']) * num_epochs
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)

In [None]:
class CustomDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.float)
        return item

# Criar datasets e dataloaders para treino e teste
train_dataset = CustomDataset(train_tokenized_texts, train_binary_labels)
test_dataset = CustomDataset(test_tokenized_texts, test_binary_labels)
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

In [None]:
def train(model, dataloader, criterion, optimizer, scheduler, device):
    model.train()
    total_loss = 0

    for batch in dataloader:
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)

        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        loss = criterion(outputs.logits, labels)
        total_loss += loss.item()

        loss.backward()
        optimizer.step()
        scheduler.step()

    avg_loss = total_loss / len(dataloader)
    return avg_loss

In [None]:
# Função de avaliação
def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    all_labels = []
    all_preds = []

    with torch.no_grad():
        for batch in dataloader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['labels'].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = criterion(outputs.logits, labels)
            total_loss += loss.item()
            preds = torch.sigmoid(outputs.logits).cpu().numpy()
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(preds)

        avg_loss = total_loss / len(dataloader)
        return avg_loss, all_labels, all_preds

# Função para calcular métricas

def compute_metrics(labels, preds, threshold=0.5):
    preds = (preds > threshold).astype(int)
    accuracy = accuracy_score(labels, preds)
    f1 = f1_score(labels, preds, average='samples')
    recall = recall_score(labels, preds, average='samples')
    precision = precision_score(labels, preds, average='samples')
    
    return {
        'accuracy': accuracy,
        'f1': f1,
        'recall': recall,
        'precision': precision
    }

In [None]:
# Loop de Treinamento e Avaliação
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

num_epochs = 3
best_f1 = 0

for epoch in range(num_epochs):
    print(f'Epoch {epoch + 1}/{num_epochs}')
    
    train_loss = train(model, train_dataloader, criterion, optimizer, scheduler, device)
    print(f'Training loss: {train_loss:.4f}')
    
    val_loss, val_labels, val_preds = evaluate(model, test_dataloader, criterion, device)
    print(f'Validation loss: {val_loss:.4f}')
    
    metrics = compute_metrics(np.array(val_labels), np.array(val_preds))
    print(f'Validation metrics: {metrics}')
    
    if metrics['f1'] > best_f1:
        print(f'Saving best model with F1 score: {metrics["f1"]:.4f}')
        torch.save(model.state_dict(), 'best_model.pt')
        best_f1 = metrics['f1']

# Carregar o modelo fine-tuned
model.load_state_dict(torch.load('best_model.pt'))
model.eval()

In [None]:
# Carregar o modelo fine-tuned
model.load_state_dict(torch.load('best_model.pt'))
model.eval()

# Função para gerar embeddings
def get_embeddings(texts, model, tokenizer):
    inputs = tokenizer(texts, padding=True, truncation=True, return_tensors="pt")
    with torch.no_grad():
        outputs = model.bert(**inputs)
    embeddings = outputs.last_hidden_state[:, 0, :]  # Use [CLS] token embeddings
    return embeddings

embeddings = get_embeddings(test_data['cleaned_text'].tolist(), model, tokenizer)

In [None]:
# Inicializar BERTopic com os embeddings ajustados
topic_model = BERTopic(embedding_model="sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2")

# Ajustar o modelo aos dados
topics, probabilities = topic_model.fit_transform(test_data['cleaned_text'].tolist(), embeddings.numpy())

In [None]:
# Ajustar hiperparâmetros de UMAP e HDBSCAN conforme necessário
# Por exemplo, ajustar o número de vizinhos em UMAP ou a densidade mínima em HDBSCAN

# Visualizar os tópicos
topic_model.visualize_topics()

In [None]:
# Função para predizer assuntos com o modelo BERT fine-tuned
def predict_labels(text, model, tokenizer, threshold=0.5):
    model.eval()
    inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)
    probs = torch.sigmoid(outputs.logits).cpu().numpy()
    preds = (probs > threshold).astype(int)
    return preds, probs

In [None]:
# Função para aplicar BERTopic e identificar tópicos
def apply_bertopic(text, topic_model):
    topics, _ = topic_model.transform([text])
    return topics

In [None]:
def predict_labels(text, model, tokenizer, threshold=0.5):
    model.eval()
    inputs = tokenizer(text, padding=True, truncation=True, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)
    probs = torch.sigmoid(outputs.logits).cpu().numpy()
    preds = (probs > threshold).astype(int)
    return preds, probs

def apply_bertopic(text, topic_model, bert_model, tokenizer):
    embeddings = get_embeddings([text], bert_model, tokenizer)
    topics, _ = topic_model.transform(embeddings.numpy())
    return topics

def consolidate_results(text, model, tokenizer, topic_model, mlb, threshold=0.5):
    preds, probs = predict_labels(text, model, tokenizer, threshold)
    known_labels = mlb.inverse_transform(preds)
    
    # Se a confiança máxima estiver abaixo do threshold, use BERTopic
    if np.max(probs) < threshold:
        topics = apply_bertopic(text, topic_model, model, tokenizer)
        topic_labels = [topic_model.get_topic(topic)[0] for topic in topics if topic != -1]
        all_labels = list(set(known_labels[0]) | set(topic_labels))
    else:
        all_labels = known_labels[0]
    
    return all_labels

# Exemplo de uso:
text = "Este projeto investiga o impacto da irrigação na produção de soja em diferentes tipos de solo."
consolidated_labels = consolidate_results(text, model, tokenizer, topic_model, mlb, threshold=0.7)
print("Assuntos Consolidados:", consolidated_labels)