In [None]:
import pandas as pd
import psycopg2
from psycopg2 import sql
import json
import os
from modules.postgres_functions import probar_conexion_postgresql, create_sql_script, sql_query, df_to_sql

host = "localhost"
port = 5434
database = "kurtdb"
user = "postgres"
password = "kurt2025"

# Probar conexion


"""
-- Tabla para registrar errores
DROP TABLE IF EXISTS errores_ejemplo_tabla;
CREATE TABLE errores_ejemplo_tabla (
    id SERIAL PRIMARY KEY,
	table_name TEXT,
    error_message TEXT,
    error_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    data JSONB
);
"""
probar_conexion_postgresql(host, user, password, database, port)

In [None]:
from faker import Faker
import random

# Inicializa Faker
fake = Faker()

# Función para generar datos falsos
def generar_datos_fake(num_rows):
    datos = []
    for _ in range(num_rows):
        id_ = random.randint(1,20)
        nombre = fake.first_name()
        apellido = nombre
        cantidad = random.randint(1, 100)
        monto = round(random.uniform(10.0, 1000.0), 2)
        
        datos.append({
            "dni": id_,
            "nombre": nombre,
            "apellido": apellido,
            "cantidad": cantidad,
            "monto": monto
        })
    
    return pd.DataFrame(datos)


In [None]:
index_cols = ['nombre', 'apellido']
pk_name = 'dni'
num_filas = 5
df2 = generar_datos_fake(num_filas)
create_sql_script(df2, "tabla1", "query.sql", index_cols, pk_name, 'True')

script = open("query.sql", "r").read()

sql_query(host, password, database, user, port, script, values=None, fetch=False)

In [None]:
def generate_transaction_script(df, table_name, error_table_name):
    """
    Genera un script SQL dinámico para insertar filas de un DataFrame en PostgreSQL con manejo de errores.
    
    Args:
        df (pd.DataFrame): DataFrame con los datos a insertar.
        table_name (str): Nombre de la tabla destino.
        error_table_name (str): Nombre de la tabla para registrar errores.
    
    Returns:
        str: Script SQL.
    """
    # Obtener columnas del DataFrame
    columns = df.columns.tolist()
    
    # Generar la lista de columnas en formato SQL
    columns_sql = ", ".join(columns)
    
    # Generar las filas en formato SQL VALUES
    values = []
    for row in df.itertuples(index=False):
        row_values = ", ".join(
            f"'{x}'" if isinstance(x, str) else "NULL" if pd.isna(x) else str(x)
            for x in row
        )
        values.append(f"({row_values})")
    
    values_sql = ",\n            ".join(values)

    # Generar el script SQL
    script = f"""
DO $$
DECLARE
    fila RECORD;
BEGIN
    -- Iterar sobre las filas a insertar
    FOR fila IN
        SELECT * FROM (VALUES
            {values_sql}
        ) AS datos({', '.join(columns)})
    LOOP
        BEGIN
            -- Intentar insertar la fila
            INSERT INTO {table_name} ({columns_sql})
            VALUES ({', '.join(f'fila.{col}' for col in columns)});
        EXCEPTION
            WHEN OTHERS THEN
                -- Registrar el error en la tabla de errores
                INSERT INTO {error_table_name} (error_message, data)
                VALUES (SQLERRM, row_to_json(fila));
        END;
    END LOOP;
END $$;
"""
    return script


In [None]:
def generate_transaction_script(df, table_name, error_table_name):
    """
    Genera un script SQL dinámico para insertar filas de un DataFrame en PostgreSQL con manejo de errores.
    
    Args:
        df (pd.DataFrame): DataFrame con los datos a insertar.
        table_name (str): Nombre de la tabla destino.
        error_table_name (str): Nombre de la tabla para registrar errores.
    
    Returns:
        str: Script SQL.
    """
    # Obtener columnas del DataFrame
    columns = df.columns.tolist()
    
    # Generar la lista de columnas en formato SQL
    columns_sql = ", ".join(columns)
    
    # Generar las filas en formato SQL VALUES
    values = []
    for row in df.itertuples(index=False):
        row_values = ", ".join(
            f"'{x}'" if isinstance(x, str) else "NULL" if pd.isna(x) else str(x)
            for x in row
        )
        values.append(f"({row_values})")
    
    values_sql = ",\n            ".join(values)

    # Generar el script SQL
    script = f"""
DO $$
DECLARE
    fila RECORD;
BEGIN
    -- Iterar sobre las filas a insertar
    FOR fila IN
        SELECT * FROM (VALUES
            {values_sql}
        ) AS datos({', '.join(columns)})
    LOOP
        BEGIN
            -- Intentar insertar la fila
            INSERT INTO {table_name} ({columns_sql})
            VALUES ({', '.join(f'fila.{col}' for col in columns)});
        EXCEPTION
            WHEN OTHERS THEN
                -- Registrar el error en la tabla de errores incluyendo el nombre de la tabla de destino
                INSERT INTO {error_table_name} (table_name, error_message, data)
                VALUES ('{table_name}', SQLERRM, row_to_json(fila));
        END;
    END LOOP;
END $$;
"""
    return script


In [None]:
def generate_transaction_script(df, table_name, error_table_name, primary_key_column=None):
    """
    Genera un script SQL dinámico para insertar filas de un DataFrame en PostgreSQL con manejo de errores.
    Si se proporciona un primary_key_column, se agrega un ON CONFLICT DO UPDATE en el query.
    
    Args:
        df (pd.DataFrame): DataFrame con los datos a insertar.
        table_name (str): Nombre de la tabla destino.
        error_table_name (str): Nombre de la tabla para registrar errores.
        primary_key_column (str, optional): Nombre de la columna de la clave primaria para manejar conflictos.
    
    Returns:
        str: Script SQL.
    """
    # Obtener columnas del DataFrame
    columns = df.columns.tolist()
    
    # Generar la lista de columnas en formato SQL
    columns_sql = ", ".join(columns)
    
    # Generar las filas en formato SQL VALUES
    values = []
    for row in df.itertuples(index=False):
        row_values = ", ".join(
            f"'{x}'" if isinstance(x, str) else "NULL" if pd.isna(x) else str(x)
            for x in row
        )
        values.append(f"({row_values})")
    
    values_sql = ",\n            ".join(values)

    # Generar la parte del ON CONFLICT si se pasa la columna de la clave primaria
    on_conflict_sql = ""
    if primary_key_column:
        on_conflict_sql = f"""
            ON CONFLICT ({primary_key_column}) 
            DO UPDATE SET {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col != primary_key_column)}
        """

    # Generar el script SQL
    script = f"""
DO $$
DECLARE
    fila RECORD;
BEGIN
    -- Iterar sobre las filas a insertar
    FOR fila IN
        SELECT * FROM (VALUES
            {values_sql}
        ) AS datos({', '.join(columns)})
    LOOP
        BEGIN
            -- Intentar insertar la fila
            INSERT INTO {table_name} ({columns_sql})
            VALUES ({', '.join(f'fila.{col}' for col in columns)}) {on_conflict_sql};
        EXCEPTION
            WHEN OTHERS THEN
                -- Registrar el error en la tabla de errores incluyendo el nombre de la tabla de destino
                INSERT INTO {error_table_name} (table_name, error_message, data)
                VALUES ('{table_name}', SQLERRM, row_to_json(fila));
        END;
    END LOOP;
END $$;
"""
    return script


In [None]:
def generate_transaction_script(df, table_name, error_table_name, primary_key_column=None):
    """
    Genera un script SQL dinámico para insertar filas de un DataFrame en PostgreSQL con manejo de errores.
    Si se proporciona un primary_key_column, se agrega un ON CONFLICT DO UPDATE en el query.
    
    Args:
        df (pd.DataFrame): DataFrame con los datos a insertar.
        table_name (str): Nombre de la tabla destino.
        error_table_name (str): Nombre de la tabla para registrar errores.
        primary_key_column (str, optional): Nombre de la columna de la clave primaria para manejar conflictos.
    
    Returns:
        str: Script SQL.
    """
    # Obtener columnas del DataFrame
    columns = df.columns.tolist()
    
    # Generar la lista de columnas en formato SQL
    columns_sql = ", ".join(columns)
    
    # Generar las filas en formato SQL VALUES
    values = []
    for row in df.itertuples(index=False):
        row_values = ", ".join(
            f"'{x}'" if isinstance(x, str) else "NULL" if pd.isna(x) else str(x)
            for x in row
        )
        values.append(f"({row_values})")
    
    values_sql = ",\n            ".join(values)

    # Generar la parte del ON CONFLICT si se pasa la columna de la clave primaria
    on_conflict_sql = ""
    if primary_key_column:
        on_conflict_sql = f"""
            ON CONFLICT ({primary_key_column}) 
            DO UPDATE SET {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col != primary_key_column)}
        """

    # Generar el script SQL
    script = f"""
DO $$
DECLARE
    fila RECORD;
BEGIN
    -- Iterar sobre las filas a insertar
    FOR fila IN
        SELECT * FROM (VALUES
            {values_sql}
        ) AS datos({', '.join(columns)})
    LOOP
        BEGIN
            -- Intentar insertar la fila
            INSERT INTO {table_name} ({columns_sql})
            VALUES ({', '.join(f'fila.{col}' for col in columns)}) {on_conflict_sql};

        EXCEPTION
            WHEN unique_violation THEN
                -- Capturar el conflicto y registrar el error
                INSERT INTO {error_table_name} (table_name, error_message, data)
                VALUES ('{table_name}', 'Unique violation error: Conflict in columns {columns_sql}', row_to_json(fila));
            WHEN OTHERS THEN
                -- Registrar el error general en la tabla de errores
                INSERT INTO {error_table_name} (table_name, error_message, data)
                VALUES ('{table_name}', SQLERRM, row_to_json(fila));
        END;
    END LOOP;
END $$;
"""
    return script


In [None]:
def generate_transaction_script(df, table_name, error_table_name, file_name, primary_key_column=None):
    """
    Genera un script SQL dinámico para insertar filas de un DataFrame en PostgreSQL con manejo de errores.
    Si se proporciona un primary_key_column, se agrega un ON CONFLICT DO UPDATE en el query.
    
    Args:
        df (pd.DataFrame): DataFrame con los datos a insertar.
        table_name (str): Nombre de la tabla destino.
        error_table_name (str): Nombre de la tabla para registrar errores.
        primary_key_column (str, optional): Nombre de la columna de la clave primaria para manejar conflictos.
    
    Returns:
        str: Script SQL.
    """
    # Obtener columnas del DataFrame
    columns = df.columns.tolist()
    
    # Generar la lista de columnas en formato SQL
    columns_sql = ", ".join(columns)
    
    # Generar las filas en formato SQL VALUES
    values = []
    for row in df.itertuples(index=False):
        row_values = ", ".join(
            f"'{x}'" if isinstance(x, str) else "NULL" if pd.isna(x) else str(x)
            for x in row
        )
        values.append(f"({row_values})")
    
    values_sql = ",\n            ".join(values)

    # Generar la parte del ON CONFLICT si se pasa la columna de la clave primaria
    on_conflict_sql = ""
    if primary_key_column:
        on_conflict_sql = f"""
            ON CONFLICT ({primary_key_column}) 
            DO UPDATE SET {', '.join(f"{col} = EXCLUDED.{col}" for col in columns if col != primary_key_column)}
        """

    # Generar el script SQL
    script = f"""
DO $$
DECLARE
    fila RECORD;
BEGIN
    -- Iterar sobre las filas a insertar
    FOR fila IN
        SELECT * FROM (VALUES
            {values_sql}
        ) AS datos({', '.join(columns)})
    LOOP
        BEGIN
            -- Intentar insertar la fila
            INSERT INTO {table_name} ({columns_sql})
            VALUES ({', '.join(f'fila.{col}' for col in columns)}) {on_conflict_sql};

        EXCEPTION
            WHEN unique_violation THEN
                -- Capturar el conflicto y registrar el error con detalles específicos
                INSERT INTO {error_table_name} (table_name, error_message, data)
                VALUES (
                    '{table_name}', 
                    'Unique violation error: Conflict in columns {', '.join(columns)}', 
                    row_to_json(fila)
                );
            WHEN foreign_key_violation THEN
                -- Error de clave foránea
                INSERT INTO {error_table_name} (table_name, error_message, data)
                VALUES (
                    '{table_name}', 
                    'Foreign key violation error: Invalid foreign key value in row', 
                    row_to_json(fila)
                );
            WHEN check_violation THEN
                -- Error de violación de restricciones de CHECK
                INSERT INTO {error_table_name} (table_name, error_message, data)
                VALUES (
                    '{table_name}', 
                    'Check constraint violation error: Invalid value in row',
                    row_to_json(fila)
                );
            WHEN others THEN
                -- Registrar error general con detalles
                INSERT INTO {error_table_name} (table_name, error_message, data)
                VALUES (
                    '{table_name}', 
                    'General error: ' || SQLERRM || ' | Row data: ' || row_to_json(fila)::text, 
                    row_to_json(fila)
                );
        END;
    END LOOP;
END $$;
"""
    with open(file_name, "w") as file:
        file.write(script)
    return script


In [None]:
df2 = generar_datos_fake(num_filas)
script_trans = generate_transaction_script(df2, "tabla1", "errores_ejemplo_tabla")
print(script_trans)
with open('script1.sql', 'w') as f:
    f.write(script_trans)
sql_query(host, password, database, user, port, script_trans, values=None, fetch=False)

In [None]:
df2 = generar_datos_fake(50)
script_trans = generate_transaction_script(df2, "tabla1", "errores_ejemplo_tabla", "dni")
print(script_trans)
sql_query(host, password, database, user, port, script_trans, values=None, fetch=False)

In [None]:
# Create a function to read dataframe col types and create random data

def generate_random_data(df):
    """
    Generate random data for a given dataframe
    
    Args:
        df (pd.DataFrame): DataFrame with the columns to generate random data
    
    Returns:
        pd.DataFrame: DataFrame with random data
    """
    data = {}
    for col in df.columns:
        if df[col].dtype == "int64":
            data[col] = [random.randint(1, 100) for _ in range(df.shape[0])]
        elif df[col].dtype == "float64":
            data[col] = [round(random.uniform(10.0, 1000.0), 2) for _ in range(df.shape[0])]
        elif df[col].dtype == "object":
            data[col] = [fake.first_name() for _ in range(df.shape[0])]
    
    return pd.DataFrame(data)