## 1. Preparação

In [None]:
from sqlalchemy import create_engine, text
import dask.dataframe as dd
import pandas as pd
import numpy as np

## 2. Configurando a Conexão

In [None]:
# Configurar a conexão PostgreSQL
db_user = 'admin'
db_password = 'PstgsqlAdm2024!$'
db_host = 'localhost'
db_port = '5432'
db_name = 'empresasbr_db'

# String de conexão do SQLAlchemy
engine = create_engine(f'postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')

## 3. Funções Base

In [1]:
# Função para verificar quais registros já existem no banco de dados em lotes menores
def check_existing_records(connection, table_name, primary_key_column, batch, chunk_size=10000):
    existing_records = set()
    keys = tuple(batch[primary_key_column].unique())

    if not keys:
        return existing_records

    for i in range(0, len(keys), chunk_size):
        chunk = keys[i:i + chunk_size]
        query = text(f"SELECT {primary_key_column} FROM {table_name} WHERE {primary_key_column} IN :keys")
        result = connection.execute(query, {'keys': chunk}).fetchall()
        existing_records.update([row[0] for row in result])

    return existing_records

# Função para carregar os dados do Parquet e inserir no PostgreSQL com tratamento de duplicidade
def insert_dask_dataframe_to_postgres(parquet_file, table_name, primary_key_column):
    try:
        # Carregar o arquivo Parquet com Dask
        df = dd.read_parquet(parquet_file)

        # Lista de colunas de data
        colunas_data = [
            'data_situacao_cadastral', 'data_inicio_atividade', 'data_situacao_especial',
            'data_entrada_sociedade', 'data_opcao_simples', 'data_exclusao_simples', 
            'data_opcao_mei', 'data_exclusao_mei'
        ]

        # Substituir NaT por None nas colunas de data
        df[colunas_data] = df[colunas_data].map_partitions(lambda part: part.astype('object').where(part.notna(), None))

        # Inserção por lotes
        batch_size = 10000

        with engine.connect() as connection:
            trans = connection.begin()

            try:
                for i in range(0, len(df), batch_size):
                    # Computar o lote como Pandas DataFrame para inserção
                    batch = df.loc[i:i + batch_size].compute()

                    if batch.empty:
                        print(f"Lote {i // batch_size + 1} está vazio. Interrompendo o processo...")
                        break

                    # Substituir valores nulos por None no Pandas DataFrame
                    batch = batch.where(pd.notnull(batch), None)

                    # Verificar registros que já existem no banco de dados
                    existing_records = check_existing_records(connection, table_name, primary_key_column, batch)

                    # Filtrar apenas os registros que não estão no banco
                    new_records = batch[~batch[primary_key_column].isin(existing_records)]

                    if new_records.empty:
                        print(f"Lote {i // batch_size + 1} já foi inserido previamente. Pulando...")
                        continue

                    # Gerar a query de inserção com upsert (ON CONFLICT)
                    insert_query = generate_upsert_query(table_name, new_records.columns, primary_key_column)

                    # Executar o upsert usando `text()` para preparar a query
                    connection.execute(text(insert_query), new_records.to_dict(orient='records'))
                    print(f"Lote {i // batch_size + 1} inserido com sucesso.")
                
                trans.commit()
                print(f"Dados inseridos ou atualizados com sucesso na tabela {table_name}.")

            except Exception as e:
                trans.rollback()
                print(f"Erro ao inserir lote {i // batch_size + 1}: {e}")

    except Exception as e:
        print(f"Erro ao inserir dados na tabela {table_name}: {e}")

# Função para gerar uma query de inserção com ON CONFLICT para evitar duplicatas
def generate_upsert_query(table_name, columns, primary_key_column):
    insert_cols = ', '.join(columns)
    value_placeholders = ', '.join([f':{col}' for col in columns])
    update_clause = ', '.join([f"{col} = EXCLUDED.{col}" for col in columns if col != primary_key_column])

    query = f"""
    INSERT INTO {table_name} ({insert_cols})
    VALUES ({value_placeholders})
    ON CONFLICT ({primary_key_column})
    DO UPDATE SET {update_clause};
    """
    return query




## 4. Execução da importação para o PostgreSQL

In [2]:
# Executar a função para inserir no PostgreSQL
parquet_file = './parquets/dataset_final.parquet'
insert_dask_dataframe_to_postgres(parquet_file, 'empresas', 'cnpj_completo')

Lote 1 inserido com sucesso.
Lote 2 inserido com sucesso.
Lote 3 inserido com sucesso.
Lote 4 inserido com sucesso.
Lote 5 inserido com sucesso.
Lote 6 inserido com sucesso.
Lote 7 inserido com sucesso.
Lote 8 inserido com sucesso.
Lote 9 inserido com sucesso.
Lote 10 inserido com sucesso.
Lote 11 inserido com sucesso.
Lote 12 inserido com sucesso.
Lote 13 inserido com sucesso.
Lote 14 inserido com sucesso.
Lote 15 inserido com sucesso.
Lote 16 inserido com sucesso.
Lote 17 inserido com sucesso.
Lote 18 inserido com sucesso.
Lote 19 inserido com sucesso.
Lote 20 inserido com sucesso.
Lote 21 inserido com sucesso.
Lote 22 inserido com sucesso.
Lote 23 inserido com sucesso.
Lote 24 inserido com sucesso.
Lote 25 inserido com sucesso.
Lote 26 inserido com sucesso.
Lote 27 inserido com sucesso.
Lote 28 inserido com sucesso.
Lote 29 inserido com sucesso.
Lote 30 está vazio. Interrompendo o processo...
Dados inseridos ou atualizados com sucesso na tabela empresas.
