# RPA - Sincronização de Banco de Dados SaveIt (v2.0)

Este notebook sincroniza dados entre dois bancos (primeira e segunda série), mapeando corretamente os IDs gerados automaticamente e atualizando as chaves estrangeiras.

In [1]:
import psycopg2
from dotenv import load_dotenv
from os import getenv
import pandas as pd

In [2]:
load_dotenv()

True

## Função de Upsert com Mapeamento de IDs (v2.0)

In [3]:
def create_composite_key(df, columns):
    # Concatena os valores das colunas com um separador
    composite = df[columns].astype(str).agg('||'.join, axis=1)
    return composite


def upsert_data_with_fks(df, unique_col, columns_list, table, cur, conn, pk_col="id", fk_mappings=None, composite_key_cols=None):
    # Cria uma cópia do DataFrame para não modificar o original
    df_work = df.copy()
    
    # Guarda os IDs originais
    df_work["_old_id"] = df_work[pk_col].copy()
    
    # Se usar chave composta, cria uma coluna temporária
    if composite_key_cols:
        df_work["_composite_key"] = create_composite_key(df_work, composite_key_cols)
        unique_col = "_composite_key"
        # Adiciona a coluna composta à lista de colunas a inserir temporariamente
        columns_list_with_key = columns_list.copy()
    else:
        columns_list_with_key = columns_list
    
    # Atualiza as chaves estrangeiras se fornecidas
    if fk_mappings:
        for fk_col, mapping_dict in fk_mappings:
            if fk_col in df_work.columns:
                df_work[fk_col] = df_work[fk_col].map(mapping_dict)
                # Verifica se há valores não mapeados
                unmapped = df_work[df_work[fk_col].isna()][fk_col]
                if len(unmapped) > 0:
                    print(f"AVISO: {len(unmapped)} valores não mapeados em {fk_col}")
    
    # Remove a coluna de ID para inserção
    df_insert = df_work.drop(columns=[pk_col])
    
    cols = ", ".join(columns_list)
    placeholders = ", ".join(["%s"] * len(columns_list))
    
    # Se não houver coluna única, insere sem ON CONFLICT (Busca + Insere)
    if composite_key_cols:
        inserted_rows = []
        
        for idx, row in df_insert.iterrows():
            values = [row[col] for col in columns_list]
            
            # Primeiro tenta encontrar se já existe
            where_conditions = " AND ".join([f"{col} = %s" for col in columns_list])
            check_query = f"SELECT {pk_col} FROM {table} WHERE {where_conditions};"
            
            try:
                cur.execute(check_query, values)
                existing = cur.fetchone()
                
                if existing:
                    # Já existe, usa o ID existente
                    new_id = existing[0]
                    composite_key = row["_composite_key"]
                    inserted_rows.append((new_id, composite_key))
                    # print(f"  → Registro já existe (ID={new_id})")
                else:
                    # Não existe, insere novo
                    insert_query = f"""
                    INSERT INTO {table} ({cols})
                    VALUES ({placeholders})
                    RETURNING {pk_col};
                    """
                    cur.execute(insert_query, values)
                    result = cur.fetchone()
                    conn.commit()
                    
                    if result:
                        new_id = result[0]
                        composite_key = row["_composite_key"]
                        inserted_rows.append((new_id, composite_key))
                        
            except Exception as e:
                print(f"Erro ao processar registro na tabela {table}: {e}")
                print(f"Valores: {values}")
                conn.rollback()
    else:
        # Estratégia original com ON CONFLICT
        excluded = ", ".join([f"{col} = EXCLUDED.{col}" for col in columns_list if col != unique_col])
        inserted_rows = []
        
        for idx, row in df_insert.iterrows():
            values = [row[col] for col in columns_list]
            query = f"""
            INSERT INTO {table} ({cols})
            VALUES ({placeholders})
            ON CONFLICT ({unique_col}) DO UPDATE
            SET {excluded}
            RETURNING {pk_col}, {unique_col};
            """
            try:
                cur.execute(query, values)
                result = cur.fetchone()
                conn.commit()
                if result:
                    inserted_rows.append(result)
            except Exception as e:
                print(f"Erro ao inserir na tabela {table}: {e}")
                print(f"Valores: {values}")
                conn.rollback()
    
    # Cria DataFrame com os IDs gerados
    df_ids = pd.DataFrame(inserted_rows, columns=[pk_col, unique_col])
    
    # Cria mapping old_id -> new_id
    mapping = df_work.merge(df_ids, on=unique_col, how="left", suffixes=('_old', '_new'))
    mapping_dict = dict(zip(mapping["_old_id"], mapping[f"{pk_col}_new"]))
        
    return mapping_dict

## Conexão com os Bancos de Dados

In [4]:
# Estabelecendo conexão com os databases
try:
    first_conn = psycopg2.connect(
        getenv("FIRST_DATABASE_ACCESS")
    )
    first_cur = first_conn.cursor()
    
    second_conn = psycopg2.connect(
        getenv("SECOND_DATABASE_ACCESS")
    )
    second_cur = second_conn.cursor()
    print("✓ Conexões estabelecidas com sucesso")
except Exception as e:
    print(f"Erro ao conectar aos bancos: {e}")

✓ Conexões estabelecidas com sucesso


## Extração de Dados do Banco de Origem

In [5]:
# Extraindo tabelas do banco da primeira série
try:    
    print("--- Extraindo dados do banco de origem ---")
    
    df_admin_first = pd.read_sql("""
        SELECT 
            id,
            nome_admin AS name,
            email,
            senha AS password
        FROM admin
        """,
        first_conn
    )
      
    df_enterprise_first = pd.read_sql("""
        SELECT 
            c.id,
            c.cnpj,
            c.email,
            c.senha AS password,
            c.nome AS name,
            MAX(t.num_telefone) AS phone_number,
            c.id_endereco AS address_id,
            COALESCE(MAX(i.id_plano), 1) AS plan_id
        FROM cliente c
        JOIN telefone t ON t.id_cliente = c.id
        LEFT JOIN industria i ON i.id_cliente = c.id
        GROUP BY c.id, c.cnpj, c.email, password, name, address_id
        """, 
        first_conn
    )

    df_address_first = pd.read_sql("""
        SELECT 
            id,
            cep,
            cep_cidade AS city,
            cep_bairro AS neighbourhood,
            cep_rua_num AS house_number,
            cep_estado AS state,
            cep_rua AS public_place,
            cep_complemento AS complement
        FROM endereco
        """,
        first_conn
    )

    df_employee_first = pd.read_sql("""
        SELECT 
            id,
            nome AS name,
            email,
            senha AS password,
            id_empresa AS enterprise_id,
            COALESCE(is_admin, false) AS is_admin
        FROM funcionario
        WHERE id_empresa is not null
        """,
        first_conn
    )
    
except Exception as e:
    print(f"Erro ao extrair dados: {e}")

--- Extraindo dados do banco de origem ---


  df_admin_first = pd.read_sql("""
  df_enterprise_first = pd.read_sql("""
  df_address_first = pd.read_sql("""
  df_employee_first = pd.read_sql("""


## Preparação dos Dados

In [6]:
# Preparando dados adicionais
df_admin_first['write'] = False
df_admin_first['image'] = None

# Inserção de imagem genérica para inserção no banco da segunda série
default_image = "https://res.cloudinary.com/dxdjsvo0e/image/upload/v1761314229/f682575d-3083-45b2-a065-f043cf789d64_aonwbs.jpg"
df_enterprise_first["enterprise_image"] = default_image

## Sincronização de Dados

A ordem de inserção é importante devido às dependências de chaves estrangeiras:
1. Admin (sem dependências)
2. Address (sem dependências)
3. Enterprise (depende de Address)
4. Employee (depende de Enterprise)

In [7]:
print("--- Iniciando sincronização ---")

# 1. Admin insert (COM coluna única)
print("\n1. Inserindo admins...")
admin_mapping = upsert_data_with_fks(
    df_admin_first, 
    'email', 
    ['name', 'email', 'password', 'write'], 
    'admin_saveit', 
    second_cur, 
    second_conn
)

--- Iniciando sincronização ---

1. Inserindo admins...


In [8]:
# 2. Address insert (SEM COLUNA ÚNICA - usa chave composta)
print("\n2. Inserindo endereços...")
address_mapping = upsert_data_with_fks(
    df_address_first,
    None,  # Não há coluna única
    ['cep', 'city', 'neighbourhood', 'house_number', 'state', 'public_place', 'complement'], 
    'address', 
    second_cur, 
    second_conn,
    composite_key_cols=['cep', 'city', 'neighbourhood', 'house_number']  # Chave composta
)


2. Inserindo endereços...


In [9]:
# 3. Enterprise insert (depende de address)
print("\n3. Inserindo empresas...")
enterprise_mapping = upsert_data_with_fks(
    df_enterprise_first, 
    'cnpj', 
    ['cnpj', 'email', 'password', 'name', 'phone_number', 'address_id', 'plan_id', 'enterprise_image'], 
    'enterprise', 
    second_cur, 
    second_conn,
    fk_mappings=[('address_id', address_mapping)]
)


3. Inserindo empresas...


In [10]:
# 4. Employee insert (depende de enterprise)
print("\n4. Inserindo funcionários...")
employee_mapping = upsert_data_with_fks(
    df_employee_first, 
    'email', 
    ['name', 'email', 'password', 'enterprise_id', 'is_admin'], 
    'employee', 
    second_cur, 
    second_conn,
    fk_mappings=[('enterprise_id', enterprise_mapping)]
)


4. Inserindo funcionários...
AVISO: 4 valores não mapeados em enterprise_id
Erro ao inserir na tabela employee: bigint out of range

Valores: ['Mariana Oliveira', 'mariana@empresa1.com', '633687b0c0a44b85af838c859eb7a4f6e733ad604b8b27c57ab03e104aeadcdb', nan, False]
Erro ao inserir na tabela employee: bigint out of range

Valores: ['Carlos Silva', 'carlos@empresa1.com', '55a5e9e78207b4df8699d60886fa070079463547b095d1a05bc719bb4e6cd251', nan, True]
Erro ao inserir na tabela employee: bigint out of range

Valores: ['Rafael Nogueira', 'rafael.nogueira@gmaio.com', '675bf690d55e2a543a85e3c0999d0ce0e0c737e61c84d2926d92b59084b92e28', nan, False]
Erro ao inserir na tabela employee: bigint out of range

Valores: ['Beatriz Andrade', 'beatriz.andrade@gmail.com', 'd2f3dcf179934a213f2747a7dadc75b41a9fce6329b9af065cc111a71ede02e5', nan, False]


## Fechamento das Conexões

In [11]:
# Fechando conexões
first_cur.close()
first_conn.close()
second_cur.close()
second_conn.close()
print("\nConexões fechadas")


Conexões fechadas
