# *Configurando Kernel Spark*

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
 
# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

In [None]:
from pyspark.sql import SparkSession
 
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# *Explorando Dados*

### *Lendo Publico de Modelagem*

In [5]:

# carregar dados do Público de Modelagem
app_train = spark.read.csv("/content/drive/MyDrive/SQL/InputData/application_train.csv", inferSchema=True, header=True)
 
app_train.registerTempTable('app_train')
 
app_train.count()

307511

### *Lendo A Tabela Transacional Bureau*

In [6]:
# carregar dados da tabela Bureau
bureau = spark.read.csv("/content/drive/MyDrive/SQL/InputData/bureau.csv", inferSchema=True, header=True)
 
bureau.registerTempTable('bureau')
 
bureau.count()

1716428

# Verificando Variaveis da Tabela *Principal*

In [7]:
app_train.printSchema()

root
 |-- SK_ID_CURR: integer (nullable = true)
 |-- TARGET: integer (nullable = true)
 |-- NAME_CONTRACT_TYPE: string (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- NAME_TYPE_SUITE: string (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- REGION_POPULATION_RELATIVE: double (nullable = true)
 |-- DAYS_BIRTH: integer (nullable = true)
 |-- DAYS_EMPLOYED: integer (nullable = true)
 |-- DAYS_REGISTRATION: double (nullable = true)
 |-- DAYS_ID_PUBLISH: integer (nullable = true)
 |-- OWN_CAR_AG

# Listando Variaveis Tipo *String*

In [8]:
 string_list = [item[0] for item in app_train.dtypes if item[1].startswith('string')]
 string_list


['NAME_CONTRACT_TYPE',
 'CODE_GENDER',
 'FLAG_OWN_CAR',
 'FLAG_OWN_REALTY',
 'NAME_TYPE_SUITE',
 'NAME_INCOME_TYPE',
 'NAME_EDUCATION_TYPE',
 'NAME_FAMILY_STATUS',
 'NAME_HOUSING_TYPE',
 'OCCUPATION_TYPE',
 'WEEKDAY_APPR_PROCESS_START',
 'ORGANIZATION_TYPE',
 'FONDKAPREMONT_MODE',
 'HOUSETYPE_MODE',
 'WALLSMATERIAL_MODE',
 'EMERGENCYSTATE_MODE']

# Calculando A Volumetria de Inadimplencia

In [9]:
spark.sql("""
            select 
              NAME_CONTRACT_TYPE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              NAME_CONTRACT_TYPE
            order by 
              AVG_TARGET desc
""").show()

+------------------+----------+------+
|NAME_CONTRACT_TYPE|AVG_TARGET|Volume|
+------------------+----------+------+
|        Cash loans|      8.35|278232|
|   Revolving loans|      5.48| 29279|
+------------------+----------+------+



Neste caso acima, observamos que quem faz empréstimo do tipo rotativo (Revolving loans) possui menor taxa de evento do que quem faz empréstimo do tipo dinheiro (Cash loans). Mas também o volume para essas pessoas é muito menor

# *Inadimplência Por Profissão*

In [10]:
spark.sql("""
            select 
              OCCUPATION_TYPE,
              round(100*avg(TARGET),2) as AVG_TARGET,
              count(*) as Volume
 
            from 
              app_train
            group by 
              OCCUPATION_TYPE
            order by 
              AVG_TARGET desc
""").show()

+--------------------+----------+------+
|     OCCUPATION_TYPE|AVG_TARGET|Volume|
+--------------------+----------+------+
|  Low-skill Laborers|     17.15|  2093|
|             Drivers|     11.33| 18603|
|Waiters/barmen staff|     11.28|  1348|
|      Security staff|     10.74|  6721|
|            Laborers|     10.58| 55186|
|       Cooking staff|     10.44|  5946|
|         Sales staff|      9.63| 32102|
|      Cleaning staff|      9.61|  4653|
|       Realty agents|      7.86|   751|
|         Secretaries|      7.05|  1305|
|      Medicine staff|       6.7|  8537|
|Private service s...|       6.6|  2652|
|                null|      6.51| 96391|
|            IT staff|      6.46|   526|
|            HR staff|      6.39|   563|
|          Core staff|       6.3| 27570|
|            Managers|      6.21| 21371|
|High skill tech s...|      6.16| 11380|
|         Accountants|      4.83|  9813|
+--------------------+----------+------+



Neste caso, observamos que os trabalhadores com menor skill (habilidades técnicas) são os mais inadimplentes neste cenário (apresentam a maior taxa de evento). 


# *Verificando Variaveis da Tabela Bureau*

In [11]:
bureau.show()

+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|    CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|    215354|     5714462|       Closed|     currency 1|       -497|                 0|             -153.0|           -153

In [12]:
bureau.printSchema()

root
 |-- SK_ID_CURR: integer (nullable = true)
 |-- SK_ID_BUREAU: integer (nullable = true)
 |-- CREDIT_ACTIVE: string (nullable = true)
 |-- CREDIT_CURRENCY: string (nullable = true)
 |-- DAYS_CREDIT: integer (nullable = true)
 |-- CREDIT_DAY_OVERDUE: integer (nullable = true)
 |-- DAYS_CREDIT_ENDDATE: double (nullable = true)
 |-- DAYS_ENDDATE_FACT: double (nullable = true)
 |-- AMT_CREDIT_MAX_OVERDUE: double (nullable = true)
 |-- CNT_CREDIT_PROLONG: integer (nullable = true)
 |-- AMT_CREDIT_SUM: double (nullable = true)
 |-- AMT_CREDIT_SUM_DEBT: double (nullable = true)
 |-- AMT_CREDIT_SUM_LIMIT: double (nullable = true)
 |-- AMT_CREDIT_SUM_OVERDUE: double (nullable = true)
 |-- CREDIT_TYPE: string (nullable = true)
 |-- DAYS_CREDIT_UPDATE: integer (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)




Vamos selecionar um único indivíduo da tabela principal, filtrar este indivíduo na tabela bureau e verificar seu comportamento transacional ID:100002


In [13]:
spark.sql("""
              select
                *
              from
                bureau
              where
              SK_ID_CURR = 100002
""").show()
#Este comportamento é chamado de granularidade do dado. 
#Este dado da tabela bureau tem granularidade diferente da tabela principal.
#Veja que uma linha da tabela principal, para este ID, gera 8 linhas na tabela Bureau, que representam as transações que o indivíduo realizou. 


+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|SK_ID_CURR|SK_ID_BUREAU|CREDIT_ACTIVE|CREDIT_CURRENCY|DAYS_CREDIT|CREDIT_DAY_OVERDUE|DAYS_CREDIT_ENDDATE|DAYS_ENDDATE_FACT|AMT_CREDIT_MAX_OVERDUE|CNT_CREDIT_PROLONG|AMT_CREDIT_SUM|AMT_CREDIT_SUM_DEBT|AMT_CREDIT_SUM_LIMIT|AMT_CREDIT_SUM_OVERDUE|    CREDIT_TYPE|DAYS_CREDIT_UPDATE|AMT_ANNUITY|
+----------+------------+-------------+---------------+-----------+------------------+-------------------+-----------------+----------------------+------------------+--------------+-------------------+--------------------+----------------------+---------------+------------------+-----------+
|    100002|     6158904|       Closed|     currency 1|      -1125|                 0|            -1038.0|          -1038

## Volumetria do Credito Ativo

In [17]:
spark.sql("""
            select 
              CREDIT_ACTIVE,
              count(*) as Volume
 
            from 
              bureau
            group by 
              CREDIT_ACTIVE
            order by 
             volume desc
""").show()

"""Neste caso acima, note que “Bad debt” apresenta volumetria baixa demais,
 pensando na volumetria da tabela que é da ordem de milhões de linhas,
  este domínio não tem representatividade nenhuma sobre o negócio
  """

+-------------+-------+
|CREDIT_ACTIVE| Volume|
+-------------+-------+
|       Closed|1079273|
|       Active| 630607|
|         Sold|   6527|
|     Bad debt|     21|
+-------------+-------+



'Neste caso acima, note que “Bad debt” apresenta volumetria baixa demais,\n pensando na volumetria da tabela que é da ordem de milhões de linhas,\n  este domínio não tem representatividade nenhuma sobre o negócio\n  '

# - Indicadores Preditivos

In [20]:

df_bureau_01 = spark.sql("""
            select 
                 SK_ID_CURR,
                 round(avg(DAYS_CREDIT),2) as AVG_DAYS_CREDIT
            from 
              bureau
            group by 
              SK_ID_CURR
""")
 
df_bureau_01.registerTempTable('df_bureau_01')
 
df_bureau_01.show()

""" Criamos uma variável que seja descrita por ser a Média da quantidade de dias 
antes da aplicação atual que o cliente solicitou crédito do Bureau de Crédito (DAYS_CREDIT)
"""

+----------+---------------+
|SK_ID_CURR|AVG_DAYS_CREDIT|
+----------+---------------+
|    341504|         -283.5|
|    197603|       -1916.43|
|    330299|        -1645.0|
|    355377|        -1540.0|
|    454739|       -1215.86|
|    295286|       -1264.89|
|    192082|       -1451.21|
|    109608|        -504.33|
|    375705|        -896.75|
|    380065|        -638.67|
|    416191|        -1429.0|
|    162260|        -1507.4|
|    166160|         -856.0|
|    191350|         -151.0|
|    258865|         -979.5|
|    439807|         -795.2|
|    258129|        -434.75|
|    133018|       -1261.64|
|    338173|       -1318.24|
|    338091|        -1022.0|
+----------+---------------+
only showing top 20 rows



' Criamos uma variável que seja descrita por ser a Média da quantidade de dias \nantes da aplicação atual que o cliente solicitou crédito do Bureau de Crédito (DAYS_CREDIT)\n'

## Compondo ABT considerando os indicadores preditivos da tabela Bureau.


In [None]:
abt_bureau = spark.sql("""
          Select 
 
          a.*,
          b.AVG_DAYS_CREDIT,
          b.AVG_CRED_ACTV_ACTIVE,
          b.AVG_CRED_ACTV_CLOSED,
          b.AVG_CRED_ACTV_SOLD,
          b.AVG_CRED_ACTV_BADDBT,
          b.SUM_DAYS_CREDIT,
          b.SUM_CRED_ACTV_ACTIVE,
          b.SUM_CRED_ACTV_CLOSED,
          b.SUM_CRED_ACTV_SOLD,
          b.SUM_CRED_ACTV_BADDBT,
          b.MIN_DAYS_CREDIT,
          b.MIN_CRED_ACTV_ACTIVE,
          b.MIN_CRED_ACTV_CLOSED,
          b.MIN_CRED_ACTV_SOLD,
          b.MIN_CRED_ACTV_BADDBT
 
          from app_train  as a
          left join 
          df_bureau_01 as b
          on
          a.SK_ID_CURR = b.SK_ID_CURR
 
 """)
 
abt_bureau.registerTempTable('abt_bureau')
 
abt_bureau.count()