In [38]:
import os
import requests
import pandas as pd
from google.cloud import bigquery, storage

# 🔑 Variables de config GCP et TMDB
bucket_name = "tmdb-data-bucket"
bq_project = "tmdb-elt-project-460312"
dataset_raw = "tmdb_raw"
dataset_clean = "tmdb_clean"
table_raw = f"{bq_project}.{dataset_raw}.movies"
table_facts = f"{bq_project}.{dataset_clean}.movies"
table_genres = f"{bq_project}.{dataset_clean}.dim_genres"
table_relation = f"{bq_project}.{dataset_clean}.movie_genres"
gcs_movies = f"gs://{bucket_name}/movies_raw.csv"
gcs_genres = f"gs://{bucket_name}/dim_genres.csv"
gcs_movie_genres = f"gs://{bucket_name}/movie_genres.csv"

# 🔧 Clients GCP
client = bigquery.Client()
storage_client = storage.Client()

# 📁 Créer le bucket si inexistant
bucket = storage_client.bucket(bucket_name)
if not bucket.exists():
    storage_client.create_bucket(bucket, location="EU")

# 📂 Créer les datasets si inexistants
for dataset in [dataset_raw, dataset_clean]:
    dataset_id = f"{bq_project}.{dataset}"
    try:
        client.get_dataset(dataset_id)
    except:
        ds = bigquery.Dataset(dataset_id)
        ds.location = "EU"
        client.create_dataset(ds)

# 1️⃣ EXTRACT — Films populaires (5 pages)
all_movies = []
for page in range(1, 6):
    url = f"https://api.themoviedb.org/3/movie/popular?api_key={os.environ.get('TMDB_API_KEY')}&language=en-US&page={page}"
    res = requests.get(url)
    if res.status_code == 200:
        all_movies.extend(res.json()["results"])
df = pd.DataFrame(all_movies)
df.to_csv("/tmp/movies_raw.csv", index=False)
os.system(f"gsutil cp /tmp/movies_raw.csv {gcs_movies}")

# 2️⃣ LOAD — Nettoyer puis charger dans table RAW
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True
)
client.query(f"DELETE FROM `{table_raw}` WHERE TRUE").result()
client.load_table_from_uri(gcs_movies, table_raw, job_config=job_config).result()

# 3️⃣ TRANSFORM — Table de faits nettoyée et enrichie
query_facts = f"""
CREATE OR REPLACE TABLE `{table_facts}` AS
SELECT
  id,
  title,
  release_date,
  vote_average,
  vote_count,
  popularity,
  genre_ids,
  original_language,
  CONCAT('https://image.tmdb.org/t/p/w780', backdrop_path) AS backdrop_path,
  CONCAT('https://image.tmdb.org/t/p/w500', poster_path) AS poster_path,
  overview
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY popularity DESC) AS rn
  FROM `{table_raw}`
)
WHERE rn = 1 AND release_date IS NOT NULL
"""
client.query(query_facts).result()

# 4️⃣ DIM_GENRES — Appel API pour récupérer tous les genres
genre_url = f"https://api.themoviedb.org/3/genre/movie/list?api_key={os.environ.get('TMDB_API_KEY')}&language=en-US"
res = requests.get(genre_url)
df_genres = pd.DataFrame(res.json()["genres"])
df_genres.columns = ["genre_id", "genre_name"]
df_genres.to_csv("/tmp/dim_genres.csv", index=False)
os.system(f"gsutil cp /tmp/dim_genres.csv {gcs_genres}")
client.load_table_from_uri(gcs_genres, table_genres, job_config=job_config).result()

# 5️⃣ RELATION — Table n-n movie_id <-> genre_id
movie_genres = df[['id', 'genre_ids']].explode('genre_ids').dropna()
movie_genres.columns = ['movie_id', 'genre_id']
movie_genres['genre_id'] = movie_genres['genre_id'].astype(int)
movie_genres.to_csv("/tmp/movie_genres.csv", index=False)
os.system(f"gsutil cp /tmp/movie_genres.csv {gcs_movie_genres}")
client.load_table_from_uri(gcs_movie_genres, table_relation, job_config=job_config).result()

print("🎬✅ Pipeline ELT complet avec schéma en étoile terminé sans doublons")


🎬✅ Pipeline ELT complet avec schéma en étoile terminé sans doublons


In [39]:
import os

# Crée le dossier source si nécessaire
os.makedirs("/tmp/function_src", exist_ok=True)

# 📄 1. Création du fichier main.py
main_py_code = '''
import os
import requests
import pandas as pd
from google.cloud import bigquery, storage

def main(request):
    bucket_name = "tmdb-data-bucket"
    bq_project = "tmdb-elt-project-460312"
    dataset_raw = "tmdb_raw"
    dataset_clean = "tmdb_clean"
    table_raw = f"{bq_project}.{dataset_raw}.movies"
    table_facts = f"{bq_project}.{dataset_clean}.movies"
    table_genres = f"{bq_project}.{dataset_clean}.dim_genres"
    table_relation = f"{bq_project}.{dataset_clean}.movie_genres"
    gcs_movies = f"gs://{bucket_name}/movies_raw.csv"
    gcs_genres = f"gs://{bucket_name}/dim_genres.csv"
    gcs_movie_genres = f"gs://{bucket_name}/movie_genres.csv"

    client = bigquery.Client()
    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)
    if not bucket.exists():
        storage_client.create_bucket(bucket, location="EU")

    for dataset in [dataset_raw, dataset_clean]:
        dataset_id = f"{bq_project}.{dataset}"
        try:
            client.get_dataset(dataset_id)
        except:
            ds = bigquery.Dataset(dataset_id)
            ds.location = "EU"
            client.create_dataset(ds)

    all_movies = []
    for page in range(1, 6):
        url = f"https://api.themoviedb.org/3/discover/movie?api_key={os.environ.get('TMDB_API_KEY')}&language=en-US&page={page}"
        res = requests.get(url)
        if res.status_code == 200:
            all_movies.extend(res.json()["results"])
    df = pd.DataFrame(all_movies)
    df.to_csv("/tmp/movies_raw.csv", index=False)
    os.system(f"gsutil cp /tmp/movies_raw.csv {gcs_movies}")

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True
    )
    client.query(f"DELETE FROM `{table_raw}` WHERE TRUE").result()
    client.load_table_from_uri(gcs_movies, table_raw, job_config=job_config).result()

    query_facts = f\"\"\"
    CREATE OR REPLACE TABLE `{table_facts}` AS
    SELECT
      id,
      title,
      release_date,
      vote_average,
      vote_count,
      popularity,
      genre_ids,
      original_language,
      CONCAT('https://image.tmdb.org/t/p/w780', backdrop_path) AS backdrop_path,
      CONCAT('https://image.tmdb.org/t/p/w500', poster_path) AS poster_path,
      overview
    FROM (
      SELECT *,
             ROW_NUMBER() OVER (PARTITION BY id ORDER BY popularity DESC) AS rn
      FROM `{table_raw}`
    )
    WHERE rn = 1 AND release_date IS NOT NULL
    \"\"\"
    client.query(query_facts).result()

    genre_url = f"https://api.themoviedb.org/3/genre/movie/list?api_key={os.environ.get('TMDB_API_KEY')}&language=en-US"
    res = requests.get(genre_url)
    df_genres = pd.DataFrame(res.json()["genres"])
    df_genres.columns = ["genre_id", "genre_name"]
    df_genres.to_csv("/tmp/dim_genres.csv", index=False)
    os.system(f"gsutil cp /tmp/dim_genres.csv {gcs_genres}")
    client.load_table_from_uri(gcs_genres, table_genres, job_config=job_config).result()

    movie_genres = df[['id', 'genre_ids']].explode('genre_ids').dropna()
    movie_genres.columns = ['movie_id', 'genre_id']
    movie_genres['genre_id'] = movie_genres['genre_id'].astype(int)
    movie_genres.to_csv("/tmp/movie_genres.csv", index=False)
    os.system(f"gsutil cp /tmp/movie_genres.csv {gcs_movie_genres}")
    client.load_table_from_uri(gcs_movie_genres, table_relation, job_config=job_config).result()

    return "✅ Pipeline exécuté avec succès"
'''

with open("/tmp/function_src/main.py", "w") as f:
    f.write(main_py_code)

# 📄 2. Création du fichier requirements.txt
with open("/tmp/function_src/requirements.txt", "w") as f:
    f.write("pandas\nrequests\ngoogle-cloud-bigquery\ngoogle-cloud-storage\n")


In [40]:
api_key = os.environ["TMDB_API_KEY"]

!gcloud functions deploy orchestrate_pipeline \
  --entry-point main \
  --runtime python310 \
  --trigger-http \
  --allow-unauthenticated \
  --source=/tmp/function_src \
  --region=europe-west1 \
  --project=tmdb-elt-project-460312 \
  --set-env-vars TMDB_API_KEY={api_key}


  [INFO] A new revision will be deployed serving with 100% traffic.
You can view your function in the Cloud Console here: https://console.cloud.google.com/functions/details/europe-west1/orchestrate_pipeline?project=tmdb-elt-project-460312

buildConfig:
  automaticUpdatePolicy: {}
  build: projects/1092783899634/locations/europe-west1/builds/b653d0bf-a762-42f8-8fcf-3df64588a4b8
  dockerRegistry: ARTIFACT_REGISTRY
  dockerRepository: projects/tmdb-elt-project-460312/locations/europe-west1/repositories/gcf-artifacts
  entryPoint: main
  runtime: python310
  serviceAccount: projects/tmdb-elt-project-460312/serviceAccounts/1092783899634-compute@developer.gserviceaccount.com
  source:
    storageSource:
      bucket: gcf-v2-sources-1092783899634-europe-west1
      generation: '1747732678536892'
      object: orchestrate_pipeline/function-source.zip
  sourceProvenance:
    resolvedStorageSource:
      bucket: gcf-v2-sources-1092783899634-europe-west1
      generation: '1747732678536892'
     

In [41]:
!mkdir -p tmdb-elt-pipeline/notebook
!mkdir -p tmdb-elt-pipeline/function_src
!mkdir -p tmdb-elt-pipeline/docs


In [42]:
from google.colab import files
files.upload()

Saving tmdb_pipeline.ipynb to tmdb_pipeline.ipynb


{'tmdb_pipeline.ipynb': b'{\n  "nbformat": 4,\n  "nbformat_minor": 0,\n  "metadata": {\n    "colab": {\n      "cell_execution_strategy": "setup",\n      "provenance": [],\n      "name": "tmdb_pipeline"\n    },\n    "kernelspec": {\n      "name": "python3",\n      "display_name": "Python 3"\n    },\n    "language_info": {\n      "name": "python"\n    }\n  },\n  "cells": [\n    {\n      "cell_type": "code",\n      "source": [\n        "!echo \\"TMDB_API_KEY=3cfa35042af236cd2d3d3ad121a6e865\\" > .env"\n      ],\n      "metadata": {\n        "id": "m20xwGn6mmAQ"\n      },\n      "execution_count": null,\n      "outputs": []\n    },\n    {\n      "cell_type": "code",\n      "source": [\n        "!cat .env"\n      ],\n      "metadata": {\n        "colab": {\n          "base_uri": "https://localhost:8080/"\n        },\n        "id": "C1K9k6d2m6Md",\n        "executionInfo": {\n          "status": "ok",\n          "timestamp": 1747727525637,\n          "user_tz": -120,\n          "elapsed": 21

In [43]:
!cp tmdb_pipeline.ipynb tmdb-elt-pipeline/notebook/


In [44]:
import os

main_py_path = "tmdb-elt-pipeline/function_src/main.py"
reqs_path = "tmdb-elt-pipeline/requirements.txt"

os.makedirs(os.path.dirname(main_py_path), exist_ok=True)

main_code = '''import os
import requests
import pandas as pd
from google.cloud import bigquery, storage

def main(request):
    bucket_name = "tmdb-data-bucket"
    bq_project = "tmdb-elt-project-460312"
    dataset_raw = "tmdb_raw"
    dataset_clean = "tmdb_clean"
    table_raw = f"{bq_project}.{dataset_raw}.movies"
    table_facts = f"{bq_project}.{dataset_clean}.movies"
    table_genres = f"{bq_project}.{dataset_clean}.dim_genres"
    table_relation = f"{bq_project}.{dataset_clean}.movie_genres"
    gcs_movies = f"gs://{bucket_name}/movies_raw.csv"
    gcs_genres = f"gs://{bucket_name}/dim_genres.csv"
    gcs_movie_genres = f"gs://{bucket_name}/movie_genres.csv"

    client = bigquery.Client()
    storage_client = storage.Client()

    bucket = storage_client.bucket(bucket_name)
    if not bucket.exists():
        storage_client.create_bucket(bucket, location="EU")

    for dataset in [dataset_raw, dataset_clean]:
        dataset_id = f"{bq_project}.{dataset}"
        try:
            client.get_dataset(dataset_id)
        except:
            ds = bigquery.Dataset(dataset_id)
            ds.location = "EU"
            client.create_dataset(ds)

    all_movies = []
    for page in range(1, 6):
        url = f"https://api.themoviedb.org/3/discover/movie?api_key={os.environ.get('TMDB_API_KEY')}&language=en-US&page={page}"
        res = requests.get(url)
        if res.status_code == 200:
            all_movies.extend(res.json()["results"])
    df = pd.DataFrame(all_movies)
    df.to_csv("/tmp/movies_raw.csv", index=False)
    os.system(f"gsutil cp /tmp/movies_raw.csv {gcs_movies}")

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True
    )
    client.query(f"DELETE FROM `{table_raw}` WHERE TRUE").result()
    client.load_table_from_uri(gcs_movies, table_raw, job_config=job_config).result()

    query_facts = f\"\"\"
    CREATE OR REPLACE TABLE `{table_facts}` AS
    SELECT
      id,
      title,
      release_date,
      vote_average,
      vote_count,
      popularity,
      genre_ids,
      original_language,
      CONCAT('https://image.tmdb.org/t/p/w780', backdrop_path) AS backdrop_path,
      CONCAT('https://image.tmdb.org/t/p/w500', poster_path) AS poster_path,
      overview
    FROM (
      SELECT *,
             ROW_NUMBER() OVER (PARTITION BY id ORDER BY popularity DESC) AS rn
      FROM `{table_raw}`
    )
    WHERE rn = 1 AND release_date IS NOT NULL
    \"\"\"
    client.query(query_facts).result()

    genre_url = f"https://api.themoviedb.org/3/genre/movie/list?api_key={os.environ.get('TMDB_API_KEY')}&language=en-US"
    res = requests.get(genre_url)
    df_genres = pd.DataFrame(res.json()["genres"])
    df_genres.columns = ["genre_id", "genre_name"]
    df_genres.to_csv("/tmp/dim_genres.csv", index=False)
    os.system(f"gsutil cp /tmp/dim_genres.csv {gcs_genres}")
    client.load_table_from_uri(gcs_genres, table_genres, job_config=job_config).result()

    movie_genres = df[['id', 'genre_ids']].explode('genre_ids').dropna()
    movie_genres.columns = ['movie_id', 'genre_id']
    movie_genres['genre_id'] = movie_genres['genre_id'].astype(int)
    movie_genres.to_csv("/tmp/movie_genres.csv", index=False)
    os.system(f"gsutil cp /tmp/movie_genres.csv {gcs_movie_genres}")
    client.load_table_from_uri(gcs_movie_genres, table_relation, job_config=job_config).result()

    return "✅ Pipeline exécuté avec succès"'''
with open(main_py_path, "w") as f:
    f.write(main_code)

with open(reqs_path, "w") as f:
    f.write("pandas\nrequests\ngoogle-cloud-bigquery\ngoogle-cloud-storage\n")


In [45]:
!echo ".env\n__pycache__/\n*.pyc\n.DS_Store\n" > tmdb-elt-pipeline/.gitignore
!touch tmdb-elt-pipeline/README.md

In [46]:
%cd /content/tmdb-elt-pipeline

/content/tmdb-elt-pipeline


In [None]:
token = "dz213"
!git remote set-url origin https://{token}@github.com/YanisZedira/ETL_MOVIES_PIPELINE.git

In [48]:
!git pull origin main --allow-unrelated-histories
!git push -u origin main

From https://github.com/YanisZedira/ETL_MOVIES_PIPELINE
 * branch            main       -> FETCH_HEAD
Already up to date.
Branch 'main' set up to track remote branch 'main' from 'origin'.
Everything up-to-date


In [49]:
!git push -u origin main --force

Branch 'main' set up to track remote branch 'main' from 'origin'.
Everything up-to-date
