In [47]:
# ==============================================================================
# FASE 2: An√°lise Explorat√≥ria de Dados (EDA) e Minera√ß√£o (Completo)
# Arquivo: analise_fase2.py
# ==============================================================================

import os
import sys
import matplotlib.pyplot as plt
import numpy as np

# Importa√ß√µes Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, size, lower, avg, stddev, abs as _abs, round as _round, max as _max, min as _min, count
from pyspark.sql.window import Window
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml.clustering import KMeans

# --- 1. Configura√ß√£o de Ambiente (Windows) ---
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

print("--- Iniciando Fase 2: An√°lise Explorat√≥ria ---")

# --- 2. Inicializando Sess√£o Spark ---
spark = SparkSession.builder \
    .appName("Analise_Gastos_Fase2") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.driver.bindAddress", "127.0.0.1") \
    .config("spark.driver.host", "127.0.0.1") \
    .master("local[*]") \
    .getOrCreate()

# Otimiza√ß√£o Arrow
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.sparkContext.setLogLevel("WARN")

# --- 3. Carregamento dos Dados ---
BASE_DIR = os.path.join(os.getcwd(), "dados")
input_path = os.path.join(BASE_DIR, "Consolidado_Final")

print(f"üìÇ Buscando base consolidada em: {input_path}")

if not os.path.exists(input_path):
    print(f"‚ùå ARQUIVO N√ÉO ENCONTRADO: {input_path}")
    sys.exit() # Encerra se n√£o achar o arquivo

try:
    df = spark.read.parquet(input_path)
    df.cache() # Cache do dataset bruto
    print(f"‚úÖ Base carregada: {df.count()} registros.")
except Exception as e:
    print(f"‚ùå Erro leitura: {e}")
    sys.exit()


# ==============================================================================
# CORRE√á√ÉO CR√çTICA: Remo√ß√£o de Duplicatas
# ==============================================================================
print(f"\n--- Saneamento da Base ---")
print(f"Total Bruto: {df.count()}")

# Remove linhas onde Objeto, Valor e Favorecido s√£o id√™nticos
# Isso elimina as repeti√ß√µes causadas pela fus√£o de c√©lulas no Excel
df = df.dropDuplicates(['objeto_aquisicao', 'valor_transacao', 'nome_favorecido'])

# For√ßa o rec√°lculo e cache na mem√≥ria
df.cache()
count_real = df.count()

print(f"‚úÖ Total Real (√önicos): {count_real}")
print(f"üóëÔ∏è Lixo Removido: {54196 - count_real}")

--- Iniciando Fase 2: An√°lise Explorat√≥ria ---
üìÇ Buscando base consolidada em: c:\VSCode\projetoMineracao\dados\Consolidado_Final
‚úÖ Base carregada: 54196 registros.

--- Saneamento da Base ---
Total Bruto: 54196
‚úÖ Total Real (√önicos): 12572
üóëÔ∏è Lixo Removido: 41624


In [48]:
# ==============================================================================
# C√âLULA 6 (V16 - LIMPEZA DE JUSTIFICATIVAS): NLP Refinado
# ==============================================================================
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import col, size, regexp_replace, expr, lower

print("--- Iniciando NLP V16 (Foco: Remover Justificativas) ---")

# 1. Limpeza de Caracteres
df_clean_chars = df.withColumn("objeto_limpo", regexp_replace(lower(col("objeto_aquisicao")), r"[^a-z]", " "))

# 2. Stopwords Expandida (Baseada na sua √∫ltima auditoria)
stopwords_pt_custom = [
    # Artigos/Preposi√ß√µes B√°sicas
    "de", "a", "o", "que", "e", "do", "da", "em", "um", "para", "com", "nao", "uma", "os", "no", 
    "se", "na", "por", "mais", "as", "dos", "como", "mas", "ao", "ele", "das", "seu", "sua", "ou", 
    "quando", "muito", "nos", "ja", "eu", "tambem", "so", "pelo", "pela", "ate", "isso", "ela", 
    "entre", "depois", "sem", "mesmo", "aos", "seus", "quem", "nas", "me", "esse", "eles", "voce", 
    "foi", "desta", "deste", "pelas", "pelos", "nesta", "neste", "pois", "havia",
    
    # JUSTIFICATIVAS (Os vil√µes dos Clusters 15, 17)
    "falta", "prestes", "acabar", "estoque", "razao", "motivo", "devido", "vista", "haja",
    "considerando", "referente", "referida", "relativo", "conforme", "solicitado", "atender", 
    "atendimento", "necessidade", "necessario", "necessarios", "visando", "objeto", "visto",
    "funcionamento", "bom", "mau", "impossibilidade", "urgencia", "emergencia", "carater",
    
    # Processos Burocr√°ticos
    "pagamento", "aquisicao", "compra", "fornecimento", "servico", "servicos", "prestacao",
    "entrega", "entregar", "empresa", "terceirizada", "contratada", "vulto", "monta", "despesa",
    
    # Termos Gen√©ricos
    "unidade", "unid", "qtd", "quantidade", "material", "materiais", "consumo", "permanente",
    "item", "itens", "produto", "produtos", "uso", "utilizacao", "aplicacao", "estoque",
    "novo", "velho", "usado", "manutencao", "reparo", "conserto", "troca", "substituicao",
    "especie", "tipo", "modelo", "marca", "cor", "tamanho", "oficial", "diversos",
    
    # Institucional
    "pr", "prm", "dr", "dra", "sr", "sra", "secretaria", "departamento", "divisao", "setor",
    "gabinete", "coordenadoria", "administracao", "regional", "publico", "federal", "estadual",
    "municipio", "municipal", "processo", "protocolo", "memorando", "oficio", "despacho",
    "lei", "decreto", "artigo", "portaria", "resolucao", "ata", "pregao", "licitacao", "prr"
]

try:
    tokenizer = Tokenizer(inputCol="objeto_limpo", outputCol="words_raw")
    df_tokenized = tokenizer.transform(df_clean_chars)

    remover = StopWordsRemover(inputCol="words_raw", outputCol="words_temp")
    remover.setStopWords(stopwords_pt_custom)
    df_clean_temp = remover.transform(df_tokenized)

    # FILTRO SQL (Mantido e refor√ßado)
    filter_expression = """
        filter(words_temp, x -> 
            x != '' AND 
            length(x) > 2 AND 
            NOT (length(x) == 4 AND substring(x, 1, 2) == 'pr') AND
            substring(x, 1, 6) != 'necess' AND
            substring(x, 1, 6) != 'demand' AND
            substring(x, 1, 7) != 'solicit' AND
            substring(x, 1, 7) != 'apresen' AND
            substring(x, 1, 7) != 'contrat' AND
            substring(x, 1, 7) != 'pagamen' AND
            substring(x, 1, 7) != 'forneci' AND
            substring(x, 1, 5) != 'possu' AND
            substring(x, 1, 6) != 'servid' AND
            substring(x, 1, 6) != 'defeit' AND
            substring(x, 1, 7) != 'disponi' AND
            substring(x, 1, 6) != 'inexis' AND
            substring(x, 1, 6) != 'inform' AND
            substring(x, 1, 5) != 'urgen' AND
            substring(x, 1, 5) != 'emerg' AND
            substring(x, 1, 7) != 'justifi' AND
            substring(x, 1, 5) != 'almox' AND
            substring(x, 1, 5) != 'reemb'
        )
    """
    
    df_clean_nlp = df_clean_temp.withColumn("words_filtered", expr(filter_expression))
    df_final_nlp = df_clean_nlp.filter(size(col("words_filtered")) > 0)

    print("‚úÖ NLP V16 conclu√≠do.")

except Exception as e:
    print(f"‚ùå Erro NLP: {e}")

--- Iniciando NLP V16 (Foco: Remover Justificativas) ---
‚úÖ NLP V16 conclu√≠do.


In [49]:
# ==============================================================================
# C√âLULA 7 (V10): Vetoriza√ß√£o Word2Vec (minCount=5)
# ==============================================================================
from pyspark.ml.feature import Word2Vec, Normalizer

print("\n--- Vetoriza√ß√£o V10 (minCount=5 para robustez) ---")

try:
    word2Vec = Word2Vec(vectorSize=50, 
                        minCount=5,   # <--- AUMENTAMOS PARA LIMPAR RU√çDO
                        inputCol="words_filtered", 
                        outputCol="raw_features",
                        windowSize=2,
                        maxIter=20,
                        stepSize=0.025,
                        seed=42)
    
    model_w2v = word2Vec.fit(df_final_nlp)
    df_w2v = model_w2v.transform(df_final_nlp)
    
    normalizer = Normalizer(inputCol="raw_features", outputCol="features", p=2.0)
    df_tfidf = normalizer.transform(df_w2v) 
    
    df_tfidf.cache()
    print(f"‚úÖ Vetoriza√ß√£o conclu√≠da.")

except Exception as e:
    print(f"‚ùå Erro Word2Vec: {e}")


--- Vetoriza√ß√£o V10 (minCount=5 para robustez) ---
‚úÖ Vetoriza√ß√£o conclu√≠da.


In [50]:
# ==============================================================================
# DIAGN√ìSTICO W2V: Teste de Similaridade Sem√¢ntica
# ==============================================================================
print("--- Auditando a Intelig√™ncia do Modelo Word2Vec ---")

# Escolha palavras que voc√™ sabe que existem na sua base e representam grupos distintos
palavras_teste = ["chave", "torneira", "extintor", "gasolina", "limpeza", "caneta"]

try:
    for palavra in palavras_teste:
        print(f"\nüîé Palavras mais pr√≥ximas de '{palavra}':")
        
        # O m√©todo findSynonyms busca os vizinhos mais pr√≥ximos no espa√ßo vetorial
        # O segundo argumento (5) √© quantas palavras queremos ver
        try:
            sinonimos = model_w2v.findSynonyms(palavra, 5)
            sinonimos.show(truncate=False)
        except Exception:
            print(f"   ‚ö†Ô∏è A palavra '{palavra}' n√£o foi encontrada no vocabul√°rio (talvez cortada pelo minCount).")

except NameError:
    print("‚ùå Erro: A vari√°vel 'model_w2v' n√£o existe. Rode a C√©lula 7 primeiro.")

--- Auditando a Intelig√™ncia do Modelo Word2Vec ---

üîé Palavras mais pr√≥ximas de 'chave':
+---------+------------------+
|word     |similarity        |
+---------+------------------+
|copia    |0.6501355171203613|
|chaves   |0.6043857932090759|
|miolo    |0.5669997930526733|
|gaveteiro|0.5602972507476807|
|yale     |0.5508624911308289|
+---------+------------------+


üîé Palavras mais pr√≥ximas de 'torneira':
+---------+------------------+
|word     |similarity        |
+---------+------------------+
|peca     |0.6849445104598999|
|tanque   |0.6733036637306213|
|cuba     |0.609876811504364 |
|vazamento|0.5764372944831848|
|mangueira|0.5273012518882751|
+---------+------------------+


üîé Palavras mais pr√≥ximas de 'extintor':
+----------+-------------------+
|word      |similarity         |
+----------+-------------------+
|extintores|0.45838749408721924|
|fixar     |0.4460994005203247 |
|colar     |0.42804038524627686|
|estancar  |0.40989598631858826|
|parou     |0.4091754555

In [51]:
# ==============================================================================
# C√âLULA 9 (V4.1): Bisecting K-Means (K=10) - CORRIGIDA
# ==============================================================================
from pyspark.ml.clustering import BisectingKMeans
from pyspark.sql.functions import udf, col
from pyspark.sql.types import DoubleType

# K REDUZIDO PARA 10 GRUPOS
K_FINAL = 20

print(f"\n--- Aplicando Bisecting K-Means (k={K_FINAL}) ---")

try:
    # 1. CRIAMOS UMA UDF PARA CALCULAR A NORMA DO VETOR
    # Isso evita o erro de agrega√ß√£o do Summarizer e funciona linha a linha.
    @udf(returnType=DoubleType())
    def get_vector_norm(v):
        try:
            # Retorna a norma L2 (magnitude) do vetor denso ou esparso
            return float(v.norm(2))
        except:
            return 0.0

    # 2. FILTRAGEM SEGURA
    # Calculamos a norma e filtramos apenas quem tem tamanho > 0
    df_metrics = df_tfidf.withColumn("vector_norm", get_vector_norm(col("features")))
    df_input = df_metrics.filter(col("vector_norm") > 0).drop("vector_norm")

    total_validos = df_input.count()
    print(f"üìä Registros v√°lidos para clusteriza√ß√£o: {total_validos}")

    # 3. CLUSTERIZA√á√ÉO
    bkmeans = BisectingKMeans(featuresCol="features", 
                              k=K_FINAL, 
                              seed=1, 
                              predictionCol="prediction", 
                              minDivisibleClusterSize=100, 
                              distanceMeasure="cosine")
    
    model_final = bkmeans.fit(df_input)
    df_clustered = model_final.transform(df_input)
    
    print(f"‚úÖ Clusteriza√ß√£o k={K_FINAL} conclu√≠da.")
    
    print("\n--- Distribui√ß√£o dos Clusters ---")
    df_clustered.groupBy("prediction").count().orderBy("prediction").show(25)

except Exception as e:
    print(f"‚ùå Erro: {e}")


--- Aplicando Bisecting K-Means (k=20) ---
üìä Registros v√°lidos para clusteriza√ß√£o: 11974
‚úÖ Clusteriza√ß√£o k=20 conclu√≠da.

--- Distribui√ß√£o dos Clusters ---
+----------+-----+
|prediction|count|
+----------+-----+
|         0|  361|
|         1|  765|
|         2|  607|
|         3|  657|
|         4|  536|
|         5|  575|
|         6|  800|
|         7|  537|
|         8|  315|
|         9|  593|
|        10|  691|
|        11|  256|
|        12|  728|
|        13|  609|
|        14|  367|
|        15|  520|
|        16| 1044|
|        17| 1146|
|        18|  379|
|        19|  488|
+----------+-----+



In [52]:
# ==============================================================================
# C√âLULA DE AUDITORIA (V2): Top Palavras + Exemplos
# ==============================================================================
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, rand, explode, count, desc

print("--- Auditoria Detalhada dos Clusters (Keywords + Exemplos) ---\n")

# --- PARTE 1: Descobrir as Top 7 Palavras por Cluster ---
print("1. Calculando as palavras mais frequentes de cada grupo...")

# 1. Explode: Transforma ['pneu', 'aro'] em duas linhas: 'pneu' e 'aro'
df_exploded = df_clustered.withColumn("word", explode(col("words_filtered")))

# 2. Conta frequ√™ncia: Quantas vezes a palavra aparece em cada cluster
df_word_counts = df_exploded.groupBy("prediction", "word").count()

# 3. Rankeia: Pega as Top 7
w_rank = Window.partitionBy("prediction").orderBy(col("count").desc())
df_top_keywords = df_word_counts.withColumn("rank", row_number().over(w_rank)) \
                                .filter(col("rank") <= 7) \
                                .orderBy("prediction", "rank")

# 4. Traz para a mem√≥ria (Dicion√°rio Python) para exibi√ß√£o r√°pida
# Estrutura final: {0: "pneu, aro, camara...", 1: "caneta, lapis..."}
keywords_data = df_top_keywords.collect()
keywords_dict = {}

for row in keywords_data:
    cluster_id = row['prediction']
    word = row['word']
    if cluster_id not in keywords_dict:
        keywords_dict[cluster_id] = []
    keywords_dict[cluster_id].append(word)

# --- PARTE 2: Pegar 5 Exemplos Aleat√≥rios (C√≥digo anterior) ---
print("2. Selecionando amostras aleat√≥rias...")
w_sample = Window.partitionBy("prediction").orderBy(rand(seed=42))
df_amostra = df_clustered.withColumn("rn", row_number().over(w_sample)) \
                          .filter(col("rn") <= 5) \
                          .select("prediction", "objeto_aquisicao", "valor_transacao", "words_filtered")

amostras = df_amostra.collect()
amostras_ordenadas = sorted(amostras, key=lambda x: x['prediction'])

# --- PARTE 3: Exibi√ß√£o do Relat√≥rio ---
from itertools import groupby

print("\n" + "="*100)
print(f"{'RELAT√ìRIO DE CLUSTERS':^100}")
print("="*100 + "\n")

for cluster_id, itens in groupby(amostras_ordenadas, key=lambda x: x['prediction']):
    
    # Monta a string de palavras-chave
    top_words = keywords_dict.get(cluster_id, ["(Sem palavras suficientes)"])
    top_words_str = ", ".join(top_words).upper()
    
    print(f"üìÇ CLUSTER {cluster_id}")
    print(f"üîë PALAVRAS-CHAVE: [{top_words_str}]")
    print("-" * 100)
    
    for item in itens:
        # Formata√ß√£o: Pre√ßo alinhado √† direita | Texto original truncado
        print(f" ‚Ä¢ R$ {item['valor_transacao']:>9.2f} | {item['objeto_aquisicao'][:70]:<70}")
    
    print("-" * 100)
    print("\n")

--- Auditoria Detalhada dos Clusters (Keywords + Exemplos) ---

1. Calculando as palavras mais frequentes de cada grupo...
2. Selecionando amostras aleat√≥rias...

                                       RELAT√ìRIO DE CLUSTERS                                        

üìÇ CLUSTER 0
üîë PALAVRAS-CHAVE: [VEICULO, PLACA, LAVAGEM, ABASTECIMENTO, OLEO, GASOLINA, PNEU]
----------------------------------------------------------------------------------------------------
 ‚Ä¢ R$    100.00 | gasolina                                                              
 ‚Ä¢ R$     50.00 | servico de reconhecimento de firma a pedido do secretario estadual    
 ‚Ä¢ R$     61.60 | necessidade urgente de aquisicao da tampa de oleo do motor para o veic
 ‚Ä¢ R$     61.32 | passagem de balsa para veiculo oficial em diligencia ao municipio de b
 ‚Ä¢ R$     60.00 | lavagem do veiculo  l200  placa oaz8882                               
------------------------------------------------------------------------------

In [54]:
# ==============================================================================
# C√âLULA 10 (Relat√≥rio Estruturado): Top 10 Outliers + Estat√≠sticas por Cluster
# ==============================================================================
from pyspark.sql.window import Window
from pyspark.sql.functions import expr, col, round, count, desc, row_number, lit
import pandas as pd
import os

print("--- 1. Calculando Estat√≠sticas Globais dos Clusters ---")

# 1. Totais por Cluster (Quantos itens existem no total?)
df_totais = df_clustered.groupBy("prediction").agg(count("*").alias("total_itens_cluster"))

# 2. C√°lculo dos Limites IQR (Q1, Mediana, Q3, Teto)
df_quartis = df_clustered.groupBy("prediction").agg(
    expr("percentile_approx(valor_transacao, 0.25)").alias("Q1"),
    expr("percentile_approx(valor_transacao, 0.50)").alias("Mediana"),
    expr("percentile_approx(valor_transacao, 0.75)").alias("Q3")
)

df_limites = df_quartis.withColumn("IQR", col("Q3") - col("Q1")) \
                       .withColumn("limite_superior", col("Q3") + (1.5 * col("IQR")))

# 3. Cruzamento com dados originais
df_analise = df_clustered.join(df_limites, on="prediction", how="inner") \
                          .join(df_totais, on="prediction", how="inner")

# 4. Filtrando Outliers
# Regra: Acima do teto E acima de R$ 50,00
df_outliers_raw = df_analise.filter((col("valor_transacao") > col("limite_superior")) & 
                                    (col("valor_transacao") > 50))

# ==============================================================================
# PARTE A: ESTAT√çSTICAS DE PORCENTAGEM (Visualiza√ß√£o no Console)
# ==============================================================================
print("\n--- 2. Resumo de Contamina√ß√£o por Cluster ---")

# Conta quantos outliers tem em cada cluster
df_stats_outliers = df_outliers_raw.groupBy("prediction", "total_itens_cluster").agg(count("*").alias("qtd_outliers"))

# Calcula a porcentagem
df_resumo = df_stats_outliers.withColumn("perc_outliers", round((col("qtd_outliers") / col("total_itens_cluster")) * 100, 2)) \
                             .orderBy(desc("perc_outliers"))

df_resumo.show(20)

# ==============================================================================
# PARTE B: TOP 10 MAIS DISCREPANTES POR CLUSTER (Arquivo CSV)
# ==============================================================================
print("--- 3. Extraindo os Top 10 Casos Graves por Cluster ---")

# Definimos uma "Janela" por cluster, ordenando pelo valor mais alto (mais grave)
janela_cluster = Window.partitionBy("prediction").orderBy(col("valor_transacao").desc())

# Criamos um Ranking (1¬∫, 2¬∫, 3¬∫...) e filtramos s√≥ at√© o 10¬∫
df_top10 = df_outliers_raw.withColumn("rank", row_number().over(janela_cluster)) \
                          .filter(col("rank") <= 10)

# Calculamos m√©tricas finais para o relat√≥rio
df_export = df_top10.withColumn(
    "diferenca_valor", 
    col("valor_transacao") - col("Mediana")
).withColumn(
    "x_vezes_mediana", 
    round(col("valor_transacao") / col("Mediana"), 1)
).select(
    col("prediction").alias("Cluster"),
    col("rank").alias("Ranking_Gravidade"),
    col("objeto_aquisicao").alias("Descricao_Item"),
    col("valor_transacao").alias("Preco_Pago"),
    col("Mediana").alias("Preco_Medio_Cluster"),
    col("limite_superior").alias("Teto_Aceitavel"),
    col("x_vezes_mediana").alias("Quantas_Vezes_Mais_Caro"),
    col("total_itens_cluster").alias("Tamanho_Cluster")
).orderBy("Cluster", "Ranking_Gravidade")

# --- Exporta√ß√£o ---
try:
    pdf_top10 = df_export.toPandas()
    nome_arquivo = "relatorio_top10_outliers.csv"
    
    # Salva com encoding correto para Excel/PT-BR
    pdf_top10.to_csv(nome_arquivo, index=False, sep=';', encoding='utf-8-sig', float_format='%.2f')
    
    print(f"\n‚úÖ Arquivo '{nome_arquivo}' gerado com sucesso!")
    print(f"   Conte√∫do: Os 10 maiores desvios de cada um dos clusters.")
    print(f"   Local: {os.getcwd()}/{nome_arquivo}")

except Exception as e:
    print(f"‚ùå Erro ao exportar: {e}")

--- 1. Calculando Estat√≠sticas Globais dos Clusters ---

--- 2. Resumo de Contamina√ß√£o por Cluster ---
+----------+-------------------+------------+-------------+
|prediction|total_itens_cluster|qtd_outliers|perc_outliers|
+----------+-------------------+------------+-------------+
|         1|                765|         114|         14.9|
|        12|                728|          83|         11.4|
|         8|                315|          35|        11.11|
|        10|                691|          74|        10.71|
|         4|                536|          56|        10.45|
|        15|                520|          50|         9.62|
|         9|                593|          55|         9.27|
|        13|                609|          56|          9.2|
|         5|                575|          52|         9.04|
|        14|                367|          32|         8.72|
|        16|               1044|          87|         8.33|
|         3|                657|          53|         

  PyArrow >= 4.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)



‚úÖ Arquivo 'relatorio_top10_outliers.csv' gerado com sucesso!
   Conte√∫do: Os 10 maiores desvios de cada um dos clusters.
   Local: c:\VSCode\projetoMineracao/relatorio_top10_outliers.csv


In [56]:
# ==============================================================================
# C√âLULA 11 (CORRIGIDA): Auditoria ML (Random Forest Regressor)
# ==============================================================================
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col, log, exp, abs as spark_abs, round, desc
import pandas as pd
import os

print("--- Iniciando Auditoria Preditiva (Random Forest) ---")
print(f"Par√¢metros: Trees=50 | Depth=8 | MinInstances=5 | Target=Log(Preco)")

# 1. PREPARA√á√ÉO (CORRE√á√ÉO AQUI: Usamos df_clustered em vez de df_tfidf)
# Assim garantimos que a coluna 'prediction' (Cluster ID) exista para o relat√≥rio final.
df_ml = df_clustered.withColumn("label", log(col("valor_transacao") + 1.0))

# 2. TREINAMENTO DO MODELO
rf = RandomForestRegressor(
    featuresCol="features", 
    labelCol="label",
    predictionCol="prediction_log", # Nome exclusivo para n√£o conflitar com o cluster
    seed=42,
    numTrees=50,
    maxDepth=8,
    minInstancesPerNode=5
)

print("‚è≥ Treinando o modelo (analisando padr√µes globais)...")
model_rf = rf.fit(df_ml)

# 3. PREDI√á√ÉO
predictions = model_rf.transform(df_ml)

# 4. C√ÅLCULO DE DISCREP√ÇNCIA
df_analise_ml = predictions.withColumn("preco_estimado_ml", exp(col("prediction_log")) - 1.0) \
                           .withColumn("razao_sobrepreco", col("valor_transacao") / (col("preco_estimado_ml") + 0.01)) \
                           .withColumn("diferenca_valor", col("valor_transacao") - col("preco_estimado_ml"))

# 5. FILTRAGEM
# Regra: Pre√ßo pago > 3x o estimado E Diferen√ßa > R$ 50
df_suspeitas_ml = df_analise_ml.filter((col("razao_sobrepreco") > 3) & 
                                       (col("diferenca_valor") > 50))

# Sele√ß√£o de colunas (Agora 'prediction' vai funcionar pois veio do df_clustered)
df_export_ml = df_suspeitas_ml.select(
    col("prediction").alias("Cluster_Original"), # <-- Agora esta coluna existe!
    col("objeto_aquisicao").alias("Descricao_Item"),
    col("valor_transacao").alias("Preco_Pago_Real"),
    round(col("preco_estimado_ml"), 2).alias("Preco_Justo_Estimado"),
    round(col("razao_sobrepreco"), 1).alias("Quantas_Vezes_Mais_Caro"),
    col("ano"),
    col("unidade_gestora")
).orderBy(desc("razao_sobrepreco"))

total_suspeitas = df_export_ml.count()
print(f"üö© O modelo encontrou {total_suspeitas} transa√ß√µes suspeitas.")

# --- 6. EXPORTA√á√ÉO ---
print("\n--- Gerando Arquivo de Auditoria ML ---")

try:
    pdf_ml = df_export_ml.toPandas()
    nome_arquivo_ml = "auditoria_ml_random_forest.csv"
    
    pdf_ml.to_csv(nome_arquivo_ml, index=False, sep=';', encoding='utf-8-sig', float_format='%.2f')
    
    print(f"‚úÖ Arquivo gerado: '{nome_arquivo_ml}'")
    
    print("\n--- Top 5 Discrep√¢ncias (Vis√£o do Rob√¥) ---")
    print(pdf_ml.head(5)[["Descricao_Item", "Preco_Pago_Real", "Preco_Justo_Estimado", "Quantas_Vezes_Mais_Caro"]])

except Exception as e:
    print(f"‚ùå Erro na exporta√ß√£o: {e}")

--- Iniciando Auditoria Preditiva (Random Forest) ---
Par√¢metros: Trees=50 | Depth=8 | MinInstances=5 | Target=Log(Preco)
‚è≥ Treinando o modelo (analisando padr√µes globais)...
üö© O modelo encontrou 1601 transa√ß√µes suspeitas.

--- Gerando Arquivo de Auditoria ML ---


  PyArrow >= 4.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


‚úÖ Arquivo gerado: 'auditoria_ml_random_forest.csv'

--- Top 5 Discrep√¢ncias (Vis√£o do Rob√¥) ---
                                      Descricao_Item  Preco_Pago_Real  \
0                                      anel superior     1.015300e+09   
1  confeccao de resinas para carimbos material ut...     2.016800e+09   
2                                         chapa zn 0     8.040020e+07   
3           aquisicao de de dois numeros em aco inox     2.116025e+07   
4                       valvula reversora para split     7.300000e+05   

   Preco_Justo_Estimado  Quantas_Vezes_Mais_Caro  
0                527.95                1923045.8  
1               1228.10                1642200.6  
2                213.24                 377030.1  
3                245.20                  86292.7  
4                217.24                   3360.2  


In [58]:
# ==============================================================================
# C√âLULA DE CONSOLIDA√á√ÉO (V2): Cruzamento IQR x ML (Filtro Risco > 10)
# ==============================================================================
import pandas as pd
import os

print("--- Iniciando Cruzamento de Auditorias (Filtro: Score > 10) ---")

# Defini√ß√£o dos arquivos de entrada
file_iqr = "auditoria_outliers_iqr.csv"
file_ml = "auditoria_ml_random_forest.csv"

# Verifica√ß√£o de exist√™ncia
if not os.path.exists(file_iqr) or not os.path.exists(file_ml):
    print("‚ùå Erro: Um dos arquivos de entrada n√£o foi encontrado.")
    print("   Certifique-se de ter rodado a C√©lula 10 (IQR) e a C√©lula 11 (ML).")
else:
    try:
        # 1. Carregar os Relat√≥rios
        # Usamos float_precision='high' para garantir precis√£o nos valores monet√°rios
        df_iqr = pd.read_csv(file_iqr, sep=';', encoding='utf-8-sig')
        df_ml = pd.read_csv(file_ml, sep=';', encoding='utf-8-sig')

        print(f"üìÇ Entradas: IQR ({len(df_iqr)} linhas) | ML ({len(df_ml)} linhas)")

        # 2. Padroniza√ß√£o
        df_ml.rename(columns={'Preco_Pago_Real': 'Preco_Pago'}, inplace=True)

        # 3. O Cruzamento (Interse√ß√£o)
        # Identificamos os itens que aparecem nos DOIS relat√≥rios
        df_consenso = pd.merge(
            df_iqr, 
            df_ml, 
            on=['Descricao_Item', 'Preco_Pago', 'ano', 'unidade_gestora'],
            how='inner',
            suffixes=('_IQR', '_ML')
        )
        
        # 4. C√°lculo do Score de Risco Unificado
        # F√≥rmula: (Vezes mais caro ML) + (Excesso IQR / 100)
        # Ex: Se ML diz que √© 10x mais caro e IQR diz que estourou 500% o teto -> Score = 10 + 5 = 15
        df_consenso['Score_Risco'] = df_consenso['Quantas_Vezes_Mais_Caro'] + (df_consenso['Perc_Excesso'] / 100)
        
        # 5. FILTRAGEM AGRESSIVA (Score > 10)
        # S√≥ passamos para o arquivo final se o risco for alt√≠ssimo
        df_final = df_consenso[df_consenso['Score_Risco'] > 10].copy()
        
        # Organiza√ß√£o das Colunas
        colunas_finais = [
            'Cluster_ID',              
            'Descricao_Item',          
            'Preco_Pago',              
            'Score_Risco',             
            'Preco_Justo_Estimado',    # Vis√£o ML
            'Teto_Estatistico',        # Vis√£o Estat√≠stica
            'Quantas_Vezes_Mais_Caro', # Indicador ML
            'Perc_Excesso',            # Indicador IQR
            'ano',
            'unidade_gestora'
        ]
        
        # Ordena: O maior risco no topo
        df_final = df_final[colunas_finais].sort_values(by='Score_Risco', ascending=False)
        
        qtd_total = len(df_final)
        print(f"üö© Registros Cr√≠ticos (Score > 10): {qtd_total}")

        # 6. Exporta√ß√£o
        nome_arquivo_final = "auditoria_critica_score_10.csv"
        
        if qtd_total > 0:
            df_final.to_csv(nome_arquivo_final, index=False, sep=';', encoding='utf-8-sig', float_format='%.2f')
            print(f"\n‚úÖ Relat√≥rio Cr√≠tico Gerado: '{nome_arquivo_final}'")
            print(f"   Local: {os.getcwd()}/{nome_arquivo_final}")
            
            print("\n--- TOP 10 CASOS MAIS GRAVES (ALERTA VERMELHO) ---")
            display_cols = ['Descricao_Item', 'Preco_Pago', 'Score_Risco', 'Quantas_Vezes_Mais_Caro']
            # Formata√ß√£o para leitura no console
            print(df_final[display_cols].head(10).to_string(index=False))
        else:
            print("\n‚ö†Ô∏è Nenhum registro superou o Score de Risco > 10.")
            print("   Isso significa que, embora existam outliers, nenhum √© t√£o extremo a ponto de cruzar os dois m√©todos com essa intensidade.")

    except Exception as e:
        print(f"‚ùå Erro durante o processamento: {e}")

--- Iniciando Cruzamento de Auditorias (Filtro: Score > 10) ---
üìÇ Entradas: IQR (1040 linhas) | ML (1601 linhas)
üö© Registros Cr√≠ticos (Score > 10): 166

‚úÖ Relat√≥rio Cr√≠tico Gerado: 'auditoria_critica_score_10.csv'
   Local: c:\VSCode\projetoMineracao/auditoria_critica_score_10.csv

--- TOP 10 CASOS MAIS GRAVES (ALERTA VERMELHO) ---
                                                                                                                                             Descricao_Item   Preco_Pago  Score_Risco  Quantas_Vezes_Mais_Caro
confeccao de resinas para carimbos material utilizado para atender as necessidades do protocolo obs a nota fiscal foi preenchida sem a data de emissao logo 2016800184.0 7522083.5184                1642200.6
                                                                                                                                              anel superior 1015300400.0 6879356.2474                1923045.8
                                  

In [60]:
# ==============================================================================
# C√âLULA FINAL (CORRIGIDA): Enriquecimento (Join com Dados Originais)
# ==============================================================================
import pandas as pd
import os
from pyspark.sql.functions import col

print("--- Iniciando Enriquecimento dos Dados (Recuperando CPFs e Favorecidos) ---")

# Arquivo de entrada
file_critico = "auditoria_critica_score_10.csv"

if not os.path.exists(file_critico):
    print(f"‚ùå Erro: O arquivo '{file_critico}' n√£o foi encontrado.")
else:
    try:
        # 1. Carregar as Anomalias (Pandas)
        pdf_criticos = pd.read_csv(file_critico, sep=';', encoding='utf-8-sig')
        qtd_criticos = len(pdf_criticos)
        print(f"üìÇ Carregados {qtd_criticos} registros cr√≠ticos para enriquecimento.")

        if qtd_criticos > 0:
            # 2. Sele√ß√£o e Renomea√ß√£o (CORRIGIDO)
            
            # Lista das colunas que existem no CSV
            cols_csv = ['Descricao_Item', 'Preco_Pago', 'ano', 'unidade_gestora', 
                        'Score_Risco', 'Preco_Justo_Estimado', 'Teto_Estatistico', 
                        'Quantas_Vezes_Mais_Caro', 'Perc_Excesso']
            
            # Filtramos o PDF apenas com essas colunas
            pdf_filtrado = pdf_criticos[cols_csv].copy()
            
            # Agora renomeamos para bater com os nomes do Spark (df_clustered)
            pdf_renomeado = pdf_filtrado.rename(columns={
                'Descricao_Item': 'objeto_aquisicao', 
                'Preco_Pago': 'valor_transacao'
            })
            
            # 3. Cria√ß√£o do DataFrame Spark (Chaves para o Join)
            # O Spark vai receber 'objeto_aquisicao' e 'valor_transacao' corretamente agora
            df_keys = spark.createDataFrame(pdf_renomeado)

            # 4. O JOIN (Recuperando os dados originais)
            # Fazemos o join usando as colunas renomeadas
            df_completo = df_clustered.join(
                df_keys,
                on=['objeto_aquisicao', 'valor_transacao', 'ano', 'unidade_gestora'],
                how='inner'
            )

            # 5. Sele√ß√£o Final para o Relat√≥rio (Organiza√ß√£o)
            df_relatorio_final = df_completo.select(
                # M√©tricas de Auditoria
                col("Score_Risco"),
                col("Quantas_Vezes_Mais_Caro").alias("Fator_Sobrepreco"),
                col("Perc_Excesso").alias("Perc_Acima_Teto_Cluster"),
                col("Preco_Justo_Estimado").alias("Preco_Ref_ML"),
                col("Teto_Estatistico").alias("Preco_Ref_Cluster"),
                
                # Dados da Compra
                col("objeto_aquisicao").alias("Descricao_Original"),
                col("valor_transacao").alias("Valor_Pago"),
                col("data_aquisicao"),
                col("ano"),
                col("unidade_gestora"),
                col("prediction").alias("Cluster_ID"),

                # Dados Cadastrais (Ouro)
                col("nome_suprido").alias("Nome_Suprido"),
                col("cpf_suprido").alias("CPF_Suprido"),
                col("nome_favorecido").alias("Nome_Favorecido"),
                col("cpf_cnpj_favorecido").alias("CNPJ_Favorecido"),
                
                col("periodo_aplicacao"),
                col("aprovado")
            ).orderBy(col("Score_Risco").desc())

            # 6. Exporta√ß√£o Final
            nome_arquivo_rastreio = "AUDITORIA_COMPLETA_RASTREAVEL.csv"
            
            pdf_final = df_relatorio_final.toPandas()
            pdf_final.to_csv(nome_arquivo_rastreio, index=False, sep=';', encoding='utf-8-sig', float_format='%.2f')

            print(f"\n‚úÖ Relat√≥rio Completo Gerado: '{nome_arquivo_rastreio}'")
            print(f"   Conte√∫do: {len(pdf_final)} linhas com dados cadastrais completos.")
            print(f"   Local: {os.getcwd()}/{nome_arquivo_rastreio}")
            
            print("\n--- Exemplo de Registro Completo (Top 1) ---")
            print(pdf_final.head(1).T)

        else:
            print("‚ö†Ô∏è N√£o h√° registros cr√≠ticos para enriquecer.")

    except Exception as e:
        print(f"‚ùå Erro no enriquecimento: {e}")

--- Iniciando Enriquecimento dos Dados (Recuperando CPFs e Favorecidos) ---
üìÇ Carregados 166 registros cr√≠ticos para enriquecimento.


  PyArrow >= 4.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)
  PyArrow >= 4.0.0 must be installed; however, it was not found.
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)



‚úÖ Relat√≥rio Completo Gerado: 'AUDITORIA_COMPLETA_RASTREAVEL.csv'
   Conte√∫do: 94 linhas com dados cadastrais completos.
   Local: c:\VSCode\projetoMineracao/AUDITORIA_COMPLETA_RASTREAVEL.csv

--- Exemplo de Registro Completo (Top 1) ---
                                                                        0
Score_Risco                                                         82.01
Fator_Sobrepreco                                                     59.3
Perc_Acima_Teto_Cluster                                           2271.33
Preco_Ref_ML                                                        74.16
Preco_Ref_Cluster                                                  185.55
Descricao_Original       compra emergencial para enfermagem mascara caixa
Valor_Pago                                                         4400.0
data_aquisicao                                                 17/09/2020
ano                                                                  2020
unidade_gestora   