# Configurando o Kernel Spark

In [1]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

O sistema nÆo pode encontrar o caminho especificado.
'wget' nÆo ‚ reconhecido como um comando interno
ou externo, um programa oper vel ou um arquivo em lotes.
tar: Error opening archive: Failed to open 'spark-2.4.4-bin-hadoop2.7.tgz'


In [None]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
 
# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')


In [None]:
from pyspark.sql import SparkSession
 
 #inicializando uma sessão Spark
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# Explorando os Dados

### Lendo publico de modelagem (tabela principal)

In [1]:
app_train = spark.read.csv("/content/drive/MyDrive/LAMIA/projeto_final/dados/InputData/application_train.csv", inferSchema=True, header=True)
app_test = spark.read.csv("/content/drive/MyDrive/LAMIA/projeto_final/dados/InputData/application_test.csv", inferSchema=True, header=True)

app_train.registerTempTable('app_train')
 
app_train.count()

NameError: ignored

# Lendo tabela transacional Bureau

In [None]:
bureau = spark.read.csv("/content/drive/MyDrive/LAMIA/projeto_final/dados/InputData/bureau.csv", inferSchema=True, header=True)
 
bureau.registerTempTable('bureau')
 
bureau.count()

1716428

Observe a quantidade de registros, estes serão transaformados em variaveis explicativas da granulidade da nossa tabela principal

## Verificando variaveis da tabela principal

In [None]:
app_train.printSchema()

root
 |-- SK_ID_CURR: integer (nullable = true)
 |-- TARGET: integer (nullable = true)
 |-- NAME_CONTRACT_TYPE: string (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- NAME_TYPE_SUITE: string (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- REGION_POPULATION_RELATIVE: double (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- DAYS_REGISTRATION: double (nullable = true)
 |-- DAYS_ID_PUBLISH: integer (nullable = true)
 |-- OWN_CAR_AG

## Listando variaveis do tipo string pois provavelmente carregam informações de negocio

In [None]:
 string_list = [item[0] for item in app_train.dtypes if item[1].startswith('string')]
 string_list


['NAME_CONTRACT_TYPE',
 'CODE_GENDER',
 'FLAG_OWN_CAR',
 'FLAG_OWN_REALTY',
 'NAME_TYPE_SUITE',
 'NAME_INCOME_TYPE',
 'NAME_EDUCATION_TYPE',
 'NAME_FAMILY_STATUS',
 'NAME_HOUSING_TYPE',
 'OCCUPATION_TYPE',
 'WEEKDAY_APPR_PROCESS_START',
 'ORGANIZATION_TYPE',
 'FONDKAPREMONT_MODE',
 'HOUSETYPE_MODE',
 'WALLSMATERIAL_MODE',
 'EMERGENCYSTATE_MODE']

## Calculando a volumetria e taxa de evento parqa cada dominio de cada uma das variaveis tipo string

**TARGET**: Diz respeito a taxa de inadimplencia de determinada feature 

# 1) NAME_CONTRACT_TYPE
Identificação se o empréstimo é à vista ou rotativo

**Cash loans**: Emprestimos em dinheiro   
**Revolving loans**: Emprestimos Rotativo

In [None]:
spark.sql("""
            select 
              NAME_CONTRACT_TYPE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              NAME_CONTRACT_TYPE
            order by 
              AVG_TARGET desc
""").show()

+------------------+----------+------+
|NAME_CONTRACT_TYPE|AVG_TARGET|Volume|
+------------------+----------+------+
|        Cash loans|      8.35|278232|
|   Revolving loans|      5.48| 29279|
+------------------+----------+------+



Podemos identificar que quem faz emprestimos do tipo rotativo possuem menor taxa de inadimplencia que o em dinheiro, mas tambem o volume para essas pessoas e muito menor

# 2) CODE_GENDER
Sexo do cliente   
M: Masculino  
F: Feminino

In [None]:
spark.sql("""
           select 
              CODE_GENDER,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              CODE_GENDER
            order by 
              AVG_TARGET desc
""").show()

+-----------+----------+------+
|CODE_GENDER|AVG_TARGET|Volume|
+-----------+----------+------+
|          M|     10.14|105059|
|          F|       7.0|202448|
|        XNA|       0.0|     4|
+-----------+----------+------+



- Apesar de grande parte do sexo masculino realizar menos emprestimos, a taxa de inadimplencia supera a do sexo feminino
- mesmo que o sexo feminino tenha maior volume de aprovação

# 3) FLAG_OWN_CAR
Se o cliente possui um carro   

N: não  
Y: sim

In [None]:
spark.sql("""
            select 
              FLAG_OWN_CAR,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              FLAG_OWN_CAR
            order by 
              AVG_TARGET desc
""").show()

+------------+----------+------+
|FLAG_OWN_CAR|AVG_TARGET|Volume|
+------------+----------+------+
|           N|       8.5|202924|
|           Y|      7.24|104587|
+------------+----------+------+



- pessoal que não tem carro tem um maior volume, porem uma taxa maior de inadimplencia, sera que o emprestimo era pra comprar um carro?

# 4) FLAG_OWN_REALTY
Se o cliente possui uma casa ou apartamento   

N: não   
Y: sim

In [None]:
spark.sql("""
             select 
              FLAG_OWN_REALTY,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              FLAG_OWN_REALTY
            order by 
              AVG_TARGET desc
""").show()

+---------------+----------+------+
|FLAG_OWN_REALTY|AVG_TARGET|Volume|
+---------------+----------+------+
|              N|      8.32| 94199|
|              Y|      7.96|213312|
+---------------+----------+------+



- Novamente os clientes que não possuem o bem, com uma maior taxa de inadimplencia.
- Tendencia para o emprestimo ser aprovado para pessoas que tem residencia propria

# 5) NAME_TYPE_SUITE
Quem estava acompanhando o cliente quando ele estava solicitando o empréstimo  

In [None]:
spark.sql("""
           select 
              NAME_TYPE_SUITE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              NAME_TYPE_SUITE
            order by 
              AVG_TARGET desc
""").show()

+---------------+----------+------+
|NAME_TYPE_SUITE|AVG_TARGET|Volume|
+---------------+----------+------+
|        Other_B|      9.83|  1770|
|        Other_A|      8.78|   866|
|Group of people|      8.49|   271|
|  Unaccompanied|      8.18|248526|
|Spouse, partner|      7.87| 11370|
|         Family|      7.49| 40149|
|       Children|      7.38|  3267|
|           null|      5.42|  1292|
+---------------+----------+------+



- Aprovação massiva em clientes que foram desacompanhados, talvez de uma segurança ao banco ver que a pessoa parece ser idependente.  


# 6) NAME_INCOME_TYPE
Tipo de renda dos clientes (empresário, trabalhador, licença maternidade)

In [None]:
spark.sql("""
            select 
              NAME_INCOME_TYPE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              NAME_INCOME_TYPE
            order by 
              AVG_TARGET desc
""").show()

+--------------------+----------+------+
|    NAME_INCOME_TYPE|AVG_TARGET|Volume|
+--------------------+----------+------+
|     Maternity leave|      40.0|     5|
|          Unemployed|     36.36|    22|
|             Working|      9.59|158774|
|Commercial associate|      7.48| 71617|
|       State servant|      5.75| 21703|
|           Pensioner|      5.39| 55362|
|             Student|       0.0|    18|
|         Businessman|       0.0|    10|
+--------------------+----------+------+



- Baixo volume para rendas de licença maternidade, inadimplencia  bem alta, pela quantidade de imprestimos faz sentido

- Maiores volumes para pensionistas, trabalhadores e servidores do estado, estariam essas pessoas ganhando mal? 
- o caso dos aposentados/pensionistas sabemos que são meio que alvos para agencias de emprestimos.

- Mas o caso do trabnalhador estar solicitando varios emprestimos e preocupante

# 7) NAME_EDUCATION_TYPE
Nível de educação mais alto que o cliente alcançou

In [None]:
spark.sql("""
           select 
              NAME_EDUCATION_TYPE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              NAME_EDUCATION_TYPE
            order by 
              AVG_TARGET desc
""").show()

+--------------------+----------+------+
| NAME_EDUCATION_TYPE|AVG_TARGET|Volume|
+--------------------+----------+------+
|     Lower secondary|     10.93|  3816|
|Secondary / secon...|      8.94|218391|
|   Incomplete higher|      8.48| 10277|
|    Higher education|      5.36| 74863|
|     Academic degree|      1.83|   164|
+--------------------+----------+------+



- Pessoas que concluiram ate o segundo do ensino medio tem uma altissima tendencia a realizar emprestimos, ja os que concluiram o ensino academico tem o menor volume, seria algum tipo de insight sobre status social de pessoas que chegaram ate o ensino academico

- Ou poderia ser que o academico e so a escola normal e essas pessoas não necessitam ou não buscam credito pois não objetivam algo 

# 8) NAME_FAMILY_STATUS
Situação familiar do cliente

In [None]:
spark.sql("""
           select 
              NAME_FAMILY_STATUS,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              NAME_FAMILY_STATUS
            order by 
              AVG_TARGET desc
""").show()

+--------------------+----------+------+
|  NAME_FAMILY_STATUS|AVG_TARGET|Volume|
+--------------------+----------+------+
|      Civil marriage|      9.94| 29775|
|Single / not married|      9.81| 45444|
|           Separated|      8.19| 19770|
|             Married|      7.56|196432|
|               Widow|      5.82| 16088|
|             Unknown|       0.0|     2|
+--------------------+----------+------+



- Casais tendem a realizar mais emprestimos, baixa taxa de inadimplencia.
- Talvez um compromisso os tornam otimos clientes


# 9) NAME_HOUSING_TYPE
Qual é a situação habitacional do cliente (alugar, viver com os pais)

In [None]:
spark.sql("""
           select 
              NAME_HOUSING_TYPE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              NAME_HOUSING_TYPE
            order by 
              AVG_TARGET desc
""").show()

+-------------------+----------+------+
|  NAME_HOUSING_TYPE|AVG_TARGET|Volume|
+-------------------+----------+------+
|   Rented apartment|     12.31|  4881|
|       With parents|      11.7| 14840|
|Municipal apartment|      8.54| 11183|
|    Co-op apartment|      7.93|  1122|
|  House / apartment|       7.8|272868|
|   Office apartment|      6.57|  2617|
+-------------------+----------+------+



Novamente a situação de pessoas com casa/moradia fixa, infere muito no volume de imprestimos, alem da baixa taxa de inadimplencia   

Seria esse uma caracteristica em um futuro perfil de 'cliente ideal'?

# 10) OCCUPATION_TYPE
Diz respeito a ocupação do cliente

**Low-skill Laborers**: Trabalhadores de baixa qualificação

In [None]:
spark.sql("""
            select 
              OCCUPATION_TYPE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              OCCUPATION_TYPE
            order by 
              AVG_TARGET desc
""").show()


+--------------------+----------+------+
|     OCCUPATION_TYPE|AVG_TARGET|Volume|
+--------------------+----------+------+
|  Low-skill Laborers|     17.15|  2093|
|             Drivers|     11.33| 18603|
|Waiters/barmen staff|     11.28|  1348|
|      Security staff|     10.74|  6721|
|            Laborers|     10.58| 55186|
|       Cooking staff|     10.44|  5946|
|         Sales staff|      9.63| 32102|
|      Cleaning staff|      9.61|  4653|
|       Realty agents|      7.86|   751|
|         Secretaries|      7.05|  1305|
|      Medicine staff|       6.7|  8537|
|Private service s...|       6.6|  2652|
|                null|      6.51| 96391|
|            IT staff|      6.46|   526|
|            HR staff|      6.39|   563|
|          Core staff|       6.3| 27570|
|            Managers|      6.21| 21371|
|High skill tech s...|      6.16| 11380|
|         Accountants|      4.83|  9813|
+--------------------+----------+------+



Mais uma observação seria que os 'trabalhadores de baixa qualificação' tem a maior taxa de inadimplencia, ja os contadores a menor, seria algo logico? kkk


#11) WEEKDAY_APPR_PROCESS_START 
Em que dia da semana o cliente solicitou o empréstimo

In [None]:
spark.sql("""
             select 
              WEEKDAY_APPR_PROCESS_START,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              WEEKDAY_APPR_PROCESS_START
            order by 
              AVG_TARGET desc
""").show()


+--------------------------+----------+------+
|WEEKDAY_APPR_PROCESS_START|AVG_TARGET|Volume|
+--------------------------+----------+------+
|                   TUESDAY|      8.35| 53901|
|                 WEDNESDAY|      8.16| 51934|
|                    FRIDAY|      8.15| 50338|
|                  THURSDAY|       8.1| 50591|
|                    SUNDAY|      7.93| 16181|
|                  SATURDAY|      7.89| 33852|
|                    MONDAY|      7.76| 50714|
+--------------------------+----------+------+



- Mais volume nos dias da semana, dados medianos nada a declarar

## 12) FONDKAPREMONT_MODE 
Informações normalizadas sobre o prédio onde o cliente mora, Qual é a média (sufixo _AVG), modus (sufixo _MODE), tamanho médio do apartamento (sufixo _MEDI), área comum, área de estar, idade do edifício, número de elevadores, número de entradas, estado do edifício, número de andares

In [None]:
spark.sql("""
            select 
              FONDKAPREMONT_MODE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              FONDKAPREMONT_MODE
            order by 
              AVG_TARGET desc
""").show()


+--------------------+----------+------+
|  FONDKAPREMONT_MODE|AVG_TARGET|Volume|
+--------------------+----------+------+
|                null|      8.62|210295|
|       not specified|      7.54|  5687|
|    reg oper account|      6.98| 73830|
|reg oper spec acc...|      6.56| 12080|
|    org spec account|      5.82|  5619|
+--------------------+----------+------+



- Muitos dados nulls, aparentemente um dado muito especifico, ou muitas pessoas n tinham organizações

# 13) HOUSETYPE_MODE 
Informações normalizadas sobre o edifício onde o cliente mora, Qual é a média (sufixo _AVG), modus (sufixo _MODE), tamanho médio do apartamento (sufixo _MEDI), área comum, área de estar, idade do edifício, número de elevadores, número de entradas, estado do edifício, número de pisos

In [None]:
spark.sql("""
            select 
              HOUSETYPE_MODE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              HOUSETYPE_MODE
            order by 
              AVG_TARGET desc
""").show()


+----------------+----------+------+
|  HOUSETYPE_MODE|AVG_TARGET|Volume|
+----------------+----------+------+
|specific housing|     10.14|  1499|
|            null|      9.15|154297|
|  terraced house|       8.5|  1212|
|  block of flats|      6.94|150503|
+----------------+----------+------+



- Alem dos nulls, grande volume para block of flats(blocos de apartamento)

# 14) WALLSMATERIAL_MODE 
Informações normalizadas sobre o edifício onde o cliente mora, Qual é a média (sufixo _AVG), modus (sufixo _MODE), tamanho médio do apartamento (sufixo _MEDI), área comum, área de estar, idade do edifício, número de elevadores, número de entradas, estado do edifício, número de pisos

In [None]:
spark.sql("""
            select 
              WALLSMATERIAL_MODE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              WALLSMATERIAL_MODE
            order by 
              AVG_TARGET desc
""").show()


+------------------+----------+------+
|WALLSMATERIAL_MODE|AVG_TARGET|Volume|
+------------------+----------+------+
|            Wooden|       9.7|  5362|
|              null|      9.13|156341|
|            Others|      8.31|  1625|
|             Mixed|      7.53|  2296|
|      Stone, brick|      7.41| 64815|
|             Block|      7.02|  9253|
|             Panel|      6.35| 66040|
|        Monolithic|      4.72|  1779|
+------------------+----------+------+



## 15) EMERGENCYSTATE_MODE 
Informações normalizadas sobre o prédio onde o cliente mora, Qual é a média (sufixo _AVG), modus (sufixo _MODE), tamanho médio (sufixo _MEDI) do apartamento, área comum, área de estar, idade do edifício, número de elevadores, número de entradas, estado do edifício, número de andares

In [None]:
spark.sql("""select NAME_INCOME_TYPE, 
            EMERGENCYSTATE_MODE, 
            FLAG_OWN_REALTY, 
            CNT_CHILDREN, 
            AMT_INCOME_TOTAL,
            AMT_CREDIT
            
            from app_train 
            
            WHERE EMERGENCYSTATE_MODE = 'Yes'
            
            
            
            
            ORDER BY AMT_INCOME_TOTAL DESC
            """).show(100)

+--------------------+-------------------+---------------+------------+----------------+----------+
|    NAME_INCOME_TYPE|EMERGENCYSTATE_MODE|FLAG_OWN_REALTY|CNT_CHILDREN|AMT_INCOME_TOTAL|AMT_CREDIT|
+--------------------+-------------------+---------------+------------+----------------+----------+
|Commercial associate|                Yes|              Y|           0|       1350000.0|  675000.0|
|Commercial associate|                Yes|              N|           0|       1260000.0| 1864152.0|
|Commercial associate|                Yes|              N|           0|       1125000.0|  592560.0|
|             Working|                Yes|              Y|           0|        900000.0|  545040.0|
|             Working|                Yes|              Y|           0|        675000.0| 1800000.0|
|             Working|                Yes|              Y|           1|        675000.0| 1096020.0|
|             Working|                Yes|              Y|           1|        675000.0|  463500.0|


In [None]:
spark.sql("""
            select 
              EMERGENCYSTATE_MODE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              sum()
              count(*) as Volume
    
            from 
              app_train
            group by 
              EMERGENCYSTATE_MODE
            order by 
              AVG_TARGET desc
""").show()


ParseException: ignored

- Estado de emergencia na moradia, grande paerte null/não porem a menor taxa é dos que não estão necessitados, complicado

# 16) ORGANIZATION_TYPE 
Tipo de organização onde o cliente trabalha

In [None]:
spark.sql("""
            select 
              ORGANIZATION_TYPE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              ORGANIZATION_TYPE
            order by 
              AVG_TARGET desc
""").show()


+--------------------+----------+------+
|   ORGANIZATION_TYPE|AVG_TARGET|Volume|
+--------------------+----------+------+
|   Transport: type 3|     15.75|  1187|
|   Industry: type 13|     13.43|    67|
|    Industry: type 8|      12.5|    24|
|          Restaurant|     11.71|  1811|
|        Construction|     11.68|  6721|
|            Cleaning|     11.15|   260|
|    Industry: type 1|     11.07|  1039|
|    Industry: type 3|     10.62|  3278|
|             Realtor|     10.61|   396|
|         Agriculture|     10.47|  2454|
|       Trade: type 3|     10.34|  3492|
|       Self-employed|     10.17| 38412|
|    Industry: type 4|     10.15|   877|
|            Security|      9.98|  3247|
|       Trade: type 7|      9.45|  7831|
|Business Entity T...|       9.3| 67992|
|   Transport: type 4|      9.28|  5398|
|              Mobile|      9.15|   317|
|       Trade: type 1|      8.91|   348|
|   Industry: type 11|      8.65|  2704|
+--------------------+----------+------+
only showing top

- Nada a declarar

In [None]:
spark.sql("select EMERGENCY_STATE from ")

# Dados

In [None]:
app_train.show()

+----------+------+------------------+-----------+------------+---------------+------------+----------------+----------+-----------+---------------+---------------+--------------------+--------------------+--------------------+-----------------+--------------------------+----------+-------------+-----------------+---------------+-----------+----------+--------------+---------------+----------------+----------+----------+---------------+---------------+--------------------+---------------------------+--------------------------+-----------------------+--------------------------+--------------------------+---------------------------+----------------------+----------------------+-----------------------+--------------------+-------------------+-------------------+-------------------+--------------+----------------+---------------------------+------------------+--------------+-------------+-------------+-------------+-------------+------------+--------------------+--------------+------------

In [None]:
bureau.show()

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|    CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|    215354|     5714462|       Closed|     currency 1|       -497|                 0|             -153.0|           -153

In [None]:
bureau.printSchema()

root
 |-- SK_ID_CURR: integer (nullable = true)
 |-- SK_ID_BUREAU: integer (nullable = true)
 |-- CREDIT_ACTIVE: string (nullable = true)
 |-- CREDIT_CURRENCY: string (nullable = true)
 |-- DAYS_CREDIT: integer (nullable = true)
 |-- CREDIT_DAY_OVERDUE: integer (nullable = true)
 |-- DAYS_CREDIT_ENDDATE: double (nullable = true)
 |-- DAYS_ENDDATE_FACT: double (nullable = true)
 |-- AMT_CREDIT_MAX_OVERDUE: double (nullable = true)
 |-- CNT_CREDIT_PROLONG: integer (nullable = true)
 |-- AMT_CREDIT_SUM: double (nullable = true)
 |-- AMT_CREDIT_SUM_DEBT: double (nullable = true)
 |-- AMT_CREDIT_SUM_LIMIT: double (nullable = true)
 |-- AMT_CREDIT_SUM_OVERDUE: double (nullable = true)
 |-- CREDIT_TYPE: string (nullable = true)
 |-- DAYS_CREDIT_UPDATE: integer (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)



In [None]:
spark.sql("""
            select 
              *
            from 
              bureau
            where 
            SK_ID_CURR = 100002
""").show()

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|    CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|    100002|     6158904|       Closed|     currency 1|      -1125|                 0|            -1038.0|          -1038

In [None]:
string_list = [item[0] for item in bureau.dtypes if item[1].startswith('string')]
string_list

['CREDIT_ACTIVE', 'CREDIT_CURRENCY', 'CREDIT_TYPE']

#Avaliando variaveis do tipo string na tabela bureau

# 1) CREDIT_ACTIVE
Situação dos créditos reportados pelo Credit Bureau (CB)

In [None]:
spark.sql("""
            select 
              CREDIT_ACTIVE,
              count(*) as Volume
 
            from 
              bureau
            group by 
              CREDIT_ACTIVE
            order by 
              volume desc
""").show()

+-------------+-------+
|CREDIT_ACTIVE| Volume|
+-------------+-------+
|       Closed|1079273|
|       Active| 630607|
|         Sold|   6527|
|     Bad debt|     21|
+-------------+-------+



- Credito fechado é credito pago?
- baixo numero de incobraveis, o arquivo ta na casa de milhões, esses dados poderiam ser esxcluidos por nao inferirem em nada no negocio?

# 2) CREDIT_CURRENCY 
Moeda recodificado do crédito do Bureau de Crédito

In [None]:
 spark.sql("""
            select 
              CREDIT_CURRENCY,
              count(*) as Volume
 
            from 
              bureau
            group by 
              CREDIT_CURRENCY
            order by 
              volume desc
""").show()

+---------------+-------+
|CREDIT_CURRENCY| Volume|
+---------------+-------+
|     currency 1|1715020|
|     currency 2|   1224|
|     currency 3|    174|
|     currency 4|     10|
+---------------+-------+



- currency 1? muito volume, nada explicativo?
- Pensando que a tabela tem milhoes de linha, essas ocorrencias pouco volumosas poderiam ser exlcuidas?

## 3) CREDIT_TYPE 
Tipo de crédito da Agência de Crédito (Carro, dinheiro,...)

In [None]:
 spark.sql("""
             select 
              CREDIT_TYPE,
              count(*) as Volume
 
            from 
              bureau
            group by 
              CREDIT_TYPE
            order by 
              volume desc
""").show()

+--------------------+-------+
|         CREDIT_TYPE| Volume|
+--------------------+-------+
|     Consumer credit|1251615|
|         Credit card| 402195|
|            Car loan|  27690|
|            Mortgage|  18391|
|           Microloan|  12413|
|Loan for business...|   1975|
|Another type of loan|   1017|
|Unknown type of loan|    555|
|Loan for working ...|    469|
|Cash loan (non-ea...|     56|
|    Real estate loan|     27|
|Loan for the purc...|     19|
|Loan for purchase...|      4|
|Mobile operator loan|      1|
|    Interbank credit|      1|
+--------------------+-------+



- Grande tendencia do credito de consumo
- Talvez um bom ponto para estudo de caso "qual segmento cresce no mercado de credito"

#Indicadores preditivos (Variaveis Explicativas)
 variáveis que explicam um determinado fenômeno, neste caso explicam a probabilidade de quitação de um empréstimo da modalidade imobiliária

In [None]:
metadados = spark.read.csv("/content/drive/MyDrive/LAMIA/projeto_final/dados/InputData/HomeCredit_columns_description.csv", inferSchema=True, header=True)
 
metadados.registerTempTable('metadados')
 
metadados.count()

219

In [None]:
spark.sql("""
        select 
          *
        from 
        
          metadados
    
        where Table = 'bureau.csv'

""").show(100,False)

+---+----------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------+
|_c0|Table     |Row                   |Description                                                                                                                                     |Special                              |
+---+----------+----------------------+------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------+
|122|bureau.csv|SK_ID_CURR            |ID do empréstimo em nossa amostra - um empréstimo em nossa amostra pode ter 0,1,2 ou mais créditos anteriores relacionados na agência de crédito|hashed                               |
|123|bureau.csv|SK_BUREAU_ID          |ID recodificado do crédito anterior do Credit Bureau relacionado ao n

In [None]:
 numeric_list = [item[0] for item in bureau.dtypes if not item[1].startswith('string')]
 numeric_list


['SK_ID_CURR',
 'SK_ID_BUREAU',
 'DAYS_CREDIT',
 'CREDIT_DAY_OVERDUE',
 'DAYS_CREDIT_ENDDATE',
 'DAYS_ENDDATE_FACT',
 'AMT_CREDIT_MAX_OVERDUE',
 'CNT_CREDIT_PROLONG',
 'AMT_CREDIT_SUM',
 'AMT_CREDIT_SUM_DEBT',
 'AMT_CREDIT_SUM_LIMIT',
 'AMT_CREDIT_SUM_OVERDUE',
 'DAYS_CREDIT_UPDATE',
 'AMT_ANNUITY']

In [None]:
spark.sql("""select distinct CREDIT_ACTIVE from bureau""").show()

+-------------+
|CREDIT_ACTIVE|
+-------------+
|     Bad debt|
|         Sold|
|       Active|
|       Closed|
+-------------+



# **VAR**: AMT_CREDIT_SUM_LIMIT
**Variavel**: Media do limite dos cartões de credito do cliente bureau   
**Feature bureau**: Limite de crédito atual do cartão de crédito informado no Credit Bureau

In [None]:
df_bureau_02 = spark.sql("""
            select 
                 SK_ID_CURR,
                 round(avg(AMT_CREDIT_SUM_LIMIT),2) as AVG_CREDIT_LIMIT
            from 
              bureau
            group by 
              SK_ID_CURR
""")
df_bureau_02.registerTempTable('df_bureau_02')
 
df_bureau_02.count()

305811

Mais uma agregação, esta variavel é a media do limite de cartão de credito deste cliente, tem uma importancia na hora de ver se é inadimplente ou não, tendo em vista que outras empresas tambem ja emprestaram crtedito a este cliente

# **VAR**: AMT_CREDIT_SUM_DEBT 
**Variavel**: media de dividas sobre credito   
**Feature bureau**: Dívida atual sobre crédito do Bureau de Crédito

In [None]:
df_bureau_03 = spark.sql("""
            select 
                 SK_ID_CURR,
                 round(avg(AMT_CREDIT_SUM_DEBT),2) as AMT_CREDIT_DEBT
            from 
              bureau
            group by 
              SK_ID_CURR
""")
df_bureau_03.registerTempTable('df_bureau_03')
 
df_bureau_03.count()

305811

In [None]:
spark.sql(""" select * from df_bureau_03 where SK_ID_CURR = 100002""").show()

+----------+---------------+
|SK_ID_CURR|AMT_CREDIT_DEBT|
+----------+---------------+
|    100002|        49156.2|
+----------+---------------+



Mais uma agregação, este por exemplo é o limite utilizado, ou a divida de credito que o cliente tem, este cliente tem quase 50 mil em dividas mais ainda tem aquilo tudo de debito?

# TABELA GERAL COM VARS EXPLICATIVAS SOBRE O **CREDICT_ATIVE**

In [None]:
df_bureau_01 = spark.sql("""
            select 
                 SK_ID_CURR,
                 round(avg(abs(DAYS_CREDIT)),2) as AVG_DAYS_CREDIT,
                 round(avg(case when CREDIT_ACTIVE in ('Active') then abs(DAYS_CREDIT) else 0 end),2) as AVG_CRED_ACTV_ACTIVE,
                 round(avg(case when CREDIT_ACTIVE in ('Closed') then abs(DAYS_CREDIT) else 0 end),2) as AVG_CRED_ACTV_CLOSED,
                 round(avg(case when CREDIT_ACTIVE in ('Sold') then abs(DAYS_CREDIT) else 0 end),2) as AVG_CRED_ACTV_SOLD,
                 round(avg(case when CREDIT_ACTIVE in ('Bad debt') then abs(DAYS_CREDIT) else 0 end),2) as AVG_CRED_ACTV_BADDBT,

                 round(sum(abs(DAYS_CREDIT)),2) as SUM_DAYS_CREDIT,
                 round(sum(case when CREDIT_ACTIVE in ('Active') then abs(DAYS_CREDIT) else 0 end),2) as SUM_CRED_ACTV_ACTIVE,
                 round(sum(case when CREDIT_ACTIVE in ('Closed') then abs(DAYS_CREDIT) else 0 end),2) as SUM_CRED_ACTV_CLOSED,
                 round(sum(case when CREDIT_ACTIVE in ('Sold') then abs(DAYS_CREDIT) else 0 end),2) as SUM_CRED_ACTV_SOLD,
                 round(sum(case when CREDIT_ACTIVE in ('Bad debt') then abs(DAYS_CREDIT) else 0 end),2) as SUM_CRED_ACTV_BADDBT,

                 round(avg(AMT_CREDIT_SUM_LIMIT),2) as AVG_CREDIT_LIMIT,
                 round(avg(AMT_CREDIT_SUM_DEBT),2) as  SUM_AMT_CREDIT_DEBT
            from 
              bureau
            group by 
              SK_ID_CURR
""")
df_bureau_01.registerTempTable('df_bureau_01')
 
df_bureau_01.count()

305811

In [None]:
df_bureau_01.show()

+----------+---------------+--------------------+--------------------+------------------+--------------------+---------------+--------------------+--------------------+------------------+--------------------+---------------+--------------------+--------------------+------------------+--------------------+----------------+-------------------+
|SK_ID_CURR|AVG_DAYS_CREDIT|AVG_CRED_ACTV_ACTIVE|AVG_CRED_ACTV_CLOSED|AVG_CRED_ACTV_SOLD|AVG_CRED_ACTV_BADDBT|SUM_DAYS_CREDIT|SUM_CRED_ACTV_ACTIVE|SUM_CRED_ACTV_CLOSED|SUM_CRED_ACTV_SOLD|SUM_CRED_ACTV_BADDBT|MIN_DAYS_CREDIT|MIN_CRED_ACTV_ACTIVE|MIN_CRED_ACTV_CLOSED|MIN_CRED_ACTV_SOLD|MIN_CRED_ACTV_BADDBT|AVG_CREDIT_LIMIT|SUM_AMT_CREDIT_DEBT|
+----------+---------------+--------------------+--------------------+------------------+--------------------+---------------+--------------------+--------------------+------------------+--------------------+---------------+--------------------+--------------------+------------------+--------------------+------

- Podemos observar que com a criação destas variaveis o intuito sera de agregar valor e transformar para o mesmo nivel de granulidade da tabela original, esta bureau é a trasacional  

- Conseguimos agregar algumas novas vars com a tabela bureau, para o mesmo nivel de granulidade da tabela principal, agora e so juntar elas

# Compondo ABT
tabela analítica de modelagem, para compor precisamos da tabela de público (tabela principal) e mais as variáveis explicativas e target (quando houver).

In [None]:
abt_bureau = spark.sql("""
          Select 
 
            a.*,
            b.AVG_DAYS_CREDIT,
            b.AVG_CRED_ACTV_ACTIVE,
            b.AVG_CRED_ACTV_CLOSED,
            b.AVG_CRED_ACTV_SOLD,
            b.AVG_CRED_ACTV_BADDBT,
            b.SUM_DAYS_CREDIT,
            b.SUM_CRED_ACTV_ACTIVE,
            b.SUM_CRED_ACTV_CLOSED,
            b.SUM_CRED_ACTV_SOLD,
            b.SUM_CRED_ACTV_BADDBT,
            
            b.AVG_CREDIT_LIMIT,
            b.SUM_AMT_CREDIT_DEBT

          from app_train as a
          left join 
          df_bureau_01 as b
          on
          a.SK_ID_CURR = b.SK_ID_CURR
 """)
 
abt_bureau.registerTempTable('abt_bureau')
 
abt_bureau.count()


307511

Granulidade ok, tudo ajustado, variaveis criadas e agregadas a tabela principal

# Salvando a tabela gerada no diretorio das ABTs

In [None]:
abt_bureau.write.mode('overwrite').parquet("/content/drive/MyDrive/LAMIA/projeto_final/Arquitetura_Dados/ABT/VARS_BUREAU")
#Com o metodo parquet sao salvos diversos arquivos no diretorio novo que é o VARS_BUREAU, pique hdfs mesmo

In [None]:
#Lendo com o metodo do spark fica bem bacana
read_abt_bureau = spark.read.format("parquet").load("/content/drive/MyDrive/LAMIA/projeto_final/Arquitetura_Dados/ABT/VARS_BUREAU")
 
read_abt_bureau.count()

307511

# Transformando em um .csv

In [None]:
import pandas as pd

In [None]:
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [None]:
dataframe = read_abt_bureau.select("*").toPandas()

In [None]:
dataframe.to_csv('/content/drive/MyDrive/LAMIA/projeto_final/dados/ABT/Abt_Bureau.csv')