# INICIALIZACIÓN

In [19]:
# @title INSTALACIÓN DE LIBRERÍAS


## DEFINICIÓN DE FUNCIONES

In [20]:
# @title GBQ_schema_dic()

def GBQ_schema_dic(config):
    """
    Obtiene un diccionario que contiene todos los datasets, tablas y campos en un proyecto de BigQuery.

    Args:
        config: Un diccionario con las siguientes posibles claves:
            - 'project_id': (requerido) ID del proyecto de Google Cloud.
            - 'datasets': (opcional) Lista de datasets específicos a incluir. Si no se proporciona, se incluyen todos.
            - 'include_tables': (opcional) Booleano que indica si se deben incluir las tablas. Por defecto es True.

    Returns:
        Un diccionario con la estructura:
        {
            'dataset1': {
                'tabla1': {
                    'fields': [
                        {'name': 'campo1', 'type': 'STRING'},
                        {'name': 'campo2', 'type': 'INTEGER'},
                        # ...
                    ]
                },
                'tabla2': {
                    'fields': [
                        # ...
                    ]
                },
                # ...
            },
            'dataset2': {
                # ...
            },
            # ...
        }
    """
    project_id = config.get('project_id')
    if not project_id:
        raise ValueError("El 'project_id' es un argumento requerido en el diccionario de configuración.")

    # Crear el cliente de BigQuery dentro de la función
    client = bigquery.Client(project=project_id)

    datasets_incluidos = config.get('datasets', None)
    include_tables = config.get('include_tables', True)

    # Obtener la lista de datasets
    if datasets_incluidos:
        datasets = [client.get_dataset(f"{project_id}.{dataset_id}") for dataset_id in datasets_incluidos]
    else:
        datasets = list(client.list_datasets(project=project_id))

    esquema_proyecto = {}

    for dataset in datasets:
        dataset_id = dataset.dataset_id
        full_dataset_id = f"{project_id}.{dataset_id}"

        if include_tables:
            tables = list(client.list_tables(full_dataset_id))

            tablas = {}
            for table_item in tables:
                table_ref = client.get_table(table_item.reference)

                # Obtener los campos y tipos
                campos = []
                for field in table_ref.schema:
                    campos.append({'name': field.name, 'type': field.field_type})

                # Añadir información de la tabla
                tablas[table_item.table_id] = {
                    'fields': campos
                }

            esquema_proyecto[dataset_id] = tablas
        else:
            # Si no se incluyen las tablas, simplemente añadimos el dataset
            esquema_proyecto[dataset_id] = {}

    return esquema_proyecto


In [21]:
# @title GBQ_tables_info()

from google.cloud import bigquery
from google.colab import auth
import pandas as pd

def GBQ_tables_info(params):
    """
    Obtiene información sobre las tablas de los datasets especificados en Google BigQuery.

    Parámetros:
    - params (dict): Diccionario con las siguientes claves:
        - project_id (str): ID del proyecto de Google Cloud.
        - dataset_ids (list): Lista de los IDs de los datasets a consultar.

    Retorno:
    - pd.DataFrame: DataFrame con información sobre las tablas, incluyendo el nombre, el número de filas, el número de columnas y el tamaño en MB.
    """
    # Extraer parámetros del diccionario
    project_id = params.get('project_id')
    dataset_ids = params.get('dataset_ids', [])

    # Autenticar la cuenta de Google
    auth.authenticate_user()

    # Crear un cliente de BigQuery
    client = bigquery.Client()

    # Lista para almacenar la información de cada tabla
    table_info = []

    # Iterar sobre cada dataset
    for dataset_id in dataset_ids:
        # Obtener la lista de tablas del dataset
        dataset_ref = client.dataset(dataset_id, project=project_id)
        tables = client.list_tables(dataset_ref)

        # Iterar sobre cada tabla y recopilar la información
        for table in tables:
            table_ref = dataset_ref.table(table.table_id)
            table_obj = client.get_table(table_ref)  # Obtiene la información de la tabla
            num_rows = table_obj.num_rows
            num_columns = len(table_obj.schema)
            table_size = table_obj.num_bytes / (1024 * 1024)  # Convertir de bytes a megabytes (MB)

            table_info.append({
                'dataset_id': dataset_id,
                'table_name': table.table_id,
                'num_rows': num_rows,
                'num_columns': num_columns,
                'size_mb': round(table_size, 2)
            })

    # Crear un DataFrame de la información de las tablas
    df = pd.DataFrame(table_info)

    # Mostrar el DataFrame
    return df

In [22]:
# @title GCS_tsv_to_GBQ()
def GCS_tsv_to_GBQ(params):
    r"""
    Procesa archivos TSV desde GCS y los carga en BigQuery usando Dask.
    Interpreta '\\N' como valores nulos y permite especificar tipos 'nullable'
    (por ejemplo, 'Int64') para manejar columnas con valores nulos sin caer en
    conflictos de dtypes.

    Flujo:
      1. Descarga cada TSV de GCS a un archivo local.
      2. Usa Dask para leer el TSV por particiones (block_size).
         - Interpreta '\\N' como nulo (na_values).
         - Usa dtypes_map para columnas con tipos 'Int64' (nullable) u otros.
      3. Itera sobre las particiones y carga cada chunk en BigQuery (append).
      4. (Opcional) elimina el archivo local si remove_local=True.

    Args:
        params (dict):
            - gcp_project_id (str): Proyecto GCP.
            - gcs_bucket_name (str): Bucket GCS.
            - tsv_files (list[str]): Lista de archivos TSV en GCS.
            - gbq_dataset_id (str): Dataset de BigQuery de destino.
            - table_suffix (str, opcional): Sufijo para la tabla en BQ. Default: "raw".
            - block_size (float, opcional): Tamaño de bloque (bytes) para Dask. Default: 128e6 (128 MB).
            - remove_local (bool, opcional): Eliminar el archivo local tras cargarlo. Default: False.
            - dtypes_map (dict, opcional): Mapa col->tipo Dask/Pandas. Ej:
                  {
                      "birthYear": "Int64",
                      "deathYear": "Int64",
                      "startYear": "Int64",
                      "endYear": "Int64",
                      "runtimeMinutes": "Int64",
                      "seasonNumber": "Int64",
                      "episodeNumber": "Int64",
                      "ordering": "Int64",
                      "numVotes": "Int64",
                      "isAdult": "Int64"
                  }
              para indicar columnas que contengan '\N' y deban ser int nullable.
    """
    # 1) Extraer parámetros
    gcp_project_id = params.get("gcp_project_id")
    gcs_bucket_name = params.get("gcs_bucket_name")
    tsv_files = params.get("tsv_files", [])
    gbq_dataset_id = params.get("gbq_dataset_id")
    table_suffix = params.get("table_suffix", "raw")
    block_size = params.get("block_size", 128e6)  # 128 MB por defecto
    remove_local = params.get("remove_local", False)

    # Mapa de tipos para columnas con '\N' (e.g., int con nulos)
    dtypes_map = params.get("dtypes_map", {})

    # Validar que tengamos los parámetros esenciales
    if not all([gcp_project_id, gcs_bucket_name, tsv_files, gbq_dataset_id]):
        raise ValueError("Parámetros incompletos en 'params'.")

    storage_client = storage.Client(project=gcp_project_id)

    # 2) Iterar sobre cada archivo TSV
    for tsv_file in tsv_files:
        print(f"\n==> Procesando archivo: {tsv_file}")

        # 2a) Descargar a un archivo local (puedes usar otra ruta si quieres)
        local_filename = tsv_file.replace("/", "_")  # Evita subdirectorios en el nombre
        print(f"Descargando desde GCS: gs://{gcs_bucket_name}/{tsv_file}")
        bucket = storage_client.bucket(gcs_bucket_name)
        blob = bucket.blob(tsv_file)
        blob.download_to_filename(local_filename)
        print(f"Archivo descargado localmente como: {local_filename}")

        # 3) Leer con Dask, interpretando '\N' como nulos
        print("Leyendo TSV con Dask...")
        dask_df = dd.read_csv(
            local_filename,
            sep="\t",
            blocksize=block_size,
            engine="python",
            quoting=3,
            on_bad_lines="skip",
            na_values=["\\N"],     # '\N' se convierte en NaN
            dtype=dtypes_map       # Fuerza las columnas con nulos a Int64 u otros
        )

        # 4) Convertir particiones a DataFrames de pandas y cargar a BQ
        delayed_partitions = dask_df.to_delayed()
        table_name = tsv_file.replace(".tsv", f"_{table_suffix}").replace(".", "_")
        table_id = f"{gbq_dataset_id}.{table_name}"

        partition_index = 0
        for part_delayed in delayed_partitions:
            # 4a) 'Compute' la partición, generando un DataFrame de pandas
            chunk_df = part_delayed.compute()

            # 4b) Eliminar filas completamente vacías (opcional)
            cleaned_chunk = chunk_df.dropna(how='all')
            n_rows = len(cleaned_chunk)
            print(f"Partición #{partition_index} -> {n_rows} filas tras limpieza (dropna how='all').")

            if n_rows > 0:
                # 4c) Insertar en BigQuery
                pandas_gbq.to_gbq(
                    cleaned_chunk,
                    table_id,
                    project_id=gcp_project_id,
                    if_exists="append"
                )
                print(f"  -> {n_rows} filas cargadas a {table_id}")

            partition_index += 1

        print(f"Archivo {tsv_file} procesado con éxito.")

        # 5) (Opcional) Eliminar archivo local
        if remove_local:
            os.remove(local_filename)
            print(f"Archivo local {local_filename} eliminado.")

    print("\nProceso completado exitosamente.")


In [23]:
# @title generate_sql_cleaning_filter_str()

def generate_sql_cleaning_filter_str(params: dict) -> str:
    r"""
    Genera una única consulta SQL que:
      1. "Limpia" y transforma datos de una tabla fuente (géneros a columnas binarias,
         cálculo de duraciones, etc.).
      2. Aplica filtros opcionales según rango de años, lista de géneros y
         lista de tipos de películas/series.
      3. Crea (o reemplaza) una tabla destino con el resultado final.

    Args:
        params (dict):
            -- Parámetros de limpieza --
            - source_table (str): Nombre de la tabla fuente.

            -- Parámetros de filtrado --
            - target_table (str): Nombre de la tabla destino.
            - year_mode (str): Modo de filtro por año ('from', 'to', 'between').
            - start_year (int, opcional): Año inicial (obligatorio si year_mode='from' o 'between').
            - end_year (int, opcional): Año final (obligatorio si year_mode='to' o 'between').
            - genres (list, opcional): Lista de géneros para filtrar (p.ej. ["Action", "Comedy"]).
            - genres_mode (str, opcional): Modo de filtro de géneros ('include' o 'exclude').
            - title_types (list, opcional): Lista de tipos de título para filtrar (p.ej. ["movie", "tvSeries"]).
            - title_types_mode (str, opcional): Modo de filtro de tipos de título ('include' o 'exclude').

    Returns:
        str: Consulta SQL que limpia y filtra los datos en un solo paso.

    Raises:
        ValueError: Si faltan parámetros obligatorios o no son válidos.
    """
    # Extraer parámetros
    source_table = params.get('source_table')
    target_table = params.get('target_table')
    year_mode = params.get('year_mode')
    start_year = params.get('start_year')
    end_year = params.get('end_year')
    filter_genres = params.get('genres', [])
    genres_mode = params.get('genres_mode', 'include')
    filter_title_types = params.get('title_types', [])
    title_types_mode = params.get('title_types_mode', 'include')

    # Validación básica
    if not source_table or not target_table:
        raise ValueError("Parámetros 'source_table' y 'target_table' son obligatorios.")

    # Validación de year_mode y años
    if year_mode and year_mode not in ['from', 'to', 'between']:
        raise ValueError("El parámetro 'year_mode' debe ser uno de: 'from', 'to', 'between'.")

    if year_mode == 'from' and not start_year:
        raise ValueError("El parámetro 'start_year' es obligatorio cuando 'year_mode' es 'from'.")
    if year_mode == 'to' and not end_year:
        raise ValueError("El parámetro 'end_year' es obligatorio cuando 'year_mode' es 'to'.")
    if year_mode == 'between' and (not start_year or not end_year):
        raise ValueError("Los parámetros 'start_year' y 'end_year' son obligatorios cuando 'year_mode' es 'between'.")

    # Lista de géneros de películas (para las columnas binarias)
    genres_list = [
        'Action', 'Adventure', 'Animation', 'Biography', 'Comedy', 'Crime',
        'Documentary', 'Drama', 'Family', 'Fantasy', 'Film-Noir', 'History',
        'Horror', 'Music', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Sport',
        'Thriller', 'War', 'Western'
    ]

    # -- Sección de limpieza (subconsulta) --
    # Construimos la parte SELECT que crea las columnas calculadas:
    cleaning_select = """
        tconst,
        titleType,
        primaryTitle,
        originalTitle,
        CASE WHEN isAdult = 1 THEN TRUE ELSE FALSE END AS isAdult,
        startYear,
        endYear,
        CASE
            WHEN endYear IS NOT NULL THEN endYear - startYear
            ELSE EXTRACT(YEAR FROM CURRENT_DATE()) - startYear
        END AS duration_years,
        runtimeMinutes,
        CASE
            WHEN runtimeMinutes < 30 THEN 'Short'
            WHEN runtimeMinutes BETWEEN 30 AND 90 THEN 'Medium'
            ELSE 'Long'
        END AS runtime_category,
    """

    # Agregar columnas binarias para géneros
    # (Usamos COALESCE(genres, '') en CONTAINS_SUBSTR() para evitar que sea NULL y devuelva NULL).
    for genre in genres_list:
        col_name = genre.replace('-', '_')  # Reemplazar guiones por _
        cleaning_select += f"IF(CONTAINS_SUBSTR(COALESCE(genres, ''), '{genre}'), TRUE, FALSE) AS {col_name},\n    "

    # Quitamos la última coma y salto de línea:
    cleaning_select = cleaning_select.rstrip(",\n ")

    # Armamos la subconsulta que limpia
    # IMPORTANTE: Se encierra en paréntesis para poder filtrar después
    cleaning_subquery = f"""
    (
        SELECT
            {cleaning_select}
        FROM {source_table}
    )
    AS cleaned
    """

    # -- Sección de filtrado --
    # Iniciamos la consulta final con CREATE OR REPLACE
    sql_query_str = f"CREATE OR REPLACE TABLE {target_table} AS\n"

    # Añadimos el SELECT * FROM (subconsulta) y un WHERE 1=1
    sql_query_str += f"SELECT *\nFROM {cleaning_subquery}\nWHERE 1=1"

    # Filtro por año (startYear)
    if year_mode == 'from':
        sql_query_str += f" AND startYear >= {start_year}"
    elif year_mode == 'to':
        sql_query_str += f" AND startYear <= {end_year}"
    elif year_mode == 'between':
        sql_query_str += f" AND startYear BETWEEN {start_year} AND {end_year}"

    # Filtro por géneros
    if filter_genres:
        if genres_mode == 'include':
            genre_conditions = " OR ".join([f"{g.replace('-', '_')} = TRUE" for g in filter_genres])
        elif genres_mode == 'exclude':
            genre_conditions = " AND ".join([f"{g.replace('-', '_')} = FALSE" for g in filter_genres])
        sql_query_str += f" AND ({genre_conditions})"

    # Filtro por tipos de título
    if filter_title_types:
        if title_types_mode == 'include':
            type_conditions = " OR ".join([f"titleType = '{tt}'" for tt in filter_title_types])
        elif title_types_mode == 'exclude':
            type_conditions = " AND ".join([f"titleType != '{tt}'" for tt in filter_title_types])
        sql_query_str += f" AND ({type_conditions})"

    # Terminamos la instrucción
    sql_query_str += ";"

    return sql_query_str


In [24]:
# @title imdb_scrape_crew_data_bq()
import time
from datetime import datetime

import pandas as pd
import pandas_gbq
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from bs4 import BeautifulSoup

# Para limpiar salida en Colab/Jupyter
from IPython.display import clear_output

# Librerías de BigQuery (Storage API)
from google.cloud import bigquery
from google.cloud import bigquery_storage_v1

def imdb_scrape_crew_data_bq(params: dict) -> None:
    """
    Realiza scraping de los datos de artistas (crew) de títulos desde IMDb y exporta la información
    a una tabla de BigQuery. Incluye una inserción de marcador "NoCrew" si un título no tiene
    registros de crew, para no volver a scrapearlo en futuras ejecuciones.

    Columnas resultantes (en la tabla destino):
      - tconst (STRING)
      - primaryTitle (STRING)
      - nconst (STRING o NULL)
      - primaryName (STRING o NULL)
      - category (STRING, p.ej. "Actor", "NoCrew", etc.)
      - job (STRING o NULL)
      - name_url (STRING o NULL)
      - date_scraped (STRING de fecha/hora ISO8601)

    Args:
        params (dict):
            - gcp_project_id (str)
            - source_bq_table (str)
            - target_bq_table (str)
            - excluded_categories (list[str])
            - chunk_size (int, opcional): Default=1000
            - wait_seconds (int, opcional): Default=1
            - testing_tconsts (list[str], opcional): Para pruebas rápidas.

    Returns:
        None
    """

    # 1) Validar parámetros
    gcp_project_id = params.get("gcp_project_id")
    source_bq_table = params.get("source_bq_table")
    target_bq_table = params.get("target_bq_table")

    if not gcp_project_id or not source_bq_table or not target_bq_table:
        raise ValueError("Faltan claves obligatorias: 'gcp_project_id', 'source_bq_table', 'target_bq_table'.")

    # 2) Parámetros opcionales
    excluded_categories = params.get("excluded_categories", [])
    chunk_size = params.get("chunk_size", 1000)  # Aumentado para mejor rendimiento
    wait_seconds = params.get("wait_seconds", 1)
    testing_tconsts = params.get("testing_tconsts", [])

    # 3) Crear clientes de BigQuery
    bq_client = bigquery.Client(project=gcp_project_id)
    bqstorage_client = bigquery_storage_v1.BigQueryReadClient()

    # 4) Leer la tabla de destino (para no re-scrapear tconst) con Storage API
    existing_tconsts = set()
    try:
        query_exists = f"SELECT DISTINCT tconst FROM `{target_bq_table}`"
        query_job = bq_client.query(query_exists)
        dest_results = query_job.result()
        df_existing = dest_results.to_dataframe(bqstorage_client=bqstorage_client)
        existing_tconsts = set(df_existing["tconst"].unique())
        print(f"[Info] Encontrados {len(existing_tconsts)} tconst procesados en '{target_bq_table}'.")
    except Exception as e:
        print(f"[Info] No se pudo leer '{target_bq_table}' o está vacía. Error: {e}")

    # 5) Leer la tabla de origen con Storage API
    if testing_tconsts:
        list_str = ", ".join([f"'{tc}'" for tc in testing_tconsts])
        query_str = f"""
            SELECT tconst, primaryTitle
            FROM `{source_bq_table}`
            WHERE tconst IN ({list_str})
        """
        print(f"[Modo TEST] Leyendo solo {testing_tconsts} de '{source_bq_table}'.")
    else:
        query_str = f"""
            SELECT tconst, primaryTitle
            FROM `{source_bq_table}`
        """
        print(f"[Modo COMPLETO] Leyendo todos los tconst desde '{source_bq_table}'.")

    query_job = bq_client.query(query_str)
    src_results = query_job.result()
    tconst_df = src_results.to_dataframe(bqstorage_client=bqstorage_client)
    total_titles = len(tconst_df)
    print(f"[Info] Se han leído {total_titles} títulos desde '{source_bq_table}'.")

    # 6) Configurar la sesión HTTP con reintentos para IMDb
    session = requests.Session()
    retry_strategy = Retry(
        total=5,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["HEAD", "GET", "OPTIONS"]
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("https://", adapter)
    session.mount("http://", adapter)

    # Helpers
    def normalize_category(cat: str) -> str:
        return " ".join(cat.lower().split())

    excluded_categories_norm = [normalize_category(cat) for cat in excluded_categories]

    def parse_nconst(artist_link):
        if not artist_link:
            return None
        href_val = artist_link.get("href", "")
        parts = href_val.split("/")
        if len(parts) > 2 and parts[1] == "name":
            return parts[2]  # nm0000123
        return None

    def get_job_for_non_cast(artist_link):
        if not artist_link:
            return None
        job_cell = artist_link.parent.parent.select_one("td[class=credit]")
        if job_cell:
            return job_cell.text.strip().replace("(", "").replace(")", "")
        return None

    # 7) Estructuras para scraping y logs
    crew_data_list = []
    logs_for_chunk = []
    processed_count = 0

    # 8) Iterar sobre cada título a scrapear
    for index, row in tconst_df.iterrows():
        tconst = row["tconst"]
        primary_title = row.get("primaryTitle", "Título desconocido")

        if tconst in existing_tconsts:
            continue  # Saltar si ya existe

        try:
            credits_url = f"https://www.imdb.com/title/{tconst}/fullcredits"
            response = session.get(credits_url, timeout=15)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, "lxml")
            credits_section = soup.select_one('div[id="fullcredits_content"]')
            if not credits_section:
                logs_for_chunk.append(f"[Advertencia] No se encontró 'fullcredits_content' para '{tconst}'.")
                # Insertar un registro marcador "NoCrew"
                crew_data_list.append({
                    "tconst": tconst,
                    "primaryTitle": primary_title,
                    "nconst": None,
                    "primaryName": None,
                    "category": "NoCrew",  # Etiqueta especial
                    "job": None,
                    "name_url": None,
                    "date_scraped": datetime.utcnow().isoformat()
                })
                processed_count += 1
                continue

            local_count = 0
            current_category = ""
            skip_this_category = False

            for element in credits_section.find_all(["h4", "table"], recursive=False):
                if element.name == "h4":
                    current_category = element.get_text(strip=True).replace("&nbsp;", "")
                    cat_norm = normalize_category(current_category)
                    skip_this_category = (cat_norm in excluded_categories_norm)

                elif element.name == "table":
                    if skip_this_category:
                        # Se omite la tabla entera
                        continue

                    table_classes = element.get("class", [])
                    is_cast_table = ("cast_list" in table_classes)

                    if is_cast_table:
                        rows = element.select("tr.odd, tr.even")
                        for row_tr in rows:
                            tds = row_tr.find_all("td", recursive=False)
                            if len(tds) < 2:
                                continue
                            artist_link = tds[1].select_one("a")
                            if artist_link:
                                nconst_val = parse_nconst(artist_link)
                                primary_name = artist_link.text.strip()

                                crew_data_list.append({
                                    "tconst": tconst,
                                    "primaryTitle": primary_title,
                                    "nconst": nconst_val,
                                    "primaryName": primary_name,
                                    "category": current_category,
                                    "job": None,
                                    "name_url": f"https://www.imdb.com{artist_link['href']}",
                                    "date_scraped": datetime.utcnow().isoformat()
                                })
                                local_count += 1
                    else:
                        # Otras categorías
                        name_cells = element.select("tbody tr td[class=name] a")
                        for artist_link in name_cells:
                            nconst_val = parse_nconst(artist_link)
                            job_val = get_job_for_non_cast(artist_link)
                            primary_name = artist_link.text.strip()

                            crew_data_list.append({
                                "tconst": tconst,
                                "primaryTitle": primary_title,
                                "nconst": nconst_val,
                                "primaryName": primary_name,
                                "category": current_category,
                                "job": job_val,
                                "name_url": f"https://www.imdb.com{artist_link['href']}",
                                "date_scraped": datetime.utcnow().isoformat()
                            })
                            local_count += 1

            # Si el título no aportó filas, insertar un registro "NoCrew"
            if local_count == 0:
                crew_data_list.append({
                    "tconst": tconst,
                    "primaryTitle": primary_title,
                    "nconst": None,
                    "primaryName": None,
                    "category": "NoCrew",
                    "job": None,
                    "name_url": None,
                    "date_scraped": datetime.utcnow().isoformat()
                })
                logs_for_chunk.append(f"[Info] Título sin crew. Registrado con marcador 'NoCrew' para {tconst}")

            processed_count += 1
            logs_for_chunk.append(
                f"Se extrajeron {local_count} filas para '{primary_title}' (tconst={tconst})."
            )

            # Subida parcial al completar chunk_size
            if (processed_count % chunk_size == 0):
                if crew_data_list:
                    chunk_df = pd.DataFrame(crew_data_list)

                    # Asegurar tipos de datos
                    chunk_df['tconst'] = chunk_df['tconst'].astype(str)
                    chunk_df['primaryTitle'] = chunk_df['primaryTitle'].astype(str)
                    chunk_df['nconst'] = chunk_df['nconst'].astype('object')  # STRING o NULL
                    chunk_df['primaryName'] = chunk_df['primaryName'].astype('object')  # STRING o NULL
                    chunk_df['category'] = chunk_df['category'].astype(str)
                    chunk_df['job'] = chunk_df['job'].astype('object')  # STRING o NULL
                    chunk_df['name_url'] = chunk_df['name_url'].astype('object')  # STRING o NULL
                    chunk_df['date_scraped'] = chunk_df['date_scraped'].astype(str)

                    # Subir a BigQuery usando pandas_gbq.to_gbq
                    pandas_gbq.to_gbq(
                        chunk_df,
                        destination_table=target_bq_table,
                        project_id=gcp_project_id,
                        if_exists="append",
                        table_schema=[
                            {"name": "tconst", "type": "STRING"},
                            {"name": "primaryTitle", "type": "STRING"},
                            {"name": "nconst", "type": "STRING"},
                            {"name": "primaryName", "type": "STRING"},
                            {"name": "category", "type": "STRING"},
                            {"name": "job", "type": "STRING"},
                            {"name": "name_url", "type": "STRING"},
                            {"name": "date_scraped", "type": "TIMESTAMP"}
                        ]
                    )
                    chunk_len = len(chunk_df)
                    print(f"[Info] Subidas {chunk_len} filas a '{target_bq_table}' (chunk_size alcanzado).")

                    # Limpiar la pantalla y mostrar los logs del chunk
                    clear_output(wait=True)
                    print("\n".join(logs_for_chunk))
                    print(f"\n[Info] Subidas {chunk_len} filas a '{target_bq_table}' (chunk_size alcanzado).")

                    # Resetear estructuras
                    crew_data_list = []
                    logs_for_chunk = []
                    existing_tconsts.update(chunk_df['tconst'].unique())

        except requests.RequestException as e:
            logs_for_chunk.append(f"[Error] Falló la solicitud para '{tconst}': {e}")

        time.sleep(wait_seconds)

    # 9) Subir datos restantes (si quedan)
    if crew_data_list or logs_for_chunk:
        if crew_data_list:
            final_df = pd.DataFrame(crew_data_list)

            # Asegurar tipos de datos
            final_df['tconst'] = final_df['tconst'].astype(str)
            final_df['primaryTitle'] = final_df['primaryTitle'].astype(str)
            final_df['nconst'] = final_df['nconst'].astype('object')  # STRING o NULL
            final_df['primaryName'] = final_df['primaryName'].astype('object')  # STRING o NULL
            final_df['category'] = final_df['category'].astype(str)
            final_df['job'] = final_df['job'].astype('object')  # STRING o NULL
            final_df['name_url'] = final_df['name_url'].astype('object')  # STRING o NULL
            final_df['date_scraped'] = final_df['date_scraped'].astype(str)

            # Subir a BigQuery usando pandas_gbq.to_gbq
            pandas_gbq.to_gbq(
                final_df,
                destination_table=target_bq_table,
                project_id=gcp_project_id,
                if_exists="append",
                table_schema=[
                    {"name": "tconst", "type": "STRING"},
                    {"name": "primaryTitle", "type": "STRING"},
                    {"name": "nconst", "type": "STRING"},
                    {"name": "primaryName", "type": "STRING"},
                    {"name": "category", "type": "STRING"},
                    {"name": "job", "type": "STRING"},
                    {"name": "name_url", "type": "STRING"},
                    {"name": "date_scraped", "type": "TIMESTAMP"}
                ]
            )
            final_len = len(final_df)
            print(f"[Info] Subidas {final_len} filas finales a '{target_bq_table}'.")

        # Limpiar la pantalla y mostrar logs finales
        if logs_for_chunk:
            clear_output(wait=True)
            print("\n".join(logs_for_chunk))
            print(f"\n[Info] Subidas datos restantes a '{target_bq_table}'.")

    # 10) Mensaje final
    print(f"\n[Completado] Se procesaron {processed_count} títulos nuevos. "
          f"Los datos de crew (y 'NoCrew') se almacenaron en '{target_bq_table}'.")


# PRODUCCIÓN

In [25]:
# @title EXPORTAR GCS -> GBQ
import os
import dask.dataframe as dd
import pandas as pd
import pandas_gbq
from google.cloud import storage
import io
from google.colab import auth
auth.authenticate_user()
params = {
    "gcp_project_id": "animum-dev-datawarehouse",
    "gcs_bucket_name": "imdb_raw_01",
    "tsv_files": [
        "name.basics.tsv",
        "title.basics.tsv",
        "title.akas.tsv",
        "title.principals.tsv"
        "title.crew.tsv",
        "title.episode.tsv",
        "title.ratings.tsv"
    ],
    "gbq_dataset_id": "IMDb_raw_01",
    "table_suffix": "raw",
    "block_size": 256e6,  # 256 MB, ajústalo según tu RAM
    "remove_local": True,
    # Este mapeo cubre las columnas enteras que podrían tener \N.
    # Ajusta si hay más.
    "dtypes_map": {
        "birthYear": "Int64",        # name.basics
        "deathYear": "Int64",        # name.basics
        "startYear": "Int64",        # title.basics
        "endYear": "Int64",          # title.basics
        "runtimeMinutes": "Int64",   # title.basics
        "ordering": "Int64",         # title.principals, title.akas, etc.
        "isAdult": "Int64"           # title.basics (0/1 -> Int64).
                                     # Luego podrías convertir a boolean si lo deseas.
    }
}

# GCS_tsv_to_GBQ(params)

In [26]:
# @title DATASETS LIST
from google.colab import auth
auth.authenticate_user()

from google.cloud import bigquery

config = {
    'project_id': 'animum-dev-datawarehouse',
    'datasets': ["IMDb_raw_01"],  # Opcional: lista de datasets específicos
    'include_tables': True,  # Opcional: por defecto es True
}

esquema_dic = GBQ_schema_dic(config)

from pprint import pprint # Importing the pprint function

pprint(esquema_dic)



{'IMDb_raw_01': {'name_basics_raw': {'fields': [{'name': 'nconst',
                                                 'type': 'STRING'},
                                                {'name': 'primaryName',
                                                 'type': 'STRING'},
                                                {'name': 'birthYear',
                                                 'type': 'INTEGER'},
                                                {'name': 'deathYear',
                                                 'type': 'INTEGER'},
                                                {'name': 'primaryProfession',
                                                 'type': 'STRING'},
                                                {'name': 'knownForTitles',
                                                 'type': 'STRING'}]},
                 'title_akas_raw': {'fields': [{'name': 'titleId',
                                                'type': 'STRING'},
                      

In [27]:
# @title TABLAS RAW INFO

# Configurar para mostrar todas las filas y columnas
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
params = {
    'project_id': 'animum-dev-datawarehouse',
    'dataset_ids': ['IMDb_staging_01']
}

df = GBQ_tables_info(params)
display(df)
pd.reset_option('display.max_rows')
pd.reset_option('display.max_columns')

Unnamed: 0,dataset_id,table_name,num_rows,num_columns,size_mb
0,IMDb_staging_01,crew_scraped,27066931,8,4171.9
1,IMDb_staging_01,title_basics_filtered,685450,32,74.53


In [28]:
# @title IMDb_staging title_basics_filtered

params = {
    "source_table": "animum-dev-datawarehouse.IMDb_raw_01.title_basics_raw",
    "target_table": "animum-dev-datawarehouse.IMDb_staging_01.title_basics_filtered",

    "year_mode": "from",                    # Opciones: 'from', 'to', 'between'
    "start_year": 2000,
    "end_year": 2050,

    "genres_mode": "include",               # Opciones: 'include', 'exclude'
    "genres": ["Action", "Adventure", "Animation"],
                                            # Géneros disponibles: 'Action', 'Adventure', 'Animation', 'Biography', 'Comedy',
                                            # 'Crime', 'Documentary', 'Drama', 'Family', 'Fantasy', 'Film-Noir', 'History',
                                            # 'Horror', 'Music', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Sport', 'Thriller', 'War', 'Western'

    "title_types_mode": "exclude",          # Opciones: 'include', 'exclude'
    "title_types": ["tvSpecial", "video", "videoGame"]
                                            # Tipos disponibles: "tvEpisode", "short", "movie", "tvSeries", "videoGame",
                                            # "tvMiniSeries", "tvMovie", "tvShort", "video"
    }

print(generate_sql_cleaning_filter_str(params))


CREATE OR REPLACE TABLE animum-dev-datawarehouse.IMDb_staging_01.title_basics_filtered AS
SELECT *
FROM 
    (
        SELECT
            
        tconst,
        titleType,
        primaryTitle,
        originalTitle,
        CASE WHEN isAdult = 1 THEN TRUE ELSE FALSE END AS isAdult,
        startYear,
        endYear,
        CASE
            WHEN endYear IS NOT NULL THEN endYear - startYear
            ELSE EXTRACT(YEAR FROM CURRENT_DATE()) - startYear
        END AS duration_years,
        runtimeMinutes,
        CASE
            WHEN runtimeMinutes < 30 THEN 'Short'
            WHEN runtimeMinutes BETWEEN 30 AND 90 THEN 'Medium'
            ELSE 'Long'
        END AS runtime_category,
    IF(CONTAINS_SUBSTR(COALESCE(genres, ''), 'Action'), TRUE, FALSE) AS Action,
    IF(CONTAINS_SUBSTR(COALESCE(genres, ''), 'Adventure'), TRUE, FALSE) AS Adventure,
    IF(CONTAINS_SUBSTR(COALESCE(genres, ''), 'Animation'), TRUE, FALSE) AS Animation,
    IF(CONTAINS_SUBSTR(COALESCE(genres, ''), 'Bio

In [29]:
# @title SCRAPPING

%%time

# Autenticar tu cuenta de Google para acceder a BigQuery
from google.colab import auth
auth.authenticate_user()

# 4) Definir los parámetros
params = {
    "gcp_project_id": "animum-dev-datawarehouse",
    "source_bq_table": "animum-dev-datawarehouse.IMDb_staging_01.title_basics_filtered",
    "target_bq_table": "animum-dev-datawarehouse.IMDb_staging_01.crew_scraped",
    "excluded_categories": [
        # "Additional Crew", "Camera and Electrical Department", "Casting By",
        # "Casting Department", "Costume and Wardrobe Department", "Editing by",
        # "Music Department", "Production Management", "Sound Department"
    ],
    "chunk_size": 1000,       # Subir datos a BigQuery cada n títulos procesados
    "wait_seconds": 1,      # Espera 1 segundo entre requests para no saturar IMDb
    "testing_tconsts": []   # Si lo dejas vacío, procesará TODOS los tconst de la tabla.
                            # Si pones p.ej. ["tt11198330", "tt0332379"], solo procesa esos.
}

imdb_scrape_crew_data_bq(params)


[Error] Falló la solicitud para 'tt31108335': 404 Client Error:  for url: https://www.imdb.com/title/tt31108335/fullcredits
[Error] Falló la solicitud para 'tt33065242': 404 Client Error:  for url: https://www.imdb.com/title/tt33065242/fullcredits
[Error] Falló la solicitud para 'tt33511185': 404 Client Error:  for url: https://www.imdb.com/title/tt33511185/fullcredits
[Error] Falló la solicitud para 'tt33253648': 404 Client Error:  for url: https://www.imdb.com/title/tt33253648/fullcredits
[Error] Falló la solicitud para 'tt33071558': 404 Client Error:  for url: https://www.imdb.com/title/tt33071558/fullcredits
[Error] Falló la solicitud para 'tt33079280': 404 Client Error:  for url: https://www.imdb.com/title/tt33079280/fullcredits
[Error] Falló la solicitud para 'tt33079278': 404 Client Error:  for url: https://www.imdb.com/title/tt33079278/fullcredits
[Error] Falló la solicitud para 'tt33081387': 404 Client Error:  for url: https://www.imdb.com/title/tt33081387/fullcredits
[Error] 