In [1]:
import os
import re
import sys
import json
import torch
import IPython
import random
import kagglehub
import pandas as pd
from tqdm import tqdm
from peft import PeftModel
from tqdm.notebook import tqdm
from collections import Counter
from gstop import GenerationStopper
from torch.utils.data import Dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, StoppingCriteria, StoppingCriteriaList

# Dataset

In [2]:
path = kagglehub.dataset_download("victorcallejasf/multimodal-hate-speech")

print("Path to dataset files:", path)

Path to dataset files: C:\Users\osval\.cache\kagglehub\datasets\victorcallejasf\multimodal-hate-speech\versions\1


In [3]:
json_gt_path = os.path.join(path, "MMHS150K_GT.json")

splits = {
    "Treino": os.path.join(path, "splits", "train_ids.txt"),
    "Validação": os.path.join(path, "splits", "val_ids.txt"),
    "Teste": os.path.join(path, "splits", "test_ids.txt"),
}

with open(json_gt_path, 'r') as f:
    data = json.load(f)

def contar_tweets(split_path):
    with open(split_path, 'r') as f:
        ids = [line.strip() for line in f.readlines()]
    return len([id_ for id_ in ids if id_ in data])

for nome_split, caminho_split in splits.items():
    total_tweets = contar_tweets(caminho_split)
    print(f"{nome_split}: {total_tweets} tweets")

Treino: 134823 tweets
Validação: 5000 tweets
Teste: 10000 tweets


In [4]:
def clean_text(text):
    """Remove menções, hashtags, URLs e espaços extras do texto."""
    text = re.sub(r'http\S+', '', text)
    text = re.sub(r'@\w+', '', text)
    text = re.sub(r'#\w+', '', text)
    text = re.sub(r'\s+', ' ', text).strip()
    return text.lower()

def carregar_dados_singlelabel(split_path):
    with open(split_path, 'r') as f:
        ids = [line.strip() for line in f.readlines()]
    texts = []
    label = []
    freq = []
    for id_ in ids:
        if id_ in data:
            texts.append(clean_text(data[id_]['tweet_text']))
            label_list = data[id_]['labels']  # ex: [0, 1, 1]
            _freq = label_list.count(0)
            binary_label = False
            if _freq <= 1:
                binary_label = True
            else:
                binary_label = False

            label.append(binary_label)
            freq.append(_freq)

            # print(f"{data[id_]['tweet_text']} - {label_list} - {_freq} - {binary_label}")

    return texts, label, freq

# Como o modelo usado é pré-treinado, utilizaremos apenas o conjunto de teste
target_texts, target_labels, target_freq = carregar_dados_singlelabel(splits['Teste'])

true_count = sum(target_labels)
false_count = len(target_labels) - true_count

print(f"Quantidade de elementos com label True: {true_count}")
print(f"Quantidade de elementos com label False: {false_count}")
print(f"Total de elementos: {len(target_labels)}")
print(f"Porcentagem True: {true_count/len(target_labels)*100:.1f}%")
print(f"Porcentagem False: {false_count/len(target_labels)*100:.1f}%")

Quantidade de elementos com label True: 5001
Quantidade de elementos com label False: 4999
Total de elementos: 10000
Porcentagem True: 50.0%
Porcentagem False: 50.0%


In [20]:
test_tweet = 55
print(f"{target_texts[test_tweet]}: Hate: {target_labels[test_tweet]} Freq: {target_freq[test_tweet]}")

vince wanna say “what do you want from me nigger” so bad: Hate: True Freq: 1


In [5]:
# Como o modelo é muito pesado e demora muito tempo para classificar os tweets optamos por criar um subset

# Cria um subset mantendo proporção específica de True/False
def create_proportional_subset(texts, labels, freq, size, prop_false, prop_true):

    # Separar índices por label
    false_indices = [i for i, label in enumerate(labels) if label == False]
    true_indices = [i for i, label in enumerate(labels) if label == True]

    # Calcular quantos elementos de cada tipo
    n_false = int(size * prop_false)
    n_true = int(size * prop_true)

    # Ajustar se não houver elementos suficientes
    n_false = min(n_false, len(false_indices))
    n_true = min(n_true, len(true_indices))

    print(f"Disponível - False: {len(false_indices)}, True: {len(true_indices)}")
    print(f"Selecionando - False: {n_false}, True: {n_true}")

    # Selecionar aleatoriamente os índices
    selected_false = random.sample(false_indices, n_false)
    selected_true = random.sample(true_indices, n_true)

    # Combinar todos os índices selecionados
    selected_indices = selected_false + selected_true

    # Criar subsets
    subset_texts = [texts[i] for i in selected_indices]
    subset_labels = [labels[i] for i in selected_indices]
    subset_freq = [freq[i] for i in selected_indices]

    return subset_texts, subset_labels, subset_freq

# Exemplo de uso com seus dados
X = 10000   # Tamanho do subset desejado

# Proporção desejada
Z = 0.5  # Não Hate
Y = 0.5  # Hate

# Criar subset
subset_texts, subset_labels, subset_freq = create_proportional_subset(
    target_texts, target_labels, target_freq, X, Z, Y
)

# Verificar resultado
print(f"\n=== RESULTADO DO SUBSET ===")
print(f"Subset criado com {len(subset_texts)} elementos")
print(f"False: {subset_labels.count(False)} ({subset_labels.count(False)/len(subset_labels)*100:.1f}%)")
print(f"True: {subset_labels.count(True)} ({subset_labels.count(True)/len(subset_labels)*100:.1f}%)")

Disponível - False: 4999, True: 5001
Selecionando - False: 4999, True: 5000

=== RESULTADO DO SUBSET ===
Subset criado com 9999 elementos
False: 4999 (50.0%)
True: 5000 (50.0%)


In [102]:
test_tweet = 4
print(f"{subset_texts[test_tweet]}: Hate: {subset_labels[test_tweet]} Freq: {subset_freq[test_tweet]}")

nigga whaat: Hate: False Freq: 2


# Modelo

In [6]:
# Configuração do modelo
torch.cuda.empty_cache()
model_name = "irlab-udc/Llama-3-8B-Distil-MetaHate"
tokenizer = AutoTokenizer.from_pretrained(model_name)
base_model = AutoModelForCausalLM.from_pretrained("unsloth/llama-3-8b-Instruct-bnb-4bit")
model = PeftModel.from_pretrained(base_model, "irlab-udc/Llama-3-8B-Distil-MetaHate")

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(torch.cuda.get_device_name(torch.cuda.current_device()))

NVIDIA GeForce RTX 3050 6GB Laptop GPU


In [7]:
# Otimizacões
model = torch.compile(model, mode="reduce-overhead")

torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True

In [8]:
class JSONStoppingCriteria(StoppingCriteria):
    def __init__(self, tokenizer):
        self.tokenizer = tokenizer
        self.stop_tokens = ['}']
        self.stop_token_ids = [tokenizer.encode(token, add_special_tokens=False)[0] for token in self.stop_tokens]
    
    def __call__(self, input_ids, scores, **kwargs):
        # Verifica se o último token gerado é '}'
        if input_ids[0][-1] in self.stop_token_ids:
            return True
        return False

In [9]:
def create_hate_speech_prompt(message):
    """Cria o prompt estruturado para classificação simples de discurso de ódio"""
    
    system_prompt = """You must analyze if a social media message is hateful or not and tell me your decision. You must always reply with only a JSON containing one field 'hate_speech' including a Boolean value ("True" for hate speech messages, "False" for neutral ones). Do not include text outside the JSON.

This is the definition of hate speech: "language characterized by offensive, derogatory, humiliating, or insulting discourse that promotes violence, discrimination, or hostility towards individuals or groups based on attributes such as race, religion, ethnicity, or gender".

The input format is:

Classify:<Message><input query></Message>.
The output format is:

    {
        "hate_speech": "<Boolean>"
    }"""
    
    user_input = f"Classify:<Message>{message}</Message>"
    
    return f"{system_prompt}\n{user_input}"

In [10]:
def tokenize_texts(texts):
    """
    Tokeniza uma lista de textos mantendo a ordem
    Retorna uma lista de dicionários com tensores no dispositivo correto
    """
    tokenized_list = []

    for text in tqdm(texts, desc='Tokenizando textos'):
        # Cria o prompt estruturado (mesma função do seu código)
        prompt = create_hate_speech_prompt(text)

        # Tokeniza seguindo o mesmo padrão
        inputs = tokenizer(prompt, return_tensors="pt")

        # Move para o dispositivo correto
        inputs = {key: value.to(device) for key, value in inputs.items()}

        # Adiciona à lista mantendo a ordem
        tokenized_list.append(inputs)

    return tokenized_list

tokenized_tweets = tokenize_texts(subset_texts)


Tokenizando textos:   0%|          | 0/9999 [00:00<?, ?it/s]

In [11]:
def generate_model_outputs(tokenized_inputs):
    """
    Gera outputs do modelo para cada input tokenizado
    Retorna uma lista de outputs brutos
    """
    outputs_list = []
    stopping_criteria = StoppingCriteriaList([JSONStoppingCriteria(tokenizer)])

    for inputs in tqdm(tokenized_inputs, desc='Gerando outputs do modelo'):
        with torch.no_grad():
            outputs = model.generate(
                **inputs,
                max_new_tokens=15,
                temperature=0.3,
                stopping_criteria=stopping_criteria
            )

        outputs_list.append({
            'outputs': outputs,
            'input_length': inputs['input_ids'].shape[1]
        })

    return outputs_list

def generate_model_outputs_optimized(tokenized_inputs):
    outputs_list = []
    stopping_criteria = StoppingCriteriaList([JSONStoppingCriteria(tokenizer)])

    for inputs in tqdm(tokenized_inputs, desc='Gerando outputs do modelo'):
        with torch.no_grad():
            with torch.no_grad():
                outputs = model.generate(
                    **inputs,
                    max_new_tokens=15,
                    temperature=0.6,
                    stopping_criteria=stopping_criteria,
                    use_cache=True,
                )

        outputs_list.append({
            'outputs': outputs,
            'input_length': inputs['input_ids'].shape[1]
        })

        if len(outputs_list) % 50 == 0:
            torch.cuda.empty_cache()

    return outputs_list

In [12]:
subset_tokenized_tweets = tokenized_tweets

# Chama a função com o subset
model_outputs = generate_model_outputs_optimized(subset_tokenized_tweets)
print(f"Gerados {len(model_outputs)} outputs do modelo")

Gerando outputs do modelo:   0%|          | 0/9999 [00:00<?, ?it/s]

Gerados 9999 outputs do modelo


In [13]:
def decode_model_responses(model_outputs):
    """
    Decodifica os outputs do modelo em texto
    Retorna uma lista de respostas em texto
    """
    decoded_responses = []

    for output_data in tqdm(model_outputs, desc='Decodificando respostas'):
        # Decodifica apenas a parte nova (sem o prompt)
        response = tokenizer.decode(
            output_data['outputs'][0][output_data['input_length']:],
            skip_special_tokens=True
        )

        decoded_responses.append(response)

    return decoded_responses

# Uso na segunda célula
decoded_responses = decode_model_responses(model_outputs)
print(f"Decodificadas {len(decoded_responses)} respostas")
print("Exemplo de resposta:", decoded_responses[0])

Decodificando respostas:   0%|          | 0/9999 [00:00<?, ?it/s]

Decodificadas 9999 respostas
Exemplo de resposta:     {
        "hate_speech": "True"
    }
Class


In [14]:
def parse_json_responses(decoded_responses):
    """
    Versão com regex para extrair JSON mais robustamente
    Inclui tratamento para JSONs válidos mas sem a chave esperada
    """
    parsed_results = []

    # Padrão regex para encontrar JSON válido
    json_pattern = re.compile(r'\{[^{}]*"hate_speech"\s*:\s*"(True|False)"[^{}]*\}')

    for i, response in enumerate(tqdm(decoded_responses, desc='Parseando JSON com regex')):
        try:
            # Tenta encontrar o padrão JSON
            match = json_pattern.search(response)
            if match:
                clean_response = match.group(0)
            else:
                # Fallback: remove texto após último '}'
                last_brace_index = response.rfind('}')
                if last_brace_index != -1:
                    clean_response = response[:last_brace_index+1]
                else:
                    clean_response = response

            json_result = json.loads(clean_response)

            # Verificar se a chave 'hate_speech' existe
            if 'hate_speech' in json_result:
                parsed_results.append({
                    'index': i,
                    'response': clean_response,
                    'hate_speech': json_result['hate_speech'],
                    'parsed_successfully': True,
                    'error': None
                })
            else:
                # JSON válido mas sem a chave esperada
                print(json_result)
                parsed_results.append({
                    'index': i,
                    'response': clean_response,
                    'hate_speech': None,
                    'parsed_successfully': False,
                    'error': f'JSON válido mas chave "hate_speech" não encontrada. Chaves disponíveis: {list(json_result.keys())}'
                })

        except json.JSONDecodeError as e:
            parsed_results.append({
                'index': i,
                'response': response,
                'hate_speech': None,
                'parsed_successfully': False,
                'error': f'JSON parsing failed: {str(e)}'
            })

    return parsed_results

# Uso na terceira célula
final_results = parse_json_responses(decoded_responses)

# Mostra estatísticas
successful_parses = sum(1 for r in final_results if r['parsed_successfully'])
hate_speech_count = sum(1 for r in final_results if r['parsed_successfully'] and r['hate_speech'])

print(f"Parsing bem-sucedido: {successful_parses}/{len(final_results)}")
print(f"Discurso de ódio detectado: {hate_speech_count}/{successful_parses}")

Parseando JSON com regex:   0%|          | 0/9999 [00:00<?, ?it/s]

Parsing bem-sucedido: 9065/9999
Discurso de ódio detectado: 9065/9065


## Análise dos resultados

In [15]:
def analyze_by_category(df):
    """
    Analisa performance separadamente para True e False labels
    """
    valid_df = df[df['ModelOutputLabel'].notna()].copy()

    print(f"\n=== ANÁLISE POR CATEGORIA ===")

    # Para labels True (hate speech)
    true_labels = valid_df[valid_df['RealLabel'] == True]
    if len(true_labels) > 0:
        true_correct = (true_labels['RealLabel'] == true_labels['ModelOutputLabel']).sum()
        true_accuracy = true_correct / len(true_labels)
        print(f"Hate Speech (True) - Total: {len(true_labels)}, Corretos: {true_correct}, Acurácia: {true_accuracy:.4f}")

    # Para labels False (não hate speech)
    false_labels = valid_df[valid_df['RealLabel'] == False]
    if len(false_labels) > 0:
        false_correct = (false_labels['RealLabel'] == false_labels['ModelOutputLabel']).sum()
        false_accuracy = false_correct / len(false_labels)
        print(f"Não Hate Speech (False) - Total: {len(false_labels)}, Corretos: {false_correct}, Acurácia: {false_accuracy:.4f}")

In [16]:
# Análise mais detalhada
def analyze_model_performance(df):
    """
    Analisa a performance do modelo de forma detalhada
    """
    # Filtrar apenas predições válidas
    valid_df = df[df['ModelOutputLabel'].notna()].copy()

    if len(valid_df) == 0:
        print("Nenhuma predição válida encontrada!")
        return

    # Calcular métricas básicas
    correct = (valid_df['RealLabel'] == valid_df['ModelOutputLabel']).sum()
    total = len(valid_df)
    accuracy = correct / total

    # Matriz de confusão manual
    true_positive = ((valid_df['RealLabel'] == True) & (valid_df['ModelOutputLabel'] == True)).sum()
    true_negative = ((valid_df['RealLabel'] == False) & (valid_df['ModelOutputLabel'] == False)).sum()
    false_positive = ((valid_df['RealLabel'] == False) & (valid_df['ModelOutputLabel'] == True)).sum()
    false_negative = ((valid_df['RealLabel'] == True) & (valid_df['ModelOutputLabel'] == False)).sum()

    print(f"\n=== ANÁLISE DETALHADA DE PERFORMANCE ===")
    print(f"Total de amostras válidas: {total}")
    print(f"Acurácia: {accuracy:.4f} ({accuracy*100:.2f}%)")

    print(f"\n=== MATRIZ DE CONFUSÃO ===")
    print(f"True Positive (TP): {true_positive}")
    print(f"True Negative (TN): {true_negative}")
    print(f"False Positive (FP): {false_positive}")
    print(f"False Negative (FN): {false_negative}")

    # Calcular métricas adicionais
    precision = true_positive / (true_positive + false_positive) if (true_positive + false_positive) > 0 else 0
    recall = true_positive / (true_positive + false_negative) if (true_positive + false_negative) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    print(f"\n=== MÉTRICAS ADICIONAIS ===")
    print(f"Precisão: {precision:.4f} ({precision*100:.2f}%)")
    print(f"Recall: {recall:.4f} ({recall*100:.2f}%)")
    print(f"F1-Score: {f1_score:.4f}")

    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1_score': f1_score,
        'confusion_matrix': {
            'TP': true_positive,
            'TN': true_negative,
            'FP': false_positive,
            'FN': false_negative
        }
    }

In [17]:
def create_analysis_spreadsheet(target_texts, target_labels, target_freq, final_results, n=250):
    """
    Cria uma planilha com os primeiros n registros dos dados e resultados do modelo
    """
    # Extrair os labels do modelo do final_results
    model_output_labels = []
    for res in final_results:
        if res['parsed_successfully']:
            # Convertendo string 'True'/'False' para boolean
            model_output_labels.append(res['hate_speech'] == 'True')
        else:
            model_output_labels.append(None)  # Caso parsing falhe

    # Garantir que temos o mesmo tamanho para todos os arrays
    min_len = min(len(target_texts), len(target_labels), len(target_freq), len(model_output_labels))

    # Ajustar n se necessário
    n = min(n, min_len)

    # Selecionar os primeiros n exemplos
    texts_n = target_texts[:n]
    freq_n = target_freq[:n]
    real_labels_n = target_labels[:n]
    model_labels_n = model_output_labels[:n]

    # Criar DataFrame
    df = pd.DataFrame({
        'Text': texts_n,
        'Freq': freq_n,
        'RealLabel': real_labels_n,
        'ModelOutputLabel': model_labels_n
    })

    # REMOVER linhas onde ModelOutputLabel é None
    df_filtered = df[df['ModelOutputLabel'].notna()].copy()
    df_filtered['ModelOutputLabel'] = df_filtered['ModelOutputLabel'].astype(bool)

    return df_filtered

# Criar a planilha com os 250 primeiros registros
df_analysis = create_analysis_spreadsheet(subset_texts, subset_labels, subset_freq, final_results, len(subset_labels))
df_analysis['Correct'] = df_analysis['RealLabel'] == df_analysis['ModelOutputLabel']

print(f"Dimensões: {df_analysis.shape[0]} linhas x {df_analysis.shape[1]} colunas")
valid_predictions = df_analysis['ModelOutputLabel'].notna()
correct_count = df_analysis[valid_predictions]['Correct'].sum()
total_valid = valid_predictions.sum()

# Calcular a acurácia
accuracy = correct_count / total_valid if total_valid > 0 else 0

# Mostrar as primeiras linhas para verificação
print("\nPrimeiras 5 linhas:")
print(df_analysis.head())

# Mostrar estatísticas básicas
print(f"\nEstatísticas:")
print(f"Total de registros: {len(df_analysis)}")
print(f"Labels reais - True: {df_analysis['RealLabel'].sum()}, False: {(~df_analysis['RealLabel']).sum()}")
print(f"Labels do modelo - True: {df_analysis['ModelOutputLabel'].sum()}, False: {(~df_analysis['ModelOutputLabel']).sum()}")
print(f"Registros com parsing bem-sucedido: {df_analysis['ModelOutputLabel'].notna().sum()}")

print(f"\n=== ANÁLISE DE PERFORMANCE ===")
print(f"Total de predições válidas: {total_valid}")
print(f"Predições corretas: {correct_count}")
print(f"Predições incorretas: {total_valid - correct_count}")
print(f"Acurácia: {accuracy:.4f} ({accuracy*100:.2f}%)")

# Exportar para CSV
df_analysis.to_csv('dados.csv', index=False)

performance_metrics = analyze_model_performance(df_analysis)
analyze_by_category(df_analysis)

Dimensões: 9065 linhas x 5 colunas

Primeiras 5 linhas:
                                                Text  Freq  RealLabel  \
0              nigga got a cold welp i’m bout to die     3      False   
1        hillbilly superman by the shane givens band     3      False   
2  full movie: dark haired babe lucia love toys a...     2      False   
3                    shout out to my young nigga lol     2      False   
4          ..... can't a nigga feel special for once     3      False   

   ModelOutputLabel  Correct  
0              True    False  
1             False     True  
2             False     True  
3              True    False  
4              True    False  

Estatísticas:
Total de registros: 9065
Labels reais - True: 4526, False: 4539
Labels do modelo - True: 7577, False: 1488
Registros com parsing bem-sucedido: 9065

=== ANÁLISE DE PERFORMANCE ===
Total de predições válidas: 9065
Predições corretas: 4666
Predições incorretas: 4399
Acurácia: 0.5147 (51.47%)

=== ANÁLISE 