# Feature Engineering

**Bureau** - Data from other financial institutions

**Application Train** - Applicant population used for training

**Application Test** - Applicant population used for scoring

**Join** - Joins the tables after feature engineering, producing the final training and test tables

## Environment setup for Spark

In [None]:
# Install Java 8, which Spark needs to run
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark 3.1.2 (built for Hadoop 2.7)
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

# Unpack the archive
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

# Import the os module
import os

# Set the Java environment variable
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Set the Spark environment variable
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"


# Install findspark
!pip install -q findspark

# Import findspark
import findspark

# Initialize findspark so Python can locate the Spark installation
findspark.init()

import sys

from pyspark.sql import SparkSession

# Point PySpark at the current Python interpreter.
# Set these before creating the session so Spark picks them up.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession \
        .builder \
        .appName("Minha Primeira Aplicação no Pyspark") \
        .getOrCreate()

In [None]:
# Mount Google Drive so the input files can be read
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


## Reading the data

 - bureau table

In [None]:
# Read the CSV file (bureau table).
# Note: without inferSchema=True, every column is read as a string.
dados = spark.read.csv("/content/gdrive/MyDrive/dados/bureau.csv", header=True)

# Show the data
dados.show()

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|    CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|    215354|     5714462|       Closed|     currency 1|       -497|                 0|             -153.0|           -153

In [None]:
print(len(dados.columns))
print(dados.count())

17
1716428


## Creating flags to support a time-window view of the data

In [None]:
# Register the DataFrame as a temp view so it can be queried with Spark SQL
dados.createOrReplaceTempView("dados")

df_temp_01 = spark.sql("""
SELECT
    *,
      CASE
        WHEN DAYS_CREDIT >= -90 THEN 1
        ELSE 0
    END AS ultimos_3_meses,
    CASE
        WHEN DAYS_CREDIT >= -180 THEN 1
        ELSE 0
    END AS ultimos_6_meses,
    CASE
        WHEN DAYS_CREDIT >= -360 THEN 1
        ELSE 0
    END AS ultimos_12_meses,
    CASE
        WHEN DAYS_CREDIT >= -720 THEN 1
        ELSE 0
    END AS ultimos_24_meses,
    CASE
        WHEN DAYS_CREDIT >= -1080 THEN 1
        ELSE 0
    END AS ultimos_36_meses
FROM dados
ORDER BY `SK_ID_BUREAU`;
""")
df_temp_01.createOrReplaceTempView("df_temp_01")
df_temp_01.show()

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+--------------------+------------------+-----------+---------------+---------------+----------------+----------------+----------------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|         CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|ultimos_3_meses|ultimos_6_meses|ultimos_12_meses|ultimos_24_meses|ultimos_36_meses|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------
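The five flags are nested: a record inside the 3-month window also falls inside every longer window, so the flags mark cumulative periods rather than disjoint buckets. A minimal plain-Python sketch of the same thresholds follows. The helper `window_flags` is hypothetical and only mirrors the `CASE` expressions above, including the `ELSE 0` branch for missing values.

```python
# Thresholds copied from the SQL above: flag name -> oldest DAYS_CREDIT still inside the window.
WINDOWS = {
    "ultimos_3_meses": -90,
    "ultimos_6_meses": -180,
    "ultimos_12_meses": -360,
    "ultimos_24_meses": -720,
    "ultimos_36_meses": -1080,
}


def window_flags(days_credit):
    """Return the five 0/1 flags for one DAYS_CREDIT value (hypothetical helper).

    A missing value (None) gets 0 everywhere, like the ELSE branch in the SQL.
    """
    return {
        name: int(days_credit is not None and days_credit >= limit)
        for name, limit in WINDOWS.items()
    }


# A credit opened 100 days before the application misses the 3-month window
# but is counted in all the longer ones.
for name, flag in window_flags(-100).items():
    print(name, flag)
```

Because the windows overlap, aggregates computed per flag should be read as "within the last N months", not "between N and M months ago".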

In [None]:
df_temp_01.createOrReplaceTempView("df_temp_01")

df_temp_02 = spark.sql("""
SELECT
    *,
      CASE
        WHEN CREDIT_ACTIVE = "Closed" THEN 1
        ELSE 0
    END AS CREDIT_ACTIVE_CLOSED,
    CASE
        WHEN CREDIT_ACTIVE = "Active" THEN 1
        ELSE 0
    END AS CREDIT_ACTIVE_ACTIVE,
    CASE
        WHEN CREDIT_ACTIVE = "Sold" THEN 1
        ELSE 0
    END AS CREDIT_ACTIVE_SOLD,
    CASE
        WHEN CREDIT_ACTIVE = "Bad debt" THEN 1
        ELSE 0
    END AS CREDIT_ACTIVE_BAD_DEBT
FROM df_temp_01
ORDER BY `SK_ID_BUREAU`;
""")
df_temp_02.createOrReplaceTempView("df_temp_02")
df_temp_02.show()

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+--------------------+------------------+-----------+---------------+---------------+----------------+----------------+----------------+--------------------+--------------------+------------------+----------------------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|         CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|ultimos_3_meses|ultimos_6_meses|ultimos_12_meses|ultimos_24_meses|ultimos_36_meses|CREDIT_ACTIVE_CLOSED|CREDIT_ACTIVE_ACTIVE|CREDIT_ACTIVE_SOLD|CREDIT_ACTIVE_BAD_DEBT|
+----------+------------+-------------+---------------+-----------+---
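The one-hot `CASE` expressions above follow a fixed pattern, so they could also be generated from the list of category values instead of being typed by hand. The sketch below is one possible way to do that. `one_hot_sql` is a hypothetical helper, and it assumes the category values contain no single quotes.

```python
def one_hot_sql(column, categories):
    """Build one 'CASE WHEN ... END AS ...' expression per category (hypothetical helper).

    The alias is the column name plus the category in upper case, with spaces
    replaced by underscores, e.g. 'Bad debt' -> CREDIT_ACTIVE_BAD_DEBT.
    """
    expressions = []
    for value in categories:
        alias = f"{column}_{value.upper().replace(' ', '_')}"
        expressions.append(
            f"CASE WHEN {column} = '{value}' THEN 1 ELSE 0 END AS {alias}"
        )
    return ",\n    ".join(expressions)


select_list = one_hot_sql("CREDIT_ACTIVE", ["Closed", "Active", "Sold", "Bad debt"])
query = f"SELECT\n    *,\n    {select_list}\nFROM df_temp_01"
print(query)
```

The resulting string could be passed to `spark.sql(...)` in place of the hand-written query. The aliases it produces for `CREDIT_ACTIVE` match the ones used in the cell above.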

In [None]:
df_temp_02.createOrReplaceTempView("df_temp_02")

df_temp_03 = spark.sql("""
SELECT
    *,
      CASE
        WHEN CREDIT_CURRENCY = "currency 1" THEN 1
        ELSE 0
    END AS CREDIT_CURRENCY_currency_1,
    CASE
        WHEN CREDIT_CURRENCY = "currency 2" THEN 1
        ELSE 0
    END AS CREDIT_CURRENCY_currency_2,
    CASE
        WHEN CREDIT_CURRENCY = "currency 3" THEN 1
        ELSE 0
    END AS CREDIT_CURRENCY_currency_3,
    CASE
        WHEN CREDIT_CURRENCY = "currency 4" THEN 1
        ELSE 0
    END AS CREDIT_CURRENCY_currency_4
FROM df_temp_02
ORDER BY `SK_ID_BUREAU`;
""")
df_temp_03.createOrReplaceTempView("df_temp_03")
df_temp_03.show()

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+--------------------+------------------+-----------+---------------+---------------+----------------+----------------+----------------+--------------------+--------------------+------------------+----------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|         CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|ultimos_3_meses|ultimos_6_meses|ultimos_12_meses|ultimos_24_meses|ultimos_36_meses|CREDIT_ACTIVE_CLOSED|CREDIT_ACTIVE_ACTIVE|CREDI

## Automated summarization by time window (per SK_ID_BUREAU)

In [None]:
from pyspark.sql.functions import col, round, sum, avg, max, min, when

# Columns to aggregate
colunas_agregacao_total = ['CREDIT_DAY_OVERDUE','DAYS_CREDIT_ENDDATE','DAYS_ENDDATE_FACT','AMT_CREDIT_MAX_OVERDUE',
                           'CNT_CREDIT_PROLONG','AMT_CREDIT_SUM','AMT_CREDIT_SUM_DEBT',
                           'AMT_CREDIT_SUM_LIMIT','AMT_CREDIT_SUM_OVERDUE','DAYS_CREDIT_UPDATE','AMT_ANNUITY']

colunas_flags = ['ultimos_3_meses','ultimos_6_meses','ultimos_12_meses','ultimos_24_meses','ultimos_36_meses']
# colunas_cat1 = ['CREDIT_ACTIVE_CLOSED','CREDIT_ACTIVE_ACTIVE','CREDIT_ACTIVE_SOLD','CREDIT_ACTIVE_BAD_DEBT']
# colunas_cat2 = ['CREDIT_CURRENCY_currency_1','CREDIT_CURRENCY_currency_2','CREDIT_CURRENCY_currency_3','CREDIT_CURRENCY_currency_4']

expressoes_agregacao = []

for flag in colunas_flags:
    for coluna in colunas_agregacao_total:
      if 'DAY' in coluna:
        expressoes_agregacao.append(round(max(when(col(flag) == 1, col(coluna))), 2).alias(f"QT_MAX_{coluna.upper()}_{flag.upper()}"))
        expressoes_agregacao.append(round(min(when(col(flag) == 1, col(coluna))), 2).alias(f"QT_MIN_{coluna.upper()}_{flag.upper()}"))
      else:
        expressoes_agregacao.append(round(sum(when(col(flag) == 1, col(coluna))), 2).alias(f"VL_TOT_{coluna.upper()}_{flag.upper()}"))
        expressoes_agregacao.append(round(avg(when(col(flag) == 1, col(coluna))), 2).alias(f"VL_MED_{coluna.upper()}_{flag.upper()}"))
        expressoes_agregacao.append(round(max(when(col(flag) == 1, col(coluna))), 2).alias(f"VL_MAX_{coluna.upper()}_{flag.upper()}"))
        expressoes_agregacao.append(round(min(when(col(flag) == 1, col(coluna))), 2).alias(f"VL_MIN_{coluna.upper()}_{flag.upper()}"))

expressoes_agregacao = tuple(expressoes_agregacao)

# Apply the aggregation expressions
df_temp_04 = df_temp_03.groupBy("SK_ID_BUREAU").agg(*expressoes_agregacao).orderBy("SK_ID_BUREAU")

# Show the resulting DataFrame
df_temp_04.show()

+------------+-----------------------------------------+-----------------------------------------+------------------------------------------+------------------------------------------+----------------------------------------+----------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-----------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+-------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+-------------------------------------------+--------------
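To see how wide this aggregation gets, the naming loop can be replayed without Spark. The sketch below copies the two column lists from the cell above and only builds the alias strings. `planned_aliases` is a hypothetical helper, not part of the notebook.

```python
colunas_agregacao_total = [
    "CREDIT_DAY_OVERDUE", "DAYS_CREDIT_ENDDATE", "DAYS_ENDDATE_FACT",
    "AMT_CREDIT_MAX_OVERDUE", "CNT_CREDIT_PROLONG", "AMT_CREDIT_SUM",
    "AMT_CREDIT_SUM_DEBT", "AMT_CREDIT_SUM_LIMIT", "AMT_CREDIT_SUM_OVERDUE",
    "DAYS_CREDIT_UPDATE", "AMT_ANNUITY",
]
colunas_flags = [
    "ultimos_3_meses", "ultimos_6_meses", "ultimos_12_meses",
    "ultimos_24_meses", "ultimos_36_meses",
]


def planned_aliases(columns, flags):
    """List the output column names the aggregation loop would create (hypothetical helper)."""
    aliases = []
    for flag in flags:
        for column in columns:
            # Day-count columns get max/min only; the rest get sum, mean, max and min.
            if "DAY" in column:
                prefixes = ("QT_MAX", "QT_MIN")
            else:
                prefixes = ("VL_TOT", "VL_MED", "VL_MAX", "VL_MIN")
            aliases.extend(f"{p}_{column.upper()}_{flag.upper()}" for p in prefixes)
    return aliases


aliases = planned_aliases(colunas_agregacao_total, colunas_flags)
print(len(aliases))  # 180
print(aliases[0])    # QT_MAX_CREDIT_DAY_OVERDUE_ULTIMOS_3_MESES
```

Four of the eleven columns contain `DAY` (2 aggregates each) and seven do not (4 aggregates each), giving 36 expressions per window and 180 across the five windows, plus the `SK_ID_BUREAU` key.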

## Joining the final bureau table with bureau_balance_agg on SK_ID_BUREAU

In [None]:
# Read the bureau_balance_agg table
bureau_balance = spark.read.csv("/content/gdrive/MyDrive/PoD Academy/11 - Hackaton/dados/case_credito/bureau_balance_agg.csv",header=True)

In [None]:
# Left join of the final bureau table with bureau_balance_agg
df_temp_05 = df_temp_04.join(bureau_balance, on='SK_ID_BUREAU', how='left')

# Show the result of the join
df_temp_05.show()


## Joining this result back to the bureau table

In [None]:
df_temp_06 = df_temp_03.select("SK_ID_BUREAU", "SK_ID_CURR").join(df_temp_05, on='SK_ID_BUREAU', how='left')

# Show the result of the join
df_temp_06.show()


## Summarizing the resulting table at the customer level (SK_ID_CURR)

In [None]:
# Columns to aggregate: everything except the two keys
colunas_agregacao_total = df_temp_06.columns
colunas_agregacao_total.remove('SK_ID_CURR')
colunas_agregacao_total.remove('SK_ID_BUREAU')

expressoes_agregacao = []

for coluna in colunas_agregacao_total:
  if 'DAY' in coluna:
    expressoes_agregacao.append(round(max(col(coluna)), 2).alias(f"QT_MAX_{coluna.upper()}"))
    expressoes_agregacao.append(round(min(col(coluna)), 2).alias(f"QT_MIN_{coluna.upper()}"))
  else:
    expressoes_agregacao.append(round(sum(col(coluna)), 2).alias(f"VL_TOT_{coluna.upper()}"))
    expressoes_agregacao.append(round(avg(col(coluna)), 2).alias(f"VL_MED_{coluna.upper()}"))
    expressoes_agregacao.append(round(max(col(coluna)), 2).alias(f"VL_MAX_{coluna.upper()}"))
    expressoes_agregacao.append(round(min(col(coluna)), 2).alias(f"VL_MIN_{coluna.upper()}"))

expressoes_agregacao = tuple(expressoes_agregacao)

# Apply the aggregation expressions
df_temp_07 = df_temp_06.groupBy("SK_ID_CURR").agg(*expressoes_agregacao).orderBy("SK_ID_CURR")

# Show the resulting DataFrame
df_temp_07.show()


In [None]:
print(len(df_temp_07.columns))
print(df_temp_07.count())

781
305811
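The second aggregation reuses the `'DAY' in coluna` rule on names that already carry a first-level prefix, so the prefixes stack. The plain-Python sketch below replays that naming on two example columns and works out how many of the output columns come from the bureau windows. `second_level_aliases` is a hypothetical helper, and the arithmetic assumes the run matched the code exactly as shown.

```python
def second_level_aliases(first_level_columns):
    """Replay the naming rule of the customer-level loop (hypothetical helper)."""
    aliases = []
    for column in first_level_columns:
        if "DAY" in column:
            prefixes = ("QT_MAX", "QT_MIN")
        else:
            prefixes = ("VL_TOT", "VL_MED", "VL_MAX", "VL_MIN")
        aliases.extend(f"{p}_{column.upper()}" for p in prefixes)
    return aliases


examples = [
    "QT_MAX_DAYS_CREDIT_UPDATE_ULTIMOS_3_MESES",  # still contains 'DAY' -> 2 aggregates
    "VL_TOT_AMT_ANNUITY_ULTIMOS_3_MESES",         # no 'DAY' -> 4 aggregates
]
for name in second_level_aliases(examples):
    print(name)

# First level: 4 day columns x 2 aggregates x 5 windows, 7 other columns x 4 x 5.
n_qt = 4 * 2 * 5   # 40 columns starting with QT_
n_vl = 7 * 4 * 5   # 140 columns starting with VL_
from_bureau = n_qt * 2 + n_vl * 4
print(from_bureau)  # 640
```

Under that assumption, 640 of the 780 non-key columns would come from the bureau windows, which leaves 140 derived from the `bureau_balance_agg` columns.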


## Saving the summarized table as Parquet

In [None]:
# df_temp_07.write.mode("overwrite").parquet("/content/gdrive/MyDrive/dados/case_credito/bureau_agg_parquet.parquet")

# Building the final training and test tables
- Joining with the application training and test tables
- Loading the bureau_agg dataset

In [None]:
df_bureau = spark.read.parquet("/content/gdrive/MyDrive/dados/case_credito/bureau_agg_parquet.parquet")
print(len(df_bureau.columns))
print(df_bureau.count())

In [None]:
df_bureau.createOrReplaceTempView('bureau')

spark.sql("""
SELECT
*
FROM bureau
LIMIT 10
""").show()


In [None]:
# Function that appends a suffix to the variable names to record where each variable comes from.
# The bureau and bureau balance tables get the suffix _externo, since they hold external data.

from pyspark.sql.functions import col

def add_suffix_to_columns(df, suffix="_externo"):
    """
    Append a suffix to the column names of a PySpark DataFrame.

    :param df: PySpark DataFrame
    :param suffix: Suffix to append to the column names
    :return: DataFrame with the new column names
    """
    new_columns = [col(c).alias(f"{c}{suffix}") for c in df.columns]
    return df.select(*new_columns)
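Since `add_suffix_to_columns` renames every column, the join key is renamed too and has to be fixed afterwards. One alternative is to compute the new names with the key excluded. The sketch below shows only the name mapping in plain Python. `suffixed_names` and its `keep` parameter are hypothetical, not part of the notebook.

```python
def suffixed_names(columns, suffix, keep=("SK_ID_CURR",)):
    """Return the column names with the suffix appended, leaving key columns untouched.

    Hypothetical helper: `keep` lists the columns that must stay unchanged so
    they can still be used as join keys.
    """
    return [c if c in keep else f"{c}{suffix}" for c in columns]


columns = ["SK_ID_CURR", "AMT_CREDIT", "AMT_ANNUITY"]
print(suffixed_names(columns, "_publico"))
# ['SK_ID_CURR', 'AMT_CREDIT_publico', 'AMT_ANNUITY_publico']
```

With a real DataFrame, the returned list could be applied in one step with `df.toDF(*suffixed_names(df.columns, "_publico"))`.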

In [None]:
# Append the _externo suffix to the variable names

df_bureau_modificado = add_suffix_to_columns(df_bureau)

print(len(df_bureau_modificado.columns))
print(df_bureau_modificado.count())

781
305811


In [None]:
# Rename the primary key with the _publico suffix, since it refers to the applicant population

df_bureau_modificado = df_bureau_modificado.withColumnRenamed('SK_ID_CURR_externo', 'SK_ID_CURR_publico')
print("done")

done


In [None]:
df_bureau_modificado.createOrReplaceTempView('df_bureau_modificado')

spark.sql("""
SELECT
*
FROM df_bureau_modificado
LIMIT 10
""").show()


## Application table

- Loading the training and test data


In [None]:
# Training data

df_app_train = spark.read.parquet('/content/gdrive/MyDrive/PoD Academy/11 - Hackaton/dados/bases_finais/base_treino_final_modelagem.parquet')

In [None]:
print(len(df_app_train.columns))
print(df_app_train.count())

172
184350


In [None]:
df_app_train.createOrReplaceTempView('df_app_train')

spark.sql("""
SELECT
*
FROM df_app_train
LIMIT 10
""").show()


In [None]:
# Append the _publico suffix to the variable names

df_app_train_modificado = add_suffix_to_columns(df_app_train, "_publico")

print(len(df_app_train_modificado.columns))
print(df_app_train_modificado.count())

172
184350


In [None]:
df_app_train_modificado.createOrReplaceTempView('df_app_train_modificado')

spark.sql("""
SELECT
*
FROM df_app_train_modificado
LIMIT 10
""").show()


In [None]:
# Test data

df_app_test = spark.read.parquet('/content/gdrive/MyDrive/PoD Academy/11 - Hackaton/dados/bases_finais/base_teste_final_modelagem.parquet')

In [None]:
print(len(df_app_test.columns))
print(df_app_test.count())

171
79141


In [None]:
df_app_test.createOrReplaceTempView('df_app_test')

spark.sql("""
SELECT
*
FROM df_app_test
LIMIT 10
""").show()


In [None]:
# Append the _publico suffix to the variable names

df_app_test_modificado = add_suffix_to_columns(df_app_test, "_publico")

print(len(df_app_test_modificado.columns))
print(df_app_test_modificado.count())

171
79141


In [None]:
df_app_test_modificado.createOrReplaceTempView('df_app_test_modificado')

spark.sql("""
SELECT
*
FROM df_app_test_modificado
LIMIT 10
""").show()


## Creating new business-rule variables for training and test

- Age:
 - AGE_YEARS = -DAYS_BIRTH / 365

- Percentages:
 - DAYS_EMPLOYED_PERC = DAYS_EMPLOYED / DAYS_BIRTH
 - INCOME_CREDIT_PERC = AMT_INCOME_TOTAL / AMT_CREDIT
 - INCOME_PER_PERSON = AMT_INCOME_TOTAL / CNT_FAM_MEMBERS
 - ANNUITY_INCOME_PERC = AMT_ANNUITY / AMT_INCOME_TOTAL
 - PAYMENT_RATE = AMT_ANNUITY / AMT_CREDIT

- Credit ratio:
 - CREDIT_TO_GOODS_RATIO = AMT_CREDIT / AMT_GOODS_PRICE

- Income ratios:
 - INCOME_TO_EMPLOYED_RATIO = AMT_INCOME_TOTAL / DAYS_EMPLOYED
 - INCOME_TO_BIRTH_RATIO = AMT_INCOME_TOTAL / DAYS_BIRTH

- Time ratios:
 - ID_TO_BIRTH_RATIO = DAYS_ID_PUBLISH / DAYS_BIRTH
 - CAR_TO_BIRTH_RATIO = OWN_CAR_AGE / DAYS_BIRTH
 - CAR_TO_EMPLOYED_RATIO = OWN_CAR_AGE / DAYS_EMPLOYED
 - PHONE_TO_BIRTH_RATIO = DAYS_LAST_PHONE_CHANGE / DAYS_BIRTH
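As a worked example of these rules, the sketch below computes a few of the ratios for a single made-up applicant in plain Python. The applicant values and the helper `ratio` are hypothetical. The helper returns `None` for a missing operand or a zero divisor, which is how Spark SQL division behaves with its default (non-ANSI) settings.

```python
def ratio(numerator, denominator, digits=2):
    """Rounded division that yields None for a missing operand or a zero divisor."""
    if numerator is None or denominator is None or denominator == 0:
        return None
    return round(numerator / denominator, digits)


# Hypothetical applicant. DAYS_* values are negative, as the AGE_YEARS formula implies.
applicant = {
    "DAYS_BIRTH": -14600,
    "DAYS_EMPLOYED": -1460,
    "AMT_INCOME_TOTAL": 180000.0,
    "AMT_CREDIT": 450000.0,
    "AMT_ANNUITY": 27000.0,
    "AMT_GOODS_PRICE": 375000.0,
    "CNT_FAM_MEMBERS": 3,
    "OWN_CAR_AGE": None,  # no car on record
}

features = {
    "AGE_YEARS": ratio(-applicant["DAYS_BIRTH"], 365, digits=1),
    "DAYS_EMPLOYED_PERC": ratio(applicant["DAYS_EMPLOYED"], applicant["DAYS_BIRTH"]),
    "INCOME_CREDIT_PERC": ratio(applicant["AMT_INCOME_TOTAL"], applicant["AMT_CREDIT"]),
    "INCOME_PER_PERSON": ratio(applicant["AMT_INCOME_TOTAL"], applicant["CNT_FAM_MEMBERS"]),
    "ANNUITY_INCOME_PERC": ratio(applicant["AMT_ANNUITY"], applicant["AMT_INCOME_TOTAL"]),
    "PAYMENT_RATE": ratio(applicant["AMT_ANNUITY"], applicant["AMT_CREDIT"]),
    "CREDIT_TO_GOODS_RATIO": ratio(applicant["AMT_CREDIT"], applicant["AMT_GOODS_PRICE"]),
    "INCOME_TO_BIRTH_RATIO": ratio(applicant["AMT_INCOME_TOTAL"], applicant["DAYS_BIRTH"]),
    "CAR_TO_BIRTH_RATIO": ratio(applicant["OWN_CAR_AGE"], applicant["DAYS_BIRTH"]),
}

for name, value in features.items():
    print(f"{name}: {value}")
```

Ratios that divide by a negative day count, such as INCOME_TO_BIRTH_RATIO, come out negative. Python's `round` and Spark's `ROUND` can also differ on exact ties, so the last digit may not always match.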


In [None]:
# Training
df_reg_app_train = spark.sql("""
SELECT
SK_ID_CURR,
ROUND((-DAYS_BIRTH / 365), 1) AS AGE_YEARS,
ROUND((DAYS_EMPLOYED / DAYS_BIRTH), 2) AS DAYS_EMPLOYED_PERC,
ROUND((AMT_INCOME_TOTAL / AMT_CREDIT), 2) AS INCOME_CREDIT_PERC,
ROUND((AMT_INCOME_TOTAL / CNT_FAM_MEMBERS), 2) AS INCOME_PER_PERSON,
ROUND((AMT_ANNUITY / AMT_INCOME_TOTAL), 2) AS ANNUITY_INCOME_PERC,
ROUND((AMT_ANNUITY / AMT_CREDIT), 2) AS PAYMENT_RATE,
ROUND((AMT_CREDIT / AMT_GOODS_PRICE), 2) AS CREDIT_TO_GOODS_RATIO,
ROUND((AMT_INCOME_TOTAL / DAYS_EMPLOYED), 2) AS INCOME_TO_EMPLOYED_RATIO,
ROUND((AMT_INCOME_TOTAL / DAYS_BIRTH), 2) AS INCOME_TO_BIRTH_RATIO,
ROUND((DAYS_ID_PUBLISH / DAYS_BIRTH), 2) AS ID_TO_BIRTH_RATIO,
ROUND((OWN_CAR_AGE / DAYS_BIRTH), 2) AS CAR_TO_BIRTH_RATIO,
ROUND((OWN_CAR_AGE / DAYS_EMPLOYED), 2) AS CAR_TO_EMPLOYED_RATIO,
ROUND((DAYS_LAST_PHONE_CHANGE / DAYS_BIRTH), 2) AS PHONE_TO_BIRTH_RATIO
FROM
  df_app_train
""")

print(len(df_reg_app_train.columns))
print(df_reg_app_train.count())

In [None]:
# Append the _publico suffix to the variable names

df_reg_app_train_modificado = add_suffix_to_columns(df_reg_app_train, "_publico")

print(len(df_reg_app_train_modificado.columns))
print(df_reg_app_train_modificado.count())

14
184350


In [None]:

# Restore the key name so it can be used in the joins
df_reg_app_train_modificado = df_reg_app_train_modificado.withColumnRenamed('SK_ID_CURR_publico', 'SK_ID_CURR')
print("done")

done


In [None]:
df_reg_app_train_modificado.createOrReplaceTempView('df_reg_app_train_modificado')

spark.sql("""
SELECT
*
FROM df_reg_app_train_modificado
LIMIT 10
""").show()

+----------+-----------------+--------------------------+--------------------------+-------------------------+---------------------------+--------------------+-----------------------------+--------------------------------+-----------------------------+-------------------------+--------------------------+-----------------------------+----------------------------+
|SK_ID_CURR|AGE_YEARS_publico|DAYS_EMPLOYED_PERC_publico|INCOME_CREDIT_PERC_publico|INCOME_PER_PERSON_publico|ANNUITY_INCOME_PERC_publico|PAYMENT_RATE_publico|CREDIT_TO_GOODS_RATIO_publico|INCOME_TO_EMPLOYED_RATIO_publico|INCOME_TO_BIRTH_RATIO_publico|ID_TO_BIRTH_RATIO_publico|CAR_TO_BIRTH_RATIO_publico|CAR_TO_EMPLOYED_RATIO_publico|PHONE_TO_BIRTH_RATIO_publico|
+----------+-----------------+--------------------------+--------------------------+-------------------------+---------------------------+--------------------+-----------------------------+--------------------------------+-----------------------------+------------------

In [None]:
# Test
df_reg_app_test = spark.sql("""
SELECT
SK_ID_CURR,
ROUND((-DAYS_BIRTH / 365), 1) AS AGE_YEARS,
ROUND((DAYS_EMPLOYED / DAYS_BIRTH), 2) AS DAYS_EMPLOYED_PERC,
ROUND((AMT_INCOME_TOTAL / AMT_CREDIT), 2) AS INCOME_CREDIT_PERC,
ROUND((AMT_INCOME_TOTAL / CNT_FAM_MEMBERS), 2) AS INCOME_PER_PERSON,
ROUND((AMT_ANNUITY / AMT_INCOME_TOTAL), 2) AS ANNUITY_INCOME_PERC,
ROUND((AMT_ANNUITY / AMT_CREDIT), 2) AS PAYMENT_RATE,
ROUND((AMT_CREDIT / AMT_GOODS_PRICE), 2) AS CREDIT_TO_GOODS_RATIO,
ROUND((AMT_INCOME_TOTAL / DAYS_EMPLOYED), 2) AS INCOME_TO_EMPLOYED_RATIO,
ROUND((AMT_INCOME_TOTAL / DAYS_BIRTH), 2) AS INCOME_TO_BIRTH_RATIO,
ROUND((DAYS_ID_PUBLISH / DAYS_BIRTH), 2) AS ID_TO_BIRTH_RATIO,
ROUND((OWN_CAR_AGE / DAYS_BIRTH), 2) AS CAR_TO_BIRTH_RATIO,
ROUND((OWN_CAR_AGE / DAYS_EMPLOYED), 2) AS CAR_TO_EMPLOYED_RATIO,
ROUND((DAYS_LAST_PHONE_CHANGE / DAYS_BIRTH), 2) AS PHONE_TO_BIRTH_RATIO
FROM
  df_app_test
""")

print(len(df_reg_app_test.columns))
print(df_reg_app_test.count())

14
79141


In [None]:
# Append the _publico suffix to the variable names

df_reg_app_test_modificado = add_suffix_to_columns(df_reg_app_test, "_publico")

print(len(df_reg_app_test_modificado.columns))
print(df_reg_app_test_modificado.count())

14
79141


In [None]:
# Restore the key name so it can be used in the joins
df_reg_app_test_modificado = df_reg_app_test_modificado.withColumnRenamed('SK_ID_CURR_publico', 'SK_ID_CURR')
print("done")

done


In [None]:
df_reg_app_test_modificado.createOrReplaceTempView('df_reg_app_test_modificado')

spark.sql("""
SELECT
*
FROM df_reg_app_test_modificado
LIMIT 10
""").show()

+----------+-----------------+--------------------------+--------------------------+-------------------------+---------------------------+--------------------+-----------------------------+--------------------------------+-----------------------------+-------------------------+--------------------------+-----------------------------+----------------------------+
|SK_ID_CURR|AGE_YEARS_publico|DAYS_EMPLOYED_PERC_publico|INCOME_CREDIT_PERC_publico|INCOME_PER_PERSON_publico|ANNUITY_INCOME_PERC_publico|PAYMENT_RATE_publico|CREDIT_TO_GOODS_RATIO_publico|INCOME_TO_EMPLOYED_RATIO_publico|INCOME_TO_BIRTH_RATIO_publico|ID_TO_BIRTH_RATIO_publico|CAR_TO_BIRTH_RATIO_publico|CAR_TO_EMPLOYED_RATIO_publico|PHONE_TO_BIRTH_RATIO_publico|
+----------+-----------------+--------------------------+--------------------------+-------------------------+---------------------------+--------------------+-----------------------------+--------------------------------+-----------------------------+------------------

- Joining the training and test DataFrames with bureau

In [None]:
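## Join training with the business-rule features and bureau

# After the suffix step the application and bureau tables carry the key as
# SK_ID_CURR_publico, so rename it back to SK_ID_CURR before joining on it.
df_app_treino_final = df_app_train_modificado.withColumnRenamed('SK_ID_CURR_publico', 'SK_ID_CURR')\
               .join(df_reg_app_train_modificado, "SK_ID_CURR")\
               .join(df_bureau_modificado.withColumnRenamed('SK_ID_CURR_publico', 'SK_ID_CURR'), "SK_ID_CURR")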

In [None]:
print(len(df_app_treino_final.columns))
print(df_app_treino_final.count())

965
184350
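The column count of the joined table can be checked by hand: joining on a column name keeps a single copy of the key, so each join drops one duplicate key column. A small sketch of that arithmetic, using the widths printed earlier in this notebook (`expected_columns` is a hypothetical helper):

```python
def expected_columns(widths, n_keys=1):
    """Width of a chain of joins on shared key columns.

    Each join after the first table drops one copy of every key column.
    """
    return sum(widths) - (len(widths) - 1) * n_keys


# application + business-rule features + bureau aggregates
print(expected_columns([172, 14, 781]))  # 965 (training)
print(expected_columns([171, 14, 781]))  # 964 (test)
```

The training table is one column wider than the test table, which matches the 172 versus 171 columns of the two application tables.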


In [None]:
df_app_treino_final = df_app_treino_final.withColumnRenamed('SK_ID_CURR', 'SK_ID_CURR_publico')
print("done")

- Saving the final training table

In [None]:
df_app_treino_final.write.mode("overwrite").parquet("/content/gdrive/MyDrive/PoD Academy/11 - Hackaton/dados/bases_finais/base_treino_final.parquet")

In [None]:
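## Join test with the business-rule features and bureau

# After the suffix step the application and bureau tables carry the key as
# SK_ID_CURR_publico, so rename it back to SK_ID_CURR before joining on it.
df_app_teste_final = df_app_test_modificado.withColumnRenamed('SK_ID_CURR_publico', 'SK_ID_CURR')\
               .join(df_reg_app_test_modificado, "SK_ID_CURR")\
               .join(df_bureau_modificado.withColumnRenamed('SK_ID_CURR_publico', 'SK_ID_CURR'), "SK_ID_CURR")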

In [None]:
print(len(df_app_teste_final.columns))
print(df_app_teste_final.count())

964
79141


- Saving the final test table

In [None]:
df_app_teste_final.write.mode("overwrite").parquet("/content/gdrive/MyDrive/dados/bases_finais/base_teste_final.parquet")