In [None]:
m!pip install pyspark



# ETL Spark

In [None]:
from pyspark.sql import SparkSession

# Create the Spark session (or reuse one that already exists) shared by all
# cells below.
spark = (
    SparkSession.builder
    .appName("application_train.csv")
    .getOrCreate()
)


In [None]:
# Load the raw CSV, taking column names from the first line.
# NOTE: no schema inference is requested, so Spark reads every column as a string.
df = spark.read.csv("application_train.csv", header=True)
df.show(n=5)  # Preview the first rows


+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+----------------+--------------------+--------------------+-----------------+--------------------------+----------+-------------+-----------------+---------------+-----------+----------+--------------+---------------+----------------+----------+----------+---------------+---------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------------+--------------------------+---------------------------+----------------------+----------------------+-----------------------+--------------------+-------------------+------------------+-------------------+--------------+----------------+---------------------------+------------------+--------------+-------------+-------------+-------------+-------------+------------+--------------------+--------------+-----------------

### Selecionando as colunas para a predição

In [None]:
# Columns kept for the prediction task; every other column of the raw frame
# is dropped by the select below.
colunas_utilizadas = [
    "SK_ID_CURR", "TARGET",
    "NAME_CONTRACT_TYPE", "CODE_GENDER", "FLAG_OWN_CAR", "FLAG_OWN_REALTY",
    "CNT_CHILDREN",
    "AMT_INCOME_TOTAL", "AMT_CREDIT", "AMT_ANNUITY", "AMT_GOODS_PRICE",
    "NAME_INCOME_TYPE", "NAME_EDUCATION_TYPE", "NAME_FAMILY_STATUS",
    "NAME_HOUSING_TYPE",
    "DAYS_BIRTH", "DAYS_EMPLOYED", "OCCUPATION_TYPE", "CNT_FAM_MEMBERS",
    "REGION_RATING_CLIENT",
    "EXT_SOURCE_1", "EXT_SOURCE_2", "EXT_SOURCE_3",
]

# Project the raw frame down to the chosen columns and preview the result.
df_reduzido = df.select(*colunas_utilizadas)
df_reduzido.show(n=10)


+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+---------------+---------------+--------------------+-------------------+-------------------+-------------------+
|SK_ID_CURR|TARGET|NAME_CONTRACT_TYPE|CODE_GENDER|FLAG_OWN_CAR|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|AMT_CREDIT|AMT_ANNUITY|AMT_GOODS_PRICE|    NAME_INCOME_TYPE| NAME_EDUCATION_TYPE|  NAME_FAMILY_STATUS|NAME_HOUSING_TYPE|DAYS_BIRTH|DAYS_EMPLOYED|OCCUPATION_TYPE|CNT_FAM_MEMBERS|REGION_RATING_CLIENT|       EXT_SOURCE_1|       EXT_SOURCE_2|       EXT_SOURCE_3|
+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+--------------------+--------------------+--------------------+-----------------+----------+-------------+----------

In [None]:
# List the distinct values found in the NAME_CONTRACT_TYPE column.
(
    df
    .select("NAME_CONTRACT_TYPE")
    .distinct()
    .show()
)

+------------------+
|NAME_CONTRACT_TYPE|
+------------------+
|   Revolving loans|
|        Cash loans|
+------------------+



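## Substituição dos valores ausentes pela média
Os valores ausentes (nulos) das colunas EXT_SOURCE_1, EXT_SOURCE_2 e EXT_SOURCE_3 são substituídos pela média de cada coluna, preenchendo os dados de forma que não se perca informação importante para os modelos.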

In [None]:
from pyspark.sql.functions import col, isnan, when, count

# Count, for each EXT_SOURCE column, how many rows hold a null value.
# `when` yields a non-null value only for null rows, and `count` counts
# non-null values, so each result is the number of nulls in that column.
df.select(*[
    count(when(col(nome).isNull(), nome)).alias(nome + "_nulls")
    for nome in ("EXT_SOURCE_1", "EXT_SOURCE_2", "EXT_SOURCE_3")
]).show()

+------------------+------------------+------------------+
|EXT_SOURCE_1_nulls|EXT_SOURCE_2_nulls|EXT_SOURCE_3_nulls|
+------------------+------------------+------------------+
|            173378|               660|             60965|
+------------------+------------------+------------------+



In [None]:
from pyspark.sql.functions import mean

# Columns whose mean is needed for the imputation step.
colunas_ext = ["EXT_SOURCE_1", "EXT_SOURCE_2", "EXT_SOURCE_3"]

# Compute all three means in one aggregation so the data is scanned once
# instead of once per column. Null values are ignored by the average.
linha_medias = df.select([mean(c).alias(c) for c in colunas_ext]).first()
media_1, media_2, media_3 = (linha_medias[c] for c in colunas_ext)

# Show the mean values
print(f"Média EXT_SOURCE_1: {media_1}")
print(f"Média EXT_SOURCE_2: {media_2}")
print(f"Média EXT_SOURCE_3: {media_3}")

Média EXT_SOURCE_1: 0.5021298056566661
Média EXT_SOURCE_2: 0.5143926741308394
Média EXT_SOURCE_3: 0.5108529061799263


In [None]:
# Substituir os valores nulos pela média
df = df_reduzido.fillna({
    "EXT_SOURCE_1": media_1,
    "EXT_SOURCE_2": media_2,
    "EXT_SOURCE_3": media_3
})

In [None]:
# Dicionário com traduções das colunas para facilitar entendimento
# Mapping from the original column names to Portuguese names that are easier
# to understand.
colunas_traduzidas = {
    "SK_ID_CURR":           "ID_do_Cliente",
    "TARGET":               "Inadimplente",
    "NAME_CONTRACT_TYPE":   "Tipo_de_Contrato",
    "CODE_GENDER":          "Genero",
    "FLAG_OWN_CAR":         "Possui_Carro",
    "FLAG_OWN_REALTY":      "Possui_Imovel",
    "CNT_CHILDREN":         "Numero_de_Filhos",
    "AMT_INCOME_TOTAL":     "Renda_Total_Declarada",
    "AMT_CREDIT":           "Valor_do_Credito",
    "AMT_ANNUITY":          "Valor_da_Anuidade",
    "AMT_GOODS_PRICE":      "Valor_dos_Bens",
    "NAME_INCOME_TYPE":     "Tipo_de_Renda",
    "NAME_EDUCATION_TYPE":  "Nivel_de_Escolaridade",
    "NAME_FAMILY_STATUS":   "Estado_Civil",
    "NAME_HOUSING_TYPE":    "Tipo_de_Moradia",
    "DAYS_BIRTH":           "Dias_desde_Nascimento",
    "DAYS_EMPLOYED":        "Dias_de_Emprego",
    "OCCUPATION_TYPE":      "Ocupacao",
    "CNT_FAM_MEMBERS":      "Numero_de_Membros_da_Familia",
    "REGION_RATING_CLIENT": "Avaliacao_Regiao_Cliente",
    "EXT_SOURCE_1":         "Fonte_Externa_1",
    "EXT_SOURCE_2":         "Fonte_Externa_2",
    "EXT_SOURCE_3":         "Fonte_Externa_3",
}

# Apply the renaming in a single projection. Columns without an entry in the
# mapping keep their current name, so re-running this cell changes nothing.
df = df.toDF(*[colunas_traduzidas.get(nome, nome) for nome in df.columns])

In [None]:
# Preview the first 20 rows of the renamed frame.
df.show(n=20)


+-------------+------------+----------------+------+------------+-------------+----------------+---------------------+----------------+-----------------+--------------+--------------------+---------------------+--------------------+-----------------+---------------------+---------------+-----------+----------------------------+------------------------+-------------------+-------------------+-------------------+
|ID_do_Cliente|Inadimplente|Tipo_de_Contrato|Genero|Possui_Carro|Possui_Imovel|Numero_de_Filhos|Renda_Total_Declarada|Valor_do_Credito|Valor_da_Anuidade|Valor_dos_Bens|       Tipo_de_Renda|Nivel_de_Escolaridade|        Estado_Civil|  Tipo_de_Moradia|Dias_desde_Nascimento|Dias_de_Emprego|   Ocupacao|Numero_de_Membros_da_Familia|Avaliacao_Regiao_Cliente|    Fonte_Externa_1|    Fonte_Externa_2|    Fonte_Externa_3|
+-------------+------------+----------------+------+------------+-------------+----------------+---------------------+----------------+-----------------+--------------+--

In [None]:
# Export the transformed DataFrame as CSV with a header row, replacing any
# previous export. coalesce(1) collapses the data into a single partition so
# one part file is written; Spark creates a directory named "df_trans.csv"
# that contains it.
df.coalesce(1).write.csv("df_trans.csv", mode="overwrite", header=True)
