## Introdução

Aqui irei realizar a etapa de Feature Engineering na base de dados `bureau`.

Esta etapa está dividida em kernels diferentes devido o consumo de memória para carregar e manipular as bases.

Os procedimentos realizados nas outras bases podem ser encontrados nos arquivos `02_feat_eng_<nome-da-base>.ipynb`

O objetivo desta etapa consiste, principalmente, em criar variáveis (`book de variáveis`). Ao criar novas variáveis com base nas variáveis existentes, é possível capturar informações adicionais que podem não estar explicitamente presentes nos dados originais.

Devido o volume de dados, optei por utilizar o PySpark em conjunto com o SparkSQL para as operações a seguir.

* Sobre os Dados

A base `bureau` possui dados de crédito de outras instituições financeiras.

Segundo os Metadados disponibilizados, essas são as informações contidas aqui:

``SK_BUREAU_ID``: ID recodificado do crédito do Bureau de Crédito (codificação única para cada aplicação) - Será usado para trazer os dados da tabela `bureau balance`.

``SK_ID_CURR``: ID do empréstimo em nossa amostra - um empréstimo em nossa amostra pode ter 0, 1, 2 ou mais créditos anteriores relacionados no bureau de crédito.

`CREDIT_ACTIVE`: Status dos créditos reportados pelo Bureau de Crédito (CB).

`CREDIT_CURRENCY`: Moeda recodificada do crédito do Bureau de Crédito.

`DAYS_CREDIT`: Quantos dias antes da aplicação atual o cliente solicitou crédito ao Bureau de Crédito.

`CREDIT_DAY_OVERDUE`: Número de dias em atraso no crédito do CB no momento da aplicação para o empréstimo relacionado em nossa amostra.

`DAYS_CREDIT_ENDDATE`: Duração restante do crédito do CB (em dias) no momento da aplicação no Home Credit.

`DAYS_ENDDATE_FACT`: Dias desde que o crédito do CB foi encerrado no momento da aplicação no Home Credit (apenas para créditos encerrados).

`AMT_CREDIT_MAX_OVERDUE`: Valor máximo em atraso no crédito do Bureau de Crédito até o momento (na data de aplicação do empréstimo em nossa amostra).

`CNT_CREDIT_PROLONG`: Quantas vezes o crédito do Bureau de Crédito foi prolongado.

`AMT_CREDIT_SUM`: Valor atual do crédito para o crédito do Bureau de Crédito.

`AMT_CREDIT_SUM_DEBT`: Dívida atual no crédito do Bureau de Crédito.

`AMT_CREDIT_SUM_LIMIT`: Limite de crédito atual do cartão de crédito relatado no Bureau de Crédito.

`AMT_CREDIT_SUM_OVERDUE`: Valor atual em atraso no crédito do Bureau de Crédito.

`CREDIT_TYPE`: Tipo de crédito do Bureau de Crédito (Carro, dinheiro, ...).

`DAYS_CREDIT_UPDATE`: Quantos dias antes da aplicação do empréstimo foi recebida a última informação sobre o crédito do Bureau de Crédito.

`AMT_ANNUITY`: Anuidade do crédito do Bureau de Crédito.

## Utils

* Importando as bibliotecas que irei utilizar

In [1]:
import os
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import when,min, max, sum, round, col, median, count
spark = SparkSession.builder \
    .appName("FeatureEng") \
    .config("spark.executor.memory", "14g") \
    .config("spark.driver.memory", "14g") \
    .getOrCreate()
from warnings import filterwarnings
filterwarnings('ignore')

## Feature Engineering - Bureau Balance

In [2]:
bureau = spark.read.csv('./DATASETS/bureau.csv', inferSchema= True, header= True)
bureau.count()

1716428

In [3]:
bureau.show(n=10)

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|    CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|    215354|     5714462|       Closed|     currency 1|       -497|                 0|             -153.0|           -153

* Checando a quantidade de linhas da tabela final (para validação posterior)

In [3]:
bureau.groupBy("SK_ID_BUREAU").count().count()

1716428

* Verificando o Schema

In [5]:
bureau.printSchema()

root
 |-- SK_ID_CURR: integer (nullable = true)
 |-- SK_ID_BUREAU: integer (nullable = true)
 |-- CREDIT_ACTIVE: string (nullable = true)
 |-- CREDIT_CURRENCY: string (nullable = true)
 |-- DAYS_CREDIT: integer (nullable = true)
 |-- CREDIT_DAY_OVERDUE: integer (nullable = true)
 |-- DAYS_CREDIT_ENDDATE: double (nullable = true)
 |-- DAYS_ENDDATE_FACT: double (nullable = true)
 |-- AMT_CREDIT_MAX_OVERDUE: double (nullable = true)
 |-- CNT_CREDIT_PROLONG: integer (nullable = true)
 |-- AMT_CREDIT_SUM: double (nullable = true)
 |-- AMT_CREDIT_SUM_DEBT: double (nullable = true)
 |-- AMT_CREDIT_SUM_LIMIT: double (nullable = true)
 |-- AMT_CREDIT_SUM_OVERDUE: double (nullable = true)
 |-- CREDIT_TYPE: string (nullable = true)
 |-- DAYS_CREDIT_UPDATE: integer (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)



### 2. Criação de Flags Temporais

In [4]:
# Criando uma View
bureau.createOrReplaceTempView('bureau')

temp01 = spark.sql("""
SELECT
    *,
        CASE
            WHEN DAYS_CREDIT >= -90 THEN 1
        ELSE 0
    END AS FL_U3M,
        CASE
            WHEN DAYS_CREDIT >= -180 THEN 1
        ELSE 0
    END AS FL_U6M,
        CASE
            WHEN DAYS_CREDIT >= -270 THEN 1
        ELSE 0
    END AS FL_U9M,
        CASE
            WHEN DAYS_CREDIT >= -360 THEN 1
        ELSE 0
    END AS FL_U12M
FROM
    bureau
ORDER BY
    `SK_ID_BUREAU`;
""")

temp01.createOrReplaceTempView('temp01')
temp01.count()

1716428

In [6]:
temp01.show(n=5)

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+------+------+------+-------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|    CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|FL_U3M|FL_U6M|FL_U9M|FL_U12M|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+------+------+------+-------+
|    166497|     5000000|       Cl

### 3. Criação das Flags das Variáveis Categóricas

In [8]:
# # Lista para armazenar as colunas criadas
# flags = []

# # Pegando todas as colunas categoricas da tabela
# cat_cols = [cat_col[0] for cat_col in temp01.dtypes if cat_col[1] == 'string']

# # Gerando as colunas Flag
# for cat_col in cat_cols:
#     unique_vals = [col[0] for col in temp01.select(cat_col).distinct().collect()]

#     for unique_val in unique_vals:
#         flags.append(when(col(cat_col) == unique_val, 1).otherwise(0).alias(f'FL_CAT_{cat_col}_{unique_val.upper().replace(" ", "_")}'))

* Realizando o Unpacking e Criando a Tabela

In [9]:
# temp02 = temp01.select("*", *flags)

In [5]:
# Aqui irei filtrar todas as colunas categoricas para remove-las da agregação posteriormente.
# Essa decisão foi tomada por questões computacionais.
cat_cols = [cat_col[0] for cat_col in temp01.dtypes if cat_col[1] == 'string']

temp02 = temp01

### 4. Criação das Variáveis (Agrupadas)

* Filtrando as colunas que serão agregadas

In [6]:
# Selecionando as variáveis que serão agregadas (exceto Flags e IDs)
agg_cols = [col for col in temp02.columns if ("FL_" not in col) & ("SK_ID" not in col)]

# Removendo a Coluna de Janela Temporal
agg_cols.remove('DAYS_CREDIT')

# Removendo as colunas categóricas que tiveram flags criadas
for cat_col in cat_cols:
    agg_cols.remove(cat_col)

for col in [cat_col[0] for cat_col in temp01.dtypes if cat_col[1] == 'string']:
    try:
        agg_cols.remove(col)
    except:
        pass

In [7]:
flags_temporais = ['FL_U3M', 'FL_U6M', 'FL_U9M','FL_U12M']

flags_categoricas = [col for col in temp02.columns if 'FL_CAT_' in col]

#### 4.1 Usando Apenas Flags Temporais

In [10]:
new_cols = []

for flag_temp in flags_temporais:
    nome_flag_temp_corrigido = flag_temp.replace('FL_','')

    for agg_col in agg_cols:

        if 'DPD' in agg_col:
            new_cols.append(round(max(when(col(flag_temp) == 1, col(agg_col))),2).alias(f"QT_MAX_{agg_col}_{nome_flag_temp_corrigido}"))
            new_cols.append(round(min(when(col(flag_temp) == 1, col(agg_col))),2).alias(f"QT_MIN_{agg_col}_{nome_flag_temp_corrigido}"))
        else:
            new_cols.append(round(max(when(col(flag_temp) == 1, col(agg_col))),2).alias(f"VL_MAX_{agg_col}_{nome_flag_temp_corrigido}"))
            new_cols.append(round(min(when(col(flag_temp) == 1, col(agg_col))),2).alias(f"VL_MIN_{agg_col}_{nome_flag_temp_corrigido}"))
            new_cols.append(round(sum(when(col(flag_temp) == 1, col(agg_col))),2).alias(f"VL_SUM_{agg_col}_{nome_flag_temp_corrigido}"))
            new_cols.append(round(median(when(col(flag_temp) == 1, col(agg_col))),2).alias(f"VL_MD_{agg_col}_{nome_flag_temp_corrigido}"))


new_cols = tuple(new_cols)

temp03 = temp02.groupBy("SK_ID_BUREAU").agg(*new_cols).orderBy("SK_ID_BUREAU")

temp03.count()

1716428

In [11]:
print("Quantidade de Vars. Criadas:",len(temp03.columns))

Quantidade de Vars. Criadas: 177


In [17]:
# temp03 = temp03.repartition(1)
# temp03.write.mode("overwrite").option("compression", "gzip").parquet("./VARS/BUREAU/FL_TEMPORAL")

In [18]:
# temp03.show(n=10)

#### 4.2 Usando Apenas Flags Categóricas

In [19]:
# new_cols = []

# for cat_flag in flags_categoricas:
#     nome_flag_cat_corrigido = cat_flag.replace('FL_CAT_','')
#     for agg_col in agg_cols:
#         if 'DPD' in agg_col:
#             new_cols.append(round(max(when(col(cat_flag) == 1, col(agg_col))),2).alias(f"QT_MAX_{agg_col}_{nome_flag_cat_corrigido}"))
#             new_cols.append(round(min(when(col(cat_flag) == 1, col(agg_col))),2).alias(f"QT_MIN_{agg_col}_{nome_flag_cat_corrigido}"))
#         else:
#             new_cols.append(round(max(when(col(cat_flag) == 1, col(agg_col))),2).alias(f"VL_MAX_{agg_col}_{nome_flag_cat_corrigido}"))
#             new_cols.append(round(min(when(col(cat_flag) == 1, col(agg_col))),2).alias(f"VL_MIN_{agg_col}_{nome_flag_cat_corrigido}"))
#             new_cols.append(round(sum(when(col(cat_flag) == 1, col(agg_col))),2).alias(f"VL_SUM_{agg_col}_{nome_flag_cat_corrigido}"))
#             new_cols.append(round(median(when(col(cat_flag) == 1, col(agg_col))),2).alias(f"VL_MD_{agg_col}_{nome_flag_cat_corrigido}"))

# new_cols = tuple(new_cols)

# temp04 = temp02.groupBy("SK_ID_BUREAU").agg(*new_cols).orderBy("SK_ID_BUREAU")

# temp04.count()

1716428

In [20]:
# print("Quantidade de Vars. Criadas:",len(temp04.columns))

Quantidade de Vars. Criadas: 837


In [21]:
# temp04 = temp04.repartition(1)
# temp04.write.mode("overwrite").option("compression", "gzip").parquet("./VARS/BUREAU/FL_CATEGORICA")

In [22]:
# temp04.show(n=10)

### 5. Juntando as Tabelas

* Juntando as tabelas das agregações

In [23]:
# temp05 = temp04.join(temp03, on= "SK_ID_BUREAU", how = 'left')

In [24]:
# temp05.count(), len(temp05.columns)

(1716428, 1013)

* Carregando a Bureau Balance

In [12]:
bureau_balance_agg = spark.read.parquet('./BASES_FEAT_ENG/BUREAU_BALANCE_FEAT_ENG')
bureau_balance_agg.count(), len(bureau_balance_agg.columns)

(817395, 33)

* Juntando com a Bureau Balance

In [13]:
temp06 = temp03.join(bureau_balance_agg, on= "SK_ID_BUREAU", how = "left")

In [14]:
temp06.count(), len(temp06.columns)

(1716428, 209)

In [28]:
# temp06.show(n=5)

In [15]:
# Trazendo a Chave para conexão com as demais tabelas (SK_ID_CURR)
temp02 = temp02.select("SK_ID_BUREAU", "SK_ID_CURR")

In [16]:
temp07 = temp06.join(other= temp02, on= "SK_ID_BUREAU", how= "left")
temp07.count()

1716428

In [17]:
temp07.count(), len(temp07.columns)

(1716428, 210)

### 6. Agregações na Tabela Final

* Liberando Espaço na memória (Tabelas que não serão mais utilizadas)

In [18]:
agg_cols = temp07.columns

agg_cols.remove("SK_ID_CURR")
agg_cols.remove("SK_ID_BUREAU")

new_cols = []

for agg_col in agg_cols:
  if 'DAY' in agg_col:
    new_cols.append(round(max(col(agg_col)), 2).alias(f"QT_MAX_{agg_col.upper()}_BUREAU"))
    new_cols.append(round(min(col(agg_col)), 2).alias(f"QT_MIN_{agg_col.upper()}_BUREAU"))
  else:
    new_cols.append(round(sum(col(agg_col)), 2).alias(f"VL_TOT_{agg_col.upper()}_BUREAU"))
    new_cols.append(round(median(col(agg_col)), 2).alias(f"VL_MED_{agg_col.upper()}_BUREAU"))
    new_cols.append(round(max(col(agg_col)), 2).alias(f"VL_MAX_{agg_col.upper()}_BUREAU"))
    new_cols.append(round(min(col(agg_col)), 2).alias(f"VL_MIN_{agg_col.upper()}_BUREAU"))


new_cols = tuple(new_cols)

In [19]:
bureau_agg = temp07.groupBy("SK_ID_CURR").agg(*new_cols).orderBy("SK_ID_CURR")
bureau_agg.count()

305811

In [20]:
bureau_agg = bureau_agg.repartition(1)
bureau_agg.write.mode('overwrite').parquet("./BASES_FEAT_ENG/BUREAU_FEAT_ENG")