## Configuração do ambiente para utilização do Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Fazendo download
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

# Descompactando os arquivos
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

# Importando a biblioteca os
import os

# Definindo a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Definindo a variável de ambiente do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"


# instalando a findspark
!pip install -q findspark

# Importando a findspark
import findspark

# Iniciando o findspark
findspark.init()

from pyspark.sql import SparkSession

# Criando a sessão do Spark com configuração de memória aumentada
spark = SparkSession.builder \
    .appName("Feature Engineering") \
    .config("spark.executor.memory", "40g") \
    .config("spark.driver.memory", "40g") \
    .config("spark.executor.cores", "4") \
    .config("spark.driver.cores", "4") \
    .getOrCreate()

import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

## Leitura dos dados

In [None]:
# Lendo o arquivo csv
dados = spark.read.csv("/content/drive/MyDrive/Estudos/Ciencia de Dados/PoD Bank/previous_application.csv",header=True)

## Criação de flags para nos auxiliar na visão temporal dos dados

In [None]:
## Habilitando uso do SparkSQL
dados.createOrReplaceTempView("dados")

df_temp_01 = spark.sql("""
SELECT
    *,
      CASE
        WHEN DAYS_DECISION >= -90 THEN 1
        ELSE 0
    END AS ultimos_3_meses,
    CASE
        WHEN DAYS_DECISION >= -180 THEN 1
        ELSE 0
    END AS ultimos_6_meses,
    CASE
        WHEN DAYS_DECISION >= -360 THEN 1
        ELSE 0
    END AS ultimos_12_meses,
    CASE
        WHEN DAYS_DECISION >= -540 THEN 1
        ELSE 0
    END AS ultimos_18_meses,
    CASE
        WHEN DAYS_DECISION >= -720 THEN 1
        ELSE 0
    END AS ultimos_24_meses,
    CASE
        WHEN DAYS_DECISION >= -900 THEN 1
        ELSE 0
    END AS ultimos_30_meses,
    CASE
        WHEN DAYS_DECISION >= -1080 THEN 1
        ELSE 0
    END AS ultimos_36_meses
FROM dados
ORDER BY `SK_ID_PREV`;
""")
df_temp_01.createOrReplaceTempView("df_temp_01")

## Sumarizar na visão cliente (Automatizada)

In [None]:
from pyspark.sql.functions import col, round, sum, avg, max, min, when, collect_list, first

# Definir as colunas para agregação
colunas_agregacao_total = df_temp_01.columns

variaveis_remover = ['SK_ID_CURR','SK_ID_PREV','DAYS_DECISION','NAME_CONTRACT_STATUS','WEEKDAY_APPR_PROCESS_START','NAME_CASH_LOAN_PURPOSE','NAME_CONTRACT_STATUS','DAYS_DECISION',
                     'NAME_PAYMENT_TYPE','CODE_REJECT_REASON','NAME_TYPE_SUITE','NAME_CLIENT_TYPE','NAME_GOODS_CATEGORY','NAME_PORTFOLIO','NAME_PRODUCT_TYPE','CHANNEL_TYPE',
                     'SELLERPLACE_AREA','NAME_SELLER_INDUSTRY','CNT_PAYMENT','NAME_YIELD_GROUP','PRODUCT_COMBINATION']

colunas_agregacao_total = [col for col in colunas_agregacao_total if col not in variaveis_remover]

# Função para gerar expressões de agregação com flags
def gerar_expressoes_agregacao_com_flags(colunas_agregacao_total, colunas_flags):
    expressoes_agregacao = []

    for flag in colunas_flags:
        for coluna in colunas_agregacao_total:
            if 'DAY' in coluna:
                expressoes_agregacao.append(round(max(when(col(flag) == 1, col(coluna))), 2).alias(f"QT_MAX_{coluna.upper()}_{flag.upper()}"))
                expressoes_agregacao.append(round(min(when(col(flag) == 1, col(coluna))), 2).alias(f"QT_MIN_{coluna.upper()}_{flag.upper()}"))
            else:
                expressoes_agregacao.append(round(sum(when(col(flag) == 1, col(coluna))), 2).alias(f"VL_TOT_{coluna.upper()}_{flag.upper()}"))
                expressoes_agregacao.append(round(avg(when(col(flag) == 1, col(coluna))), 2).alias(f"VL_MED_{coluna.upper()}_{flag.upper()}"))
                expressoes_agregacao.append(round(max(when(col(flag) == 1, col(coluna))), 2).alias(f"VL_MAX_{coluna.upper()}_{flag.upper()}"))
                expressoes_agregacao.append(round(min(when(col(flag) == 1, col(coluna))), 2).alias(f"VL_MIN_{coluna.upper()}_{flag.upper()}"))

    return expressoes_agregacao

colunas_flags = ['ultimos_3_meses','ultimos_6_meses', 'ultimos_12_meses', 'ultimos_18_meses',  'ultimos_24_meses',  'ultimos_30_meses',
'ultimos_36_meses']
# Gerar expressões de agregação para flags
expressoes_agregacao_flags = gerar_expressoes_agregacao_com_flags(colunas_agregacao_total, colunas_flags)

# Função de agregação para SK_ID_PREV
expressoes_agregacao_flags.append(first('SK_ID_PREV').alias('SK_ID_PREV'))

# Aplicar as expressões de agregação
df_temp_02 = df_temp_01.groupBy("SK_ID_CURR").agg(*expressoes_agregacao_flags).orderBy("SK_ID_CURR")


## Join com bases (Pos Cash / Credit Card Balance / Installments Payments) contendo apenas as variáveis selecionadas

In [None]:
pos_cash = spark.read.parquet("PoD Bank/Tabelas - Feature_Engineering/pos_cash_agg.parquet")
credit_card_balance = spark.read.parquet("PoD Bank/Tabelas - Feature_Engineering/credit_card_balance_agg.parquet")
installments = spark.read.parquet("PoD Bank/Tabelas - Feature_Engineering/installments_agg.parquet")

In [None]:
# Definir uma função para adicionar sufixos às colunas
def add_table_name_suffix(df, table_name):
    # Renomear todas as colunas, exceto 'SK_ID_PREV', com o sufixo do nome da tabela
    for col_name in df.columns:
        if col_name != 'SK_ID_PREV':
            df = df.withColumnRenamed(col_name, f"{col_name}_{table_name}")
    return df

# Aplicar a função para adicionar sufixos
pos_cash = add_table_name_suffix(pos_cash, "pos_cash")
credit_card_balance = add_table_name_suffix(credit_card_balance, "credit_card_balance")
installments = add_table_name_suffix(installments, "installments")


In [None]:
df_temp_03 = df_temp_02.join(pos_cash,"SK_ID_PREV",how='left').join(credit_card_balance,"SK_ID_PREV",how='left').join(installments,"SK_ID_PREV",how='left')


In [None]:
df_temp_03.show(10)

+----------+----------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+----------------------------------+----------------------------------+----------------------------------+----------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------------+---------------------------------------+---------------------------------------+---------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+--------------------------------------+----------------------------------------------+--------------

In [None]:
df_temp_03 = df_temp_03.repartition(1)
df_temp_03.write.mode("overwrite").parquet("PoD Bank/Tabelas - Feature_Engineering/previous_application_agg.parquet")

In [None]:
import os
os._exit(00)
