In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os

# Point Java and Spark at the installs produced by the setup cell
# (JAVA_HOME is assigned first, then SPARK_HOME).
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.1.2-bin-hadoop3.2",
)


In [None]:
import findspark

# Locate the Spark install (SPARK_HOME, set in the previous cell) so that
# pyspark becomes importable; this must run before the pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

# Local session using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
from google.colab import drive

# Mount Google Drive so the CSVs under "My Drive/TCC" are readable below.
drive.mount(mountpoint='/content/gdrive')

Mounted at /content/gdrive


In [None]:
# Read POS_CASH_balance.csv (first line = header, column types inferred)
# and expose it to Spark SQL as the POS_CASH view.
POS_CASH = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('/content/gdrive/My Drive/TCC/POS_CASH_balance.csv')
)
POS_CASH.createOrReplaceTempView('POS_CASH')

In [None]:
# Column names and the types inferred by inferSchema=True when the CSV was read.
POS_CASH.printSchema()

root
 |-- SK_ID_PREV: integer (nullable = true)
 |-- SK_ID_CURR: integer (nullable = true)
 |-- MONTHS_BALANCE: integer (nullable = true)
 |-- CNT_INSTALMENT: double (nullable = true)
 |-- CNT_INSTALMENT_FUTURE: double (nullable = true)
 |-- NAME_CONTRACT_STATUS: string (nullable = true)
 |-- SK_DPD: integer (nullable = true)
 |-- SK_DPD_DEF: integer (nullable = true)



In [None]:
# Column names as a Python list (displayed as the cell's last expression).
POS_CASH.columns

['SK_ID_PREV',
 'SK_ID_CURR',
 'MONTHS_BALANCE',
 'CNT_INSTALMENT',
 'CNT_INSTALMENT_FUTURE',
 'NAME_CONTRACT_STATUS',
 'SK_DPD',
 'SK_DPD_DEF']

In [None]:
# Total number of rows in POS_CASH (a Spark action: runs a job over the data).
POS_CASH.count()

10001358

In [None]:
# Preview: first 5 rows of POS_CASH.
POS_CASH.show(n=5)

+----------+----------+--------------+--------------+---------------------+--------------------+------+----------+
|SK_ID_PREV|SK_ID_CURR|MONTHS_BALANCE|CNT_INSTALMENT|CNT_INSTALMENT_FUTURE|NAME_CONTRACT_STATUS|SK_DPD|SK_DPD_DEF|
+----------+----------+--------------+--------------+---------------------+--------------------+------+----------+
|   1803195|    182943|           -31|          48.0|                 45.0|              Active|     0|         0|
|   1715348|    367990|           -33|          36.0|                 35.0|              Active|     0|         0|
|   1784872|    397406|           -32|          12.0|                  9.0|              Active|     0|         0|
|   1903291|    269225|           -35|          48.0|                 42.0|              Active|     0|         0|
|   2341044|    334279|           -35|          36.0|                 35.0|              Active|     0|         0|
+----------+----------+--------------+--------------+---------------------+-----

In [None]:
# Summary statistics (count, mean, stddev, min, max) of SK_DPD only.
POS_CASH.describe('SK_DPD').show()

+-------+------------------+
|summary|            SK_DPD|
+-------+------------------+
|  count|          10001358|
|   mean|11.606928279139693|
| stddev| 132.7140434930964|
|    min|                 0|
|    max|              4231|
+-------+------------------+



In [None]:
# Distinct NAME_CONTRACT_STATUS values, collected to pandas for printing;
# these are the statuses counted per contract in the Pos2 cell.
valores_unicos = (
    spark.sql("SELECT DISTINCT NAME_CONTRACT_STATUS FROM POS_CASH")
    .toPandas()
)
print(valores_unicos)

    NAME_CONTRACT_STATUS
0                 Demand
1               Approved
2              Completed
3         Amortized debt
4  Returned to the store
5                    XNA
6                 Active
7                 Signed
8               Canceled


In [None]:
# Monthly POS_CASH rows enriched with per-contract (SK_ID_PREV) features:
# - Media_dias_atraso / DesvioP_dias_atraso: mean and (sample) standard
#   deviation of SK_DPD over the contract, rounded to 2 decimals.
# - Parcelas_pendentes: CNT_INSTALMENT - CNT_INSTALMENT_FUTURE.
#   NOTE(review): if CNT_INSTALMENT_FUTURE is the number of installments still
#   to be paid, this difference counts installments already paid, not pending
#   ones. Confirm which meaning the column name should carry.
# - Num_Transacao: number of POS_CASH rows of the contract.
# - Percentual_atraso: fraction (0..1, not 0..100) of the contract's rows with SK_DPD > 0.
# - Mes_no_ano: MONTHS_BALANCE reduced modulo 12. MONTHS_BALANCE is negative in
#   this data (see the sample output), and Spark's % keeps the sign of the
#   dividend (-31 % 12 = -7), so PMOD is used to always land in 0..11.
# - Diferenca_meses_atraso: change in SK_DPD versus the previous MONTHS_BALANCE
#   of the same contract (NULL on the contract's first row).
Pos1 = spark.sql("""
SELECT
  *,
  ROUND(AVG(SK_DPD) OVER (PARTITION BY SK_ID_PREV), 2) AS Media_dias_atraso
, ROUND(STDDEV(SK_DPD) OVER (PARTITION BY SK_ID_PREV), 2) AS DesvioP_dias_atraso
, CNT_INSTALMENT - CNT_INSTALMENT_FUTURE AS Parcelas_pendentes
, COUNT(SK_ID_PREV) OVER (PARTITION BY SK_ID_PREV) AS Num_Transacao
, AVG(CASE WHEN SK_DPD > 0 THEN 1 ELSE 0 END) OVER (PARTITION BY SK_ID_PREV) AS Percentual_atraso
, PMOD(MONTHS_BALANCE, 12) AS Mes_no_ano
, SK_DPD - LAG(SK_DPD) OVER (PARTITION BY SK_ID_PREV ORDER BY MONTHS_BALANCE) AS Diferenca_meses_atraso
FROM POS_CASH
""")
Pos1.createOrReplaceTempView('Pos1')

In [None]:
# One summary row per contract (SK_ID_PREV).
# The original query filtered everything with WHERE MONTHS_BALANCE >= -6, so:
# the "anual" columns were identical to the 6-month ones, Duracao_Relacionamento
# could never exceed the 6-month window, and the extra FLOOR(MONTHS_BALANCE / 12)
# grouping key could split a contract into several rows (fanning out the join
# in Posfinal). Each window is now applied per column instead:
# - Duracao_Relacionamento: all months on record for the contract.
# - *_ultimos_6_meses and the QTDE_* status counters: MONTHS_BALANCE >= -6
#   (same window as before).
# - *_anual_atraso: MONTHS_BALANCE >= -12.
# CASE ... END without ELSE yields NULL outside the window, which AVG/SUM
# ignore; a contract with no month in a window gets NULL for that window's
# average and sum, and 0 for the status counters.
Pos2 = spark.sql("""
SELECT
    SK_ID_PREV
,    MAX(MONTHS_BALANCE) - MIN(MONTHS_BALANCE) + 1 AS Duracao_Relacionamento
,    AVG(CASE WHEN MONTHS_BALANCE >= -6 THEN SK_DPD END) AS Media_ultimos_6_meses
,    SUM(CASE WHEN MONTHS_BALANCE >= -6 THEN SK_DPD END) AS Soma_ultimos_6_meses
,    AVG(CASE WHEN MONTHS_BALANCE >= -12 THEN SK_DPD END) AS Media_anual_atraso
,    SUM(CASE WHEN MONTHS_BALANCE >= -12 THEN SK_DPD END) AS Soma_anual_atraso
,    SUM(CASE WHEN MONTHS_BALANCE >= -6 AND NAME_CONTRACT_STATUS = 'Demand' THEN 1 ELSE 0 END) AS QTDE_Demanda
,    SUM(CASE WHEN MONTHS_BALANCE >= -6 AND NAME_CONTRACT_STATUS = 'Approved' THEN 1 ELSE 0 END) AS QTDE_Aprovado
,    SUM(CASE WHEN MONTHS_BALANCE >= -6 AND NAME_CONTRACT_STATUS = 'Completed' THEN 1 ELSE 0 END) AS QTDE_Concluido
,    SUM(CASE WHEN MONTHS_BALANCE >= -6 AND NAME_CONTRACT_STATUS = 'Amortized debt' THEN 1 ELSE 0 END) AS Qtde_Amortized
,    SUM(CASE WHEN MONTHS_BALANCE >= -6 AND NAME_CONTRACT_STATUS = 'XNA' THEN 1 ELSE 0 END) AS QTDE_Valo_Desconhecido
,    SUM(CASE WHEN MONTHS_BALANCE >= -6 AND NAME_CONTRACT_STATUS = 'Active' THEN 1 ELSE 0 END) AS QTDE_Ativo
,    SUM(CASE WHEN MONTHS_BALANCE >= -6 AND NAME_CONTRACT_STATUS = 'Signed' THEN 1 ELSE 0 END) AS QTDE_Assinado
,    SUM(CASE WHEN MONTHS_BALANCE >= -6 AND NAME_CONTRACT_STATUS = 'Canceled' THEN 1 ELSE 0 END) AS QTDE_Cancelado
,    SUM(CASE WHEN MONTHS_BALANCE >= -6 AND NAME_CONTRACT_STATUS = 'Returned to the store' THEN 1 ELSE 0 END) AS QTDE_Devolvido
FROM POS_CASH
GROUP BY SK_ID_PREV
""")
Pos2.createOrReplaceTempView('Pos2')

In [None]:
# Attach the per-contract summary (Pos2) to every row of Pos1.
# LEFT JOIN keeps Pos1 rows whose contract has no Pos2 row (their Pos2 columns
# are NULL) instead of silently dropping them as an INNER JOIN would.
Posfinal = spark.sql("""
SELECT
    Pos1.*,
    Pos2.Duracao_Relacionamento,
    Pos2.Media_ultimos_6_meses,
    Pos2.Soma_ultimos_6_meses,
    Pos2.Media_anual_atraso,
    Pos2.Soma_anual_atraso,
    Pos2.QTDE_Demanda,
    Pos2.QTDE_Aprovado,
    Pos2.QTDE_Concluido,
    Pos2.Qtde_Amortized,
    Pos2.QTDE_Valo_Desconhecido,
    Pos2.QTDE_Ativo,
    Pos2.QTDE_Assinado,
    Pos2.QTDE_Cancelado,
    Pos2.QTDE_Devolvido
FROM Pos1
LEFT JOIN Pos2
ON Pos1.SK_ID_PREV = Pos2.SK_ID_PREV
""")

Posfinal.createOrReplaceTempView('Posfinal')

In [None]:
# Read install.csv (first line = header, column types inferred)
# and expose it to Spark SQL as the install view.
install = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('/content/gdrive/My Drive/TCC/install.csv')
)
install.createOrReplaceTempView('install')

In [None]:
# Preview of the installments table (first 20 rows, long values truncated).
install.show(n=20)

+----------+----------+----------------------+---------------------+---------------+------------------+--------------+-----------+
|SK_ID_PREV|SK_ID_CURR|NUM_INSTALMENT_VERSION|NUM_INSTALMENT_NUMBER|DAYS_INSTALMENT|DAYS_ENTRY_PAYMENT|AMT_INSTALMENT|AMT_PAYMENT|
+----------+----------+----------------------+---------------------+---------------+------------------+--------------+-----------+
|   1054186|    161674|                   1.0|                    6|        -1180.0|           -1187.0|       6948.36|    6948.36|
|   1330831|    151639|                   0.0|                   34|        -2156.0|           -2156.0|      1716.525|   1716.525|
|   2085231|    193053|                   2.0|                    1|          -63.0|             -63.0|       25425.0|    25425.0|
|   2452527|    199697|                   1.0|                    3|        -2418.0|           -2426.0|      24350.13|   24350.13|
|   2714724|    167756|                   1.0|                    2|        -1383.0

In [None]:
# Per installment row: percentage of the instalment actually paid, absolute
# difference between paid and due amounts, and an early-payment flag.
# - Percentual_pago: NULLIF makes a zero AMT_INSTALMENT yield NULL explicitly
#   instead of relying on Spark's non-ANSI division behaviour.
# - Pagamento_antecipado: 1 when the payment day is strictly before the due day.
#   Assumes DAYS_INSTALMENT is the due day and DAYS_ENTRY_PAYMENT the payment
#   day on the same relative-day scale (the sample output shows both negative
#   and a few days apart). The previous test, DAYS_ENTRY_PAYMENT < 0, was true
#   for virtually every row, so the flag carried no information.
#   Rows with a NULL payment day fall into ELSE 0.
install1 = spark.sql("""
Select
  SK_ID_PREV
, AMT_INSTALMENT
, AMT_PAYMENT
, (AMT_PAYMENT / NULLIF(AMT_INSTALMENT, 0)) * 100 AS Percentual_pago
, ABS(AMT_PAYMENT - AMT_INSTALMENT) AS Diferenca_pagamento
, CASE WHEN DAYS_ENTRY_PAYMENT < DAYS_INSTALMENT THEN 1 ELSE 0 END AS Pagamento_antecipado
FROM install
""")
install1.createOrReplaceTempView('install1')

In [None]:
# Per-contract (SK_ID_PREV) installment statistics: number of distinct
# instalment versions, mean / count / range of NUM_INSTALMENT_NUMBER.
# NOTE(review): COUNT(NUM_INSTALMENT_NUMBER) counts payment rows, so an
# instalment split over several payment rows may be counted more than once.
install12 = spark.sql("""
SELECT
  SK_ID_PREV
, COUNT(DISTINCT NUM_INSTALMENT_VERSION) AS Qtde_versoes_contrato
, AVG(NUM_INSTALMENT_NUMBER) AS Media_Parcelas_versao
, COUNT(NUM_INSTALMENT_NUMBER) AS Numero_parcelas
, MAX(NUM_INSTALMENT_NUMBER) - MIN(NUM_INSTALMENT_NUMBER) AS Variacao_parcelas
FROM install
GROUP BY SK_ID_PREV
""")
# createOrReplaceTempView (like every other cell) so re-running this cell does
# not fail because the install12 view already exists.
install12.createOrReplaceTempView('install12')

In [None]:
# Installment rows (install1) enriched with per-contract statistics (install12).
# Keep the DataFrame and call .show() separately: .show() returns None, so
# chaining it onto spark.sql(...) left installfinal as None and the
# createOrReplaceTempView call raised AttributeError.
installfinal = spark.sql("""
SELECT
    install1.*,
    install12.Qtde_versoes_contrato,
    install12.Media_Parcelas_versao,
    install12.Numero_parcelas,
    install12.Variacao_parcelas
FROM install1
INNER JOIN install12 ON install1.SK_ID_PREV = install12.SK_ID_PREV
""")
installfinal.createOrReplaceTempView('installfinal')
installfinal.show()

+----------+--------------+-----------+------------------+-------------------+--------------------+---------------------+---------------------+---------------+-----------------+
|SK_ID_PREV|AMT_INSTALMENT|AMT_PAYMENT|   Percentual_pago|Diferenca_pagamento|Pagamento_antecipado|Qtde_versoes_contrato|Media_Parcelas_versao|Numero_parcelas|Variacao_parcelas|
+----------+--------------+-----------+------------------+-------------------+--------------------+---------------------+---------------------+---------------+-----------------+
|   1000149|       5108.58|    5108.58|             100.0|                0.0|                   1|                    4|               5.8125|             16|                9|
|   1000149|       5108.58|    3426.48| 67.07304182375533|             1682.1|                   1|                    4|               5.8125|             16|                9|
|   1000149|     21259.395|  21259.395|             100.0|                0.0|                   1|           

AttributeError: ignored

In [None]:
# Join the POS_CASH features (Posfinal) with the installment features (installfinal).
# USING (SK_ID_PREV) returns the join key once; selecting Posfinal.* and
# installfinal.* produced two SK_ID_PREV columns, making later references to
# SK_ID_PREV ambiguous.
# NOTE(review): both sides have many rows per SK_ID_PREV (monthly balance rows
# vs. installment payment rows), so this join yields months x payments rows per
# contract. Consider aggregating one side to one row per SK_ID_PREV first.
Pos_Install = spark.sql("""
SELECT *
FROM Posfinal
INNER JOIN installfinal USING (SK_ID_PREV)
""")

Pos_Install.createOrReplaceTempView('Pos_Install')

In [None]:
# Preview of the joined table (first 20 rows, long values truncated).
Pos_Install.show(n=20)

In [None]:
# Re-defines Pos_Install (an earlier cell also builds it).
# NOTE(review): this cell duplicates that one and could be removed.
# USING (SK_ID_PREV) returns the join key once instead of two SK_ID_PREV columns.
# The view is registered as well, so the Pos_Install view matches this DataFrame.
Pos_Install = spark.sql("""
SELECT *
FROM Posfinal
INNER JOIN installfinal USING (SK_ID_PREV)
""")
Pos_Install.createOrReplaceTempView('Pos_Install')