# Tutorial: Esquema PostgreSQL para proyectos solares (desde Python)

**Objetivo:**  
Crear un esquema en PostgreSQL desde Python que modele proyectos solares, sistemas por proyecto, y datos meteorológicos y eléctricos por sistema. Mostrar cómo cargar CSV eléctricos con `pandas` y `psycopg2` usando inserciones masivas.

**Nota para la clase:** cada celda contiene explicación y código ejecutable. Asegúrate de ajustar las credenciales de la base de datos antes de ejecutar las celdas que crean el esquema o insertan datos.


## Requisitos

- Python 3.8+  
- Paquetes: `psycopg2-binary`, `pandas`

Instálalos ejecutando la celda siguiente (usa `%pip` en notebooks para asegurar que se instalen en el kernel actual).

In [25]:
# Instala las dependencias (descomenta si necesitas instalar)
# %pip install psycopg2-binary pandas
print('Si no tienes psycopg2-binary y pandas, descomenta la línea anterior y ejecútala.')

Si no tienes psycopg2-binary y pandas, descomenta la línea anterior y ejecútala.


## Importaciones y configuración básica

Define imports y la configuración de conexión (modifica `DB_CONFIG` con tus credenciales).
**No** compartas contraseñas reales en notebooks públicos; en clase puedes pedir a cada estudiante que use sus credenciales locales.


In [26]:
import os
from typing import Optional, Dict, Any
from datetime import datetime

# -------------------------
# CONFIGURA TUS CREDENCIALES AQUÍ
# -------------------------
# Cambia estos valores por los de tu servidor PostgreSQL antes de ejecutar la creación del esquema.
DB_CONFIG = {
    "host": "localhost",
    "port": 5432,
    "dbname": "solar_db",
    "user": "postgres",
    "password": "clase2021"
}

def _conn_str(cfg: Dict[str, Any]) -> str:
    return f"host={cfg['host']} port={cfg['port']} dbname={cfg['dbname']} user={cfg['user']} password={cfg['password']}"

def get_connection(cfg: Optional[Dict[str, Any]] = None):
    import psycopg2
    cfg = cfg or DB_CONFIG
    conn = psycopg2.connect(_conn_str(cfg))
    return conn

print('Imports y configuración listos. Revisa DB_CONFIG antes de crear el esquema.')

Imports y configuración listos. Revisa DB_CONFIG antes de crear el esquema.


## Crear esquema y tablas en PostgreSQL

Esta celda crea tablas: `projects`, `systems`, `meteo_data` y `electrical_data`, y los índices necesarios.


In [27]:
def create_schema(cfg: Optional[Dict[str, Any]] = None, schema_name: str = 'public'):
    conn = get_connection(cfg)
    cur = conn.cursor()
    cur.execute(f"""
    CREATE TABLE IF NOT EXISTS {schema_name}.projects (
        id SERIAL PRIMARY KEY,
        name TEXT NOT NULL,
        location TEXT,
        description TEXT,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT now()
    );

    CREATE TABLE IF NOT EXISTS {schema_name}.systems (
        id SERIAL PRIMARY KEY,
        project_id INTEGER NOT NULL REFERENCES {schema_name}.projects(id) ON DELETE CASCADE,
        name TEXT NOT NULL,
        capacity_kw DOUBLE PRECISION,
        inverter_type TEXT,
        notes TEXT,
        created_at TIMESTAMP WITH TIME ZONE DEFAULT now()
    );

    CREATE TABLE IF NOT EXISTS {schema_name}.meteo_data (
        id SERIAL PRIMARY KEY,
        system_id INTEGER NOT NULL REFERENCES {schema_name}.systems(id) ON DELETE CASCADE,
        timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
        ghi DOUBLE PRECISION,
        dni DOUBLE PRECISION,
        dhi DOUBLE PRECISION,
        temp_c DOUBLE PRECISION,
        wind_m_s DOUBLE PRECISION,
        precip_mm DOUBLE PRECISION,
        source TEXT,
        inserted_at TIMESTAMP WITH TIME ZONE DEFAULT now()
    );

    CREATE TABLE IF NOT EXISTS {schema_name}.electrical_data (
        id SERIAL PRIMARY KEY,
        system_id INTEGER NOT NULL REFERENCES {schema_name}.systems(id) ON DELETE CASCADE,
        timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
        power_kw DOUBLE PRECISION,
        voltage_v DOUBLE PRECISION,
        current_a DOUBLE PRECISION,
        energy_kwh DOUBLE PRECISION,
        status TEXT,
        inserted_at TIMESTAMP WITH TIME ZONE DEFAULT now()
    );
    """)
    cur.execute(f"CREATE INDEX IF NOT EXISTS idx_systems_project ON {schema_name}.systems(project_id);")
    cur.execute(f"CREATE INDEX IF NOT EXISTS idx_meteo_system_time ON {schema_name}.meteo_data(system_id, timestamp);")
    cur.execute(f"CREATE INDEX IF NOT EXISTS idx_elec_system_time ON {schema_name}.electrical_data(system_id, timestamp);")
    conn.commit()
    cur.close()
    conn.close()
    print('Esquema y tablas creadas en PostgreSQL.')

print('Función create_schema definida. Ejecuta create_schema(DB_CONFIG) para crear tablas.')

Función create_schema definida. Ejecuta create_schema(DB_CONFIG) para crear tablas.


## Funciones CRUD básicas

Funciones para insertar proyectos y sistemas en la base de datos (devuelven el id insertado).


In [28]:
def add_project(name: str, location: Optional[str] = None, description: Optional[str] = None, cfg: Optional[Dict[str, Any]] = None) -> int:
    conn = get_connection(cfg)
    cur = conn.cursor()
    cur.execute("INSERT INTO projects (name, location, description) VALUES (%s, %s, %s) RETURNING id;", (name, location, description))
    project_id = cur.fetchone()[0]
    conn.commit()
    cur.close()
    conn.close()
    return project_id

def add_system(project_id: int, name: str, capacity_kw: Optional[float] = None,
               inverter_type: Optional[str] = None, notes: Optional[str] = None, cfg: Optional[Dict[str, Any]] = None) -> int:
    conn = get_connection(cfg)
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO systems (project_id, name, capacity_kw, inverter_type, notes) VALUES (%s, %s, %s, %s, %s) RETURNING id;",
        (project_id, name, capacity_kw, inverter_type, notes)
    )
    system_id = cur.fetchone()[0]
    conn.commit()
    cur.close()
    conn.close()
    return system_id

print('Funciones add_project y add_system definidas.')

Funciones add_project y add_system definidas.


## Helpers y sanitización

Funciones internas para convertir valores leídos del CSV a tipos seguros.


In [29]:
def _safe_float(x) -> Optional[float]:
    try:
        if x is None:
            return None
        s = str(x).strip()
        if s == '' or s.lower() in ('nan', 'none', 'null'):
            return None
        s = s.replace(',', '.')
        return float(s)
    except Exception:
        return None

def _safe_str(x) -> Optional[str]:
    if x is None:
        return None
    s = str(x).strip()
    return s if s != '' else None

print('Helpers definidos.')

Helpers definidos.


## Ingestión de CSV a `electrical_data` (bulk insert)

La función siguiente lee un CSV con `pandas` por chunks y realiza inserciones masivas usando `psycopg2.extras.execute_values`.
Ajusta `col_mappings` si los nombres de columnas del CSV son distintos.


In [30]:
def ingest_electrical_csv(cfg: Optional[Dict[str, Any]], system_id: int, csv_path: str,
                          col_mappings: Optional[Dict[str, str]] = None,
                          chunk_size: int = 10000):
    import psycopg2
    import psycopg2.extras
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"No existe el archivo CSV: {csv_path}")

    default_map = {
        'timestamp': 'timestamp',
        'time': 'timestamp',
        'datetime': 'timestamp',
        'power_kw': 'power_kw',
        'power_kW': 'power_kw',
        'power': 'power_kw',
        'voltage_v': 'voltage_v',
        'voltage': 'voltage_v',
        'current_a': 'current_a',
        'current': 'current_a',
        'energy_kwh': 'energy_kwh',
        'energy': 'energy_kwh',
        'status': 'status'
    }
    combined_map = {**default_map, **(col_mappings or {})}

    total_inserted = 0
    for chunk in pd.read_csv(csv_path, chunksize=chunk_size):
        chunk.columns = [c.strip() for c in chunk.columns]
        map_existing = {}
        for c in chunk.columns:
            lower = c.lower()
            if lower in combined_map:
                map_existing[c] = combined_map[lower]
            elif lower in {'timestamp', 'power_kw', 'voltage_v', 'current_a', 'energy_kwh', 'status'}:
                map_existing[c] = lower

        if not any(v == 'timestamp' for v in map_existing.values()):
            raise ValueError(f"El CSV no contiene columna de timestamp reconocida. Columnas: {list(chunk.columns)}")

        df = pd.DataFrame()
        for src, target in map_existing.items():
            df[target] = chunk[src]

        df['timestamp'] = pd.to_datetime(df['timestamp'], infer_datetime_format=True, errors='coerce')
        n_before = len(df)
        df = df.dropna(subset=['timestamp'])
        n_after = len(df)
        if n_after < n_before:
            print(f"Descartadas {n_before-n_after} filas con timestamp inválido en chunk.")

        for col in ['power_kw', 'voltage_v', 'current_a', 'energy_kwh', 'status']:
            if col not in df.columns:
                df[col] = None

        tuples = [
            (
                system_id,
                pd.Timestamp(row['timestamp']).to_pydatetime(),
                _safe_float(row['power_kw']),
                _safe_float(row['voltage_v']),
                _safe_float(row['current_a']),
                _safe_float(row['energy_kwh']),
                _safe_str(row['status'])
            )
            for _, row in df.iterrows()
        ]

        conn = get_connection(cfg)
        cur = conn.cursor()
        insert_sql = """
            INSERT INTO electrical_data (system_id, timestamp, power_kw, voltage_v, current_a, energy_kwh, status)
            VALUES %s
        """
        try:
            psycopg2.extras.execute_values(cur, insert_sql, tuples, template=None, page_size=1000)
            conn.commit()
            inserted = len(tuples)
            total_inserted += inserted
            print(f"Insertadas {inserted} filas (acumulado: {total_inserted})")
        except Exception as e:
            conn.rollback()
            raise RuntimeError(f"Error insertando chunk: {e}")
        finally:
            cur.close()
            conn.close()

    print(f"Ingestión finalizada. Total filas insertadas: {total_inserted}")

## Generar un CSV de ejemplo (local) y probar la ingestión

La siguiente celda genera un CSV sintético en la carpeta del notebook y lo ingresa al `system_id` creado en el ejemplo.


In [31]:
# Generar CSV de ejemplo
import pandas as pd
sample_csv = 'datos_electricos_ejemplo_pg.csv'
from datetime import timedelta
start = datetime(2025, 10, 1, 0, 0)
rows = []
for i in range(24):
    t = start + timedelta(hours=i)
    power = max(0, 500 * (1 - abs(12 - i)/12))
    voltage = 400 + (i % 3)
    current = power / (voltage if voltage != 0 else 1)
    energy = power / 1000.0
    status = 'OK' if power > 0 else 'NO_GEN'
    rows.append([t.strftime('%Y-%m-%d %H:%M:%S'), round(power,2), round(voltage,2), round(current,3), round(energy,4), status])

pd.DataFrame(rows, columns=['timestamp','power_kW','voltage','current','energy','status']).to_csv(sample_csv, index=False)
print('CSV de ejemplo creado:', sample_csv)

CSV de ejemplo creado: datos_electricos_ejemplo_pg.csv


## Ejemplo completo: crear esquema, insertar proyecto/sistema e ingestar CSV

Ejecuta esta celda **después** de haber ajustado `DB_CONFIG` y de haber creado el esquema (o puedes dejar que la celda cree el esquema).


In [39]:
# EJEMPLO: crear esquema y agregar datos de ejemplo
# Asegúrate de haber ajustado DB_CONFIG en la celda de configuración.

# try:
create_schema(DB_CONFIG)
p_id = add_project('Parque Solar Postgres Demo', location='Campus', description='Demo PostgreSQL')
print('Proyecto creado id:', p_id)
s_id = add_system(p_id, 'Array Demo', capacity_kw=1200.0, inverter_type='String')
print('Sistema creado id:', s_id)

# Ingestar CSV generado en la celda anterior
ingest_electrical_csv(DB_CONFIG, s_id, sample_csv)
# except Exception as e:
#     print('Error en el flujo de ejemplo:', e)

Esquema y tablas creadas en PostgreSQL.
Proyecto creado id: 1
Sistema creado id: 1
Insertadas 24 filas (acumulado: 24)
Ingestión finalizada. Total filas insertadas: 24


  df['timestamp'] = pd.to_datetime(df['timestamp'], infer_datetime_format=True, errors='coerce')


## Consultas simples

Ejemplo de cómo leer algunas filas desde `electrical_data` para verificar que la ingestión funcionó.


In [None]:
# Consultar las primeras filas del sistema creado (si existe)
try:
    conn = get_connection(DB_CONFIG)
    cur = conn.cursor()
    s_id = globals().get('s_id', None)
    if s_id is None:
        print('s_id no encontrado en el entorno. Ejecuta la celda de ejemplo que crea el sistema.')
    else:
        cur.execute('SELECT id, system_id, timestamp, power_kw, voltage_v, current_a, energy_kwh, status FROM electrical_data WHERE system_id = %s ORDER BY timestamp LIMIT 10;', (s_id,))
        rows = cur.fetchall()
        for r in rows:
            print(r)
    cur.close()
    conn.close()
except Exception as e:
    print('No se pudo ejecutar la consulta:', e)

---
### Sugerencias para la clase

- Pide a los estudiantes modificar `col_mappings` si cambian nombres de columnas en el CSV.  
- Añade validaciones (rangos físicos, duplicados por timestamp).  
- Para producción: usar pool de conexiones, manejo de transacciones más fino, y monitoreo.  
- Si quieres, puedo convertir este notebook en una versión con celdas que muestren gráficos (matplotlib) de potencia a lo largo del día.
