<a href="https://colab.research.google.com/github/Denise-Pro/Case_Engenharia_Dados_Spark_Colab/blob/master/case_SharkIT_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')


# iniciar uma sessão local e importar dados do Airbnb
from pyspark.sql import SparkSession
sc = SparkSession.builder.master('local[*]').getOrCreate()

from pyspark.sql import *
from pyspark.sql.types import *                      
from pyspark.sql import functions
from pyspark.sql.functions import col
from pyspark.sql.functions import substring
from pyspark.sql.functions import explode

In [None]:
# download do http para arquivo local
!wget --quiet --show-progress https://noverde-data-engineering-test.s3.amazonaws.com/loans_sample.csv

# carregando dados do arquivo Csv
df_loans = sc.read.csv("./loans_sample.csv", inferSchema=True, header=True)

# ver algumas informações sobre os tipos de dados de cada coluna
df_loans.printSchema()
df_loans.show()

root
 |-- loan_id: integer (nullable = true)
 |-- period: integer (nullable = true)
 |-- accepted_at: timestamp (nullable = true)
 |-- payday: integer (nullable = true)
 |-- interest_rate: double (nullable = true)

+-------+------+--------------------+------+-------------+
|loan_id|period|         accepted_at|payday|interest_rate|
+-------+------+--------------------+------+-------------+
|      0|    12|2017-05-17 16:48:...|    25|         3.12|
|      1|    12|2017-05-19 03:12:...|    25|         7.55|
|      2|    12|2017-05-24 13:22:...|    25|         7.55|
|      3|    12|2017-05-21 23:51:...|     5|         7.55|
|      4|     9|2017-05-18 08:07:...|    15|         7.49|
|      5|    12|2017-05-20 08:39:...|    25|         3.12|
|      6|     9|2017-05-17 07:48:...|    15|         7.49|
|      7|    12|2017-05-23 14:16:...|     5|         7.55|
|      8|    12|2017-05-16 16:17:...|    15|         7.55|
|      9|    12|2017-05-21 17:47:...|    15|         7.55|
|     10|    12|20

In [None]:
# download do http para arquivo local
!wget --quiet --show-progress https://noverde-data-engineering-test.s3.amazonaws.com/installments_sample.json

Spark_DF_Json = (sc.read.option("multiline", "true").option("inferSchema", "true").json('./installments_sample.json')                 )
# data = Spark_DF_Json.select("data").collect()[0]['data']
# Spark_DF_Json.printSchema()
# Spark_DF_Json.show()

from pyspark.sql.functions import to_date
data_json = Spark_DF_Json.select(explode('data'))
data_json.printSchema()
installments_df = data_json.select(col('col.loan_id'),col('col.number').alias('installment_number'),col('col.due_date').alias('due_date'))
installments_df = installments_df.withColumn("due_date", to_date(substring('due_date', 0, 10), "yyyy-MM-dd"))
installments_df.show()

root
 |-- col: struct (nullable = true)
 |    |-- due_date: string (nullable = true)
 |    |-- installment_id: long (nullable = true)
 |    |-- installment_value: double (nullable = true)
 |    |-- loan_id: long (nullable = true)
 |    |-- number: long (nullable = true)

+-------+------------------+----------+
|loan_id|installment_number|  due_date|
+-------+------------------+----------+
|    291|                 1|2017-06-12|
|    291|                 2|2017-07-12|
|    291|                 3|2017-08-12|
|    291|                 5|2017-10-12|
|    291|                 4|2017-09-12|
|    291|                12|2018-05-12|
|    291|                 7|2017-12-12|
|    291|                 6|2017-11-12|
|    291|                 9|2018-02-12|
|    291|                11|2018-04-12|
|    291|                 8|2018-01-12|
|    291|                10|2018-03-12|
|    670|                 3|2017-09-01|
|    154|                 4|2017-08-22|
|    154|                 6|2017-10-22|
|    154

In [None]:
# download do http para arquivo local
!wget --quiet --show-progress https://noverde-data-engineering-test.s3.amazonaws.com/payments_sample.parquet
df_spark_parquet = sc.read.format("parquet").load("./payments_sample.parquet")
df_spark_parquet.printSchema()
df_payment = df_spark_parquet.select(col('loan_id'),col('payment_id').alias('id'), col('payment_date'),col('payment_method').alias('method'),\
                                     col('paid_amount').alias('amount'))
df_payment.show()

root
 |-- loan_id: long (nullable = true)
 |-- installment_id: long (nullable = true)
 |-- payment_date: date (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_id: string (nullable = true)
 |-- paid_amount: double (nullable = true)

+-------+--------------------+------------+----------+------+
|loan_id|                  id|payment_date|    method|amount|
+-------+--------------------+------------+----------+------+
|    291|8ad9e0e5-a232-477...|  2017-06-01|creditcard|454.94|
|    291|e747c7f0-bd95-4ac...|  2017-07-03|       ted|454.94|
|    291|35734218-a71f-411...|  2017-07-23|       ted|454.94|
|    291|1c957990-5245-41e...|  2017-10-19|    boleto|462.94|
|    291|4389d87f-570a-4a7...|  2018-05-14|creditcard|456.94|
|    291|91bdae2d-f1ba-40a...|  2017-12-05|creditcard|454.94|
|    291|46410647-eb71-49b...|  2017-11-19|       ted|457.94|
|    291|abc5cefe-0c08-481...|  2018-01-23|creditcard|454.94|
|    291|77b437f6-f1ea-4ad...|  2018-04-12|       ted|454.

In [None]:
# faço aqui o join de 3 dataframes, desse jeito o spark entende que 'loan_id' é uma coluna em comum para os 3 e cria apenas uma em df_concat
df_concat = df_loans.join(installments_df, ["loan_id"]).join(df_payment, ["loan_id"])
df_concat.show()

+-------+------+--------------------+------+-------------+------------------+----------+--------------------+------------+----------+------+
|loan_id|period|         accepted_at|payday|interest_rate|installment_number|  due_date|                  id|payment_date|    method|amount|
+-------+------+--------------------+------+-------------+------------------+----------+--------------------+------------+----------+------+
|    291|    12|2017-05-11 13:00:...|     5|         7.55|                 1|2017-06-12|4865e86e-4042-412...|  2018-02-23|creditcard|454.94|
|    291|    12|2017-05-11 13:00:...|     5|         7.55|                 1|2017-06-12|5b6bf993-faec-4bd...|  2017-12-31|creditcard|454.94|
|    291|    12|2017-05-11 13:00:...|     5|         7.55|                 1|2017-06-12|77b437f6-f1ea-4ad...|  2018-04-12|       ted|454.94|
|    291|    12|2017-05-11 13:00:...|     5|         7.55|                 1|2017-06-12|abc5cefe-0c08-481...|  2018-01-23|creditcard|454.94|
|    291|    

In [None]:
df = df_concat.toPandas()
df

Unnamed: 0,loan_id,period,accepted_at,payday,interest_rate,installment_number,due_date,id,payment_date,method,amount
0,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,4865e86e-4042-412f-be34-5df45cae39af,2018-02-23,creditcard,454.94
1,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,5b6bf993-faec-4bd0-89f4-e3600518acd7,2017-12-31,creditcard,454.94
2,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,77b437f6-f1ea-4ade-b074-b0d0a855a7c0,2018-04-12,ted,454.94
3,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,abc5cefe-0c08-4816-8182-c3723a47cc9b,2018-01-23,creditcard,454.94
4,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,46410647-eb71-49b9-a566-ffbc4c203423,2017-11-19,ted,457.94
...,...,...,...,...,...,...,...,...,...,...,...
65218,522,12,2017-05-25 05:51:23.106,15,7.55,10,2018-03-22,23324a12-e651-4572-9e64-bf2ddb4a0064,2017-10-17,boleto,245.52
65219,522,12,2017-05-25 05:51:23.106,15,7.55,10,2018-03-22,47822864-bccf-42c0-a231-001ed8f8215a,2017-09-08,ted,245.52
65220,522,12,2017-05-25 05:51:23.106,15,7.55,10,2018-03-22,0b2241a5-965f-4710-8d91-af178af4ec8f,2017-07-31,creditcard,245.52
65221,522,12,2017-05-25 05:51:23.106,15,7.55,10,2018-03-22,6217c90d-5bfa-4883-bad2-6014f2b65b32,2017-07-27,ted,256.52


In [None]:
# verifico se existem valores nulos na coluna de pagamantos
df.payment_date.isnull().sum()

0

In [None]:
df.loc [df ['payment_date'] <= df ['due_date'], 'latency'] = 'False'
df.loc [df ['payment_date'] > df ['due_date'], 'latency'] = 'True'
df

Unnamed: 0,loan_id,period,accepted_at,payday,interest_rate,installment_number,due_date,id,payment_date,method,amount,latency
0,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,4865e86e-4042-412f-be34-5df45cae39af,2018-02-23,creditcard,454.94,True
1,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,5b6bf993-faec-4bd0-89f4-e3600518acd7,2017-12-31,creditcard,454.94,True
2,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,77b437f6-f1ea-4ade-b074-b0d0a855a7c0,2018-04-12,ted,454.94,True
3,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,abc5cefe-0c08-4816-8182-c3723a47cc9b,2018-01-23,creditcard,454.94,True
4,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,46410647-eb71-49b9-a566-ffbc4c203423,2017-11-19,ted,457.94,True
...,...,...,...,...,...,...,...,...,...,...,...,...
65218,522,12,2017-05-25 05:51:23.106,15,7.55,10,2018-03-22,23324a12-e651-4572-9e64-bf2ddb4a0064,2017-10-17,boleto,245.52,False
65219,522,12,2017-05-25 05:51:23.106,15,7.55,10,2018-03-22,47822864-bccf-42c0-a231-001ed8f8215a,2017-09-08,ted,245.52,False
65220,522,12,2017-05-25 05:51:23.106,15,7.55,10,2018-03-22,0b2241a5-965f-4710-8d91-af178af4ec8f,2017-07-31,creditcard,245.52,False
65221,522,12,2017-05-25 05:51:23.106,15,7.55,10,2018-03-22,6217c90d-5bfa-4883-bad2-6014f2b65b32,2017-07-27,ted,256.52,False


In [None]:
import numpy as np
from datetime import datetime

df.loc[:,'Diference_pay_day'] = (df['payment_date'].sub(df['due_date']))/np.timedelta64(1, 'D')

# se o pagamento esta atrasado há mais de 30 dias o valor e 'True' - significa q a dívida está em aberto
# se o atraso for maior  q 30 dias , mas o pagamento foi efetuado = 'False'

df.loc [df ['Diference_pay_day'] >= 30 & df['payment_date'].isnull(), 'Over30'] = 'True'
df.loc [df ['Diference_pay_day'] >= 30 & df['payment_date'].notnull(), 'Over30'] = 'False'
df

Unnamed: 0,loan_id,period,accepted_at,payday,interest_rate,installment_number,due_date,id,payment_date,method,amount,latency,Diference_pay_day,Over30
0,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,4865e86e-4042-412f-be34-5df45cae39af,2018-02-23,creditcard,454.94,True,256.0,False
1,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,5b6bf993-faec-4bd0-89f4-e3600518acd7,2017-12-31,creditcard,454.94,True,202.0,False
2,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,77b437f6-f1ea-4ade-b074-b0d0a855a7c0,2018-04-12,ted,454.94,True,304.0,False
3,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,abc5cefe-0c08-4816-8182-c3723a47cc9b,2018-01-23,creditcard,454.94,True,225.0,False
4,291,12,2017-05-11 13:00:31.620,5,7.55,1,2017-06-12,46410647-eb71-49b9-a566-ffbc4c203423,2017-11-19,ted,457.94,True,160.0,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
65218,522,12,2017-05-25 05:51:23.106,15,7.55,10,2018-03-22,23324a12-e651-4572-9e64-bf2ddb4a0064,2017-10-17,boleto,245.52,False,-156.0,
65219,522,12,2017-05-25 05:51:23.106,15,7.55,10,2018-03-22,47822864-bccf-42c0-a231-001ed8f8215a,2017-09-08,ted,245.52,False,-195.0,
65220,522,12,2017-05-25 05:51:23.106,15,7.55,10,2018-03-22,0b2241a5-965f-4710-8d91-af178af4ec8f,2017-07-31,creditcard,245.52,False,-234.0,
65221,522,12,2017-05-25 05:51:23.106,15,7.55,10,2018-03-22,6217c90d-5bfa-4883-bad2-6014f2b65b32,2017-07-27,ted,256.52,False,-238.0,


In [None]:
df.columns

Index(['loan_id', 'period', 'accepted_at', 'payday', 'interest_rate',
       'installment_number', 'due_date', 'id', 'payment_date', 'method',
       'amount', 'latency', 'Diference_pay_day', 'Over30'],
      dtype='object')

Salvo a Table em um arquivo parquet

In [None]:
# descarto aqui a coluna auxiliar 'Diference_pay_day' e salvo a table no arquivo parquet

df.drop('Diference_pay_day', axis=1, inplace=True)
df.to_parquet('loan_documents.parquet')

Leio o arquivo parquet, transformo em um dataframe spark e dou início às respostas para as 4 perguntas 

In [316]:
df_loan_documents = sc.read.format('parquet').load('loan_documents.parquet')
df_loan_documents.show()

+-------+------+--------------------+------+-------------+------------------+----------+--------------------+------------+----------+------+-------+------+
|loan_id|period|         accepted_at|payday|interest_rate|installment_number|  due_date|                  id|payment_date|    method|amount|latency|Over30|
+-------+------+--------------------+------+-------------+------------------+----------+--------------------+------------+----------+------+-------+------+
|    291|    12|2017-05-11 13:00:...|     5|         7.55|                 1|2017-06-12|4865e86e-4042-412...|  2018-02-23|creditcard|454.94|   True| False|
|    291|    12|2017-05-11 13:00:...|     5|         7.55|                 1|2017-06-12|5b6bf993-faec-4bd...|  2017-12-31|creditcard|454.94|   True| False|
|    291|    12|2017-05-11 13:00:...|     5|         7.55|                 1|2017-06-12|77b437f6-f1ea-4ad...|  2018-04-12|       ted|454.94|   True| False|
|    291|    12|2017-05-11 13:00:...|     5|         7.55|      

In [None]:
df_vencimento_2019 = df_loan_documents.filter(df_loan_documents['due_date'].between('2019-01-01', '2019-12-31'))
df_faturamento_2019 = df_loan_documents.filter(df_loan_documents['payment_date'].between('2019-01-01', '2019-12-31'))
# df_vencimento_2019.show()
type(df_faturamento_2019)

pyspark.sql.dataframe.DataFrame

In [None]:
from pyspark.sql.functions import lit
# import org.apache.spark.sql.functions.typedLit
df_venc = df_vencimento_2019.sort('due_date').groupBy("due_date").sum().select(col('sum(amount)').alias('venc_amount')).withColumn('year', lit('2019')).toPandas()
df_venc

Unnamed: 0,venc_amount,year
0,2690.64,2019
1,2690.64,2019
2,2690.64,2019
3,2690.64,2019
4,4138.19,2019
5,4138.19,2019
6,1447.55,2019
7,1447.55,2019
8,1447.55,2019
9,1447.55,2019


In [None]:
df_fat = df_faturamento_2019.sort('payment_date').groupBy("payment_date").sum()\
.filter(df_faturamento_2019.payment_date != '2019-05-15').filter(df_faturamento_2019.payment_date != '2019-06-11')\
            .select('payment_date',col('sum(amount)').alias('amount')).toPandas()
df_fat

Unnamed: 0,payment_date,amount
0,2019-01-05,3539.52
1,2019-02-24,3719.52
2,2019-03-19,3539.52
3,2019-05-21,3539.52
4,2019-06-15,3474.12
5,2019-07-15,3474.12
6,2019-08-15,3474.12
7,2019-10-15,3474.12


In [None]:
import pandas as pd
p1 = pd.concat([df_fat,df_venc], axis=1) 
p1.loc[:,'Ratio'] = (round(p1['amount']/ (p1['venc_amount'])*100))
# p1 = p1[['year','amount', 'Ratio']]
# p1.loc['payment_date'] = p1.payment_date.astype(str)[4:7]

Resposta pergunta 1

In [None]:
datas = list(p1.payment_date.values)
month = []
for data in datas[:8]:
  # if data != 'NaN':
  month.append(data.strftime('%m'))
meses = pd.DataFrame(month, columns=['month'])
meses

P1 =pd.concat([p1,meses], axis = 1)
P1 = P1[['month', 'year', 'amount', 'Ratio']]
P1

Unnamed: 0,month,year,amount,Ratio
0,1.0,2019,3539.52,132.0
1,2.0,2019,3719.52,138.0
2,3.0,2019,3539.52,132.0
3,5.0,2019,3539.52,132.0
4,6.0,2019,3474.12,84.0
5,7.0,2019,3474.12,84.0
6,8.0,2019,3474.12,240.0
7,10.0,2019,3474.12,240.0
8,,2019,,
9,,2019,,


In [207]:
df_payday = df_spark_mens_2019.select('payday', 'accepted_at').toPandas()
df_payday.payday.mode()

0    15
dtype: int32

In [208]:
df_spark_mens_2019 = df_loan_documents.filter(df_loan_documents['accepted_at'].between('2019-01-01', '2019-12-31'))

df_means_pd = df_spark_mens_2019.sort('accepted_at').groupBy("accepted_at").mean()\
.select(col('accepted_at').alias('Date'),col('avg(period)').alias('avg_period'),col('avg(interest_rate)').alias('avg_interest_ratio'),\
        col('avg(payday)').alias('freq_payday')).withColumn('year', lit('2019'))\
        .withColumn("date", to_date(substring('Date', 0, 10), "yyyy-MM-dd"))\
        .toPandas()

df_means_pd


Unnamed: 0,date,avg_period,avg_interest_ratio,freq_payday,year
0,2019-04-26,12.0,7.55,15.0,2019


Resposta 2

In [181]:

# df_means_pd.Date = df_means_pd.Date
datas = list(df_means_pd.date.values)
month = []
for data in datas:
  # if data != 'NaN':
  month.append(data.strftime('%m'))


meses = pd.DataFrame(month, columns=['month'])
meses

P2 = pd.concat([df_means_pd, meses], axis = 1)
P2 = P2[['month', 'year', 'avg_period',	'avg_interest_ratio',	'avg_payday']]
P2

Unnamed: 0,month,year,avg_period,avg_interest_ratio,avg_payday
0,4,2019,12.0,7.55,15.0


In [259]:
df_loan_documents.groupby('period').count().show()
df_loan_documents.groupby('interest_rate').count().show()

+------+-----+
|period|count|
+------+-----+
|    12|61212|
|     6| 1356|
|     9| 2655|
+------+-----+

+-------------+-----+
|interest_rate|count|
+-------------+-----+
|         5.74|   60|
|         7.49| 2451|
|         3.12| 8124|
|         8.01| 1008|
|         5.23|   54|
|         5.15|  924|
|         3.25|  270|
|         7.55|52044|
|         3.68|  288|
+-------------+-----+



Identificando a taxa média de juros para tal período

In [287]:
avg_interest_rate_6 = df_loan_documents.filter(df_loan_documents['period'] == '6').groupby('interest_rate').mean().select(col('avg(interest_rate)').alias('avg_interest_rate')).toPandas()
avg_interest_rate_9 = df_loan_documents.filter(df_loan_documents['period'] == '9').groupby('interest_rate').mean().select(col('avg(interest_rate)').alias('avg_interest_rate')).toPandas()
avg_interest_rate_12 = df_loan_documents.filter(df_loan_documents['period'] == '12').groupby('interest_rate').mean().select(col('avg(interest_rate)').alias('avg_interest_rate')).toPandas()

Unnamed: 0,avg_interest_rate
0,5.74
1,8.01
2,3.68
0,7.49
1,5.23
2,3.25
0,7.49
1,3.12
2,5.15
3,7.55


Identificando o dia mais frequente de pagamento para tal periodo

In [None]:
freq_payday_6 = df_loan_documents.filter(df_loan_documents['period'] == '6').groupby('payday').count().toPandas()
freq_payday_9 = df_loan_documents.filter(df_loan_documents['period'] == '9').groupby('payday').count().toPandas()
freq_payday_12 = df_loan_documents.filter(df_loan_documents['period'] == '12').groupby('payday').count().toPandas()

In [307]:
# o dia 25 é o que aparece mais vezes para period = 12  - moda
freq_payday_12.max()

payday       25
count     25596
dtype: int64

In [308]:
# o dia 25 é o que aparece mais vezes para period = 9  - moda
freq_payday_9.max()

payday      25
count     1269
dtype: int64

In [309]:
# o dia 25 é o que aparece mais vezes para period = 6  - moda
freq_payday_6.max()

payday     25
count     624
dtype: int64

Resposta 3

In [315]:
interest_rate_6 = df_loan_documents.filter(df_loan_documents['period'] == '6').groupby('interest_rate').count().withColumn('period', lit('6')).withColumn('freq_payday', lit('25')).toPandas()
interest_rate_9 = df_loan_documents.filter(df_loan_documents['period'] == '9').groupby('interest_rate').count().withColumn('period', lit('9')).withColumn('freq_payday', lit('25')).toPandas()
interest_rate_12 = df_loan_documents.filter(df_loan_documents['period'] == '12').groupby('interest_rate').count().withColumn('period', lit('12')).withColumn('freq_payday', lit('25')).toPandas()


df_p6= pd.concat([interest_rate_6, avg_interest_rate_6], axis=1)
df_p9= pd.concat([interest_rate_9, avg_interest_rate_9], axis=1)
df_p12= pd.concat([interest_rate_12, avg_interest_rate_12], axis=1)


p3 = pd.concat([df_p6,df_p9,df_p12])
p3 = p3[['period', 'interest_rate', 'count', 'avg_interest_rate', 'freq_payday']]
p3.columns = ['period', 'interest_ratio', 'count_loan_id', 'avg_interest_rate', 'freq_payday']
p3

Unnamed: 0,period,interest_ratio,count_loan_id,avg_interest_rate,freq_payday
0,6,5.74,60,5.74,25
1,6,8.01,1008,8.01,25
2,6,3.68,288,3.68,25
0,9,7.49,2331,7.49,25
1,9,5.23,54,5.23,25
2,9,3.25,270,3.25,25
0,12,7.49,120,7.49,25
1,12,3.12,8124,3.12,25
2,12,5.15,924,5.15,25
3,12,7.55,52044,7.55,25


In [317]:
df_loan_documents.show()

+-------+------+--------------------+------+-------------+------------------+----------+--------------------+------------+----------+------+-------+------+
|loan_id|period|         accepted_at|payday|interest_rate|installment_number|  due_date|                  id|payment_date|    method|amount|latency|Over30|
+-------+------+--------------------+------+-------------+------------------+----------+--------------------+------------+----------+------+-------+------+
|    291|    12|2017-05-11 13:00:...|     5|         7.55|                 1|2017-06-12|4865e86e-4042-412...|  2018-02-23|creditcard|454.94|   True| False|
|    291|    12|2017-05-11 13:00:...|     5|         7.55|                 1|2017-06-12|5b6bf993-faec-4bd...|  2017-12-31|creditcard|454.94|   True| False|
|    291|    12|2017-05-11 13:00:...|     5|         7.55|                 1|2017-06-12|77b437f6-f1ea-4ad...|  2018-04-12|       ted|454.94|   True| False|
|    291|    12|2017-05-11 13:00:...|     5|         7.55|      

In [345]:
df_total_portfolio = df_loan_documents.groupby('loan_id').count().select('loan_id',col('count').alias('total_portfolio')).toPandas()
df_total_portfolio

Unnamed: 0,loan_id,total_portfolio
0,471,72
1,463,48
2,496,60
3,148,120
4,243,45
...,...,...
770,187,24
771,89,84
772,401,48
773,422,60


Quando 'Over30' é false não há atraso maior ou igual a 30 dias para tal contrato

Over30 = True implica débito em aberto

In [386]:
df_matured = df_loan_documents.filter(df_loan_documents['Over30'] == 'False').groupby('loan_id').count().select('loan_id', col('count').alias('matured_portfolio') ).toPandas()

df_over30 = df_loan_documents.filter(df_loan_documents['Over30'] == 'True').groupby('loan_id').count().select('loan_id', col('count').alias('over30_true') ).toPandas()

df_matured

Unnamed: 0,loan_id,matured_portfolio
0,471,29
1,463,15
2,496,18
3,148,48
4,243,14
...,...,...
770,187,14
771,89,35
772,401,24
773,422,30


Resposta 4

O motivo pelo qual os campos de 'ratio' são em maioria nulos se justifica:

Não existem contratos com atraso maior ou igual a 30 dias para essas linhas.

In [387]:
# df_total_portfolio.join(df_matured, ["loan_id"]).join((df_over30, ["loan_id"]))

p4 = pd.concat([df_total_portfolio,df_matured,df_over30], axis=1)
# p4.dropna(subset=['total_portfolio'], inplace=True)
p4 = p4[['total_portfolio','matured_portfolio', 'over30_true']]
# p4.loc[:,'ratio'] = p4.over30_true/p4.total_portfolio * 100

p4.loc [p4 ['over30_true'].notnull(), 'ratio'] = p4.over30_true/p4.total_portfolio * 100
p4

Unnamed: 0,total_portfolio,matured_portfolio,over30_true,ratio
0,72,29,,
1,48,15,,
2,60,18,,
3,120,48,,
4,45,14,,
...,...,...,...,...
770,24,14,,
771,84,35,,
772,48,24,,
773,60,30,,
