<a href="https://colab.research.google.com/github/PauloMarvin/ETL-CVM/blob/data-analysis/03_refined_data_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Installing the *Spark* dependencies

In [1]:
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

Importing `os` and setting the environment variables

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

Importing *SparkSession*

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .getOrCreate()

In [5]:
spark

Importing libraries

In [6]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import plotly.express as px
import seaborn as sns

In [7]:
sns.set()

# Loading the data into the *dataframe*

In [8]:
df = spark.read.csv(
    '/content/drive/MyDrive/cvm_2000_2022.csv',
    header=True, inferSchema=True
)

In [9]:
df.show(5)

+---+--------+------------------+----------+--------+---------+-------------+---------+--------+--------+
|_c0|TP_FUNDO|        CNPJ_FUNDO| DT_COMPTC|VL_TOTAL| VL_QUOTA|VL_PATRIM_LIQ|CAPTC_DIA|RESG_DIA|NR_COTST|
+---+--------+------------------+----------+--------+---------+-------------+---------+--------+--------+
|  0|   FITVM|01.465.738/0001-97|2000-01-03|    null|2.5972313|    2588000.0|  91000.0|     0.0|    null|
|  1|   FITVM|01.465.738/0001-97|2000-01-04|    null|2.4597298|    2543000.0|      0.0| 17000.0|    null|
|  2|   FITVM|01.465.738/0001-97|2000-01-05|    null|2.4035701|    2467000.0|  44000.0|  2000.0|    null|
|  3|   FITVM|01.465.738/0001-97|2000-01-06|    null|2.4648947|    2573000.0|  21000.0|     0.0|    null|
|  4|   FITVM|01.465.738/0001-97|2000-01-07|    null|2.5131104|    2646000.0|   3000.0|     0.0|    null|
+---+--------+------------------+----------+--------+---------+-------------+---------+--------+--------+
only showing top 5 rows



## Renaming the `_c0` column

In [10]:
df = df.withColumnRenamed('_c0', 'index')

In [11]:
df.show(5)

+-----+--------+------------------+----------+--------+---------+-------------+---------+--------+--------+
|index|TP_FUNDO|        CNPJ_FUNDO| DT_COMPTC|VL_TOTAL| VL_QUOTA|VL_PATRIM_LIQ|CAPTC_DIA|RESG_DIA|NR_COTST|
+-----+--------+------------------+----------+--------+---------+-------------+---------+--------+--------+
|    0|   FITVM|01.465.738/0001-97|2000-01-03|    null|2.5972313|    2588000.0|  91000.0|     0.0|    null|
|    1|   FITVM|01.465.738/0001-97|2000-01-04|    null|2.4597298|    2543000.0|      0.0| 17000.0|    null|
|    2|   FITVM|01.465.738/0001-97|2000-01-05|    null|2.4035701|    2467000.0|  44000.0|  2000.0|    null|
|    3|   FITVM|01.465.738/0001-97|2000-01-06|    null|2.4648947|    2573000.0|  21000.0|     0.0|    null|
|    4|   FITVM|01.465.738/0001-97|2000-01-07|    null|2.5131104|    2646000.0|   3000.0|     0.0|    null|
+-----+--------+------------------+----------+--------+---------+-------------+---------+--------+--------+
only showing top 5 rows



## Dimensions of the *dataframe*

In [13]:
print(f'The dataframe has {len(df.columns)} columns.')
print(f'And {df.count()} rows.')

The dataframe has 10 columns.
And 57676391 rows.


## *Schema* of the *dataframe*

In [15]:
df.printSchema()

root
 |-- index: integer (nullable = true)
 |-- TP_FUNDO: string (nullable = true)
 |-- CNPJ_FUNDO: string (nullable = true)
 |-- DT_COMPTC: string (nullable = true)
 |-- VL_TOTAL: double (nullable = true)
 |-- VL_QUOTA: double (nullable = true)
 |-- VL_PATRIM_LIQ: double (nullable = true)
 |-- CAPTC_DIA: double (nullable = true)
 |-- RESG_DIA: double (nullable = true)
 |-- NR_COTST: double (nullable = true)



# Deriving the `MONTH` and `YEAR` columns

In [39]:
import pyspark.sql.functions as f

In [40]:
df = df.withColumn('MONTH', f.month('DT_COMPTC'))\
       .withColumn('YEAR', f.year('DT_COMPTC'))

In [41]:
df.show(2)

+-----+--------+------------------+----------+--------+---------+-------------+---------+--------+--------+-----+----+
|index|TP_FUNDO|        CNPJ_FUNDO| DT_COMPTC|VL_TOTAL| VL_QUOTA|VL_PATRIM_LIQ|CAPTC_DIA|RESG_DIA|NR_COTST|MONTH|YEAR|
+-----+--------+------------------+----------+--------+---------+-------------+---------+--------+--------+-----+----+
|    0|   FITVM|01.465.738/0001-97|2000-01-03|    null|2.5972313|    2588000.0|  91000.0|     0.0|    null|    1|2000|
|    1|   FITVM|01.465.738/0001-97|2000-01-04|    null|2.4597298|    2543000.0|      0.0| 17000.0|    null|    1|2000|
+-----+--------+------------------+----------+--------+---------+-------------+---------+--------+--------+-----+----+
only showing top 2 rows
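
The schema lists `DT_COMPTC` as a string, so the derivation above depends on the values being ISO `yyyy-MM-dd` dates, as the sample rows are. A minimal plain-Python sketch of the same extraction on two of those sample values (standard library only, not the Spark code path) could look like this:

```python
from datetime import date

# Two DT_COMPTC values copied from the sample rows above.
samples = ["2000-01-03", "2000-01-04"]

for raw in samples:
    parsed = date.fromisoformat(raw)  # raises ValueError on a malformed date
    print(raw, parsed.month, parsed.year)
```

In Spark, calling `f.to_date('DT_COMPTC')` before `f.month` and `f.year` would make that assumption explicit in the code.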



# Exploratory analysis

## Number of missing values

In [42]:
df.select([f.count(f.when(f.isnull(c), c)).alias(c) for c in df.columns]).show()

+-----+--------+----------+---------+--------+--------+-------------+---------+--------+--------+-----+----+
|index|TP_FUNDO|CNPJ_FUNDO|DT_COMPTC|VL_TOTAL|VL_QUOTA|VL_PATRIM_LIQ|CAPTC_DIA|RESG_DIA|NR_COTST|MONTH|YEAR|
+-----+--------+----------+---------+--------+--------+-------------+---------+--------+--------+-----+----+
|    0|40860852|         0|        0|  366012|       0|            0|        0|       0|  365408|    0|   0|
+-----+--------+----------+---------+--------+--------+-------------+---------+--------+--------+-----+----+



In [51]:
df.agg(*[f.mean(f.when(f.col(c).isNull(), 1).otherwise(0)).alias(c) for c in df.columns]).show()

+-----+------------------+----------+---------+-------------------+--------+-------------+---------+--------+--------------------+-----+----+
|index|          TP_FUNDO|CNPJ_FUNDO|DT_COMPTC|           VL_TOTAL|VL_QUOTA|VL_PATRIM_LIQ|CAPTC_DIA|RESG_DIA|            NR_COTST|MONTH|YEAR|
+-----+------------------+----------+---------+-------------------+--------+-------------+---------+--------+--------------------+-----+----+
|  0.0|0.7084502218594086|       0.0|      0.0|0.00634595878233782|     0.0|          0.0|      0.0|     0.0|0.006335486559830...|  0.0| 0.0|
+-----+------------------+----------+---------+-------------------+--------+-------------+---------+--------+--------------------+-----+----+



The `TP_FUNDO` column has a large number of missing values: roughly 71% of its rows (40,860,852 of 57,676,391) are null.
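
As a quick check on the 71% figure, the share can be recomputed from the `TP_FUNDO` null count and the total row count, both printed in the outputs above. This is plain arithmetic on those two numbers, not a new query against the data, and the variable names are illustrative:

```python
# Counts copied from the outputs above.
null_tp_fundo = 40_860_852  # nulls in TP_FUNDO
total_rows = 57_676_391     # rows in the dataframe

missing_share = null_tp_fundo / total_rows
print(f"{missing_share:.1%}")
```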

## `TP_FUNDO` column

In [52]:
count_tp_fundo = df.groupBy('TP_FUNDO').count().toPandas().sort_values('count', ascending=False)

In [53]:
count_tp_fundo

       TP_FUNDO     count
2                40860852
0            FI  15266406
5           FIF    500219
10       FACFIF    492624
9         FITVM    349153
4      FMP-FGTS    107997
7     FIC-FITVM     67022
8          FIEX     13624
1          FAPI      9395
11  FMP-FGTS CL      8226


## `VL_TOTAL` column

### Removing *outliers* from the variables

In [16]:
df.columns

['index',
 'TP_FUNDO',
 'CNPJ_FUNDO',
 'DT_COMPTC',
 'VL_TOTAL',
 'VL_QUOTA',
 'VL_PATRIM_LIQ',
 'CAPTC_DIA',
 'RESG_DIA',
 'NR_COTST']

#### For the `VL_TOTAL` variable

In [25]:
# approxQuantile takes, in order: the column, a list of quantile probabilities, and the relative error.
# It returns a list, so [0] extracts the single value.
Q1 = df.approxQuantile('VL_TOTAL', [0.25], 0.01)[0]
Q3 = df.approxQuantile('VL_TOTAL', [0.75], 0.01)[0]
IIQ = Q3 - Q1  # interquartile range (IQR)

In [27]:
inferior = Q1 - (1.5 * IIQ)
superior = Q3 + (1.5 * IIQ)

In [37]:
vl_total_without_outliers = df.select('VL_TOTAL').where((df['VL_TOTAL'] >= inferior) & (df['VL_TOTAL'] <= superior))

In [38]:
vl_total_without_outliers.show()

+----------+
|  VL_TOTAL|
+----------+
|4996961.25|
|       0.0|
|       0.0|
|       0.0|
|4880231.54|
|4669234.46|
|4669128.05|
|4678730.81|
|4684407.16|
|4684406.38|
| 4684405.3|
|4685391.27|
|4685389.48|
|4712318.08|
|4712317.77|
|4712318.03|
|4710518.42|
|4710518.47|
|4710518.68|
|4606848.61|
+----------+
only showing top 20 rows
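
The 1.5 × IQR fence rule applied to `VL_TOTAL` above can be illustrated on a small list in plain Python. This is only a sketch under stated assumptions: the values are made up, `statistics.quantiles` computes exact quartiles where the notebook uses Spark's approximate `approxQuantile`, and `None` stands in for a null `VL_TOTAL`.

```python
from statistics import quantiles

# Hypothetical toy values; None stands in for a null VL_TOTAL.
values = [0.0, 10.0, 12.0, 13.0, 15.0, 16.0, 18.0, 500.0, None]

# Nulls cannot be compared against the fences, so they are set aside first.
non_null = [v for v in values if v is not None]

# Exact quartiles; the notebook uses an approximation instead.
q1, _, q3 = quantiles(non_null, n=4, method="inclusive")
iqr = q3 - q1

lower = q1 - 1.5 * iqr
upper = q3 + 1.5 * iqr

# Keep only the values that fall inside both fences.
kept = [v for v in non_null if lower <= v <= upper]
print(lower, upper, kept)
```

Both the low value `0.0` and the high value `500.0` fall outside the fences here. In the Spark filter, rows with a null `VL_TOTAL` are dropped as well, because a comparison against null never evaluates to true.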

