In [1]:
import duckdb
import os
import pandas as pd
from tqdm import tqdm
import re

In [2]:
# Diretório onde os arquivos foram extraídos
data_dir = 'D:/CNPJ_Dados_Abertos/Extracted/'

# Conectar ao banco de dados DuckDB com um novo nome de arquivo
con = duckdb.connect(database='cnpj_data_new.db', read_only=False)

# Função para verificar se a tabela já existe
def tabela_existe(con, table_name):
    result = con.execute(f"SELECT COUNT(*) FROM information_schema.tables WHERE table_name = '{table_name}'").fetchone()
    return result[0] > 0

# Função para remover a tabela se ela já existir
def remover_tabela_se_existir(con, table_name):
    con.execute(f"DROP TABLE IF EXISTS {table_name}")

# Função para carregar e unificar os arquivos diretamente no DuckDB em chunks
def carregar_e_unificar_dados(data_dir, con):
    print("Arquivos no diretório:")
    all_files = []
    for root, dirs, files in os.walk(data_dir):
        for file in files:
            if any(keyword in file for keyword in ['ESTABELE', 'EMPRECSV', 'SIMPLES', 'NATJUCSV']):
                all_files.append((root, file))
                print(file)

    # Remover tabelas se já existirem
    remover_tabela_se_existir(con, 'estabelecimentos')
    remover_tabela_se_existir(con, 'empresas')
    remover_tabela_se_existir(con, 'simples')
    remover_tabela_se_existir(con, 'natureza_juridica')

    # Definir as colunas para as tabelas
    colunas_estabelecimentos = [
        "CNPJ_BASICO", "CNPJ_ORDEM", "CNPJ_DV", "IDENTIFICADOR_MATRIZ_FILIAL", 
        "NOME_FANTASIA", "SITUACAO_CADASTRAL", "DATA_SITUACAO_CADASTRAL", 
        "MOTIVO_SITUACAO_CADASTRAL", "NOME_DA_CIDADE_NO_EXTERIOR", "PAIS", 
        "DATA_DE_INICIO_ATIVIDADE", "CNAE_FISCAL_PRINCIPAL", "CNAE_FISCAL_SECUNDARIA", 
        "TIPO_DE_LOGRADOURO", "LOGRADOURO", "NUMERO", "COMPLEMENTO", "BAIRRO", 
        "CEP", "UF", "MUNICIPIO", "DDD_1", "TELEFONE_1", "DDD_2", "TELEFONE_2", 
        "DDD_DO_FAX", "FAX", "CORREIO_ELETRONICO", "SITUACAO_ESPECIAL", 
        "DATA_DA_SITUACAO_ESPECIAL"
    ]
    colunas_empresas = [
        "CNPJ_BASICO", "RAZAO_SOCIAL", "NATUREZA_JURIDICA", "QUALIFICACAO_DO_RESPONSAVEL", 
        "CAPITAL_SOCIAL_DA_EMPRESA", "PORTE_DA_EMPRESA", "ENTE_FEDERATIVO_RESPONSAVEL"
    ]
    colunas_simples = [
        "CNPJ_BASICO", "OPCAO_PELO_SIMPLES", "DATA_OPCAO_PELO_SIMPLES", "DATA_EXCLUSAO_DO_SIMPLES",
        "OPCAO_PELO_MEI", "DATA_OPCAO_PELO_MEI", "DATA_EXCLUSAO_DO_MEI"
    ]

    colunas_varchar_estabelecimentos = ', '.join([f"{col} VARCHAR" for col in colunas_estabelecimentos])
    colunas_varchar_empresas = ', '.join([f"{col} VARCHAR" for col in colunas_empresas])
    colunas_varchar_simples = ', '.join([f"{col} VARCHAR" for col in colunas_simples])

    # Criar tabelas vazias com colunas como VARCHAR
    con.execute(f"CREATE TABLE estabelecimentos ({colunas_varchar_estabelecimentos})")
    con.execute(f"CREATE TABLE empresas ({colunas_varchar_empresas})")
    con.execute(f"CREATE TABLE simples ({colunas_varchar_simples})")

    # Processar os arquivos
    for root, file in tqdm(all_files, desc="Processando arquivos"):
        file_path = os.path.join(root, file)

        # Determinar a tabela de destino com base no nome do arquivo
        if 'ESTABELE' in file:
            tabela_destino = 'estabelecimentos'
        elif 'EMPRECSV' in file:
            tabela_destino = 'empresas'
        elif 'SIMPLES' in file:
            tabela_destino = 'simples'
        else:
            continue
        
        # Concatenar os arquivos diretamente no DuckDB em chunks
        print(f"Lendo e carregando arquivo {file_path} no DuckDB em chunks...")
        try:
            chunk_size = 100000  # Tamanho do chunk
            for chunk in pd.read_csv(file_path, delimiter=';', low_memory=False, encoding='latin1', chunksize=chunk_size, dtype=str):
                con.execute(f"INSERT INTO {tabela_destino} SELECT * FROM chunk")
        except Exception as e:
            print(f"Erro ao ler ou carregar arquivo {file_path}: {e}")

# Carregar e unificar os dados
carregar_e_unificar_dados(data_dir, con)

Arquivos no diretório:
F.K03200$W.SIMPLES.CSV.D40413
F.K03200$Z.D40413.NATJUCSV
K3241.K03200Y0.D40413.EMPRECSV
K3241.K03200Y0.D40413.ESTABELE
K3241.K03200Y1.D40413.EMPRECSV
K3241.K03200Y1.D40413.ESTABELE
K3241.K03200Y2.D40413.EMPRECSV
K3241.K03200Y2.D40413.ESTABELE
K3241.K03200Y3.D40413.EMPRECSV
K3241.K03200Y3.D40413.ESTABELE
K3241.K03200Y4.D40413.EMPRECSV
K3241.K03200Y4.D40413.ESTABELE
K3241.K03200Y5.D40413.EMPRECSV
K3241.K03200Y5.D40413.ESTABELE
K3241.K03200Y6.D40413.ESTABELE
K3241.K03200Y7.D40413.ESTABELE
K3241.K03200Y8.D40413.EMPRECSV
K3241.K03200Y8.D40413.ESTABELE
K3241.K03200Y9.D40413.EMPRECSV
K3241.K03200Y9.D40413.ESTABELE


Processando arquivos:   0%|          | 0/20 [00:00<?, ?it/s]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/F.K03200$W.SIMPLES.CSV.D40413 no DuckDB em chunks...


Processando arquivos:   5%|▌         | 1/20 [02:57<56:13, 177.55s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y0.D40413.EMPRECSV no DuckDB em chunks...


Processando arquivos:  15%|█▌        | 3/20 [04:54<25:21, 89.52s/it] 

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y0.D40413.ESTABELE no DuckDB em chunks...


Processando arquivos:  20%|██        | 4/20 [12:00<54:54, 205.91s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y1.D40413.EMPRECSV no DuckDB em chunks...


Processando arquivos:  25%|██▌       | 5/20 [12:41<37:49, 151.28s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y1.D40413.ESTABELE no DuckDB em chunks...


Processando arquivos:  30%|███       | 6/20 [15:10<35:06, 150.44s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y2.D40413.EMPRECSV no DuckDB em chunks...


Processando arquivos:  35%|███▌      | 7/20 [15:59<25:44, 118.77s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y2.D40413.ESTABELE no DuckDB em chunks...


Processando arquivos:  40%|████      | 8/20 [18:44<26:34, 132.91s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y3.D40413.EMPRECSV no DuckDB em chunks...


Processando arquivos:  45%|████▌     | 9/20 [19:40<20:01, 109.24s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y3.D40413.ESTABELE no DuckDB em chunks...


Processando arquivos:  50%|█████     | 10/20 [22:46<22:06, 132.61s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y4.D40413.EMPRECSV no DuckDB em chunks...


Processando arquivos:  55%|█████▌    | 11/20 [23:54<16:57, 113.11s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y4.D40413.ESTABELE no DuckDB em chunks...


Processando arquivos:  60%|██████    | 12/20 [27:31<19:15, 144.46s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y5.D40413.EMPRECSV no DuckDB em chunks...


Processando arquivos:  65%|██████▌   | 13/20 [28:50<14:34, 124.90s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y5.D40413.ESTABELE no DuckDB em chunks...


Processando arquivos:  70%|███████   | 14/20 [32:59<16:13, 162.29s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y6.D40413.ESTABELE no DuckDB em chunks...


Processando arquivos:  75%|███████▌  | 15/20 [37:23<16:04, 192.84s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y7.D40413.ESTABELE no DuckDB em chunks...


Processando arquivos:  80%|████████  | 16/20 [42:13<14:47, 221.83s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y8.D40413.EMPRECSV no DuckDB em chunks...


Processando arquivos:  85%|████████▌ | 17/20 [44:03<09:25, 188.35s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y8.D40413.ESTABELE no DuckDB em chunks...


Processando arquivos:  90%|█████████ | 18/20 [49:16<07:31, 225.71s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y9.D40413.EMPRECSV no DuckDB em chunks...


Processando arquivos:  95%|█████████▌| 19/20 [51:08<03:11, 191.59s/it]

Lendo e carregando arquivo D:/CNPJ_Dados_Abertos/Extracted/K3241.K03200Y9.D40413.ESTABELE no DuckDB em chunks...


Processando arquivos: 100%|██████████| 20/20 [56:31<00:00, 169.57s/it]


In [3]:
# Visão geral das tabelas
def overview_tabela(con, table_name):
    if tabela_existe(con, table_name):
        print(f"\nVisão geral da tabela '{table_name}':")
        
        # Exibir nomes das colunas
        colunas = con.execute(f"PRAGMA table_info({table_name})").fetchdf()
        print("Nomes das colunas:")
        print(colunas['name'].tolist())
        
        print(f"Primeiras linhas da tabela '{table_name}':")
        result = con.execute(f"SELECT * FROM {table_name} LIMIT 10").fetchdf()
        print(result)

        print(f"Estatísticas da tabela '{table_name}':")
        result = con.execute(f"SELECT COUNT(*) AS num_rows FROM {table_name}").fetchdf()
        print(result)
    else:
        print(f"Tabela '{table_name}' não existe.")

# Exibir visão geral de cada tabela
overview_tabela(con, 'estabelecimentos')
overview_tabela(con, 'empresas')
overview_tabela(con, 'simples')


Visão geral da tabela 'estabelecimentos':
Nomes das colunas:
['CNPJ_BASICO', 'CNPJ_ORDEM', 'CNPJ_DV', 'IDENTIFICADOR_MATRIZ_FILIAL', 'NOME_FANTASIA', 'SITUACAO_CADASTRAL', 'DATA_SITUACAO_CADASTRAL', 'MOTIVO_SITUACAO_CADASTRAL', 'NOME_DA_CIDADE_NO_EXTERIOR', 'PAIS', 'DATA_DE_INICIO_ATIVIDADE', 'CNAE_FISCAL_PRINCIPAL', 'CNAE_FISCAL_SECUNDARIA', 'TIPO_DE_LOGRADOURO', 'LOGRADOURO', 'NUMERO', 'COMPLEMENTO', 'BAIRRO', 'CEP', 'UF', 'MUNICIPIO', 'DDD_1', 'TELEFONE_1', 'DDD_2', 'TELEFONE_2', 'DDD_DO_FAX', 'FAX', 'CORREIO_ELETRONICO', 'SITUACAO_ESPECIAL', 'DATA_DA_SITUACAO_ESPECIAL']
Primeiras linhas da tabela 'estabelecimentos':
  CNPJ_BASICO CNPJ_ORDEM CNPJ_DV IDENTIFICADOR_MATRIZ_FILIAL   NOME_FANTASIA  \
0    76137801       0001      65                           1            None   
1    61079117       0205      56                           2            None   
2    06900758       0001      62                           1            None   
3    38692695       0001      92                   

In [7]:
query = """
WITH params AS (
    SELECT 
        DATE '2023-12-31' AS dt_ref_nao_baixadas,
        DATE '2019-01-01' AS dt_ref_inicial
),
filtered_estabelecimentos AS (
    SELECT 
        DISTINCT LTRIM(RTRIM(CNPJ_BASICO)) AS CNPJ_BASICO,
        LTRIM(RTRIM(CNPJ_ORDEM)) AS CNPJ_ORDEM,
        LTRIM(RTRIM(CNPJ_DV)) AS CNPJ_DV,
        UF,
        MUNICIPIO,
        CEP,
        TRY_CAST(SUBSTRING(DATA_SITUACAO_CADASTRAL, 1, 4) || '-' || SUBSTRING(DATA_SITUACAO_CADASTRAL, 5, 2) || '-' || SUBSTRING(DATA_SITUACAO_CADASTRAL, 7, 2) AS DATE) AS DataSituacaoCadastral,
        TRY_CAST(SUBSTRING(DATA_DE_INICIO_ATIVIDADE, 1, 4) || '-' || SUBSTRING(DATA_DE_INICIO_ATIVIDADE, 5, 2) || '-' || SUBSTRING(DATA_DE_INICIO_ATIVIDADE, 7, 2) AS DATE) AS DataCadastro,
        CNAE_FISCAL_PRINCIPAL,
        CNAE_FISCAL_SECUNDARIA
    FROM estabelecimentos
    WHERE IDENTIFICADOR_MATRIZ_FILIAL = 1
      AND UF != 'EX'
      AND SITUACAO_CADASTRAL = '02' -- Apenas empresas ativas
      AND TRY_CAST(SUBSTRING(DATA_SITUACAO_CADASTRAL, 1, 4) || '-' || SUBSTRING(DATA_SITUACAO_CADASTRAL, 5, 2) || '-' || SUBSTRING(DATA_SITUACAO_CADASTRAL, 7, 2) AS DATE) <= (SELECT dt_ref_nao_baixadas FROM params) -- Data de situação cadastral <= Data de referência
      AND TRY_CAST(SUBSTRING(DATA_DE_INICIO_ATIVIDADE, 1, 4) || '-' || SUBSTRING(DATA_DE_INICIO_ATIVIDADE, 5, 2) || '-' || SUBSTRING(DATA_DE_INICIO_ATIVIDADE, 7, 2) AS DATE) BETWEEN (SELECT dt_ref_inicial FROM params) AND (SELECT dt_ref_nao_baixadas FROM params) -- Data de abertura entre o intervalo de datas
      AND CAST(CNAE_FISCAL_PRINCIPAL AS INT) / 100000 NOT IN (84, 94, 99) -- Exclui empresas não mercantis para o Sebrae
      AND DATA_SITUACAO_CADASTRAL IS NOT NULL
      AND DATA_DE_INICIO_ATIVIDADE IS NOT NULL
),
empresa_portes AS (
    SELECT
        emp.CNPJ_BASICO,
        CASE 
         WHEN emp.PORTE_DA_EMPRESA = '01' AND simples.DATA_EXCLUSAO_DO_MEI IS NOT NULL THEN 'MEI'
            WHEN emp.PORTE_DA_EMPRESA = '01' AND simples.DATA_EXCLUSAO_DO_MEI IS NULL THEN 'ME' 
            WHEN emp.PORTE_DA_EMPRESA = '03' AND simples.DATA_EXCLUSAO_DO_MEI IS NOT NULL THEN 'MEI'
            WHEN emp.PORTE_DA_EMPRESA = '03' AND simples.DATA_EXCLUSAO_DO_MEI IS NULL THEN 'EPP' 
            WHEN emp.PORTE_DA_EMPRESA = '05' AND simples.DATA_EXCLUSAO_DO_MEI IS NOT NULL THEN 'MEI'
            WHEN emp.PORTE_DA_EMPRESA = '05' AND simples.DATA_EXCLUSAO_DO_SIMPLES IS NOT NULL THEN 'EPP'
            WHEN emp.PORTE_DA_EMPRESA = '05' AND simples.DATA_EXCLUSAO_DO_MEI IS NULL THEN 'DEMAIS'
            WHEN emp.PORTE_DA_EMPRESA IS NULL AND simples.DATA_EXCLUSAO_DO_MEI IS NOT NULL THEN 'MEI'
            
        END AS Porte
    FROM empresas emp
    LEFT JOIN simples simples
        ON emp.CNPJ_BASICO = simples.CNPJ_BASICO
        AND TRY_CAST(SUBSTRING(simples.DATA_OPCAO_PELO_SIMPLES, 1, 4) || '-' || SUBSTRING(simples.DATA_OPCAO_PELO_SIMPLES, 5, 2) || '-' || SUBSTRING(simples.DATA_OPCAO_PELO_SIMPLES, 7, 2) AS DATE) <= (SELECT dt_ref_nao_baixadas FROM params)
        AND (TRY_CAST(SUBSTRING(simples.DATA_EXCLUSAO_DO_SIMPLES, 1, 4) || '-' || SUBSTRING(simples.DATA_EXCLUSAO_DO_SIMPLES, 5, 2) || '-' || SUBSTRING(simples.DATA_EXCLUSAO_DO_SIMPLES, 7, 2) AS DATE) > (SELECT dt_ref_nao_baixadas FROM params) OR simples.DATA_EXCLUSAO_DO_SIMPLES = '00000000')
        AND TRY_CAST(SUBSTRING(simples.DATA_OPCAO_PELO_MEI, 1, 4) || '-' || SUBSTRING(simples.DATA_OPCAO_PELO_MEI, 5, 2) || '-' || SUBSTRING(simples.DATA_OPCAO_PELO_MEI, 7, 2) AS DATE) <= (SELECT dt_ref_nao_baixadas FROM params)
        AND (TRY_CAST(SUBSTRING(simples.DATA_EXCLUSAO_DO_MEI, 1, 4) || '-' || SUBSTRING(simples.DATA_EXCLUSAO_DO_MEI, 5, 2) || '-' || SUBSTRING(simples.DATA_EXCLUSAO_DO_MEI, 7, 2) AS DATE) > (SELECT dt_ref_nao_baixadas FROM params) OR simples.DATA_EXCLUSAO_DO_MEI = '00000000')
)

SELECT DISTINCT
    CAST(est.CNPJ_BASICO || est.CNPJ_ORDEM || est.CNPJ_DV AS VARCHAR(14)) AS CNPJ,
    est.UF,
    est.MUNICIPIO,
    est.CEP,
    est.DataSituacaoCadastral,
    est.DataCadastro,
    ep.Porte,
    est.CNAE_FISCAL_PRINCIPAL AS CNAE_Principal,
    est.CNAE_FISCAL_SECUNDARIA AS CNAE_Secundario
FROM filtered_estabelecimentos AS est
LEFT JOIN empresa_portes ep
    ON est.CNPJ_BASICO = ep.CNPJ_BASICO
WHERE 
    ep.Porte IS NOT NULL
    AND ep.Porte IN ('ME', 'EPP', 'DEMAIS');
"""

In [9]:
# Executar a consulta
result = con.execute(query).fetchdf()
print(result)

FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

                   CNPJ  UF MUNICIPIO       CEP DataSituacaoCadastral  \
0        41645633000134  GO      9433  75804610            2021-04-20   
1        41645666000184  SP      6713  08735640            2021-04-20   
2        41645669000118  RJ      6001  23097155            2021-04-20   
3        41645671000197  MG      4123  31080100            2021-04-20   
4        41645810000182  SC      8179  89209530            2021-04-20   
...                 ...  ..       ...       ...                   ...   
3743728  41249316000107  MT      0117  78565000            2021-03-17   
3743729  41249545000113  PR      7565  85603025            2021-03-16   
3743730  41249556000101  RJ      6001  22410906            2021-03-17   
3743731  41249621000190  PE      2531  50790640            2021-03-17   
3743732  41249893000190  PR      7493  85807141            2021-03-17   

        DataCadastro   Porte CNAE_Principal  \
0         2021-04-20      ME        4772500   
1         2021-04-20     EPP 

In [10]:
result.describe()

Unnamed: 0,DataSituacaoCadastral,DataCadastro
count,3743733,3743733
mean,2021-10-14 21:45:28.108976,2021-09-26 00:59:35.031874
min,2019-01-01 00:00:00,2019-01-01 00:00:00
25%,2020-09-15 00:00:00,2020-08-18 00:00:00
50%,2021-11-29 00:00:00,2021-10-30 00:00:00
75%,2022-12-28 00:00:00,2022-12-14 00:00:00
max,2023-12-31 00:00:00,2023-12-31 00:00:00


In [11]:
result.astype('object').describe()

Unnamed: 0,CNPJ,UF,MUNICIPIO,CEP,DataSituacaoCadastral,DataCadastro,Porte,CNAE_Principal,CNAE_Secundario
count,3743733,3743733,3743733,3743733,3743733,3743733,3743733,3743733,2740267
unique,3743733,27,5570,505569,1826,1826,3,1306,1035535
top,41645633000134,SP,7107,88220000,2023-08-21 00:00:00,2023-08-21 00:00:00,ME,8211300,8219999
freq,1,1216306,475159,4962,4604,4558,2690057,129209,26295


In [12]:
result.DataSituacaoCadastral.max()

Timestamp('2023-12-31 00:00:00')

In [13]:
result["CNPJ"].nunique()

3743733

In [14]:
# Salvar o resultado em um arquivo Parquet
parquet_file = 'Dados/empresas_ativas.parquet'
result.to_parquet(parquet_file)

print(f"Arquivo Parquet salvo em: {parquet_file}")

Arquivo Parquet salvo em: Dados/empresas_ativas.parquet


In [21]:
# Remover todas as tabelas
remover_tabela_se_existir(con, 'estabelecimentos')
remover_tabela_se_existir(con, 'empresas')
remover_tabela_se_existir(con, 'simples')
remover_tabela_se_existir(con, 'natureza_juridica')

# Fechar a conexão com o banco de dados
con.close()