#Introdução

Este notebook é dedicado à feature engineering da tabela POSH CASH. Neste contexto, estarei explorando e transformando os dados da tabela para otimizar o desempenho dos modelos de machine learning que serão aplicados posteriormente. Feature engineering é uma etapa crucial no processo de preparação de dados, onde buscamos extrair informações relevantes e criar novas variáveis (ou features) que possam melhor capturar os padrões subjacentes aos nossos dados.

No contexto específico do nosso negócio de concessão de crédito, a feature engineering desempenha um papel fundamental. A capacidade de prever com precisão a probabilidade de inadimplência ou de pagamento pontual é crucial para a saúde financeira da instituição. Nesse sentido, a feature engineering permite não apenas melhorar a eficácia dos modelos de machine learning em realizar essas previsões, mas também oferece uma oportunidade de compreender melhor o comportamento dos clientes e os fatores que influenciam sua capacidade de pagamento.

O objetivo principal da feature engineering é, portanto, melhorar a capacidade dos modelos de machine learning em aprender com os dados, aumentando assim sua eficácia na realização de previsões ou classificações. Isso é alcançado através da seleção, transformação e criação de features que possam fornecer insights mais significativos e representativos para o problema em questão. Ao final deste processo, que será feito em todas as tabelas, esperamos obter um conjunto de dados mais refinado e adequado para alimentar nossos modelos de machine learning, resultando em predições mais precisas e confiáveis, o que é essencial para a tomada de decisões assertivas no âmbito da concessão de crédito.

##Configuração do Ambiente para Utilização do Spark
Configuração do ambiente e ferramentas essenciais que foram instaladas para o início do projeto.

Instalação do Apache Spark: O Apache Spark, uma poderosa ferramenta para processamento distribuído de dados em grande escala, foi instalado no ambiente de desenvolvimento. Esta instalação permitirá a utilização de todas as funcionalidades oferecidas pelo Spark para análise e manipulação de dados.

Configuração do Ambiente Python: Para interagir de forma eficiente com o Spark utilizando a linguagem Python, foram realizadas configurações específicas do ambiente Python. Isso inclui a instalação do pacote PySpark, uma biblioteca Python que fornece uma API para interagir com o Spark.

Criação da Sessão Spark: Uma sessão Spark foi criada utilizando a classe SparkSession. Essa sessão é fundamental para estabelecer uma conexão com o ambiente Spark e executar operações de processamento de dados distribuídas. Através dessa sessão, será possível acessar todos os recursos e funcionalidades do Spark para realizar análises e manipulações nos dados do projeto.

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Fazendo download
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

# Descompactando os arquivos
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

# Importando a biblioteca os
import os

# Definindo a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Definindo a variável de ambiente do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"


# instalando a findspark
!pip install -q findspark

# Importando a findspark
import findspark

# Iniciando o findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("Minha Primeira Aplicação no Pyspark") \
        .getOrCreate()

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

## Leitura do Dataset em Pyspark

In [2]:
df_POS_CASH = spark.read.csv ('/content/drive/MyDrive/Colab Notebooks/AnaliseCredito/POS_CASH_balance.csv', header=True, inferSchema=True)
df_POS_CASH.createOrReplaceTempView("df_POS_CASH")

In [3]:
df_POS_CASH.printSchema()

root
 |-- SK_ID_PREV: integer (nullable = true)
 |-- SK_ID_CURR: integer (nullable = true)
 |-- MONTHS_BALANCE: integer (nullable = true)
 |-- CNT_INSTALMENT: double (nullable = true)
 |-- CNT_INSTALMENT_FUTURE: double (nullable = true)
 |-- NAME_CONTRACT_STATUS: string (nullable = true)
 |-- SK_DPD: integer (nullable = true)
 |-- SK_DPD_DEF: integer (nullable = true)



In [4]:
df_POS_CASH.show()

+----------+----------+--------------+--------------+---------------------+--------------------+------+----------+
|SK_ID_PREV|SK_ID_CURR|MONTHS_BALANCE|CNT_INSTALMENT|CNT_INSTALMENT_FUTURE|NAME_CONTRACT_STATUS|SK_DPD|SK_DPD_DEF|
+----------+----------+--------------+--------------+---------------------+--------------------+------+----------+
|   1803195|    182943|           -31|          48.0|                 45.0|              Active|     0|         0|
|   1715348|    367990|           -33|          36.0|                 35.0|              Active|     0|         0|
|   1784872|    397406|           -32|          12.0|                  9.0|              Active|     0|         0|
|   1903291|    269225|           -35|          48.0|                 42.0|              Active|     0|         0|
|   2341044|    334279|           -35|          36.0|                 35.0|              Active|     0|         0|
|   2207092|    342166|           -32|          12.0|                 12.0|     

##Criação de Flags para Visão Temporal dos Dados

Criação de flags para representar o comportamento do cliente ao longo de diferentes períodos de tempo em relação aos seus pagamentos.

In [5]:
df_flag_01 = spark.sql("""
select *,
  case
    when MONTHS_BALANCE >= -3 then 1
    else 0
    end as ULT_3_MESES,
  case
    when MONTHS_BALANCE >= -6 then 1
    else 0
    end as ULT_6_MESES,
  case
    when MONTHS_BALANCE >= -9 then 1
    else 0
    end as ULT_9_MESES,
  case
    when MONTHS_BALANCE >= -12 then 1
    else 0
    end as ULT_12_MESES,
  case
    when MONTHS_BALANCE >= -18 then 1
    else 0
    end as ULT_18_MESES
from
    df_POS_CASH
order by
    SK_ID_PREV
""")
df_flag_01.show()
df_flag_01.createOrReplaceTempView("df_flag_01")

+----------+----------+--------------+--------------+---------------------+--------------------+------+----------+-----------+-----------+-----------+------------+------------+
|SK_ID_PREV|SK_ID_CURR|MONTHS_BALANCE|CNT_INSTALMENT|CNT_INSTALMENT_FUTURE|NAME_CONTRACT_STATUS|SK_DPD|SK_DPD_DEF|ULT_3_MESES|ULT_6_MESES|ULT_9_MESES|ULT_12_MESES|ULT_18_MESES|
+----------+----------+--------------+--------------+---------------------+--------------------+------+----------+-----------+-----------+-----------+------------+------------+
|   1000001|    158271|            -8|           2.0|                  0.0|           Completed|     0|         0|          0|          0|          1|           1|           1|
|   1000004|    260094|           -26|          10.0|                  7.0|              Active|     0|         0|          0|          0|          0|           0|           0|
|   1000005|    176456|           -49|          10.0|                  3.0|              Active|     0|         0| 

##Criação de Features Agregadas por Período de Tempo

Agregando informações relacionadas aos pagamentos dos clientes ao longo de diferentes períodos de tempo.

In [6]:
df_temp_01 = spark.sql("""
SELECT
    SK_ID_PREV,
    AVG(CASE WHEN ULT_3_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MED_CNT_INSTALMENT_ULT_3_MS,
    MIN(CASE WHEN ULT_3_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MIN_CNT_INSTALMENT_ULT_3_MS,
    MAX(CASE WHEN ULT_3_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MAX_CNT_INSTALMENT_ULT_3_MS,
    SUM(CASE WHEN ULT_3_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_SUM_CNT_INSTALMENT_ULT_3_MS,

    AVG(CASE WHEN ULT_6_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MED_CNT_INSTALMENT_ULT_6_MS,
    MIN(CASE WHEN ULT_6_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MIN_CNT_INSTALMENT_ULT_6_MS,
    MAX(CASE WHEN ULT_6_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MAX_CNT_INSTALMENT_ULT_6_MS,
    SUM(CASE WHEN ULT_6_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_SUM_CNT_INSTALMENT_ULT_6_MS,

    AVG(CASE WHEN ULT_9_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MED_CNT_INSTALMENT_ULT_9_MS,
    MIN(CASE WHEN ULT_9_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MIN_CNT_INSTALMENT_ULT_9_MS,
    MAX(CASE WHEN ULT_9_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MAX_CNT_INSTALMENT_ULT_9_MS,
    SUM(CASE WHEN ULT_9_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_SUM_CNT_INSTALMENT_ULT_9_MS,

    AVG(CASE WHEN ULT_12_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MED_CNT_INSTALMENT_ULT_12_MS,
    MIN(CASE WHEN ULT_12_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MIN_CNT_INSTALMENT_ULT_12_MS,
    MAX(CASE WHEN ULT_12_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MAX_CNT_INSTALMENT_ULT_12_MS,
    SUM(CASE WHEN ULT_12_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_SUM_CNT_INSTALMENT_ULT_12_MS,

    AVG(CASE WHEN ULT_18_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MED_CNT_INSTALMENT_ULT_18_MS,
    MIN(CASE WHEN ULT_18_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MIN_CNT_INSTALMENT_ULT_18_MS,
    MAX(CASE WHEN ULT_18_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_MAX_CNT_INSTALMENT_ULT_18_MS,
    SUM(CASE WHEN ULT_18_MESES = 1 THEN CNT_INSTALMENT ELSE NULL END) AS VL_SUM_CNT_INSTALMENT_ULT_18_MS
FROM
    df_flag_01
GROUP BY
    SK_ID_PREV
ORDER BY
    SK_ID_PREV;
""")
df_temp_01.show()
df_temp_01.createOrReplaceTempView("df_temp_01")

+----------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+
|SK_ID_PREV|VL_MED_CNT_INSTALMENT_ULT_3_MS|VL_MIN_CNT_INSTALMENT_ULT_3_MS|VL_MAX_CNT_INSTALMENT_ULT_3_MS|VL_SUM_CNT_INSTALMENT_ULT_3_MS|VL_MED_CNT_INSTALMENT_ULT_6_MS|VL_MIN_CNT_INSTALMENT_ULT_6_MS|VL_MAX_CNT_INSTALMENT_ULT_6_MS|VL_SUM_CNT_INSTALMENT_ULT_6_MS|VL_MED_CNT_INSTALMENT_ULT_9_MS|VL_MIN_CNT_INSTALMENT_ULT_9_MS|VL_MAX_CNT_INSTALMENT_ULT_9_MS|VL_SUM

##Criação de Features Agregadas para Parcelas Futuras

Agregando informações relacionadas aos pagamentos dos clientes, focando especificamente nas parcelas futuras.

In [7]:
df_temp_02 = spark.sql("""
   SELECT
    SK_ID_PREV,
    AVG(CASE WHEN ULT_3_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MED_CNT_INSTALMENT_FUTURE_ULT_3_MS,
    MIN(CASE WHEN ULT_3_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MIN_CNT_INSTALMENT_FUTURE_ULT_3_MS,
    MAX(CASE WHEN ULT_3_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MAX_CNT_INSTALMENT_FUTURE_ULT_3_MS,
    SUM(CASE WHEN ULT_3_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_SUM_CNT_INSTALMENT_FUTURE_ULT_3_MS,

    AVG(CASE WHEN ULT_6_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MED_CNT_INSTALMENT_FUTURE_ULT_6_MS,
    MIN(CASE WHEN ULT_6_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MIN_CNT_INSTALMENT_FUTURE_ULT_6_MS,
    MAX(CASE WHEN ULT_6_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MAX_CNT_INSTALMENT_FUTURE_ULT_6_MS,
    SUM(CASE WHEN ULT_6_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_SUM_CNT_INSTALMENT_FUTURE_ULT_6_MS,

    AVG(CASE WHEN ULT_9_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MED_CNT_INSTALMENT_FUTURE_ULT_9_MS,
    MIN(CASE WHEN ULT_9_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MIN_CNT_INSTALMENT_FUTURE_ULT_9_MS,
    MAX(CASE WHEN ULT_9_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MAX_CNT_INSTALMENT_FUTURE_ULT_9_MS,
    SUM(CASE WHEN ULT_9_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_SUM_CNT_INSTALMENT_FUTURE_ULT_9_MS,

    AVG(CASE WHEN ULT_12_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MED_CNT_INSTALMENT_FUTURE_ULT_12_MS,
    MIN(CASE WHEN ULT_12_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MIN_CNT_INSTALMENT_FUTURE_ULT_12_MS,
    MAX(CASE WHEN ULT_12_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MAX_CNT_INSTALMENT_FUTURE_ULT_12_MS,
    SUM(CASE WHEN ULT_12_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_SUM_CNT_INSTALMENT_FUTURE_ULT_12_MS,

    AVG(CASE WHEN ULT_18_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MED_CNT_INSTALMENT_FUTURE_ULT_18_MS,
    MIN(CASE WHEN ULT_18_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MIN_CNT_INSTALMENT_FUTURE_ULT_18_MS,
    MAX(CASE WHEN ULT_18_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_MAX_CNT_INSTALMENT_FUTURE_ULT_18_MS,
    SUM(CASE WHEN ULT_18_MESES = 1 THEN CNT_INSTALMENT_FUTURE ELSE NULL END) AS VL_SUM_CNT_INSTALMENT_FUTURE_ULT_18_MS
FROM
    df_flag_01
GROUP BY
    SK_ID_PREV
ORDER BY
    SK_ID_PREV;
""")
df_temp_02.show()
df_temp_02.createOrReplaceTempView("df_temp_02")

+----------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+
|SK_ID_PREV|VL_MED_CNT_INSTALMENT_FUTURE_ULT_3_MS|VL_MIN_CNT_INSTALMENT_FUTURE_ULT_3_MS|VL_MAX_CNT_INSTALMENT_FUTURE_ULT_3_MS|VL_SUM_CNT_INSTALMENT_FUTURE_ULT_3_MS|VL_MED_CNT_INSTALMENT_FUTURE_ULT_6_MS|VL_MIN_CNT_INSTAL

In [8]:
df_temp_03 = spark.sql("""
   SELECT
    SK_ID_PREV,
    AVG(CASE WHEN ULT_3_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MED_SK_DPD_ULT_3_MS,
    MIN(CASE WHEN ULT_3_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MIN_SK_DPD_ULT_3_MS,
    MAX(CASE WHEN ULT_3_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MAX_SK_DPD_ULT_3_MS,
    SUM(CASE WHEN ULT_3_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_SUM_SK_DPD_ULT_3_MS,

    AVG(CASE WHEN ULT_6_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MED_SK_DPD_ULT_6_MS,
    MIN(CASE WHEN ULT_6_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MIN_SK_DPD_ULT_6_MS,
    MAX(CASE WHEN ULT_6_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MAX_SK_DPD_ULT_6_MS,
    SUM(CASE WHEN ULT_6_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_SUM_SK_DPD_ULT_6_MS,

    AVG(CASE WHEN ULT_9_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MED_SK_DPD_ULT_9_MS,
    MIN(CASE WHEN ULT_9_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MIN_SK_DPD_ULT_9_MS,
    MAX(CASE WHEN ULT_9_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MAX_SK_DPD_ULT_9_MS,
    SUM(CASE WHEN ULT_9_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_SUM_SK_DPD_ULT_9_MS,

    AVG(CASE WHEN ULT_12_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MED_SK_DPD_ULT_12_MS,
    MIN(CASE WHEN ULT_12_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MIN_SK_DPD_ULT_12_MS,
    MAX(CASE WHEN ULT_12_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MAX_SK_DPD_ULT_12_MS,
    SUM(CASE WHEN ULT_12_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_SUM_SK_DPD_ULT_12_MS,

    AVG(CASE WHEN ULT_18_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MED_SK_DPD_ULT_18_MS,
    MIN(CASE WHEN ULT_18_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MIN_SK_DPD_ULT_18_MS,
    MAX(CASE WHEN ULT_18_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_MAX_SK_DPD_ULT_18_MS,
    SUM(CASE WHEN ULT_18_MESES = 1 THEN SK_DPD ELSE NULL END) AS VL_SUM_SK_DPD_ULT_18_MS
FROM
    df_flag_01
GROUP BY
    SK_ID_PREV
ORDER BY
    SK_ID_PREV;
""")
df_temp_03.show()
df_temp_03.createOrReplaceTempView("df_temp_03")

+----------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|SK_ID_PREV|VL_MED_SK_DPD_ULT_3_MS|VL_MIN_SK_DPD_ULT_3_MS|VL_MAX_SK_DPD_ULT_3_MS|VL_SUM_SK_DPD_ULT_3_MS|VL_MED_SK_DPD_ULT_6_MS|VL_MIN_SK_DPD_ULT_6_MS|VL_MAX_SK_DPD_ULT_6_MS|VL_SUM_SK_DPD_ULT_6_MS|VL_MED_SK_DPD_ULT_9_MS|VL_MIN_SK_DPD_ULT_9_MS|VL_MAX_SK_DPD_ULT_9_MS|VL_SUM_SK_DPD_ULT_9_MS|VL_MED_SK_DPD_ULT_12_MS|VL_MIN_SK_DPD_ULT_12_MS|VL_MAX_SK_DPD_ULT_12_MS|VL_SUM_SK_DPD_ULT_12_MS|VL_MED_SK_DPD_ULT_18_MS|VL_MIN_SK_DPD_ULT_18_MS|VL_MAX_SK_DPD_ULT_18_MS|VL_SUM_SK_DPD_ULT_18_MS|
+----------+----------------------+---

##Criação de Features Agregadas para Atrasos nos Pagamentos

Features agregadas relacionadas aos atrasos nos pagamentos dos clientes ao longo de diferentes períodos de tempo.

In [9]:
df_temp_04 = spark.sql("""
   SELECT
    SK_ID_PREV,
    AVG(CASE WHEN ULT_3_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MED_SK_DPD_DEF_ULT_3_MS,
    MIN(CASE WHEN ULT_3_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MIN_SK_DPD_DEF_ULT_3_MS,
    MAX(CASE WHEN ULT_3_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MAX_SK_DPD_DEF_ULT_3_MS,
    SUM(CASE WHEN ULT_3_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_SUM_SK_DPD_DEF_ULT_3_MS,

    AVG(CASE WHEN ULT_6_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MED_SK_DPD_DEF_ULT_6_MS,
    MIN(CASE WHEN ULT_6_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MIN_SK_DPD_DEF_ULT_6_MS,
    MAX(CASE WHEN ULT_6_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MAX_SK_DPD_DEF_ULT_6_MS,
    SUM(CASE WHEN ULT_6_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_SUM_SK_DPD_DEF_ULT_6_MS,

    AVG(CASE WHEN ULT_9_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MED_SK_DPD_DEF_ULT_9_MS,
    MIN(CASE WHEN ULT_9_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MIN_SK_DPD_DEF_ULT_9_MS,
    MAX(CASE WHEN ULT_9_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MAX_SK_DPD_DEF_ULT_9_MS,
    SUM(CASE WHEN ULT_9_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_SUM_SK_DPD_DEF_ULT_9_MS,

    AVG(CASE WHEN ULT_12_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MED_SK_DPD_DEF_ULT_12_MS,
    MIN(CASE WHEN ULT_12_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MIN_SK_DPD_DEF_ULT_12_MS,
    MAX(CASE WHEN ULT_12_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MAX_SK_DPD_DEF_ULT_12_MS,
    SUM(CASE WHEN ULT_12_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_SUM_SK_DPD_DEF_ULT_12_MS,

    AVG(CASE WHEN ULT_18_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MED_SK_DPD_DEF_ULT_18_MS,
    MIN(CASE WHEN ULT_18_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MIN_SK_DPD_DEF_ULT_18_MS,
    MAX(CASE WHEN ULT_18_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_MAX_SK_DPD_DEF_ULT_18_MS,
    SUM(CASE WHEN ULT_18_MESES = 1 THEN SK_DPD_DEF ELSE NULL END) AS VL_SUM_SK_DPD_DEF_ULT_18_MS
FROM
    df_flag_01
GROUP BY
    SK_ID_PREV
ORDER BY
    SK_ID_PREV;
""")
df_temp_04.show()
df_temp_04.createOrReplaceTempView("df_temp_04")

+----------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|SK_ID_PREV|VL_MED_SK_DPD_DEF_ULT_3_MS|VL_MIN_SK_DPD_DEF_ULT_3_MS|VL_MAX_SK_DPD_DEF_ULT_3_MS|VL_SUM_SK_DPD_DEF_ULT_3_MS|VL_MED_SK_DPD_DEF_ULT_6_MS|VL_MIN_SK_DPD_DEF_ULT_6_MS|VL_MAX_SK_DPD_DEF_ULT_6_MS|VL_SUM_SK_DPD_DEF_ULT_6_MS|VL_MED_SK_DPD_DEF_ULT_9_MS|VL_MIN_SK_DPD_DEF_ULT_9_MS|VL_MAX_SK_DPD_DEF_ULT_9_MS|VL_SUM_SK_DPD_DEF_ULT_9_MS|VL_MED_SK_DPD_DEF_ULT_12_MS|VL_MIN_SK_DPD_DEF_ULT_12_MS|VL_MAX_SK_DPD_DEF_ULT_12_MS|VL_SUM_SK_DPD_DEF_U

##Realização de Joins entre DataFrames

O DataFrame resultante dessa operação de junção contém todas as informações agregadas e combinadas sobre o histórico de pagamentos dos clientes.

In [10]:
# Realizar o join entre df_flag_01 e df_temp_01
df_posh_cash_final = df_flag_01.join(df_temp_01, on='SK_ID_PREV', how='inner')

# Realizar os joins adicionais com as outras tabelas
df_posh_cash_final = df_posh_cash_final.join(df_temp_02, on='SK_ID_PREV', how='inner')
df_posh_cash_final = df_posh_cash_final.join(df_temp_03, on='SK_ID_PREV', how='inner')
df_posh_cash_final = df_posh_cash_final.join(df_temp_04, on='SK_ID_PREV', how='inner')


# Exibir o DataFrame resultante
df_posh_cash_final.show()


+----------+----------+--------------+--------------+---------------------+--------------------+------+----------+-----------+-----------+-----------+------------+------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+------

##Agregação de Variáveis na Visão Cliente

Realização de uma sumarização dos dados na perspectiva do cliente.

In [11]:
from pyspark.sql.functions import first

# Lista de todas as colunas, exceto a coluna SK_ID_PREV
colunas_para_agregar = [col for col in df_posh_cash_final.columns if col != "SK_ID_PREV"]

# Agregue as variáveis por SK_ID_PREV
df_posh_cash_final_01 = df_posh_cash_final.groupBy("SK_ID_PREV").agg(
    *[first(col).alias(col) for col in colunas_para_agregar]
)

# Mostre o resultado
df_posh_cash_final_01.show()


+----------+----------+--------------+--------------+---------------------+--------------------+------+----------+-----------+-----------+-----------+------------+------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+------

## Salvar a tabela Sumarizada

In [None]:
df_posh_cash_final_01 = df_posh_cash_final_01.repartition(1)
df_posh_cash_final_01.write.mode("overwrite").csv("posh_cash_agg.csv",header=True)