In [1]:
# Celda 1: Importar librerías
import os
import pandas as pd
import psycopg2
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from sqlalchemy import create_engine, text
from dotenv import load_dotenv
import logging
import time # Para medir el tiempo

In [2]:
# Celda 2: Configuración Inicial
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
pd.set_option('display.max_rows', 15)
pd.set_option('display.max_columns', None)

In [3]:
# Celda 3: Cargar Variables de Entorno y Definir Constantes para SPOTIFY
logging.info("Cargando variables de entorno para la conexión a la base de datos...")
dotenv_path = '/home/nicolas/Escritorio/workshops/workshop_2/env/.env' # <-- CONFIRMA ESTA RUTA
load_dotenv(dotenv_path=dotenv_path)

DB_USER = os.getenv('POSTGRES_USER')
DB_PASSWORD = os.getenv('POSTGRES_PASSWORD')
DB_HOST = os.getenv('POSTGRES_HOST')
DB_PORT = os.getenv('POSTGRES_PORT')
DB_NAME = os.getenv('POSTGRES_DB')
DEFAULT_DB = 'postgres'

CSV_FILE_PATH = '/home/nicolas/Escritorio/workshops/workshop_2/data/spotify_dataset.csv' # <-- NUEVO ARCHIVO CSV
TABLE_NAME = 'spotify_dataset'                                                  # <-- NUEVO NOMBRE DE TABLA
CHUNK_SIZE = 40000                                                             # <-- TAMAÑO DEL CHUNK

if not all([DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME]):
    logging.error("Una o más variables de entorno no están definidas correctamente en " + dotenv_path)
    raise ValueError("Variables de entorno incompletas.")

2025-04-11 12:28:59,743 - INFO - Cargando variables de entorno para la conexión a la base de datos...


In [4]:
# Celda 4: Verificar/Crear la base de datos 'artists' (Sin cambios)
# ... (código de la celda 4 anterior) ...
conn_default = None
cursor_default = None
try:
    logging.info(f"Intentando conectar a la base de datos por defecto '{DEFAULT_DB}' para verificar/crear '{DB_NAME}'...")
    conn_default = psycopg2.connect(
        dbname=DEFAULT_DB,
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT
    )
    conn_default.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cursor_default = conn_default.cursor()

    cursor_default.execute(sql.SQL("SELECT 1 FROM pg_database WHERE datname = %s"), (DB_NAME,))
    exists = cursor_default.fetchone()

    if not exists:
        logging.info(f"La base de datos '{DB_NAME}' no existe. Creándola...")
        cursor_default.execute(sql.SQL(f"CREATE DATABASE {DB_NAME}"))
        logging.info(f"Base de datos '{DB_NAME}' creada exitosamente.")
    else:
        logging.info(f"La base de datos '{DB_NAME}' ya existe.")

except psycopg2.Error as e:
    logging.error(f"Error al conectar o verificar/crear la base de datos '{DB_NAME}': {e}")
    raise
except Exception as e:
    logging.error(f"Ocurrió un error inesperado durante la verificación/creación de la base de datos: {e}")
    raise
finally:
    if cursor_default:
        cursor_default.close()
        logging.info("Cursor de la conexión por defecto cerrado.")
    if conn_default:
        conn_default.close()
        logging.info("Conexión a la base de datos por defecto cerrada.")

2025-04-11 12:28:59,754 - INFO - Intentando conectar a la base de datos por defecto 'postgres' para verificar/crear 'artists'...
DETAIL:  La base de datos fue creada usando la versión de ordenamiento 2.31, pero el sistema operativo provee la versión 2.35.
HINT:  Reconstruya todos los objetos en esta base de datos que usen el ordenamiento por omisión y ejecute ALTER DATABASE postgres REFRESH COLLATION VERSION, o construya PostgreSQL con la versión correcta de la biblioteca.
2025-04-11 12:28:59,775 - INFO - La base de datos 'artists' ya existe.
2025-04-11 12:28:59,776 - INFO - Cursor de la conexión por defecto cerrado.
2025-04-11 12:28:59,777 - INFO - Conexión a la base de datos por defecto cerrada.


In [5]:
# Celda 5: Cargar datos desde el archivo CSV de Spotify en Chunks
# Celda 6: Conectar a la base de datos 'artists' y cargar el DataFrame de Spotify con Chunksize

# Celda 5 y 6 Combinadas: Leer CSV en Chunks y Cargar a PostgreSQL
logging.info(f"Iniciando carga de {CSV_FILE_PATH} a la tabla '{TABLE_NAME}' en chunks de {CHUNK_SIZE}...")

engine = None
total_rows_processed = 0
start_time = time.time()
first_chunk = True

try:
    # Crear motor SQLAlchemy ANTES del bucle
    logging.info(f"Creando motor SQLAlchemy para la base de datos '{DB_NAME}'...")
    db_url = f'postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
    engine = create_engine(db_url)

    # Crear el iterador de chunks
    reader = pd.read_csv(CSV_FILE_PATH, chunksize=CHUNK_SIZE)

    # Iterar sobre los chunks
    for i, chunk in enumerate(reader):
        chunk_start_time = time.time()
        logging.info(f"Procesando chunk {i+1}...")

        # Eliminar columna 'Unnamed: 0' si existe en el chunk
        if 'Unnamed: 0' in chunk.columns:
            chunk = chunk.drop(columns=['Unnamed: 0'])
            # No loguear esto cada vez, solo procesarlo
            # logging.info(" Columna 'Unnamed: 0' eliminada del chunk.")

        # --- Lógica para el primer chunk ---
        if first_chunk:
            logging.info("Procesando el primer chunk (mostrando info y reemplazando tabla)...")
            logging.info("Primeras 5 filas del primer chunk:")
            print(chunk.head().to_markdown(index=False))
            logging.info("Información del primer chunk:")
            chunk.info()
            # Cargar el primer chunk, reemplazando la tabla si existe
            chunk.to_sql(TABLE_NAME, con=engine, if_exists='replace', index=False, method='multi')
            logging.info(f"Primer chunk cargado en la tabla '{TABLE_NAME}' (tabla reemplazada).")
            first_chunk = False # Ya no es el primer chunk
        # --- Lógica para chunks subsiguientes ---
        else:
            # Añadir los chunks restantes a la tabla existente
            chunk.to_sql(TABLE_NAME, con=engine, if_exists='append', index=False, method='multi')
            # logging.info(f"Chunk {i+1} añadido a la tabla '{TABLE_NAME}'.") # Opcional, puede ser muy verboso

        rows_in_chunk = len(chunk)
        total_rows_processed += rows_in_chunk
        chunk_end_time = time.time()
        logging.info(f"Chunk {i+1} procesado ({rows_in_chunk} filas) en {chunk_end_time - chunk_start_time:.2f} segundos. Total procesado: {total_rows_processed}")

    end_time = time.time()
    logging.info(f"Todos los chunks procesados. Tiempo total: {end_time - start_time:.2f} segundos.")
    logging.info(f"DataFrame completo cargado exitosamente en la tabla '{TABLE_NAME}'. Total de filas: {total_rows_processed}")

except FileNotFoundError:
    logging.error(f"Error: El archivo CSV no se encontró en la ruta: {CSV_FILE_PATH}")
    engine = None # Falló antes de usar el engine
    raise
except Exception as e:
    logging.error(f"Error durante la lectura del CSV o la carga a la base de datos: {e}")
    # Mantener engine definido si el error fue durante el bucle para permitir la verificación
    raise

2025-04-11 12:28:59,785 - INFO - Iniciando carga de /home/nicolas/Escritorio/workshops/workshop_2/data/spotify_dataset.csv a la tabla 'spotify_dataset' en chunks de 40000...
2025-04-11 12:28:59,786 - INFO - Creando motor SQLAlchemy para la base de datos 'artists'...
2025-04-11 12:28:59,902 - INFO - Procesando chunk 1...
2025-04-11 12:28:59,905 - INFO - Procesando el primer chunk (mostrando info y reemplazando tabla)...
2025-04-11 12:28:59,906 - INFO - Primeras 5 filas del primer chunk:
2025-04-11 12:28:59,911 - INFO - Información del primer chunk:


| track_id               | artists                | album_name                                             | track_name                 |   popularity |   duration_ms | explicit   |   danceability |   energy |   key |   loudness |   mode |   speechiness |   acousticness |   instrumentalness |   liveness |   valence |   tempo |   time_signature | track_genre   |
|:-----------------------|:-----------------------|:-------------------------------------------------------|:---------------------------|-------------:|--------------:|:-----------|---------------:|---------:|------:|-----------:|-------:|--------------:|---------------:|-------------------:|-----------:|----------:|--------:|-----------------:|:--------------|
| 5SuOikwiRyPMVoIQDJUgSV | Gen Hoshino            | Comedy                                                 | Comedy                     |           73 |        230666 | False      |          0.676 |   0.461  |     1 |     -6.746 |      0 |        0.143  |         0.0322 |

2025-04-11 12:29:10,627 - INFO - Primer chunk cargado en la tabla 'spotify_dataset' (tabla reemplazada).
2025-04-11 12:29:10,628 - INFO - Chunk 1 procesado (40000 filas) en 10.73 segundos. Total procesado: 40000
2025-04-11 12:29:10,733 - INFO - Procesando chunk 2...
2025-04-11 12:29:22,531 - INFO - Chunk 2 procesado (40000 filas) en 11.80 segundos. Total procesado: 80000
2025-04-11 12:29:22,630 - INFO - Procesando chunk 3...
2025-04-11 12:29:33,792 - INFO - Chunk 3 procesado (34000 filas) en 11.16 segundos. Total procesado: 114000
2025-04-11 12:29:33,795 - INFO - Todos los chunks procesados. Tiempo total: 34.01 segundos.
2025-04-11 12:29:33,796 - INFO - DataFrame completo cargado exitosamente en la tabla 'spotify_dataset'. Total de filas: 114000


In [6]:
# Celda 7: Verificar la carga de datos (Usando total_rows_processed)
if engine: # Solo verificar si el engine se pudo crear
    try:
        logging.info(f"Verificando la carga final en la tabla '{TABLE_NAME}'...")
        with engine.connect() as connection:
            query = text(f'SELECT COUNT(*) FROM "{TABLE_NAME}"')
            result = connection.execute(query)
            num_db_rows = result.scalar_one()

        logging.info(f"Número de filas en la tabla final '{TABLE_NAME}': {num_db_rows}")

        # Comparar con el número total de filas procesadas de los chunks
        if total_rows_processed == num_db_rows:
            logging.info("¡Verificación exitosa! El número de filas procesadas coincide con la base de datos.")
        # Añadir una comprobación extra por si la cuenta total fue 0 (indicando quizás un archivo vacío o error temprano)
        elif total_rows_processed == 0 and num_db_rows == 0:
            logging.warning("Se procesaron 0 filas y la tabla está vacía. Verifica el archivo CSV.")
        else:
            logging.warning(f"Discrepancia en el número de filas: Procesadas ({total_rows_processed}) vs DB ({num_db_rows}).")

    except Exception as e:
        logging.error(f"Error durante la verificación final de la carga: {e}")
else:
    logging.error("No se pudo realizar la verificación porque la creación del engine falló previamente.")

logging.info(f"Script de carga para '{TABLE_NAME}' finalizado.")

2025-04-11 12:29:33,806 - INFO - Verificando la carga final en la tabla 'spotify_dataset'...
2025-04-11 12:29:33,821 - INFO - Número de filas en la tabla final 'spotify_dataset': 114000
2025-04-11 12:29:33,822 - INFO - ¡Verificación exitosa! El número de filas procesadas coincide con la base de datos.
2025-04-11 12:29:33,822 - INFO - Script de carga para 'spotify_dataset' finalizado.
