In [2]:
# Environment setup: mount Google Drive so that files stored there are
# reachable from this Colab runtime under /content/drive.
from google.colab import drive

COLAB_DRIVE_MOUNT = '/content/drive'
drive.mount(COLAB_DRIVE_MOUNT)

Mounted at /content/drive


In [3]:
!pip install pyspark -q

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [5]:
# 1. Start (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("CVM_Silver")
    .getOrCreate()
)

# 2. Load the Bronze-layer data (stored as Parquet).
path_bronze = '/content/drive/MyDrive/pipeline_bigdata_pyspark/bronze/fundos_cvm'
bronze_reader = spark.read
df_silver = bronze_reader.parquet(path_bronze)

# 3. Print the current schema to plan the cleaning steps.
df_silver.printSchema()

root
 |-- TP_FUNDO_CLASSE: string (nullable = true)
 |-- CNPJ_FUNDO_CLASSE: string (nullable = true)
 |-- ID_SUBCLASSE: string (nullable = true)
 |-- DT_COMPTC: date (nullable = true)
 |-- VL_TOTAL: double (nullable = true)
 |-- VL_QUOTA: double (nullable = true)
 |-- VL_PATRIM_LIQ: double (nullable = true)
 |-- CAPTC_DIA: double (nullable = true)
 |-- RESG_DIA: double (nullable = true)
 |-- NR_COTST: integer (nullable = true)



In [6]:
# Preview the first 10 rows; truncate=False prints full cell contents instead
# of cutting long values.
PREVIEW_ROWS = 10
df_silver.show(n=PREVIEW_ROWS, truncate=False)

+---------------+------------------+------------+----------+----------+----------+-------------+---------+--------+--------+
|TP_FUNDO_CLASSE|CNPJ_FUNDO_CLASSE |ID_SUBCLASSE|DT_COMPTC |VL_TOTAL  |VL_QUOTA  |VL_PATRIM_LIQ|CAPTC_DIA|RESG_DIA|NR_COTST|
+---------------+------------------+------------+----------+----------+----------+-------------+---------+--------+--------+
|FI             |00.017.024/0001-53|NULL        |2024-01-02|1136699.13|34.2988597|1139708.1    |0.0      |0.0     |1       |
|FI             |00.017.024/0001-53|NULL        |2024-01-03|1137245.82|34.3123029|1140154.8    |0.0      |0.0     |1       |
|FI             |00.017.024/0001-53|NULL        |2024-01-04|1137741.93|34.3260232|1140610.71   |0.0      |0.0     |1       |
|FI             |00.017.024/0001-53|NULL        |2024-01-05|1138240.64|34.3382208|1141016.02   |0.0      |0.0     |1       |
|FI             |00.017.024/0001-53|NULL        |2024-01-08|1138427.98|34.3504954|1141423.89   |0.0      |0.0     |1       |


In [7]:
# Row count: count() is a Spark action, so this line runs a job over the data.
total_linhas = df_silver.count()

# Column count: taken from the DataFrame's column list.
total_colunas = len(df_silver.columns)

# Report the dataset dimensions (message text is user-facing, in Portuguese).
print(f"Dimensões do Dataset: {total_linhas} linhas e {total_colunas} colunas.")

Dimensões do Dataset: 567834 linhas e 10 colunas.


In [9]:
from pyspark.sql import functions as F

# Build one aggregate per column, all evaluated in a single select().
# when() without otherwise() yields NULL for rows that do not match, and
# count() skips NULLs, so each expression counts exactly the rows in which
# that column is NULL. Each result is aliased back to its column name.
contagem_nulos = df_silver.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_silver.columns]
)

print("🔎 Contagem de valores nulos por coluna:")
# vertical=True prints one "column | value" line per column instead of a
# single wide row.
contagem_nulos.show(vertical=True)

🔎 Contagem de valores nulos por coluna:
-RECORD 0-------------------
 TP_FUNDO_CLASSE   | 0      
 CNPJ_FUNDO_CLASSE | 0      
 ID_SUBCLASSE      | 567834 
 DT_COMPTC         | 0      
 VL_TOTAL          | 0      
 VL_QUOTA          | 0      
 VL_PATRIM_LIQ     | 0      
 CAPTC_DIA         | 0      
 RESG_DIA          | 0      
 NR_COTST          | 0      



In [10]:
# 1. Remove duplicates so downstream analytics see at most one snapshot per
#    fund class (CNPJ) / sub-class / reference date.
#    NOTE: dropDuplicates on a subset of columns keeps an arbitrary row from
#    each group of duplicates; which conflicting row survives is not guaranteed.
df_silver_final = df_silver.dropDuplicates(["CNPJ_FUNDO_CLASSE", "DT_COMPTC", "ID_SUBCLASSE"])

# 2. Destination path of the Silver layer.
path_silver = '/content/drive/MyDrive/pipeline_bigdata_pyspark/silver/fundos_cvm'

# 3. Write as Snappy-compressed Parquet, replacing the output of any earlier run.
df_silver_final.write.mode("overwrite") \
    .option("compression", "snappy") \
    .parquet(path_silver)

# 4. Count the rows that were actually written. Calling df_silver_final.count()
#    here would re-execute the whole lazy plan (Bronze read + de-duplication
#    shuffle) a second time and would not confirm what landed on disk.
registros_silver = spark.read.parquet(path_silver).count()

print("✅ Camada Silver concluída!")
print(f"📊 Registros finais na Silver: {registros_silver}")
print(f"📂 Salvo em: {path_silver}")

✅ Camada Silver concluída!
📊 Registros finais na Silver: 567812
📂 Salvo em: /content/drive/MyDrive/pipeline_bigdata_pyspark/silver/fundos_cvm
