In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import gc
import os

# --- CONFIGURA√á√ÉO ---
BASE_PATH = "../data/raw/CNPJ"
ARQUIVOS_ESTAB = [os.path.join(BASE_PATH, f"Estabelecimentos{i}/K3241.K03200Y{i}.D50913.ESTABELE") for i in range(10)]
CAMINHO_SIMPLES = os.path.join(BASE_PATH, "Simples/F.K03200$W.SIMPLES.CSV.D50913")
SAIDA_FINAL = "../data/processed/painel_mei_rf_anual.parquet"

def main():
    print("--- INICIANDO ETL MEI OTIMIZADO ---")
    
    # 1. Carregar Simples com tipos otimizados
    # O CNPJ B√°sico deve ser string ou int para o merge bater
    df_simples = pd.read_csv(CAMINHO_SIMPLES, sep=";", encoding="latin-1", header=None, 
                             usecols=[0, 4, 5, 6], names=["cnpj_basico", "mei", "ini", "fim"],
                             dtype={"cnpj_basico": "int32", "mei": "category"})
    
    df_simples = df_simples[df_simples["mei"] == "S"].copy()
    
    # Datas para Ano (Vetorizado) - convertemos para int16 para economizar RAM
    df_simples['ano_ini'] = (df_simples['ini'] // 10000).fillna(0).astype("int16")
    df_simples['ano_fim'] = (df_simples['fim'] // 10000).fillna(0).astype("int16")
    df_simples.drop(columns=["mei", "ini", "fim"], inplace=True)

    painel_final_lista = []

    # 2. Loop Estabelecimentos
    for path in ARQUIVOS_ESTAB:
        if not os.path.exists(path): continue
        
        # Lista tempor√°ria por ARQUIVO para evitar fragmenta√ß√£o global
        painel_arquivo = []
        
        reader = pd.read_csv(path, sep=";", encoding="latin-1", header=None,
                             usecols=[0, 11, 20], names=["cnpj_basico", "cnae", "mun"],
                             dtype={"cnpj_basico": "int32", "cnae": "float64", "mun": "float64"},
                             chunksize=2_000_000)
        
        for chunk in tqdm(reader, desc=f"üìÇ {os.path.basename(path)}"):
            # Merge Inner (remove quem n√£o √© MEI logo de cara)
            chunk = chunk.merge(df_simples, on="cnpj_basico", how="inner")
            
            # Setor Vetorizado (Muito mais r√°pido que .apply)
            # CNAE na RF costuma ter 7 d√≠gitos. Divis√£o s√£o os 2 primeiros.
            divisao = (chunk['cnae'] // 100000).fillna(0).astype(int)
            chunk['setor'] = 'Servicos'
            chunk.loc[divisao.between(1, 3), 'setor'] = 'Agro'
            chunk.loc[divisao.between(5, 33), 'setor'] = 'Industria'
            chunk.loc[divisao == 84, 'setor'] = 'Setor Publico'
            
            # Agrega√ß√£o r√°pida
            ent = chunk.groupby(['mun', 'setor', 'ano_ini']).size().reset_index(name='entradas')
            sai = chunk.dropna(subset=['ano_fim']).groupby(['mun', 'setor', 'ano_fim']).size().reset_index(name='saidas')
            
            painel_arquivo.append(pd.merge(ent.rename(columns={'ano_ini': 'ano'}), 
                                           sai.rename(columns={'ano_fim': 'ano'}), 
                                           on=['mun', 'setor', 'ano'], how='outer').fillna(0))
            
        # Consolida o arquivo atual e limpa a RAM antes do pr√≥ximo
        if painel_arquivo:
            df_arq = pd.concat(painel_arquivo).groupby(['mun', 'setor', 'ano']).sum().reset_index()
            painel_final_lista.append(df_arq)
            del painel_arquivo; gc.collect()

    # 3. Consolida√ß√£o e C√°lculo de Estoque
    df_final = pd.concat(painel_final_lista).groupby(['mun', 'setor', 'ano']).sum().reset_index()
    
    # Ordenar para o cumsum fazer sentido (Econometria de Painel)
    df_final = df_final.sort_values(['mun', 'setor', 'ano'])
    df_final['estoque_mei'] = (
        df_final.groupby(['mun', 'setor'])['entradas'].cumsum() - 
        df_final.groupby(['mun', 'setor'])['saidas'].cumsum()
    )
    df_final.to_parquet(SAIDA_FINAL, index=False)
    print(f"‚úÖ Painel conclu√≠do: {SAIDA_FINAL}")

if __name__ == "__main__":
    main()

--- INICIANDO ETL MEI OTIMIZADO ---


üìÇ K3241.K03200Y0.D50913.ESTABELE: 2it [13:35, 409.07s/it]

In [None]:
import pandas as pd
import numpy as np
import os
import gc

# --- CONFIGURA√á√ÉO DE CAMINHOS ---
INPUT_FILE = '../data/raw/test.parquet'
OUTPUT_DIR = '../data/processed'
OUTPUT_FILE = os.path.join(OUTPUT_DIR, 'rais_painel_balanceado.parquet')

def processar_rais_otimizado(path):
    """
    L√™ apenas colunas necess√°rias e usa vetoriza√ß√£o para economizar RAM.
    """
    print("   [1/3] Lendo colunas selecionadas e mapeando setores...")
    
    # L√™ apenas o necess√°rio
    cols = ['id_municipio', 'cnae_2', 'ano', 'quantidade_vinculos_ativos']
    df = pd.read_parquet(path, columns=cols)

    # Vetoriza√ß√£o em vez de apply: muito mais r√°pido e leve
    # CNAE Divis√£o (2 d√≠gitos)
    div = (df['cnae_2'].fillna(0).astype(np.int32) // 1000)
    
    df['setor'] = 'Outros'
    df.loc[div.between(1, 3), 'setor'] = 'Agro'
    df.loc[div.between(5, 33), 'setor'] = 'Industria'
    df.loc[div == 84, 'setor'] = 'Setor Publico'
    df.loc[div.between(35, 99), 'setor'] = 'Servicos'
    
    # Remove coluna tempor√°ria e libera mem√≥ria
    df.drop(columns=['cnae_2'], inplace=True)

    print("   [2/3] Agregando estoque...")
    df_agg = (df.groupby(['id_municipio', 'setor', 'ano'], as_index=False)
                ['quantidade_vinculos_ativos'].sum())
    
    del df
    gc.collect()
    return df_agg

def rebalancear_painel_otimizado(df_agg):
    """
    Usa tipos de dados categ√≥ricos para reduzir o tamanho do produto cartesiano.
    """
    print("   [3/3] Rebalanceando o painel...")

    # Remover 'Outros' antes de balancear economiza mem√≥ria
    df_agg = df_agg[df_agg['setor'] != 'Outros'].copy()

    todos_municipios = df_agg['id_municipio'].unique()
    todos_setores = ['Agro', 'Industria', 'Servicos', 'Setor Publico']
    todos_anos = df_agg['ano'].unique()

    # Criar index e esqueleto
    index = pd.MultiIndex.from_product(
        [todos_municipios, todos_setores, todos_anos], 
        names=['id_municipio', 'setor', 'ano']
    )
    
    # Reindex √© mais eficiente que Merge para balanceamento
    df_final = (df_agg.set_index(['id_municipio', 'setor', 'ano'])
                      .reindex(index, fill_value=0)
                      .reset_index())

    # Vari√°vel para DID
    df_final['log_estoque'] = np.log1p(df_final['quantidade_vinculos_ativos'].astype(np.float32))
    
    return df_final

def main():
    print("--- INICIANDO ETL RAIS OTIMIZADO ---")
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    
    if not os.path.exists(INPUT_FILE):
        print(f"ERRO: Arquivo de entrada n√£o encontrado: {INPUT_FILE}")
        return

    # Processamento em etapas com limpeza de cache
    df_agg = processar_rais_otimizado(INPUT_FILE)
    df_final = rebalancear_painel_otimizado(df_agg)
    
    print(f"Salvando em: {OUTPUT_FILE}")
    # compress√£o snappy √© padr√£o e r√°pida
    df_final.to_parquet(OUTPUT_FILE, index=False, compression='snappy')
    print("--- CONCLU√çDO ---")

if __name__ == "__main__":
    main()

In [None]:
import pandas as pd

def get_simples_base(path, dtypes):
    df = pd.read_csv(path, sep=";", encoding="latin-1", header=None, 
                     usecols=[0, 4, 5, 6], names=["cnpj_basico", "mei", "ini", "fim"],
                     dtype=dtypes)
    df = df[df["mei"] == "S"].copy()
    df['ano_ini'] = (df['ini'] // 10000).fillna(0).astype("int16")
    df['ano_fim'] = (df['fim'] // 10000).fillna(0).astype("int16")
    return df.drop(columns=["mei", "ini", "fim"])

def classify_sector(df):
    divisao = (df['cnae'] // 100000).fillna(0).astype(int)
    df['setor'] = 'Servicos'
    df.loc[divisao.between(1, 3), 'setor'] = 'Agro'
    df.loc[divisao.between(5, 33), 'setor'] = 'Industria'
    df.loc[divisao == 84, 'setor'] = 'Setor Publico'
    return df