In [2]:
# ETL_Cotacoes_B3_Diario.ipynb

import polars as pl
import requests
import zipfile
import logging
import os
import tempfile
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime, timedelta
import warnings
from urllib3.exceptions import InsecureRequestWarning

# Suppress insecure request warnings
warnings.simplefilter('ignore', InsecureRequestWarning)


In [3]:
# Configuração de logs
def setup_logging():
    """Configura o sistema de logs padronizados."""
    log_dir = "logs"
    os.makedirs(log_dir, exist_ok=True)

    # Nome do arquivo de log com data atual
    log_file = os.path.join(log_dir, f"etl_cotacoes_b3_{datetime.now().strftime('%Y%m%d')}.log")

    # Configuração do logger
    logger = logging.getLogger("ETL_Cotacoes_B3")
    logger.setLevel(logging.INFO)

    # Handler para arquivo
    file_handler = logging.FileHandler(log_file)
    file_handler.setLevel(logging.INFO)

    # Handler para console
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)

    # Formato padronizado para os logs
    log_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(log_format)
    console_handler.setFormatter(log_format)

    # Adicionar handlers ao logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

    return logger

# Inicializar logger
logger = setup_logging()

In [4]:
# Configurações de banco de dados
DATABASE_CONFIG = {
    "host": "db",
    "database": "meu_banco",
    "user": "admin",
    "password": "admin_password",
    "port": 5432
}

# --- Definições de colunas, mercados, etc. ---
FIELD_SIZES = {
    'TIPO_DE_REGISTRO': 2, 'DATA_DO_PREGAO': 8, 'CODIGO_BDI': 2, 'CODIGO_DE_NEGOCIACAO': 12,
    'TIPO_DE_MERCADO': 3, 'NOME_DA_EMPRESA': 12, 'ESPECIFICACAO_DO_PAPEL': 10,
    'PRAZO_EM_DIAS_DO_MERCADO_A_TERMO': 3, 'MOEDA_DE_REFERENCIA': 4, 'PRECO_DE_ABERTURA': 13,
    'PRECO_MAXIMO': 13, 'PRECO_MINIMO': 13, 'PRECO_MEDIO': 13, 'PRECO_ULTIMO_NEGOCIO': 13,
    'PRECO_MELHOR_OFERTA_DE_COMPRA': 13, 'PRECO_MELHOR_OFERTA_DE_VENDAS': 13,
    'NUMERO_DE_NEGOCIOS': 5, 'QUANTIDADE_NEGOCIADA': 18, 'VOLUME_TOTAL_NEGOCIADO': 18,
    'PRECO_DE_EXERCICIO': 13, 'INDICADOR_DE_CORRECAO_DE_PRECOS': 1, 'DATA_DE_VENCIMENTO': 8,
    'FATOR_DE_COTACAO': 7, 'PRECO_DE_EXERCICIO_EM_PONTOS': 13, 'CODIGO_ISIN': 12,
    'NUMERO_DE_DISTRIBUICAO': 3
}

BASE_URL = "https://bvmf.bmfbovespa.com.br/InstDados/SerHist/COTAHIST"

In [5]:
class B3DailyETL:
    """Classe para ETL diário de cotações da B3."""

    def __init__(self):
        """Inicializa o ETL com data atual."""
        # Usar o dia anterior por padrão, já que os dados da B3 são disponibilizados no dia seguinte
        self.target_date = datetime.now() - timedelta(days=1)
        self.url_suffix = f"_D{self.target_date.day:02d}{self.target_date.month:02d}{self.target_date.year}.ZIP"
        self.existing_tickers = set()
        logger.info(f"ETL inicializado para a data: {self.target_date.strftime('%d/%m/%Y')}")

    def get_existing_tickers(self):
        """Busca os tickers existentes no banco de dados."""
        conn = None
        try:
            logger.info("Buscando tickers existentes no banco de dados...")
            conn = psycopg2.connect(**DATABASE_CONFIG)
            with conn.cursor() as cursor:
                cursor.execute("SELECT codigo_isin FROM ticker")
                self.existing_tickers = set(row[0] for row in cursor.fetchall())
            logger.info(f"Encontrados {len(self.existing_tickers)} tickers no banco de dados.")
            return True
        except Exception as e:
            logger.error(f"Erro ao buscar tickers: {e}")
            return False
        finally:
            if conn:
                conn.close()

    def download_and_extract(self):
        """Baixa e extrai o arquivo ZIP da B3 para o dia atual."""
        url = f"{BASE_URL}{self.url_suffix}"
        tmp_zip_path = None
        tmp_txt_path = None

        try:
            logger.info(f"Baixando arquivo de {self.target_date.strftime('%d/%m/%Y')}: {url}")
            response = requests.get(url, verify=False, stream=True)
            response.raise_for_status()

            # Salvar ZIP em arquivo temporário
            with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_zip_file:
                for chunk in response.iter_content(chunk_size=8192 * 16):
                    tmp_zip_file.write(chunk)
                tmp_zip_path = tmp_zip_file.name
            logger.info(f"Arquivo ZIP salvo em: {tmp_zip_path}")

            # Criar um arquivo temporário para o TXT
            with tempfile.NamedTemporaryFile(delete=False, suffix=".txt") as tmp_txt_file:
                tmp_txt_path = tmp_txt_file.name

            # Fechar e remover o arquivo temporário vazio para evitar conflitos
            os.remove(tmp_txt_path)

            # Extrair TXT do ZIP
            file_name_in_zip = self.url_suffix.replace('.ZIP', '.TXT')
            file_name_in_zip = "COTAHIST" + file_name_in_zip

            logger.info(f"Extraindo arquivo {file_name_in_zip} do ZIP...")
            with zipfile.ZipFile(tmp_zip_path, 'r') as zf:
                if file_name_in_zip not in zf.namelist():
                    logger.error(f"Arquivo {file_name_in_zip} não encontrado no ZIP")
                    logger.info(f"Arquivos disponíveis no ZIP: {zf.namelist()}")
                    return None

                # Extrair diretamente para o caminho temporário
                with zf.open(file_name_in_zip) as source, open(tmp_txt_path, 'wb') as target:
                    target.write(source.read())

            logger.info(f"Arquivo extraído com sucesso para: {tmp_txt_path}")
            return tmp_txt_path

        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                logger.warning(f"Arquivo não encontrado para {self.target_date.strftime('%d/%m/%Y')}. "
                              f"Possivelmente não houve pregão neste dia ou os dados ainda não foram publicados.")
            else:
                logger.error(f"Erro HTTP ao baixar arquivo: {e}")
            return None
        except Exception as e:
            logger.error(f"Erro ao baixar/extrair arquivo: {e}", exc_info=True)
            return None
        finally:
            # Limpar arquivo ZIP temporário
            if tmp_zip_path and os.path.exists(tmp_zip_path):
                os.remove(tmp_zip_path)
                logger.debug(f"Arquivo ZIP temporário removido: {tmp_zip_path}")

    def process_file(self, file_path):
        """Processa o arquivo de cotações usando Polars."""
        logger.info(f"Processando arquivo: {file_path}")

        try:
            # Criar uma lista de posições de coluna baseada no FIELD_SIZES
            column_positions = []
            current_pos = 0
            for col, width in FIELD_SIZES.items():
                column_positions.append((current_pos, current_pos + width))
                current_pos += width

            # Filtrar apenas as colunas que precisamos
            needed_columns = ['TIPO_DE_REGISTRO', 'DATA_DO_PREGAO', 'CODIGO_BDI', 'PRECO_DE_ABERTURA',
                            'PRECO_ULTIMO_NEGOCIO', 'NUMERO_DE_NEGOCIOS', 'QUANTIDADE_NEGOCIADA',
                            'VOLUME_TOTAL_NEGOCIADO', 'CODIGO_ISIN']

            needed_positions = []
            column_names = []
            for i, col in enumerate(FIELD_SIZES.keys()):
                if col in needed_columns:
                    needed_positions.append(column_positions[i])
                    column_names.append(col)

            # Ler o arquivo usando Polars com leitura por colunas fixas
            logger.info("Iniciando leitura do arquivo com Polars...")
            df = pl.read_csv(
                file_path,
                has_header=False,
                skip_rows=1,
                encoding='latin1',
                separator='\n',  # Cada linha é um registro
                columns=[0],     # Lê apenas a primeira coluna que contém toda a linha
                new_columns=["line"]  # Renomeia para "line"
            ).lazy()

            # Extrair as colunas necessárias da linha usando expressões
            for i, (col_name, (start, end)) in enumerate(zip(column_names, needed_positions)):
                df = df.with_columns([
                    pl.col("line").str.slice(start, end - start).alias(col_name)
                ])

            # Filtrar linhas que não são trailer (TIPO_DE_REGISTRO != '99')
            df = df.filter(pl.col("TIPO_DE_REGISTRO") != "99")

            # Filtrar por CODIGO_BDI == "02" (LOTE_PADRAO)
            df = df.filter(pl.col("CODIGO_BDI") == "02")

            # Converter tipos de dados
            df = df.with_columns([
                # Converter datas
                pl.col("DATA_DO_PREGAO").str.to_date("%Y%m%d").alias("data_pregao"),

                # Converter valores numéricos
                (pl.col("PRECO_DE_ABERTURA").cast(pl.Float64) / 100).alias("abertura"),
                (pl.col("PRECO_ULTIMO_NEGOCIO").cast(pl.Float64) / 100).alias("fechamento"),
                pl.col("NUMERO_DE_NEGOCIOS").cast(pl.Int64).alias("numero_de_negocios"),
                pl.col("QUANTIDADE_NEGOCIADA").cast(pl.Float64).alias("quantidade_negociada"),
                pl.col("VOLUME_TOTAL_NEGOCIADO").cast(pl.Float64).alias("volume_negociado"),

                # Manter CODIGO_ISIN como está
                pl.col("CODIGO_ISIN").alias("codigo_isin")
            ])

            # Selecionar apenas as colunas finais e filtrar por tickers existentes
            df = df.select([
                "codigo_isin", "data_pregao", "abertura", "fechamento",
                "numero_de_negocios", "quantidade_negociada", "volume_negociado"
            ]).filter(
                pl.col("codigo_isin").is_in(list(self.existing_tickers))
            )

            # Executar o processamento e retornar o DataFrame materializado
            result_df = df.collect()
            logger.info(f"Processamento concluído. Registros encontrados: {result_df.height}")
            return result_df

        except Exception as e:
            logger.error(f"Erro ao processar arquivo: {e}", exc_info=True)
            return None

    def insert_into_database(self, df, batch_size=10000):
        """Insere os dados processados no banco de dados."""
        if df is None or df.is_empty():
            logger.warning("DataFrame vazio, nada a inserir.")
            return 0

        conn = None
        inserted = 0
        try:
            logger.info("Conectando ao banco de dados para inserção...")
            conn = psycopg2.connect(**DATABASE_CONFIG)
            with conn.cursor() as cursor:
                # Preparar dados para inserção
                data = df.to_pandas().to_dict('records')
                logger.info(f"Preparando {len(data)} registros para inserção em lotes de {batch_size}")

                # Inserir em lotes para melhor performance
                for i in range(0, len(data), batch_size):
                    batch = data[i:i+batch_size]
                    values = [(
                        row['codigo_isin'],
                        row['data_pregao'],
                        row['abertura'],
                        row['fechamento'],
                        row['numero_de_negocios'],
                        row['quantidade_negociada'],
                        row['volume_negociado']
                    ) for row in batch]

                    # Usar execute_values para inserção em lote
                    execute_values(
                        cursor,
                        """
                        INSERT INTO cotacao
                        (codigo_isin, data_pregao, abertura, fechamento, numero_de_negocios,
                        quantidade_negociada, volume_negociado)
                        VALUES %s
                        ON CONFLICT (codigo_isin, data_pregao) DO NOTHING
                        """,
                        values
                    )
                    logger.info(f"Lote {i//batch_size + 1} inserido.")

                conn.commit()
                inserted = len(data)
                logger.info(f"Inserção concluída. Total de registros processados: {inserted}")

        except Exception as e:
            if conn:
                conn.rollback()
            logger.error(f"Erro ao inserir no banco de dados: {e}", exc_info=True)
        finally:
            if conn:
                conn.close()

        return inserted

    def run(self):
        """Executa o ETL completo para o dia atual."""
        logger.info("=== Iniciando ETL diário de cotações B3 ===")

        # Etapa 1: Obter tickers existentes
        if not self.get_existing_tickers():
            logger.error("Falha ao obter tickers. Abortando ETL.")
            return False

        if not self.existing_tickers:
            logger.warning("Nenhum ticker encontrado no banco. Nenhuma cotação será inserida.")
            return False

        # Etapa 2: Download e extração do arquivo
        txt_path = self.download_and_extract()
        if not txt_path:
            logger.error("Falha ao baixar/extrair arquivo. Abortando ETL.")
            return False

        try:
            # Etapa 3: Processar arquivo
            df = self.process_file(txt_path)
            if df is None:
                logger.error("Falha ao processar arquivo. Abortando ETL.")
                return False

            if df.is_empty():
                logger.warning("Nenhum registro válido encontrado após processamento.")
                return True  # Consideramos sucesso, apenas não há dados

            # Etapa 4: Inserir no banco
            inserted = self.insert_into_database(df)
            if inserted > 0:
                logger.info(f"ETL concluído com sucesso. {inserted} registros processados.")
                return True
            else:
                logger.warning("Nenhum registro inserido no banco de dados.")
                return True  # Consideramos sucesso, apenas não há dados novos

        
        finally:
            # Limpeza: remover arquivo temporário
            if txt_path and os.path.exists(txt_path):
                os.remove(txt_path)
                logger.debug(f"Arquivo temporário removido: {txt_path}")

        logger.info("=== ETL diário de cotações B3 finalizado ===")



In [None]:

# Executar o ETL
if __name__ == "__main__":
    etl = B3DailyETL()
    success = etl.run()
    exit_code = 0 if success else 1
    logger.info(f"Programa finalizado com código de saída: {exit_code}")
    exit(exit_code)