# API SQLite3 con Flask
Este notebook contiene las funciones para interactuar con SQLite3 y una API Flask para exponer los endpoints.

## 1. Funciones de Base de Datos

In [14]:
from __future__ import annotations

import os
import pandas as pd
import re
import sqlite3
from typing import Any, Dict, List, Tuple, Optional

# ------------------------------
# Config & Pluggable Connection
# ------------------------------

# Environment variables (defaults for SQLite)
DB_BACKEND = os.getenv('DB_BACKEND', 'sqlite').lower()
SQLITE_PATH = os.getenv('DB_FILE_PATH', 'dev.db')

def get_db_connection(db_path: Optional[str] = None):
    """
    Connection factory. Replace/extend this to support other DB engines.
    Currently supports SQLite.
    """
    backend = DB_BACKEND

    if backend == 'sqlite':
        path = db_path or SQLITE_PATH
        return sqlite3.connect(path)
    else:
        raise NotImplementedError(f"DB_BACKEND '{backend}' not implemented yet.")

# ------------------------------
# Helpers
# ------------------------------

_IDENTIFIER_RX = re.compile(r'^[A-Za-z_][A-Za-z0-9_]*$')

def validate_identifier(name: str, what: str = 'identifier') -> str:
    """
    Basic whitelist validation for table/column identifiers to reduce SQL injection risk.
    """
    if not isinstance(name, str) or not _IDENTIFIER_RX.match(name):
        raise ValueError(f"Invalid {what}: {name!r}")
    return name

def rows_to_dicts(cursor, rows: List[Tuple]) -> List[Dict[str, Any]]:
    columns = [col[0] for col in cursor.description] if cursor.description else []
    result = []
    for row in rows:
        if columns:
            result.append({col: row[idx] for idx, col in enumerate(columns)})
        else:
            result.append({'value': row})
    return result

# ------------------------------
# Core DB Functions
# ------------------------------

def db_tables(data_db: str = SQLITE_PATH) -> List[str]:
    """
    Devuelve el listado de tablas de la base de datos.
    """
    conn = get_db_connection(data_db)
    try:
        cur = conn.cursor()
        cur.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' AND name NOT LIKE 'sqlite_%' ORDER BY name"
        )
        rows = cur.fetchall()
        return [r[0] for r in rows]
    finally:
        conn.close()

def db_table_schema(tabla: str, data_db: str = SQLITE_PATH) -> List[Dict[str, Any]]:
    """
    Devuelve la lista de campos (columnas) de una tabla.
    """
    t = validate_identifier(tabla, 'table name')
    conn = get_db_connection(data_db)
    try:
        cur = conn.cursor()
        cur.execute(f'PRAGMA table_info({t})')
        rows = cur.fetchall()
        return [
            {
                'cid': r[0],
                'name': r[1],
                'type': r[2],
                'notnull': bool(r[3]),
                'dflt_value': r[4],
                'pk': bool(r[5]),
            }
            for r in rows
        ]
    finally:
        conn.close()

def db_update(tabla: str, campo: str, valor: Any, condicion_sql: str, data_db: str = SQLITE_PATH) -> Dict[str, Any]:
    """
    UPDATE <tabla> SET <campo> = ? WHERE <condicion_sql>
    """
    t = validate_identifier(tabla, 'table name')
    c = validate_identifier(campo, 'column name')
    sql = f'UPDATE {t} SET {c} = ? WHERE {condicion_sql}'
    conn = get_db_connection(data_db)
    try:
        cur = conn.cursor()
        cur.execute(sql, (valor,))
        conn.commit()
        res = {'rowcount': cur.rowcount}
    finally:
        conn.close()
    return res


def db_insert(
    tabla: str,
    json_valores: Dict[str, Any],
    data_db: str = SQLITE_PATH,
    update_on_conflict: bool = False,
    conflict_cols: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """
    INSERT INTO <tabla> (<cols...>) VALUES (<placeholders...>)
    """
    t = validate_identifier(tabla, "table name")
    if not isinstance(json_valores, dict) or not json_valores:
        raise ValueError("json_valores debe ser un objeto con al menos una clave.")
    cols = [validate_identifier(k, "column name") for k in json_valores.keys()]
    vals = list(json_valores.values())
    placeholders = ", ".join(["?"] * len(cols))
    col_list = ", ".join(cols)
    base_sql = f"INSERT INTO {t} ({col_list}) VALUES ({placeholders})"

    # Optional UPSERT clause
    sql = base_sql
    if update_on_conflict:
        if not conflict_cols:
            raise ValueError("Debe proporcionar 'conflict_cols' cuando 'update_on_conflict' es True.")
        conflict_cols_safe = [validate_identifier(c, "conflict column") for c in conflict_cols]
        # Actualizar todas las columnas proporcionadas excepto las de conflicto
        update_cols = [c for c in cols if c not in conflict_cols_safe]
        if not update_cols:
            raise ValueError("No hay columnas para actualizar en el UPSERT (todas están en conflict_cols).")
        set_expr = ", ".join([f"{c} = excluded.{c}" for c in update_cols])
        sql = base_sql + f" ON CONFLICT ({', '.join(conflict_cols_safe)}) DO UPDATE SET {set_expr}"
    conn = get_db_connection(data_db)
    try:
        cur = conn.cursor()
        cur.execute(sql, vals)
        conn.commit()
        return {"rowcount": cur.rowcount, "lastrowid": getattr(cur, "lastrowid", None)}
    except sqlite3.IntegrityError as ie:
        # Rollback to release write lock and re-raise as ValueError with friendly message
        try:
            conn.rollback()
        except Exception:
            pass
        raise ValueError(f"Violación de integridad (UNIQUE/FOREIGN KEY): {ie}")
    except Exception:
        # Ensure rollback on any other failure to avoid lingering locks
        try:
            conn.rollback()
        except Exception:
            pass
        raise
    finally:
        conn.close()


def db_delete(tabla: str, condicion_sql: str, data_db: str = SQLITE_PATH) -> Dict[str, Any]:
    """
    DELETE FROM <tabla> WHERE <condicion_sql>
    """
    t = validate_identifier(tabla, 'table name')
    sql = f'DELETE FROM {t} WHERE {condicion_sql}'
    conn = get_db_connection(data_db)
    try:
        cur = conn.cursor()
        cur.execute(sql)
        conn.commit()
        res = {'rowcount': cur.rowcount}
    finally:
        conn.close()
    return res

def db_delete_pk(tabla: str, pk: str, valor: Any, data_db: str = SQLITE_PATH) -> Dict[str, Any]:
    """
    DELETE FROM <tabla> WHERE <pk> = ?
    """
    t = validate_identifier(tabla, 'table name')
    p = validate_identifier(pk, 'primary key column')
    sql = f'DELETE FROM {t} WHERE {p} = ?'
    conn = get_db_connection(data_db)
    try:
        cur = conn.cursor()
        cur.execute(sql, (valor,))
        conn.commit()
        res = {'rowcount': cur.rowcount}
    finally:
        conn.close()
    return res


def db_read(
    tabla: str,
    campos: Optional[List[str]] = None,
    condicion_sql: Optional[str] = None,
    data_db: str = SQLITE_PATH,
) -> Dict[str, Any]:
    """
    SELECT <campos> FROM <tabla> [WHERE <condicion_sql>]
    - tabla: nombre de la tabla
    - campos: lista de columnas o None/'*' para todas
    - condicion_sql: cláusula WHERE cruda (sin "WHERE")
    """
    t = validate_identifier(tabla, "table name")
    # Build column list
    if not campos or (isinstance(campos, list) and len(campos) == 0):
        col_expr = "*"
    else:
        # Allow caller to pass '*' as a single-item list or string
        if isinstance(campos, list):
            if len(campos) == 1 and campos[0] == "*":
                col_expr = "*"
            else:
                safe_cols = [validate_identifier(c, "column name") for c in campos]
                col_expr = ", ".join(safe_cols)
        else:
            # Unexpected type
            raise ValueError("'campos' debe ser una lista de nombres de columna o estar vacío para '*'.")

    sql = f"SELECT {col_expr} FROM {t}"
    if condicion_sql:
        sql += f" WHERE {condicion_sql}"

    conn = get_db_connection(data_db)
    try:
        cur = conn.cursor()
        cur.execute(sql)
        rows = cur.fetchall()
        columns = [col[0] for col in cur.description] if cur.description else []
        return {
            "columns": columns,
            "rows": rows,
            "rowcount": len(rows),
        }
    finally:
        conn.close()

def json_to_dataframe(json_data):
    """
    Convierte un JSON con formato específico a DataFrame de pandas
    
    Args:
        json_data: Puede ser un diccionario Python o string JSON con el formato:
                  {'columns': [lista_columnas], 'rows': [lista_tuplas], 'rowcount': n}
    
    Returns:
        pandas.DataFrame: DataFrame con los datos del JSON
    """
    # Si el input es un string, convertirlo a diccionario
    if isinstance(json_data, str):
        json_data = json.loads(json_data)
    
    # Crear DataFrame directamente desde las columnas y filas
    df = pd.DataFrame(json_data['rows'], columns=json_data['columns'])
    
    return df


## 2. Ejemplos de Uso de las Funciones

In [21]:
# Crear una base de datos de ejemplo
import sqlite3

# Crear base de datos de prueba
conn = sqlite3.connect('dev.db')
cursor = conn.cursor()

cursor.execute('''DROP TABLE IF EXISTS usuarios''')

# Crear tabla de usuarios
cursor.execute('''
    CREATE TABLE IF NOT EXISTS usuarios (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        nombre TEXT NOT NULL,
        email TEXT UNIQUE NOT NULL,
        edad INTEGER
    )
''')

# Insertar datos de ejemplo
cursor.execute('''
    INSERT OR IGNORE INTO usuarios (nombre, email, edad) 
    VALUES 
    ('Juan Pérez', 'juan@example.com', 30),
    ('María García', 'maria@example.com', 25),
    ('Carlos López', 'carlos@example.com', 35)
''')

conn.commit()
conn.close()

print('Base de datos de ejemplo creada exitosamente!')


Base de datos de ejemplo creada exitosamente!


In [22]:
# Ejemplos de uso de las funciones
print('=== Listado de Tablas ===')
tablas = db_tables()
print(f'Tablas en la base de datos: {tablas}')

=== Listado de Tablas ===
Tablas en la base de datos: ['usuarios']


In [23]:
print('=== Esquema de la tabla usuarios ===')
table = 'usuarios'
esquema = db_table_schema('usuarios')
for col in esquema:
    print(f"Col: {col['name']}, Tipo: {col['type']}, PK: {col['pk']}")

=== Esquema de la tabla usuarios ===
Col: id, Tipo: INTEGER, PK: True
Col: nombre, Tipo: TEXT, PK: False
Col: email, Tipo: TEXT, PK: False
Col: edad, Tipo: INTEGER, PK: False


In [26]:
print('=== Insertar nuevo usuario ===')
nuevo_usuario_ = {
    'nombre': 'Maria López',
    'email': 'maria_lopez@example.com', 
    'edad': 51
}
# en caso de conflicto en email, actualizar el registro existente
resultado_insert = db_insert('usuarios', nuevo_usuario_, update_on_conflict=True, conflict_cols=['email'])
print(f'Usuario insertado: {resultado_insert}')

=== Insertar nuevo usuario ===
Usuario insertado: {'rowcount': 1, 'lastrowid': 0}


In [11]:
print('=== Actualizar usuario ===')
resultado_update = db_update('usuarios', 'edad', 31, "nombre = 'Juan Pérez'")
print(f'Usuarios actualizados: {resultado_update}')

=== Actualizar usuario ===
Usuarios actualizados: {'rowcount': 1}


In [27]:
print('=== Lee tabla ===')
resultado_read =  db_read('usuarios')
df = json_to_dataframe(resultado_read)
df

=== Lee tabla ===


Unnamed: 0,id,nombre,email,edad
0,1,Juan Pérez,juan@example.com,30
1,2,María García,maria@example.com,25
2,3,Carlos López,carlos@example.com,35
3,4,Maria López,maria_lopez@example.com,51


In [None]:
db_read('usuarios')

## 3. Backend en FastAPI

In [None]:
from flask import Flask, request, jsonify
import threading
import time
import requests

app = Flask(__name__)

# ------------------------------
# Flask Routes
# ------------------------------
@app.route('/db/update', methods=['POST'])
def route_db_update():
    """
    POST /db/update
    JSON: {'tabla': '...', 'campo': '...', 'valor': <any>, 'condicion_sql': 'id = 1', 'db': '<optional_db_path>'}
    """
    payload = request.get_json(silent=True) or {}
    try:
        tabla = payload['tabla']
        campo = payload['campo']
        valor = payload['valor']
        condicion_sql = payload['condicion_sql']
        data_db = payload.get('db', SQLITE_PATH)
    except KeyError as ke:
        return jsonify({'error': f'Falta el campo requerido: {ke}'}) , 400
    try:
        result = db_update(tabla, campo, valor, condicion_sql, data_db=data_db)
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/db/insert', methods=['POST'])
def route_db_insert():
    """
    POST /db/insert
    JSON: {'tabla': '...', 'valores': {'col1': v1, 'col2': v2, ...}, 'db': '<optional_db_path>'}
    """
    payload = request.get_json(silent=True) or {}
    try:
        tabla = payload['tabla']
        valores = payload['valores']
        data_db = payload.get('db', SQLITE_PATH)
    except KeyError as ke:
        return jsonify({'error': f'Falta el campo requerido: {ke}'}) , 400
    try:
        result = db_insert(tabla, valores, data_db=data_db)
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/db/delete', methods=['POST'])
def route_db_delete():
    """
    POST /db/delete
    JSON: {'tabla': '...', 'condicion_sql': 'id > 5', 'db': '<optional_db_path>'}
    """
    payload = request.get_json(silent=True) or {}
    try:
        tabla = payload['tabla']
        condicion_sql = payload['condicion_sql']
        data_db = payload.get('db', SQLITE_PATH)
    except KeyError as ke:
        return jsonify({'error': f'Falta el campo requerido: {ke}'}) , 400
    try:
        result = db_delete(tabla, condicion_sql, data_db=data_db)
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

@app.route('/db/delete_pk', methods=['POST'])
def route_db_delete_pk():
    """
    POST /db/delete_pk
    JSON: {'tabla': '...', 'pk': '...', 'valor': <any>, 'db': '<optional_db_path>'}
    """
    payload = request.get_json(silent=True) or {}
    try:
        tabla = payload['tabla']
        pk = payload['pk']
        valor = payload['valor']
        data_db = payload.get('db', SQLITE_PATH)
    except KeyError as ke:
        return jsonify({'error': f'Falta el campo requerido: {ke}'}) , 400
    try:
        result = db_delete_pk(tabla, pk, valor, data_db=data_db)
        return jsonify(result)
    except Exception as e:
        return jsonify({'error': str(e)}), 500

# Listado de tablas
@app.route('/db/tables', methods=['GET'])
def route_db_tables():
    """
    GET /db/tables?db=<optional_db_path>
    Devuelve la lista de tablas de la base de datos.
    """
    data_db = request.args.get('db', SQLITE_PATH)
    try:
        tables = db_tables(data_db=data_db)
        return jsonify({'tables': tables, 'count': len(tables)})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

# Esquema de una tabla
@app.route('/db/table_schema', methods=['GET'])
def route_db_table_schema():
    """
    GET /db/table_schema?tabla=<nombre>&db=<optional_db_path>
    Devuelve el esquema (columnas) de la tabla indicada.
    """
    tabla = request.args.get('tabla')
    data_db = request.args.get('db', SQLITE_PATH)
    if not tabla:
        return jsonify({'error': "Falta el parámetro requerido: 'tabla'."}), 400
    try:
        validate_identifier(tabla, 'table name')
        schema = db_table_schema(tabla, data_db=data_db)
        return jsonify({'tabla': tabla, 'schema': schema, 'count': len(schema)})
    except ValueError as ve:
        return jsonify({'error': str(ve)}), 400
    except Exception as e:
        return jsonify({'error': str(e)}), 500

# Health check
@app.route('/health', methods=['GET'])
def health():
    try:
        conn = get_db_connection(SQLITE_PATH)
        conn.close()
        return jsonify({'status': 'ok', 'backend': DB_BACKEND, 'db': SQLITE_PATH})
    except Exception as e:
        return jsonify({'status': 'error', 'message': str(e)}), 500

def run_flask_app():
    """Función para ejecutar Flask en segundo plano"""
    app.run(host='0.0.0.0', port=5000, debug=False, use_reloader=False)

print('Backend Flask definido correctamente!')


## 4. Ejecutar Servidor en Segundo Plano

In [None]:
# Ejecutar el servidor Flask en segundo plano
flask_thread = threading.Thread(target=run_flask_app, daemon=True)
flask_thread.start()

# Esperar a que el servidor se inicie
time.sleep(3)

print('Servidor Flask ejecutándose en http://localhost:5000')
print('Endpoints disponibles:')
print('  GET  /health')
print('  GET  /db/tables')
print('  GET  /db/table_schema?tabla=<nombre>')
print('  POST /db/insert')
print('  POST /db/update') 
print('  POST /db/delete')
print('  POST /db/delete_pk')


## 5. Prueba del Servidor desde el Notebook

In [None]:
# Pruebas de los endpoints del servidor
base_url = 'http://localhost:5000'

print('=== Health Check ===')
try:
    response = requests.get(f'{base_url}/health')
    print(f'Status: {response.status_code}')
    print(f'Response: {response.json()}')
except Exception as e:
    print(f'Error: {e}')

print('=== Listar Tablas ===')
try:
    response = requests.get(f'{base_url}/db/tables')
    print(f'Status: {response.status_code}')
    print(f'Tablas: {response.json()}')
except Exception as e:
    print(f'Error: {e}')


In [None]:
print('=== Obtener Esquema de usuarios ===')
table = 'usuarios'
try:
    response = requests.get(f'{base_url}/db/table_schema?tabla={table}')
    print(f'Status: {response.status_code}')
    print(f'Esquema: {response.json()}')
except Exception as e:
    print(f'Error: {e}')

In [None]:

print('=== Insertar Nuevo Usuario ===')
nuevo_usuario = {
    'tabla': 'usuarios',
    'valores': {
        'nombre': 'Pedro Sánchez',
        'email': 'pedro@example.com',
        'edad': 40
    }
}
try:
    response = requests.post(f'{base_url}/db/insert', json=nuevo_usuario)
    print(f'Status: {response.status_code}')
    print(f'Resultado: {response.json()}')
except Exception as e:
    print(f'Error: {e}')


In [None]:
print('=== Actualizar Usuario ===')
actualizacion = {
    'tabla': 'usuarios',
    'campo': 'edad',
    'valor': 32,
    'condicion_sql': "nombre = 'Juan Pérez'"
}
try:
    response = requests.post(f'{base_url}/db/update', json=actualizacion)
    print(f'Status: {response.status_code}')
    print(f'Resultado: {response.json()}')
except Exception as e:
    print(f'Error: {e}')

In [None]:

print('=== Eliminar Usuario por PK ===')
eliminacion = {
    'tabla': 'usuarios',
    'pk': 'id',
    'valor': 1
}
try:
    response = requests.post(f'{base_url}/db/delete_pk', json=eliminacion)
    print(f'Status: {response.status_code}')
    print(f'Resultado: {response.json()}')
except Exception as e:
    print(f'Error: {e}')

### Resumen de Pruebas Completadas

El servidor Flask está funcionando correctamente y se han probado los siguientes endpoints:

1. ✅ **Health Check** - Verifica que el servidor esté activo
2. ✅ **Listar Tablas** - Obtiene todas las tablas de la base de datos
3. ✅ **Obtener Esquema** - Muestra la estructura de una tabla específica
4. ✅ **Insertar Datos** - Agrega nuevos registros a la base de datos
5. ✅ **Actualizar Datos** - Modifica registros existentes
6. ✅ **Eliminar por PK** - Borra registros usando la clave primaria

El API está listo para ser usado desde cualquier cliente HTTP.
