In [16]:
# Tratamento do arquivo inf_diario e gravando dados no banco.

In [1]:
import os
import re
import requests

In [2]:
param_dic = {
    "host"      : "localhost",
    "database"  : "postgres",
    "user"      : "postgres",
    "password"  : "postgres"
}

In [3]:
url = "http://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/inf_diario_fi_202103.csv"
file_name = "inf_diario_fi_202103.csv"
temp_dir = "tmp"


In [7]:
def download_file(url: str, filename: str, temp_dir: str) -> dict:
    """
    Download file from url and save on temp_dir
    """
    temp_file = f"{temp_dir}/{filename}"
    response = requests.get(url, stream=True)
    if response.status_code == 200:
        with open(temp_file, "wb") as new_file:
            for partial in response.iter_content(chunk_size=256):
                new_file.write(partial)
        return {"file_path": temp_file, "file_name": filename}
    else:
        response.raise_for_status()
        
response = download_file(url, file_name, temp_dir)

print(response)


{'file_path': 'tmp/inf_diario_fi_202103.csv', 'file_name': 'inf_diario_fi_202103.csv'}


In [8]:
def format_doc(doc):
    '''
        Funcão que formata CPF e CNPJ
    '''
    if pd.isna(doc):
        return None
    doc = re.sub(r"[^0-9]", "", doc)
    return doc.zfill(14)

In [9]:
import pandas as pd

def open_file(file: str):
    return pd.read_csv(
        file,
        sep=";",
        encoding="latin-1",
    )

In [10]:
# Abrindo o arquivo com pandas
df = open_file(response.get('file_path'))

In [11]:
# Trandando os dados do DataFrame
# - 1 Filtrar por tipo de Fundo (Validar com Lucas)
# - 1 Limpar dados vazios (Validar com Lucas)
# - 2 Formatando o CPF 

df = df.loc[
        (df["TP_FUNDO"] == "FI")
        | (df["TP_FUNDO"] == "FIP")
        | (df["TP_FUNDO"] == "FIIM")
    ]

# Format documents
df["CNPJ_FUNDO"] = df["CNPJ_FUNDO"].apply(format_doc)

# Rename columns
rename_map = {
    "CNPJ_FUNDO": "fund_doc",
    "DT_COMPTC": "competency_date",
    "VL_TOTAL": "amount",
    "VL_QUOTA": "share_value",
    "VL_PATRIM_LIQ": "equity_value",
    "NR_COTST": "quotaholder_number",
}
df = df.rename(columns=rename_map)

# Columns to keep
cols_keep = [
    "fund_doc",
    "competency_date",
    "amount",
    "share_value",
    "equity_value",
    "quotaholder_number",
]
df = df[cols_keep]

In [15]:
from sqlalchemy import create_engine
connect = "postgresql+psycopg2://%s:%s@%s:54325/%s" % (
    param_dic['user'],
    param_dic['password'],
    param_dic['host'],
    param_dic['database']
)
def to_alchemy(df):
    engine = create_engine(connect)
    df.to_sql(
        'fi_inf_diario', 
        con=engine,
        schema='cvm_ckan',
        index=False, 
        if_exists='replace'
    )
    print("=== to_sql() done with sqlalchemy ===")

In [17]:
to_alchemy(df)

=== to_sql() done with sqlalchemy ===
