# MOMENTO 1 ‚Äî Importa√ß√£o e organiza√ß√£o dos insumos

## ETAPA 0 ‚Äî Setup do ambiente + imports centralizados
**Resumo:** Preparar depend√™ncias, padronizar imports e iniciar sess√£o Spark.

In [None]:
# =========================
# ETAPA 13.0 ‚Äî Setup Streamlit no Google Colab
# =========================

!pip install -q streamlit pyngrok pandas numpy
!pip -q install streamlit pyarrow pandas numpy
!npm -q install -g localtunnel



In [None]:
# =========================
# ETAPA 0 ‚Äî Setup (instala√ß√£o opcional) + Imports centralizados
# =========================

# --- 0.0: Instala√ß√£o (ative se estiver em ambiente que precisa) ---
# !pip -q install pyspark ijson
!pip install -q streamlit pyngrok pandas numpy


# --- 0.1: Python stdlib ---
import os
import re
import json
import time
from pathlib import Path

# --- 0.2: Data / Viz (se voc√™ usar mesmo; se n√£o usar, pode remover) ---
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
# import seaborn as sns  # (opcional) se n√£o estiver usando, remova para evitar depend√™ncia extra

# --- 0.3: JSON streaming (Etapas de leitura) ---
import ijson

# --- 0.4: Spark SQL core ---
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import (
    StringType, IntegerType, DoubleType, BooleanType,
    StructType, StructField, ArrayType
)

# --- 0.5: Spark ML helpers ---
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array

# Transformers / feature engineering
from pyspark.ml.feature import (
    StringIndexer, OneHotEncoder, VectorAssembler,
    Tokenizer, RegexTokenizer, StopWordsRemover, HashingTF, IDF,
    StandardScaler
)

# Models
from pyspark.ml.classification import LogisticRegression

# Split / Tuning (se voc√™ usar depois)
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Evaluators / m√©tricas
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# --- 0.6: Spark session (se voc√™ cria aqui; se j√° existe no ambiente, pode comentar) ---
spark = (
    SparkSession.builder
    .appName("Projeto-ML-RH")
    .getOrCreate()
)

print("[Etapa 0] Setup OK | Spark:", spark.version)



## ETAPA 1 ‚Äî Ingest√£o dos dados no Colab e padroniza√ß√£o de caminhos
**Resumo:** upload de arquivos do  Google Drive, organizar paths de entrada/sa√≠da e preparar convers√£o JSON ‚Üí JSONL ‚Üí Parquet.


In [None]:
# =========================
## ETAPA 1 -- INICIO DE AJUSTES DOS DADOS UTILIZADOS NO PROJETO
# Buscando arquivos (bases compartilhadas para elabora√ß√£o do projeto) para abrir (Google Drive) e realizar modifica√ß√µes
# Objetivo final: migrar o tipo de arquivo de JSON > JSONL > PARQUET ==> melhor modelagem para desenvolvimento de schemas de dados entre bases (Spark)
# =========================

# puxando arquivos na pasta do GDrive para elabora√ß√£o do projeto
from google.colab import drive
drive.mount('/content/drive')

# puxando PASTA DO PROJETO (-- obs: criando uma pasta apenas para outputs)
BASE_DIR = "/content/drive/MyDrive/PoÃÅs Tech - Data Analytics & ML - FIAP/FASE 5/Tech Challenge - Fase 5"
DATA_DIR = f"{BASE_DIR}/Base de Dados"
OUT_DIR  = f"{BASE_DIR}/_outputs"

Path(OUT_DIR).mkdir(parents=True, exist_ok=True)

# ENTRADA: Arquivos en JSON
APPLICANTS_JSON = f"{DATA_DIR}/applicants.json"
PROSPECTS_JSON  = f"{DATA_DIR}/prospects.json"
VAGAS_JSON      = f"{DATA_DIR}/vagas.json"

# SA√çDA 0 (intermedi√°ria):  1¬™ camada de ajustes: JSON > JSONL (arquivos JSON dict, organizados por ID)
APPLICANTS_JSONL = f"{OUT_DIR}/applicants.jsonl"
PROSPECTS_JSONL  = f"{OUT_DIR}/prospects.jsonl"
VAGAS_JSONL      = f"{OUT_DIR}/vagas.jsonl"

# SA√çDA FINAL: 2¬™ camada de ajustes: JSON > JSONL (arquivos JSON dict, organizados por ID)
APPLICANTS_PARQUET = f"{OUT_DIR}/applicants_parquet"
PROSPECTS_PARQUET  = f"{OUT_DIR}/prospects_parquet"
VAGAS_PARQUET      = f"{OUT_DIR}/vagas_parquet"

# Valida√ß√£o
for p in [APPLICANTS_JSON, PROSPECTS_JSON, VAGAS_JSON]:
    print("exists?", os.path.exists(p), "|", p)

# MOMENTO 2 ‚Äî Engenharia de dados e estrutura√ß√£o em Big Data


## ETAPA 2 ‚Äî Carregamento dos Parquets e defini√ß√£o dos DataFrames-base
**Resumo:** ler Parquet no Spark, criar dataframes principais e validar volume/esquema antes das transforma√ß√µes.


In [None]:
# =========================
## ETAPA 2: SEPARANDO OS ARQUIVOS PARQUET E DEFININDO PRINCIPAIS DATAFRAMES PARA O PROJETO
# =========================


# --- Carregar Parquets ---
df_prospects = spark.read.parquet(PROSPECTS_PARQUET)
df_vagas     = spark.read.parquet(VAGAS_PARQUET)
df_apps      = spark.read.parquet(APPLICANTS_PARQUET)

# Teste de Sanidade
print("prospects:", df_prospects.count(), "linhas")
print("vagas:", df_vagas.count(), "linhas")
print("applicants:", df_apps.count(), "linhas")


## ETAPA 3 ‚Äî Normaliza√ß√£o e consolida√ß√£o da base prospects (para join/ML)
**Resumo:** achatar estruturas aninhadas, definir chave **√∫nica** e consolidar registros para formar uma base consistente para modelagem.


In [None]:
# =========================
# ETAPA 3 ‚Äî NORMALIZA√á√ÉO + CHAVE + CARDINALIDADE + CONSOLIDA√á√ÉO (UNIFICADA e RESUMIDA)
# Inputs esperados:
#   - df_prospects (schema: __root_id, modalidade, titulo, prospects: array<struct{codigo,...}>)
#   - (opcional depois) df_apps, df_vagas para join na Etapa 4
# Outputs:
#   - df_pro_flat  : 1 linha por item do array prospects (explode)
#   - df_pro_model : 1 linha por codigo (consolidado p/ join/ML)
# =========================


# 3.1 ‚Äî Inspe√ß√£o m√≠nima (schema + colunas)
print("\n[Etapa 3.1] Columns df_prospects:", df_prospects.columns)
df_prospects.printSchema()

# 3.2 ‚Äî Flatten (explode) + sele√ß√£o m√≠nima de campos √∫teis (normaliza√ß√£o estrutural)
df_pro_flat = (
    df_prospects
    .select("__root_id","modalidade","titulo", F.explode_outer("prospects").alias("p"))
    .select(
        "__root_id","modalidade","titulo",
        F.col("p.codigo").alias("codigo"),
        F.col("p.nome").alias("nome"),
        F.col("p.recrutador").alias("recrutador"),
        F.col("p.data_candidatura").alias("data_candidatura"),
        F.col("p.situacao_candidado").alias("situacao_candidado"),
        F.col("p.ultima_atualizacao").alias("ultima_atualizacao"),
        F.col("p.comentario").alias("comentario")
    )
)

# 3.3 ‚Äî Diagn√≥stico de cardinalidade (prints curtos e objetivos)
print("\n[Etapa 3.3] Top __root_id counts (estrutura do JSON):")
df_pro_flat.groupBy("__root_id").count().orderBy(F.desc("count")).show(5, truncate=False)

print("\n[Etapa 3.3] Top codigo counts (duplicidade/nulos da chave candidata):")
df_pro_flat.groupBy("codigo").count().orderBy(F.desc("count")).show(10, truncate=False)

print("\n[Etapa 3.3] Sanidade codigo:")
print("rows(flat) =", df_pro_flat.count())
print("null(codigo) =", df_pro_flat.where(F.col("codigo").isNull()).count())
print("distinct(codigo, non-null) =", df_pro_flat.where(F.col("codigo").isNotNull()).select("codigo").distinct().count())

# 3.4 ‚Äî Consolida√ß√£o: 1 linha por codigo (mant√©m registro mais recente por ultima_atualizacao)
# CORRE√á√ÉO: usar try_to_timestamp via expr (maior compatibilidade com vers√µes Spark)

df_tmp = (
    df_pro_flat
    .where(F.col("codigo").isNotNull())
    .withColumn(
        "ultima_atualizacao_ts",
        F.coalesce(
            F.expr("try_to_timestamp(ultima_atualizacao, 'dd-MM-yyyy HH:mm:ss')"),
            F.expr("try_to_timestamp(ultima_atualizacao, 'dd-MM-yyyy')"),
            F.expr("try_to_timestamp(ultima_atualizacao, 'yyyy-MM-dd HH:mm:ss')"),
            F.expr("try_to_timestamp(ultima_atualizacao, 'yyyy-MM-dd')")
        )
    )
    .withColumn(
        "data_candidatura_ts",
        F.coalesce(
            F.expr("try_to_timestamp(data_candidatura, 'dd-MM-yyyy HH:mm:ss')"),
            F.expr("try_to_timestamp(data_candidatura, 'dd-MM-yyyy')"),
            F.expr("try_to_timestamp(data_candidatura, 'yyyy-MM-dd HH:mm:ss')"),
            F.expr("try_to_timestamp(data_candidatura, 'yyyy-MM-dd')")
        )
    )
)

w = Window.partitionBy("codigo").orderBy(F.col("ultima_atualizacao_ts").desc_nulls_last())

df_pro_model = (
    df_tmp
    .withColumn("rn", F.row_number().over(w))
    .where(F.col("rn") == 1)
    .drop("rn")
)

# 3.5 ‚Äî Output final da Etapa 3 (pronto para join/ML)
print("\n[Etapa 3] OUTPUTS:")
print("df_pro_flat rows =", df_pro_flat.count())
print("df_pro_model rows (1 linha por codigo) =", df_pro_model.count())

print("\n[Etapa 3] Sanidade datas (ap√≥s parsing):")
print("null(ultima_atualizacao_ts) =", df_tmp.where(F.col("ultima_atualizacao_ts").isNull()).count())
print("null(data_candidatura_ts) =", df_tmp.where(F.col("data_candidatura_ts").isNull()).count())

df_pro_model.select(
    "codigo","nome","titulo","modalidade",
    "situacao_candidado","ultima_atualizacao","ultima_atualizacao_ts"
).show(10, truncate=False)


## ETAPA 4 ‚Äî Join e cria√ß√£o da base unificada (Gold)
**Resumo:** unir candidatos (apps/prospects) com vagas, padronizar chaves/colunas e gerar a base final para feature engineering e modelagem.


In [None]:
# =========================
# ETAPA 4 ‚Äî JOIN + BASE GOLD (UNIFICADA e RESUMIDA)
# Inputs:
#   - df_pro_model  (Etapa 3)  -> deve ter coluna "codigo" (string/int ok, vamos cast)
#   - df_apps       (candidaturas/applicants)
#   - df_vagas      (vagas/jobs)
# Output:
#   - df_gold       (base unificada p/ feature engineering / ML)
# =========================

from pyspark.sql import functions as F

# 4.1 ‚Äî Inspe√ß√£o m√≠nima (para n√£o chutar nomes)
print("\n[Etapa 4.1] df_apps columns:", df_apps.columns)
df_apps.printSchema()

print("\n[Etapa 4.1] df_vagas columns:", df_vagas.columns)
df_vagas.printSchema()

# 4.2 ‚Äî Fun√ß√£o curta para sugerir chaves por nome (heur√≠stica)
def pick_key(cols, patterns):
    for p in patterns:
        for c in cols:
            if p in c.lower():
                return c
    return None

# 4.3 ‚Äî Definir chaves
pro_key = "codigo"  # fixo da Etapa 3

# candidata chave de candidato em df_apps
app_key = pick_key(df_apps.columns, [
    "codigo", "candidate", "candidato", "applicant", "prospect", "id"
])

# candidata chave de vaga em df_apps e df_vagas
job_keyA = pick_key(df_apps.columns,  ["vaga", "job", "position", "requisition", "id_vaga", "job_id"])
job_keyV = pick_key(df_vagas.columns, ["vaga", "job", "position", "requisition", "id_vaga", "job_id"])

print("\n[Etapa 4.3] Chaves sugeridas:")
print("pro_key  =", pro_key)
print("app_key  =", app_key)
print("job_keyA =", job_keyA)
print("job_keyV =", job_keyV)

# 4.4 ‚Äî Normaliza√ß√£o leve de tipos (string) para reduzir mismatch no join
df_pro = df_pro_model.withColumn(pro_key, F.col(pro_key).cast("string"))
df_app = df_apps
df_vag = df_vagas

if app_key:
    df_app = df_app.withColumn(app_key, F.col(app_key).cast("string"))
if job_keyA:
    df_app = df_app.withColumn(job_keyA, F.col(job_keyA).cast("string"))
if job_keyV:
    df_vag = df_vag.withColumn(job_keyV, F.col(job_keyV).cast("string"))

# 4.5 ‚Äî JOIN prospects ‚Üî apps (obrigat√≥rio) COM DESAMBIGUA√á√ÉO DE COLUNAS
# Fix: usar alias + selecionar colunas com prefixo p__/app__ para evitar nomes duplicados
p = df_pro.alias("p")
a = df_app.alias("a")

df_join_pa = (
    p.join(a, F.col(f"p.{pro_key}") == F.col(f"a.{app_key}"), "left")
     .select(
         *[F.col(f"p.{c}").alias(c) for c in df_pro.columns],                       # mant√©m prospects sem prefixo
         *[F.col(f"a.{c}").alias(f"app__{c}") for c in df_app.columns]              # prefixa tudo de apps
     )
)

# 4.6 ‚Äî JOIN com vagas (opcional) COM DESAMBIGUA√á√ÉO
if job_keyA and job_keyV:
    j = df_join_pa.alias("j")
    v = df_vag.alias("v")

    df_gold = (
        j.join(v, F.col(f"j.{job_keyA}") == F.col(f"v.{job_keyV}"), "left")
         .select(
             *[F.col(f"j.{c}").alias(c) for c in df_join_pa.columns],               # mant√©m tudo do join_pa
             *[F.col(f"v.{c}").alias(f"vag__{c}") for c in df_vag.columns]          # prefixa tudo de vagas
         )
    )
else:
    print("\n[Etapa 4.6] Aviso: n√£o identifiquei chave de vaga em df_apps e/ou df_vagas. df_gold ficar√° sem join de vagas.")
    df_gold = df_join_pa

# 4.7 ‚Äî Sanity check final (agora SEM ambiguidade)
print("\n[Etapa 4.7] Sanidade df_gold:")
print("rows(df_pro_model) =", df_pro_model.count())
print("rows(df_gold)      =", df_gold.count())
print("distinct(codigo)   =", df_gold.select(pro_key).distinct().count())

# taxa de match: como apps foi prefixado, a chave do apps virou app__{app_key}
app_key_pref = f"app__{app_key}"

match_apps = (
    df_gold
    .withColumn("matched_apps", F.when(F.col(app_key_pref).isNotNull(), F.lit(1)).otherwise(F.lit(0)))
    .groupBy(pro_key)
    .agg(F.max("matched_apps").alias("has_app"))
    .agg(F.avg("has_app").alias("pct_match_apps"))
    .collect()[0]["pct_match_apps"]
)

print("pct_match_apps ‚âà", float(match_apps))

# Amostra enxuta (somente colunas garantidas do prospects + flag + chave prefixada)
df_gold.select(
    pro_key, "nome", "titulo", "situacao_candidado", "ultima_atualizacao", app_key_pref
).show(10, truncate=False)


# MOMENTO 3 ‚Äî Feature Engineering e prepara√ß√£o para modelagem

## ETAPA 5 ‚Äî Feature Engineering (dataset de modelagem)
**Resumo:** criar target (y) e transformar a base Gold em uma tabela tabular com features prontas para treino e avalia√ß√£o.



In [None]:
# =========================
# ETAPA 5 ‚Äî FEATURE ENGINEERING (UNIFICADA e RESUMIDA)
# Input:
#   - df_gold (sa√≠da da Etapa 4, com colunas do prospects + app__ structs)
# Output:
#   - df_ml (dataset tabular com label y + features prontas p/ ML)
# =========================

from pyspark.sql import functions as F
import re

# 5.1 ‚Äî Target (y) a partir do status (situacao_candidado) ‚Äî ajuste a lista se quiser
pos_patterns = [
    "contratad", "aprovad", "hunting", "admit", "selecionad"
]
pos_regex = "|".join([re.escape(p) for p in pos_patterns])

df_feat = (
    df_gold
    .withColumn("status_lc", F.lower(F.coalesce(F.col("situacao_candidado"), F.lit(""))))
    .withColumn("y", F.when(F.col("status_lc").rlike(pos_regex), F.lit(1)).otherwise(F.lit(0)))
)

# 5.2 ‚Äî Flatten m√≠nimo do df_apps (structs -> colunas simples) e limpeza leve
# (S√≥ campos muito √∫teis e de baixo custo; evita explodir o dataset)
df_feat = (
    df_feat
    .withColumn("app_nome", F.col("app__informacoes_pessoais.nome"))
    .withColumn("app_email", F.coalesce(F.col("app__informacoes_pessoais.email"), F.col("app__infos_basicas.email")))
    .withColumn("app_local", F.coalesce(F.col("app__infos_basicas.local"), F.col("app__informacoes_pessoais.endereco")))
    .withColumn("app_nivel_academico", F.col("app__formacao_e_idiomas.nivel_academico"))
    .withColumn("app_nivel_ingles", F.col("app__formacao_e_idiomas.nivel_ingles"))
    .withColumn("app_nivel_espanhol", F.col("app__formacao_e_idiomas.nivel_espanhol"))
    .withColumn("app_area_atuacao", F.col("app__informacoes_profissionais.area_atuacao"))
    .withColumn("app_nivel_profissional", F.col("app__informacoes_profissionais.nivel_profissional"))
    .withColumn("app_remuneracao_raw", F.col("app__informacoes_profissionais.remuneracao"))
    .withColumn("app_conhecimentos_raw", F.col("app__informacoes_profissionais.conhecimentos_tecnicos"))
    .withColumn("app_certificacoes_raw", F.col("app__informacoes_profissionais.certificacoes"))
)

# 5.3 ‚Äî Features simples e robustas (sem parsing complexo de sal√°rio por enquanto)
df_feat = (
    df_feat
    .withColumn("titulo_len", F.length(F.coalesce(F.col("titulo"), F.lit(""))))
    .withColumn("nome_len", F.length(F.coalesce(F.col("nome"), F.lit(""))))
    .withColumn("has_email", F.when(F.col("app_email").isNotNull() & (F.length(F.col("app_email")) > 3), 1).otherwise(0))
    .withColumn("has_linkedin", F.when(F.col("app__informacoes_pessoais.url_linkedin").isNotNull(), 1).otherwise(0))
    .withColumn("has_cv_pt", F.when(F.col("app__cv_pt").isNotNull() & (F.length(F.col("app__cv_pt")) > 0), 1).otherwise(0))
    .withColumn("has_cv_en", F.when(F.col("app__cv_en").isNotNull() & (F.length(F.col("app__cv_en")) > 0), 1).otherwise(0))
    .withColumn("has_conhecimentos", F.when(F.col("app_conhecimentos_raw").isNotNull() & (F.length(F.col("app_conhecimentos_raw")) > 0), 1).otherwise(0))
    .withColumn("has_certificacoes", F.when(F.col("app_certificacoes_raw").isNotNull() & (F.length(F.col("app_certificacoes_raw")) > 0), 1).otherwise(0))
)

# 5.4 ‚Äî Sele√ß√£o final do dataset para ML (tabular + textos raw como opcional)
df_ml = df_feat.select(
    F.col("codigo").cast("string").alias("candidate_id"),
    "y",
    "modalidade",
    "titulo",
    "status_lc",
    "ultima_atualizacao",
    "titulo_len",
    "nome_len",
    "has_email",
    "has_linkedin",
    "has_cv_pt",
    "has_cv_en",
    "has_conhecimentos",
    "has_certificacoes",
    "app_local",
    "app_nivel_academico",
    "app_nivel_profissional",
    "app_area_atuacao",
    "app_nivel_ingles",
    "app_nivel_espanhol",
    # textos raw (mant√©m pra NLP futura, mas n√£o quebra nada agora)
    "app_conhecimentos_raw",
    "app_certificacoes_raw",
    "app_remuneracao_raw"
)

# 5.5 ‚Äî Sanity check curto
print("\n[Etapa 5] Sanidade df_ml:")
print("rows =", df_ml.count())
df_ml.groupBy("y").count().show(truncate=False)
df_ml.show(10, truncate=False)


# MOMENTO 4 ‚Äî Modelagem e avalia√ß√£o anal√≠tica


## ETAPA 6 ‚Äî Modelagem e racional anal√≠tico
**Resumo:** Nessa etapa, √© necess√°rio introduzir sobre o contexto l√≥gico de escolha do modelo, abordagem de treinamento e interpreta√ß√£o dos resultados, considerando o RH como principal stakeholder.



*Inicialmente, foi desenvolvido um modelo generalista utilizando todos os candidatos dispon√≠veis, de modo a capturar padr√µes globais de sucesso no processo seletivo. Reconhece-se, entretanto, que diferentes √°reas profissionais apresentam caracter√≠sticas e crit√©rios distintos, motivo pelo qual an√°lises segmentadas por √°rea s√£o propostas como extens√£o do trabalho.*



In [None]:
# =========================
# ETAPA 6 ‚Äî MODELAGEM BASELINE (ROBUSTA) SEM ONEHOT
# Motivo: evitar erros de metadata/nomes no OneHotEncoder ("Cannot have an empty string for name" etc.)
# Input:
#   - df_ml (Etapa 5)
# Output:
#   - model, predictions, m√©tricas
# =========================

from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

label_col = "y"

num_cols = [
    "titulo_len",
    "nome_len",
    "has_email",
    "has_linkedin",
    "has_cv_pt",
    "has_cv_en",
    "has_conhecimentos",
    "has_certificacoes"
]

cat_cols = [
    "modalidade",
    "app_local",
    "app_nivel_academico",
    "app_nivel_profissional",
    "app_area_atuacao",
    "app_nivel_ingles",
    "app_nivel_espanhol"
]

# 6.1 ‚Äî Dataset base
df_model = df_ml.select(label_col, *num_cols, *cat_cols)

# 6.2 ‚Äî Normaliza√ß√£o forte (garante que NUNCA tem null/"")
# num√©ricas: double + null -> 0
for c in num_cols:
    df_model = df_model.withColumn(c, F.coalesce(F.col(c).cast("double"), F.lit(0.0)))

# categ√≥ricas: trim + null/" " -> "unknown"
for c in cat_cols:
    df_model = df_model.withColumn(
        c,
        F.when(
            F.trim(F.coalesce(F.col(c).cast("string"), F.lit(""))) == "",
            F.lit("unknown")
        ).otherwise(F.trim(F.col(c).cast("string")))
    )

# (debug m√≠nimo ‚Äî pode comentar depois)
print("\n[Etapa 6] Checagem de strings vazias ap√≥s limpeza:")
for c in cat_cols:
    empties = df_model.where(F.col(c) == "").count()
    nulls = df_model.where(F.col(c).isNull()).count()
    print(f"- {c}: empty={empties}, null={nulls}")

# 6.3 ‚Äî Indexa√ß√£o categ√≥rica (sem OneHot)
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep")
    for c in cat_cols
]

# 6.4 ‚Äî VectorAssembler (num√©ricas + idx)
feature_cols = num_cols + [f"{c}_idx" for c in cat_cols]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# 6.5 ‚Äî Modelo baseline
lr = LogisticRegression(
    featuresCol="features",
    labelCol=label_col,
    maxIter=50,
    regParam=0.01
)

pipeline = Pipeline(stages=indexers + [assembler, lr])

# 6.6 ‚Äî Split
train_df, test_df = df_model.randomSplit([0.7, 0.3], seed=42)
print("\n[Etapa 6] Split:")
print("train =", train_df.count())
print("test  =", test_df.count())

# 6.7 ‚Äî Treino
model = pipeline.fit(train_df)

# 6.8 ‚Äî Predi√ß√£o
predictions = model.transform(test_df)

# 6.9 ‚Äî M√©tricas
auc_eval = BinaryClassificationEvaluator(
    labelCol=label_col,
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)

acc_eval = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="accuracy"
)

prec_eval = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="weightedPrecision"
)

rec_eval = MulticlassClassificationEvaluator(
    labelCol=label_col,
    predictionCol="prediction",
    metricName="weightedRecall"
)

auc = auc_eval.evaluate(predictions)
acc = acc_eval.evaluate(predictions)
prec = prec_eval.evaluate(predictions)
rec = rec_eval.evaluate(predictions)

print("\n[Etapa 6] M√©tricas:")
print(f"AUC       = {auc:.4f}")
print(f"Accuracy  = {acc:.4f}")
print(f"Precision = {prec:.4f}")
print(f"Recall    = {rec:.4f}")

# 6.10 ‚Äî Amostra de predi√ß√µes
predictions.select(label_col, "prediction", "probability").show(10, truncate=False)


## ETAPA 7 ‚Äî Avalia√ß√£o do modelo e an√°lise de m√©tricas
**Resumo:** Avaliar o desempenho do modelo por m√©tricas estat√≠sticas e interpretar os resultados no contexto de decis√£o do RH.


In [None]:
# =========================
# ETAPA 7 ‚Äî AVALIA√á√ÉO CORRETA (DESBALANCEAMENTO + CONFUSION + THRESHOLD)
# Input:
#   - predictions (sa√≠da da Etapa 6)
#   - df_model (usado na Etapa 6) OU df_ml (para distribui√ß√£o do y)
# Output:
#   - distribui√ß√£o do y
#   - confusion matrix
#   - precision/recall/f1 da classe positiva (y=1)
#   - sugest√£o de threshold (opcional)
# =========================

# 7.1 ‚Äî Distribui√ß√£o do target no dataset todo
print("\n[Etapa 7.1] Distribui√ß√£o do y (dataset completo):")
df_ml.groupBy("y").count().orderBy("y").show()

# 7.2 ‚Äî Confusion Matrix no TEST (predictions)
# tn, fp, fn, tp
cm = (
    predictions
    .select("y", "prediction")
    .withColumn("y_int", F.col("y").cast("int"))
    .withColumn("pred_int", F.col("prediction").cast("int"))
    .groupBy("y_int", "pred_int")
    .count()
)

print("\n[Etapa 7.2] Confusion Matrix (test):")
cm.orderBy("y_int", "pred_int").show()

# 7.3 ‚Äî M√©tricas manuais focadas na classe positiva (y=1)
agg = (
    predictions
    .select(F.col("y").cast("int").alias("y"), F.col("prediction").cast("int").alias("pred"))
    .agg(
        F.sum(F.when((F.col("y")==1) & (F.col("pred")==1), 1).otherwise(0)).alias("tp"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred")==1), 1).otherwise(0)).alias("fp"),
        F.sum(F.when((F.col("y")==1) & (F.col("pred")==0), 1).otherwise(0)).alias("fn"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred")==0), 1).otherwise(0)).alias("tn"),
    )
).collect()[0]

tp, fp, fn, tn = agg["tp"], agg["fp"], agg["fn"], agg["tn"]

precision_pos = tp / (tp + fp) if (tp + fp) else 0.0
recall_pos    = tp / (tp + fn) if (tp + fn) else 0.0
f1_pos        = (2 * precision_pos * recall_pos / (precision_pos + recall_pos)) if (precision_pos + recall_pos) else 0.0

print("\n[Etapa 7.3] M√©tricas para classe positiva (y=1):")
print(f"TP={tp}  FP={fp}  FN={fn}  TN={tn}")
print(f"Precision(y=1) = {precision_pos:.4f}")
print(f"Recall(y=1)    = {recall_pos:.4f}")
print(f"F1(y=1)        = {f1_pos:.4f}")

# 7.4 ‚Äî Ajuste de threshold usando probabilidade da classe 1
# pega prob da classe 1: probability[1]

pred_prob = (
    predictions
    .withColumn("prob_arr", vector_to_array(F.col("probability")))
    .withColumn("p1", F.col("prob_arr")[1])
)

# thresholds simples
thresholds = [0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.5]

print("\n[Etapa 7.4] Varredura simples de threshold (classe 1):")
rows = []

for t in thresholds:
    tmp = pred_prob.withColumn("pred_t", F.when(F.col("p1") >= F.lit(t), 1).otherwise(0))

    m = (
        tmp.select(F.col("y").cast("int").alias("y"), F.col("pred_t").cast("int").alias("pred"))
        .agg(
            F.sum(F.when((F.col("y")==1) & (F.col("pred")==1), 1).otherwise(0)).alias("tp"),
            F.sum(F.when((F.col("y")==0) & (F.col("pred")==1), 1).otherwise(0)).alias("fp"),
            F.sum(F.when((F.col("y")==1) & (F.col("pred")==0), 1).otherwise(0)).alias("fn"),
        )
        .collect()[0]
    )

    tp2, fp2, fn2 = m["tp"], m["fp"], m["fn"]
    p = tp2 / (tp2 + fp2) if (tp2 + fp2) else 0.0
    r = tp2 / (tp2 + fn2) if (tp2 + fn2) else 0.0
    f1 = (2*p*r/(p+r)) if (p+r) else 0.0
    rows.append((t, tp2, fp2, fn2, p, r, f1))

for (t, tp2, fp2, fn2, p, r, f1) in rows:
    print(f"thr={t:.2f} | TP={tp2} FP={fp2} FN={fn2} | Prec={p:.3f} Rec={r:.3f} F1={f1:.3f}")

# extra: mostra distribui√ß√£o das probabilidades pra voc√™ entender escala
print("\n[Etapa 7.4] Distribui√ß√£o de p1 (classe 1):")
pred_prob.select("p1").summary("count","min","25%","50%","75%","max","mean").show(truncate=False)


# MOMENTO 5 ‚Äî Aplica√ß√£o do modelo e gera√ß√£o de valor

## ETAPA 8 ‚Äî Aplica√ß√£o pr√°tica do modelo no contexto de RH
**Resumo:** demonstrar como o output do modelo pode ser utilizado para apoiar decis√µes reais de recrutamento e triagem de candidatos.



In [None]:
# =========================
# ETAPA 8 ‚Äî MODELO BALANCEADO (UNIFICADO)
# Objetivo: aumentar Recall da classe positiva (y=1)
# Estrat√©gia: class_weight manual
# =========================


# 8.1 ‚Äî Calcula pesos por classe (baseado no treino)
label_dist = (
    train_df
    .groupBy("y")
    .count()
    .collect()
)

counts = {row["y"]: row["count"] for row in label_dist}
total = counts[0] + counts[1]

w0 = total / (2 * counts[0])
w1 = total / (2 * counts[1])

print(f"[Etapa 8.1] Pesos calculados: w0={w0:.3f} | w1={w1:.3f}")

# 8.2 ‚Äî Cria coluna weight
train_w = train_df.withColumn(
    "weight",
    F.when(F.col("y") == 1, F.lit(w1)).otherwise(F.lit(w0))
)

test_w = test_df.withColumn("weight", F.lit(1.0))

# 8.3 ‚Äî Pipeline completo (indexers + assembler + LR com weight)
lr_bal = LogisticRegression(
    labelCol="y",
    featuresCol="features",
    weightCol="weight",
    maxIter=50,
    regParam=0.01
)

pipeline_bal = Pipeline(stages=indexers + [assembler, lr_bal])

# 8.4 ‚Äî Treino
model_bal = pipeline_bal.fit(train_w)

# 8.5 ‚Äî Predi√ß√£o
pred_bal = model_bal.transform(test_w)

# 8.6 ‚Äî AUC
auc_eval = BinaryClassificationEvaluator(
    labelCol="y",
    rawPredictionCol="rawPrediction",
    metricName="areaUnderROC"
)
auc_bal = auc_eval.evaluate(pred_bal)
print(f"\n[Etapa 8.6] AUC (balanceado) = {auc_bal:.4f}")

# 8.7 ‚Äî M√©tricas da classe positiva (y=1)
agg = (
    pred_bal
    .select(F.col("y").cast("int").alias("y"), F.col("prediction").cast("int").alias("pred"))
    .agg(
        F.sum(F.when((F.col("y")==1) & (F.col("pred")==1), 1).otherwise(0)).alias("tp"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred")==1), 1).otherwise(0)).alias("fp"),
        F.sum(F.when((F.col("y")==1) & (F.col("pred")==0), 1).otherwise(0)).alias("fn"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred")==0), 1).otherwise(0)).alias("tn"),
    )
).collect()[0]

TP, FP, FN, TN = agg["tp"], agg["fp"], agg["fn"], agg["tn"]

precision_pos = TP / (TP + FP) if (TP + FP) else 0.0
recall_pos    = TP / (TP + FN) if (TP + FN) else 0.0
f1_pos        = (2 * precision_pos * recall_pos / (precision_pos + recall_pos)) if (precision_pos + recall_pos) else 0.0

print("\n[Etapa 8.7] M√©tricas classe positiva (y=1) ‚Äî modelo balanceado:")
print(f"TP={TP}  FP={FP}  FN={FN}  TN={TN}")
print(f"Precision(y=1) = {precision_pos:.4f}")
print(f"Recall(y=1)    = {recall_pos:.4f}")
print(f"F1(y=1)        = {f1_pos:.4f}")

# 8.8 ‚Äî Distribui√ß√£o de probabilidade p1
pred_bal_prob = (
    pred_bal
    .withColumn("p1", vector_to_array(F.col("probability"))[1])
)

print("\n[Etapa 8.8] Distribui√ß√£o p1 (classe 1) ‚Äî modelo balanceado:")
pred_bal_prob.select("p1").summary(
    "count","min","25%","50%","75%","max","mean"
).show(truncate=False)

# 8.9 ‚Äî (opcional) mostra 10 exemplos
pred_bal_prob.select("y", "prediction", "p1").show(10, truncate=False)


## ETAPA 9 ‚Äî Constru√ß√£o do aplicativo anal√≠tico (Streamlit)
**Resumo:** Disponibilizar os resultados do projeto em modelo interativo, buscando facilitar a interpreta√ß√£o e apoiar decis√µes estrat√©gicas do RH



In [None]:
# =========================
# ETAPA 9 ‚Äî THRESHOLD TUNING COM RECALL M√çNIMO (UNIFICADA E CORRIGIDA)
# =========================

# -------------------------------------------------
# 9.0 ‚Äî GARANTIA DE INPUT
# (corrige o erro: pred_bal n√£o existia)
# -------------------------------------------------
pred_bal = model_bal.transform(test_df)

# -------------------------------------------------
# 9.1 ‚Äî Extrai probabilidade da classe positiva (y=1)
# -------------------------------------------------
pred_thr = (
    pred_bal
    .select(
        F.col("y").cast("int").alias("y"),
        vector_to_array(F.col("probability"))[1].alias("p1")
    )
)

# -------------------------------------------------
# 9.2 ‚Äî Lista de thresholds (foco na regi√£o real do p1)
# -------------------------------------------------
thr_list = (
    [round(i / 100, 2) for i in range(10, 51, 5)] +     # 0.10..0.50
    [round(i / 1000, 3) for i in range(450, 551, 10)]  # 0.45..0.55
)
thr_list = sorted(set(thr_list))

# -------------------------------------------------
# 9.3 ‚Äî Fun√ß√£o de m√©tricas
# -------------------------------------------------
def metrics_for_threshold(df, thr):
    tmp = df.withColumn("pred", F.when(F.col("p1") >= F.lit(thr), 1).otherwise(0))
    r = tmp.agg(
        F.sum(F.when((F.col("y")==1) & (F.col("pred")==1), 1).otherwise(0)).alias("tp"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred")==1), 1).otherwise(0)).alias("fp"),
        F.sum(F.when((F.col("y")==1) & (F.col("pred")==0), 1).otherwise(0)).alias("fn"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred")==0), 1).otherwise(0)).alias("tn"),
    ).collect()[0]

    TP, FP, FN, TN = r["tp"], r["fp"], r["fn"], r["tn"]
    precision = TP / (TP + FP) if (TP + FP) else 0.0
    recall    = TP / (TP + FN) if (TP + FN) else 0.0
    f1        = (2*precision*recall/(precision+recall)) if (precision+recall) else 0.0

    return {
        "thr": thr,
        "tp": TP, "fp": FP, "fn": FN, "tn": TN,
        "precision": precision,
        "recall": recall,
        "f1": f1
    }

# -------------------------------------------------
# 9.4 ‚Äî Varredura completa
# -------------------------------------------------
results = [metrics_for_threshold(pred_thr, thr) for thr in thr_list]

# -------------------------------------------------
# 9.5 ‚Äî Sele√ß√£o autom√°tica por recall m√≠nimo
# -------------------------------------------------
targets = [0.80, 0.70, 0.60]

def pick_best(rows, recall_min):
    eligible = [r for r in rows if r["recall"] >= recall_min]
    if not eligible:
        return None
    eligible.sort(
        key=lambda r: (r["precision"], r["f1"], r["thr"]),
        reverse=True
    )
    return eligible[0]

best = None
best_target = None
for t in targets:
    best = pick_best(results, t)
    if best:
        best_target = t
        break

# -------------------------------------------------
# 9.6 ‚Äî OUTPUTS
# -------------------------------------------------
print("\n[Etapa 9.1] Distribui√ß√£o p1:")
pred_thr.select("p1").summary("min","25%","50%","75%","max","mean").show(truncate=False)

print("\n[Etapa 9.2] Amostra de thresholds:")
for r in results:
    if r["thr"] in [0.10, 0.20, 0.30, 0.40, 0.45, 0.48, 0.50]:
        print(f"thr={r['thr']:.2f} | Prec={r['precision']:.3f} Rec={r['recall']:.3f} F1={r['f1']:.3f}")

if best is None:
    print("\n[Etapa 9.3] ‚ùå Nenhum threshold atingiu Recall >= 0.60")
else:
    print(f"\n[Etapa 9.3] ‚úÖ Melhor threshold com Recall >= {best_target:.2f}")
    print(f"Threshold = {best['thr']}")
    print(f"TP={best['tp']} FP={best['fp']} FN={best['fn']} TN={best['tn']}")
    print(f"Precision = {best['precision']:.4f}")
    print(f"Recall    = {best['recall']:.4f}")
    print(f"F1        = {best['f1']:.4f}")

# -------------------------------------------------
# 9.7 ‚Äî Predi√ß√£o final com threshold escolhido
# -------------------------------------------------
if best:
    pred_final = (
        pred_bal
        .withColumn("p1", vector_to_array(F.col("probability"))[1])
        .withColumn("pred_final", F.when(F.col("p1") >= F.lit(best["thr"]), 1).otherwise(0))
    )

    print("\n[Etapa 9.7] Preview pred_final:")
    pred_final.select("y", "prediction", "p1", "pred_final").show(10, truncate=False)



# MOMENTO 6 ‚Äî Produto final, reflex√£o e encerramento

## ETAPA 10 ‚Äî An√°lise cr√≠tica dos resultados e limita√ß√µes do modelo
**Resumo:** Discutir limita√ß√µes t√©cnicas, vieses dos dados e impactos dessas restri√ß√µes na aplica√ß√£o pr√°tica do modelo.


In [None]:
# =========================
# ETAPA 10 ‚Äî M√©tricas por √°rea (segmenta√ß√£o)
# FIX: resolve NameError (DF n√£o definido) + evita DIVIDE_BY_ZERO
# =========================
from pyspark.sql import functions as F

# 10.0 ‚Äî Resolver automaticamente qual dataframe usar
# (ordem de prefer√™ncia: pred_final -> pred_thr -> pred_prob -> predictions -> pred_bal)
_candidates = ["pred_final", "pred_thr", "pred_prob", "predictions", "pred_bal", "pred_final_df"]
df_seg = None
for _name in _candidates:
    if _name in globals() and globals()[_name] is not None:
        df_seg = globals()[_name]
        print(f"[Etapa 10.0] Usando dataframe: {_name}")
        break

if df_seg is None:
    raise NameError(
        "Nenhum dataframe de predi√ß√£o encontrado. Esperado um destes nomes: "
        + ", ".join(_candidates)
        + ". Crie/defina o DF da etapa 9 e rode a etapa 10 novamente."
    )

# 10.0.1 ‚Äî Descobrir coluna de √°rea (fallbacks)
# voc√™ tem: app_area_atuacao (do output da etapa 10.1)
area_col = None
for c in ["app_area_atuacao", "app_area_atuacao_norm", "area_atuacao", "app_area", "area", "segmento"]:
    if c in df_seg.columns:
        area_col = c
        break
if area_col is None:
    raise NameError(
        "N√£o encontrei coluna de √°rea. Procurei por: app_area_atuacao, app_area_atuacao_norm, area_atuacao, app_area, area, segmento.\n"
        f"Colunas dispon√≠veis: {df_seg.columns}"
    )

# 10.0.2 ‚Äî Garantir colunas y e pred_final
# y deve existir (etapas anteriores). pred_final pode estar como pred_final ou prediction/prediction_final.
y_col = "y" if "y" in df_seg.columns else None
if y_col is None:
    raise NameError(f"N√£o encontrei coluna 'y'. Colunas dispon√≠veis: {df_seg.columns}")

pred_col = None
for c in ["pred_final", "prediction_final", "pred", "prediction"]:
    if c in df_seg.columns:
        pred_col = c
        break
if pred_col is None:
    raise NameError(
        "N√£o encontrei coluna de predi√ß√£o final. Procurei por: pred_final, prediction_final, pred, prediction.\n"
        f"Colunas dispon√≠veis: {df_seg.columns}"
    )

print(f"[Etapa 10.0] Colunas usadas: area='{area_col}' | y='{y_col}' | pred='{pred_col}'")

# 10.1 ‚Äî Sanidade base segmentada
print("\n[Etapa 10.1] Sanidade base segmentada:")
(
    df_seg
    .groupBy(F.coalesce(F.col(area_col), F.lit("unknown")).alias(area_col))
    .count()
    .orderBy(F.desc("count"))
    .show(20, truncate=False)
)

# 10.2 ‚Äî M√©tricas por √°rea (TP/FP/FN/TN + Precision/Recall/F1) com divis√£o segura
def safe_div(num_col, den_col):
    return F.when(den_col != 0, (num_col / den_col)).otherwise(F.lit(None).cast("double"))

agg = (
    df_seg
    .select(
        F.coalesce(F.col(area_col), F.lit("unknown")).alias("area"),
        F.col(y_col).cast("int").alias("y"),
        F.col(pred_col).cast("int").alias("pred")
    )
    .groupBy("area")
    .agg(
        F.count(F.lit(1)).alias("n_total"),
        F.sum(F.when((F.col("y")==1) & (F.col("pred")==1), 1).otherwise(0)).alias("TP"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred")==1), 1).otherwise(0)).alias("FP"),
        F.sum(F.when((F.col("y")==1) & (F.col("pred")==0), 1).otherwise(0)).alias("FN"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred")==0), 1).otherwise(0)).alias("TN"),
        F.sum(F.when(F.col("y")==1, 1).otherwise(0)).alias("n_y1"),
        F.sum(F.when(F.col("pred")==1, 1).otherwise(0)).alias("n_pred1"),
    )
)

metrics_by_area = (
    agg
    .withColumn("precision_y1", safe_div(F.col("TP"), (F.col("TP")+F.col("FP"))))
    .withColumn("recall_y1",    safe_div(F.col("TP"), (F.col("TP")+F.col("FN"))))
    .withColumn("f1_y1", F.when(
        (F.col("precision_y1").isNotNull()) & (F.col("recall_y1").isNotNull()) & ((F.col("precision_y1")+F.col("recall_y1")) != 0),
        2*F.col("precision_y1")*F.col("recall_y1")/(F.col("precision_y1")+F.col("recall_y1"))
    ).otherwise(F.lit(None).cast("double")))
    .withColumn("prevalencia_y1", safe_div(F.col("n_y1"), F.col("n_total")))
    .orderBy(F.desc("n_total"))
)

print("\n[Etapa 10.2] M√©tricas por √°rea (ordenado por volume):")
metrics_by_area.select(
    F.col("area").alias(area_col),
    "n_total","n_y1","n_pred1","TP","FP","FN","TN",
    F.round("prevalencia_y1",4).alias("prev_y1"),
    F.round("precision_y1",4).alias("precision_y1"),
    F.round("recall_y1",4).alias("recall_y1"),
    F.round("f1_y1",4).alias("f1_y1"),
).show(30, truncate=False)

# 10.3 ‚Äî (Opcional) s√≥ √°reas com volume m√≠nimo
print("\n[Etapa 10.3] M√©tricas por √°rea (apenas n_total >= 50):")
(
    metrics_by_area
    .where(F.col("n_total") >= 50)
    .select(
        F.col("area").alias(area_col),
        "n_total","n_y1",
        F.round("precision_y1",4).alias("precision_y1"),
        F.round("recall_y1",4).alias("recall_y1"),
        F.round("f1_y1",4).alias("f1_y1"),
    )
    .show(30, truncate=False)
)


## ETAPA 11 ‚Äî Propostas de melhoria e evolu√ß√£o do projeto
**Resumo:** Apresentar caminhos futuros para aprimorar dados, modelos, m√©tricas e integra√ß√£o com processos de RH.


In [None]:
# =========================
# ETAPA 11 (OP√á√ÉO B) ‚Äî FIX: cria p1 a partir de probability quando p1 n√£o existir
# =========================

from pyspark.sql import functions as F
from pyspark.ml.functions import vector_to_array

# -------------------------------------------------
# 11.0 ‚Äî INPUTS (ajuste s√≥ aqui)
# -------------------------------------------------
df_in = pred_bal  # <<< DF que tem: y, app_area_atuacao, probability (vector) e/ou p1
area_col = "app_area_atuacao"
y_col    = "y"

# Se j√° tiver p1, use esse nome. Se n√£o, extrairemos do vector probability.
p1_col = "p1"
prob_col = "probability"   # padr√£o do Spark ML

# Alvo de recall por √°rea (voc√™ pediu come√ßar pelo m√≠nimo)
target_recall = 0.60

# Regras de estabilidade
min_n_total = 50
min_pos     = 10

# Fallback global (da sua etapa 9)
global_thr = 0.46

print(f"[Etapa 11.0] DF usado = {df_in}")
print(f"[Etapa 11.0] Config: target_recall={target_recall} | min_n_total={min_n_total} | min_pos={min_pos} | global_thr={global_thr}")

# -------------------------------------------------
# 11.0b ‚Äî Garante que existe coluna p1
# -------------------------------------------------
cols = set(df_in.columns)

if p1_col in cols:
    df_scored = df_in
    print("[Etapa 11.0b] Coluna 'p1' j√° existe ‚Äî ok.")
else:
    if prob_col not in cols:
        raise ValueError(f"N√£o encontrei '{p1_col}' nem '{prob_col}' no df_in. Colunas dispon√≠veis: {sorted(list(cols))[:40]} ...")
    df_scored = df_in.withColumn(p1_col, vector_to_array(F.col(prob_col))[1].cast("double"))
    print("[Etapa 11.0b] 'p1' n√£o existia ‚Äî criada a partir de probability[1].")

print("\n[Etapa 11.0b] Preview (y, area, p1):")
df_scored.select(F.col(y_col).cast("int").alias("y"),
                 F.col(area_col).cast("string").alias(area_col),
                 F.col(p1_col).cast("double").alias("p1")).show(5, truncate=False)

# -------------------------------------------------
# 11.1 ‚Äî Base m√≠nima (y, p1, √°rea)
# -------------------------------------------------
df_base = (
    df_scored
    .select(
        F.coalesce(F.col(area_col).cast("string"), F.lit("unknown")).alias(area_col),
        F.col(y_col).cast("int").alias("y"),
        F.col(p1_col).cast("double").alias("p1")
    )
    .fillna({area_col: "unknown"})
)

print("\n[Etapa 11.1] Top √°reas por volume:")
df_base.groupBy(area_col).count().orderBy(F.desc("count")).show(20, truncate=False)

# -------------------------------------------------
# 11.2 ‚Äî Threshold por √°rea via quantil dos POSITIVOS
# -------------------------------------------------
q = float(1.0 - target_recall)  # recall 0.60 => quantil 0.40
q = max(0.0, min(1.0, q))

thr_by_area = (
    df_base
    .groupBy(area_col)
    .agg(
        F.count("*").alias("n_total"),
        F.sum(F.when(F.col("y")==1, 1).otherwise(0)).alias("n_y1"),
        F.expr(f"percentile_approx(CASE WHEN y=1 THEN p1 END, {q}, 10000)").alias("thr_area_raw")
    )
    .withColumn(
        "use_area_thr",
        (F.col("n_total") >= F.lit(min_n_total)) &
        (F.col("n_y1")   >= F.lit(min_pos)) &
        F.col("thr_area_raw").isNotNull()
    )
    .withColumn(
        "thr_use",
        F.when(F.col("use_area_thr"), F.col("thr_area_raw")).otherwise(F.lit(global_thr))
    )
    .select(area_col, "n_total", "n_y1", "use_area_thr", "thr_use")
)

print("\n[Etapa 11.2] Thresholds por √°rea (fallback inclu√≠do):")
thr_by_area.orderBy(F.desc("n_total")).show(50, truncate=False)

# -------------------------------------------------
# 11.3 ‚Äî Aplica threshold por √°rea e cria pred_final_B
# -------------------------------------------------
df_b = (
    df_base
    .join(thr_by_area.select(area_col, "thr_use", "use_area_thr"), on=area_col, how="left")
    .withColumn("pred_final_B", F.when(F.col("p1") >= F.col("thr_use"), 1).otherwise(0))
)

print("\n[Etapa 11.3] Preview:")
df_b.select(area_col, "y", "p1", "thr_use", "use_area_thr", "pred_final_B").show(10, truncate=False)

# -------------------------------------------------
# 11.4 ‚Äî M√©tricas gerais (classe positiva)
# -------------------------------------------------
agg_all = df_b.agg(
    F.sum(F.when((F.col("y")==1) & (F.col("pred_final_B")==1), 1).otherwise(0)).alias("TP"),
    F.sum(F.when((F.col("y")==0) & (F.col("pred_final_B")==1), 1).otherwise(0)).alias("FP"),
    F.sum(F.when((F.col("y")==1) & (F.col("pred_final_B")==0), 1).otherwise(0)).alias("FN"),
    F.sum(F.when((F.col("y")==0) & (F.col("pred_final_B")==0), 1).otherwise(0)).alias("TN"),
).collect()[0]

TP, FP, FN, TN = int(agg_all["TP"]), int(agg_all["FP"]), int(agg_all["FN"]), int(agg_all["TN"])
prec = TP / (TP + FP) if (TP + FP) > 0 else None
rec  = TP / (TP + FN) if (TP + FN) > 0 else None
f1   = (2*prec*rec)/(prec+rec) if (prec is not None and rec is not None and (prec+rec) > 0) else None

print("\n[Etapa 11.4] M√©tricas gerais ‚Äî Op√ß√£o B:")
print(f"TP={TP} FP={FP} FN={FN} TN={TN}")
print(f"Precision(y=1) = {prec:.4f}" if prec is not None else "Precision(y=1) = NULL")
print(f"Recall(y=1)    = {rec:.4f}"  if rec  is not None else "Recall(y=1)    = NULL")
print(f"F1(y=1)        = {f1:.4f}"   if f1   is not None else "F1(y=1)        = NULL")

# -------------------------------------------------
# 11.5 ‚Äî M√©tricas por √°rea (sem divis√£o por zero)
# -------------------------------------------------
metrics_area = (
    df_b
    .groupBy(area_col)
    .agg(
        F.count("*").alias("n_total"),
        F.sum(F.when(F.col("y")==1, 1).otherwise(0)).alias("n_y1"),
        F.sum(F.when(F.col("pred_final_B")==1, 1).otherwise(0)).alias("n_pred1"),
        F.sum(F.when((F.col("y")==1) & (F.col("pred_final_B")==1), 1).otherwise(0)).alias("TP"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred_final_B")==1), 1).otherwise(0)).alias("FP"),
        F.sum(F.when((F.col("y")==1) & (F.col("pred_final_B")==0), 1).otherwise(0)).alias("FN"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred_final_B")==0), 1).otherwise(0)).alias("TN"),
    )
    .withColumn("prev_y1", F.when(F.col("n_total")>0, F.col("n_y1")/F.col("n_total")).otherwise(F.lit(None)))
    .withColumn("precision_y1", F.when((F.col("TP")+F.col("FP"))>0, F.col("TP")/(F.col("TP")+F.col("FP"))).otherwise(F.lit(None)))
    .withColumn("recall_y1",    F.when((F.col("TP")+F.col("FN"))>0, F.col("TP")/(F.col("TP")+F.col("FN"))).otherwise(F.lit(None)))
    .withColumn(
        "f1_y1",
        F.when(
            (F.col("precision_y1").isNotNull()) & (F.col("recall_y1").isNotNull()) & ((F.col("precision_y1")+F.col("recall_y1"))>0),
            (2*F.col("precision_y1")*F.col("recall_y1"))/(F.col("precision_y1")+F.col("recall_y1"))
        ).otherwise(F.lit(None))
    )
)

print(f"\n[Etapa 11.5] M√©tricas por √°rea (n_total >= {min_n_total}):")
metrics_area.where(F.col("n_total") >= F.lit(min_n_total)).orderBy(F.desc("n_total")).select(
    area_col, "n_total", "n_y1", "precision_y1", "recall_y1", "f1_y1"
).show(50, truncate=False)

# -------------------------------------------------
# 11.6 ‚Äî OUTPUTS
# -------------------------------------------------
pred_final_B = df_b
threshold_table = thr_by_area

print("\n[Etapa 11] OUTPUTS prontos: pred_final_B / threshold_table")



## ETAPA 12 ‚Äî Considera√ß√µes finais e aprendizados do projeto
**Resumo:** Consolidar os principais aprendizados t√©cnicos e anal√≠ticos obtidos ao longo do desenvolvimento do projeto.


In [None]:
# ============================================================
# ETAPA 12 (UNIFICADA) ‚Äî PR-curve + escolha de thresholds
# Objetivo: manter recall >= target_recall e maximizar precision
# Sa√≠das: best_thr_global, threshold_table_best, pred_final_B2
# ============================================================

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.functions import vector_to_array

# ----------------------------
# 12.0 ‚Äî INPUTS (ajuste s√≥ aqui)
# ----------------------------
df_in = pred_bal  # <- ajuste se o seu DF tiver outro nome

y_col = "y"
area_col = "app_area_atuacao"
prob_col = "probability"

target_recall = 0.60
min_n_total = 50
min_pos = 10

# grid de thresholds (0.00..1.00)
thr_grid = [i/100 for i in range(0, 101)]
thr_df = spark.createDataFrame([(t,) for t in thr_grid], ["thr"])

print(f"[Etapa 12.0] DF usado = {df_in}")
print(f"[Etapa 12.0] Config: target_recall={target_recall} | min_n_total={min_n_total} | min_pos={min_pos}")

# ----------------------------
# 12.1 ‚Äî Base de scoring (y, area, p1)
# ----------------------------
df_sc = (
    df_in
    .select(
        F.col(y_col).cast("int").alias("y"),
        F.coalesce(F.col(area_col).cast("string"), F.lit("unknown")).alias(area_col),
        vector_to_array(F.col(prob_col))[1].cast("double").alias("p1")
    )
)

print("[Etapa 12.1] Preview (y, area, p1):")
df_sc.select("y", area_col, "p1").show(5, truncate=False)

# ----------------------------
# 12.2 ‚Äî PR GLOBAL: escolhe best_thr_global
# Max precision mantendo recall >= target_recall
# ----------------------------
df_metrics_global = (
    df_sc.select("y", "p1").crossJoin(thr_df)
    .withColumn("pred", F.when(F.col("p1") >= F.col("thr"), 1).otherwise(0))
    .groupBy("thr")
    .agg(
        F.sum(F.when((F.col("y")==1) & (F.col("pred")==1), 1).otherwise(0)).alias("TP"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred")==1), 1).otherwise(0)).alias("FP"),
        F.sum(F.when((F.col("y")==1) & (F.col("pred")==0), 1).otherwise(0)).alias("FN"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred")==0), 1).otherwise(0)).alias("TN"),
    )
    .withColumn("precision", F.when((F.col("TP")+F.col("FP"))>0, F.col("TP")/(F.col("TP")+F.col("FP"))))
    .withColumn("recall",    F.when((F.col("TP")+F.col("FN"))>0, F.col("TP")/(F.col("TP")+F.col("FN"))))
    .withColumn(
        "f1",
        F.when(
            (F.col("precision").isNotNull()) & (F.col("recall").isNotNull()) & ((F.col("precision")+F.col("recall"))>0),
            (2*F.col("precision")*F.col("recall"))/(F.col("precision")+F.col("recall"))
        )
    )
)

print("[Etapa 12.2] Top thresholds globais (recall >= alvo) ordenado por precision:")
df_best_global = (
    df_metrics_global
    .where(F.col("recall") >= F.lit(target_recall))
    .orderBy(F.desc("precision"), F.desc("f1"), F.desc("thr"))
)
df_best_global.show(20, truncate=False)

best_row = df_best_global.first()
best_thr_global = float(best_row["thr"]) if best_row else None
print(f"[Etapa 12.2] BEST global_thr (max precision com recall>={target_recall}): {best_thr_global}")

# fallback se por algum motivo n√£o existir linha (muito raro)
if best_thr_global is None:
    best_thr_global = 0.50
    print(f"[Etapa 12.2] WARNING: fallback best_thr_global={best_thr_global}")

# ----------------------------
# 12.3 ‚Äî √Åreas eleg√≠veis (volume e positivos)
# ----------------------------
area_stats = (
    df_sc.groupBy(area_col)
    .agg(
        F.count("*").alias("n_total"),
        F.sum(F.when(F.col("y")==1, 1).otherwise(0)).alias("n_y1")
    )
)

print("[Etapa 12.3] Top √°reas por volume:")
area_stats.orderBy(F.desc("n_total")).show(20, truncate=False)

area_eligible = area_stats.where((F.col("n_total")>=min_n_total) & (F.col("n_y1")>=min_pos)).select(area_col)

# ----------------------------
# 12.4 ‚Äî PR POR √ÅREA: escolhe thr_area_best
# Max precision mantendo recall >= target_recall (por √°rea)
# ----------------------------
df_m_area = (
    df_sc.join(area_eligible, on=area_col, how="inner")
    .crossJoin(thr_df)
    .withColumn("pred", F.when(F.col("p1") >= F.col("thr"), 1).otherwise(0))
    .groupBy(area_col, "thr")
    .agg(
        F.sum(F.when((F.col("y")==1) & (F.col("pred")==1), 1).otherwise(0)).alias("TP"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred")==1), 1).otherwise(0)).alias("FP"),
        F.sum(F.when((F.col("y")==1) & (F.col("pred")==0), 1).otherwise(0)).alias("FN"),
    )
    .withColumn("precision", F.when((F.col("TP")+F.col("FP"))>0, F.col("TP")/(F.col("TP")+F.col("FP"))))
    .withColumn("recall",    F.when((F.col("TP")+F.col("FN"))>0, F.col("TP")/(F.col("TP")+F.col("FN"))))
    .withColumn(
        "f1",
        F.when(
            (F.col("precision").isNotNull()) & (F.col("recall").isNotNull()) & ((F.col("precision")+F.col("recall"))>0),
            (2*F.col("precision")*F.col("recall"))/(F.col("precision")+F.col("recall"))
        )
    )
)

w = Window.partitionBy(area_col).orderBy(F.desc("precision"), F.desc("f1"), F.desc("thr"))

threshold_table_best = (
    df_m_area
    .where(F.col("recall") >= F.lit(target_recall))
    .withColumn("rn", F.row_number().over(w))
    .where(F.col("rn")==1)
    .select(
        area_col,
        F.col("thr").alias("thr_area_best"),
        "precision",
        "recall",
        "f1"
    )
)

print("[Etapa 12.4] Thresholds BEST por √°rea (max precision com recall >= alvo):")
threshold_table_best.orderBy(F.desc("precision")).show(50, truncate=False)

# ----------------------------
# 12.5 ‚Äî Aplica threshold por √°rea + fallback global
# ----------------------------
pred_final_B2 = (
    df_sc
    .join(threshold_table_best.select(area_col, "thr_area_best"), on=area_col, how="left")
    .withColumn("thr_use", F.coalesce(F.col("thr_area_best"), F.lit(best_thr_global)))
    .withColumn("pred_final_B2", F.when(F.col("p1") >= F.col("thr_use"), 1).otherwise(0))
)

print("[Etapa 12.5] Preview (area, y, p1, thr_use, pred_final_B2):")
pred_final_B2.select(area_col, "y", "p1", "thr_use", "pred_final_B2").show(10, truncate=False)

# ----------------------------
# 12.6 ‚Äî M√©tricas gerais (B2)
# ----------------------------
m_global = (
    pred_final_B2
    .agg(
        F.sum(F.when((F.col("y")==1) & (F.col("pred_final_B2")==1), 1).otherwise(0)).alias("TP"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred_final_B2")==1), 1).otherwise(0)).alias("FP"),
        F.sum(F.when((F.col("y")==1) & (F.col("pred_final_B2")==0), 1).otherwise(0)).alias("FN"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred_final_B2")==0), 1).otherwise(0)).alias("TN"),
    )
    .collect()[0]
)

TP, FP, FN, TN = int(m_global["TP"]), int(m_global["FP"]), int(m_global["FN"]), int(m_global["TN"])
precision = TP/(TP+FP) if (TP+FP)>0 else None
recall    = TP/(TP+FN) if (TP+FN)>0 else None
f1        = (2*precision*recall)/(precision+recall) if (precision is not None and recall is not None and (precision+recall)>0) else None

print("[Etapa 12.6] M√©tricas gerais ‚Äî Op√ß√£o B2 (PR-otimizada):")
print(f"TP={TP} FP={FP} FN={FN} TN={TN}")
print(f"Precision(y=1) = {precision}")
print(f"Recall(y=1)    = {recall}")
print(f"F1(y=1)        = {f1}")

# ----------------------------
# 12.7 ‚Äî M√©tricas por √°rea (n_total >= min_n_total)
# ----------------------------
df_area_metrics = (
    pred_final_B2
    .groupBy(area_col)
    .agg(
        F.count("*").alias("n_total"),
        F.sum(F.when(F.col("y")==1, 1).otherwise(0)).alias("n_y1"),
        F.sum(F.when((F.col("y")==1) & (F.col("pred_final_B2")==1), 1).otherwise(0)).alias("TP"),
        F.sum(F.when((F.col("y")==0) & (F.col("pred_final_B2")==1), 1).otherwise(0)).alias("FP"),
        F.sum(F.when((F.col("y")==1) & (F.col("pred_final_B2")==0), 1).otherwise(0)).alias("FN"),
    )
    .withColumn("precision_y1", F.when((F.col("TP")+F.col("FP"))>0, F.col("TP")/(F.col("TP")+F.col("FP"))))
    .withColumn("recall_y1",    F.when((F.col("TP")+F.col("FN"))>0, F.col("TP")/(F.col("TP")+F.col("FN"))))
    .withColumn(
        "f1_y1",
        F.when(
            (F.col("precision_y1").isNotNull()) & (F.col("recall_y1").isNotNull()) & ((F.col("precision_y1")+F.col("recall_y1"))>0),
            (2*F.col("precision_y1")*F.col("recall_y1"))/(F.col("precision_y1")+F.col("recall_y1"))
        )
    )
)

print(f"[Etapa 12.7] M√©tricas por √°rea (n_total >= {min_n_total}):")
df_area_metrics.where(F.col("n_total")>=F.lit(min_n_total)).orderBy(F.desc("n_total")).select(
    area_col, "n_total", "n_y1", "precision_y1", "recall_y1", "f1_y1"
).show(50, truncate=False)

print("[Etapa 12] OUTPUTS prontos: pred_final_B2 / threshold_table_best / best_thr_global")


In [None]:
# =========================================
# ETAPA 12.8 ‚Äî Exportar CSV final para Streamlit
# =========================================

# Escolha do DF final
df_final = pred_final_B2   # ou pred_final_B, se preferir

# Colunas m√≠nimas que o app usa
cols_export = [
    "app_area_atuacao",
    "p1",
    "pred_final_B2"
]

# Inclui y se existir (para m√©tricas no app)
if "y" in df_final.columns:
    cols_export.append("y")

df_export = df_final.select(*cols_export)

# Converter para Pandas (Streamlit trabalha com CSV/Pandas)
pdf = df_export.toPandas()

# Caminho padr√£o usado pelo app
OUTPUT_PATH = "/content/predicoes_streamlit.csv"
pdf.to_csv(OUTPUT_PATH, index=False)

print("‚úÖ CSV para o Streamlit gerado com sucesso!")
print("üìÅ Caminho:", OUTPUT_PATH)
print("üîé Preview:")
print(pdf.head())


## ETAPA 13 ‚Äî Setup e execu√ß√£o do aplicativo Streamlit
**Resumo:** Configurar ambiente, estruturar o app e disponibilizar a aplica√ß√£o interativa como entrega final do projeto.



In [None]:
# =========================================
# ETAPA 13 (UNIFICADA) ‚Äî Streamlit no Google Colab (FIX cloudflared)
# =========================================

# 1) Instalar Streamlit (cloudflared via bin√°rio oficial, N√ÉO via pip)
!pip -q install streamlit pandas numpy

import os, json, time, re, subprocess
from pathlib import Path

APP_DIR = Path("/content/streamlit_app")
APP_DIR.mkdir(parents=True, exist_ok=True)

# 2) Baixar bin√°rio oficial do cloudflared (Linux x86_64 ‚Äî padr√£o do Colab)
CLOUDFLARED_PATH = Path("/content/cloudflared")
if not CLOUDFLARED_PATH.exists():
    !wget -q -O /content/cloudflared https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64
    !chmod +x /content/cloudflared

# 3) (Opcional) Se seus arquivos estiverem em outro caminho, ajuste aqui:
THRESHOLD_CSV_DEFAULT = "/content/threshold_table_best.csv"
GLOBAL_JSON_DEFAULT   = "/content/best_thr_global.json"

# 4) Escrever o app Streamlit
app_code = rf'''
import json
import numpy as np
import pandas as pd
import streamlit as st
import re

st.set_page_config(page_title="Scoring de Candidatos ‚Äî Threshold por √Årea", layout="wide")
st.title("üìå Scoring de Candidatos ‚Äî Threshold por √Årea (fallback global)")

# --------------------------------
# Helpers
# --------------------------------
def load_best_thr(path_json: str, default_thr: float = 0.48) -> float:
    try:
        with open(path_json, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, dict):
            for k in ["best_thr_global", "global_thr", "thr", "threshold"]:
                if k in data:
                    return float(data[k])
            return float(list(data.values())[0])
        return float(data)
    except Exception:
        return float(default_thr)

def load_threshold_table(path_csv: str) -> pd.DataFrame:
    df = pd.read_csv(path_csv)
    expected = {{"app_area_atuacao", "thr_area_best"}}
    if not expected.issubset(set(df.columns)):
        raise ValueError(
            f"threshold_table_best.csv precisa ter as colunas {{sorted(list(expected))}}. "
            f"Encontradas: {{df.columns.tolist()}}"
        )
    df["app_area_atuacao"] = df["app_area_atuacao"].astype(str).fillna("unknown")
    df["thr_area_best"] = pd.to_numeric(df["thr_area_best"], errors="coerce")
    return df

def ensure_p1(df: pd.DataFrame) -> pd.DataFrame:
    if "p1" in df.columns:
        df["p1"] = pd.to_numeric(df["p1"], errors="coerce")
        return df

    if "probability" in df.columns:
        def parse_prob(x):
            if pd.isna(x):
                return np.nan
            if isinstance(x, (list, tuple, np.ndarray)) and len(x) >= 2:
                return float(x[1])
            s = str(x).strip()
            nums = re.findall(r"[-+]?\d*\.\d+|[-+]?\d+", s)
            if len(nums) >= 2:
                return float(nums[1])
            return np.nan

        df["p1"] = df["probability"].apply(parse_prob)
        return df

    return df

def apply_thresholds(df: pd.DataFrame, thr_table: pd.DataFrame, global_thr: float) -> pd.DataFrame:
    out = df.copy()
    out["app_area_atuacao"] = out.get("app_area_atuacao", "unknown").astype(str).fillna("unknown")

    thr = thr_table[["app_area_atuacao","thr_area_best"]].copy()
    out = out.merge(thr, on="app_area_atuacao", how="left")

    out["thr_use"] = out["thr_area_best"].fillna(global_thr).astype(float)
    out["use_area_thr"] = out["thr_area_best"].notna()

    if "p1" not in out.columns:
        raise ValueError("N√£o encontrei 'p1'. Envie CSV com 'p1' OU 'probability' parse√°vel.")

    out["p1"] = pd.to_numeric(out["p1"], errors="coerce")
    if out["p1"].isna().all():
        raise ValueError("'p1' existe, mas est√° toda nula/NaN. Verifique seu arquivo.")

    out["pred_final"] = (out["p1"] >= out["thr_use"]).astype(int)
    return out

def metrics_if_y_exists(df: pd.DataFrame) -> dict:
    if "y" not in df.columns:
        return {{}}
    y = pd.to_numeric(df["y"], errors="coerce").fillna(0).astype(int)
    pred = pd.to_numeric(df["pred_final"], errors="coerce").fillna(0).astype(int)

    tp = int(((y==1) & (pred==1)).sum())
    fp = int(((y==0) & (pred==1)).sum())
    fn = int(((y==1) & (pred==0)).sum())
    tn = int(((y==0) & (pred==0)).sum())

    prec = tp / (tp + fp) if (tp+fp) else 0.0
    rec  = tp / (tp + fn) if (tp+fn) else 0.0
    f1   = (2*prec*rec)/(prec+rec) if (prec+rec) else 0.0

    return {{"TP":tp,"FP":fp,"FN":fn,"TN":tn,"precision":prec,"recall":rec,"f1":f1}}

def to_csv_bytes(df: pd.DataFrame) -> bytes:
    return df.to_csv(index=False).encode("utf-8")

# --------------------------------
# Sidebar config
# --------------------------------
with st.sidebar:
    st.header("Config")
    threshold_csv = st.text_input("Caminho do threshold_table_best.csv", value="{THRESHOLD_CSV_DEFAULT}")
    global_json   = st.text_input("Caminho do best_thr_global.json", value="{GLOBAL_JSON_DEFAULT}")
    default_thr   = st.number_input("Fallback global (se JSON falhar)", value=0.48, min_value=0.0, max_value=1.0, step=0.01)

# Carrega thresholds
try:
    thr_table = load_threshold_table(threshold_csv)
    global_thr = load_best_thr(global_json, default_thr=float(default_thr))
    st.success(f"Thresholds carregados ‚úÖ | global_thr={{global_thr:.2f}} | √°reas={{thr_table.shape[0]}}")
except Exception as e:
    st.error(f"Falha ao carregar thresholds: {{e}}")
    st.stop()

st.caption("Envie um CSV com pelo menos: **app_area_atuacao** e **p1** (ou **probability** parse√°vel). Se tiver **y**, calcula m√©tricas.")

uploaded = st.file_uploader("üì§ Upload do CSV de candidatos", type=["csv"])
if uploaded is None:
    st.info("Fa√ßa upload de um CSV para come√ßar.")
    st.stop()

# L√™ CSV
try:
    df_raw = pd.read_csv(uploaded)
except Exception as e:
    st.error(f"N√£o consegui ler seu CSV: {{e}}")
    st.stop()

# Valida m√≠nima
if "app_area_atuacao" not in df_raw.columns:
    st.error("Coluna obrigat√≥ria ausente: **app_area_atuacao**")
    st.write("Colunas encontradas:", df_raw.columns.tolist())
    st.stop()

df_raw = ensure_p1(df_raw)

# Aplica thresholds
try:
    df_scored = apply_thresholds(df_raw, thr_table, global_thr)
except Exception as e:
    st.error(f"Erro ao aplicar thresholds: {{e}}")
    st.write("Colunas encontradas:", df_raw.columns.tolist())
    st.stop()

# KPIs
c1, c2, c3, c4 = st.columns(4)
c1.metric("Registros", f"{{len(df_scored):,}}".replace(",", "."))
c2.metric("Pred=1", f"{{int(df_scored['pred_final'].sum()):,}}".replace(",", "."))
c3.metric("Global thr", f"{{global_thr:.2f}}")
c4.metric("√Åreas no CSV", f"{{df_scored['app_area_atuacao'].nunique():,}}".replace(",", "."))

m = metrics_if_y_exists(df_scored)
if m:
    st.subheader("üìà M√©tricas (seu CSV tem coluna y)")
    mc1, mc2, mc3, mc4 = st.columns(4)
    mc1.metric("Precision", f"{{m['precision']:.4f}}")
    mc2.metric("Recall", f"{{m['recall']:.4f}}")
    mc3.metric("F1", f"{{m['f1']:.4f}}")
    mc4.metric("TP / FP / FN / TN", f"{{m['TP']}} / {{m['FP']}} / {{m['FN']}} / {{m['TN']}}")

st.subheader("üîé Preview")
st.dataframe(df_scored.head(50), use_container_width=True)

st.subheader("‚¨áÔ∏è Download")
st.download_button(
    "Baixar CSV com pred_final (scored.csv)",
    data=to_csv_bytes(df_scored),
    file_name="scored.csv",
    mime="text/csv"
)

with st.expander("Ver thresholds por √°rea (tabela carregada)"):
    st.dataframe(thr_table.sort_values("app_area_atuacao").reset_index(drop=True), use_container_width=True)
'''

(APP_DIR / "app.py").write_text(app_code, encoding="utf-8")
print("‚úÖ App escrito em:", APP_DIR / "app.py")

# 5) Limpar processos antigos (se existirem)
!pkill -f "streamlit run" -9 || true
!pkill -f "cloudflared tunnel" -9 || true

# 6) Subir Streamlit
p_streamlit = subprocess.Popen(
    ["streamlit", "run", str(APP_DIR / "app.py"), "--server.port", "8501", "--server.address", "0.0.0.0"],
    stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)

# 7) Abrir tunnel com o BIN√ÅRIO /content/cloudflared
p_tunnel = subprocess.Popen(
    [str(CLOUDFLARED_PATH), "tunnel", "--url", "http://localhost:8501"],
    stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
)

# 8) Capturar URL p√∫blica
public_url = None
start = time.time()
while time.time() - start < 60:
    line = p_tunnel.stdout.readline()
    if not line:
        continue
    m = re.search(r"(https://[a-zA-Z0-9\-]+\.trycloudflare\.com)", line)
    if m:
        public_url = m.group(1)
        break

print("\n" + "="*60)
if public_url:
    print("‚úÖ Streamlit no Colab rodando! Acesse a URL:")
    print(public_url)
else:
    print("‚ö†Ô∏è N√£o consegui capturar a URL automaticamente.")
    print("‚û°Ô∏è Veja as linhas acima do cloudflared e procure um link .trycloudflare.com")
print("="*60)

# 9) Mostrar alguns logs iniciais do Streamlit (pra garantir que subiu)
print("\n--- LOGS Streamlit (primeiras linhas) ---")
for _ in range(20):
    try:
        print(p_streamlit.stdout.readline().rstrip())
    except:
        break


In [None]:
# =========================================
# ETAPA 16.1 ‚Äî FIX: textos vazios => sim_score_bert tudo 0
# Diagn√≥stico + sele√ß√£o autom√°tica de colunas textuais + recompute embeddings
# =========================================

import os
import numpy as np
import pandas as pd

import torch
from sentence_transformers import SentenceTransformer

from pyspark.sql import functions as F
from pyspark.sql import types as T

# -------------------------
# Configs
# -------------------------
MODEL_NAME = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

CACHE_DIR = "/content/nlp_cache"
os.makedirs(CACHE_DIR, exist_ok=True)

APP_EMB_PATH = os.path.join(CACHE_DIR, "app_embeddings.parquet")
JOB_EMB_PATH = os.path.join(CACHE_DIR, "job_embeddings.parquet")
OUT_SCORED_PATH = os.path.join(CACHE_DIR, "df_scored_with_bert.parquet")

print(f"[Etapa 16.1] DEVICE={DEVICE} | MODEL={MODEL_NAME} | CACHE={CACHE_DIR}")

# -------------------------
# Inputs obrigat√≥rios
# -------------------------
df_apps = globals().get("df_apps", None)
df_vagas = globals().get("df_vagas", None)

if df_apps is None or df_vagas is None:
    raise ValueError("N√£o encontrei df_apps e/ou df_vagas no ambiente. Rode as etapas anteriores que criam esses DataFrames.")

# -------------------------
# 16.1a ‚Äî Join robusto (mesmo que s√≥ exista __root_id)
# -------------------------
PREFERRED_JOIN_CANDIDATES = [
    "vaga_id","job_id","id_vaga","position_id","id_position",
    "codigo_vaga","cod_vaga","id_job","jobid","vacancy_id","__root_id"
]

def detect_join_key(df_left, df_right, preferred):
    common = sorted(list(set(df_left.columns).intersection(set(df_right.columns))))
    print(f"\n[Etapa 16.1] Colunas em comum ({len(common)}): {common[:200]}")
    for c in preferred:
        if c in common:
            return c
    if len(common) == 1:
        return common[0]
    return None

JOIN_KEY = detect_join_key(df_apps, df_vagas, PREFERRED_JOIN_CANDIDATES)
if JOIN_KEY is None:
    raise ValueError("N√£o encontrei coluna de join entre df_apps e df_vagas. Voc√™ precisa definir uma chave manualmente.")

print(f"[Etapa 16.1] JOIN_KEY='{JOIN_KEY}'")
df_join = df_apps.join(df_vagas, on=JOIN_KEY, how="inner")

# cria ids se n√£o existirem
if "app_id" not in df_join.columns:
    df_join = df_join.withColumn("app_id", F.monotonically_increasing_id())
if "vaga_id" not in df_join.columns:
    df_join = df_join.withColumn("vaga_id", F.monotonically_increasing_id())

# -------------------------
# 16.1b ‚Äî Auto-descoberta de colunas textuais √∫teis
# -------------------------
# Ideia: entre colunas string, calcular m√©dia do length (ignorando vazios) e escolher as top N.

def top_text_columns(df, exclude_cols=set(), topn=6, sample_frac=0.15, seed=42):
    string_cols = [c for c, t in df.dtypes if t == "string" and c not in exclude_cols]
    if not string_cols:
        return []

    # amostra pra n√£o ficar pesado
    dfx = df
    try:
        dfx = df.sample(withReplacement=False, fraction=sample_frac, seed=seed)
    except Exception:
        pass

    stats = []
    for c in string_cols:
        s = (
            dfx.select(
                F.avg(F.length(F.trim(F.coalesce(F.col(c), F.lit(""))))).alias("avg_len"),
                F.sum((F.length(F.trim(F.coalesce(F.col(c), F.lit("")))) > 0).cast("int")).alias("n_nonempty")
            )
            .collect()[0]
        )
        avg_len = float(s["avg_len"]) if s["avg_len"] is not None else 0.0
        n_nonempty = int(s["n_nonempty"]) if s["n_nonempty"] is not None else 0
        stats.append((c, avg_len, n_nonempty))

    # ordena por "conte√∫do": primeiro n_nonempty, depois avg_len
    stats_sorted = sorted(stats, key=lambda x: (x[2], x[1]), reverse=True)

    print("\n[Etapa 16.1] Top colunas textuais (c, avg_len, n_nonempty) ‚Äî amostra:")
    for row in stats_sorted[:15]:
        print(row)

    chosen = [c for c, avg_len, n_nonempty in stats_sorted if n_nonempty > 0 and avg_len >= 10][:topn]
    return chosen

# vamos excluir ids, chaves e colunas muito ‚Äút√©cnicas‚Äù
exclude_apps = set([JOIN_KEY, "app_id", "vaga_id"])
exclude_vagas = set([JOIN_KEY, "app_id", "vaga_id"])

apps_text_cols = top_text_columns(df_join.select([c for c in df_apps.columns if c in df_join.columns]), exclude_cols=exclude_apps, topn=6)
vagas_text_cols = top_text_columns(df_join.select([c for c in df_vagas.columns if c in df_join.columns]), exclude_cols=exclude_vagas, topn=6)

print(f"\n[Etapa 16.1] Colunas escolhidas p/ candidate_text: {apps_text_cols}")
print(f"[Etapa 16.1] Colunas escolhidas p/ job_text:       {vagas_text_cols}")

# fallback se n√£o achou nada
if not apps_text_cols:
    print("[Etapa 16.1][WARN] N√£o achei colunas textuais √∫teis em df_apps dentro do join. Vou usar qualquer string col do join como fallback.")
    apps_text_cols = [c for c, t in df_join.dtypes if t=="string" and c not in exclude_apps][:3]

if not vagas_text_cols:
    print("[Etapa 16.1][WARN] N√£o achei colunas textuais √∫teis em df_vagas dentro do join. Vou usar qualquer string col do join como fallback.")
    vagas_text_cols = [c for c, t in df_join.dtypes if t=="string" and c not in exclude_vagas][:3]

# -------------------------
# 16.1c ‚Äî Construir textos finais
# -------------------------
df_join = df_join.withColumn(
    "candidate_text",
    F.trim(F.concat_ws("\n", *[F.coalesce(F.col(c).cast("string"), F.lit("")) for c in apps_text_cols]))
)

df_join = df_join.withColumn(
    "job_text",
    F.trim(F.concat_ws("\n", *[F.coalesce(F.col(c).cast("string"), F.lit("")) for c in vagas_text_cols]))
)

print("\n[Etapa 16.1] Preview tamanhos dos textos (deve ser > 0):")
df_join.select(
    "app_id","vaga_id",
    F.length("candidate_text").alias("len_candidate_text"),
    F.length("job_text").alias("len_job_text")
).show(10, truncate=False)

# sanity check: se job_text continuar 0, n√£o faz sentido seguir
agg = df_join.select(
    F.mean(F.length("candidate_text")).alias("mean_len_candidate"),
    F.mean(F.length("job_text")).alias("mean_len_job"),
    F.sum((F.length("job_text") > 0).cast("int")).alias("n_job_nonempty"),
    F.count("*").alias("n")
).collect()[0]

print("\n[Etapa 16.1] Sanity:")
print(dict(agg.asDict()))

if int(agg["n_job_nonempty"]) == 0:
    raise ValueError(
        "job_text continua vazio para 100% das linhas. Isso indica que df_vagas n√£o trouxe campos de descri√ß√£o/requisitos no join.\n"
        "‚û°Ô∏è Solu√ß√£o: precisamos ajustar quais colunas de texto existem em df_vagas (fora do join) ou corrigir a chave de join (prov√°vel)."
    )

# -------------------------
# 16.1d ‚Äî Modelo BERT
# -------------------------
model = SentenceTransformer(MODEL_NAME, device=DEVICE)

def encode_texts(pdf: pd.DataFrame, text_col: str, id_col: str, batch_size: int = 64) -> pd.DataFrame:
    texts = pdf[text_col].fillna("").astype(str).tolist()
    ids = pdf[id_col].tolist()

    non_empty_texts, non_empty_pos = [], []
    for i, t in enumerate(texts):
        if t.strip():
            non_empty_texts.append(t)
            non_empty_pos.append(i)

    vecs = None
    if non_empty_texts:
        vecs = model.encode(
            non_empty_texts,
            batch_size=batch_size,
            show_progress_bar=True,
            convert_to_numpy=True,
            normalize_embeddings=True
        ).astype(np.float32)

    out_emb = [None] * len(texts)
    if vecs is not None:
        for j, i in enumerate(non_empty_pos):
            out_emb[i] = vecs[j].tolist()

    return pd.DataFrame({id_col: ids, "embedding": out_emb})

def build_embeddings_cache(df_unique, id_col, text_col, cache_path):
    # sempre sobrescreve aqui porque estamos corrigindo texto
    if os.path.exists(cache_path):
        os.remove(cache_path)

    pdf = df_unique.select(id_col, text_col).toPandas()
    emb_pdf = encode_texts(pdf, text_col=text_col, id_col=id_col, batch_size=64)
    emb_pdf.to_parquet(cache_path, index=False)
    return emb_pdf

# uniques
apps_unique = df_join.select("app_id","candidate_text").dropDuplicates(["app_id"])
jobs_unique = df_join.select("vaga_id","job_text").dropDuplicates(["vaga_id"])

print(f"\n[Etapa 16.1] √önicos: apps={apps_unique.count()} | jobs={jobs_unique.count()}")

# embeddings (rebuild)
print(f"[Etapa 16.1] Recriando embeddings apps => {APP_EMB_PATH}")
apps_emb_pdf = build_embeddings_cache(apps_unique, "app_id", "candidate_text", APP_EMB_PATH)

print(f"[Etapa 16.1] Recriando embeddings jobs => {JOB_EMB_PATH}")
jobs_emb_pdf = build_embeddings_cache(jobs_unique, "vaga_id", "job_text", JOB_EMB_PATH)

# volta pro spark
apps_emb = spark.createDataFrame(apps_emb_pdf)
jobs_emb = spark.createDataFrame(jobs_emb_pdf)

# cosine (dot; embeddings normalizados)
@F.udf(returnType=T.FloatType())
def cosine_sim(u, v):
    if u is None or v is None:
        return float("nan")
    a = np.array(u, dtype=np.float32)
    b = np.array(v, dtype=np.float32)
    return float(np.dot(a, b))

df_scored = (
    df_join
    .join(apps_emb.withColumnRenamed("embedding","emb_app"), on="app_id", how="left")
    .join(jobs_emb.withColumnRenamed("embedding","emb_job"), on="vaga_id", how="left")
    .withColumn("sim_score_bert", cosine_sim(F.col("emb_app"), F.col("emb_job")))
)

df_scored = df_scored.withColumn(
    "sim_score_bert",
    F.when(F.isnan("sim_score_bert") | F.col("sim_score_bert").isNull(), F.lit(0.0)).otherwise(F.col("sim_score_bert"))
)

print("\n[Etapa 16.1] Preview sim_score_bert (agora deve variar):")
df_scored.select("app_id","vaga_id","sim_score_bert").show(10, truncate=False)

print("\n[Etapa 16.1] Distribui√ß√£o:")
df_scored.select(
    F.count("*").alias("n"),
    F.mean("sim_score_bert").alias("mean"),
    F.expr("percentile(sim_score_bert, array(0.1,0.5,0.9))").alias("p10_p50_p90")
).show(truncate=False)

# salvar scored
df_scored.write.mode("overwrite").parquet(OUT_SCORED_PATH)
print(f"\nOK ‚úÖ Salvo: {OUT_SCORED_PATH}")

globals()["df_scored"] = df_scored


In [None]:
# =========================================
# ETAPA 16.2 ‚Äî CV x VAGA "de verdade"
# Cria candidate_text a partir de df_apps e job_text a partir de df_vagas (fora do join)
# Depois calcula embeddings e similaridade e junta no df_final
# =========================================

import os
import numpy as np
import pandas as pd

import torch
from sentence_transformers import SentenceTransformer

from pyspark.sql import functions as F
from pyspark.sql import types as T

MODEL_NAME = "sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

CACHE_DIR = "/content/nlp_cache"
os.makedirs(CACHE_DIR, exist_ok=True)

APP_EMB_PATH = os.path.join(CACHE_DIR, "app_embeddings.parquet")
JOB_EMB_PATH = os.path.join(CACHE_DIR, "job_embeddings.parquet")
OUT_SCORED_PATH = os.path.join(CACHE_DIR, "df_scored_with_bert.parquet")

print(f"[Etapa 16.2] DEVICE={DEVICE} | MODEL={MODEL_NAME} | CACHE={CACHE_DIR}")

df_apps = globals().get("df_apps", None)
df_vagas = globals().get("df_vagas", None)
if df_apps is None or df_vagas is None:
    raise ValueError("N√£o encontrei df_apps e/ou df_vagas. Rode as etapas que criam esses DataFrames.")

JOIN_KEY = "__root_id"
if JOIN_KEY not in df_apps.columns or JOIN_KEY not in df_vagas.columns:
    raise ValueError(f"JOIN_KEY '{JOIN_KEY}' precisa existir em df_apps e df_vagas.")

# -------------------------
# helper: achar melhores colunas textuais em um DF
# -------------------------
def top_text_columns(df, exclude_cols=set(), topn=8, sample_frac=0.20, seed=42):
    string_cols = [c for c, t in df.dtypes if t == "string" and c not in exclude_cols]
    if not string_cols:
        return []

    dfx = df
    try:
        dfx = df.sample(withReplacement=False, fraction=sample_frac, seed=seed)
    except Exception:
        pass

    stats = []
    for c in string_cols:
        s = (
            dfx.select(
                F.avg(F.length(F.trim(F.coalesce(F.col(c), F.lit(""))))).alias("avg_len"),
                F.sum((F.length(F.trim(F.coalesce(F.col(c), F.lit("")))) > 0).cast("int")).alias("n_nonempty")
            ).collect()[0]
        )
        avg_len = float(s["avg_len"]) if s["avg_len"] is not None else 0.0
        n_nonempty = int(s["n_nonempty"]) if s["n_nonempty"] is not None else 0
        stats.append((c, avg_len, n_nonempty))

    stats_sorted = sorted(stats, key=lambda x: (x[2], x[1]), reverse=True)

    print("\n[Etapa 16.2] Top colunas textuais (c, avg_len, n_nonempty) ‚Äî amostra:")
    for row in stats_sorted[:20]:
        print(row)

    chosen = [c for c, avg_len, n_nonempty in stats_sorted if n_nonempty > 0 and avg_len >= 30][:topn]
    return chosen

# -------------------------
# 16.2a ‚Äî candidate_text (apps)
# -------------------------
# seu output mostrou cv_pt como a melhor: vamos manter
if "cv_pt" not in df_apps.columns:
    raise ValueError("N√£o achei coluna 'cv_pt' em df_apps. Me diga qual coluna tem o texto do CV.")

apps_text_cols = ["cv_pt"]
df_apps_text = (
    df_apps
    .select(JOIN_KEY, *apps_text_cols)
    .withColumn("candidate_text", F.trim(F.concat_ws("\n", *[F.coalesce(F.col(c), F.lit("")) for c in apps_text_cols])))
)

# -------------------------
# 16.2b ‚Äî job_text (vagas) REAL (descobrir automaticamente no df_vagas)
# -------------------------
exclude_vagas = {JOIN_KEY}
vagas_text_cols = top_text_columns(df_vagas, exclude_cols=exclude_vagas, topn=10)

if not vagas_text_cols:
    raise ValueError(
        "N√£o encontrei colunas textuais √∫teis em df_vagas (avg_len>=30 e n_nonempty>0).\n"
        "‚û°Ô∏è Me mande df_vagas.printSchema() ou df_vagas.columns pra eu mapear os campos de descri√ß√£o/requisitos."
    )

df_vagas_text = (
    df_vagas
    .select(JOIN_KEY, *vagas_text_cols)
    .withColumn("job_text", F.trim(F.concat_ws("\n", *[F.coalesce(F.col(c), F.lit("")) for c in vagas_text_cols])))
)

print(f"\n[Etapa 16.2] candidate_text cols: {apps_text_cols}")
print(f"[Etapa 16.2] job_text cols:       {vagas_text_cols}")

print("\n[Etapa 16.2] Preview tamanhos:")
(
    df_apps_text.select(F.length("candidate_text").alias("len_candidate")).summary("count","mean","min","max").show()
)
(
    df_vagas_text.select(F.length("job_text").alias("len_job")).summary("count","mean","min","max").show()
)

# -------------------------
# 16.2c ‚Äî Embeddings
# -------------------------
model = SentenceTransformer(MODEL_NAME, device=DEVICE)

def encode_texts(pdf: pd.DataFrame, text_col: str, id_col: str, batch_size: int = 64) -> pd.DataFrame:
    texts = pdf[text_col].fillna("").astype(str).tolist()
    ids = pdf[id_col].tolist()

    non_empty_texts, non_empty_pos = [], []
    for i, t in enumerate(texts):
        if t.strip():
            non_empty_texts.append(t)
            non_empty_pos.append(i)

    vecs = None
    if non_empty_texts:
        vecs = model.encode(
            non_empty_texts,
            batch_size=batch_size,
            show_progress_bar=True,
            convert_to_numpy=True,
            normalize_embeddings=True
        ).astype(np.float32)

    out_emb = [None] * len(texts)
    if vecs is not None:
        for j, i in enumerate(non_empty_pos):
            out_emb[i] = vecs[j].tolist()

    return pd.DataFrame({id_col: ids, "embedding": out_emb})

def rebuild_embeddings(df_unique, id_col, text_col, cache_path):
    if os.path.exists(cache_path):
        os.remove(cache_path)
    pdf = df_unique.select(id_col, text_col).toPandas()
    emb_pdf = encode_texts(pdf, text_col=text_col, id_col=id_col, batch_size=64)
    emb_pdf.to_parquet(cache_path, index=False)
    return emb_pdf

apps_unique = df_apps_text.select(JOIN_KEY, "candidate_text").dropDuplicates([JOIN_KEY])
jobs_unique = df_vagas_text.select(JOIN_KEY, "job_text").dropDuplicates([JOIN_KEY])

print(f"\n[Etapa 16.2] √önicos: apps={apps_unique.count()} | jobs={jobs_unique.count()}")

print(f"[Etapa 16.2] Recriando embeddings apps => {APP_EMB_PATH}")
apps_emb_pdf = rebuild_embeddings(apps_unique, JOIN_KEY, "candidate_text", APP_EMB_PATH)

print(f"[Etapa 16.2] Recriando embeddings jobs => {JOB_EMB_PATH}")
jobs_emb_pdf = rebuild_embeddings(jobs_unique, JOIN_KEY, "job_text", JOB_EMB_PATH)

apps_emb = spark.createDataFrame(apps_emb_pdf).withColumnRenamed("embedding","emb_app")
jobs_emb = spark.createDataFrame(jobs_emb_pdf).withColumnRenamed("embedding","emb_job")

@F.udf(returnType=T.FloatType())
def cosine_sim(u, v):
    if u is None or v is None:
        return float("nan")
    a = np.array(u, dtype=np.float32)
    b = np.array(v, dtype=np.float32)
    return float(np.dot(a, b))

df_scored = (
    df_apps_text
    .select(JOIN_KEY, "candidate_text")
    .join(df_vagas_text.select(JOIN_KEY, "job_text"), on=JOIN_KEY, how="inner")
    .join(apps_emb, on=JOIN_KEY, how="left")
    .join(jobs_emb, on=JOIN_KEY, how="left")
    .withColumn("sim_score_bert", cosine_sim(F.col("emb_app"), F.col("emb_job")))
)

df_scored = df_scored.withColumn(
    "sim_score_bert",
    F.when(F.isnan("sim_score_bert") | F.col("sim_score_bert").isNull(), F.lit(0.0)).otherwise(F.col("sim_score_bert"))
)

print("\n[Etapa 16.2] Distribui√ß√£o sim_score_bert (agora deve fazer sentido):")
df_scored.select(
    F.count("*").alias("n"),
    F.mean("sim_score_bert").alias("mean"),
    F.expr("percentile(sim_score_bert, array(0.1,0.5,0.9))").alias("p10_p50_p90")
).show(truncate=False)

print("\n[Etapa 16.2] Preview:")
df_scored.select(F.length("candidate_text").alias("len_c"), F.length("job_text").alias("len_j"), "sim_score_bert").show(10)

df_scored.write.mode("overwrite").parquet(OUT_SCORED_PATH)
print(f"\nOK ‚úÖ Salvo: {OUT_SCORED_PATH}")

globals()["df_scored"] = df_scored
