<a href="https://colab.research.google.com/github/dmoy7/ProyectoDanielOchoa/blob/main/src/EntregableDanielOchoa.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install ydata-profiling

Collecting ydata-profiling
  Downloading ydata_profiling-4.12.2-py2.py3-none-any.whl.metadata (20 kB)
Collecting visions<0.8.0,>=0.7.5 (from visions[type_image_path]<0.8.0,>=0.7.5->ydata-profiling)
  Downloading visions-0.7.6-py3-none-any.whl.metadata (11 kB)
Collecting htmlmin==0.1.12 (from ydata-profiling)
  Downloading htmlmin-0.1.12.tar.gz (19 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting phik<0.13,>=0.11.1 (from ydata-profiling)
  Downloading phik-0.12.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.6 kB)
Collecting multimethod<2,>=1.4 (from ydata-profiling)
  Downloading multimethod-1.12-py3-none-any.whl.metadata (9.6 kB)
Collecting imagehash==4.3.1 (from ydata-profiling)
  Downloading ImageHash-4.3.1-py2.py3-none-any.whl.metadata (8.0 kB)
Collecting dacite>=1.8 (from ydata-profiling)
  Downloading dacite-1.9.2-py3-none-any.whl.metadata (17 kB)
Collecting PyWavelets (from imagehash==4.3.1->ydata-profiling)
  Downloading pywavelets-1.

In [2]:
import os
import logging
import requests
import json
import pandas as pd
from pathlib import Path
from datetime import datetime, timedelta
from ydata_profiling import ProfileReport
from sklearn.impute import SimpleImputer


class Config:
    """Clase de configuración para definir directorios y archivos de logging."""
    BASE_DIR = Path("/content/tvmaze_data")
    REPORTS_DIR = Path("/content/tvmaze_reports")
    PARQUET_DIR = Path("/content/tvmaze_parquet")
    LOG_FILE = BASE_DIR / "tvmaze_data.log"

    @classmethod
    def setup_directories(cls):
        """Crea los directorios si no existen."""
        cls.BASE_DIR.mkdir(parents=True, exist_ok=True)
        cls.REPORTS_DIR.mkdir(parents=True, exist_ok=True)
        cls.PARQUET_DIR.mkdir(parents=True, exist_ok=True)


class TVMazeDataCollector:
    """Clase para manejar la extracción, almacenamiento y carga de datos desde la API de TVMaze."""

    def __init__(self):
        """Inicializa la configuración de directorios y logging."""
        Config.setup_directories()

        logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s",
            handlers=[
                logging.FileHandler(Config.LOG_FILE, mode="w", encoding="utf-8"),
                logging.StreamHandler()
            ],
            force=True
        )

    @classmethod
    def obtener_datos(cls, fecha: str) -> list | None:
        """Obtiene los datos de series de TV desde la API de TVMaze para una fecha dada."""
        url = f"http://api.tvmaze.com/schedule/web?date={fecha}"
        try:
            respuesta = requests.get(url, timeout=10)
            respuesta.raise_for_status()
            datos = respuesta.json()
            logging.info(f"✅ Datos obtenidos para {fecha} (Series: {len(datos)})")
            return datos
        except requests.RequestException as e:
            logging.error(f"❌ Error al obtener datos para {fecha}: {e}")
            return None

    def guardar_json(self, datos: list, fecha: str) -> None:
        """Guarda los datos en un archivo JSON dentro del directorio definido."""
        if not datos:
            logging.warning(f"⚠️ No hay datos para guardar en {fecha}.")
            return

        archivo_json = Config.BASE_DIR / f"tvmaze_{fecha}.json"
        try:
            with archivo_json.open("w", encoding="utf-8") as f:
                json.dump(datos, f, indent=4, ensure_ascii=False)
            logging.info(f"📁 Archivo guardado: {archivo_json}")
        except IOError as e:
            logging.error(f"❌ Error al guardar archivo JSON {archivo_json}: {e}")

    def ejecutar_proceso(self) -> None:
        """Ejecuta el proceso para obtener los datos de las series emitidas en enero de 2024 y almacenarlos en archivos JSON."""
        fecha_inicio = datetime(2024, 1, 1)
        fecha_fin = datetime(2024, 1, 31)

        logging.info("🚀 Iniciando extracción de datos de TVMaze para enero de 2024...")

        for dias in range((fecha_fin - fecha_inicio).days + 1):
            fecha_str = (fecha_inicio + timedelta(days=dias)).strftime("%Y-%m-%d")
            datos = self.obtener_datos(fecha_str)
            self.guardar_json(datos, fecha_str)

        logging.info("✅ Proceso completado. Todos los archivos han sido guardados.")

    def cargar_json_a_dataframe(self) -> pd.DataFrame:
        """Carga todos los archivos JSON generados en un solo DataFrame de Pandas, normalizando los datos anidados."""
        archivos_json = sorted(Config.BASE_DIR.glob("*.json"))
        lista_dataframes = []

        if not archivos_json:
            logging.warning("⚠️ No se encontraron archivos JSON para cargar.")
            return pd.DataFrame()

        for archivo in archivos_json:
            try:
                with archivo.open("r", encoding="utf-8") as f:
                    data = json.load(f)

                if data:
                    df = pd.json_normalize(data, sep="_")
                    lista_dataframes.append(df)
                    logging.info(f"📂 Archivo cargado: {archivo.name} (Filas: {len(df)})")
            except (json.JSONDecodeError, IOError) as e:
                logging.error(f"❌ Error al cargar {archivo.name}: {e}")

        df_final = pd.concat(lista_dataframes, ignore_index=True) if lista_dataframes else pd.DataFrame()
        logging.info(f"\n✅ DataFrame consolidado con {len(df_final)} filas y {len(df_final.columns)} columnas.")
        return df_final

    def generar_reporte_profiling(self, df: pd.DataFrame, nombre_archivo: str) -> None:
        """Genera un reporte de profiling en formato HTML para el DataFrame proporcionado."""
        if df.empty:
            logging.warning("⚠️ No hay datos para generar el reporte de profiling.")
            return

        logging.info(f"📊 Generando reporte de profiling: {nombre_archivo}...")

        try:
            profile = ProfileReport(df, title="Análisis de Datos TVMaze", explorative=True)
            html_report_path = Config.REPORTS_DIR / nombre_archivo
            profile.to_file(html_report_path)
            logging.info(f"📄 Reporte en HTML guardado: {html_report_path}")
        except Exception as e:
            logging.error(f"❌ Error al generar el reporte de profiling: {e}")

    def limpiar_datos(self, df: pd.DataFrame) -> pd.DataFrame:
      """Limpia el DataFrame eliminando registros duplicados, columnas con más del 50% de valores nulos e imputando
    las que tienen hasta un 20% de valores nulos. También asegura la consistencia en formatos de fecha,
    valores numéricos y variables categóricas."""
      if df.empty:
          logging.warning("⚠️ No hay datos para limpiar.")
          return df

      logging.info("🧹 Iniciando proceso de limpieza de datos...")

      # Eliminar columnas con más del 50% de valores nulos
      porcentaje_nulos = df.isnull().mean()
      columnas_a_eliminar = porcentaje_nulos[porcentaje_nulos > 0.5].index
      df = df.drop(columns=columnas_a_eliminar)
      logging.info(f"🗑️ Columnas eliminadas: {list(columnas_a_eliminar)}")

      # Imputar valores nulos en columnas con hasta un 20% de valores nulos
      columnas_a_imputar = porcentaje_nulos[(porcentaje_nulos > 0) & (porcentaje_nulos <= 0.2)].index
      for col in columnas_a_imputar:
          imputer = SimpleImputer(strategy="most_frequent" if df[col].dtype == "object" else "median")
          df[col] = imputer.fit_transform(df[[col]]).ravel()
          logging.info(f"🔄 Imputación aplicada en columna: {col}")

      # Convertir listas y diccionarios a string para evitar errores de tipo
      df = df.applymap(lambda x: str(x) if isinstance(x, (list, dict)) else x)

      # Convertir columnas de fecha a formato datetime si es necesario
      columnas_fecha = [col for col in df.columns if 'date' in col.lower() or 'fecha' in col.lower()]
      for col in columnas_fecha:
          try:
              df[col] = pd.to_datetime(df[col], errors='coerce')
              logging.info(f"📆 Columna de fecha convertida: {col}")
          except Exception as e:
              logging.warning(f"⚠️ No se pudo convertir {col} a datetime: {e}")

      # Eliminar valores numéricos negativos en columnas específicas
      columnas_numericas = df.select_dtypes(include=['number']).columns
      for col in columnas_numericas:
          if (df[col] < 0).any():
              df[col] = df[col].apply(lambda x: x if x >= 0 else None)
              logging.info(f"❌ Se eliminaron valores negativos en columna: {col}")

      # Normalizar categorías (convertir a minúsculas y eliminar espacios)
      columnas_categoricas = df.select_dtypes(include=['object']).columns
      for col in columnas_categoricas:
          df[col] = df[col].str.strip().str.lower()
          logging.info(f"🔤 Normalización aplicada en columna categórica: {col}")

      # Eliminar registros duplicados
      df = df.drop_duplicates()
      logging.info("🗑️ Registros duplicados eliminados.")

      logging.info("✅ Limpieza de datos completada.")
      return df

    def guardar_como_parquet(self, df: pd.DataFrame, nombre_archivo: str) -> None:
      """Guarda el DataFrame en formato Parquet dentro de la carpeta independiente con compresión Snappy."""
      if df.empty:
          logging.warning("⚠️ No hay datos para guardar en Parquet.")
          return

      archivo_parquet = Config.PARQUET_DIR / f"{nombre_archivo}.parquet"

      try:
          df.to_parquet(archivo_parquet, engine="pyarrow", compression="snappy", index=False)
          logging.info(f"📦 Archivo Parquet guardado en: {archivo_parquet}")
      except Exception as e:
          logging.error(f"❌ Error al guardar archivo Parquet {archivo_parquet}: {e}")

if __name__ == "__main__":
    collector = TVMazeDataCollector()
    collector.ejecutar_proceso()
    df_series = collector.cargar_json_a_dataframe()
    collector.generar_reporte_profiling(df_series, "tvmaze_profiling_before.html")
    df_limpiado = collector.limpiar_datos(df_series)
    collector.generar_reporte_profiling(df_limpiado, "tvmaze_profiling_after.html")
    collector.guardar_como_parquet(df_limpiado, "tvmaze_limpiado")
    if not df_limpiado.empty:
        print(df_limpiado.head())

2025-02-10 20:02:55,818 - INFO - 🚀 Iniciando extracción de datos de TVMaze para enero de 2024...
2025-02-10 20:02:56,599 - INFO - ✅ Datos obtenidos para 2024-01-01 (Series: 191)
2025-02-10 20:02:56,645 - INFO - 📁 Archivo guardado: /content/tvmaze_data/tvmaze_2024-01-01.json
2025-02-10 20:02:57,196 - INFO - ✅ Datos obtenidos para 2024-01-02 (Series: 144)
2025-02-10 20:02:57,238 - INFO - 📁 Archivo guardado: /content/tvmaze_data/tvmaze_2024-01-02.json
2025-02-10 20:02:57,799 - INFO - ✅ Datos obtenidos para 2024-01-03 (Series: 113)
2025-02-10 20:02:57,837 - INFO - 📁 Archivo guardado: /content/tvmaze_data/tvmaze_2024-01-03.json
2025-02-10 20:02:58,445 - INFO - ✅ Datos obtenidos para 2024-01-04 (Series: 179)
2025-02-10 20:02:58,496 - INFO - 📁 Archivo guardado: /content/tvmaze_data/tvmaze_2024-01-04.json
2025-02-10 20:02:59,062 - INFO - ✅ Datos obtenidos para 2024-01-05 (Series: 161)
2025-02-10 20:02:59,109 - INFO - 📁 Archivo guardado: /content/tvmaze_data/tvmaze_2024-01-05.json
2025-02-10 20

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

  series = series.fillna(np.nan)


Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

2025-02-10 20:04:45,569 - INFO - 📄 Reporte en HTML guardado: /content/tvmaze_reports/tvmaze_profiling_before.html
2025-02-10 20:04:45,596 - INFO - 🧹 Iniciando proceso de limpieza de datos...
2025-02-10 20:04:45,634 - INFO - 🗑️ Columnas eliminadas: ['image', 'summary', 'rating_average', '_embedded_show_runtime', '_embedded_show_ended', '_embedded_show_rating_average', '_embedded_show_network', '_embedded_show_dvdCountry', '_embedded_show_externals_tvrage', '_embedded_show_externals_imdb', '_embedded_show_image', '_embedded_show__links_nextepisode_href', '_embedded_show__links_nextepisode_name', 'image_medium', 'image_original', '_embedded_show_network_id', '_embedded_show_network_name', '_embedded_show_network_country_name', '_embedded_show_network_country_code', '_embedded_show_network_country_timezone', '_embedded_show_network_officialSite', '_embedded_show_webChannel', '_embedded_show_webChannel_country', '_embedded_show_dvdCountry_name', '_embedded_show_dvdCountry_code', '_embedded_

Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]

  "min": pd.Timestamp.to_pydatetime(series.min()),
  "max": pd.Timestamp.to_pydatetime(series.max()),


Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

2025-02-10 20:05:28,348 - INFO - 📄 Reporte en HTML guardado: /content/tvmaze_reports/tvmaze_profiling_after.html
2025-02-10 20:05:28,520 - INFO - 📦 Archivo Parquet guardado en: /content/tvmaze_parquet/tvmaze_limpiado.parquet


        id                                                url     name  \
0  2730586  https://www.tvmaze.com/episodes/2730586/neznos...  серия 1   
1  2730587  https://www.tvmaze.com/episodes/2730587/neznos...  серия 2   
2  2730588  https://www.tvmaze.com/episodes/2730588/neznos...  серия 3   
3  2730589  https://www.tvmaze.com/episodes/2730589/neznos...  серия 4   
4  2730590  https://www.tvmaze.com/episodes/2730590/neznos...  серия 5   

   season  number     type    airdate airtime                   airstamp  \
0       2     1.0  regular 2024-01-01          2024-01-01t00:00:00+00:00   
1       2     2.0  regular 2024-01-01          2024-01-01t00:00:00+00:00   
2       2     3.0  regular 2024-01-01          2024-01-01t00:00:00+00:00   
3       2     4.0  regular 2024-01-01          2024-01-01t00:00:00+00:00   
4       2     5.0  regular 2024-01-01          2024-01-01t00:00:00+00:00   

   runtime  ... _embedded_show_webChannel_country_timezone  \
0     23.0  ...                     

In [3]:
import sqlite3
import pandas as pd
import logging
from urllib.parse import urlparse
import ast

# Configurar logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

class TVShowDatabase:
    def __init__(self, db_path, file_path):
        """Inicializa la conexión a la base de datos y carga los datos desde un archivo Parquet."""
        self.db_path = db_path
        self.file_path = file_path
        self.conn = sqlite3.connect(self.db_path)
        self.cursor = self.conn.cursor()
        self.df = None

    def load_data(self):
        """Carga datos desde un archivo Parquet en un DataFrame."""
        try:
            self.df = pd.read_parquet(self.file_path)
            logging.info("Datos cargados correctamente desde Parquet.")
        except Exception as e:
            logging.error(f"Error al cargar datos desde Parquet: {e}")
            raise

    def create_tables(self):
        """Crea las tablas en la base de datos SQLite."""
        try:
            self.cursor.executescript('''
                CREATE TABLE IF NOT EXISTS web_channels (
                    id INTEGER PRIMARY KEY,
                    name TEXT,
                    official_site TEXT
                );

                CREATE TABLE IF NOT EXISTS shows (
                    id INTEGER PRIMARY KEY,
                    name TEXT,
                    type TEXT,
                    language TEXT,
                    status TEXT,
                    average_runtime INTEGER,
                    premiered TEXT,
                    official_site TEXT,
                    web_channel_id INTEGER,
                    FOREIGN KEY (web_channel_id) REFERENCES web_channels(id)
                );

                CREATE TABLE IF NOT EXISTS genres (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT UNIQUE
                );

                CREATE TABLE IF NOT EXISTS show_genres (
                    show_id INTEGER,
                    genre_id INTEGER,
                    PRIMARY KEY (show_id, genre_id),
                    FOREIGN KEY (show_id) REFERENCES shows(id),
                    FOREIGN KEY (genre_id) REFERENCES genres(id)
                );

                CREATE TABLE IF NOT EXISTS episodes (
                    id INTEGER PRIMARY KEY,
                    show_id INTEGER,
                    season INTEGER,
                    number INTEGER,
                    name TEXT,
                    airdate TEXT,
                    runtime INTEGER,
                    url TEXT,
                    FOREIGN KEY (show_id) REFERENCES shows(id)
                );
            ''')
            self.conn.commit()
            logging.info("Tablas creadas correctamente.")
        except Exception as e:
            logging.error(f"Error al crear tablas: {e}")
            raise

    def insert_data(self):
        """Inserta los datos en las tablas SQLite desde el DataFrame."""
        try:
            # Insertar datos en web_channels
            web_channels_df = self.df[
                ['_embedded_show_webChannel_id', '_embedded_show_webChannel_name', '_embedded_show_webChannel_officialSite']
            ].dropna().drop_duplicates()
            web_channels_df.rename(columns={
                '_embedded_show_webChannel_id': 'id',
                '_embedded_show_webChannel_name': 'name',
                '_embedded_show_webChannel_officialSite': 'official_site'
            }, inplace=True)
            web_channels_df.to_sql("web_channels", self.conn, if_exists="append", index=False)
            logging.info("Datos insertados en la tabla web_channels.")

            # Insertar datos en shows
            shows_df = self.df[
                ['_embedded_show_id', '_embedded_show_name', '_embedded_show_type', '_embedded_show_language',
                 '_embedded_show_status', '_embedded_show_averageRuntime', '_embedded_show_premiered',
                 '_embedded_show_officialSite', '_embedded_show_webChannel_id']
            ].dropna().drop_duplicates()
            shows_df.rename(columns={
                '_embedded_show_id': 'id',
                '_embedded_show_name': 'name',
                '_embedded_show_type': 'type',
                '_embedded_show_language': 'language',
                '_embedded_show_status': 'status',
                '_embedded_show_averageRuntime': 'average_runtime',
                '_embedded_show_premiered': 'premiered',
                '_embedded_show_officialSite': 'official_site',
                '_embedded_show_webChannel_id': 'web_channel_id'
            }, inplace=True)
            shows_df.to_sql("shows", self.conn, if_exists="append", index=False)
            logging.info("Datos insertados en la tabla shows.")

            # Insertar datos en genres y show_genres
            genre_set = set()
            show_genre_pairs = set()

            for _, row in self.df.iterrows():
                show_id = row['_embedded_show_id']
                genre_list = ast.literal_eval(row['_embedded_show_genres']) if isinstance(row['_embedded_show_genres'], str) else []

                for genre in genre_list:
                    genre_set.add(genre)
                    show_genre_pairs.add((show_id, genre))

            genre_df = pd.DataFrame(list(genre_set), columns=['name'])
            genre_df.to_sql("genres", self.conn, if_exists="append", index=False)
            logging.info("Datos insertados en la tabla genres.")

            # Obtener IDs de géneros
            self.cursor.execute("SELECT id, name FROM genres")
            genre_id_map = {name: genre_id for genre_id, name in self.cursor.fetchall()}

            show_genre_df = pd.DataFrame(
                [(show_id, genre_id_map[genre]) for show_id, genre in show_genre_pairs],
                columns=['show_id', 'genre_id']
            )
            show_genre_df.to_sql("show_genres", self.conn, if_exists="append", index=False)
            logging.info("Datos insertados en la tabla show_genres.")

            # Insertar datos en episodes
            episodes_df = self.df[['id', '_embedded_show_id', 'season', 'number', 'name', 'airdate', 'runtime', 'url']].dropna()
            episodes_df.rename(columns={'_embedded_show_id': 'show_id'}, inplace=True)
            episodes_df.to_sql("episodes", self.conn, if_exists="append", index=False)
            logging.info("Datos insertados en la tabla episodes.")

        except Exception as e:
            logging.error(f"Error al insertar datos en la base de datos: {e}")
            raise

    def calculate_metrics(self):
        """Calcula y devuelve métricas de agregación sobre los datos almacenados."""
        try:
            self.cursor.execute("SELECT AVG(average_runtime) FROM shows;")
            average_runtime = self.cursor.fetchone()[0]

            self.cursor.execute("""
                SELECT g.name AS genre, COUNT(sg.show_id) AS count
                FROM genres g
                JOIN show_genres sg ON g.id = sg.genre_id
                GROUP BY g.name
                ORDER BY count DESC;
            """)
            genre_counts = self.cursor.fetchall()

            self.cursor.execute("SELECT official_site FROM shows WHERE official_site IS NOT NULL AND official_site != ''")
            official_sites = self.cursor.fetchall()

            # Extraer dominios únicos
            domains = set()
            for site in official_sites:
                url = site[0]
                parsed_url = urlparse(url)
                if parsed_url.netloc:
                    domains.add(parsed_url.netloc)

            domain_list = list(domains)

            result = {
                "average_runtime": average_runtime,
                "genre_counts": genre_counts,
                "unique_domains": domain_list
            }
            logging.info("Métricas calculadas correctamente.")
            return result

        except Exception as e:
            logging.error(f"Error al calcular métricas: {e}")
            raise

    def close(self):
        """Cierra la conexión a la base de datos."""
        self.conn.close()
        logging.info("Conexión a la base de datos cerrada correctamente.")

# Uso de la clase
if __name__ == "__main__":
    db = TVShowDatabase("tv_shows.db", "/content/tvmaze_parquet/tvmaze_limpiado.parquet")

    try:
        db.load_data()
        db.create_tables()
        db.insert_data()
        metrics = db.calculate_metrics()
        logging.info(f"Resultados de métricas: {metrics}")
    finally:
        db.close()


2025-02-10 20:19:04,401 - INFO - Datos cargados correctamente desde Parquet.
2025-02-10 20:19:04,461 - INFO - Tablas creadas correctamente.
2025-02-10 20:19:04,490 - INFO - Datos insertados en la tabla web_channels.
2025-02-10 20:19:04,522 - INFO - Datos insertados en la tabla shows.
2025-02-10 20:19:04,882 - INFO - Datos insertados en la tabla genres.
2025-02-10 20:19:04,905 - INFO - Datos insertados en la tabla show_genres.
2025-02-10 20:19:04,971 - INFO - Datos insertados en la tabla episodes.
2025-02-10 20:19:04,981 - INFO - Métricas calculadas correctamente.
2025-02-10 20:19:04,983 - INFO - Resultados de métricas: {'average_runtime': 41.56859504132231, 'genre_counts': [('drama', 155), ('comedy', 120), ('romance', 71), ('adventure', 70), ('fantasy', 69), ('action', 59), ('crime', 44), ('anime', 41), ('mystery', 30), ('thriller', 29), ('history', 25), ('children', 24), ('food', 22), ('sports', 20), ('travel', 18), ('music', 16), ('family', 15), ('science-fiction', 13), ('war', 12), 