In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Configuração do ambiente para utilização do Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Fazendo download
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

# Descompactando os arquivos
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

# Importando a biblioteca os
import os

# Definindo a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Definindo a variável de ambiente do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"


# instalando a findspark
!pip install -q findspark

# Importando a findspark
import findspark

# Iniciando o findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("Minha Primeira Aplicação no Pyspark") \
        .getOrCreate()

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

## Leitura dos dados

In [None]:
# Lê o arquivo Parquet
dados = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/Projetos/Modelo_Credito_Application /Conjunto_de_Dados/bureau_balance.csv",header=True)

# Mostra os dados
dados.show()

+------------+--------------+------+
|SK_ID_BUREAU|MONTHS_BALANCE|STATUS|
+------------+--------------+------+
|     5715448|             0|     C|
|     5715448|            -1|     C|
|     5715448|            -2|     C|
|     5715448|            -3|     C|
|     5715448|            -4|     C|
|     5715448|            -5|     C|
|     5715448|            -6|     C|
|     5715448|            -7|     C|
|     5715448|            -8|     C|
|     5715448|            -9|     0|
|     5715448|           -10|     0|
|     5715448|           -11|     X|
|     5715448|           -12|     X|
|     5715448|           -13|     X|
|     5715448|           -14|     0|
|     5715448|           -15|     0|
|     5715448|           -16|     0|
|     5715448|           -17|     0|
|     5715448|           -18|     0|
|     5715448|           -19|     0|
+------------+--------------+------+
only showing top 20 rows



## Sumarização na visão cliente

## Criação de flags para nos auxiliar na visão temporal dos dados

Nesta etapa, foi habilitado o uso do SparkSQL para permitir consultas SQL diretamente sobre o DataFrame dados. O comando createOrReplaceTempView("dados") cria uma visão temporária, permitindo que o DataFrame seja tratado como uma tabela SQL dentro do Spark. Em seguida, é executada uma consulta que adiciona novas colunas com base na variável MONTHS_BALANCE, criando indicadores que identificam se cada registro pertence aos últimos 3, 6, 12, 24 ou 36 meses. Cada coluna funciona como uma flag temporal, recebendo valor 1 quando o registro está dentro do período e 0 caso contrário. Essa estrutura facilita a análise de comportamento ao longo do tempo, permitindo observar padrões recentes de crédito. Por fim, o resultado é ordenado pela coluna SK_ID_BUREAU e salvo como uma nova visão temporária (df_temp_01) para uso nas próximas etapas de agregação e consolidação.

In [None]:
## Habilitando uso do SparkSQL
dados.createOrReplaceTempView("dados")

df_temp_01 = spark.sql("""
SELECT
    *,
      CASE
        WHEN MONTHS_BALANCE >= -3 THEN 1
        ELSE 0
    END AS ultimos_3_meses,
    CASE
        WHEN MONTHS_BALANCE >= -6 THEN 1
        ELSE 0
    END AS ultimos_6_meses,
    CASE
        WHEN MONTHS_BALANCE >= -12 THEN 1
        ELSE 0
    END AS ultimos_12_meses,
    CASE
        WHEN MONTHS_BALANCE >= -24 THEN 1
        ELSE 0
    END AS ultimos_24_meses,
    CASE
        WHEN MONTHS_BALANCE >= -36 THEN 1
        ELSE 0
    END AS ultimos_36_meses
FROM dados
ORDER BY `SK_ID_BUREAU`;
""")
df_temp_01.createOrReplaceTempView("df_temp_01")
df_temp_01.show()

+------------+--------------+------+---------------+---------------+----------------+----------------+----------------+
|SK_ID_BUREAU|MONTHS_BALANCE|STATUS|ultimos_3_meses|ultimos_6_meses|ultimos_12_meses|ultimos_24_meses|ultimos_36_meses|
+------------+--------------+------+---------------+---------------+----------------+----------------+----------------+
|     5001709|           -18|     C|              0|              0|               0|               1|               1|
|     5001709|           -10|     C|              0|              0|               1|               1|               1|
|     5001709|           -17|     C|              0|              0|               0|               1|               1|
|     5001709|            -5|     C|              0|              1|               1|               1|               1|
|     5001709|            -9|     C|              0|              0|               1|               1|               1|
|     5001709|           -13|     C|    

Nesta etapa, foi criada uma nova visão temporária a partir do DataFrame anterior para transformar a variável STATUS em colunas numéricas (dummies). Cada valor distinto de STATUS foi convertido em uma coluna separada — como STATUS_C, STATUS_3, STATUS_0, STATUS_2, STATUS_X, STATUS_5 e STATUS_1 — que recebe o valor 1 quando o registro pertence a essa categoria e 0 caso contrário.

Essa transformação facilita o uso dos dados em análises estatísticas e modelagem, pois converte uma variável categórica em formato numérico. Por fim, o resultado é ordenado pela coluna SK_ID_BUREAU e salvo novamente como uma visão temporária (df_temp_01), deixando os dados prontos para as próximas etapas de agregação e consolidação.

In [None]:
## Habilitando uso do SparkSQL
df_temp_01.createOrReplaceTempView("df_temp_01")

df_temp_02 = spark.sql("""
SELECT
    *,
      CASE
        WHEN STATUS = "C" THEN 1
        ELSE 0
    END AS STATUS_C,
    CASE
        WHEN STATUS = "3" THEN 1
        ELSE 0
    END AS STATUS_3,
    CASE
        WHEN STATUS = "0" THEN 1
        ELSE 0
    END AS STATUS_0,
    CASE
        WHEN STATUS = "2" THEN 1
        ELSE 0
    END AS STATUS_2,
    CASE
        WHEN STATUS = "X" THEN 1
        ELSE 0
    END AS STATUS_X,
    CASE
        WHEN STATUS = "5" THEN 1
        ELSE 0
    END AS STATUS_5,
    CASE
        WHEN STATUS = "1" THEN 1
        ELSE 0
    END AS STATUS_1
FROM df_temp_01
ORDER BY `SK_ID_BUREAU`;
""")
df_temp_02.createOrReplaceTempView("df_temp_01")
df_temp_02.show()

+------------+--------------+------+---------------+---------------+----------------+----------------+----------------+--------+--------+--------+--------+--------+--------+--------+
|SK_ID_BUREAU|MONTHS_BALANCE|STATUS|ultimos_3_meses|ultimos_6_meses|ultimos_12_meses|ultimos_24_meses|ultimos_36_meses|STATUS_C|STATUS_3|STATUS_0|STATUS_2|STATUS_X|STATUS_5|STATUS_1|
+------------+--------------+------+---------------+---------------+----------------+----------------+----------------+--------+--------+--------+--------+--------+--------+--------+
|     5001709|           -18|     C|              0|              0|               0|               1|               1|       1|       0|       0|       0|       0|       0|       0|
|     5001709|           -10|     C|              0|              0|               1|               1|               1|       1|       0|       0|       0|       0|       0|       0|
|     5001709|           -17|     C|              0|              0|               0|

## Sumarizar na visão cliente (Automatizada)

Esse código realiza a agregação dos dados de status por bureau, contabilizando quantas vezes cada tipo de status ocorreu em diferentes períodos de tempo.

Primeiro, são definidas as colunas de status (STATUS_C, STATUS_3, STATUS_0, etc.) e as colunas de tempo (ultimos_3_meses, ultimos_6_meses, ultimos_12_meses, ultimos_24_meses, ultimos_36_meses). Em seguida, o código percorre cada combinação dessas colunas e calcula, com a função count(when(...)), quantas ocorrências de cada status existem dentro de cada intervalo de tempo.

Essas contagens são agrupadas por SK_ID_BUREAU, o identificador do registro no bureau, e o resultado é armazenado em df_temp_03. Assim, o novo DataFrame apresenta, para cada registro de crédito, o número de ocorrências de cada tipo de status em diferentes janelas temporais — uma informação essencial para avaliar o histórico e o comportamento de pagamento ao longo do tempo.

In [None]:
from pyspark.sql.functions import col, round, sum, avg, max, min, when, count

# Definir as colunas para agregação
colunas_agregacao_total = ['STATUS_C','STATUS_3','STATUS_0','STATUS_2','STATUS_X','STATUS_5','STATUS_1']

colunas_flags = ['ultimos_3_meses','ultimos_6_meses','ultimos_12_meses','ultimos_24_meses','ultimos_36_meses']

expressoes_agregacao = []

for flag in colunas_flags:
  for coluna in colunas_agregacao_total:
    expressoes_agregacao.append(round(count(when(col(flag) == 1, col(coluna))), 2).alias(f"QTD_{coluna.upper()}_{flag.upper()}"))

expressoes_agregacao = tuple(expressoes_agregacao)

# Aplicar as expressões de agregação
df_temp_03 = df_temp_02.groupBy("SK_ID_BUREAU").agg(*expressoes_agregacao).orderBy("SK_ID_BUREAU")

# Mostrar o DataFrame resultante
df_temp_03.show()

+------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+----------

## Salvar a tabela sumarizada

Esse trecho salva o resultado das agregações em um arquivo CSV chamado bureau_balance_agg.csv. O comando repartition(1) garante que o Spark gere apenas um único arquivo de saída, em vez de vários (um por partição, como é padrão). Em seguida, o método write.mode("overwrite").csv(..., header=True) grava o arquivo, sobrescrevendo versões anteriores e incluindo o cabeçalho das colunas. Essa etapa finaliza o processamento do conjunto bureau balance, exportando o resumo consolidado de status por período de tempo, pronto para ser utilizado em análises ou integrado com outras bases de crédito.

In [None]:
df_temp_03 = df_temp_03.repartition(1)
df_temp_03.write.mode("overwrite").csv("bureau_balance_agg.csv",header=True)