# Obtendo os dados e separando entre treino e teste

## Configurações Iniciais

In [0]:
%pip install mlflow>=3.0 --upgrade
dbutils.library.restartPython()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, rand, count, mean, stddev
from pyspark.sql.window import Window
import mlflow
from datetime import datetime
import pandas as pd

## Obtendo os dados
Utilizando o _Unity Catalog_

In [0]:
df = spark.read.table("my_catalog.default.creditcard")

In [0]:
display(df.limit(5))

In [0]:
print("Credit Card Fraud Detection - linhas:",df.count(),"colunas:",len(df.columns))

In [0]:
df.printSchema()

In [0]:
class_distribution = df.groupBy("Class").agg(
    count("*").alias("count"),
    (count("*") / df.count() * 100).alias("%")
).orderBy("Class")

print("Distribuição da Classe Target (Fraudes):")
display(class_distribution)

 Classe | Descrição    | Count   | %          |
-------|--------------|---------|------------|
 0     | Legítimas    | 284,315 | 99.83%     |
 1     | Fraudulentas | 492     | 0.17%      |

In [0]:
display(df.describe())

## Criando Conjuntos de Treino e Teste
<br>

### Função de divisão aleatória (RandomSplit) sem estratificação

In [0]:
def create_random_split(df, target_col="Class", test_ratio=0.2, seed=42):
    
    train_set, test_set = df.randomSplit([1-test_ratio, test_ratio], seed=seed)
    
    print(f"Divisão aleatória concluída!")
    return train_set, test_set

### Função de divisão aleatória (RandomSplit) com estratificação

In [0]:
# função de divisão estratificada
def create_stratified_split(df, target_col="Class", test_ratio=0.2, seed=42):
    """
    cria uma divisão estratificada mantendo a proporção de fraudes
    """
    print("Iniciando divisão...")
    
    # separar os dados em fraudes e transações normais
    fraud_df = df.filter(col(target_col) == 1)
    normal_df = df.filter(col(target_col) == 0)
    
    print(f"Transações normais: {normal_df.count():,}")
    print(f"Transações fraudulentas: {fraud_df.count():,}")
    
    # divide cada grupo separadamente
    fraud_train, fraud_test = fraud_df.randomSplit([1-test_ratio, test_ratio], seed=seed)
    normal_train, normal_test = normal_df.randomSplit([1-test_ratio, test_ratio], seed=seed)
    
    # combina os resultados
    train_set = normal_train.union(fraud_train)
    test_set = normal_test.union(fraud_test)
    
    print("Divisão estratificada concluída! :)")
    return train_set, test_set

In [0]:
# divisão dos conjuntos
#train_set, test_set = create_stratified_split(df, test_ratio=0.2, seed=42)

### Função de divisão com estratificação e determinística ### 
<br>
A ideia aqui é fazer com que o mesmo id (a mesma transação) vá sempre para o mesmo conjunto (teste ou treino).

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def create_deterministic_stratified_split(df, target_col="Class", test_ratio=0.2, seed=42):
    """
    cria uma divisão estratificada determinística usando hash estável
    garante que a mesma linha sempre vai para o mesmo conjunto
    """
    print("Iniciando divisão determinística...")
    
    # cria um identificador único estável para cada linha
    # usa colunas existentes para criar um hash determinístico
    df_with_id = df.withColumn(
        "row_hash", 
        F.hash(F.concat_ws("|", *df.columns))
    )
    
    # adiciona um índice determinístico baseado no hash
    window = Window.orderBy("row_hash")
    df_with_index = df_with_id.withColumn("deterministic_index", F.row_number().over(window))
    
    # separa os dados em fraudes e normais
    fraud_df = df_with_index.filter(F.col(target_col) == 1)
    normal_df = df_with_index.filter(F.col(target_col) == 0)
    
    print(f"Transações normais: {normal_df.count():,}")
    print(f"Transações fraudulentas: {fraud_df.count():,}")
    
    # função para divisão determinística baseada no índice
    def split_deterministic(df, ratio, seed):
        total_count = df.count()
        test_count = int(total_count * ratio)
        
        # ordenar deterministicamente e pegar os primeiros para teste
        test_set = df.orderBy("deterministic_index").limit(test_count)
        
        # o restante vai para treino
        train_set = df.join(test_set, "deterministic_index", "left_anti")
        
        return train_set, test_set
    
    # divide cada grupo
    fraud_train, fraud_test = split_deterministic(fraud_df, test_ratio, seed)
    normal_train, normal_test = split_deterministic(normal_df, test_ratio, seed)
    
    # combina os resultados e remove colunas auxiliares
    train_set = normal_train.union(fraud_train).drop("row_hash", "deterministic_index")
    test_set = normal_test.union(fraud_test).drop("row_hash", "deterministic_index")
    
    # valida as proporções
    train_fraud_rate = train_set.filter(F.col(target_col) == 1).count() / train_set.count()
    test_fraud_rate = test_set.filter(F.col(target_col) == 1).count() / test_set.count()
    original_fraud_rate = df.filter(F.col(target_col) == 1).count() / df.count()
    
    print(f"\nValidação da divisão:")
    print(f"Fraude original: {original_fraud_rate:.4f}")
    print(f"Fraude treino:   {train_fraud_rate:.4f}")
    print(f"Fraude teste:    {test_fraud_rate:.4f}")
    print(f"Divisão determinística concluída! :)")
    
    return train_set, test_set

### Comparação dos métodos
### 
Teste entre aleatório, estratificado e determinístico. (validar afirmação do livro para o nosso dataset e escolher melhor abordagem.)

In [0]:
def validate_all_splits(df, target_col="Class"):
    """
    valida todos os splits e compara os 3 métodos de divisão e salva no MLflow
    """
    
    mlflow.set_experiment(experiment_id="1059645231822150")
    
    with mlflow.start_run(run_name="comparacao_metodos_divisao"):
        print("Comparação dos métodos de divisão: \n")

        total = df.count()
        fraud_count = df.filter(F.col(target_col) == 1).count()
        real_fraud_rate = fraud_count / total
        
        print(f"Dataset original: {total:,} transações")
        print(f"Taxa real de fraudes: {real_fraud_rate:.6f}")
        
        # criar splits com todos os métodos
        print("\nCriando splits...")
        
        # aleatório simples
        train_rand, test_rand = create_random_split(df, target_col)
        
        # estratificado com randomSplit
        train_strat, test_strat = create_stratified_split(df, target_col)
        
        # estratificado determinístico
        train_det, test_det = create_deterministic_stratified_split(df, target_col)
        
        # função para calcular estatísticas
        def get_split_stats(train_set, test_set, method_name):
            train_total = train_set.count()
            train_fraud = train_set.filter(F.col(target_col) == 1).count()
            train_fraud_rate = train_fraud / train_total if train_total > 0 else 0
            train_error = abs(train_fraud_rate - real_fraud_rate) / real_fraud_rate * 100
            
            test_total = test_set.count()
            test_fraud = test_set.filter(F.col(target_col) == 1).count()
            test_fraud_rate = test_fraud / test_total if test_total > 0 else 0
            test_error = abs(test_fraud_rate - real_fraud_rate) / real_fraud_rate * 100
            
            return {
                'method': method_name,
                'train_total': train_total,
                'train_fraud': train_fraud,
                'train_rate': train_fraud_rate,
                'train_error': train_error,
                'test_total': test_total,
                'test_fraud': test_fraud,
                'test_rate': test_fraud_rate,
                'test_error': test_error,
                'max_error': max(train_error, test_error),
                'avg_error': (train_error + test_error) / 2
            }
        
        # coletar estatísticas de todos os métodos
        methods = [
            get_split_stats(train_rand, test_rand, "ALEATORIO"),
            get_split_stats(train_strat, test_strat, "ESTRATIFICADO"),
            get_split_stats(train_det, test_det, "DETERMINISTICO")
        ]
        
        # tabelinha
        print(f"{'METODO':<15} {'CONJUNTO':<8} {'TAMANHO':<10} {'FRAUDES':<8} {'TAXA':<10} {'ERRO %':<8}")
        print(f"{'-'*60}")
        
        for method in methods:
            print(f"{method['method']:<15} {'Treino':<8} {method['train_total']:<10,} {method['train_fraud']:<8,} {method['train_rate']:.6f}  {method['train_error']:>6.2f}%")
            print(f"{'':<15} {'Teste':<8} {method['test_total']:<10,} {method['test_fraud']:<8,} {method['test_rate']:.6f}  {method['test_error']:>6.2f}%")
            print(f"{'-'*60}")
        
        # encontra o melhor método
        best_method = min(methods, key=lambda x: x['max_error'])
        print(f"\nMelhor metodo: {best_method['method']} ({best_method['max_error']:.2f}% erro)")
        
        # MLFlow logging
        
        # parâmetros gerais
        mlflow.log_params({
            "dataset_total": total,
            "fraudes_total": fraud_count,
            "taxa_fraudes_real": float(real_fraud_rate),
            "proporcao_teste": 0.2,
            "seed": 42,
            "melhor_metodo": best_method['method'],
            "aleatorio_erro_max": float(methods[0]['max_error']),
            "estratificado_erro_max": float(methods[1]['max_error']),
            "deterministico_erro_max": float(methods[2]['max_error'])
        })
        
        # métricas para cada método
        metrics = {}
        for method in methods:
            prefix = method['method'].lower()
            metrics.update({
                f"{prefix}_treino_tamanho": method['train_total'],
                f"{prefix}_treino_fraudes": method['train_fraud'],
                f"{prefix}_treino_taxa": float(method['train_rate']),
                f"{prefix}_treino_erro": float(method['train_error']),
                f"{prefix}_teste_tamanho": method['test_total'],
                f"{prefix}_teste_fraudes": method['test_fraud'],
                f"{prefix}_teste_taxa": float(method['test_rate']),
                f"{prefix}_teste_erro": float(method['test_error']),
                f"{prefix}_erro_maximo": float(method['max_error']),
                f"{prefix}_erro_medio": float(method['avg_error'])
            })
        
        mlflow.log_metrics(metrics)
        
        # tags
        mlflow.set_tags({
            "projeto": "detecao_fraudes_tcc",
            "fase": "preprocessamento",
            "tipo_analise": "comparacao_metodos_divisao",
            "unidade": "univesp"
        })
        
        print("\nBoa!Comparação salva no MLflow! :)")
        
        return {
            'real_rate': real_fraud_rate,
            'methods': {m['method']: m for m in methods},
            'best_method': best_method['method'],
            'mlflow_run_id': mlflow.active_run().info.run_id
        }

In [0]:
comparison = validate_all_splits(df)

| Método         | Conjunto | Tamanho   | Fraudes | Taxa       | Erro %  |
|----------------|----------|-----------|---------|------------|---------|
| Aleatório      | Treino   | 227,940   | 400     | 0.001755   | 1.58%   |
| Aleatório               | Teste    | 56,867    | 92      | 0.001618   | 6.35%   |
| Estratificado  | Treino   | 227,948   | 390     | 0.001711   | 0.96%   |
| Estratificado               | Teste    | 56,859    | 102     | 0.001794   | 3.85%   |
| Determinístico | Treino   | 227,846   | 394     | 0.001729   | 0.10%   |
| Determinístico               | Teste    | 56,961    | 98      | 0.001720   | 0.41%   |


### Usar o melhor método de divisão

In [0]:
#if comparison['best_method'] == "DETERMINISTICO":
#   train_set, test_set = create_deterministic_stratified_split(df)
#elif comparison['best_method'] == "ESTRATIFICADO":
#    train_set, test_set = create_stratified_split(df)
#else:
#    train_set, test_set = create_random_split(df)

In [0]:
train_set, test_set = create_deterministic_stratified_split(df)

### Validar a divisão escolhida
Vamos validar se a nossa variável alvo ficou bem dividida entre os splits:

In [0]:
# função para validação das divisões
def validate_split_quality(train_set, test_set, target_col="Class"):

    """
    valida se a divisão manteve as proporções corretas
    """
    
    print("Iniciando validação da divisão...")

    # estatísticas
    train_total = train_set.count()
    test_total = test_set.count()
    overall_total = train_total + test_total
    
    train_fraud = train_set.filter(col(target_col) == 1).count()
    test_fraud = test_set.filter(col(target_col) == 1).count()
    overall_fraud = train_fraud + test_fraud
    
    # proporções
    train_ratio = train_fraud / train_total
    test_ratio = test_fraud / test_total
    overall_ratio = overall_fraud / overall_total
    
    print("Validação da divisão estratificada determinística: \n")
    print(f"Tamanhos dos conjuntos:")
    print(f"Treino: {train_total:,} transações ({train_total/overall_total*100:.1f}%)")
    print(f"Teste: {test_total:,} transações ({test_total/overall_total*100:.1f}%)")
    
    print(f"\n Proporção de fraudes:")
    print(f"Geral: {overall_ratio:.6f} ({overall_fraud:,} fraudes)")
    print(f"Treino: {train_ratio:.6f} ({train_fraud:,} fraudes)")
    print(f"Teste: {test_ratio:.6f} ({test_fraud:,} fraudes)")
    
    # avaliar qualidade
    diff_train = abs(train_ratio - overall_ratio)
    diff_test = abs(test_ratio - overall_ratio)
    total_error = diff_train + diff_test
    
    print(f"\n Qualidade da estratificação:")
    print(f"Diferença no treino: {diff_train:.6f}")
    print(f"Diferença no teste: {diff_test:.6f}")
    print(f"Erro total: {total_error:.6f}")
    
    if total_error < 0.001:
        print("Proporções muito consistentes")
    elif total_error < 0.005:
        print("Proporções consistentes")
    else:
        print("Verificar a divisão")
    
    return {
        'train_size': train_total,
        'test_size': test_total,
        'total_size': overall_total,
        'train_fraud_count': train_fraud,
        'test_fraud_count': test_fraud,
        'total_fraud_count': overall_fraud,
        'train_fraud_ratio': train_ratio,
        'test_fraud_ratio': test_ratio,
        'overall_fraud_ratio': overall_ratio,
        'stratification_error': total_error
    }

In [0]:
validation_stats = validate_split_quality(train_set, test_set)

## Salvando os Conjuntos

In [0]:
# gera timestamp para versionamento
#from datetime import datetime
#timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# define nomes das tabelas
#tabela_treino = f"my_catalog.default.creditcard_treino_v{timestamp}"
#tabela_teste = f"my_catalog.default.creditcard_teste_v{timestamp}"

# salva conjuntos
#train_set.write.mode("overwrite").saveAsTable(tabela_treino)
#print("Treino salvo com sucesso!")

#test_set.write.mode("overwrite").saveAsTable(tabela_teste) 
#print("Teste salvo com sucesso!")

# cria dicionário com os nomes para usar depois
#nomes_tabelas = {
#    'treino': tabela_treino,
#    'teste': tabela_teste
#}

#print(f"\n Datasets salvos:")
#print(f"Treino: {tabela_treino}")
#print(f"Teste: {tabela_teste}")

In [0]:
tabela_treino = "my_catalog.default.creditcard_treino_tcc"
tabela_teste = "my_catalog.default.creditcard_teste_tcc"

train_set.write.mode("overwrite").saveAsTable(tabela_treino)
test_set.write.mode("overwrite").saveAsTable(tabela_teste)

print(f"\n Conjuntos salvos:")
print(f"Treino: {tabela_treino}")
print(f"Teste: {tabela_teste}")
print(f"Total treino: {train_set.count():,} transações")
print(f"Total teste: {test_set.count():,} transações")

In [0]:
#train_set = spark.read.table(tabela_treino)
#test_set = spark.read.table(tabela_teste)

In [0]:
# test_set = spark.read.table("my_catalog.default.creditcard_teste_tcc")
# df_test = test_set.toPandas()
# print(f"Número de linhas: {df_test.shape[0]}")
# print(f"Número de colunas: {df_test.shape[1]}")
# fraud_count_test = df_test[df_test['Class'] == 1].shape[0]
# legit_count_test = df_test[df_test['Class'] == 0].shape[0]
# print(f"Número de transações fraudulentas: {fraud_count_test}")
# print(f"Número de transações legítimas: {legit_count_test}")
# fraud_pct_test = fraud_count_test / df_test.shape[0] * 100
# print(f"Percentual de transações fraudulentas: {fraud_pct_test:.4f}%")

In [0]:
# import pandas as pd
# import matplotlib.pyplot as plt

# train_count = train_set.count()
# test_count = test_set.count()
# total_count = train_count + test_count
# train_pct = train_count / total_count * 100
# test_pct = test_count / total_count * 100

# fig, axes = plt.subplots(1, 2, figsize=(14, 6))

# axes[0].pie([train_count, test_count], labels=['Treino', 'Teste'], autopct=lambda pct: f'{pct:.2f}%\n({int(pct/100*total_count):,})', colors=['skyblue', 'salmon'], startangle=90, textprops={'fontsize': 12, 'fontweight': 'bold'})
# axes[0].set_title('\nProporção de Transações (Treino vs Teste)\n', fontsize=14, fontweight='bold')

# train_counts = train_set.groupBy("Class").count().toPandas()
# test_counts = test_set.groupBy("Class").count().toPandas()
# train_vals = train_counts.sort_values('Class')['count'].values
# test_vals = test_counts.sort_values('Class')['count'].values
# train_total = train_vals.sum()
# test_total = test_vals.sum()
# train_fraud_pct = train_vals[1] / train_total * 100
# test_fraud_pct = test_vals[1] / test_total * 100

# bars = axes[1].bar(['Treino', 'Teste'], [train_fraud_pct, test_fraud_pct], color=['skyblue', 'salmon'])
# axes[1].set_ylabel('% Fraude')
# axes[1].set_title('\nPercentual de Transações Fraudulentas (Treino vs Teste)\n', fontsize=14, fontweight='bold')
# for bar, pct in zip(bars, [train_fraud_pct, test_fraud_pct]):
#     axes[1].text(bar.get_x() + bar.get_width()/2, bar.get_height()/2, f'{pct:.4f}%', ha='center', va='center', fontweight='bold', color='black', fontsize=12)

# plt.tight_layout()
# plt.show()

## Criando um Experimento no ML Flow
<br>

In [0]:
nome_experimento = "/Users/2106144@aluno.univesp.br/creditcard-fraud-detection"

try:
    mlflow.create_experiment(nome_experimento)
    print(f"Novo experimento criado: {nome_experimento}")
except:
    print(f"Usando experimento existente: {nome_experimento}")

mlflow.set_experiment(nome_experimento)
print("MLflow configurado")

## Registra a divisão final no ML Flow

In [0]:
display(comparison)

In [0]:
display(validation_stats)

In [0]:
with mlflow.start_run(run_name="divisao_final_treino_teste"):
    
    mlflow.log_params({
        "proporcao_teste": 0.2,
        "seed": 42,
        "estratificacao": True,
        "metodo_escolhido": comparison['best_method'],
        "tabela_treino": tabela_treino,
        "tabela_teste": tabela_teste
    })
    
    mlflow.log_metrics({
        "tamanho_treino": validation_stats['train_size'],
        "tamanho_teste": validation_stats['test_size'],
        "proporcao_fraudes_treino": validation_stats['train_fraud_ratio'],
        "proporcao_fraudes_teste": validation_stats['test_fraud_ratio'],
        "erro_estratificacao": validation_stats['stratification_error']
    })
    
    mlflow.set_tags({
        "projeto": "detecao_fraudes_tcc",
        "fonte_dados": "unity_catalog", 
        "tipo_processamento": "divisao_final_treino_teste"
    })
    
    print("Registrado no MLflow! :)")