In [None]:
# # Instalación de dependencias (si no las tienes)
# # !pip install connectorx polars

# import connectorx as cx
# import polars as pl

# # Configuración de conexión
# connection_str = (
#     "mssql://datalitica:425am953@192.168.0.20:1433/expreso?"
#     "driver=ODBC+Driver+17+for+SQL+Server"
#     "timeout=0"
# )

# # Consulta SQL
# query = "SELECT * FROM dbo.Asientos"

# # Cargar datos directamente a un DataFrame de Polars
# df = cx.read_sql(
#     conn=connection_str,
#     query=query,
#     return_type="polars",
#     protocol="mssql"  # Especificar el dialecto de SQL Server
# )

# # Mostrar resultados
# print(f"DataFrame shape: {df.shape}")
# print(df.head())

In [1]:
import pyodbc

def create_connection():
    connection_string = (
        'DRIVER={FreeTDS};'
        'SERVER=192.168.0.20;'
        'PORT=1433;'
        'DATABASE=expreso;'
        'UID=datalitica;'
        'PWD=425am953;'
        'TDS_Version=7.2;'
    )
    try:
        conn = pyodbc.connect(connection_string)
        return conn
    except Exception as e:
        print(f"Error al conectar: {e}")
        return None

mssql_conn = create_connection()

In [2]:
import psycopg2

def create_postgres_connection():
    try:
        conn = psycopg2.connect(
            host="localhost",
            database="rivadavia",
            user="rivadavia_user",
            password="Iud1g8Rou!JV&n",
            port="5432"
        )
        return conn
    except Exception as e:
        print(f"Error al conectar a PostgreSQL: {e}")
        return None

# Crear conexión a PostgreSQL
pg_conn = create_postgres_connection()

In [3]:
import polars as pl

def get_table_stats(connection, table_names):
    results = []
    
    for table in table_names:
        # Obtener el número de filas y columnas
        row_count = pl.read_database(f"SELECT COUNT(*) FROM {table}", connection).item()
        col_count = len(pl.read_database(f"SELECT TOP 1 * FROM {table}", connection).columns)
        
        # Obtener el tamaño en MB
        size_mb = pl.read_database(
            f"""
            SELECT 
                SUM(a.total_pages) * 8 / 1024.0 AS size_mb
            FROM 
                sys.tables t
            INNER JOIN 
                sys.indexes i ON t.object_id = i.object_id
            INNER JOIN 
                sys.partitions p ON i.object_id = p.object_id AND i.index_id = p.index_id
            INNER JOIN 
                sys.allocation_units a ON p.partition_id = a.container_id
            WHERE 
                t.name = '{table.split('.')[-1]}'
            """,
            connection
        ).item()
        
        results.append({
            'table_name': table,
            'rows': row_count,
            'columns': col_count,
            'size_mb': round(size_mb, 2)
        })
    
    return pl.DataFrame(results)

# Lista de tablas
tables = [
    # 'dbo.Asientos',
    # 'dbo.Usuarios',
    # 'dbo.Guias',
    # 'dbo.GuiaTraza',
    # 'dbo.Ticket',
    # 'dbo.Clientes',
    # 'dbo.UbicacionGuia',
    # 'dbo.Valores',
    # 'dbo.Resumen',
    'dbo.Resumen1',
    # 'dbo.Sucursal',
    # 'dbo.Saldos'
]

# Obtener estadísticas
stats_df = get_table_stats(mssql_conn, tables)

In [4]:
print(stats_df)

shape: (6, 4)
┌───────────────────┬─────────┬─────────┬──────────────┐
│ table_name        ┆ rows    ┆ columns ┆ size_mb      │
│ ---               ┆ ---     ┆ ---     ┆ ---          │
│ str               ┆ i64     ┆ i64     ┆ decimal[*,2] │
╞═══════════════════╪═════════╪═════════╪══════════════╡
│ dbo.Ticket        ┆ 228017  ┆ 27      ┆ 64.87        │
│ dbo.Clientes      ┆ 7645    ┆ 49      ┆ 5.68         │
│ dbo.UbicacionGuia ┆ 15      ┆ 4       ┆ 0.02         │
│ dbo.Valores       ┆ 192288  ┆ 10      ┆ 42.09        │
│ dbo.Resumen       ┆ 306296  ┆ 15      ┆ 51.59        │
│ dbo.Resumen1      ┆ 1271633 ┆ 5       ┆ 79.00        │
└───────────────────┴─────────┴─────────┴──────────────┘


In [13]:
guias_batchs = pl.read_database(
    "SELECT * FROM dbo.Resumen",
    mssql_conn,
    iter_batches=True,
    batch_size=100000,
)

In [14]:
import time
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from tqdm import tqdm
from memory_profiler import profile

pg_engine = create_engine('postgresql+psycopg2://rivadavia_user:Iud1g8Rou!JV&n@localhost:5432/rivadavia')

# Iniciar temporizador
start_time = time.time()

# Convertir guias_batchs en una lista para evitar el problema del generador
guias_batchs = list(guias_batchs)  

# Calcular el total de filas
total_filas = sum(batch.shape[0] for batch in guias_batchs)

# Medir tiempo transcurrido hasta aquí
elapsed_time = time.time() - start_time
print(f"\nTiempo para calcular total de filas: {elapsed_time:.2f} segundos")
print(f"Total de filas a procesar: {total_filas}")

# Crear una conexión para manejar la transacción
with pg_engine.connect() as connection:
    transaction = connection.begin()
    
    try:
        # Inicializar contador de filas
        filas_procesadas = 0
        
        # Usar tqdm para mostrar el progreso sin agregar nuevas líneas en cada iteración
        with tqdm(total=total_filas, desc="Procesando filas", unit="fila", ncols=100, dynamic_ncols=True) as pbar:
            for i, batch in enumerate(guias_batchs, 1):
                filas_lote = batch.shape[0]
                if_exists_mode = "replace" if i == 1 else "append"
                
                try:
                    # Escribir los lotes en PostgreSQL
                    batch.write_database(
                        table_name="expreso.guias",
                        connection=connection,
                        if_table_exists=if_exists_mode,
                        engine="sqlalchemy"
                    )
                except Exception as e:
                    # Si falla el replace porque la tabla existe, intentar con append
                    if "already exists" in str(e) and if_exists_mode == "replace":
                        batch.write_database(
                            table_name="expreso.guias",
                            connection=connection,
                            if_table_exists="append",
                            engine="sqlalchemy"
                        )
                    else:
                        raise e  # Relanzar otros errores

                # Actualizar contador de filas
                filas_procesadas += filas_lote
                pbar.update(filas_lote)
                del batch

        # Si todo fue bien, confirmar la transacción
        transaction.commit()
        print(f"\nTransacción completada exitosamente. Total filas procesadas: {filas_procesadas}")
        
    except Exception as e:
        # Si hay algún error, revertir la transacción
        transaction.rollback()
        print(f"\nError durante la transacción: {str(e)}")
        print(f"Filas procesadas antes del error: {filas_procesadas}")
        raise



Tiempo para calcular total de filas: 25.49 segundos
Total de filas a procesar: 306296


Procesando filas: 100%|██████████| 306296/306296 [08:59<00:00, 567.75fila/s]


Transacción completada exitosamente. Total filas procesadas: 306296



