In [57]:
#Dowload and extract
import requests
from zipfile import ZipFile
from datetime import datetime

#Transforming data
import numpy as np
import pandas as pd

#loading on DB
from unicodedata import normalize
import psycopg2
from sqlalchemy import create_engine, text
from kedro.extras.datasets.pandas import SQLTableDataSet

In [58]:
#Adapter layer
def download_zip_file(url, path):
    file = requests.get(url)
    if file.status_code == requests.codes.OK:
        with open(path, "wb") as cda_fi:
            cda_fi.write(file.content)

def zip_extract(path):
    file = ZipFile(path, "r")
    file.extractall("data_notebooks")
    file.close()

def csv_reader(i:int, encoding:str, year, month):
    path = "data_notebooks/cda_fi_BLC_" + str(i) + "_" + year + month + ".csv"
    df = pd.read_csv(path, encoding = encoding, sep = ";")
    return df

def write_df_to_db(df, table_name, credentials):
    data_set = SQLTableDataSet(table_name=table_name,
                               credentials=credentials)

    data_set.save(df)
    reloaded = data_set.load()

def normalizador(elemento):
    elemento = normalize('NFKD', elemento).encode('ASCII','ignore').decode('ASCII').lower()
    elemento = elemento.replace(":","_")
    elemento = elemento.replace("/","_")
    return elemento

In [59]:
#Application layer
def extract(url, zip_file_path, encoding, year, month):
    """
    Extrai os dados da fonte
    """
    download_zip_file(url, zip_file_path)
    zip_extract(zip_file_path)
    df_all = pd.concat([csv_reader(i, encoding, year, month) for i in range(1,9)], ignore_index=True) #Arquivo agregado!
    return df_all

def transform(df_all):
    """
    Transforma os dados
    """
    # Pegando cnpj e denominação social
    df_cnpj = df_all.groupby(by=["CNPJ_FUNDO", "DENOM_SOCIAL"]).aggregate("sum").reset_index()[["CNPJ_FUNDO", "DENOM_SOCIAL"]]

    # Calculando percentual de ativos nos fundos e tirando colunas indesejáveis
    vl_fundo = pd.DataFrame(df_all.groupby(by="CNPJ_FUNDO").aggregate("sum")["VL_MERC_POS_FINAL"]).reset_index()
    df_all = pd.merge(df_all, vl_fundo, how = 'inner', on = 'CNPJ_FUNDO')
    df_all["PERCENTUAL_ATIVO"] = df_all["VL_MERC_POS_FINAL_x"]/df_all["VL_MERC_POS_FINAL_y"]
    df_all = df_all[["CNPJ_FUNDO","TP_ATIVO","VL_MERC_POS_FINAL_x", "VL_MERC_POS_FINAL_y","PERCENTUAL_ATIVO"]]
    df_all.columns = ["CNPJ_FUNDO","TP_ATIVO","VL_MERC_POS_FINAL", "VL_MERC_FUNDO","PERCENTUAL_ATIVO"]
    df_all = df_all.groupby(by=["CNPJ_FUNDO", "TP_ATIVO"]).aggregate("sum").reset_index()
    df_all = df_all.drop(columns=["VL_MERC_POS_FINAL", "VL_MERC_FUNDO"])

    #Separando df com os percentuais
    df_percentual = df_all.copy()

    #Pivotada a fim de conseguir o dataframe no formato ONEHOTENCODER
    df_one_hot= pd.pivot_table(df_all, index = ["CNPJ_FUNDO"], columns = ["TP_ATIVO"], values = "PERCENTUAL_ATIVO").reset_index().fillna(0)

    return df_cnpj , df_percentual, df_one_hot

def load(df_cnpj, df_percentual, df_one_hot, table_cnpj, table_percentual, table_one_hot, credentials):
    """
    Carrega os dataframes no banco de dados PostgreSQL
    """
    df_cnpj.columns = ["id_cnpj", "denom_social"]
    df_percentual.columns = ["id_cnpj", "nome_ativo", "percentual_ativo"]
    df_one_hot.columns = [normalizador(elemento) for elemento in df_one_hot.columns.to_list()]
    df_one_hot = df_one_hot.rename(columns={'cnpj_fundo': 'id_cnpj'})
    write_df_to_db(df_cnpj, table_cnpj, credentials)
    write_df_to_db(df_percentual, table_percentual, credentials)
    write_df_to_db(df_one_hot, table_one_hot, credentials)

def etl_report(url, zip_file_path, encoding_csv, year, month, table_cnpj, table_percentual, table_one_hot, credentials):
    df_all = extract(url, zip_file_path, encoding_csv, year, month)
    df_cnpj , df_percentual, df_one_hot = transform(df_all)
    load(df_cnpj, df_percentual, df_one_hot, table_cnpj, table_percentual, table_one_hot, credentials)

In [60]:
def main():
    """
    Função principal
    """

    #Constantes/Parâmetros
    #Conexão CMV
    encoding_csv = "ISO-8859-1"
    month = "12"
    year = "2022"
    url = "https://dados.cvm.gov.br/dados/FI/DOC/CDA/DADOS/cda_fi_" + year + month + ".zip"
    zip_file_path = "data_notebooks/cda_fi_"  + year + month + ".zip"
    #Banco de Dados SQL
    credentials = {"con": "postgresql://postgres:senha123@localhost/db_teste_2"}
    table_cnpj = "tb_cnpj_fundos"
    table_percentual = "tb_percentual"
    table_one_hot = "tb_one_hot"

    #Run
    etl_report(url, zip_file_path, encoding_csv, year, month, table_cnpj, table_percentual, table_one_hot, credentials)

In [56]:
main()

  df = pd.read_csv(path, encoding = encoding, sep = ";")
