Neste projeto, utilizo Spark para realizar um ETL no dataset Home Credit Default Risk. O objetivo aqui é fazer transformações de dados e, assim, criar novas variáveis e enriquecer o dataset original, conferindo maior valor preditivo ao modelo estatístico.


Este dataset é composto por diferentes tabelas, mas neste projeto, trabalhei exclusivamente com a tabela "Bureau", que traz dados sobre empréstimos obtidos e pagamentos feitos pelos clientes. Além disso, a tabela "columns_description" foi utilizada para a obtenção de mais informaçãoes sobre cada coluna de "bureau".

A aplicação do Spark foi rodada utilizando o serviço EMR da AWS, e os arquivos de dados foram armazenados e acessados no S3.

### Criando a sessão do Spark com SparkSession

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("aplicação no Pyspark para dataset Homecredit risk") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
5,application_1685041529512_0006,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
spark

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7fe2af76ed50>

### Acessando a tabela de interesse (Bureau)

In [3]:
df_bureau = spark.read.csv('s3://datasets-projects/bureau.csv', header = True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
df_bureau.show(truncate = False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|CREDIT_TYPE    |DAYS_CREDIT_UPDATE|AMT_ANNUITY|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|215354    |5714462     |Closed       |currency 1     |-497       |0                 |-153.0             |-153.0         

### Entendendo o dataset bureau

In [5]:
df_bureau.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- SK_ID_CURR: string (nullable = true)
 |-- SK_ID_BUREAU: string (nullable = true)
 |-- CREDIT_ACTIVE: string (nullable = true)
 |-- CREDIT_CURRENCY: string (nullable = true)
 |-- DAYS_CREDIT: string (nullable = true)
 |-- CREDIT_DAY_OVERDUE: string (nullable = true)
 |-- DAYS_CREDIT_ENDDATE: string (nullable = true)
 |-- DAYS_ENDDATE_FACT: string (nullable = true)
 |-- AMT_CREDIT_MAX_OVERDUE: string (nullable = true)
 |-- CNT_CREDIT_PROLONG: string (nullable = true)
 |-- AMT_CREDIT_SUM: string (nullable = true)
 |-- AMT_CREDIT_SUM_DEBT: string (nullable = true)
 |-- AMT_CREDIT_SUM_LIMIT: string (nullable = true)
 |-- AMT_CREDIT_SUM_OVERDUE: string (nullable = true)
 |-- CREDIT_TYPE: string (nullable = true)
 |-- DAYS_CREDIT_UPDATE: string (nullable = true)
 |-- AMT_ANNUITY: string (nullable = true)

In [6]:
num_colunas_df_bureau = len(df_bureau.columns)

num_colunas_df_bureau

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

17

In [7]:
df_bureau.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1716428

Podemos ver que a tabela tem 17 colunas e 1.716.428 colunas.

### Criando views temporárias

Criação de uma view temporária para começar a trabalhar com os dados de bureau.

In [9]:
df_bureau.createOrReplaceTempView('bureau')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
from pyspark.sql.functions import count

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Aqui vamos ver os dados distintos na coluna CREDIT_ACTIVE e as suas contagens
# Vamos repertir esse passo para diversas outras colunas
spark.sql('''
SELECT
    CREDIT_ACTIVE,
    COUNT(*)
FROM df_bureau
GROUP BY CREDIT_ACTIVE
''').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+--------+
|CREDIT_ACTIVE|count(1)|
+-------------+--------+
|     Bad debt|      21|
|       Active|  630607|
|       Closed| 1079273|
|         Sold|    6527|
+-------------+--------+

Agora vamos transformar a coluna CREDIT_ACTIVE, que armazena dados do tipo de empréstimo tomado pelos clientes

Utilizando o comando CASE WHEN, vamos desmembrar esta coluna em 4 novas colunas (número de diferentes variáveis da coluna original e que serão mantidas). A nova coluna que corresponder ao tipo de credit_active do registro receberá o valor 1, enquanto as outras novas colunas receberão o valor 0. 
Como cada indivíduo (SK_ID_CURR) pode ter diversas ocorrências de CREDIT_ACTIVE em seu histórico, os dados foram agrupados por ID e somados.

Neste sentido, podemos também verificar quantos empréstimos cada cliente tomou.

In [12]:
spark.sql('''
SELECT
    SK_ID_CURR,
    SUM(CASE WHEN CREDIT_ACTIVE = 'Closed' THEN 1 ELSE 0 END) AS qtd_credit_active_closed,
    SUM(CASE WHEN CREDIT_ACTIVE = 'Active' THEN 1 ELSE 0 END) AS qtd_credit_active_active,
    SUM(CASE WHEN CREDIT_ACTIVE = 'Bad debt' THEN 1 ELSE 0 END) AS qtd_credit_active_bad_debt,
    SUM(CASE WHEN CREDIT_ACTIVE = 'Sold' THEN 1 ELSE 0 END) AS qtd_credit_active_sold
FROM df_bureau
GROUP BY SK_ID_CURR
''').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+------------------------+------------------------+--------------------------+----------------------+
|SK_ID_CURR|qtd_credit_active_closed|qtd_credit_active_active|qtd_credit_active_bad_debt|qtd_credit_active_sold|
+----------+------------------------+------------------------+--------------------------+----------------------+
|    146433|                       5|                       1|                         0|                     0|
|    424856|                       7|                       1|                         0|                     0|
|    100320|                       3|                       3|                         0|                     0|
|    411129|                       3|                       2|                         0|                     0|
|    242097|                       1|                       0|                         0|                     0|
|    366716|                       4|                       2|                         0|       

In [13]:
# Verificando os dados distintos da coluna SK_ID_BUREAU, ou seja, verificando quantos empréstimos cada cliente tomou.
spark.sql('''
SELECT
    SK_ID_CURR,
    COUNT(SK_ID_BUREAU)
FROM df_bureau
GROUP BY SK_ID_CURR
''').show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+-------------------+
|SK_ID_CURR|count(SK_ID_BUREAU)|
+----------+-------------------+
|    146433|                  6|
|    424856|                  8|
|    100320|                  6|
|    411129|                  5|
|    242097|                  1|
|    366716|                  6|
|    222457|                  3|
|    306859|                  1|
|    272074|                  3|
|    332661|                 15|
+----------+-------------------+
only showing top 10 rows