In [None]:
import os
import zipfile
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm import tqdm
import shutil
import gc
from IPython.display import display

In [None]:
# --- Configuração dos Diretórios ---
RAW_PATH = os.path.join('data', 'raw')
TEXTS_PATH = os.path.join('data', 'texts')
PROCESSED_PATH = os.path.join('data', 'processed')

# --- Caminho do Arquivo Final ---
FINAL_PARQUET_PATH = os.path.join(PROCESSED_PATH, 'dados_b3.parquet')

# --- Criação dos Diretórios (se não existirem) ---
os.makedirs(TEXTS_PATH, exist_ok=True)
os.makedirs(PROCESSED_PATH, exist_ok=True)

# --- Layout do Arquivo de Cotações Históricas ---
COTAHIST_LAYOUT = {
    'TIPREG': (1, 2), 'DATA_PREGAO': (3, 10), 'CODBDI': (11, 12), 'CODNEG': (13, 24),
    'TPMERC': (25, 27), 'NOMRES': (28, 39), 'ESPECI': (40, 49), 'PRAZOT': (50, 52),
    'MODREF': (53, 56), 'PREABE': (57, 69), 'PREMAX': (70, 82), 'PREMIN': (83, 95),
    'PREMED': (96, 108), 'PREULT': (109, 121), 'PREOFC': (122, 134), 'PREOFV': (135, 147),
    'TOTNEG': (148, 152), 'QUATOT': (153, 170), 'VOLTOT': (171, 188), 'PREEXE': (189, 201),
    'INDOPC': (202, 202), 'DATVEN': (203, 210), 'FATCOT': (211, 217), 'PTOEXE': (218, 230),
    'CODISI': (231, 242), 'DISMES': (243, 245),
}

COLSPECS = [(v[0] - 1, v[1]) for k, v in COTAHIST_LAYOUT.items()]
NAMES = list(COTAHIST_LAYOUT.keys())

# --- Definição das Colunas para Limpeza ---
DATE_COLS = ['DATA_PREGAO', 'DATVEN']
PRICE_COLS = ['PREABE', 'PREMAX', 'PREMIN', 'PREMED', 'PREULT', 'PREOFC', 'PREOFV', 'PREEXE', 'VOLTOT']

In [None]:
def extrair_arquivos_zip():
    """
    Extrai arquivos .zip de data/raw para data/texts.
    """
    zip_files = [f for f in os.listdir(RAW_PATH) if f.lower().endswith('.zip')]
    if not zip_files:
        print("Nenhum arquivo .zip encontrado em data/raw.")
        return
        
    print(f"Encontrados {len(zip_files)} arquivos ZIP para extrair.")
    for filename in tqdm(zip_files, desc="Extraindo arquivos ZIP"):
        zip_path = os.path.join(RAW_PATH, filename)
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                for member in zip_ref.namelist():
                    base_name = os.path.basename(member)
                    if not base_name.lower().endswith('.txt'):
                        output_filename = os.path.join(TEXTS_PATH, base_name + '.TXT')
                    else:
                        output_filename = os.path.join(TEXTS_PATH, base_name)
                    with zip_ref.open(member) as source, open(output_filename, 'wb') as target:
                        shutil.copyfileobj(source, target)
        except zipfile.BadZipFile:
            print(f"AVISO: O arquivo '{filename}' não é um ZIP válido. Pulando.")
        except Exception as e:
            print(f"ERRO ao extrair '{filename}': {e}")

extrair_arquivos_zip()

In [None]:
def processar_e_consolidar_em_parquet_unico_final():
    """
    Lê os arquivos de texto linha por linha para contornar a limitação do 'skipfooter',
    processa em lotes para uso mínimo de memória, impõe um schema consistente e 
    anexa a um único arquivo Parquet.
    """
    files_to_process = [f for f in os.listdir(TEXTS_PATH) if f.lower().endswith('.txt')]
    if not files_to_process:
        print("Nenhum arquivo .txt encontrado em data/texts para processar.")
        return

    print(f"\nIniciando consolidação de {len(files_to_process)} arquivos (Modo Linha-a-Linha).")
    
    INT_COLS = ['TIPREG', 'CODBDI', 'TPMERC', 'TOTNEG', 'QUATOT', 
                'FATCOT', 'INDOPC', 'DISMES', 'PRAZOT', 'PTOEXE']
    
    BATCH_SIZE = 500_000 # Quantas linhas processar em memória por vez
    writer = None
    final_schema = None

    if os.path.exists(FINAL_PARQUET_PATH):
        os.remove(FINAL_PARQUET_PATH)
        print(f"Arquivo parquet antigo '{FINAL_PARQUET_PATH}' removido.")

    # Loop pelos NOMES DOS ARQUIVOS
    for filename in tqdm(files_to_process, desc="Processando Arquivos"):
        file_path = os.path.join(TEXTS_PATH, filename)
        
        try:
            with open(file_path, 'r', encoding='latin-1') as f:
                # Lê todas as linhas para a memória (apenas para este arquivo)
                # para podermos ignorar a primeira e a última
                lines = f.readlines()

            # Ignora o header (primeira linha) e o trailer (última linha)
            content_lines = lines[1:-1]
            
            # Se não houver conteúdo após remover header/trailer, pula para o próximo arquivo
            if not content_lines:
                continue

            # Processa o conteúdo em lotes
            for i in range(0, len(content_lines), BATCH_SIZE):
                batch_lines = content_lines[i:i + BATCH_SIZE]
                
                # Converte o lote de linhas em um DataFrame
                # Usamos um truque com StringIO para que o pandas leia a lista de strings
                # como se fosse um arquivo.
                from io import StringIO
                data_io = StringIO("".join(batch_lines))
                
                df_batch = pd.read_fwf(
                    data_io,
                    colspecs=COLSPECS,
                    names=NAMES,
                    header=None # Já removemos o header manualmente
                )

                if df_batch.empty:
                    continue

                # --- LIMPEZA E IMPOSIÇÃO DE SCHEMA (aplicado ao lote) ---
                for col in df_batch.select_dtypes(['object']).columns:
                    df_batch[col] = df_batch[col].str.strip()
                for col in DATE_COLS:
                    df_batch[col] = pd.to_datetime(df_batch[col], format='%Y%m%d', errors='coerce')
                for col in PRICE_COLS:
                    df_batch[col] = pd.to_numeric(df_batch[col], errors='coerce') / 100
                for col in INT_COLS:
                    df_batch[col] = pd.to_numeric(df_batch[col], errors='coerce').astype('Int64')

                # --- ESCRITA NO PARQUET ---
                table = pa.Table.from_pandas(df_batch, preserve_index=False)
                
                if writer is None:
                    final_schema = table.schema
                    writer = pq.ParquetWriter(FINAL_PARQUET_PATH, final_schema, compression='snappy')
                
                table = table.cast(final_schema)
                writer.write_table(table)

        except Exception as e:
            print(f"ERRO CRÍTICO ao processar o arquivo '{filename}': {e}. Este arquivo será pulado.")
        
        gc.collect()

    if writer:
        writer.close()
        print(f"\n[SUCESSO] Todos os arquivos foram consolidados em:\n{FINAL_PARQUET_PATH}")
    else:
        print("\nNenhum dado foi processado para o arquivo Parquet.")

processar_e_consolidar_em_parquet_unico_final()