In [1]:
import os
import pandas as pd

from datetime import datetime

# Dataset locations. The defaults are the original author's local folders; each one
# can be overridden through an environment variable so the notebook can run on
# another machine without editing code.
DFP_PATH = os.environ.get(
    "CVM_DFP_PATH",
    "/Users/thiagocastroferreira/Desktop/kubernetes/datasets/DFP",
)
ITR_PATH = os.environ.get(
    "CVM_ITR_PATH",
    "/Users/thiagocastroferreira/Desktop/kubernetes/datasets/ITR",
)

# Companies whose filings are kept, one row per company: (id, cnpj, name).
# The id looks like an exchange ticker (assumption; nothing in this notebook
# depends on it). The CNPJ is what the CSV rows are filtered on.
_STOCK_FIELDS = ("id", "cnpj", "name")
_STOCK_ROWS = [
    ("AFLT3", "10.338.320/0001-00", "Afluente Transmissão de Energia"),
    ("ALUP11", "08.364.948/0001-38", "Alupar Investimento"),
    ("AURE3", "28.594.234/0001-23", "Auren Energia"),
    ("CBEE3", "33.050.071/0001-58", "Ampla Energia e Serviços"),
    ("CEEB3", "15.139.629/0001-94", "COELBA"),
    ("CPLE3", "76.483.817/0001-20", "Companhia Paranaense de Energia"),
    ("EGIE3", "02.474.103/0001-19", "Engie Brasil Energia"),
    ("COCE3", "07.047.251/0001-70", "Companhia Energértica do Ceará"),
    ("EKTR4", "02.328.280/0001-97", "Elektro Redes"),
    ("ELET3", "00.001.180/0001-26", "Eletrobrás"),
    ("ENEV3", "04.423.567/0001-21", "Eneva"),
    ("ENGI3", "00.864.214/0001-06", "Energisa"),
    ("ENMT3", "03.467.321/0001-99", "Energisa Mato Grosso"),
    ("EQTL3", "03.220.438/0001-73", "Equatorial"),
    ("ISAE3", "02.998.611/0001-04", "ISA Energia Brasil"),
    ("LIGT3", "03.378.521/0001-75", "Light"),
    ("NEOE3", "01.083.200/0001-18", "Neoenergia"),
    ("RNEW11", "08.534.605/0001-74", "Renova Energia"),
    ("SRNA3", "42.500.384/0001-51", "Serena Energia"),
]

# Expand each row into the {"id", "cnpj", "name"} dict shape used by the notebook.
stocks = [dict(zip(_STOCK_FIELDS, stock_row)) for stock_row in _STOCK_ROWS]
# CNPJs in the same order as `stocks`; used to filter rows of the source CSV files.
stock_cnpjs = [entry["cnpj"] for entry in stocks]

# Form codes (substrings of the source CSV file names).
# NOTE(review): this list is not referenced by the visible cells. `process_forms`
# carries its own list and the composition cell sets `form = "composicao"` directly;
# "parecer" is not read anywhere in view. Confirm before removing.
forms = [
    "BPA_con",
    "BPP_con",
    "DFC_MD_con",
    "DFC_MI_con",
    "DMPL_con",
    "DRA_con",
    "DRE_con",
    "composicao",
    "DVA_con",
    "parecer",
]

# Value of the ORDEM_EXERC column that marks the rows to keep.
# NOTE(review): the visible filtering code compares against the literal "ÚLTIMO"
# rather than this constant.
ORDEM_EXERC = "ÚLTIMO"
# Output column name -> column name in the source CSV files. The keys are also the
# column names of the DFP_ITR_CVM table. This name is rebound later in the notebook
# for the share-composition table, so cell execution order matters.
mapping = {
    "CNPJ": "CNPJ_CIA",
    "REPORT_DATE": "DT_REFER",
    "COMPANY_NAME": "DENOM_CIA",
    "CVM_CODE": "CD_CVM",
    "DFP_GROUP": "GRUPO_DFP",
    "VERSION": "VERSAO",
    "CURRENCY": "MOEDA",
    "ANALYSIS_START_PERIOD_DATE": "DT_INI_EXERC",
    "ANALYSIS_END_PERIOD_DATE": "DT_FIM_EXERC",
    "ACCOUNT_NUMBER": "CD_CONTA",
    "ACCOUNT_NAME": "DS_CONTA",
    "ACCOUNT_VALUE": "VL_CONTA",
    "IS_FIXED_ACCOUNT": "ST_CONTA_FIXA",
}

# SQLite file used by every database cell below (relative to the working directory).
DATABASE_PATH = "cvm.db"

# Opening in "w" mode truncates the file, so every run of this cell starts from an
# empty database file. Any data previously stored in it is discarded.
with open(DATABASE_PATH, "w") as f:
    f.write("")

# Processamento

- Balanço Patrimonial Ativo (BPA)
- Balanço Patrimonial Passivo (BPP)
- Demonstração de Fluxo de Caixa - Método Direto (DFC-MD)
- Demonstração de Fluxo de Caixa - Método Indireto (DFC-MI)
- Demonstração das Mutações do Patrimônio Líquido (DMPL)
- Demonstração de Resultado Abrangente (DRA)
- Demonstração de Resultado (DRE)
- Demonstração de Valor Adicionado (DVA)

In [2]:
def process_forms(form_path: str, cnpjs=None, field_mapping=None) -> pd.DataFrame:
    """Load the consolidated statement CSVs found under ``form_path``.

    ``form_path`` must contain one sub-directory per year. Each year directory
    must hold, for every statement form handled here, a ``;``-separated,
    latin-1 encoded file whose name contains the form code (e.g. ``BPA_con``).
    Only rows whose ``ORDEM_EXERC`` is ``"ÚLTIMO"`` and whose ``CNPJ_CIA`` is
    one of the wanted CNPJs are kept.

    Args:
        form_path: Root directory holding the per-year sub-directories.
        cnpjs: Optional iterable of CNPJ strings to keep. Defaults to the
            notebook-level ``stock_cnpjs``.
        field_mapping: Optional ``{output column: source CSV column}`` dict.
            Defaults to the notebook-level ``mapping`` as bound when the
            function is called. That name is rebound later in the notebook,
            so pass the mapping explicitly when calling after that point. It
            must contain the keys converted below (``REPORT_DATE``,
            ``ANALYSIS_START_PERIOD_DATE``, ``ANALYSIS_END_PERIOD_DATE``,
            ``ACCOUNT_VALUE``, ``IS_FIXED_ACCOUNT``).

    Returns:
        One DataFrame with the kept rows of every form and year. Dates are
        ``datetime`` values, ``ACCOUNT_VALUE`` is a float multiplied by 1000
        when ``ESCALA_MOEDA`` is ``"MIL"``, and ``IS_FIXED_ACCOUNT`` is a bool.
        ``ANALYSIS_START_PERIOD_DATE`` is not read for ``BPA_con``/``BPP_con``
        and is therefore missing (NaT after concatenation) on those rows.
        When nothing matches, an empty frame with the mapped columns is
        returned.

    Raises:
        FileNotFoundError: A year directory has no file for one of the forms.
    """
    # Resolve the defaults at call time; a set makes the per-row CNPJ test O(1).
    wanted_cnpjs = frozenset(stock_cnpjs if cnpjs is None else cnpjs)
    columns = dict(mapping if field_mapping is None else field_mapping)

    statement_forms = (
        "BPA_con",
        "BPP_con",
        "DFC_MD_con",
        "DFC_MI_con",
        "DMPL_con",
        "DRA_con",
        "DRE_con",
        "DVA_con",
    )
    # Forms for which the start-of-period column is not read.
    balance_sheet_forms = ("BPA_con", "BPP_con")
    date_format = "%Y-%m-%d"

    # Only real directories count as years (ignores hidden entries and stray files).
    # Sorting makes the row order independent of the file system's listing order.
    years = sorted(
        name
        for name in os.listdir(form_path)
        if not name.startswith(".") and os.path.isdir(os.path.join(form_path, name))
    )

    result_forms = []
    for year in years:
        year_path = os.path.join(form_path, year)
        file_names = sorted(os.listdir(year_path))
        for form in statement_forms:
            matches = [name for name in file_names if form in name]
            if not matches:
                raise FileNotFoundError(
                    f"No file containing '{form}' found in {year_path}"
                )
            path = os.path.join(year_path, matches[0])

            results = []
            with open(path, "r", encoding="latin-1") as f:
                header = f.readline().strip().split(";")
                # Stream the file instead of loading every line into memory.
                for line in f:
                    line = line.strip()
                    if not line:
                        # Blank lines (e.g. at end of file) carry no record.
                        continue

                    key_value = dict(zip(header, line.split(";")))
                    if (
                        key_value["ORDEM_EXERC"] != "ÚLTIMO"
                        or key_value["CNPJ_CIA"] not in wanted_cnpjs
                    ):
                        continue

                    # Values reported in thousands are scaled to units.
                    unit = 1000 if key_value["ESCALA_MOEDA"] == "MIL" else 1

                    if form in balance_sheet_forms:
                        dict_row = {
                            key: key_value[source]
                            for key, source in columns.items()
                            if key != "ANALYSIS_START_PERIOD_DATE"
                        }
                    else:
                        dict_row = {
                            key: key_value[source] for key, source in columns.items()
                        }
                        dict_row["ANALYSIS_START_PERIOD_DATE"] = datetime.strptime(
                            dict_row["ANALYSIS_START_PERIOD_DATE"], date_format
                        )

                    dict_row["IS_FIXED_ACCOUNT"] = dict_row["IS_FIXED_ACCOUNT"] == "S"
                    dict_row["ACCOUNT_VALUE"] = float(dict_row["ACCOUNT_VALUE"]) * unit
                    dict_row["ANALYSIS_END_PERIOD_DATE"] = datetime.strptime(
                        dict_row["ANALYSIS_END_PERIOD_DATE"], date_format
                    )
                    dict_row["REPORT_DATE"] = datetime.strptime(
                        dict_row["REPORT_DATE"], date_format
                    )

                    results.append(dict_row)

            # Column-less empty frames add nothing to the concatenation; skip them.
            if results:
                result_forms.append(pd.DataFrame(results))

    if not result_forms:
        # pd.concat([]) raises ValueError; return a well-formed empty frame instead.
        return pd.DataFrame(columns=list(columns))

    return pd.concat(result_forms)

In [4]:
# Run the same loader over both dataset folders, then stack the two frames
# into the single frame that the database cells read.
df_dfp, df_itr = (process_forms(dataset_path) for dataset_path in (DFP_PATH, ITR_PATH))

df = pd.concat([df_dfp, df_itr])

Criando base de dados

In [5]:
import sqlite3

# Schema for the statement rows: one table plus two lookup indexes.
# IF NOT EXISTS makes the script safe to run more than once.
command = """CREATE TABLE IF NOT EXISTS DFP_ITR_CVM (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
	CNPJ TEXT NOT NULL,
	REPORT_DATE DATETIME NOT NULL,
	COMPANY_NAME TEXT NOT NULL,
	CVM_CODE TEXT NOT NULL,
	DFP_GROUP TEXT NOT NULL,
	VERSION TEXT NOT NULL,
	CURRENCY TEXT NOT NULL,
	ANALYSIS_START_PERIOD_DATE DATETIME,
	ANALYSIS_END_PERIOD_DATE DATETIME NOT NULL,
	ACCOUNT_NUMBER TEXT NOT NULL,
	ACCOUNT_NAME TEXT NOT NULL,
	ACCOUNT_VALUE FLOAT NOT NULL,
	IS_FIXED_ACCOUNT TEXT
);

-- Índice para consultas por CNPJ
CREATE INDEX IF NOT EXISTS idx_dfp_itr_cvm_cnpj 
ON DFP_ITR_CVM (CNPJ);

-- Índice para consultas por data de início de período
CREATE INDEX IF NOT EXISTS idx_dfp_itr_cvm_start_date 
ON DFP_ITR_CVM (ANALYSIS_START_PERIOD_DATE);"""

# Connect to the SQLite database (the file is created if it doesn't exist)
connection = sqlite3.connect(DATABASE_PATH)
try:
    cursor = connection.cursor()
    try:
        # Run every statement of the schema script, then persist the changes
        cursor.executescript(command)
        connection.commit()
    finally:
        cursor.close()
finally:
    # Always release the connection, even when the script fails; otherwise the
    # kernel keeps the database file open and later cells may find it locked.
    connection.close()

Populando base de dados

In [6]:
queries = []  # not used by this cell

# Column list and "?" placeholders are derived from the mapping keys, so values are
# always bound as parameters and never interpolated into the SQL text.
fields = list(mapping.keys())
fields_str = ", ".join(fields)
placeholders = ", ".join(["?"] * len(fields))

query = f"INSERT INTO DFP_ITR_CVM ({fields_str}) VALUES ({placeholders})"

# Accounts nested deeper than this many dot-separated levels are not stored
# (e.g. "1.01.01" is kept, "1.01.01.01" is skipped).
MAX_ACCOUNT_DEPTH = 3

data = []  # list of tuples to insert

for idx, row in df.iterrows():
    if len(row["ACCOUNT_NUMBER"].split(".")) > MAX_ACCOUNT_DEPTH:
        continue

    row_values = []
    for field in fields:
        value = row[field]
        if pd.isna(value):
            # Missing values (NaN, NaT, None) become SQL NULL. This must be tested
            # before the datetime branch: pd.NaT is a datetime instance whose
            # strftime raises, which previously stored the text "NULL" instead.
            value = None
        elif isinstance(value, datetime):
            value = value.strftime("%Y-%m-%d")
        elif value == "NULL" or value == "":
            value = None
        row_values.append(value)

    data.append(tuple(row_values))

# Connect to the SQLite database (the file is created if it doesn't exist)
connection = sqlite3.connect(DATABASE_PATH)
try:
    cursor = connection.cursor()
    try:
        # Bulk insert with bound parameters, then persist the changes
        cursor.executemany(query, data)
        connection.commit()
    finally:
        cursor.close()
finally:
    # Always release the connection so a failed insert does not leave the
    # database file open (and possibly locked) in the running kernel.
    connection.close()

# Processamento Composição

Carregando e limpando os dados

In [7]:
# Output column name -> source CSV column name for the share-composition files.
# NOTE: this rebinds the notebook-level `mapping` used by the earlier cells.
mapping = {
    "CNPJ": "CNPJ_CIA",
    "REPORT_DATE": "DT_REFER",
    "COMPANY_NAME": "DENOM_CIA",
    "VERSION": "VERSAO",
    "ORDINARY_SHARES_ISSUED": "QT_ACAO_ORDIN_CAP_INTEGR",
    "ORDINARY_SHARES_TREASURY": "QT_ACAO_ORDIN_TESOURO",
    "PREFERRED_SHARES_ISSUED": "QT_ACAO_PREF_CAP_INTEGR",
    "PREFERRED_SHARES_TREASURY": "QT_ACAO_PREF_TESOURO",
    "TOTAL_SHARES_ISSUED": "QT_ACAO_TOTAL_CAP_INTEGR",
    "TOTAL_SHARES_TREASURY": "QT_ACAO_TOTAL_TESOURO",
}

# Columns parsed as integers (one loop instead of six copy-pasted conversions).
integer_fields = (
    "ORDINARY_SHARES_ISSUED",
    "ORDINARY_SHARES_TREASURY",
    "PREFERRED_SHARES_ISSUED",
    "PREFERRED_SHARES_TREASURY",
    "TOTAL_SHARES_ISSUED",
    "TOTAL_SHARES_TREASURY",
)

form = "composicao"
# A set makes the per-row CNPJ test O(1).
wanted_cnpjs = set(stock_cnpjs)
results = []
for PATH in [DFP_PATH, ITR_PATH]:
    # Only real directories count as years (ignores hidden entries and stray files).
    # Sorting makes the row order independent of the file system's listing order.
    years = sorted(
        name
        for name in os.listdir(PATH)
        if not name.startswith(".") and os.path.isdir(os.path.join(PATH, name))
    )
    for year in years:
        year_path = os.path.join(PATH, year)
        matches = sorted(name for name in os.listdir(year_path) if form in name)
        if not matches:
            # Clearer than the bare IndexError raised by indexing an empty list.
            raise FileNotFoundError(f"No file containing '{form}' found in {year_path}")
        path = os.path.join(year_path, matches[0])

        with open(path, "r", encoding="latin-1") as f:
            header = f.readline().strip().split(";")
            for line in f:
                line = line.strip()
                if not line:
                    # Blank lines (e.g. at end of file) carry no record.
                    continue

                key_value = dict(zip(header, line.split(";")))
                if key_value["CNPJ_CIA"] not in wanted_cnpjs:
                    continue

                dict_row = {key: key_value[mapping[key]] for key in mapping}
                dict_row["REPORT_DATE"] = datetime.strptime(
                    dict_row["REPORT_DATE"], "%Y-%m-%d"
                )
                for field in integer_fields:
                    dict_row[field] = int(dict_row[field])

                results.append(dict_row)

df = pd.DataFrame(results)

Criando base de dados

In [8]:
# Schema for the share-composition rows. IF NOT EXISTS makes it safe to re-run.
command = """CREATE TABLE IF NOT EXISTS CVM_SHARE_COMPOSITION (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
	CNPJ TEXT NOT NULL,
    REPORT_DATE DATETIME NOT NULL,
    COMPANY_NAME TEXT NOT NULL,
    VERSION TEXT NOT NULL,
    ORDINARY_SHARES_ISSUED INTEGER NOT NULL,
    ORDINARY_SHARES_TREASURY INTEGER NOT NULL,
    PREFERRED_SHARES_ISSUED INTEGER NOT NULL,
    PREFERRED_SHARES_TREASURY INTEGER NOT NULL,
    TOTAL_SHARES_ISSUED INTEGER NOT NULL,
    TOTAL_SHARES_TREASURY INTEGER NOT NULL
)"""

# Connect to the SQLite database (the file is created if it doesn't exist)
connection = sqlite3.connect(DATABASE_PATH)
try:
    cursor = connection.cursor()
    try:
        # Create the table, then persist the change
        cursor.executescript(command)
        connection.commit()
    finally:
        cursor.close()
finally:
    # Always release the connection, even when the statement fails; otherwise the
    # kernel keeps the database file open and later cells may find it locked.
    connection.close()

Inserindo os dados

In [9]:
queries = []  # not used by this cell

# Column list and "?" placeholders are derived from the mapping keys, so values are
# always bound as parameters and never interpolated into the SQL text.
fields = list(mapping.keys())
fields_str = ", ".join(fields)
placeholders = ", ".join(["?"] * len(fields))

query = f"INSERT INTO CVM_SHARE_COMPOSITION ({fields_str}) VALUES ({placeholders})"

data = []  # list of tuples to insert

for idx, row in df.iterrows():
    row_values = []
    for field in fields:
        value = row[field]
        # Dates are stored as ISO "YYYY-MM-DD" text
        if isinstance(value, datetime):
            value = value.strftime("%Y-%m-%d")
        row_values.append(value)

    data.append(tuple(row_values))

# Connect to the SQLite database (the file is created if it doesn't exist)
connection = sqlite3.connect(DATABASE_PATH)
try:
    cursor = connection.cursor()
    try:
        # Bulk insert with bound parameters, then persist the changes
        cursor.executemany(query, data)
        connection.commit()
    finally:
        cursor.close()
finally:
    # Always release the connection so a failed insert does not leave the
    # database file open (and possibly locked) in the running kernel.
    connection.close()

In [10]:
# List every distinct (account number, account name) pair that was stored.
command = """SELECT DISTINCT ACCOUNT_NUMBER, ACCOUNT_NAME FROM DFP_ITR_CVM ORDER BY ACCOUNT_NUMBER"""

# Connect to the SQLite database (the file is created if it doesn't exist)
connection = sqlite3.connect(DATABASE_PATH)
try:
    cursor = connection.cursor()
    try:
        # Query the database
        cursor.execute(command)

        # Column names come from the cursor description (None when there is no result set)
        columns = (
            [description[0] for description in cursor.description]
            if cursor.description
            else []
        )

        # Fetch results
        rows = cursor.fetchall()
    finally:
        cursor.close()
finally:
    # Always release the connection, even when the query fails.
    connection.close()

# Format results as a markdown table
if not rows:
    output = "No data found with given query"
else:
    # Collect the lines and join once instead of growing a string in the loop.
    table_lines = [
        "| " + " | ".join(columns) + " |",
        "| " + " | ".join(["---" for _ in columns]) + " |",
    ]
    for row in rows:
        table_lines.append(
            "| "
            + " | ".join([str(cell) if cell is not None else "" for cell in row])
            + " |"
        )
    # Every line, including the last, ends with a newline.
    output = "\n".join(table_lines) + "\n"

print(output)

| ACCOUNT_NUMBER | ACCOUNT_NAME |
| --- | --- |
| 1 | Ativo Total |
| 1.01 | Ativo Circulante |
| 1.01.01 | Caixa e Equivalentes de Caixa |
| 1.01.02 | Aplicações Financeiras |
| 1.01.03 | Contas a Receber |
| 1.01.04 | Estoques |
| 1.01.05 | Ativos Biológicos |
| 1.01.06 | Tributos a Recuperar |
| 1.01.07 | Despesas Antecipadas |
| 1.01.08 | Outros Ativos Circulantes |
| 1.02 | Ativo Não Circulante |
| 1.02.01 | Ativo Realizável a Longo Prazo |
| 1.02.02 | Investimentos |
| 1.02.03 | Imobilizado |
| 1.02.04 | Intangível |
| 2 | Passivo Total |
| 2.01 | Passivo Circulante |
| 2.01.01 | Obrigações Sociais e Trabalhistas |
| 2.01.02 | Fornecedores |
| 2.01.03 | Obrigações Fiscais |
| 2.01.04 | Empréstimos e Financiamentos |
| 2.01.05 | Outras Obrigações |
| 2.01.06 | Provisões |
| 2.01.07 | Passivos sobre Ativos Não-Correntes a Venda e Descontinuados |
| 2.02 | Passivo Não Circulante |
| 2.02.01 | Empréstimos e Financiamentos |
| 2.02.02 | Outras Obrigações |
| 2.02.03 | Tributos Diferid