# **BLU365 - Teste técnico** ![1](img/índice.png)

## **Implementação em Pyspark**

Este *notebook* é referente à solução do teste proposto pela equipe técnica da BLU365. O teste consiste em analisar uma [base de dados](teste.csv), buscando averiguar quais acordos registrados estão ou não estão válidos. Os critérios básicos para considerar um acordo como válido são:
- O valor total do acordo deve ser igual à soma de todas as parcelas
- O vencimento da primeira parcela deve cair em um dia útil

Este *notebook* possui a versão da análise feita em `pyspark`. Assim, para que seja possível rodá-lo, se faz necessário configurar o *jupyter* como o driver do pyspark.

Além deste pacote, ainda é usado o pacote `wokalendar`, que analisa quais *timestamps* são referentes aos dias de trabalho.

Caso deseje saber mais informações, sinta-se livre [para me contatar](https://github.com/igor-yamamoto).


🐉 *Code on!* 🐉

## Importação de bibliotecas e dos dados

In [1]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import udf, when, col, to_date

In [2]:
dados = spark.read.csv("teste.csv", header=True, inferSchema=False, sep = '|', dateFormat='YYYY-mm-dd')
dados.show()

+--------------+--------------------+--------------------+-------------+-------------+------------+--------------+
|     documento|                nome|            contrato|ValorContrato|ContratoPlano|ValorParcela|DataVencimento|
+--------------+--------------------+--------------------+-------------+-------------+------------+--------------+
|   76748291623|        Bryan Barros|CX-28715774669644...|        38610|           10|        3861|    2018-05-21|
|   21056684887| Maria Julia da Rosa|CX-61718558756223...|         9464|            4|        2366|    2018-05-08|
|   31133201431|        Anthony Melo|CX-16373131100994...|        14285|            5|        2857|    2018-05-05|
|94110246446680|Dr. Augusto Silveira|CX-18249643754814...|        37158|           22|        1689|    2018-04-05|
|64431473462766|         Maysa Lopes|CX-81229770554257...|        21284|           17|        1252|    2018-03-06|
|38126148919828|      Daniela Aragão|CX-30455534916519...|        40944|        

In [3]:
dados.printSchema()

root
 |-- documento: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- contrato: string (nullable = true)
 |-- ValorContrato: string (nullable = true)
 |-- ContratoPlano: string (nullable = true)
 |-- ValorParcela: string (nullable = true)
 |-- DataVencimento: string (nullable = true)



In [4]:
para_converter = ['ValorContrato', 'ContratoPlano', 'ValorParcela']

for i in para_converter:
    dados = dados.withColumn(i,
                     dados[i].cast(IntegerType())
    )

Convertendo o campo de data para o tipo datetime:

In [5]:
dados = dados.withColumn('DataVencimento', 
                 to_date(dados.DataVencimento).cast('timestamp')
                )

In [6]:
dados.printSchema()

root
 |-- documento: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- contrato: string (nullable = true)
 |-- ValorContrato: integer (nullable = true)
 |-- ContratoPlano: integer (nullable = true)
 |-- ValorParcela: integer (nullable = true)
 |-- DataVencimento: timestamp (nullable = true)



## Verificando os valores dos acordos

Os valores dos acordos (valor negociado para quitar as dívidas) devem ser iguais ao valor da soma de todas as parcelas do acordo. Assim, é possível procurar pelos valores de acordo que **não são iguais à quantidade de parcelas vezes o valor de cada parcela**:

In [7]:
dados.filter(dados.ValorContrato != dados.ValorParcela*dados.ContratoPlano).count()

0

Logo, todos os valores dos acordos são iguais ao valor de cada parcela vezes a quantidade de parcelas a serem pagas. Portanto, **nenhum acordo é invalidado** por esta condição.

## Verificando quais contratos vencem em dias úteis

Para verificar quais contratos vencem em dias úteis, será utilizado o pacote `workalendar`. Assim:

In [8]:
from workalendar.america import Brazil

cal = Brazil()

É possível averiguar quais são os feriados registrados no pacote. Para tal, é possível identificar que o intervalo de tempo que diz respeito aos dados no arquivo csv é referente ao ano de $2018$:

In [9]:
(dados.agg({'DataVencimento': 'min'}).collect()[0]['min(DataVencimento)'],
 dados.agg({'DataVencimento': 'max'}).collect()[0]['max(DataVencimento)'])

(datetime.datetime(2018, 1, 1, 0, 0), datetime.datetime(2018, 12, 25, 0, 0))

In [10]:
cal.holidays(2018)

[(datetime.date(2018, 1, 1), 'New year'),
 (datetime.date(2018, 4, 1), 'Easter Sunday'),
 (datetime.date(2018, 4, 21), "Tiradentes' Day"),
 (datetime.date(2018, 5, 1), 'Labour Day'),
 (datetime.date(2018, 9, 7), 'Independence Day'),
 (datetime.date(2018, 10, 12), 'Our Lady of Aparecida'),
 (datetime.date(2018, 11, 2), "All Souls' Day"),
 (datetime.date(2018, 11, 15), 'Republic Day'),
 (datetime.date(2018, 12, 25), 'Christmas Day')]

Além dos feriados registrados acima referente ao ano de 2018, ainda se faz necessário considerar [os feriados nacionais registrados no calendário bancário](https://feriadosbancarios.febraban.org.br/feriados.asp?ano=2018). Dentro deles, **Carnaval** (12/02/2018 e 13/02/2018), **Sexta-feira santa** (30/03/2018) e **Corpus Christi** (31/05/2018). 

Desta forma, é criado um novo campo sobre o *dataframe* original chamado `DiaUtil`, que leva o valor booleano `True` se for dia útil e `False` caso não.

In [11]:
udf_holiday = udf(lambda x : (cal.is_working_day(x)))
                   
dados = dados.withColumn('DiaUtil', udf_holiday('DataVencimento'))

dados = dados.withColumn('DiaUtil',
                         when(((col('DataVencimento') == '2018-02-12 00:00:00') |
                               (col('DataVencimento') == '2018-02-13 00:00:00') |
                               (col('DataVencimento') == '2018-05-31 00:00:00') |
                               (col('DataVencimento') == '2018-03-30 00:00:00')),
                              'false').otherwise(col('DiaUtil')))

In [12]:
dados.filter(dados.DiaUtil == False).select(dados.DataVencimento, dados.DiaUtil).show()

PythonException: 
  An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 589, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 254, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 74, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'workalendar'


In [13]:
dados.filter(dados.DiaUtil == True).select(dados.DataVencimento, dados.DiaUtil).count()

PythonException: 
  An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 589, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 447, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 254, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/worker.py", line 74, in read_command
    command = serializer._read_with_length(file)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 172, in _read_with_length
    return self.loads(obj)
  File "/opt/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 458, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'workalendar'


In [None]:
dados.filter(dados.DiaUtil == False).select(dados.DataVencimento, dados.DiaUtil).count()

Portanto, dentro de todos os dados, $316$ contratos possuem vencimento em dias não úteis, **tornando-os inválidos**. Assim, é possível passar os valores booleanos do atributo `DiaUtil` para um novo atributo `Valido`, que possue os valores `True` caso o acordo seja válido e `False` quando for inválido.

In [None]:
dados = dados.withColumn('Valido', dados.DiaUtil)

## Verificando os CPFs e CNPJs

Os documentos de [CPF possuem 11 dígitos](https://pt.wikipedia.org/wiki/Cadastro_de_pessoas_f%C3%ADsicas#N%C3%BAmero_de_inscri%C3%A7%C3%A3o), ao passo que [o CNPJ possui 14](https://pt.wikipedia.org/wiki/Cadastro_Nacional_da_Pessoa_Jur%C3%ADdica). Assim, é possível averiguar o comprimento dos dados fornecidos no campo `documento` quando em formato de *string*.

In [None]:
dados = dados.withColumn('doc_tipo', when(F.length(col('documento'))==11,'CPF').otherwise('CNPJ'))

In [None]:
dados.filter(dados.doc_tipo == 'CPF').count()

In [None]:
dados.filter(dados.doc_tipo == 'CNPJ').count()

De acordo com o resultado acima, existem $512$ instâncias referentes ao registros de CNPJ e $488$ à CPFs, totalizando $1000$ entradas no total. Para manter registro de qual tipo é cada documento, é criado o atributo `doc_tipo`.

## Resumo dos dados

Do total de **$1000$ acordos** previamente registrados na base, **$316$ são inválidos**. Assim, a quantidade de **acordos válidos gerados é de $684$**.
A invalidez dos acordos gerados se deve ao vencimento em dias não úteis. Ainda, é possível avaliar a distribuição dos acordos em relação aos tipos de documentos associados.

In [None]:
dados.toPandas()['Valido'].value_counts().plot.pie(title='Acordos válidos', autopct='%1.1f%%')

In [None]:
acordos_validos = dados.filter(dados.Valido == True)
acordos_invalidos = dados.filter(dados.Valido == False)

In [None]:
(acordos_validos.filter(dados.doc_tipo == 'CPF').count(),
 acordos_validos.filter(dados.doc_tipo == 'CNPJ').count())

In [None]:
acordos_validos.toPandas()['doc_tipo'].value_counts().plot.pie(title='Acordos validados', autopct='%1.1f%%')

Assim, dos acordos validados, **$337$ deles são referentes a pessoas físicas** (CPF, com comprimento de 11) e **$347$ são sobre pessoas jurídicas** (CNPJ, com comprimento de 14).

In [None]:
(acordos_invalidos.filter(dados.doc_tipo == 'CPF').count(),
 acordos_invalidos.filter(dados.doc_tipo == 'CNPJ').count())

In [None]:
acordos_invalidos.toPandas()['doc_tipo'].value_counts().plot.pie(title='Acordos invalidados', autopct='%1.1f%%')

Dos acordos invalidos, **$151$ são referentes a CPF** e **$165$ são de CNPJ**.

## Valor total em reais

Para os acordos validados, **o montante total gerado é de R$\$41.7$ milhões**, enquanto os **acordos invalidados geram um total de R$\$19.1$ milhões** (**total: R$\$60.8$ milhões**). Assim, os acordos invalidados representam uma receita de um total de $31.54\%$, ao passo que para os acordos validados este valor é de $68.54\%$.

In [None]:
import numpy as np

In [None]:
total = dados.select('ValorContrato').groupBy().sum().collect()[0][0]
total_valido = acordos_validos.select('ValorContrato').groupBy().sum().collect()[0][0]
total_invalido = acordos_invalidos.select('ValorContrato').groupBy().sum().collect()[0][0]

print('Valor total gerado pelos acordos válidos: R$' +
      str(round(total_valido/1e6, 1)) +
      ' mi')
print('Porcentagem em relação à todos os acordos (válidos e inválidos): ' +
      str(round(100*total_valido/total, 2)) +
      '%'
     )

In [None]:
print('Valor total gerado pelos acordos válidos: R$' +
      str(round(total_invalido/1e6, 1)) +
      ' mi')
print('Porcentagem em relação à todos os acordos (válidos e inválidos): ' +
      str(round(100*total_invalido/total, 2)) +
      '%'
     )

Dos cálculos apresentados abaixo, é possível concluir que,
- Para os acordos válidos, **R$\$19.8$ milhões derivam de contratos com pessoas físicas**, enquanto **R$\$21.9$ milhões são relacionados a pessoas jurídicas**
- Já para os acordos inválidos, os contratos com **CPF representam um montante de R$\$9.1$ milhões**, ao passo que, para o caso de **CNPJ, este valor é de R$\$10.0$** milhões
- Tanto para os acordos válidos quanto para os inválidos, a porcentagem de receita gerado por CPF equivale à aproximadamente $48.0\%$. Para CNPJ, este valor é de $52.0\%$

In [None]:
acordos_validos.filter(dados.doc_tipo == 'CPF').select('ValorContrato').groupBy().sum().collect()[0][0]

In [None]:
validos_CPF = acordos_validos.filter(dados.doc_tipo == 'CPF').select('ValorContrato').groupBy().sum().collect()[0][0]
validos_CNPJ = acordos_validos.filter(dados.doc_tipo == 'CNPJ').select('ValorContrato').groupBy().sum().collect()[0][0]
print('Acordos válidos:')
print('Valor total gerado por CPF: R$' +
      str(round(validos_CPF/1e6, 1)) +
      ' mi (' +
      str(round(100*validos_CPF/total_valido, 2)) +
      '%)'
     )

print('Valor total gerado por CPF: R$' +
      str(round(validos_CNPJ/1e6, 1)) +
      ' mi (' +
      str(round(100*validos_CNPJ/total_valido, 2)) +
      '%)'
     )

In [None]:
invalidos_CPF = acordos_invalidos.filter(dados.doc_tipo == 'CPF').select('ValorContrato').groupBy().sum().collect()[0][0]
invalidos_CNPJ = acordos_invalidos.filter(dados.doc_tipo == 'CNPJ').select('ValorContrato').groupBy().sum().collect()[0][0]

print('Acordos inválidos:')
print('Valor total gerado por CPF: R$' +
      str(round(invalidos_CPF/1e6, 1)) +
      ' mi (' +
      str(round(100*invalidos_CPF/total_invalido, 2)) +
      '%)'
     )
print('Valor total gerado por CNPJ: R$' +
      str(round(invalidos_CNPJ/1e6, 1)) +
      ' mi (' +
      str(round(100*invalidos_CNPJ/total_invalido, 2)) +
      '%)'
     )

Por fim, um resumo de toda a receita presente no arquivo csv, bem como as projeções desta receita sobre o tipo de documento e a validade do contrato, são apresentadas na tabela abaixo. Cada valor na tabela representa milhões de reais.

|  | Total | CPF | CNPJ |
| - | - | - | - |
| **Total** | $60.8$ | $28.9$ | $31.9$ |
| **Válido** | $41.7$ | $19.8$ | $21.9$ |
| **Inválido** | $19.1$ | $9.1$ | $10.0$ |