In [None]:
# versao 8 csv - xlsx email - link e checkpoint  

import csv
import requests
import time
import os
import pandas as pd
from datetime import datetime
from tqdm import tqdm

# Arquivos de entrada e saída
ARQUIVO_ENTRADA = r'C:\Users\rfsra\OneDrive\Desktop\Lucro Real Camila\cnpj_limpos_unido_orign_pos_cnae.csv'
ARQUIVO_SAIDA = r'C:\Users\rfsra\OneDrive\Desktop\Lucro Real Camila\resultado_consulta_cnae_I.xlsx'

# Remove formatação do CNPJ (mantém apenas números)
def limpar_cnpj(cnpj):
    return ''.join(filter(str.isdigit, cnpj.zfill(14)))

# Formata o CNPJ no padrão 00.000.000/0001-91
def formatar_cnpj(cnpj):
    cnpj = ''.join(filter(str.isdigit, str(cnpj)))
    cnpj = cnpj.zfill(14)
    return f'{cnpj[:2]}.{cnpj[2:5]}.{cnpj[5:8]}/{cnpj[8:12]}-{cnpj[12:]}'

# Lê os CNPJs do arquivo CSV
def ler_cnpjs(caminho):
    with open(caminho, newline='', encoding='utf-8') as f:
        leitor = csv.reader(f)
        next(leitor)  # Pula o cabeçalho
        return [limpar_cnpj(row[1].strip()) for row in leitor if len(row) > 1]

# Lê os CNPJs já processados no Excel de saída
def ler_cnpjs_processados(caminho_saida):
    if not os.path.exists(caminho_saida):
        return set()
    try:
        df = pd.read_excel(caminho_saida, engine='openpyxl')
        return set(df['cnpj'].astype(str).str.zfill(14))
    except Exception as e:
        print(f"[ERRO] Falha ao ler o arquivo de saída: {e}")
        return set()

# Consulta os dados de um CNPJ na API
def consultar_cnpj(cnpj):
    url = f'https://publica.cnpj.ws/cnpj/{cnpj}'
    tentativas = 0

    while tentativas < 3:
        try:
            response = requests.get(url, timeout=35)

            if response.status_code == 404:
                break

            if response.status_code == 429:
                time.sleep(25)
                tentativas += 1
                continue

            response.raise_for_status()
            data = response.json()

            email_api = data.get('estabelecimento', {}).get('email')
            email_valido = email_api if email_api and email_api.strip() else "não cadastrado"

            dados = {
                'cnpj': cnpj,
                'nome': data.get('razao_social', 'N/D'),
                'uf': data.get('estabelecimento', {}).get('estado', {}).get('sigla', 'N/D'),
                'cidade': data.get('estabelecimento', {}).get('cidade', {}).get('nome', 'N/D'),
                'email': email_valido,
                'link_cnpj': f'https://cnpj.biz/{cnpj}',
                'cnae_codigo': data.get('estabelecimento', {}).get('atividade_principal', {}).get('subclasse', 'N/D'),
                'cnae_desc': data.get('estabelecimento', {}).get('atividade_principal', {}).get('descricao', 'N/D'),
                'data_hora': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }

            return dados

        except Exception:
            return {
                'cnpj': cnpj, 'nome': 'Erro', 'uf': 'Erro', 'cidade': 'Erro',
                'email': 'não cadastrado',
                'link_cnpj': f'https://cnpj.biz/{cnpj}',
                'cnae_codigo': 'Erro', 'cnae_desc': 'Erro',
                'data_hora': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }

    return {
        'cnpj': cnpj, 'nome': 'Falha', 'uf': 'Timeout', 'cidade': 'Timeout',
        'email': 'não cadastrado',
        'link_cnpj': f'https://cnpj.biz/{cnpj}',
        'cnae_codigo': 'Timeout', 'cnae_desc': 'Timeout',
        'data_hora': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }

# Salva os dados em um arquivo Excel
def salvar_excel(lista_dados, caminho_saida):
    df_novo = pd.DataFrame(lista_dados)
    df_novo['cnpj'] = df_novo['cnpj'].apply(formatar_cnpj)

    if os.path.exists(caminho_saida):
        df_existente = pd.read_excel(caminho_saida, engine='openpyxl')
        df_existente['cnpj'] = df_existente['cnpj'].apply(formatar_cnpj)
        df_completo = pd.concat([df_existente, df_novo], ignore_index=True)
    else:
        df_completo = df_novo

    df_completo.to_excel(caminho_saida, index=False, engine='openpyxl')

# Função principal
def main():
    print("Lendo CNPJs do arquivo de entrada...")
    cnpjs = ler_cnpjs(ARQUIVO_ENTRADA)

    print("Verificando CNPJs já processados (checkpoint)...")
    cnpjs_processados = ler_cnpjs_processados(ARQUIVO_SAIDA)

    novos_cnpjs = [c for c in cnpjs if c not in cnpjs_processados]
    print(f"Total de novos CNPJs a processar: {len(novos_cnpjs)}")

    resultados = []

    for i, cnpj in enumerate(tqdm(novos_cnpjs, desc="Consultando CNPJs", unit="cnpj"), 1):
        dados = consultar_cnpj(cnpj)
        resultados.append(dados)

        # Checkpoint automático a cada 10 CNPJs ou no final
        if i % 10 == 0 or i == len(novos_cnpjs):
            salvar_excel(resultados, ARQUIVO_SAIDA)
            resultados = []

        time.sleep(21)

# Executa
if __name__ == '__main__':
    main()


🔍 Lendo CNPJs do arquivo de entrada...
✅ Verificando CNPJs já processados (checkpoint)...
📦 Total de novos CNPJs a processar: 236829


Consultando CNPJs:   0%|          | 15/236829 [05:52<1544:25:21, 23.48s/cnpj]


KeyboardInterrupt: 

In [1]:
# versão 9 csv - xlsx email - link e checkpoint 2


import csv
import requests
import time
import os
import pandas as pd
from datetime import datetime
from tqdm import tqdm

# Caminhos dos arquivos
ARQUIVO_ENTRADA = r'C:\Users\rfsra\OneDrive\Desktop\Lucro Real Camila\CNAE_10083.xlsx'
ARQUIVO_SAIDA = r'C:\Users\rfsra\OneDrive\Desktop\Lucro Real Camila\resultado_consulta_cnae_II.xlsx'
CHECKPOINT_FILE = 'checkpoint.txt'  # Caminho do arquivo de checkpoint

# Remove formatação do CNPJ
def limpar_cnpj(cnpj):
    cnpj_str = str(cnpj).zfill(14)
    return ''.join(filter(str.isdigit, cnpj_str))

# Formata o CNPJ no padrão 00.000.000/0001-91
def formatar_cnpj(cnpj):
    cnpj = ''.join(filter(str.isdigit, str(cnpj)))
    cnpj = cnpj.zfill(14)
    return f'{cnpj[:2]}.{cnpj[2:5]}.{cnpj[5:8]}/{cnpj[8:12]}-{cnpj[12:]}'

# Lê os CNPJs do arquivo Excel de entrada
def ler_cnpjs(caminho):
    df = pd.read_excel(caminho)
    cnpjs = df['cnpj'].tolist()
    return [limpar_cnpj(c) for c in cnpjs]

# Lê os CNPJs já processados no Excel de saída
def ler_cnpjs_processados(caminho_saida):
    if not os.path.exists(caminho_saida):
        return set()
    df = pd.read_excel(caminho_saida)
    return set(df['cnpj'].astype(str).str.zfill(14))

# Consulta os dados de um CNPJ na API
def consultar_cnpj(cnpj):
    url = f'https://publica.cnpj.ws/cnpj/{cnpj}'
    tentativas = 0
    link_cnpj = f'https://cnpj.biz/{cnpj}'

    while tentativas < 3:
        try:
            response = requests.get(url, timeout=35)

            if response.status_code == 404:
                break

            if response.status_code == 429:
                time.sleep(25)
                tentativas += 1
                continue

            response.raise_for_status()
            data = response.json()

            email_extraido = data.get('estabelecimento', {}).get('email', 'N/D')
            email_formatado = 'e-mail não cadastrado' if email_extraido in ['N/D', 'Erro', 'Timeout', '', None] else email_extraido

            dados = {
                'cnpj': formatar_cnpj(cnpj),
                'nome': data.get('razao_social', 'N/D'),
                'uf': data.get('estabelecimento', {}).get('estado', {}).get('sigla', 'N/D'),
                'cidade': data.get('estabelecimento', {}).get('cidade', {}).get('nome', 'N/D'),
                'email': email_formatado,
                'link': link_cnpj,
                'cnae_codigo': data.get('estabelecimento', {}).get('atividade_principal', {}).get('subclasse', 'N/D'),
                'cnae_desc': data.get('estabelecimento', {}).get('atividade_principal', {}).get('descricao', 'N/D'),
                'data_hora': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }

            return dados

        except Exception:
            return {
                'cnpj': formatar_cnpj(cnpj),
                'nome': 'Erro',
                'uf': 'Erro',
                'cidade': 'Erro',
                'email': 'e-mail não cadastrado',
                'link': link_cnpj,
                'cnae_codigo': 'Erro',
                'cnae_desc': 'Erro',
                'data_hora': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }

    return {
        'cnpj': formatar_cnpj(cnpj),
        'nome': 'Falha',
        'uf': 'Timeout',
        'cidade': 'Timeout',
        'email': 'e-mail não cadastrado',
        'link': link_cnpj,
        'cnae_codigo': 'Timeout',
        'cnae_desc': 'Timeout',
        'data_hora': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }

# Salva os dados no Excel
def salvar_excel(lista_dados, caminho_saida):
    df_novo = pd.DataFrame(lista_dados)
    if os.path.exists(caminho_saida):
        df_existente = pd.read_excel(caminho_saida)
        df_existente['cnpj'] = df_existente['cnpj'].astype(str).str.zfill(14)
        df_completo = pd.concat([df_existente, df_novo], ignore_index=True)
    else:
        df_completo = df_novo

    df_completo.to_excel(caminho_saida, index=False)

# Salva o último CNPJ processado no checkpoint
def salvar_checkpoint(cnpj):
    with open(CHECKPOINT_FILE, 'w') as f:
        f.write(cnpj)

# Carrega o último CNPJ do checkpoint
def carregar_checkpoint():
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, 'r') as f:
            return f.read().strip()
    return None

# Função principal
def main():
    cnpjs = ler_cnpjs(ARQUIVO_ENTRADA)
    cnpjs_processados = ler_cnpjs_processados(ARQUIVO_SAIDA)

    ultimo_checkpoint = carregar_checkpoint()

    # Se houver checkpoint, retoma a partir dele
    if ultimo_checkpoint:
        try:
            index_ultimo = cnpjs.index(ultimo_checkpoint)
            cnpjs = cnpjs[index_ultimo + 1:]
        except ValueError:
            pass

    novos_cnpjs = [c for c in cnpjs if c not in cnpjs_processados]

    resultados = []

    for i, cnpj in enumerate(tqdm(novos_cnpjs, desc="Consultando CNPJs", unit="cnpj"), 1):
        dados = consultar_cnpj(cnpj)
        resultados.append(dados)

        salvar_checkpoint(cnpj)  # salva o checkpoint

        if i % 10 == 0 or i == len(novos_cnpjs):
            salvar_excel(resultados, ARQUIVO_SAIDA)
            resultados = []

        time.sleep(21)

# Executa o script
if __name__ == '__main__':
    main()



Consultando CNPJs: 100%|██████████| 9088/9088 [103:27:51<00:00, 40.98s/cnpj]        


In [6]:
# correção de erros output api timeout

# versão 10 csv - xlsx email - link e checkpoint 2


import csv
import requests
import time
import os
import pandas as pd
from datetime import datetime
from tqdm import tqdm

# Caminhos dos arquivos
ARQUIVO_ENTRADA = r'C:\Users\rfsra\OneDrive\Desktop\Lucro Real Camila\CNAE_10083_2.xlsx'
ARQUIVO_SAIDA = r'C:\Users\rfsra\OneDrive\Desktop\Lucro Real Camila\resultado_consulta_cnae_erros.xlsx'
CHECKPOINT_FILE = 'checkpoint.txt'  # Caminho do arquivo de checkpoint

# Remove formatação do CNPJ
def limpar_cnpj(cnpj):
    cnpj_str = str(cnpj).zfill(14)
    return ''.join(filter(str.isdigit, cnpj_str))

# Formata o CNPJ no padrão 00.000.000/0001-91
def formatar_cnpj(cnpj):
    cnpj = ''.join(filter(str.isdigit, str(cnpj)))
    cnpj = cnpj.zfill(14)
    return f'{cnpj[:2]}.{cnpj[2:5]}.{cnpj[5:8]}/{cnpj[8:12]}-{cnpj[12:]}'

# Lê os CNPJs do arquivo Excel de entrada
def ler_cnpjs(caminho):
    df = pd.read_excel(caminho)
    cnpjs = df['cnpj'].tolist()
    return [limpar_cnpj(c) for c in cnpjs]

# Lê os CNPJs já processados no Excel de saída
def ler_cnpjs_processados(caminho_saida):
    if not os.path.exists(caminho_saida):
        return set()
    df = pd.read_excel(caminho_saida)
    return set(df['cnpj'].astype(str).str.zfill(14))

# Consulta os dados de um CNPJ na API
def consultar_cnpj(cnpj):
    url = f'https://publica.cnpj.ws/cnpj/{cnpj}'
    tentativas = 0
    link_cnpj = f'https://cnpj.biz/{cnpj}'

    while tentativas < 3:
        try:
            response = requests.get(url, timeout=35)

            if response.status_code == 404:
                break

            if response.status_code == 429:
                time.sleep(25)
                tentativas += 1
                continue

            response.raise_for_status()
            data = response.json()

            email_extraido = data.get('estabelecimento', {}).get('email', 'N/D')
            email_formatado = 'e-mail não cadastrado' if email_extraido in ['N/D', 'Erro', 'Timeout', '', None] else email_extraido

            dados = {
                'cnpj': formatar_cnpj(cnpj),
                'nome': data.get('razao_social', 'N/D'),
                'uf': data.get('estabelecimento', {}).get('estado', {}).get('sigla', 'N/D'),
                'cidade': data.get('estabelecimento', {}).get('cidade', {}).get('nome', 'N/D'),
                'email': email_formatado,
                'link': link_cnpj,
                'cnae_codigo': data.get('estabelecimento', {}).get('atividade_principal', {}).get('subclasse', 'N/D'),
                'cnae_desc': data.get('estabelecimento', {}).get('atividade_principal', {}).get('descricao', 'N/D'),
                'data_hora': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }

            return dados

        except Exception:
            return {
                'cnpj': formatar_cnpj(cnpj),
                'nome': 'Erro',
                'uf': 'Erro',
                'cidade': 'Erro',
                'email': 'e-mail não cadastrado',
                'link': link_cnpj,
                'cnae_codigo': 'Erro',
                'cnae_desc': 'Erro',
                'data_hora': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            }

    return {
        'cnpj': formatar_cnpj(cnpj),
        'nome': 'Falha',
        'uf': 'Timeout',
        'cidade': 'Timeout',
        'email': 'e-mail não cadastrado',
        'link': link_cnpj,
        'cnae_codigo': 'Timeout',
        'cnae_desc': 'Timeout',
        'data_hora': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    }

# Salva os dados no Excel
def salvar_excel(lista_dados, caminho_saida):
    df_novo = pd.DataFrame(lista_dados)
    if os.path.exists(caminho_saida):
        df_existente = pd.read_excel(caminho_saida)
        df_existente['cnpj'] = df_existente['cnpj'].astype(str).str.zfill(14)
        df_completo = pd.concat([df_existente, df_novo], ignore_index=True)
    else:
        df_completo = df_novo

    df_completo.to_excel(caminho_saida, index=False)

# Salva o último CNPJ processado no checkpoint
def salvar_checkpoint(cnpj):
    with open(CHECKPOINT_FILE, 'w') as f:
        f.write(cnpj)

# Carrega o último CNPJ do checkpoint
def carregar_checkpoint():
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, 'r') as f:
            return f.read().strip()
    return None

# Função principal
def main():
    cnpjs = ler_cnpjs(ARQUIVO_ENTRADA)
    cnpjs_processados = ler_cnpjs_processados(ARQUIVO_SAIDA)

    ultimo_checkpoint = carregar_checkpoint()

    # Se houver checkpoint, retoma a partir dele
    if ultimo_checkpoint:
        try:
            index_ultimo = cnpjs.index(ultimo_checkpoint)
            cnpjs = cnpjs[index_ultimo + 1:]
        except ValueError:
            pass

    novos_cnpjs = [c for c in cnpjs if c not in cnpjs_processados]

    resultados = []

    for i, cnpj in enumerate(tqdm(novos_cnpjs, desc="Consultando CNPJs", unit="cnpj"), 1):
        dados = consultar_cnpj(cnpj)
        resultados.append(dados)

        salvar_checkpoint(cnpj)  # salva o checkpoint

        if i % 10 == 0 or i == len(novos_cnpjs):
            salvar_excel(resultados, ARQUIVO_SAIDA)
            resultados = []

        time.sleep(21)

# Executa o script
if __name__ == '__main__':
    main()



Consultando CNPJs: 100%|██████████| 26/26 [10:27<00:00, 24.12s/cnpj]
