# üì¶ 11_load_playlist_items_snapshot_to_bigquery

### üéØ Objetivo  

Este notebook no transforma datos.
Su √∫nica responsabilidad es: Cargar df_playlist_items_snapshot (generado en el notebook 08) hacia BigQuery como tabla hist√≥rica.

Destino:  
youtube-datasets-360.angelgarciadatablog.playlist_items_snapshot

In [None]:
from dotenv import load_dotenv
import os
from google.cloud import bigquery


In [None]:
load_dotenv()

PROJECT_ID = os.getenv("GCP_PROJECT")
DATASET_ID = "angelgarciadatablog"
TABLE_ID = "playlist_items_snapshot"

FULL_TABLE_ID = f"{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}"

client = bigquery.Client(project=PROJECT_ID)

print("Destino configurado:", FULL_TABLE_ID)


## üß± Cargar snapshot desde Parquet (temporal)  

‚ö†Ô∏è Nota temporal:
Durante la fase de notebooks, el DataFrame se carga desde Parquet como mecanismo de intercambio entre notebooks.
En la versi√≥n productiva (scripts .py), el DataFrame se pasar√° directamente sin almacenamiento intermedio.

In [None]:
from pathlib import Path
import pandas as pd

PROJECT_ROOT = Path.cwd().parents[0]
PROCESSED_PATH = PROJECT_ROOT / "data" / "processed" / "youtube"

file_path = PROCESSED_PATH / "playlist_items_snapshot.parquet"

df_playlist_items_snapshot = pd.read_parquet(file_path)

df_playlist_items_snapshot.head()

In [None]:
df_playlist_items_snapshot.dtypes

## üèó Crear tabla particionada con el esquema y datos del dataframe 



In [None]:
from google.api_core.exceptions import NotFound
from google.cloud.bigquery import SchemaField

schema = [
    SchemaField("snapshot_date", "DATE"),
    SchemaField("playlist_id", "STRING"),
    SchemaField("video_id", "STRING"),
    SchemaField("position", "INT64"),
    SchemaField("added_at", "TIMESTAMP"),
    SchemaField("extracted_at", "TIMESTAMP"),
]

try:
    client.get_table(FULL_TABLE_ID)
    print("Tabla ya existe.")
    
except NotFound:
    table = bigquery.Table(FULL_TABLE_ID, schema=schema)

    table.time_partitioning = bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="snapshot_date",
    )

    client.create_table(table)
    print("Tabla creada con partici√≥n.")




## üîí Control de idempotencia por `snapshot_date`

Antes de insertar el snapshot actual, se eliminan los registros existentes con el mismo `snapshot_date`.

Esto garantiza que el proceso sea **idempotente**:  Si el pipeline se ejecuta m√∫ltiples veces el mismo d√≠a (pruebas, re-ejecuciones o errores), el resultado final en BigQuery ser√° siempre consistente y sin duplicados.


In [None]:
snapshot_date = df_playlist_items_snapshot["snapshot_date"].iloc[0]

delete_query = f"""
DELETE FROM `{FULL_TABLE_ID}`
WHERE snapshot_date = @snapshot_date
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter(
            "snapshot_date",
            "DATE",
            snapshot_date
        )
    ]
)

client.query(delete_query, job_config=job_config).result()

print(f"Snapshots del {snapshot_date} eliminados si exist√≠an.")

## üìå Cargar datos del parquet a big query

In [None]:
# 2Ô∏è‚É£ Carga los datos desde tu DataFrame hacia BigQuery. WRITE APPEND = Agrega fila nuevas
job_config = bigquery.LoadJobConfig(
    write_disposition="WRITE_APPEND"
)

job = client.load_table_from_dataframe(
    df_playlist_items_snapshot,
    FULL_TABLE_ID,
    job_config=job_config
)

job.result()

print("Carga completada correctamente.")