In [1]:
spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1573210380015_0002,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.
<pyspark.sql.session.SparkSession object at 0x7fceb8748c10>

#### Lendo tabela de Metadados

In [2]:
metadados = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").option(
   "delimiter", ',').load(
   's3://turing-bkt-treinamentos-etl/CreditoImobiliario/HomeCredit_columns_description.csv')
metadados.count()

219

In [9]:
metadados.registerTempTable("metadados")

In [4]:
spark.sql("""
              select 
                  Row,
                  Description                  
              from 
                  metadados
              where 
                  Table in ('bureau.csv')


""").show(50,False)

+----------------------+-------------------------------------------------------------------------------------------------------------------+
|Row                   |Description                                                                                                        |
+----------------------+-------------------------------------------------------------------------------------------------------------------+
|SK_ID_CURR            |ID of loan in our sample - one loan in our sample can have 0,1,2 or more related previous credits in credit bureau |
|SK_BUREAU_ID          |Recoded ID of previous Credit Bureau credit related to our loan (unique coding for each loan application)          |
|CREDIT_ACTIVE         |Status of the Credit Bureau (CB) reported credits                                                                  |
|CREDIT_CURRENCY       |Recoded currency of the Credit Bureau credit                                                                       |
|DAYS_CREDIT 

#### Lendo tabela bureau.csv

In [10]:
bureau = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").option(
   "delimiter", ',').load(
   's3://turing-bkt-treinamentos-etl/CreditoImobiliario/bureau.csv')

bureau.registerTempTable("bureau")
bureau.count()

1716428

In [11]:
bureau.printSchema()

root
 |-- SK_ID_CURR: integer (nullable = true)
 |-- SK_ID_BUREAU: integer (nullable = true)
 |-- CREDIT_ACTIVE: string (nullable = true)
 |-- CREDIT_CURRENCY: string (nullable = true)
 |-- DAYS_CREDIT: integer (nullable = true)
 |-- CREDIT_DAY_OVERDUE: integer (nullable = true)
 |-- DAYS_CREDIT_ENDDATE: double (nullable = true)
 |-- DAYS_ENDDATE_FACT: double (nullable = true)
 |-- AMT_CREDIT_MAX_OVERDUE: double (nullable = true)
 |-- CNT_CREDIT_PROLONG: integer (nullable = true)
 |-- AMT_CREDIT_SUM: double (nullable = true)
 |-- AMT_CREDIT_SUM_DEBT: double (nullable = true)
 |-- AMT_CREDIT_SUM_LIMIT: double (nullable = true)
 |-- AMT_CREDIT_SUM_OVERDUE: double (nullable = true)
 |-- CREDIT_TYPE: string (nullable = true)
 |-- DAYS_CREDIT_UPDATE: integer (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)

#### Explorando conceitos de negocio

In [9]:
spark.sql("""
             select
                 CREDIT_ACTIVE,
                 count(*) as Qt
             from 
                 bureau
             group by
                 1


""").show()

+-------------+-------+
|CREDIT_ACTIVE|     Qt|
+-------------+-------+
|     Bad debt|     21|
|         Sold|   6527|
|       Active| 630607|
|       Closed|1079273|
+-------------+-------+

In [11]:
spark.sql("""
             select
                 CREDIT_TYPE,
                 count(*) as Qt
             from 
                 bureau
             group by
                 1


""").show(50,False)

+--------------------------------------------+-------+
|CREDIT_TYPE                                 |Qt     |
+--------------------------------------------+-------+
|Loan for the purchase of equipment          |19     |
|Cash loan (non-earmarked)                   |56     |
|Microloan                                   |12413  |
|Consumer credit                             |1251615|
|Mobile operator loan                        |1      |
|Another type of loan                        |1017   |
|Mortgage                                    |18391  |
|Interbank credit                            |1      |
|Loan for working capital replenishment      |469    |
|Car loan                                    |27690  |
|Real estate loan                            |27     |
|Unknown type of loan                        |555    |
|Loan for business development               |1975   |
|Credit card                                 |402195 |
|Loan for purchase of shares (margin lending)|4      |
+---------

In [12]:
spark.sql("""
             select
                 CREDIT_CURRENCY,
                 count(*) as Qt
             from 
                 bureau
             group by
                 1


""").show(50,False)

+---------------+-------+
|CREDIT_CURRENCY|Qt     |
+---------------+-------+
|currency 2     |1224   |
|currency 1     |1715020|
|currency 4     |10     |
|currency 3     |174    |
+---------------+-------+

#### Criando variaveis de primeira camada com conceito de credito 

In [12]:
bureau_etl_01 = spark.sql(
    """
         select
             SK_ID_CURR as PK_JOIN,
             sum(case when CREDIT_ACTIVE in ('Active') then 1 else 0 end) as QT_CRED_Active,
             sum(case when CREDIT_ACTIVE in ('Sold') then 1 else 0 end) as QT_CRED_Sold,
             sum(case when CREDIT_ACTIVE in ('Closed') then 1 else 0 end) as QT_CRED_Closed,
             sum(case when CREDIT_ACTIVE in ('Bad debt') then 1 else 0 end) as QT_CRED_Bad_debt,
             sum(case when CREDIT_TYPE in ('Microloan') then 1 else 0 end) as QT_CRED_Microloan,
             sum(case when CREDIT_TYPE in ('Credit card') then 1 else 0 end) as QT_CRED_CreditCard,
             sum(case when CREDIT_TYPE in ('Consumer credit') then 1 else 0 end) as QT_CRED_ConsumerCredit,
             sum(case when CREDIT_TYPE in ('Car loan') then 1 else 0 end) as QT_CRED_CarLoan,
             sum(case when CREDIT_TYPE in ('Mortgage') then 1 else 0 end) as QT_CRED_Mortgage,
             sum(case when CREDIT_CURRENCY in ('currency 2') then 1 else 0 end) as QT_CRED_currency2,
             sum(case when CREDIT_CURRENCY in ('currency 1') then 1 else 0 end) as QT_CRED_currency1,
             
             avg(case when CREDIT_ACTIVE in ('Active') then AMT_CREDIT_SUM else 0 end) as AVG_AMT_CREDIT_ACTIVE,
             avg(case when CREDIT_ACTIVE in ('Sold') then AMT_CREDIT_SUM else 0 end) as AVG_AMT_CREDIT_Sold,
             avg(case when CREDIT_ACTIVE in ('Closed') then AMT_CREDIT_SUM else 0 end) as AVG_AMT_CREDIT_Closed,
             avg(case when CREDIT_ACTIVE in ('Bad debt') then AMT_CREDIT_SUM else 0 end) as AVG_AMT_CREDIT_BadDebt,
             
             avg(case when CREDIT_TYPE in ('Microloan') then AMT_CREDIT_SUM else 0 end) as AVG_AMT_CREDIT_Microloan,
             avg(case when CREDIT_TYPE in ('Credit card') then AMT_CREDIT_SUM else 0 end) as AVG_AMT_CREDIT_Creditcard,
             avg(case when CREDIT_TYPE in ('Consumer credit') then AMT_CREDIT_SUM else 0 end) as AVG_AMT_CREDIT_Consumercredit,
             avg(case when CREDIT_TYPE in ('Car loan') then AMT_CREDIT_SUM else 0 end) as AVG_AMT_CREDIT_Carloan,
             avg(case when CREDIT_TYPE in ('Mortgage') then AMT_CREDIT_SUM else 0 end) as AVG_AMT_CREDIT_Mortgage,
             
             avg(case when CREDIT_CURRENCY in ('currency 1') then AMT_CREDIT_SUM else 0 end) as AVG_AMT_CREDIT_currency1,
             avg(case when CREDIT_CURRENCY in ('currency 2') then AMT_CREDIT_SUM else 0 end) as AVG_AMT_CREDIT_currency2,
             
             min(AMT_CREDIT_SUM) as MIN_AMT_CREDIT_SUM,
             max(AMT_CREDIT_SUM) as MAX_AMT_CREDIT_SUM,
             avg(AMT_CREDIT_SUM) as AVG_AMT_CREDIT_SUM,
             
             min(DAYS_CREDIT) as MIN_DAYS_CREDIT,
             max(DAYS_CREDIT) as MAX_DAYS_CREDIT,
             avg(DAYS_CREDIT) as AVG_DAYS_CREDIT,             
             
             min(AMT_CREDIT_SUM_DEBT) as MIN_AMT_CREDIT_SUM_DEBT,
             max(AMT_CREDIT_SUM_DEBT) as MAX_AMT_CREDIT_SUM_DEBT,
             avg(AMT_CREDIT_SUM_DEBT) as AVG_AMT_CREDIT_SUM_DEBT,  
             
             min(AMT_CREDIT_SUM_OVERDUE) as MIN_AMT_CREDIT_SUM_OVERDUE,
             max(AMT_CREDIT_SUM_OVERDUE) as MAX_AMT_CREDIT_SUM_OVERDUE,
             avg(AMT_CREDIT_SUM_OVERDUE) as AVG_AMT_CREDIT_SUM_OVERDUE,             

             min(DAYS_CREDIT_UPDATE) as MIN_DAYS_CREDIT_UPDATE,
             max(DAYS_CREDIT_UPDATE) as MAX_DAYS_CREDIT_UPDATE,
             avg(DAYS_CREDIT_UPDATE) as AVG_DAYS_CREDIT_UPDATE,
                                 
             min(CNT_CREDIT_PROLONG) as MIN_CNT_CREDIT_PROLONG,
             max(CNT_CREDIT_PROLONG) as MAX_CNT_CREDIT_PROLONG,
             avg(CNT_CREDIT_PROLONG) as AVG_CNT_CREDIT_PROLONG,
             
             min(AMT_ANNUITY) as MIN_AMT_ANNUITY,
             max(AMT_ANNUITY) as MAX_AMT_ANNUITY,
             avg(AMT_ANNUITY) as AVG_AMT_ANNUITY
             
         from
             bureau             
         group by         
             SK_ID_CURR
""")

bureau_etl_01.registerTempTable("bureau_etl_01")
bureau_etl_01.count()

305811

In [47]:
bureau_etl_01.printSchema()

root
 |-- PK_JOIN: integer (nullable = true)
 |-- QT_CRED_Active: long (nullable = true)
 |-- QT_CRED_Sold: long (nullable = true)
 |-- QT_CRED_Closed: long (nullable = true)
 |-- QT_CRED_Bad_debt: long (nullable = true)
 |-- QT_CRED_Microloan: long (nullable = true)
 |-- QT_CRED_CreditCard: long (nullable = true)
 |-- QT_CRED_ConsumerCredit: long (nullable = true)
 |-- QT_CRED_CarLoan: long (nullable = true)
 |-- QT_CRED_Mortgage: long (nullable = true)
 |-- QT_CRED_currency2: long (nullable = true)
 |-- QT_CRED_currency1: long (nullable = true)
 |-- AVG_AMT_CREDIT_ACTIVE: double (nullable = true)
 |-- AVG_AMT_CREDIT_Sold: double (nullable = true)
 |-- AVG_AMT_CREDIT_Closed: double (nullable = true)
 |-- AVG_AMT_CREDIT_BadDebt: double (nullable = true)
 |-- AVG_AMT_CREDIT_Microloan: double (nullable = true)
 |-- AVG_AMT_CREDIT_Creditcard: double (nullable = true)
 |-- AVG_AMT_CREDIT_Consumercredit: double (nullable = true)
 |-- AVG_AMT_CREDIT_Carloan: double (nullable = true)
 |-- AVG

#### Tratamento da tabela de publico

In [8]:
application_train = spark.read.format(
   "com.databricks.spark.csv").option(
   "header", "true").option(
   "inferSchema", "true").option(
   "delimiter", ',').load(
   's3://turing-bkt-treinamentos-etl/CreditoImobiliario/application_train.csv')

application_train.registerTempTable("application_train")
application_train.count()

307511

In [13]:
application_train.printSchema()

root
 |-- SK_ID_CURR: integer (nullable = true)
 |-- TARGET: integer (nullable = true)
 |-- NAME_CONTRACT_TYPE: string (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- NAME_TYPE_SUITE: string (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- REGION_POPULATION_RELATIVE: double (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- DAYS_REGISTRATION: double (nullable = true)
 |-- DAYS_ID_PUBLISH: integer (nullable = true)
 |-- OWN_CAR_AG

#### Estrategia 01 - trazer todas as varaveis numericas da tabela de publico 

In [14]:
columnList_int = [item[0] for item in application_train.dtypes if item[1].startswith('int')]
columnList_double = [item[0] for item in application_train.dtypes if item[1].startswith('double')]
columnList_string = [item[0] for item in application_train.dtypes if item[1].startswith('string')]

In [15]:
columnList_int

['SK_ID_CURR', 'TARGET', 'CNT_CHILDREN', 'DAYS_BIRTH', 'DAYS_EMPLOYED', 'DAYS_ID_PUBLISH', 'FLAG_MOBIL', 'FLAG_EMP_PHONE', 'FLAG_WORK_PHONE', 'FLAG_CONT_MOBILE', 'FLAG_PHONE', 'FLAG_EMAIL', 'REGION_RATING_CLIENT', 'REGION_RATING_CLIENT_W_CITY', 'HOUR_APPR_PROCESS_START', 'REG_REGION_NOT_LIVE_REGION', 'REG_REGION_NOT_WORK_REGION', 'LIVE_REGION_NOT_WORK_REGION', 'REG_CITY_NOT_LIVE_CITY', 'REG_CITY_NOT_WORK_CITY', 'LIVE_CITY_NOT_WORK_CITY', 'FLAG_DOCUMENT_2', 'FLAG_DOCUMENT_3', 'FLAG_DOCUMENT_4', 'FLAG_DOCUMENT_5', 'FLAG_DOCUMENT_6', 'FLAG_DOCUMENT_7', 'FLAG_DOCUMENT_8', 'FLAG_DOCUMENT_9', 'FLAG_DOCUMENT_10', 'FLAG_DOCUMENT_11', 'FLAG_DOCUMENT_12', 'FLAG_DOCUMENT_13', 'FLAG_DOCUMENT_14', 'FLAG_DOCUMENT_15', 'FLAG_DOCUMENT_16', 'FLAG_DOCUMENT_17', 'FLAG_DOCUMENT_18', 'FLAG_DOCUMENT_19', 'FLAG_DOCUMENT_20', 'FLAG_DOCUMENT_21']

In [16]:
columnList_double

['AMT_INCOME_TOTAL', 'AMT_CREDIT', 'AMT_ANNUITY', 'AMT_GOODS_PRICE', 'REGION_POPULATION_RELATIVE', 'DAYS_REGISTRATION', 'OWN_CAR_AGE', 'CNT_FAM_MEMBERS', 'EXT_SOURCE_1', 'EXT_SOURCE_2', 'EXT_SOURCE_3', 'APARTMENTS_AVG', 'BASEMENTAREA_AVG', 'YEARS_BEGINEXPLUATATION_AVG', 'YEARS_BUILD_AVG', 'COMMONAREA_AVG', 'ELEVATORS_AVG', 'ENTRANCES_AVG', 'FLOORSMAX_AVG', 'FLOORSMIN_AVG', 'LANDAREA_AVG', 'LIVINGAPARTMENTS_AVG', 'LIVINGAREA_AVG', 'NONLIVINGAPARTMENTS_AVG', 'NONLIVINGAREA_AVG', 'APARTMENTS_MODE', 'BASEMENTAREA_MODE', 'YEARS_BEGINEXPLUATATION_MODE', 'YEARS_BUILD_MODE', 'COMMONAREA_MODE', 'ELEVATORS_MODE', 'ENTRANCES_MODE', 'FLOORSMAX_MODE', 'FLOORSMIN_MODE', 'LANDAREA_MODE', 'LIVINGAPARTMENTS_MODE', 'LIVINGAREA_MODE', 'NONLIVINGAPARTMENTS_MODE', 'NONLIVINGAREA_MODE', 'APARTMENTS_MEDI', 'BASEMENTAREA_MEDI', 'YEARS_BEGINEXPLUATATION_MEDI', 'YEARS_BUILD_MEDI', 'COMMONAREA_MEDI', 'ELEVATORS_MEDI', 'ENTRANCES_MEDI', 'FLOORSMAX_MEDI', 'FLOORSMIN_MEDI', 'LANDAREA_MEDI', 'LIVINGAPARTMENTS_MEDI',

In [17]:
columnList_string

['NAME_CONTRACT_TYPE', 'CODE_GENDER', 'FLAG_OWN_CAR', 'FLAG_OWN_REALTY', 'NAME_TYPE_SUITE', 'NAME_INCOME_TYPE', 'NAME_EDUCATION_TYPE', 'NAME_FAMILY_STATUS', 'NAME_HOUSING_TYPE', 'OCCUPATION_TYPE', 'WEEKDAY_APPR_PROCESS_START', 'ORGANIZATION_TYPE', 'FONDKAPREMONT_MODE', 'HOUSETYPE_MODE', 'WALLSMATERIAL_MODE', 'EMERGENCYSTATE_MODE']

In [18]:
columns_to_drop = columnList_string
application_train_01 = application_train.drop(*columns_to_drop)

In [19]:
application_train_01.printSchema()

root
 |-- SK_ID_CURR: integer (nullable = true)
 |-- TARGET: integer (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- REGION_POPULATION_RELATIVE: double (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- DAYS_REGISTRATION: double (nullable = true)
 |-- DAYS_ID_PUBLISH: integer (nullable = true)
 |-- OWN_CAR_AGE: double (nullable = true)
 |-- FLAG_MOBIL: integer (nullable = true)
 |-- FLAG_EMP_PHONE: integer (nullable = true)
 |-- FLAG_WORK_PHONE: integer (nullable = true)
 |-- FLAG_CONT_MOBILE: integer (nullable = true)
 |-- FLAG_PHONE: integer (nullable = true)
 |-- FLAG_EMAIL: integer (nullable = true)
 |-- CNT_FAM_MEMBERS: double (nullable = true)
 |-- REGION_RATING_CLIENT: integer (nullable = true)
 |-- REGION_RATING_CLIENT_W_

In [20]:
application_train_01.registerTempTable("application_train_01")

#### Gerando tabela de modelagem final

In [21]:
ETL_FINAL_v01 = spark.sql("""

select 
        a.*,
        b.*
    from 
        application_train_01 as a
    left join
        bureau_etl_01 as b
    on 
        a.SK_ID_CURR = b.PK_JOIN
""")

In [22]:
ETL_FINAL_v01.count()

307511

#### Salvando ABT para etapa de modelagem

In [23]:
nm_path_s3 = 's3://turing-bkt-treinamentos-etl/Alunos/ABT_v01_BrunoJ/'
ETL_FINAL_v01.write.parquet(nm_path_s3, mode='overwrite')