<a href="https://colab.research.google.com/github/camylaand/deteccao-anomalia/blob/main/Segundo_modelo.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
os.makedirs("modelos", exist_ok=True)

!mv scaler.pkl modelos/
!mv colunas_scaler.pkl modelos/
!mv modelo_encoder.keras modelos/
!mv modelo_autoencoder.keras modelos/
!mv kmeans_auto.pkl modelos/
!mv encoder_tipo_transacao.pkl modelos/
!mv encoder_semana.pkl modelos/
!mv encoder_horario.pkl modelos/


In [None]:
!pip install scikit-learn joblib tensorflow matplotlib

Collecting catboost
  Downloading catboost-1.2.8-cp312-cp312-manylinux2014_x86_64.whl.metadata (1.2 kB)
Downloading catboost-1.2.8-cp312-cp312-manylinux2014_x86_64.whl (99.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m99.2/99.2 MB[0m [31m7.6 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: catboost
Successfully installed catboost-1.2.8


In [None]:
import pandas as pd
import numpy as np
import joblib
from tensorflow.keras.models import load_model
import os
from sklearn.metrics import f1_score, recall_score
from xgboost import XGBClassifier
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
import warnings
from sklearn.metrics.pairwise import euclidean_distances
import matplotlib.pyplot as plt

# Suprimir aviso do XGBoost
warnings.filterwarnings("ignore", category=UserWarning, module='xgboost')

# ----------------------
# Carregamento de modelos
# ----------------------
def carregar_modelos():
    modelos = {
        "scaler": joblib.load("modelos/scaler.pkl"),
        "colunas_scaler": joblib.load("modelos/colunas_scaler.pkl"),
        "encoder_model": load_model("modelos/modelo_encoder.keras", compile=False),
        "autoencoder": load_model("modelos/modelo_autoencoder.keras", compile=False),
        "kmeans": joblib.load("modelos/kmeans_auto.pkl"),
        "encoder_tipo": joblib.load("modelos/encoder_tipo_transacao.pkl"),
        "encoder_semana": joblib.load("modelos/encoder_semana.pkl"),
        "encoder_horario": joblib.load("modelos/encoder_horario.pkl")
    }
    return modelos

# ----------------------
# Cálculo de erro e distância
# ----------------------
def calcular_erros_e_distancias(df, modelos):
    colunas_scaler = modelos["colunas_scaler"]
    X_escalado = modelos["scaler"].transform(df[colunas_scaler])
    reconstruido = modelos["autoencoder"].predict(X_escalado)
    df["erro_reconstrucao"] = np.mean(np.square(X_escalado - reconstruido), axis=1)

    cod_latente = modelos["encoder_model"].predict(X_escalado)
    dist = euclidean_distances(cod_latente, modelos["kmeans"].cluster_centers_)
    df["distancia_cluster"] = np.min(dist, axis=1)
    return df

# ----------------------
# Geração da coluna 'fraude_confirmada' com pesos
# ----------------------
def detectar_anomalias(df, modelos):
    df['transacao_data'] = pd.to_datetime(df['transacao_data'], errors='coerce')
    df = df.sort_values(['conta_id', 'transacao_data'])

    df['media_valor'] = pd.to_numeric(df['media_valor'], errors='coerce')
    df['std_valor'] = pd.to_numeric(df['std_valor'], errors='coerce')
    df['transacao_valor'] = pd.to_numeric(df['transacao_valor'], errors='coerce')

    df['tempo_desde_ultima'] = df.groupby('conta_id')['transacao_data'].diff().dt.total_seconds()

    df['regra_valor_alto'] = (df['transacao_valor'] > (df['media_valor'] + 3 * df['std_valor'])).astype(int)
    df['regra_horario'] = (df['faixa_horaria_Madrugada'] == 1).astype(int)
    df['regra_frequencia'] = (df['tempo_desde_ultima'] < 60).astype(int)
    df['regra_cluster'] = df.get('suspeita_cluster', '').isin(['baixa', 'media', 'alta']).astype(int)

    df = calcular_erros_e_distancias(df, modelos)

    df['pontuacao_fraude'] = (
        2 * df['regra_cluster'] +  # restaurado para peso 2
        2 * df['regra_horario'] +
        1 * df['regra_valor_alto'] +
        1 * df['regra_frequencia']
    )
    df['fraude_confirmada'] = (df['pontuacao_fraude'] >= 3).astype(int)

    # Simular 1% de falsos negativos e falsos positivos para robustez
    n_positivos = df['fraude_confirmada'].sum()
    n_negativos = len(df) - n_positivos
    n_ruido = int(0.005 * len(df))  # reduzido de 1% para 0.5%

    # Tornar 1% dos positivos em negativos (FN simulados)
    positivos_idx = df[df['fraude_confirmada'] == 1].sample(n=min(n_ruido, n_positivos), random_state=42).index
    df.loc[positivos_idx, 'fraude_confirmada'] = 0

    # Tornar 1% dos negativos em positivos (FP simulados)
    negativos_idx = df[df['fraude_confirmada'] == 0].sample(n=min(n_ruido, n_negativos), random_state=42).index
    df.loc[negativos_idx, 'fraude_confirmada'] = 1



    return df, None

# ----------------------
# Motivo do alerta
# ----------------------
def gerar_motivo_alerta(row):
    motivos = []
    if row['modelo_predito'] == 1:
        motivos.append("modelo")
    if row['erro_reconstrucao'] > 0.1:
        motivos.append("erro alto")
    if row['distancia_cluster'] > 10:
        motivos.append("distância alta")
    if row['regra_valor_alto'] == 1:
        motivos.append("valor alto")
    if row['regra_horario'] == 1:
        motivos.append("horário suspeito")
    if row['regra_frequencia'] == 1:
        motivos.append("frequência alta")
    if row['regra_cluster'] == 1:
        motivos.append("desvio do cluster")
    return ", ".join(motivos) if motivos else "sem alerta"

def aplicar_motivos_alerta(df):
  df['motivo_alerta'] = df.apply(gerar_motivo_alerta, axis=1)
  return df

# ----------------------
# Avaliação final de detecção
# ----------------------
def analisar_falsos_negativos(df):
    falsos_negativos = df[(df['fraude_confirmada'] == 1) & (df['decisao_final'] == 0)]
    criticos = falsos_negativos[(falsos_negativos['erro_reconstrucao'] > 0.1) | (falsos_negativos['distancia_cluster'] > 15)]
    print("\nFalsos negativos críticos:")
    print(criticos[['transacao_id', 'erro_reconstrucao', 'distancia_cluster']].head(10))
    return criticos

# ----------------------
# Avaiar thresholds
# ----------------------
def avaliar_thresholds(modelo, X_test, y_test, grupo_test, thresholds=[0.4, 0.5, 0.55, 0.6, 0.7, 0.8]):
    resultados = []
    for th in thresholds:
        pred = (modelo.predict_proba(X_test)[:, 1] >= th).astype(int)
        f1 = f1_score(y_test, pred)
        recall_0 = recall_score(y_test[grupo_test == 0], pred[grupo_test == 0])
        recall_1 = recall_score(y_test[grupo_test == 1], pred[grupo_test == 1])
        eq_op = abs(recall_0 - recall_1)
        fn = ((y_test == 1) & (pred == 0)).sum()
        fp = ((y_test == 0) & (pred == 1)).sum()
        resultados.append((th, f1, recall_0, recall_1, eq_op, fn, fp))
        print(f"\n Threshold {th:.2f}")
        print(f"F1-score: {f1:.4f} | Equality of Opportunity: {eq_op:.4f}")
        print(f"Recall grupo 0: {recall_0:.4f} | Recall grupo 1: {recall_1:.4f}")
        print(f"Falsos negativos: {fn} | Falsos positivos: {fp}")

       # Diagnóstico
    print("\nThresholds:", [r[0] for r in resultados])
    print("F1 Scores:", [r[1] for r in resultados])
    print("Equality of Opportunity:", [r[4] for r in resultados])

    thresholds_plot = [r[0] for r in resultados]
    f1_scores = [r[1] for r in resultados]
    eq_op_scores = [r[4] for r in resultados]

    plt.figure(figsize=(10, 5))
    plt.plot(thresholds_plot, f1_scores, label='F1-score', marker='o')
    plt.plot(thresholds_plot, eq_op_scores, label='Equality of Opportunity', marker='x')
    plt.xlabel('Threshold')
    plt.ylabel('Score')
    plt.title('F1-score e Equality of Opportunity por Threshold')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

# ----------------------
# Validação temporal (exemplo de separação por mês)
# ----------------------
def validacao_temporal(df, modelo, modelos, grupo_col):
    df['transacao_data'] = pd.to_datetime(df['transacao_data'])
    df['mes'] = df['transacao_data'].dt.to_period('M')
    meses = sorted(df['mes'].unique())
    resultados_mensais = []

    for i in range(len(meses) - 1):
        treino = df[df['mes'] <= meses[i]]
        teste = df[df['mes'] == meses[i+1]]

        if len(treino) < 1000 or len(teste) < 1000:
            continue

        X_treino = treino[modelos['colunas_scaler']]
        y_treino = treino['fraude_confirmada']
        X_teste = teste[modelos['colunas_scaler']]
        y_teste = teste['fraude_confirmada']
        grupo_teste = teste[grupo_col]

        smote = SMOTE(random_state=42)
        X_res, y_res = smote.fit_resample(X_treino, y_treino)
        modelo.fit(X_res, y_res)

        print(f"\nValidação mês: {meses[i+1]}")
        avaliar_thresholds(modelo, X_teste, y_teste, grupo_teste)
        resultados_mensais.append((meses[i+1], modelo.score(X_teste, y_teste)))

    return resultados_mensais


def gerar_score_continuo(df):
    df['score_final'] = (
        df['modelo_predito'] * 0.5 +
        df['regra_valor_alto'] * 0.2 +
        df['regra_horario'] * 0.2 +
        df['regra_frequencia'] * 0.1
    )
    df['score_final'] = df['score_final'] / df['score_final'].max()

    df['faixa_risco'] = pd.cut(
        df['score_final'],
        bins=[-0.01, 0.4, 0.7, 1.0],
        labels=['baixo', 'moderado', 'alto']
    )
    return df

# ----------------------
# Execução principal
# ----------------------
if __name__ == "__main__":
    df = pd.read_csv("/content/transacoes_com_comportamento_por_conta.csv")
    df['transacao_data'] = pd.to_datetime(df['transacao_data'])

    modelos = carregar_modelos()
    df, _ = detectar_anomalias(df, modelos)

    df['mesma_titularidade'] = df['mesma_titularidade'].astype(int)
    grupo_sensivel = df['mesma_titularidade'].copy()

    colunas_validas = [
        'transacao_valor', 'fim_de_semana',
        'transacao_tipo_pix', 'transacao_tipo_transferencia',
        'erro_reconstrucao', 'distancia_cluster','mesma_titularidade', 'faixa_horaria_Madrugada',
        'dia_de_semana_Sabado', 'dia_de_semana_Domingo'
    ]

    X = df[colunas_validas]
    y = df['fraude_confirmada']

    smote = SMOTE(random_state=42)
    X_res, y_res = smote.fit_resample(X, y)

    fator_repeticao = int(np.ceil(len(y_res) / len(grupo_sensivel)))
    grupo_expandido = pd.Series(np.tile(grupo_sensivel.values, fator_repeticao)[:len(y_res)], name="grupo")

    X_train, X_test, y_train, y_test, grupo_train, grupo_test = train_test_split(
        X_res, y_res, grupo_expandido, test_size=0.3, random_state=42
    )

    modelo_final = XGBClassifier(use_label_encoder=False, eval_metric='logloss')
    modelo_final.fit(X_train, y_train)

    for threshold in [0.6, 0.4]:
        print(f"\n📊 Avaliação com Threshold {threshold:.2f}")
        df[f'modelo_predito_{int(threshold*100)}'] = (modelo_final.predict_proba(X)[:, 1] >= threshold).astype(int)

        df['regra_alerta'] = (
            (df['transacao_valor'] > 0.8) &
            (df['fim_de_semana'] == 1) &
            (df['mesma_titularidade'] == 0) &
            (df['faixa_horaria_Madrugada'] == 1)
        )

        df[f'decisao_final_{int(threshold*100)}'] = (
            (df[f'modelo_predito_{int(threshold*100)}'] == 1) | df['regra_alerta']
        ).astype(int)

        total = df['fraude_confirmada'].sum()
        fn = ((df['fraude_confirmada'] == 1) & (df[f'decisao_final_{int(threshold*100)}'] == 0)).sum()
        fp = ((df['fraude_confirmada'] == 0) & (df[f'decisao_final_{int(threshold*100)}'] == 1)).sum()
        acertos = ((df['fraude_confirmada'] == df[f'decisao_final_{int(threshold*100)}'])).sum()

        print(f"Total fraudes (threshold {threshold}):", total)
        print("Falsos negativos:", fn)
        print("Falsos positivos:", fp)
        print("Acertos:", acertos)


    df['modelo_predito'] = (modelo_final.predict_proba(X)[:, 1] >= 0.6).astype(int)
    df['regra_alerta'] = (
        (df['transacao_valor'] > 0.8) &
        (df['fim_de_semana'] == 1) &
        (df['mesma_titularidade'] == 0) &
        (df.get('faixa_horaria_Madrugada', 0) == 1)
    )
    df['decisao_final'] = ((df['modelo_predito'] == 1) | (df['regra_alerta'])).astype(int)

    df['nivel_suspeita'] = np.select(
        [
            (df['erro_reconstrucao'] > 0.2) & (df['distancia_cluster'] > 15),
            (df['erro_reconstrucao'] > 0.1) | (df['distancia_cluster'] > 10),
            (df['modelo_predito'] == 1)
        ],
        ['alta', 'media', 'baixa'],
        default='nenhuma'
    )

    df['risco_critico'] = (
        (df['fraude_confirmada'] == 1) &
        (df['decisao_final'] == 0) &
        ((df['erro_reconstrucao'] > 0.1) | (df['distancia_cluster'] > 10))
    ).astype(int)

    transacoes_anomalas = df[df['decisao_final'] == 1]
    print(transacoes_anomalas.head(10))

    colunas_chave = [
        'transacao_id', 'cliente_id', 'conta_id', 'transacao_valor',
        'transacao_data', 'modelo_predito', 'regra_valor_alto',
        'regra_horario', 'regra_frequencia', 'regra_cluster', 'decisao_final',
        'erro_reconstrucao', 'distancia_cluster', 'nivel_suspeita'
    ]
    print(transacoes_anomalas[colunas_chave].head(10))

    print(f"Total de transações anômalas detectadas: {len(transacoes_anomalas)}")

    avaliar_thresholds(modelo_final, X_test, y_test, grupo_test, thresholds=[0.4, 0.5, 0.55, 0.6, 0.7, 0.8])
    validacao_temporal(df, modelo_final, modelos, 'mesma_titularidade')
    df.to_csv("transacoes_analisadas.csv", index=False)

    # Salvar o modelo XGBoost treinado para uso futuro
    joblib.dump(modelo_final, "modelos/modelo_xgb.pkl")
    print("💾 Modelo XGBoost salvo com sucesso em 'modelos/modelo_xgb.pkl'.")

    transacoes_anomalas[colunas_chave].to_csv("transacoes_anomalas_log.csv", index=False)
    print("\n📁 Arquivo 'transacoes_anomalas_log.csv' salvo com sucesso.")

In [None]:
# ----------------------
# Avaliação combinada: modelo + regras
# ----------------------
total_fraudes = df['fraude_confirmada'].sum()
falsos_negativos = ((df['fraude_confirmada'] == 1) & (df['decisao_final'] == 0)).sum()
falsos_positivos = ((df['fraude_confirmada'] == 0) & (df['decisao_final'] == 1)).sum()
acertos = ((df['fraude_confirmada'] == df['decisao_final'])).sum()

print("\nAvaliação Final de Detecção de Fraudes")
print(f"Total de fraudes reais no conjunto: {total_fraudes}")
print(f"Falsos Negativos (fraudes não detectadas): {falsos_negativos}")
print(f"Falsos Positivos (falsos alarmes): {falsos_positivos}")
print(f"Acertos (decisão correta): {acertos}")

total_fraudes_40 = df['fraude_confirmada'].sum()
falsos_negativos_40 = ((df['fraude_confirmada'] == 1) & (df['decisao_final_40'] == 0)).sum()
falsos_positivos_40 = ((df['fraude_confirmada'] == 0) & (df['decisao_final_40'] == 1)).sum()
acertos_40 = ((df['fraude_confirmada'] == df['decisao_final_40'])).sum()

print("\n📊 Avaliação Final de Detecção de Fraudes (Threshold 0.40)")
print(f"Total de fraudes reais no conjunto: {total_fraudes_40}")
print(f"Falsos Negativos (anomalias não detectadas): {falsos_negativos_40}")
print(f"Falsos Positivos (falsos alarmes): {falsos_positivos_40}")
print(f"Acertos (decisão correta): {acertos_40}")