In [1]:
import os
from dotenv import load_dotenv
import pandas as pd
import chardet
from unidecode import unidecode
from sqlalchemy import create_engine

# Carregar as variáveis de ambiente do arquivo .env
load_dotenv()

# Lendo as variáveis de ambiente
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASSWORD')
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
db_name = os.getenv('DB_NAME')

# Criando a string de conexão
connection_string = f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}'

# Configurando a conexão com o PostgreSQL
engine = create_engine(connection_string)

In [2]:
# Definir qual o encoding do arquivo

# Lendo uma amostra do arquivo para detectar o encoding
# with open('../dados_csv/estabelecimentos0.csv', 'rb') as f:
#     encoding_type = chardet.detect(f.read())  

# print(encoding_type)
encoding_type = {'encoding': 'ISO-8859-1', 'confidence': 0.7299998874356555, 'language': ''}

{'encoding': 'ISO-8859-1', 'confidence': 0.7299998874356555, 'language': ''}


In [3]:
# Definindo os tipos de dados
dtypes = {
    'cnpj_basico': 'str',  # NÚMERO BASE DE INSCRIÇÃO NO CNPJ (OITO PRIMEIROS DÍGITOS DO CNPJ).
    'cnpj_ordem': 'str', # NÚMERO DO ESTABELECIMENTO DE INSCRIÇÃO NO CNPJ (DONONO ATÉ O DÉCIMO SEGUNDO DÍGITO DO CNPJ).
    'cnpj_dv': 'str',  # DÍGITO VERIFICADOR DO CNPJ.
    'matriz_filial': 'str',  # INDICADOR DE MATRIZ OU FILIAL. 1 - MATRIZ 2 - FILIAL
    'nome_fantasia': 'str', # NOME DE FANTASIA DO ESTABELECIMENTO.
    'situacao_cadastral': 'str', # CÓDIGO DA SITUAÇÃO CADASTRAL:
      # 01 – NULA
      # 2 – ATIVA
      # 3 – SUSPENSA
      # 4 – INAPTA
      # 08 – BAIXADA
    'data_situacao_cadastral': 'str',  # DATA DO EVENTO DA SITUAÇÃO CADASTRAL.
    'motivo_situacao_cadastral': 'str',  # CÓDIGO DO MOTIVO DA SITUAÇÃO CADASTRAL:
    'nome_da_cidade_no_exterior': 'str', # NOME DA CIDADE NO EXTERIOR (SE O ESTABELECIMENTO ESTIVER NO EXTERIOR).
    'pais': 'str',  # Código do País
    'data_inicio_atividade': 'str',  # Carregar como string inicialmente
    'cnae_fiscal_principal': 'str',  # CÓDIGO DA ATIVIDADE ECONÔMICA PRINCIPAL.
    'cnae_fiscal_secundaria': 'str',  # CÓDIGO DA ATIVIDADE ECONÔMICA SECUNDÁRIA.
    'tipo_logradouro': 'str',  # TIPO DO LOGRADOURO.
    'logradouro': 'str',  # NOME DO LOGRADOURO.
    'numero': 'str',  # NÚMERO DO LOGRADOURO.
    'complemento': 'str',  # COMPLEMENTO DO LOGRADOURO.
    'bairro': 'str',  # NOME DO BAIRRO.
    'cep': 'str',  # CÓDIGO DE ENDEREÇAMENTO POSTAL.
    'uf': 'str',  # SIGLA DA UNIDADE DA FEDERAÇÃO.
    'municipio': 'str',  # ÓDIGO DO MUNICÍPIO DE JURISDIÇÃO ONDE SE ENCONTRA o ESTABELECIMENTO
    'ddd1': 'str',  # CÓDIGO DDD 1.
    'telefone1': 'str',  # NÚMERO DE TELEFONE 1.
    'ddd2': 'str',  # CÓDIGO DDD 2.
    'telefone2': 'str',  # NÚMERO DE TELEFONE 2.
    'ddd_fax': 'str',  # CÓDIGO DDD DO FAX.
    'correio_eletronico': 'str',  # EMAIL.
    'situacao_especial': 'str',  # CÓDIGO DA SITUAÇÃO ESPECIAL.
    'data_situacao_especial': 'str'  # DATA DA SITUAÇÃO ESPECIAL.    
}


In [4]:
# Função para processar cada chunk
def process_chunk(chunk):
    chunk['data_situacao_cadastral'] = pd.to_datetime(chunk['data_situacao_cadastral'].str.strip(), format='%Y%m%d', errors='coerce')
    chunk['data_inicio_atividade'] = pd.to_datetime(chunk['data_inicio_atividade'].str.strip(), format='%Y%m%d', errors='coerce')
    chunk['data_situacao_especial'] = pd.to_datetime(chunk['data_situacao_especial'].str.strip(), format='%Y%m%d', errors='coerce')    
    
    # Aplicando a função unaccent em todas as colunas do DataFrame
    chunk = chunk.applymap(lambda x: unidecode(x) if isinstance(x, str) else x)

    chunk.to_sql('estabelecimentos', engine, if_exists='append', index=False)


In [5]:
# Lendo o CSV em chunks e processando cada chunk
chunksize = 100000  # Ajuste o tamanho do chunk conforme necessário
for chunk in pd.read_csv('../dados_csv/estabelecimentos0.csv', sep=';', names=list(dtypes.keys()), dtype=dtypes, chunksize=chunksize, encoding=encoding_type["encoding"]):
    process_chunk(chunk)

print("Estabelecimentos0 enviado para o PostgreSQL com sucesso!")

UnicodeDecodeError: 'utf-8' codec can't decode byte 0xc1 in position 46497: invalid start byte

In [None]:
for chunk in pd.read_csv('../dados_csv/estabelecimentos1.csv', sep=';', names=list(dtypes.keys()), dtype=dtypes, chunksize=chunksize, encoding=encoding_type["encoding"]):
    process_chunk(chunk)

print("Estabelecimentos1 enviado para o PostgreSQL com sucesso!")

In [None]:
for chunk in pd.read_csv('../dados_csv/estabelecimentos2.csv', sep=';', names=list(dtypes.keys()), dtype=dtypes, chunksize=chunksize, encoding=encoding_type["encoding"]):
    process_chunk(chunk)

print("Estabelecimentos2 enviado para o PostgreSQL com sucesso!")

In [None]:
for chunk in pd.read_csv('../dados_csv/estabelecimentos3.csv', sep=';', names=list(dtypes.keys()), dtype=dtypes, chunksize=chunksize,encoding=encoding_type["encoding"]):
    process_chunk(chunk)

print("Estabelecimentos3 enviado para o PostgreSQL com sucesso!")

In [None]:
for chunk in pd.read_csv('../dados_csv/estabelecimentos4.csv', sep=';', names=list(dtypes.keys()), dtype=dtypes, chunksize=chunksize, encoding=encoding_type["encoding"]):
    process_chunk(chunk)

print("Estabelecimentos4 enviado para o PostgreSQL com sucesso!")

In [None]:
for chunk in pd.read_csv('../dados_csv/estabelecimentos5.csv', sep=';', names=list(dtypes.keys()), dtype=dtypes, chunksize=chunksize,encoding=encoding_type["encoding"]):
    process_chunk(chunk)

print("Estabelecimentos5 enviado para o PostgreSQL com sucesso!")

In [None]:
for chunk in pd.read_csv('../dados_csv/estabelecimentos6.csv', sep=';', names=list(dtypes.keys()), dtype=dtypes, chunksize=chunksize, encoding=encoding_type["encoding"]):
    process_chunk(chunk)

print("Estabelecimentos6 enviado para o PostgreSQL com sucesso!")

In [None]:
for chunk in pd.read_csv('../dados_csv/estabelecimentos7.csv', sep=';', names=list(dtypes.keys()), dtype=dtypes, chunksize=chunksize, encoding=encoding_type["encoding"]):
    process_chunk(chunk)

print("Estabelecimentos7 enviado para o PostgreSQL com sucesso!")

In [None]:
for chunk in pd.read_csv('../dados_csv/estabelecimentos8.csv', sep=';', names=list(dtypes.keys()), dtype=dtypes, chunksize=chunksize, encoding=encoding_type["encoding"]):
    process_chunk(chunk)

print("Estabelecimentos8 enviado para o PostgreSQL com sucesso!")

In [None]:
for chunk in pd.read_csv('../dados_csv/estabelecimentos9.csv', sep=';', names=list(dtypes.keys()), dtype=dtypes, chunksize=chunksize, encoding=encoding_type["encoding"]):
    process_chunk(chunk)

print("Estabelecimentos9 enviado para o PostgreSQL com sucesso!")