In [1]:
# Cria√ß√£o da SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("ExemploSparkSession") \
    .getOrCreate()

In [2]:
caminho = r"C:\Users\fred\meu_projeto_etl"

In [3]:
#Importa√ß√£o da tabela base
train_base = spark.read.parquet(fr"{caminho}\\data\interim\train_base_tratada")

In [4]:
train_applprev_1 = spark.read.parquet(fr"{caminho}\\data\interim\train_applprev_1.parquet")

In [5]:
train_bureau_a_1 = spark.read.parquet(fr"{caminho}\\data\interim\train_bureau_a_1.parquet")

In [6]:
train_bureau_a_2 = spark.read.parquet(fr"{caminho}\\data\interim\train_bureau_a_2.parquet")

In [7]:
train_bureau_b_1 = spark.read.parquet(fr"{caminho}\\data\interim\train_bureau_b_1.parquet")

In [14]:
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, TimestampType
from functools import reduce  # ‚¨ÖÔ∏è Importar do m√≥dulo padr√£o

def filtrar_temporal_e_auditar(df, anchors, nome_dataset=None):
    if nome_dataset is None:
        nome_dataset = "dataset"

    # Identifica colunas de data no schema do DataFrame Spark
    datetime_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, (DateType, TimestampType))]

    if not datetime_cols:
        print(f"‚ö†Ô∏è Nenhuma coluna de data encontrada no DataFrame: {nome_dataset}.")
        return df, []

    # Prepara anchors: garantir que est√° com tipo de data
    anchors = anchors.withColumn("decision_date", F.to_timestamp("date_decision"))

    # Junta com anchors
    df_merged = df.join(anchors, on="case_id", how="left")

    # Filtro: mant√©m apenas linhas em que TODAS as colunas datetime <= decision_date
    condicoes = [(F.col(col).isNull()) | (F.col(col) <= F.col("decision_date")) for col in datetime_cols]
    condicao_final = reduce(lambda a, b: a & b, condicoes)

    df_filtrado = df_merged.filter(condicao_final).drop("decision_date")

    # Contagem para relat√≥rio
    total_antes = df.count()
    total_depois = df_filtrado.count()
    removidos = total_antes - total_depois

    print(f"üìä Colunas de data consideradas para `{nome_dataset}`: {datetime_cols}")
    print(f"üìä Filtro aplicado: {removidos} registros removidos por datas > date_decision")

    return df_filtrado, datetime_cols


In [15]:
anchors = train_base.select("case_id", "date_decision")

applprev1_filtrado, colunas_data = filtrar_temporal_e_auditar(train_applprev_1, anchors, "train_applprev_1")
bureau_a1_filtrado, colunas_data = filtrar_temporal_e_auditar(train_bureau_a_1, anchors, "train_bureau_a_1")
bureau_a2_filtrado, colunas_data = filtrar_temporal_e_auditar(train_bureau_a_2, anchors, "train_bureau_a_2")
bureau_b1_filtrado, colunas_data = filtrar_temporal_e_auditar(train_bureau_b_1, anchors, "train_bureau_b_1")



üìä Colunas de data consideradas para `train_applprev_1`: ['approvaldate_319D', 'creationdate_885D', 'dateactivated_425D', 'dtlastpmt_581D', 'dtlastpmtallstes_3545839D', 'employedfrom_700D', 'firstnonzeroinstldate_307D']
üìä Filtro aplicado: 582274 registros removidos por datas > date_decision
üìä Colunas de data consideradas para `train_bureau_a_1`: ['dateofcredend_289D', 'dateofcredend_353D', 'dateofcredstart_181D', 'dateofcredstart_739D', 'dateofrealrepmt_138D', 'lastupdate_1112D', 'lastupdate_388D', 'numberofoverdueinstlmaxdat_148D', 'numberofoverdueinstlmaxdat_641D', 'overdueamountmax2date_1002D', 'overdueamountmax2date_1142D', 'refreshdate_3813885D']
üìä Filtro aplicado: 6751470 registros removidos por datas > date_decision
‚ö†Ô∏è Nenhuma coluna de data encontrada no DataFrame: train_bureau_a_2.
üìä Colunas de data consideradas para `train_bureau_b_1`: ['contractdate_551D', 'contractmaturitydate_151D', 'lastupdate_260D']
üìä Filtro aplicado: 80907 registros removidos por da

In [14]:
bureau_a1_filtrado.count()

9189067

In [12]:
train_applprev_1.columns

['case_id',
 'actualdpd_943P',
 'annuity_853A',
 'approvaldate_319D',
 'byoccupationinc_3656910L',
 'cancelreason_3545846M',
 'childnum_21L',
 'creationdate_885D',
 'credacc_actualbalance_314A',
 'credacc_credlmt_575A',
 'credacc_maxhisbal_375A',
 'credacc_minhisbal_90A',
 'credacc_status_367L',
 'credacc_transactions_402L',
 'credamount_590A',
 'credtype_587L',
 'currdebt_94A',
 'dateactivated_425D',
 'district_544M',
 'downpmt_134A',
 'dtlastpmt_581D',
 'dtlastpmtallstes_3545839D',
 'education_1138M',
 'employedfrom_700D',
 'familystate_726L',
 'firstnonzeroinstldate_307D',
 'inittransactioncode_279L',
 'isbidproduct_390L',
 'isdebitcard_527L',
 'mainoccupationinc_437A',
 'maxdpdtolerance_577P',
 'num_group1',
 'outstandingdebt_522A',
 'pmtnum_8L',
 'postype_4733339M',
 'profession_152M',
 'rejectreason_755M',
 'rejectreasonclient_4145042M',
 'revolvingaccount_394A',
 'status_219L',
 'tenor_203L',
 'dias_para_aprovacao',
 'dias_ate_ativacao',
 'dias_ult_pagamento',
 'dias_ult_pagamen

In [16]:
num_cols = [
    "annuity_853A",
    "credacc_actualbalance_314A",
    "credacc_credlmt_575A",
    "credacc_maxhisbal_375A",
    "credacc_minhisbal_90A",
    "credacc_transactions_402L",
    "credamount_590A",
    "currdebt_94A",
    "downpmt_134A",
    "mainoccupationinc_437A",
    "byoccupationinc_3656910L",
    "outstandingdebt_522A",
    "pmtnum_8L",
    "revolvingaccount_394A",
    "tenor_203L",
    "divida_total"
]


In [17]:
from pyspark.sql import functions as F

# 1) Checar tipos reais no DataFrame
schema = dict(applprev1_filtrado.dtypes)

for c in num_cols:
    tipo = schema.get(c)
    print(f"üìä Coluna {c} ‚Üí tipo detectado: {tipo}")

    if tipo not in ["int", "bigint", "double", "float", "decimal"]:
        print(f"‚ö†Ô∏è Coluna {c} n√£o √© num√©rica, ser√° ignorada.")
        num_cols.remove(c)

# 2) Definir agrega√ß√µes padr√£o para num√©ricas
aggs_num = [
    F.sum(c).alias(f"{c}_sum") for c in num_cols
] + [
    F.avg(c).alias(f"{c}_avg") for c in num_cols
] + [
    F.max(c).alias(f"{c}_max") for c in num_cols
] + [
    F.min(c).alias(f"{c}_min") for c in num_cols
]

# 3) Executar agrega√ß√£o por case_id
agg_applprev_1_num = applprev1_filtrado.groupBy("case_id").agg(*aggs_num)

print("‚úÖ Agrega√ß√£o num√©rica conclu√≠da.")


üìä Coluna annuity_853A ‚Üí tipo detectado: double
üìä Coluna credacc_actualbalance_314A ‚Üí tipo detectado: double
üìä Coluna credacc_credlmt_575A ‚Üí tipo detectado: double
üìä Coluna credacc_maxhisbal_375A ‚Üí tipo detectado: double
üìä Coluna credacc_minhisbal_90A ‚Üí tipo detectado: double
üìä Coluna credacc_transactions_402L ‚Üí tipo detectado: int
üìä Coluna credamount_590A ‚Üí tipo detectado: double
üìä Coluna currdebt_94A ‚Üí tipo detectado: double
üìä Coluna downpmt_134A ‚Üí tipo detectado: double
üìä Coluna mainoccupationinc_437A ‚Üí tipo detectado: double
üìä Coluna byoccupationinc_3656910L ‚Üí tipo detectado: double
üìä Coluna outstandingdebt_522A ‚Üí tipo detectado: double
üìä Coluna pmtnum_8L ‚Üí tipo detectado: int
üìä Coluna revolvingaccount_394A ‚Üí tipo detectado: double
üìä Coluna tenor_203L ‚Üí tipo detectado: int
üìä Coluna divida_total ‚Üí tipo detectado: double
‚úÖ Agrega√ß√£o num√©rica conclu√≠da.


In [18]:
from pyspark.sql import functions as F

# 1) Liste aqui as colunas de DATA do train_applprev_1
date_cols = [
    "approvaldate_319D",
    "creationdate_885D",
    "dateactivated_425D",
    "dtlastpmt_581D",
    "dtlastpmtallstes_3545839D",
    "employedfrom_700D",
    "firstnonzeroinstldate_307D",
]

# 2) Checar tipos no DataFrame e filtrar apenas date/timestamp
schema = dict(applprev1_filtrado.dtypes)
valid_date_cols = []
for c in date_cols:
    t = schema.get(c)
    print(f"üìÖ Coluna {c} ‚Üí tipo detectado: {t}")
    if t in ("date", "timestamp"):
        valid_date_cols.append(c)
    else:
        print(f"‚ö†Ô∏è {c} n√£o est√° em 'date'/'timestamp' e ser√° ignorada na agrega√ß√£o.")

if not valid_date_cols:
    print("‚ö†Ô∏è Nenhuma coluna de data v√°lida encontrada para agregar.")
else:
    # 3) Agrega√ß√µes: m√≠nima (primeira ocorr√™ncia) e m√°xima (√∫ltima ocorr√™ncia)
    aggs_dates = []
    for c in valid_date_cols:
        aggs_dates.append(F.min(c).alias(f"{c}_min"))
        aggs_dates.append(F.max(c).alias(f"{c}_max"))

    # 4) Executar agrega√ß√£o por case_id
    agg_applprev_1_dates = applprev1_filtrado.groupBy("case_id").agg(*aggs_dates)
    print("‚úÖ Agrega√ß√£o de datas conclu√≠da.")


üìÖ Coluna approvaldate_319D ‚Üí tipo detectado: date
üìÖ Coluna creationdate_885D ‚Üí tipo detectado: date
üìÖ Coluna dateactivated_425D ‚Üí tipo detectado: date
üìÖ Coluna dtlastpmt_581D ‚Üí tipo detectado: date
üìÖ Coluna dtlastpmtallstes_3545839D ‚Üí tipo detectado: date
üìÖ Coluna employedfrom_700D ‚Üí tipo detectado: date
üìÖ Coluna firstnonzeroinstldate_307D ‚Üí tipo detectado: date
‚úÖ Agrega√ß√£o de datas conclu√≠da.


In [19]:
from pyspark.sql import functions as F

num_cols = [
    "annuity_853A","credacc_actualbalance_314A","credacc_credlmt_575A",
    "credacc_maxhisbal_375A","credacc_minhisbal_90A","credacc_transactions_402L",
    "credamount_590A","currdebt_94A","downpmt_134A","mainoccupationinc_437A",
    "byoccupationinc_3656910L","outstandingdebt_522A","pmtnum_8L",
    "revolvingaccount_394A","tenor_203L","divida_total"
]

schema = dict(applprev1_filtrado.dtypes)
num_valid = [c for c in num_cols if schema.get(c) in ("int","bigint","double","float","decimal")]

aggs_num = (
    [F.sum(c).alias(f"{c}_sum") for c in num_valid] +
    [F.avg(c).alias(f"{c}_avg") for c in num_valid] +
    [F.max(c).alias(f"{c}_max") for c in num_valid] +
    [F.min(c).alias(f"{c}_min") for c in num_valid]
)

agg_applprev_1_num = applprev1_filtrado.groupBy("case_id").agg(*aggs_num)
print("‚úÖ Agrega√ß√µes num√©ricas conclu√≠das.")


‚úÖ Agrega√ß√µes num√©ricas conclu√≠das.


In [20]:
date_cols = [
    "approvaldate_319D","creationdate_885D","dateactivated_425D",
    "dtlastpmt_581D","dtlastpmtallstes_3545839D","employedfrom_700D",
    "firstnonzeroinstldate_307D",
]

schema = dict(applprev1_filtrado.dtypes)
date_valid = [c for c in date_cols if schema.get(c) in ("date","timestamp")]

aggs_dates = []
for c in date_valid:
    aggs_dates.append(F.min(c).alias(f"{c}_min"))
    aggs_dates.append(F.max(c).alias(f"{c}_max"))

agg_applprev_1_dates = applprev1_filtrado.groupBy("case_id").agg(*aggs_dates)
print("‚úÖ Agrega√ß√µes de datas conclu√≠das.")


‚úÖ Agrega√ß√µes de datas conclu√≠das.


In [24]:
flag_cols = [
    "sem_aprovacao_flag","sem_ativacao_flag","sem_pagamento_flag",
    "sem_pagamento_total_flag","sem_emprego_flag","sem_parcela_flag",
    "mainoccupationinc_null_flag","byoccupationinc_null_flag",
    "tem_revolving_flag","limite_cartao_credito_flag","sem_historico_credito_flag"
]

schema = dict(applprev1_filtrado.dtypes)
flag_valid = [c for c in flag_cols if schema.get(c) in ("int","bigint","double")]

aggs_flags = (
    [F.max(F.col(c)).alias(f"{c}_max") for c in flag_valid] +   # se algum =1, fica 1
    [F.sum(F.col(c)).alias(f"{c}_sum") for c in flag_valid]     # quantos registros marcaram 1
)

agg_applprev_1_flags = applprev1_filtrado.groupBy("case_id").agg(*aggs_flags)
print("‚úÖ Agrega√ß√µes de flags conclu√≠das.")


‚úÖ Agrega√ß√µes de flags conclu√≠das.


In [21]:
dias_cols = [
    "dias_para_aprovacao","dias_ate_ativacao","dias_ult_pagamento",
    "dias_ult_pagamento_all","dias_desde_inicio_emprego","dias_para_primeira_parcela"
]

schema = dict(applprev1_filtrado.dtypes)
dias_valid = [c for c in dias_cols if schema.get(c) in ("int","bigint","double","float","decimal")]

aggs_dias = (
    [F.min(c).alias(f"{c}_min") for c in dias_valid] +
    [F.avg(c).alias(f"{c}_avg") for c in dias_valid] +
    [F.max(c).alias(f"{c}_max") for c in dias_valid]
)

agg_applprev_1_dias = applprev1_filtrado.groupBy("case_id").agg(*aggs_dias)
print("‚úÖ Agrega√ß√µes das colunas de dias conclu√≠das.")


‚úÖ Agrega√ß√µes das colunas de dias conclu√≠das.


In [22]:
cat_cols = [
    "cancelreason_3545846M","credacc_status_367L","credtype_587L",
    "district_544M","education_1138M","familystate_726L","inittransactioncode_279L",
    "postype_4733339M","profession_152M","rejectreason_755M",
    "rejectreasonclient_4145042M","status_219L"
]

schema = dict(applprev1_filtrado.dtypes)
cat_valid = [c for c in cat_cols if schema.get(c) in ("string",)]

aggs_cat = (
    [F.countDistinct(c).alias(f"{c}_ndistinct") for c in cat_valid] +
    [F.first(c, ignorenulls=True).alias(f"{c}_first") for c in cat_valid]
)

agg_applprev_1_cat = applprev1_filtrado.groupBy("case_id").agg(*aggs_cat)
print("‚úÖ Agrega√ß√µes categ√≥ricas conclu√≠das.")


‚úÖ Agrega√ß√µes categ√≥ricas conclu√≠das.


In [25]:
from functools import reduce

to_join = [agg_applprev_1_num, agg_applprev_1_dates, agg_applprev_1_flags, agg_applprev_1_dias, agg_applprev_1_cat]
to_join = [df for df in to_join if len(df.columns) > 1]  # mant√©m apenas os que t√™m m√©tricas

agg_applprev_1_all = reduce(lambda l, r: l.join(r, on="case_id", how="left"), to_join)

print(f"‚úÖ Dataset agregado (train_applprev_1) pronto. Colunas: {len(agg_applprev_1_all.columns)}")


‚úÖ Dataset agregado (train_applprev_1) pronto. Colunas: 139


In [26]:
from pyspark.sql import functions as F

# --------------------------
# 0) Refer√™ncias
# --------------------------
agg_df = agg_applprev_1_all  # DF final agregado por case_id
print(f"üîé Validando DF agregado: colunas={len(agg_df.columns)}")

# --------------------------
# 1) Unicidade de case_id
# --------------------------
total = agg_df.count()
distinct_ids = agg_df.select("case_id").distinct().count()
print(f"ü™™ Linhas: {total} | case_id distintos: {distinct_ids}")
assert total == distinct_ids, "‚ùå case_id n√£o √© √∫nico no agregado!"

# --------------------------
# 2) Listas de colunas por categoria (como usadas na agrega√ß√£o)
# --------------------------
num_cols = [
    "annuity_853A","credacc_actualbalance_314A","credacc_credlmt_575A",
    "credacc_maxhisbal_375A","credacc_minhisbal_90A","credacc_transactions_402L",
    "credamount_590A","currdebt_94A","downpmt_134A","mainoccupationinc_437A",
    "byoccupationinc_3656910L","outstandingdebt_522A","pmtnum_8L",
    "revolvingaccount_394A","tenor_203L","divida_total"
]

date_cols = [
    "approvaldate_319D","creationdate_885D","dateactivated_425D",
    "dtlastpmt_581D","dtlastpmtallstes_3545839D","employedfrom_700D",
    "firstnonzeroinstldate_307D",
]

flag_cols = [
    "sem_aprovacao_flag","sem_ativacao_flag","sem_pagamento_flag",
    "sem_pagamento_total_flag","sem_emprego_flag","sem_parcela_flag",
    "mainoccupationinc_null_flag","byoccupationinc_null_flag",
    "tem_revolving_flag","limite_cartao_credito_flag","sem_historico_credito_flag"
]

dias_cols = [
    "dias_para_aprovacao","dias_ate_ativacao","dias_ult_pagamento",
    "dias_ult_pagamento_all","dias_desde_inicio_emprego","dias_para_primeira_parcela"
]

cat_cols = [
    "cancelreason_3545846M","credacc_status_367L","credtype_587L",
    "district_544M","education_1138M","familystate_726L","inittransactioncode_279L",
    "postype_4733339M","profession_152M","rejectreason_755M",
    "rejectreasonclient_4145042M","status_219L"
]

# --------------------------
# 3) Presen√ßa das colunas esperadas no DF agregado
# --------------------------
present = set(agg_df.columns)

def _missing(expected_suffixes):
    return [c for c in expected_suffixes if c not in present]

expected_num = []
for c in num_cols:
    expected_num += [f"{c}_sum", f"{c}_avg", f"{c}_max", f"{c}_min"]

expected_dates = []
for c in date_cols:
    expected_dates += [f"{c}_min", f"{c}_max"]

expected_flags = []
for c in flag_cols:
    expected_flags += [f"{c}_max", f"{c}_sum"]

expected_dias = []
for c in dias_cols:
    expected_dias += [f"{c}_min", f"{c}_avg", f"{c}_max"]

expected_cat = []
for c in cat_cols:
    expected_cat += [f"{c}_ndistinct", f"{c}_first"]

missing_any = (
    _missing(expected_num) +
    _missing(expected_dates) +
    _missing(expected_flags) +
    _missing(expected_dias) +
    _missing(expected_cat)
)

if missing_any:
    print("‚ö†Ô∏è Colunas agregadas esperadas ausentes (ok se algumas categorias n√£o foram geradas):")
    for m in missing_any[:50]:
        print("  -", m)

# --------------------------
# 4) Regras de coer√™ncia das m√©tricas
# --------------------------
issues = {}

from pyspark.sql import functions as F

EPS = 1e-6  # toler√¢ncia num√©rica

# 4.1 Num√©ricos: apenas checar se avg est√° entre min e max (com toler√¢ncia)
for c in num_cols:
    c_sum, c_avg, c_min, c_max = f"{c}_sum", f"{c}_avg", f"{c}_min", f"{c}_max"
    if {c_sum, c_avg, c_min, c_max}.issubset(set(agg_df.columns)):
        bad = (
            agg_df
            .filter(
                F.col(c_min).isNotNull() & F.col(c_max).isNotNull() & F.col(c_avg).isNotNull()
                & (
                    (F.col(c_avg) + F.lit(EPS) < F.col(c_min)) |   # avg < min (com folga)
                    (F.col(c_avg) - F.lit(EPS) > F.col(c_max))     # avg > max (com folga)
                )
            )
            .select("case_id", c_min, c_avg, c_max, c_sum)
        )
        if bad.limit(1).count() > 0:
            issues[f"num_metric_incoherent::{c}"] = bad.limit(20)


# 4.2 Datas: min ‚â§ max
for c in date_cols:
    c_min, c_max = f"{c}_min", f"{c}_max"
    if {c_min, c_max}.issubset(present):
        bad = (
            agg_df
            .filter(F.col(c_min).isNotNull() & F.col(c_max).isNotNull() & (F.col(c_min) > F.col(c_max)))
            .select("case_id", c_min, c_max)
        )
        if bad.limit(1).count() > 0:
            issues[f"date_min_gt_max::{c}"] = bad.limit(20)

# 4.3 Flags: max ‚àà {0,1} e sum ‚â• max
for c in flag_cols:
    c_max, c_sum = f"{c}_max", f"{c}_sum"
    if {c_max, c_sum}.issubset(present):
        bad = (
            agg_df
            .filter(
                F.col(c_max).isNotNull() & (~F.col(c_max).isin(0,1))   # max deve ser 0/1
                | (F.col(c_sum) < F.col(c_max))                        # sum n√£o pode ser < max
            )
            .select("case_id", c_max, c_sum)
        )
        if bad.limit(1).count() > 0:
            issues[f"flag_incoherent::{c}"] = bad.limit(20)

# 4.4 dias_*: min ‚â§ avg ‚â§ max
for c in dias_cols:
    c_min, c_avg, c_max = f"{c}_min", f"{c}_avg", f"{c}_max"
    if {c_min, c_avg, c_max}.issubset(present):
        bad = (
            agg_df
            .filter(
                (F.col(c_min).isNotNull()) & (F.col(c_avg).isNotNull()) & (F.col(c_max).isNotNull()) &
                ((F.col(c_avg) < F.col(c_min)) | (F.col(c_avg) > F.col(c_max)))
            )
            .select("case_id", c_min, c_avg, c_max)
        )
        if bad.limit(1).count() > 0:
            issues[f"dias_incoherent::{c}"] = bad.limit(20)

# --------------------------
# 5) Relato de inconsist√™ncias
# --------------------------
if not issues:
    print("‚úÖ Valida√ß√£o conclu√≠da: nenhuma inconsist√™ncia encontrada nas m√©tricas agregadas.")
else:
    print(f"‚ö†Ô∏è Inconsist√™ncias encontradas ({len(issues)} tipos). Exemplos por tipo:")
    for k, df_bad in issues.items():
        print(f"\n‚îÄ‚îÄ {k} ‚îÄ‚îÄ")
        df_bad.show(truncate=False)

        

üîé Validando DF agregado: colunas=139
ü™™ Linhas: 1171847 | case_id distintos: 1171847
‚ö†Ô∏è Colunas agregadas esperadas ausentes (ok se algumas categorias n√£o foram geradas):
  - credacc_status_367L_ndistinct
  - credacc_status_367L_first
  - status_219L_ndistinct
  - status_219L_first
‚úÖ Valida√ß√£o conclu√≠da: nenhuma inconsist√™ncia encontrada nas m√©tricas agregadas.


In [34]:
bureau_a1_filtrado.columns

['case_id',
 'annualeffectiverate_199L',
 'annualeffectiverate_63L',
 'classificationofcontr_13M',
 'classificationofcontr_400M',
 'contractst_545M',
 'contractst_964M',
 'contractsum_5085717L',
 'credlmt_230A',
 'credlmt_935A',
 'dateofcredend_289D',
 'dateofcredend_353D',
 'dateofcredstart_181D',
 'dateofcredstart_739D',
 'dateofrealrepmt_138D',
 'debtoutstand_525A',
 'debtoverdue_47A',
 'description_351M',
 'dpdmax_139P',
 'dpdmax_757P',
 'dpdmaxdatemonth_442T',
 'dpdmaxdatemonth_89T',
 'dpdmaxdateyear_596T',
 'dpdmaxdateyear_896T',
 'financialinstitution_382M',
 'financialinstitution_591M',
 'instlamount_768A',
 'instlamount_852A',
 'interestrate_508L',
 'lastupdate_1112D',
 'lastupdate_388D',
 'monthlyinstlamount_332A',
 'monthlyinstlamount_674A',
 'nominalrate_281L',
 'nominalrate_498L',
 'num_group1',
 'numberofcontrsvalue_258L',
 'numberofcontrsvalue_358L',
 'numberofinstls_229L',
 'numberofinstls_320L',
 'numberofoutstandinstls_520L',
 'numberofoutstandinstls_59L',
 'numberofo

In [27]:
from pyspark.sql import functions as F

df = bureau_a1_filtrado

schema = dict(df.dtypes)

# Colunas num√©ricas (exclui flags, datas D, partes de data T e categ√≥ricas M)
num_candidates = [
    c for c in df.columns
    if not c.endswith("_flag")
    and not c.endswith("D")  # datas
    and not c.endswith("T")  # partes de data (m√™s/ano) - trataremos √† parte
    and not c.endswith("M")  # categ√≥ricas
]

num_valid = [c for c in num_candidates if schema.get(c) in ("int","bigint","double","float","decimal")]

print("üìä Num√©ricas v√°lidas:", len(num_valid))

aggs_num = (
    [F.sum(c).alias(f"{c}_sum") for c in num_valid] +
    [F.avg(c).alias(f"{c}_avg") for c in num_valid] +
    [F.max(c).alias(f"{c}_max") for c in num_valid] +
    [F.min(c).alias(f"{c}_min") for c in num_valid]
)

agg_bureau_a1_num = df.groupBy("case_id").agg(*aggs_num)
print("‚úÖ Agrega√ß√µes num√©ricas conclu√≠das.")


üìä Num√©ricas v√°lidas: 48
‚úÖ Agrega√ß√µes num√©ricas conclu√≠das.


In [28]:
# Datas: termina com D e tipo date/timestamp
date_candidates = [c for c in df.columns if c.endswith("D")]
date_valid = [c for c in date_candidates if schema.get(c) in ("date","timestamp")]

print("üìÖ Datas v√°lidas:", len(date_valid))

aggs_dates = []
for c in date_valid:
    aggs_dates.append(F.min(c).alias(f"{c}_min"))
    aggs_dates.append(F.max(c).alias(f"{c}_max"))

agg_bureau_a1_dates = df.groupBy("case_id").agg(*aggs_dates) if aggs_dates else None
print("‚úÖ Agrega√ß√µes de datas conclu√≠das.")


üìÖ Datas v√°lidas: 12
‚úÖ Agrega√ß√µes de datas conclu√≠das.


In [29]:
flag_cols = [c for c in df.columns if c.endswith("_flag")]
flag_valid = [c for c in flag_cols if schema.get(c) in ("int","bigint","double")]

print("üö© Flags v√°lidas:", len(flag_valid))

aggs_flags = (
    [F.max(F.col(c)).alias(f"{c}_max") for c in flag_valid] +  # se algum registro = 1 ‚Üí 1
    [F.sum(F.col(c)).alias(f"{c}_sum") for c in flag_valid]    # quantos registros com 1
)

agg_bureau_a1_flags = df.groupBy("case_id").agg(*aggs_flags) if aggs_flags else None
print("‚úÖ Agrega√ß√µes de flags conclu√≠das.")


üö© Flags v√°lidas: 66
‚úÖ Agrega√ß√µes de flags conclu√≠das.


In [30]:
# Partes de data (T) ‚Äî geralmente m√™s (1‚Äì12) e ano (YYYY)
t_candidates = [c for c in df.columns if c.endswith("T")]
t_valid = [c for c in t_candidates if schema.get(c) in ("int","bigint","double","float","decimal")]

print("üóìÔ∏è Partes de data v√°lidas (T):", len(t_valid))

aggs_t = []
for c in t_valid:
    aggs_t.append(F.min(c).alias(f"{c}_min"))
    aggs_t.append(F.max(c).alias(f"{c}_max"))

agg_bureau_a1_t = df.groupBy("case_id").agg(*aggs_t) if aggs_t else None
print("‚úÖ Agrega√ß√µes de partes de data conclu√≠das.")


üóìÔ∏è Partes de data v√°lidas (T): 8
‚úÖ Agrega√ß√µes de partes de data conclu√≠das.


In [31]:
# Categ√≥ricas: termina com M ou tipo string
cat_candidates = list({c for c in df.columns if c.endswith("M")} | {c for c,t in schema.items() if t == "string"})
cat_valid = [c for c in cat_candidates if schema.get(c) == "string"]

print("üî§ Categ√≥ricas v√°lidas:", len(cat_valid))

aggs_cat = (
    [F.countDistinct(c).alias(f"{c}_ndistinct") for c in cat_valid] +
    [F.first(c, ignorenulls=True).alias(f"{c}_first") for c in cat_valid]
)

agg_bureau_a1_cat = df.groupBy("case_id").agg(*aggs_cat) if aggs_cat else None
print("‚úÖ Agrega√ß√µes categ√≥ricas conclu√≠das.")


üî§ Categ√≥ricas v√°lidas: 11
‚úÖ Agrega√ß√µes categ√≥ricas conclu√≠das.


In [32]:
from functools import reduce

to_join = [agg_bureau_a1_num, agg_bureau_a1_dates, agg_bureau_a1_flags, agg_bureau_a1_t, agg_bureau_a1_cat]
to_join = [d for d in to_join if d is not None and len(d.columns) > 1]

agg_credit_bureau_a_1_all = reduce(lambda l, r: l.join(r, on="case_id", how="left"), to_join)

print(f"‚úÖ Dataset agregado (train_credit_bureau_a_1) pronto. Colunas: {len(agg_credit_bureau_a_1_all.columns)}")


‚úÖ Dataset agregado (train_credit_bureau_a_1) pronto. Colunas: 387


In [33]:
from pyspark.sql import functions as F

df = agg_credit_bureau_a_1_all
EPS = 1e-6
ANO_MIN, ANO_MAX = 1950, 2025

print(f"üîé Validando DF agregado: colunas={len(df.columns)}")

# 1) Unicidade de case_id
total = df.count()
distinct_ids = df.select("case_id").distinct().count()
print(f"ü™™ Linhas: {total} | case_id distintos: {distinct_ids}")
assert total == distinct_ids, "‚ùå case_id n√£o √© √∫nico no agregado!"

cols = set(df.columns)

# 2) Descobrir fam√≠lias de colunas agregadas por sufixo
def fam(prefixes):
    return [c for c in df.columns if any(c.endswith(suf) for suf in prefixes)]

num_min   = fam(["_min"])
num_max   = fam(["_max"])
num_avg   = fam(["_avg"])
num_sum   = fam(["_sum"])

# Datas: terminam com _min/_max mas eram de colunas 'D' no original
date_min  = [c for c in num_min if "_D_" in c or c.endswith("D_min")]
date_max  = [c for c in num_max if "_D_" in c or c.endswith("D_max")]

# Flags: agregadas com _max e _sum e terminam com "_flag_..."
flag_max  = [c for c in num_max if "_flag_" in c]
flag_sum  = [c for c in num_sum if "_flag_" in c]

# Partes de data (T): agregadas com _min/_max e tinham 'T' no nome original
t_min     = [c for c in num_min if "_T_" in c or c.endswith("T_min")]
t_max     = [c for c in num_max if "_T_" in c or c.endswith("T_max")]

issues = {}

# 3) NUM√âRICOS: avg dentro de [min, max] (com toler√¢ncia)
#    Procuramos pares baseando-nos no prefixo do nome (antes do sufixo _min/_avg/_max/_sum)
def base_name(colname, suf):
    assert colname.endswith(suf)
    return colname[: -len(suf)]

bases = set()
for c in (num_min + num_avg + num_max):
    if c.endswith("_min") or c.endswith("_avg") or c.endswith("_max"):
        for suf in ("_min", "_avg", "_max"):
            if c.endswith(suf):
                bases.add(base_name(c, suf))

for b in bases:
    c_min, c_avg, c_max = f"{b}_min", f"{b}_avg", f"{b}_max"
    if {c_min, c_avg, c_max}.issubset(cols):
        bad = (
            df.filter(
                F.col(c_min).isNotNull() & F.col(c_avg).isNotNull() & F.col(c_max).isNotNull() &
                (
                    (F.col(c_avg) + F.lit(EPS) < F.col(c_min)) |
                    (F.col(c_avg) - F.lit(EPS) > F.col(c_max))
                )
            )
            .select("case_id", c_min, c_avg, c_max)
        )
        if bad.limit(1).count() > 0:
            issues[f"num_avg_out_of_range::{b}"] = bad.limit(20)

# 4) DATAS: min ‚â§ max
#    Detecta pares *_min / *_max que s√£o de datas (D)
date_bases = set()
for c in (date_min + date_max):
    for suf in ("_min", "_max"):
        if c.endswith(suf):
            date_bases.add(base_name(c, suf))

for b in date_bases:
    cmin, cmax = f"{b}_min", f"{b}_max"
    if {cmin, cmax}.issubset(cols):
        bad = df.filter(F.col(cmin).isNotNull() & F.col(cmax).isNotNull() & (F.col(cmin) > F.col(cmax))) \
                .select("case_id", cmin, cmax)
        if bad.limit(1).count() > 0:
            issues[f"date_min_gt_max::{b}"] = bad.limit(20)

# 5) FLAGS: max ‚àà {0,1} e sum ‚â• max
for cmax in flag_max:
    b = base_name(cmax, "_max")
    csum = f"{b}_sum"
    if csum in cols:
        bad = (
            df.filter(
                (F.col(cmax).isNotNull() & (~F.col(cmax).isin(0,1))) |
                (F.col(csum) < F.col(cmax))
            )
            .select("case_id", cmax, csum)
        )
        if bad.limit(1).count() > 0:
            issues[f"flag_incoherent::{b}"] = bad.limit(20)

# 6) PARTES DE DATA (T):
#    Regras de faixa por nome:
#    - *datemonth*  ‚Üí 1..12
#    - *dateyear*   ‚Üí ANO_MIN..ANO_MAX
for cmin in t_min:
    b = base_name(cmin, "_min")
    cmax = f"{b}_max"
    if cmax in cols:
        # faixa por padr√£o: apenas min<=max
        bad_order = df.filter(F.col(cmin).isNotNull() & F.col(cmax).isNotNull() & (F.col(cmin) > F.col(cmax))) \
                    .select("case_id", cmin, cmax)
        if bad_order.limit(1).count() > 0:
            issues[f"T_min_gt_max::{b}"] = bad_order.limit(20)

        # se o nome contiver "datemonth": 1..12
        if "datemonth" in b.lower():
            bad_month = df.filter(
                (F.col(cmin).isNotNull() & (F.col(cmin) < 1)) | (F.col(cmin) > 12) |
                (F.col(cmax).isNotNull() & (F.col(cmax) < 1)) | (F.col(cmax) > 12)
            ).select("case_id", cmin, cmax)
            if bad_month.limit(1).count() > 0:
                issues[f"T_month_out_of_range::{b}"] = bad_month.limit(20)

        # se o nome contiver "dateyear": ANO_MIN..ANO_MAX
        if "dateyear" in b.lower():
            bad_year = df.filter(
                (F.col(cmin).isNotNull() & (F.col(cmin) < ANO_MIN)) | (F.col(cmin) > ANO_MAX) |
                (F.col(cmax).isNotNull() & (F.col(cmax) < ANO_MIN)) | (F.col(cmax) > ANO_MAX)
            ).select("case_id", cmin, cmax)
            if bad_year.limit(1).count() > 0:
                issues[f"T_year_out_of_range::{b}"] = bad_year.limit(20)

# 7) Relato
if not issues:
    print("‚úÖ Valida√ß√£o conclu√≠da: nenhuma inconsist√™ncia encontrada.")
else:
    print(f"‚ö†Ô∏è Inconsist√™ncias encontradas ({len(issues)} tipos). Exemplos por tipo:")
    for name, sample in issues.items():
        print(f"\n‚îÄ‚îÄ {name} ‚îÄ‚îÄ")
        sample.show(truncate=False)


üîé Validando DF agregado: colunas=387
ü™™ Linhas: 1386248 | case_id distintos: 1386248
‚úÖ Valida√ß√£o conclu√≠da: nenhuma inconsist√™ncia encontrada.


In [38]:
bureau_a2_filtrado.columns

['case_id',
 'collater_typofvalofguarant_298M',
 'collater_typofvalofguarant_407M',
 'collater_valueofguarantee_1124L',
 'collater_valueofguarantee_876L',
 'collaterals_typeofguarante_359M',
 'collaterals_typeofguarante_669M',
 'num_group1',
 'num_group2',
 'pmts_dpd_1073P',
 'pmts_dpd_303P',
 'pmts_month_158T',
 'pmts_month_706T',
 'pmts_overdue_1140A',
 'pmts_overdue_1152A',
 'pmts_year_1139T',
 'pmts_year_507T',
 'subjectroles_name_541M',
 'subjectroles_name_838M',
 'collater_valueofguarantee_1124L_flag',
 'collater_valueofguarantee_876L_flag',
 'pmts_dpd_1073P_flag',
 'pmts_dpd_303P_flag',
 'pmts_month_158T_flag',
 'pmts_month_706T_flag',
 'pmts_overdue_1140A_flag',
 'pmts_overdue_1152A_flag',
 'pmts_year_1139T_flag',
 'pmts_year_507T_flag']

In [34]:
from pyspark.sql import functions as F

df = bureau_a2_filtrado
schema = dict(df.dtypes)

# Candidatas num√©ricas (exclui flags e categ√≥ricas M e partes de data T)
num_candidates = [
    c for c in df.columns
    if not c.endswith("_flag") and not c.endswith("M") and not c.endswith("T")
]
num_valid = [c for c in num_candidates if schema.get(c) in ("int","bigint","double","float","decimal")]

print("üìä Num√©ricas v√°lidas:", len(num_valid))

aggs_num = (
    [F.sum(c).alias(f"{c}_sum") for c in num_valid] +
    [F.avg(c).alias(f"{c}_avg") for c in num_valid] +
    [F.max(c).alias(f"{c}_max") for c in num_valid] +
    [F.min(c).alias(f"{c}_min") for c in num_valid]
)

agg_bureau_a2_num = df.groupBy("case_id").agg(*aggs_num)
print("‚úÖ Agrega√ß√µes num√©ricas conclu√≠das.")


üìä Num√©ricas v√°lidas: 9
‚úÖ Agrega√ß√µes num√©ricas conclu√≠das.


In [35]:
t_candidates = [c for c in df.columns if c.endswith("T")]
t_valid = [c for c in t_candidates if schema.get(c) in ("int","bigint","double","float","decimal")]

print("üóìÔ∏è Partes de data (T) v√°lidas:", len(t_valid))

aggs_t = []
for c in t_valid:
    aggs_t.append(F.min(c).alias(f"{c}_min"))
    aggs_t.append(F.max(c).alias(f"{c}_max"))

agg_bureau_a2_t = df.groupBy("case_id").agg(*aggs_t) if aggs_t else None
print("‚úÖ Agrega√ß√µes de T conclu√≠das.")


üóìÔ∏è Partes de data (T) v√°lidas: 4
‚úÖ Agrega√ß√µes de T conclu√≠das.


In [36]:
flag_cols = [c for c in df.columns if c.endswith("_flag")]
flag_valid = [c for c in flag_cols if schema.get(c) in ("int","bigint","double")]

print("üö© Flags v√°lidas:", len(flag_valid))

aggs_flags = (
    [F.max(F.col(c)).alias(f"{c}_max") for c in flag_valid] +
    [F.sum(F.col(c)).alias(f"{c}_sum") for c in flag_valid]
)

agg_bureau_a2_flags = df.groupBy("case_id").agg(*aggs_flags) if aggs_flags else None
print("‚úÖ Agrega√ß√µes de flags conclu√≠das.")


üö© Flags v√°lidas: 10
‚úÖ Agrega√ß√µes de flags conclu√≠das.


In [37]:
cat_candidates = [c for c in df.columns if c.endswith("M")]
cat_valid = [c for c in cat_candidates if schema.get(c) == "string"]

print("üî§ Categ√≥ricas v√°lidas:", len(cat_valid))

aggs_cat = (
    [F.countDistinct(c).alias(f"{c}_ndistinct") for c in cat_valid] +
    [F.first(c, ignorenulls=True).alias(f"{c}_first") for c in cat_valid]
)

agg_bureau_a2_cat = df.groupBy("case_id").agg(*aggs_cat) if aggs_cat else None
print("‚úÖ Agrega√ß√µes categ√≥ricas conclu√≠das.")


üî§ Categ√≥ricas v√°lidas: 6
‚úÖ Agrega√ß√µes categ√≥ricas conclu√≠das.


In [38]:
from functools import reduce

to_join = [agg_bureau_a2_num, agg_bureau_a2_t, agg_bureau_a2_flags, agg_bureau_a2_cat]
to_join = [d for d in to_join if d is not None and len(d.columns) > 1]

agg_credit_bureau_a_2_all = reduce(lambda l, r: l.join(r, on="case_id", how="left"), to_join)

print(f"‚úÖ Dataset agregado (train_bureau_a_2) pronto. Colunas: {len(agg_credit_bureau_a_2_all.columns)}")


‚úÖ Dataset agregado (train_bureau_a_2) pronto. Colunas: 77


In [39]:
from pyspark.sql import functions as F

df_agg = agg_credit_bureau_a_2_all
EPS = 1e-6
ANO_MIN, ANO_MAX = 1950, 2025

print(f"üîé Validando DF agregado: colunas={len(df_agg.columns)}")

# Unicidade
n = df_agg.count()
n_ids = df_agg.select("case_id").distinct().count()
print(f"ü™™ Linhas: {n} | case_id distintos: {n_ids}")
assert n == n_ids, "‚ùå case_id n√£o √© √∫nico no agregado!"

cols = set(df_agg.columns)

# Fam√≠lias
def fam(suf):
    return [c for c in df_agg.columns if c.endswith(suf)]

mins  = fam("_min")
maxs  = fam("_max")
avgs  = fam("_avg")

# (a) Coer√™ncia num√©rica: avg entre min e max (quando existirem)
bases = set(c[:-4] for c in mins) | set(c[:-4] for c in maxs) | set(c[:-4] for c in avgs)
issues = {}

for b in bases:
    cmin, cavg, cmax = f"{b}_min", f"{b}_avg", f"{b}_max"
    if {cmin, cavg, cmax}.issubset(cols):
        bad = df_agg.filter(
            F.col(cmin).isNotNull() & F.col(cavg).isNotNull() & F.col(cmax).isNotNull() &
            ((F.col(cavg) + F.lit(EPS) < F.col(cmin)) | (F.col(cavg) - F.lit(EPS) > F.col(cmax)))
        ).select("case_id", cmin, cavg, cmax)
        if bad.limit(1).count() > 0:
            issues[f"avg_out_of_range::{b}"] = bad.limit(20)

# (b) Partes de data (T): faixa de m√™s/ano
t_min = [c for c in mins if "T_" in c or c.endswith("T_min")]
for cmin in t_min:
    b = cmin[:-4]
    cmax = f"{b}_max"
    if cmax in cols:
        # ordem
        bad_order = df_agg.filter(
            F.col(cmin).isNotNull() & F.col(cmax).isNotNull() & (F.col(cmin) > F.col(cmax))
        ).select("case_id", cmin, cmax)
        if bad_order.limit(1).count() > 0:
            issues[f"T_min_gt_max::{b}"] = bad_order.limit(20)

        # m√™s 1..12
        if "month" in b.lower():
            bad_month = df_agg.filter(
                (F.col(cmin).isNotNull() & ((F.col(cmin) < 1) | (F.col(cmin) > 12))) |
                (F.col(cmax).isNotNull() & ((F.col(cmax) < 1) | (F.col(cmax) > 12)))
            ).select("case_id", cmin, cmax)
            if bad_month.limit(1).count() > 0:
                issues[f"T_month_out_of_range::{b}"] = bad_month.limit(20)

        # ano plaus√≠vel
        if "year" in b.lower():
            bad_year = df_agg.filter(
                (F.col(cmin).isNotNull() & ((F.col(cmin) < ANO_MIN) | (F.col(cmin) > ANO_MAX))) |
                (F.col(cmax).isNotNull() & ((F.col(cmax) < ANO_MIN) | (F.col(cmax) > ANO_MAX)))
            ).select("case_id", cmin, cmax)
            if bad_year.limit(1).count() > 0:
                issues[f"T_year_out_of_range::{b}"] = bad_year.limit(20)

# (c) Flags: max ‚àà {0,1} e sum ‚â• max
flag_max = [c for c in maxs if "_flag_" in c]
for cmax in flag_max:
    b = cmax[:-4]
    csum = f"{b}_sum"
    if csum in cols:
        bad = df_agg.filter(
            (F.col(cmax).isNotNull() & (~F.col(cmax).isin(0,1))) |
            (F.col(csum) < F.col(cmax))
        ).select("case_id", cmax, csum)
        if bad.limit(1).count() > 0:
            issues[f"flag_incoherent::{b}"] = bad.limit(20)

# Resultado
if not issues:
    print("‚úÖ Valida√ß√£o conclu√≠da: nenhuma inconsist√™ncia encontrada.")
else:
    print(f"‚ö†Ô∏è Inconsist√™ncias encontradas ({len(issues)} tipos). Exemplos por tipo:")
    for k, d in issues.items():
        print(f"\n‚îÄ‚îÄ {k} ‚îÄ‚îÄ")
        d.show(truncate=False)


üîé Validando DF agregado: colunas=77
ü™™ Linhas: 1385288 | case_id distintos: 1385288
‚ö†Ô∏è Inconsist√™ncias encontradas (1 tipos). Exemplos por tipo:

‚îÄ‚îÄ T_year_out_of_range::pmts_year_507T ‚îÄ‚îÄ
+-------+------------------+------------------+
|case_id|pmts_year_507T_min|pmts_year_507T_max|
+-------+------------------+------------------+
|254453 |2005.0            |2028.0            |
+-------+------------------+------------------+

