<a href="https://colab.research.google.com/github/Celso-RQ-Valle/Modeling-Functions/blob/main/pseudo_parcelling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Falso DF generico

In [32]:
# ===============================
# INSTALAÇÃO E START DO SPARK
# ===============================
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import when
import pyspark.sql.functions as F
import random
from datetime import datetime, timedelta

spark = SparkSession.builder \
    .appName("BaseFicticiaCredito") \
    .getOrCreate()

# ===============================
# FUNÇÕES AUXILIARES
# ===============================
def gerar_cnpj_falso():
    nums = [random.randint(0, 9) for _ in range(14)]
    return f"{nums[0]}{nums[1]}.{nums[2]}{nums[3]}{nums[4]}." \
           f"{nums[5]}{nums[6]}{nums[7]}/0001-{nums[12]}{nums[13]}"

def gerar_data_proxima(dias_max=30):
    return datetime.today().date() + timedelta(days=random.randint(1, dias_max))

# ===============================
# PARÂMETROS
# ===============================
n_linhas = 10000

status_opcoes = [
    "Aprovado Contratado",
    "Aprovado Não Contratado",
    "Rejeitado"
]

cnae_secoes = list("ABCDEFGHIJKLMNOPQRSTU")

# ===============================
# GERAÇÃO DOS DADOS
# ===============================
dados = [
    (
        gerar_cnpj_falso(),           # cnpj falso
        gerar_data_proxima(),         # dt_ref futura
        random.choice(status_opcoes), # status
        random.choice([0, 1]),        # mau (0/1)
        random.choice(cnae_secoes)    # seção CNAE
    )
    for _ in range(n_linhas)
]

colunas = ["cnpj", "dt_ref", "status", "mau", "cnae_secao"]

# ===============================
# CRIAÇÃO DO DATAFRAME SPARK
# ===============================
df_spark = spark.createDataFrame(dados, colunas)

# ===============================
# COLUNA NUMÉRICA DE STATUS
# ===============================
df_spark = df_spark.withColumn(
    "status_num",
    when(df_spark.status == "Aprovado Contratado", 2)
    .when(df_spark.status == "Aprovado Não Contratado", 1)
    .otherwise(0)
)

# ===============================
# VISUALIZAÇÃO E ESQUEMA
# ===============================
df_spark.show(10, truncate=False)
df_spark.printSchema()

# ===============================
# (OPCIONAL) SALVAR EM PARQUET
# ===============================
# df_spark.write.mode("overwrite").parquet("/content/base_ficticia_credito")

+------------------+----------+-----------------------+---+----------+----------+
|cnpj              |dt_ref    |status                 |mau|cnae_secao|status_num|
+------------------+----------+-----------------------+---+----------+----------+
|51.702.531/0001-99|2026-01-16|Aprovado Contratado    |0  |D         |2         |
|04.567.772/0001-04|2026-01-12|Aprovado Contratado    |0  |G         |2         |
|92.389.033/0001-22|2026-01-17|Rejeitado              |0  |F         |0         |
|51.474.902/0001-97|2026-01-21|Aprovado Não Contratado|1  |A         |1         |
|87.008.712/0001-00|2026-02-03|Aprovado Não Contratado|1  |I         |1         |
|67.746.308/0001-08|2026-01-26|Aprovado Contratado    |0  |Q         |2         |
|36.794.651/0001-84|2026-02-01|Aprovado Contratado    |0  |Q         |2         |
|17.465.821/0001-25|2026-02-06|Rejeitado              |1  |R         |0         |
|25.666.320/0001-06|2026-02-03|Rejeitado              |1  |I         |0         |
|40.429.500/0001

## Falso df mais realista

In [92]:
# ===============================
# INSTALAÇÃO E START DO SPARK
# ===============================
!pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import when
import pyspark.sql.functions as F
import random
from datetime import datetime, timedelta

spark = SparkSession.builder \
    .appName("BaseFicticiaCredito") \
    .getOrCreate()

random.seed(42)

# ===============================
# FUNÇÕES AUXILIARES
# ===============================
def gerar_cnpj_falso():
    nums = [random.randint(0, 9) for _ in range(14)]
    return f"{nums[0]}{nums[1]}.{nums[2]}{nums[3]}{nums[4]}." \
           f"{nums[5]}{nums[6]}{nums[7]}/0001-{nums[12]}{nums[13]}"

def gerar_data_proxima(dias_max=30):
    return datetime.today().date() + timedelta(days=random.randint(1, dias_max))

# ===============================
# PARÂMETROS
# ===============================
n_linhas = 10_000

status_opcoes = [
    "Aprovado Contratado",
    "Aprovado Não Contratado",
    "Rejeitado"
]

cnae_secoes = list("ABCDEFGHIJKLMNOPQRSTU")

# risco estrutural por CNAE
risco_cnae = {
    "A": 0.65, "B": 0.60, "C": 0.55, "D": 0.50, "E": 0.48,
    "F": 0.45, "G": 0.42, "H": 0.40, "I": 0.38, "J": 0.36,
    "K": 0.34, "L": 0.32, "M": 0.30, "N": 0.28, "O": 0.26,
    "P": 0.24, "Q": 0.22, "R": 0.20, "S": 0.18, "T": 0.16,
    "U": 0.14
}

# ajuste por status
ajuste_status = {
    "Aprovado Contratado": -0.15,
    "Aprovado Não Contratado": -0.05,
    "Rejeitado": +0.10
}

def gerar_mau_realista(status, cnae):
    base = risco_cnae.get(cnae, 0.40)
    ajuste = ajuste_status.get(status, 0.0)
    prob_mau = min(max(base + ajuste, 0.01), 0.99)
    return 1 if random.random() < prob_mau else 0

# ===============================
# GERAÇÃO DOS DADOS
# ===============================
dados = [
    (
        gerar_cnpj_falso(),                            # cnpj
        gerar_data_proxima(),                          # data futura
        (status := random.choice(status_opcoes)),      # status
        gerar_mau_realista(status, (cnae := random.choice(cnae_secoes))),
        cnae                                           # cnae_secao
    )
    for _ in range(n_linhas)
]

colunas = ["cnpj", "dt_ref", "status", "mau", "cnae_secao"]

# ===============================
# DATAFRAME SPARK
# ===============================
df_spark = spark.createDataFrame(dados, colunas)

# ===============================
# STATUS NUMÉRICO
# ===============================
df_spark = df_spark.withColumn(
    "status_num",
    when(df_spark.status == "Aprovado Contratado", 2)
    .when(df_spark.status == "Aprovado Não Contratado", 1)
    .otherwise(0)
)

# ===============================
# VISUALIZAÇÃO
# ===============================
df_spark.show(10, truncate=False)
df_spark.printSchema()

# ===============================
# SANIDADE – TAXA DE MAU
# ===============================
df_spark.groupBy("cnae_secao", "status") \
    .agg(
        F.count("*").alias("n"),
        F.avg("mau").alias("taxa_mau")
    ) \
    .orderBy("cnae_secao", "status") \
    .show(200, truncate=False)

# ===============================
# (OPCIONAL) SALVAR
# ===============================
# df_spark.write.mode("overwrite").parquet("/content/base_ficticia_credito")


+------------------+----------+-----------------------+---+----------+----------+
|cnpj              |dt_ref    |status                 |mau|cnae_secao|status_num|
+------------------+----------+-----------------------+---+----------+----------+
|10.433.218/0001-01|2026-01-18|Aprovado Contratado    |0  |Q         |2         |
|83.863.794/0001-42|2026-01-18|Aprovado Não Contratado|1  |D         |1         |
|15.594.078/0001-49|2026-02-09|Aprovado Não Contratado|0  |S         |1         |
|10.341.316/0001-55|2026-01-18|Rejeitado              |0  |I         |0         |
|19.283.276/0001-03|2026-02-07|Aprovado Contratado    |0  |K         |2         |
|13.953.767/0001-88|2026-01-20|Rejeitado              |0  |S         |0         |
|96.532.871/0001-69|2026-01-14|Aprovado Não Contratado|0  |M         |1         |
|78.480.184/0001-27|2026-01-12|Rejeitado              |0  |I         |0         |
|28.148.932/0001-09|2026-01-22|Aprovado Não Contratado|1  |A         |1         |
|54.303.911/0001

In [93]:
df_spark.groupBy('mau').count().show()

+---+-----+
|mau|count|
+---+-----+
|  0| 6791|
|  1| 3209|
+---+-----+



# Inferencia target randomica

In [110]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def inferir_mau_por_grupo_e_status_randomico(
    df,
    grouped_col: str,
    baseline_status: list,
    infer_status: list,
    incremento: float = 0.0,
    seed: int = 42
):

    # -------------------------
    # 1. Taxa baseline por grupo
    # -------------------------
    taxa_baseline = (
        df.filter(F.col("status").isin(baseline_status))
          .groupBy(grouped_col)
          .agg(F.avg("mau").alias("taxa_mau_baseline"))
    )

    df = df.join(taxa_baseline, grouped_col, "left")

    df = df.withColumn(
        "taxa_mau_alvo",
        F.least(F.lit(1.0), F.col("taxa_mau_baseline") + F.lit(incremento))
    )

    # -------------------------
    # 2. Baseline preservado
    # -------------------------
    df_base = (
        df.filter(F.col("status").isin(baseline_status))
          .withColumn("mau_inferido", F.col("mau"))
    )

    # -------------------------
    # 3. Inferidos – taxa EXATA
    # -------------------------
    df_inf = df.filter(F.col("status").isin(infer_status))

    w_rand = Window.partitionBy(grouped_col, "status").orderBy(F.rand(seed))
    w_cnt  = Window.partitionBy(grouped_col, "status")

    df_inf = (
        df_inf
        .withColumn("rn", F.row_number().over(w_rand))
        .withColumn("n", F.count("*").over(w_cnt))
        .withColumn("k", F.round(F.col("n") * F.col("taxa_mau_alvo")))
        .withColumn(
            "mau_inferido",
            F.when(F.col("rn") <= F.col("k"), 1).otherwise(0)
        )
    )

    # -------------------------
    # 4. União final
    # -------------------------
    df_final = df_base.unionByName(
        df_inf.select(df_base.columns)
    )

    return df_final.drop(
        "taxa_mau_baseline", "taxa_mau_alvo", "rn", "n", "k"
    )


df_inf = inferir_mau_por_grupo_e_status_randomico(
    df=df_spark,
    grouped_col="cnae_secao",
    baseline_status=["Aprovado Contratado"],
    infer_status=["Aprovado Não Contratado", "Rejeitado"],
)

In [111]:
df_inf.groupBy("cnae_secao", "status") \
    .agg(
        F.avg("mau_inferido").alias("taxa_mau_inferido")
    ) \
    .orderBy("cnae_secao", "status") \
    .show(truncate=False)


+----------+-----------------------+-------------------+
|cnae_secao|status                 |taxa_mau_inferido  |
+----------+-----------------------+-------------------+
|A         |Aprovado Contratado    |0.46710526315789475|
|A         |Aprovado Não Contratado|0.4662576687116564 |
|A         |Rejeitado              |0.4647058823529412 |
|B         |Aprovado Contratado    |0.45294117647058824|
|B         |Aprovado Não Contratado|0.4527027027027027 |
|B         |Rejeitado              |0.45394736842105265|
|C         |Aprovado Contratado    |0.40963855421686746|
|C         |Aprovado Não Contratado|0.4121212121212121 |
|C         |Rejeitado              |0.4090909090909091 |
|D         |Aprovado Contratado    |0.39072847682119205|
|D         |Aprovado Não Contratado|0.391025641025641  |
|D         |Rejeitado              |0.39263803680981596|
|E         |Aprovado Contratado    |0.34615384615384615|
|E         |Aprovado Não Contratado|0.34523809523809523|
|E         |Rejeitado          

In [112]:
df_inf.groupBy('cnae_secao','status').agg((F.sum('mau')/F.count('mau')).alias('taxa_mau'),(F.sum('mau_inferido')/F.count('mau_inferido')).alias('taxa_mau_inferido')).sort('cnae_secao','status').show(100)

+----------+--------------------+--------------------+--------------------+
|cnae_secao|              status|            taxa_mau|   taxa_mau_inferido|
+----------+--------------------+--------------------+--------------------+
|         A| Aprovado Contratado| 0.46710526315789475| 0.46710526315789475|
|         A|Aprovado Não Cont...|  0.6134969325153374|  0.4662576687116564|
|         A|           Rejeitado|  0.7705882352941177|  0.4647058823529412|
|         B| Aprovado Contratado| 0.45294117647058824| 0.45294117647058824|
|         B|Aprovado Não Cont...|  0.6081081081081081|  0.4527027027027027|
|         B|           Rejeitado|  0.7236842105263158| 0.45394736842105265|
|         C| Aprovado Contratado| 0.40963855421686746| 0.40963855421686746|
|         C|Aprovado Não Cont...| 0.46060606060606063|  0.4121212121212121|
|         C|           Rejeitado|  0.6363636363636364|  0.4090909090909091|
|         D| Aprovado Contratado| 0.39072847682119205| 0.39072847682119205|
|         D|

In [97]:
df_inf.show()

+----------+------------------+----------+-------------------+---+----------+------------+
|cnae_secao|              cnpj|    dt_ref|             status|mau|status_num|mau_inferido|
+----------+------------------+----------+-------------------+---+----------+------------+
|         Q|10.433.218/0001-01|2026-01-18|Aprovado Contratado|  0|         2|           0|
|         K|19.283.276/0001-03|2026-02-07|Aprovado Contratado|  0|         2|           0|
|         I|54.303.911/0001-27|2026-01-29|Aprovado Contratado|  0|         2|           0|
|         C|10.651.333/0001-24|2026-01-26|Aprovado Contratado|  0|         2|           0|
|         H|51.079.911/0001-25|2026-02-09|Aprovado Contratado|  0|         2|           0|
|         U|93.534.874/0001-00|2026-01-22|Aprovado Contratado|  0|         2|           0|
|         H|56.342.160/0001-54|2026-02-07|Aprovado Contratado|  1|         2|           1|
|         E|73.872.148/0001-43|2026-02-06|Aprovado Contratado|  1|         2|           1|

## Function Target externa

In [114]:
def ajustar_mau_por_target_externa(
    df,
    grouped_col: str,
    baseline_status: list,
    infer_status: list,
    target_col: str,
    incremento: float = 0.0,
    seed: int = 42
):

    # -------------------------
    # 1. Taxa baseline
    # -------------------------
    taxa_baseline = (
        df.filter(F.col("status").isin(baseline_status))
          .groupBy(grouped_col)
          .agg(F.avg("mau").alias("taxa_baseline"))
    )

    df = df.join(taxa_baseline, grouped_col, "left")

    df = df.withColumn(
        "taxa_alvo",
        F.least(
            F.lit(1.0),
            F.greatest(F.lit(0.0),
                F.col("taxa_baseline") + F.lit(incremento)
            )
        )
    )

    # -------------------------
    # 2. Baseline preservado
    # -------------------------
    df_base = (
        df.filter(F.col("status").isin(baseline_status))
          .withColumn("mau_inferido", F.col("mau"))
    )

    # -------------------------
    # 3. Inferidos
    # -------------------------
    df_inf = df.filter(F.col("status").isin(infer_status))

    w_grp  = Window.partitionBy(grouped_col, "status")
    w_rand = Window.partitionBy(grouped_col, "status").orderBy(F.rand(seed))

    df_inf = (
        df_inf
        .withColumn("rn", F.row_number().over(w_rand))
        .withColumn("n", F.count("*").over(w_grp))

        # ponto de partida = TARGET EXTERNA
        .withColumn("k_atual", F.round(F.col("n") * F.col(target_col)))

        # alvo = BASELINE (+ incremento)
        .withColumn("k_alvo", F.round(F.col("n") * F.col("taxa_alvo")))
        .withColumn("delta", F.col("k_alvo") - F.col("k_atual"))

        # reconstrução EXATA
        .withColumn(
            "mau_inferido",
            F.when(F.col("rn") <= F.col("k_atual"), 1).otherwise(0)
        )
        .withColumn(
            "mau_inferido",
            F.when(
                (F.col("delta") > 0) &
                (F.col("rn") <= F.col("k_atual") + F.col("delta")),
                1
            ).when(
                (F.col("delta") < 0) &
                (F.col("rn") > F.col("k_alvo")),
                0
            ).otherwise(F.col("mau_inferido"))
        )
    )

    # -------------------------
    # 4. União final
    # -------------------------
    df_final = df_base.unionByName(
        df_inf.select(df_base.columns)
    )

    return df_final.drop(
        "taxa_baseline", "taxa_alvo",
        "n", "k_atual", "k_alvo", "delta", "rn"
    )

In [115]:
df_ext = df_inf.withColumn('target_externa',F.greatest(F.col('mau_inferido'),F.col('mau'))).drop('mau_inferido')
df_ext.show()

+----------+------------------+----------+-------------------+---+----------+--------------+
|cnae_secao|              cnpj|    dt_ref|             status|mau|status_num|target_externa|
+----------+------------------+----------+-------------------+---+----------+--------------+
|         Q|10.433.218/0001-01|2026-01-18|Aprovado Contratado|  0|         2|             0|
|         K|19.283.276/0001-03|2026-02-07|Aprovado Contratado|  0|         2|             0|
|         I|54.303.911/0001-27|2026-01-29|Aprovado Contratado|  0|         2|             0|
|         C|10.651.333/0001-24|2026-01-26|Aprovado Contratado|  0|         2|             0|
|         H|51.079.911/0001-25|2026-02-09|Aprovado Contratado|  0|         2|             0|
|         U|93.534.874/0001-00|2026-01-22|Aprovado Contratado|  0|         2|             0|
|         H|56.342.160/0001-54|2026-02-07|Aprovado Contratado|  1|         2|             1|
|         E|73.872.148/0001-43|2026-02-06|Aprovado Contratado|  1|    

In [118]:
df_inf2 = ajustar_mau_por_target_externa(
    df=df_ext,
    grouped_col="cnae_secao",
    baseline_status=["Aprovado Contratado"],
    infer_status=["Aprovado Não Contratado", "Rejeitado"],
    target_col = "target_externa"
)

In [119]:
df_inf2.groupBy('cnae_secao','status').agg((F.sum('mau')/F.count('mau')).alias('taxa_mau'),(F.sum('mau_inferido')/F.count('mau_inferido')).alias('taxa_mau_inferido')).sort('cnae_secao','status').show(100)

+----------+--------------------+--------------------+--------------------+
|cnae_secao|              status|            taxa_mau|   taxa_mau_inferido|
+----------+--------------------+--------------------+--------------------+
|         A| Aprovado Contratado| 0.46710526315789475| 0.46710526315789475|
|         A|Aprovado Não Cont...|  0.6134969325153374|  0.4662576687116564|
|         A|           Rejeitado|  0.7705882352941177|  0.4647058823529412|
|         B| Aprovado Contratado| 0.45294117647058824| 0.45294117647058824|
|         B|Aprovado Não Cont...|  0.6081081081081081|  0.4527027027027027|
|         B|           Rejeitado|  0.7236842105263158| 0.45394736842105265|
|         C| Aprovado Contratado| 0.40963855421686746| 0.40963855421686746|
|         C|Aprovado Não Cont...| 0.46060606060606063|  0.4121212121212121|
|         C|           Rejeitado|  0.6363636363636364|  0.4090909090909091|
|         D| Aprovado Contratado| 0.39072847682119205| 0.39072847682119205|
|         D|

In [91]:
df_inf2.groupBy('target_externa','mau_inferido').count().show()

+--------------+------------+-----+
|target_externa|mau_inferido|count|
+--------------+------------+-----+
|             1|           1| 5091|
|             0|           0| 4909|
+--------------+------------+-----+

