In [None]:
%pip install selenium
%pip install webdriver-manager
%pip install pandas
%pip install lxml
%pip install openpyxl
%pip install xlrd

In [2]:
# Célula 1: Importações e Configurações Iniciais
import os
import zipfile
import pandas as pd
import logging
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import NoSuchElementException, TimeoutException
from io import StringIO
from datetime import datetime
import time

# Configuração de logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

# Configuração das pastas
DOWNLOADS_FOLDER = "downloads"
BALANCOS_FOLDER = "balancos"
BALANCOS_XLSX_FOLDER = "balancos_xlsx"
BALANCOS_CONCATENADOS_FOLDER = "balancos_concatenados"
BALANCOS_DEFINITIVO_FOLDER = "balancos_definitivos"

# Criar pastas se não existirem
for folder in [DOWNLOADS_FOLDER, BALANCOS_FOLDER, BALANCOS_XLSX_FOLDER, BALANCOS_CONCATENADOS_FOLDER, BALANCOS_DEFINITIVO_FOLDER]:
    os.makedirs(folder, exist_ok=True)

In [3]:
# Célula 2: Configuração do WebDriver e Extração da Tabela de Resultados
# Configuração do WebDriver em modo headless
options = webdriver.ChromeOptions()
options.add_argument("--headless")
options.add_argument("--disable-gpu")
options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(options=options)

# URL da página de resultados
url_resultado = "https://www.fundamentus.com.br/resultado.php"

# Função para obter a tabela HTML
def obter_tabela_html(url, xpath):
    driver.get(url)
    WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.XPATH, xpath)))
    elemento = driver.find_element("xpath", xpath)
    return elemento.get_attribute("outerHTML")

# Obter a tabela HTML
html_tabela = obter_tabela_html(url_resultado, "/html/body/div[1]/div[2]/table")

# Converter a tabela HTML em DataFrame
df_dados_financeiros = pd.read_html(StringIO(html_tabela), decimal=",", thousands=".")[0]
df_dados_financeiros = df_dados_financeiros.sort_values(by="Papel", ascending=True).reset_index(drop=True)

# Exibir o DataFrame
print("Tabela com os dados financeiros:")
display(df_dados_financeiros)

# Lista de empresas
lista_de_empresas = df_dados_financeiros["Papel"].tolist()
print("Quantidade de empresas:", len(lista_de_empresas))

# Fechar o WebDriver
driver.quit()

Tabela com os dados financeiros:


Unnamed: 0,Papel,Cotação,P/L,P/VP,PSR,Div.Yield,P/Ativo,P/Cap.Giro,P/EBIT,P/Ativ Circ.Liq,...,EV/EBITDA,Mrg Ebit,Mrg. Líq.,Liq. Corr.,ROIC,ROE,Liq.2meses,Patrim. Líq,Dív.Brut/ Patrim.,Cresc. Rec.5a
0,AALR3,13.51,-7.32,1.47,1.344,"0,00%",0.594,-10.28,31.44,-1.58,...,13.62,"4,28%","-17,52%",0.78,"2,21%","-20,06%",749664.0,1.089040e+09,0.77,"5,49%"
1,ABCB3,0.00,0.00,0.00,0.000,"0,00%",0.000,0.00,0.00,0.00,...,0.00,"0,00%","0,00%",0.00,"0,00%","14,96%",0.0,6.235070e+09,0.00,"13,50%"
2,ABCB4,23.51,6.17,0.92,0.000,"6,39%",0.000,0.00,0.00,0.00,...,0.00,"0,00%","0,00%",0.00,"0,00%","14,96%",15426500.0,6.235070e+09,0.00,"13,50%"
3,ABEV3,13.01,14.24,2.15,2.543,"5,61%",1.429,55.43,11.87,-22.56,...,8.24,"21,43%","18,36%",1.11,"15,82%","15,11%",348074000.0,9.526280e+10,0.04,"11,56%"
4,ABYA3,4.91,-214.80,1.76,2.055,"0,00%",0.527,1.98,19.96,-2.75,...,33.67,"10,29%","-0,96%",2.09,"2,78%","-0,82%",0.0,2.920600e+08,1.31,"16,41%"
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
982,WLMM4,33.00,8.94,1.63,0.394,"4,29%",1.114,2.97,6.75,3.71,...,6.85,"5,83%","4,40%",2.56,"19,69%","18,21%",24487.6,7.385320e+08,0.25,"28,41%"
983,WMBY3,25.39,-19.30,2.87,0.836,"0,00%",0.182,1.20,8.62,-1.50,...,25.02,"9,70%","-7,05%",1.44,"2,39%","-14,86%",0.0,2.124390e+08,6.51,"-14,48%"
984,WSON33,67.00,8.07,0.98,1.067,"0,00%",0.400,13.41,2.42,-0.89,...,5.08,"44,14%","13,58%",1.26,"18,36%","12,17%",0.0,2.148530e+09,1.21,"5,23%"
985,YDUQ3,10.61,22.40,1.04,0.618,"2,58%",0.349,3.73,4.15,-0.81,...,4.87,"14,91%","2,80%",1.68,"9,27%","4,66%",43871200.0,3.142670e+09,1.66,"9,76%"


Quantidade de empresas: 987


In [4]:
# Célula 3: Funções Auxiliares
def obter_trimestre_anterior():
    mes_atual = datetime.now().month
    ano_atual = datetime.now().year
    if mes_atual <= 3:
        return f"Q4-{ano_atual-1}"
    elif mes_atual <= 6:
        return f"Q1-{ano_atual}"
    elif mes_atual <= 9:
        return f"Q2-{ano_atual}"
    else:
        return f"Q3-{ano_atual}"

trimestre_anterior = obter_trimestre_anterior()
pasta_trimestre = os.path.join(DOWNLOADS_FOLDER, trimestre_anterior)
os.makedirs(pasta_trimestre, exist_ok=True)

In [None]:
# Célula 4: Download dos Arquivos
options = webdriver.ChromeOptions()
prefs = {
    "download.default_directory": os.path.join(os.getcwd(), pasta_trimestre),
    "download.prompt_for_download": False,
    "download.directory_upgrade": True,
    "safebrowsing.enabled": True,   
}
options.add_experimental_option("prefs", prefs)
driver = webdriver.Chrome(options=options)

def baixar_e_renomear_arquivo(papel):
    arquivo_destino = os.path.join(pasta_trimestre, f"{papel}.zip")
    if os.path.isfile(arquivo_destino):
        logging.info(f"Arquivo {arquivo_destino} já existe. Pulando o papel {papel}.")
        return

    url_papeis = f"https://www.fundamentus.com.br/balancos.php?papel={papel}&interface=mobile"
    driver.get(url_papeis)

    try:
        time.sleep(2)
        botao_download = WebDriverWait(driver, 10).until(
            EC.visibility_of_element_located((By.CSS_SELECTOR, "a.bt-baixar"))
        )
        botao_download.click()

        # Esperar o download começar
        for _ in range(10):
            time.sleep(5)
            if any(f.startswith("bal_") for f in os.listdir(pasta_trimestre)):
                break

        novo_arquivo = next((f for f in os.listdir(pasta_trimestre) if f.startswith("bal_")), None)

        if novo_arquivo:
            caminho_arquivo = os.path.join(pasta_trimestre, novo_arquivo)
            os.rename(caminho_arquivo, arquivo_destino)
            logging.info(f"Arquivo {arquivo_destino} baixado e renomeado com sucesso!")
        else:
            logging.error(f"Erro ao encontrar o arquivo baixado para {papel}")

    except Exception as e:
        logging.error(f"Erro ao baixar arquivo para {papel}: {e}")

# Baixar e renomear arquivos para cada empresa
for papel in lista_de_empresas:
    baixar_e_renomear_arquivo(papel)

# Fechar o WebDriver
driver.quit()

In [None]:
# Célula 5: Extração e Renomeação dos Arquivos
def extrair_e_renomear_arquivo(papel):
    caminho_arquivo_zip = os.path.join(pasta_trimestre, f"{papel}.zip")
    pasta_balancos_trimestre = os.path.join(BALANCOS_FOLDER, trimestre_anterior)
    os.makedirs(pasta_balancos_trimestre, exist_ok=True)

    try:
        with zipfile.ZipFile(caminho_arquivo_zip, "r") as zip_ref:
            zip_ref.extractall(pasta_balancos_trimestre)

        caminho_arquivo_xls = os.path.join(pasta_balancos_trimestre, "balanco.xls")
        novo_nome_arquivo = os.path.join(pasta_balancos_trimestre, f"{papel}.xls")

        if os.path.exists(caminho_arquivo_xls):
            os.rename(caminho_arquivo_xls, novo_nome_arquivo)
            print(f"Arquivo {novo_nome_arquivo} extraído e renomeado com sucesso!")
        else:
            print(f"Arquivo balanco.xls não encontrado em {pasta_balancos_trimestre}")

    except Exception as e:
        logging.error(f"Erro ao extrair e renomear arquivo para {papel}: {e}")

# Extrair e renomear arquivos para cada empresa
for papel in lista_de_empresas:
    extrair_e_renomear_arquivo(papel)

In [None]:
# Célula 6: Conversão de XLS para XLSX
def converter_xls_para_xlsx(papel):
    pasta_balancos_trimestre = os.path.join(BALANCOS_FOLDER, trimestre_anterior)
    pasta_balancos_xlsx_trimestre = os.path.join(BALANCOS_XLSX_FOLDER, trimestre_anterior)
    os.makedirs(pasta_balancos_xlsx_trimestre, exist_ok=True)

    caminho_xls = os.path.join(pasta_balancos_trimestre, f"{papel}.xls")
    caminho_xlsx = os.path.join(pasta_balancos_xlsx_trimestre, f"{papel}.xlsx")

    if os.path.isfile(caminho_xlsx):
        print(f"Arquivo {papel}.xlsx já foi convertido.")
        return

    try:
        xls = pd.ExcelFile(caminho_xls, engine="xlrd")
        with pd.ExcelWriter(caminho_xlsx, engine="openpyxl") as writer:
            for sheet_name in xls.sheet_names:
                df = pd.read_excel(xls, sheet_name=sheet_name)
                df.to_excel(writer, sheet_name=sheet_name, index=False)
        print(f"Arquivo {papel}.xls convertido para {papel}.xlsx.")
    except Exception as e:
        print(f"Erro ao converter {papel}.xls: {e}")

# Converter arquivos XLS para XLSX para cada empresa
for papel in lista_de_empresas:
    converter_xls_para_xlsx(papel)

In [5]:
# Configurar logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

# Constantes
BAL_PATRIM = "Bal. Patrim."
DEM_RESULT = "Dem. Result."
EXTENSAO_EXCEL = ".xlsx"


def reordenar_primeira_linha(df: pd.DataFrame) -> pd.DataFrame:
    """
    Reordena a primeira linha de um DataFrame, colocando as datas da mais antiga para a mais recente.
    """
    # Identificar a primeira linha (após remoção da original) para as datas
    primeira_linha = df.iloc[0, 1:]

    # Tentar converter os valores para datetime, forçando o formato "dd/mm/yyyy"
    datas = pd.to_datetime(primeira_linha, format="%d/%m/%Y", errors="coerce")

    # Verificar se todas as datas são válidas
    if datas.isnull().all():
        logging.warning(
            "Nenhuma data foi reconhecida na primeira linha. Retornando DataFrame original."
        )
        return df

    # Criar um DataFrame temporário para facilitar a reordenação
    temp_df = pd.DataFrame({"original_col": df.columns[1:], "date": datas})

    # Remover entradas que não são datas válidas
    temp_df = temp_df.dropna(subset=["date"])

    # Ordenar as colunas com base nas datas
    temp_df = temp_df.sort_values(by="date")

    # Reordenar o DataFrame original de acordo com as colunas ordenadas
    new_order = [df.columns[0]] + temp_df["original_col"].tolist()
    df = df[new_order]

    # Substituir a primeira linha com as datas ordenadas
    df.iloc[0, 1:] = temp_df["date"].dt.strftime("%d/%m/%Y").values

    return df


def processar_arquivo(caminho_arquivo: str, pasta_saida: str) -> None:
    try:
        nome_arquivo = os.path.basename(caminho_arquivo)
        nome_papel = nome_arquivo.replace(EXTENSAO_EXCEL, "")

        bal_patrim = pd.read_excel(
            caminho_arquivo, sheet_name=BAL_PATRIM, header=None, engine="openpyxl"
        )
        dem_result = pd.read_excel(
            caminho_arquivo, sheet_name=DEM_RESULT, header=None, engine="openpyxl"
        )

        # Remover a primeira linha de ambas as planilhas
        bal_patrim = bal_patrim.iloc[1:].reset_index(drop=True)
        dem_result = dem_result.iloc[1:].reset_index(drop=True)

        # Reordenar a primeira linha de ambas as planilhas
        bal_patrim = reordenar_primeira_linha(bal_patrim)
        dem_result = reordenar_primeira_linha(dem_result)

        # Verificar o conteúdo da célula [0, 0] da planilha 'Bal. Patrim.'
        primeira_celula = bal_patrim.iloc[0, 0]

        if (
            pd.to_datetime(primeira_celula, format="%d/%m/%Y", errors="coerce")
            is not pd.NaT
        ):
            # Se a célula contém uma data, mover as colunas para a direita
            bal_patrim = bal_patrim.shift(1, axis=1)

        # Definir o nome do papel na célula [0, 0] da planilha 'Bal. Patrim.'
        bal_patrim.iloc[0, 0] = nome_papel

        # Remover a primeira linha de 'Dem. Result.' antes de concatenar
        dem_result = dem_result.iloc[1:].reset_index(drop=True)

        # Concatenar as duas planilhas
        combinado = pd.concat([bal_patrim, dem_result], ignore_index=True)
        
        # Verificação final: Se a célula [0, 0] não contiver o nome do papel, remover a primeira linha
        if combinado.iloc[0, 0] != nome_papel:
            combinado = combinado.iloc[1:].reset_index(drop=True)

        # Criar a pasta de saída correspondente
        nome_subpasta = os.path.basename(os.path.dirname(caminho_arquivo))
        subpasta_saida = os.path.join(pasta_saida, nome_subpasta)
        os.makedirs(subpasta_saida, exist_ok=True)

        # Criar o caminho para o arquivo de saída
        arquivo_saida = os.path.join(subpasta_saida, nome_arquivo)

        # Verificar se o arquivo já existe antes de salvar
        if os.path.exists(arquivo_saida):
            logging.info(f"Arquivo já existe e será ignorado: {arquivo_saida}")
        else:
            # Salvar o resultado em uma nova planilha
            combinado.to_excel(arquivo_saida, index=False)
            logging.info(f"Arquivo combinado e ordenado salvo como: {arquivo_saida}")

    except Exception as e:
        logging.error(f"Erro ao processar o arquivo {caminho_arquivo}: {str(e)}")


def processar_todos_arquivos(pasta_base: str, pasta_saida: str) -> None:
    for nome_subpasta in sorted(os.listdir(pasta_base)):
        caminho_subpasta = os.path.join(pasta_base, nome_subpasta)
        if os.path.isdir(caminho_subpasta):
            for nome_arquivo in sorted(os.listdir(caminho_subpasta)):
                if nome_arquivo.endswith(EXTENSAO_EXCEL):
                    caminho_arquivo = os.path.join(caminho_subpasta, nome_arquivo)
                    processar_arquivo(caminho_arquivo, pasta_saida)


# Execução do processamento
processar_todos_arquivos(BALANCOS_XLSX_FOLDER, BALANCOS_CONCATENADOS_FOLDER)


2024-08-20 01:43:42,834 - INFO - Arquivo combinado e ordenado salvo como: balancos_concatenados\Q1-2021\AALR3.xlsx
2024-08-20 01:43:43,374 - INFO - Arquivo combinado e ordenado salvo como: balancos_concatenados\Q1-2021\ABCB3.xlsx
2024-08-20 01:43:43,664 - INFO - Arquivo combinado e ordenado salvo como: balancos_concatenados\Q1-2021\ABCB4.xlsx
2024-08-20 01:43:43,898 - INFO - Arquivo combinado e ordenado salvo como: balancos_concatenados\Q1-2021\ABEV3.xlsx
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.iloc[0, 1:] = temp_df["date"].dt.strftime("%d/%m/%Y").values
2024-08-20 01:43:44,219 - INFO - Arquivo combinado e ordenado salvo como: balancos_concatenados\Q1-2021\ABYA3.xlsx
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/use

In [7]:
import os
import pandas as pd
from datetime import datetime

# Função para obter as subpastas ordenadas da mais antiga para a mais recente
def obter_subpastas_ordenadas(base_folder):
    subpastas = [f.name for f in os.scandir(base_folder) if f.is_dir()]
    subpastas.sort(
        key=lambda x: datetime.strptime(x.split("-")[1], "%Y")
    )  # Ordenar por ano
    return subpastas

# Função para concatenar as planilhas
def concatenar_planilhas(base_folder, output_folder):
    planilhas_dict = {}

    # Obter subpastas ordenadas
    subpastas_ordenadas = obter_subpastas_ordenadas(base_folder)
    
    # Percorre todas as subpastas (ordenadas pela data da pasta mais antiga para a mais recente)
    for subpasta in subpastas_ordenadas:
        subpasta_path = os.path.join(base_folder, subpasta)
        if os.path.isdir(subpasta_path):
            for file_name in os.listdir(subpasta_path):
                if file_name.endswith(".xlsx"):
                    file_path = os.path.join(subpasta_path, file_name)

                    # Carregar a planilha, ignorando a primeira linha (índice de colunas)
                    df = pd.read_excel(file_path, skiprows=1)

                    # Se o arquivo já foi processado, concatenar as novas colunas
                    if file_name in planilhas_dict:
                        df_existente = planilhas_dict[file_name]

                        # Ignorar a primeira coluna (indicadores financeiros)
                        for col in df.columns[1:]:
                            if col not in df_existente.columns:
                                df_existente[col] = df[col]

                        # Reordenar as colunas pela data
                        colunas_datas = [
                            col
                            for col in df_existente.columns
                            if col != df_existente.columns[0]
                        ]
                        colunas_datas.sort(
                            key=lambda date: datetime.strptime(str(date), "%d/%m/%Y")
                        )
                        colunas_ordenadas = [df_existente.columns[0]] + colunas_datas
                        planilhas_dict[file_name] = df_existente[colunas_ordenadas]
                    else:
                        # Armazenar a primeira planilha encontrada
                        planilhas_dict[file_name] = df

    # Salvar os arquivos concatenados na pasta de destino
    for nome_arquivo, df_final in planilhas_dict.items():
        # Salvar o DataFrame concatenado no arquivo de destino
        output_path = os.path.join(output_folder, nome_arquivo)
        df_final.to_excel(output_path, index=False)

# Executar a função
concatenar_planilhas(BALANCOS_CONCATENADOS_FOLDER, BALANCOS_DEFINITIVO_FOLDER)


In [None]:
# Célula 7: Processamento dos Arquivos de Balanços
def processar_arquivos_balancos():
    dicionario_de_fundamentos = {}
    subpastas = sorted(
        [f.path for f in os.scandir(BALANCOS_XLSX_FOLDER) if f.is_dir()],
        key=lambda x: x.split('-')[::-1]
    )

    for subpasta in subpastas:
        for arquivo in os.listdir(subpasta):
            if arquivo.endswith(".xlsx"):
                nome = arquivo.replace(".xlsx", "").strip()
                if nome in lista_de_empresas:
                    try:
                        balanco = pd.read_excel(os.path.join(subpasta, arquivo), sheet_name=0, header=None)
                        dre = pd.read_excel(os.path.join(subpasta, arquivo), sheet_name=1, header=None)

                        for df in [balanco, dre]:
                            df = df.iloc[1:]
                            first_col_value = pd.to_datetime(df.iloc[0, 0], errors='coerce', dayfirst=True)
                            if pd.isna(first_col_value):
                                df.columns = df.iloc[0, :]
                                df = df[1:]
                            else:
                                df.columns = df.iloc[0]
                                df = df[1:]

                        if nome in dicionario_de_fundamentos:
                            dicionario_de_fundamentos[nome] = pd.concat([dicionario_de_fundamentos[nome], balanco, dre], axis=1)
                        else:
                            dicionario_de_fundamentos[nome] = pd.concat([balanco, dre], axis=1)

                        print(f"{nome} foi adicionado ao dicionário.")
                    except Exception as e:
                        print(f"Erro ao processar o arquivo {arquivo}: {e}")

    return dicionario_de_fundamentos

# Processar arquivos de balanços
dicionario_de_balancos = processar_arquivos_balancos()

In [None]:
display(dicionario_de_balancos["ABCB4"])

In [None]:
# Célula 8: Eliminação de Empresas com Dados Antigos
def eliminar_empresas_com_dados_antigos(dicionario_de_balancos):
    data_limite = datetime(2023, 1, 1)
    empresas_para_remover = []

    for empresa, df in dicionario_de_balancos.items():
        datas = pd.to_datetime(df.columns, errors='coerce', dayfirst=True)
        ultima_data = datas.max()

        if pd.isna(ultima_data) or ultima_data < data_limite:
            empresas_para_remover.append(empresa)

    for empresa in empresas_para_remover:
        del dicionario_de_balancos[empresa]
        print(f"Empresa {empresa} foi removida devido a dados anteriores a 01/01/2023.")

    print(f"{len(empresas_para_remover)} empresas foram removidas.")

    return dicionario_de_balancos

# Eliminar empresas com dados antigos
dicionario_de_balancos = eliminar_empresas_com_dados_antigos(dicionario_de_balancos)

In [None]:
# Exibir um exemplo do dicionário de balanços
display(dicionario_de_balancos["ABCB4"])