# Taller 4

## Carga CSV a MySQL con carga inicial e incremental

*mysql -u root -p*


In [1]:
import pandas as pd
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import os
from datetime import datetime

load_dotenv()
DB_HOST = os.getenv('DB_HOST')
DB_PORT = os.getenv('DB_PORT')
DB_USER = os.getenv('DB_USER')
DB_PASSWORD = os.getenv('DB_PASSWORD')
DB_NAME = os.getenv('DB_NAME')

# Agregar parámetros de conexión para manejar la autenticación MySQL 8.0+
DB_URL = f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}?charset=utf8mb4&auth_plugin_map=caching_sha2_password:mysql_native_password'

# Crear engine con configuraciones adicionales
engine = create_engine(
    DB_URL,
    pool_pre_ping=True,
    pool_recycle=300,
    echo=False
)

ddl = """
    CREATE TABLE IF NOT EXISTS ventas (
    venta_id INT AUTO_INCREMENT PRIMARY KEY,
    cliente VARCHAR(100),
    producto VARCHAR(100),
    categoria VARCHAR(100),
    precio_unitario DECIMAL(10, 2),
    unidades INT,
    fecha DATE
    );
"""

try:
    with engine.connect() as connection:
        connection.execute(text(ddl))
        connection.commit()
    print(f"Tabla creada exitosamente. {datetime.now()}")
except Exception as e:
    print(f"Error al conectar con la base de datos: {e}")
    print("Intentando conexión alternativa...")
    
    # Intentar con configuración alternativa
    DB_URL_ALT = f'mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}?charset=utf8mb4'
    engine_alt = create_engine(DB_URL_ALT, pool_pre_ping=True)
    
    try:
        with engine_alt.connect() as connection:
            connection.execute(text(ddl))
            connection.commit()
        print("Tabla creada exitosamente con configuración alternativa.")
        engine = engine_alt  # Usar la configuración alternativa
    except Exception as e2:
        print(f"Error con configuración alternativa: {e2}")
        print("Verificar que MySQL esté ejecutándose: docker-compose up -d")

Tabla creada exitosamente. 2025-10-10 20:50:31.970702


In [2]:
# Leer CSV

csv_file_path = 'ventas.csv'
df = pd.read_csv(csv_file_path) 

df['precio_unitario'] = pd.to_numeric(df['precio_unitario'], errors='coerce')
df['unidades'] = pd.to_numeric(df['unidades'], errors='coerce')
df['fecha'] = pd.to_datetime(df['fecha'], errors='coerce').dt.date

with engine.begin() as conn:
    conn.execute(text("TRUNCATE TABLE ventas"))

df.to_sql('ventas', con=engine, if_exists='append', index=False, method='multi', chunksize=1000)

print(f"Datos insertados exitosamente. {datetime.now()} - Total filas insertadas: {len(df)}")



Datos insertados exitosamente. 2025-10-10 20:50:32.127040 - Total filas insertadas: 20


In [4]:
from sqlalchemy import text

df_inc = pd.read_csv('ventas_incrementales.csv')

df_inc['precio_unitario'] = pd.to_numeric(df_inc['precio_unitario'], errors='coerce')
df_inc['unidades'] = pd.to_numeric(df_inc['unidades'], errors='coerce')
df_inc['fecha'] = pd.to_datetime(df_inc['fecha'], errors='coerce').dt.date

### Convertimos a lista de diccionarios para ejecucciones masivas

data = df_inc.to_dict(orient='records')

upsert_sql = text("""
    INSERT INTO ventas (cliente, producto, categoria, precio_unitario, unidades, fecha)
    VALUES (:cliente, :producto, :categoria, :precio_unitario, :unidades, :fecha)
    ON DUPLICATE KEY UPDATE
                  precio_unitario = VALUES(precio_unitario),
        unidades = VALUES(unidades),
        fecha = VALUES(fecha);""")

batch = 1000

with engine.begin() as conn:
    conn.execute(upsert_sql, data, execution_options={"batch_size": batch})

print(f"Datos incrementales insertados/actualizados exitosamente. {datetime.now()} - Total filas procesadas: {len(df_inc)}")



Datos incrementales insertados/actualizados exitosamente. 2025-10-10 21:04:59.604773 - Total filas procesadas: 4


In [7]:
## Mostrar primeras y ultimas filas de la tabla ventas
all_rows = pd.read_sql("SELECT * FROM ventas", con=engine)
print(all_rows.head(5))
print(all_rows.tail(5))
print(f"Total Registros {len(all_rows)} ")


# Top productos pro ventas (precio * unidades)
top_productos = pd.read_sql("""
    SELECT producto, SUM(precio_unitario * unidades) AS total_ventas
    FROM ventas
    GROUP BY producto
    ORDER BY total_ventas DESC
    LIMIT 10;
""", con=engine)
print(f"=" * 40)
print("Top 10 productos por ventas")
print(top_productos)


   venta_id cliente   producto    categoria  precio_unitario  unidades  \
0         1     Ana     Laptop  Electrónica           2500.0         1   
1         2    Luis   Teléfono  Electrónica           1200.0         2   
2         3   Cesar     Tablet  Electrónica            800.0         1   
3         4   María  Audífonos   Accesorios            150.0         3   
4         5   Pedro    Monitor  Electrónica            600.0         2   

        fecha  
0  2023-01-01  
1  2023-01-02  
2  2023-01-02  
3  2023-01-03  
4  2023-01-04  
    venta_id cliente producto    categoria  precio_unitario  unidades  \
23        24     Leo  Lámpara   Accesorios             40.0         2   
24        25   Pedro  Monitor  Electrónica            650.0         1   
25        26    Nora   Cámara  Electrónica            350.0         1   
26        27  Andrés  Lámpara   Accesorios             40.0         2   
27        28     Leo  Lámpara   Accesorios             40.0         2   

         fecha  
23 