# Final Project: IMDB Data Analysis + Wikimedia Stream Processing

This notebook covers the project instructions **end to end**:

1. Download the IMDB data **from a code cell** (no manual download, except as a fallback).
2. Load and analyze the required files.
3. Answer the questions (answers rendered as formatted **markdown**).
4. Build a small **stream processing** job on top of the Wikimedia event platform.


In [31]:
# ▶️ Installation (optional)
# Only needed if these packages are not installed yet:
# %pip install pandas numpy requests tqdm

import os
from pathlib import Path
import gzip
import time
import sqlite3
import requests
import pandas as pd
import numpy as np
from datetime import datetime, timezone, date
from IPython.display import display, Markdown

pd.set_option("display.max_columns", 50)


## 1) Downloading the IMDB data

Official source: https://datasets.imdbws.com/

⚠️ Some files are very large. The code below:
- downloads only the files this project needs;
- caches them in `./data/` (a file that is already present is not downloaded again);
- streams each download in chunks, so memory use stays low.

If a download fails (network, proxy, disk), you can **add the file manually** to `./data/` and rerun the notebook.
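Since a file can also be dropped into `./data/` by hand, and the cache check in the next cell only looks at whether the file is non-empty, it may be worth confirming that a cached `.tsv.gz` decompresses all the way to the end before loading it. A minimal sketch using only the standard library; `is_complete_gzip` is a hypothetical helper that the notebook does not define, and the self-check builds its own throwaway files:

```python
import gzip
import tempfile
import zlib
from pathlib import Path


def is_complete_gzip(path: Path, chunk_size: int = 1024 * 1024) -> bool:
    """Return True if `path` is a gzip file that decompresses to the end."""
    try:
        with gzip.open(path, "rb") as f:
            # Read everything: a truncated archive only fails when the reader hits the missing end.
            while f.read(chunk_size):
                pass
        return True
    except (EOFError, OSError, zlib.error):
        # EOFError: stream ended early; OSError also covers gzip.BadGzipFile (not gzip at all).
        return False


# Self-check in a temporary directory (no network, no IMDB files needed).
with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "sample.tsv.gz"
    with gzip.open(good, "wt", encoding="utf-8") as f:
        f.write("tconst\tprimaryTitle\n" * 1000)
    truncated = Path(tmp) / "truncated.tsv.gz"
    data = good.read_bytes()
    truncated.write_bytes(data[: len(data) // 2])  # simulate an interrupted download
    full_ok = is_complete_gzip(good)
    truncated_ok = is_complete_gzip(truncated)

print(full_ok, truncated_ok)
```

Reading a multi-hundred-MB file to the end takes a while, so this check is best run only on files that were added manually.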


In [32]:
from tqdm.auto import tqdm

BASE_URL = "https://datasets.imdbws.com/"
DATA_DIR = Path("data")
DATA_DIR.mkdir(exist_ok=True)

IMDB_FILES = [
    # people
    "name.basics.tsv.gz",
    # titles + metadata
    "title.basics.tsv.gz",
    "title.ratings.tsv.gz",
    "title.crew.tsv.gz",
    "title.akas.tsv.gz",
]

def download_file(url: str, dest: Path, chunk_size: int = 1024 * 1024) -> None:
    """Stream a file to disk with a progress bar; skip it if it is already cached."""
    if dest.exists() and dest.stat().st_size > 0:
        print(f"✅ Already present: {dest}")
        return

    print(f"⬇️ Downloading: {url} -> {dest}")
    # Write to a ".part" file and rename it only once complete, so an interrupted
    # download is not mistaken for a cached file on the next run.
    tmp = dest.with_name(dest.name + ".part")
    with requests.get(url, stream=True, timeout=60) as r:
        r.raise_for_status()
        total = int(r.headers.get("Content-Length", 0))
        with open(tmp, "wb") as f, tqdm(total=total, unit="B", unit_scale=True, desc=dest.name) as pbar:
            for chunk in r.iter_content(chunk_size=chunk_size):
                if chunk:
                    f.write(chunk)
                    pbar.update(len(chunk))
    tmp.replace(dest)

for fname in IMDB_FILES:
    download_file(BASE_URL + fname, DATA_DIR / fname)

print("✅ Downloads finished.")


✅ Already present: data\name.basics.tsv.gz
✅ Already present: data\title.basics.tsv.gz
✅ Already present: data\title.ratings.tsv.gz
✅ Already present: data\title.crew.tsv.gz
✅ Already present: data\title.akas.tsv.gz
✅ Downloads finished.


## 2) Loading the files

The IMDB files are compressed TSVs (`.tsv.gz`) that use `\N` for missing values.

To limit memory problems:
- explicit `dtypes` are used where possible;
- four of the five files are loaded whole (usually manageable), while `title.akas` is only read in chunks (`chunksize`) when it is needed.
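IMDB's TSV files are not quoted, but `pd.read_csv` treats `"` as a quote character by default (`quoting=csv.QUOTE_MINIMAL`). A title that starts with a double quote can then make the parser read across tabs and newlines and shift columns, which is one plausible source of text such as `Reality-TV` landing in a numeric column. A minimal sketch on a made-up two-row sample, reading with `quoting=csv.QUOTE_NONE` so quotes stay literal; whether this alone removes the need for `on_bad_lines="skip"` on the real file is an assumption to check:

```python
import csv
import io

import pandas as pd

# Made-up extract in IMDB's format: tab-separated, unquoted, "\N" for missing values.
# The first title starts with a stray double quote that is never closed.
sample = (
    "tconst\ttitleType\tprimaryTitle\tstartYear\tgenres\n"
    'tt0000001\tmovie\t"Weird title\t2001\tComedy\n'
    "tt0000002\ttvSeries\tNormal title\t\\N\tReality-TV\n"
)

df = pd.read_csv(
    io.StringIO(sample),
    sep="\t",
    quoting=csv.QUOTE_NONE,  # treat '"' as an ordinary character, never as a quote
    na_values=[r"\N"],
    dtype="string",
)
# Same post-load conversion as the notebook: text -> nullable integer
df["startYear"] = pd.to_numeric(df["startYear"], errors="coerce").astype("Int64")
print(df)
```

With the default quoting, the unclosed quote would swallow the rest of the sample into one field; with `QUOTE_NONE` both rows keep their five columns.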


In [33]:
NA = [r"\N"]

# Dtypes: most IMDB columns are kept as strings
name_dtypes = {
    "nconst": "string",
    "primaryName": "string",
    "birthYear": "Int64",
    "deathYear": "Int64",
    "primaryProfession": "string",
    "knownForTitles": "string",
}

# Reference only: title.basics is loaded as all strings below, then converted
title_basics_dtypes = {
    "tconst": "string",
    "titleType": "string",
    "primaryTitle": "string",
    "originalTitle": "string",
    "isAdult": "Int64",
    "startYear": "Int64",
    "endYear": "Int64",
    "runtimeMinutes": "Int64",
    "genres": "string",  # ✅ MUST stay a string (Reality-TV, etc.)
}

ratings_dtypes = {"tconst": "string", "averageRating": "float64", "numVotes": "Int64"}
crew_dtypes = {"tconst": "string", "directors": "string", "writers": "string"}

# Reference only: load_akas_for_title (section 3) declares its own dtypes
akas_dtypes = {
    "titleId": "string",
    "ordering": "Int64",
    "title": "string",
    "region": "string",
    "language": "string",
    "types": "string",
    "attributes": "string",
    "isOriginalTitle": "Int64",
}

# Loading
name_basics = pd.read_csv(DATA_DIR / "name.basics.tsv.gz", sep="\t", na_values=NA, dtype=name_dtypes, compression="gzip", low_memory=False)
# --- ROBUST loading of title.basics (avoids the Reality-TV error) ---
title_basics = pd.read_csv(
    DATA_DIR / "title.basics.tsv.gz",
    sep="\t",
    na_values=NA,
    dtype="string",          # <--- EVERYTHING as string at load time
    compression="gzip",
    low_memory=False,
    on_bad_lines="skip"      # <--- skip the rare malformed lines
)

# Convert the numeric columns cleanly after loading
for col in ["isAdult", "startYear", "endYear", "runtimeMinutes"]:
    title_basics[col] = pd.to_numeric(title_basics[col], errors="coerce").astype("Int64")

title_ratings = pd.read_csv(DATA_DIR / "title.ratings.tsv.gz", sep="\t", na_values=NA, dtype=ratings_dtypes, compression="gzip", low_memory=False)
title_crew = pd.read_csv(DATA_DIR / "title.crew.tsv.gz", sep="\t", na_values=NA, dtype=crew_dtypes, compression="gzip", low_memory=False)
title_akas = None  # read in chunks (streaming) only when it is needed


print("✅ Loading finished.")
display(name_basics.head(3))
display(title_basics.head(3))


✅ Loading finished.


| | nconst | primaryName | birthYear | deathYear | primaryProfession | knownForTitles |
|---|---|---|---|---|---|---|
| 0 | nm0000001 | Fred Astaire | 1899 | 1987.0 | actor,miscellaneous,producer | tt0072308,tt0050419,tt0027125,tt0025164 |
| 1 | nm0000002 | Lauren Bacall | 1924 | 2014.0 | actress,miscellaneous,soundtrack | tt0037382,tt0075213,tt0038355,tt0117057 |
| 2 | nm0000003 | Brigitte Bardot | 1934 | | actress,music_department,producer | tt0057345,tt0049189,tt0056404,tt0054452 |


| | tconst | titleType | primaryTitle | originalTitle | isAdult | startYear | endYear | runtimeMinutes | genres |
|---|---|---|---|---|---|---|---|---|---|
| 0 | tt0000001 | short | Carmencita | Carmencita | 0 | 1894 | | 1 | Documentary,Short |
| 1 | tt0000002 | short | Le clown et ses chiens | Le clown et ses chiens | 0 | 1892 | | 5 | Animation,Short |
| 2 | tt0000003 | short | Poor Pierrot | Pauvre Pierrot | 0 | 1892 | | 5 | Animation,Comedy,Romance |


## 3) Questions: IMDB analysis

This section computes every requested answer.

📌 Important note on dates:
- `name.basics` provides **birthYear** (the year) but not the **day/month**.
- Any "date of birth" question is therefore handled at **year** granularity only, since answers must use "only the data in the dataset".
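Because only the year of birth is known, a difference such as the one used for Q3 below (`CURRENT_YEAR - earliest_birth_year`) is exact only to within one year: depending on whether the birthday has already passed in the current year, the number of completed years is that difference or one less. A minimal sketch of the resulting range; `years_since_birth_year` is a hypothetical helper name:

```python
from datetime import date
from typing import Tuple


def years_since_birth_year(birth_year: int, today: date) -> Tuple[int, int]:
    """Range of completed years since a birth known only by its year.

    If the birthday has not come yet this year the count is
    today.year - birth_year - 1, otherwise today.year - birth_year.
    """
    upper = today.year - birth_year
    return max(upper - 1, 0), upper


print(years_since_birth_year(4, date.today()))
```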


In [34]:
# =========================
# Questions: IMDB
# =========================

TODAY = date.today()  # local date of the machine running the notebook
CURRENT_YEAR = TODAY.year

def split_genres(series: pd.Series) -> set[str]:
    """Collect the distinct genres from comma-separated genre strings."""
    out: set[str] = set()
    for v in series.dropna():
        for g in str(v).split(","):
            g = g.strip()
            if g:
                out.add(g)
    return out

# Q1) How many total people in data set?
total_people = int(name_basics["nconst"].nunique())

# Q2) What is the earliest year of birth?
earliest_birth_year = int(name_basics["birthYear"].min(skipna=True))

# Q3) How many years ago was this person born?
years_ago_earliest = int(CURRENT_YEAR - earliest_birth_year)

# Q4) Using only the data in the data set, determine if this date of birth correct.
# ✅ The "real" accuracy CANNOT be confirmed without an external source.
# ✅ Only the internal consistency of the dataset can be checked (see the markdown explanation below).

earliest_people = name_basics.loc[
    name_basics["birthYear"] == earliest_birth_year,
    ["nconst", "primaryName", "birthYear", "deathYear", "knownForTitles"],
].copy()

# Rule 1: birthYear <= deathYear when deathYear exists
earliest_people["birth_le_death"] = (
    earliest_people["deathYear"].isna()
    | (earliest_people["birthYear"] <= earliest_people["deathYear"])
)

# Rule 2: consistency with the "knownForTitles" titles (when present)
def min_knownfor_year(known):
    """Earliest startYear among a person's knownForTitles, or None if none is known."""
    if pd.isna(known):
        return None

    years = []
    for tconst in known.split(","):
        rows = title_basics.loc[title_basics["tconst"] == tconst, "startYear"]
        if not rows.empty and pd.notna(rows.iloc[0]):
            years.append(int(rows.iloc[0]))

    return min(years) if years else None


earliest_people["min_knownfor_startYear"] = earliest_people["knownForTitles"].apply(min_knownfor_year)
earliest_people["age_at_first_knownfor"] = earliest_people.apply(
    lambda r: (int(r["min_knownfor_startYear"]) - int(r["birthYear"]))
    if pd.notna(r["min_knownfor_startYear"])
    else np.nan,
    axis=1,
)

# Permissive threshold: age >= 5 when a knownFor year is available
earliest_people["internally_consistent"] = earliest_people["birth_le_death"] & (
    earliest_people["min_knownfor_startYear"].isna()
    | (earliest_people["age_at_first_knownfor"] >= 5)
)

# Q5) What is the most recent date/year of birth?
most_recent_birth_year = int(name_basics["birthYear"].max(skipna=True))

# Q6) What percentage of the people do not have a listed date/year of birth?
missing_birth_year_pct = float(name_basics["birthYear"].isna().mean() * 100)

# Q7) What is the length of the longest "short" after 1900?
short_after_1900 = title_basics[(title_basics["titleType"] == "short") & (title_basics["startYear"] > 1900)]
longest_short_runtime = int(short_after_1900["runtimeMinutes"].max(skipna=True))

# Q8) What is the length of the shortest "movie" after 1900?
movie_after_1900 = title_basics[(title_basics["titleType"] == "movie") & (title_basics["startYear"] > 1900)]
shortest_movie_runtime = int(movie_after_1900["runtimeMinutes"].min(skipna=True))

# Q9) List of all of the genres represented.
all_genres = sorted(split_genres(title_basics["genres"]))

# Q10) Highest rated comedy "movie" (tie -> most votes)
# Note: starts from movie_after_1900, so only movies with startYear > 1900 are ranked.
# Non-capturing groups (?:...) match "Comedy" as a whole genre and avoid pandas' match-group UserWarning.
comedy_movies = (
    movie_after_1900
    .loc[movie_after_1900["genres"].fillna("").str.contains(r"(?:^|,)Comedy(?:,|$)")]
    .merge(title_ratings, on="tconst", how="inner")
)

best_comedy = comedy_movies.sort_values(["averageRating", "numVotes"], ascending=[False, False]).head(1)
best_row = best_comedy.iloc[0]

best_tconst = str(best_row["tconst"])
best_title = str(best_row["primaryTitle"])
best_rating = float(best_row["averageRating"])
best_votes = int(best_row["numVotes"])

# Q11) Who was the director of the movie?
crew_row = title_crew.loc[title_crew["tconst"] == best_tconst].head(1)
director_nconsts: list[str] = []
if len(crew_row):
    directors_field = crew_row.iloc[0]["directors"]
    if pd.notna(directors_field):
        director_nconsts = [x.strip() for x in str(directors_field).split(",") if x.strip()]

directors = (
    name_basics.loc[name_basics["nconst"].isin(director_nconsts), ["nconst", "primaryName"]]
    .drop_duplicates()
    .sort_values("primaryName")
)

# Q12) Alternate titles for the movie (if any): title.akas is read in chunks (avoids MemoryError)
def load_akas_for_title(akas_path, target_tconst, na_values=r"\N", chunksize=1_000_000):
    """Stream title.akas.tsv.gz in chunks and keep only the rows of `target_tconst`.

    The whole file is never held in memory, which avoids a MemoryError.
    """
    usecols = ["titleId", "ordering", "title", "region", "language", "types", "attributes", "isOriginalTitle"]
    dtypes = {
        "titleId": "string",
        "ordering": "Int64",
        "title": "string",
        "region": "string",
        "language": "string",
        "types": "string",
        "attributes": "string",
        "isOriginalTitle": "Int64",
    }

    out = []
    for chunk in pd.read_csv(
        akas_path,
        sep="\t",
        na_values=[na_values],
        usecols=usecols,
        dtype=dtypes,
        compression="gzip",
        chunksize=chunksize,
        low_memory=False,
    ):
        sub = chunk[chunk["titleId"] == target_tconst]
        if not sub.empty:
            out.append(sub)

    if out:
        return pd.concat(out, ignore_index=True)
    return pd.DataFrame(columns=usecols)

akas_path = DATA_DIR / "title.akas.tsv.gz"
akas_for_best = load_akas_for_title(akas_path, best_tconst).sort_values("ordering")


# =========================
# Display the answers (markdown)
# =========================
answers = f"""### ✅ Answers (computed)

1. **Total number of people**: **{total_people:,}**
2. **Earliest birth year**: **{earliest_birth_year}**
3. **This person was born**: **{years_ago_earliest} years ago** (reference = year {CURRENT_YEAR})
4. **Is the date/year of birth correct?** → *Historical accuracy cannot be proven from the dataset alone; only internal consistency is checked (see the explanation below).*
5. **Most recent birth year**: **{most_recent_birth_year}**
6. **% of people without a birth year**: **{missing_birth_year_pct:.2f}%**
7. **Longest `short` after 1900**: **{longest_short_runtime} min**
8. **Shortest `movie` after 1900**: **{shortest_movie_runtime} min**
9. **Genres represented (n={len(all_genres)})**: {", ".join(all_genres)}
10. **Best comedy (movie)**: **{best_title}** (`{best_tconst}`), rating **{best_rating}**, votes **{best_votes:,}**
11. **Director(s)**: {", ".join(directors["primaryName"].astype(str).tolist()) if len(directors) else "Not listed in title.crew"}
12. **Alternate titles (akas)**: {len(akas_for_best)} entries (see table below)

"""

display(Markdown(answers))

print("\n--- Preview: people with the earliest birth year ---")
display(earliest_people.sort_values("primaryName").head(20))

print("\n--- Alternate titles (akas) of the best movie ---")
display(akas_for_best.head(50))


### ✅ Answers (computed)

1. **Total number of people**: **14,953,819**
2. **Earliest birth year**: **4**
3. **This person was born**: **2021 years ago** (reference = year 2025)
4. **Is the date/year of birth correct?** → *Historical accuracy cannot be proven from the dataset alone; only internal consistency is checked (see the explanation below).*
5. **Most recent birth year**: **2025**
6. **% of people without a birth year**: **95.58%**
7. **Longest `short` after 1900**: **1311 min**
8. **Shortest `movie` after 1900**: **1 min**
9. **Genres represented (n=28)**: Action, Adult, Adventure, Animation, Biography, Comedy, Crime, Documentary, Drama, Family, Fantasy, Film-Noir, Game-Show, History, Horror, Music, Musical, Mystery, News, Reality-TV, Romance, Sci-Fi, Short, Sport, Talk-Show, Thriller, War, Western
10. **Best comedy (movie)**: **Space Melody** (`tt32752452`), rating **10.0**, votes **6**
11. **Director(s)**: Leonardo Thimo
12. **Alternate titles (akas)**: 4 entries (see table below)


--- Preview: people with the earliest birth year ---


| | nconst | primaryName | birthYear | deathYear | knownForTitles | birth_le_death | min_knownfor_startYear | age_at_first_knownfor | internally_consistent |
|---|---|---|---|---|---|---|---|---|---|
| 737944 | nm0784172 | Lucio Anneo Seneca | 4 | 65 | tt0043802,tt0218822,tt0049203,tt0972562 | True | 1951 | 1947 | True |


--- Alternate titles (akas) of the best movie ---


| | titleId | ordering | title | region | language | types | attributes | isOriginalTitle |
|---|---|---|---|---|---|---|---|---|
| 0 | tt32752452 | 1 | Space Melody | | | original | | 1 |
| 1 | tt32752452 | 2 | H Melwdia Tou Diastimatos | GR | | | complete title | 0 |
| 2 | tt32752452 | 3 | Leonardo Thimo's Space Melody | CA | en | imdbDisplay | | 0 |
| 3 | tt32752452 | 4 | Space Melody | GR | | | | 0 |
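In the answers above, the top-rated comedy is decided by `averageRating` first, so a title with only 6 votes can rank first with 10.0. If a ranking that is less sensitive to tiny vote counts is wanted, one option (not required by the assignment) is to keep only titles with a minimum number of votes before applying the same sort. A minimal sketch on made-up rows; `MIN_VOTES = 1_000` and `best_rated` are hypothetical choices:

```python
import pandas as pd

MIN_VOTES = 1_000  # hypothetical threshold, not part of the assignment

# Made-up rows with the same columns as the merged comedy_movies frame.
sample_comedies = pd.DataFrame(
    {
        "tconst": ["tt01", "tt02", "tt03", "tt04"],
        "primaryTitle": ["Tiny Gem", "Crowd Pleaser", "Cult Hit", "Also Popular"],
        "averageRating": [10.0, 8.7, 8.7, 8.1],
        "numVotes": [6, 250_000, 40_000, 900_000],
    }
)


def best_rated(df: pd.DataFrame, min_votes: int = 0) -> pd.Series:
    """Row with the highest averageRating among titles with >= min_votes votes (ties -> most votes)."""
    eligible = df[df["numVotes"] >= min_votes]
    ranked = eligible.sort_values(["averageRating", "numVotes"], ascending=[False, False])
    return ranked.iloc[0]


print(best_rated(sample_comedies)["primaryTitle"])             # same rule as Q10
print(best_rated(sample_comedies, MIN_VOTES)["primaryTitle"])  # with a vote floor
```

On the notebook's data this would be `best_rated(comedy_movies, MIN_VOTES)`. If no title reaches the floor, `iloc[0]` raises `IndexError`, so the threshold has to be chosen with that in mind.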




### Explanation (reasoning): “Using only the data in the data set, determine if this date of birth correct.”

- The IMDB dataset only provides **birthYear** (the year), so the "true" date cannot be confirmed without an external source (Wikipedia, the IMDB website, etc.), which the instructions rule out.
- An **internal consistency check** is applied instead:
  1. If **deathYear** is present, require **birthYear ≤ deathYear**.
  2. If **knownForTitles** is present, take the smallest **startYear** among those titles and check that the age at the first title is **plausible** (deliberately permissive threshold: ≥ 5 years).
- If both rules pass, the conclusion is that **the value is consistent within the dataset**, without claiming that it is historically accurate.
- For the earliest-born person found above (Lucio Anneo Seneca, birthYear 4, deathYear 65), both rules pass. His knownForTitles start in 1951, long after his deathYear, which is expected when a long-dead author is credited on later works; rule 2 only bounds the age from below, so it does not flag this.
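The helper `min_knownfor_year` above filters the whole of `title_basics` once per title ID, which is fine for the single earliest-born person but would be slow for many people. A minimal sketch of the same two rules in vectorised form, exploding `knownForTitles` into one row per title and doing a single merge. It assumes the `name.basics` / `title.basics` column names used in this notebook; `check_birth_year_consistency` is a hypothetical name, and the self-check runs on made-up rows:

```python
import pandas as pd


def check_birth_year_consistency(
    people: pd.DataFrame, titles: pd.DataFrame, min_age: int = 5
) -> pd.DataFrame:
    """Apply the two internal-consistency rules to every row of `people` at once."""
    out = people[["nconst", "birthYear", "deathYear", "knownForTitles"]].copy()

    # Rule 1: birthYear <= deathYear whenever deathYear is known.
    rule1 = out["deathYear"].isna() | (out["birthYear"] <= out["deathYear"])
    out["birth_le_death"] = rule1.fillna(False).astype(bool)

    # Rule 2: one row per (person, known-for title), joined to that title's startYear.
    first_year = (
        out[["nconst", "knownForTitles"]]
        .dropna(subset=["knownForTitles"])
        .assign(tconst=lambda d: d["knownForTitles"].astype(str).str.split(","))
        .explode("tconst")
        .merge(titles[["tconst", "startYear"]].astype({"tconst": object}), on="tconst", how="left")
        .groupby("nconst")["startYear"]
        .min()
        .rename("min_knownfor_startYear")
        .reset_index()
    )
    out = out.merge(first_year, on="nconst", how="left")
    out["age_at_first_knownfor"] = out["min_knownfor_startYear"] - out["birthYear"]

    # No known year -> rule 2 passes; otherwise the age must reach min_age.
    rule2 = out["min_knownfor_startYear"].isna() | (out["age_at_first_knownfor"] >= min_age)
    out["internally_consistent"] = (out["birth_le_death"] & rule2.fillna(False)).astype(bool)
    return out


# Made-up rows: consistent, too young at first title, born after death.
people = pd.DataFrame(
    {
        "nconst": ["nm1", "nm2", "nm3"],
        "birthYear": pd.array([4, 1990, 2000], dtype="Int64"),
        "deathYear": pd.array([65, None, 1999], dtype="Int64"),
        "knownForTitles": ["tt1,tt2", "tt3", None],
    }
)
titles = pd.DataFrame(
    {"tconst": ["tt1", "tt2", "tt3"], "startYear": pd.array([1951, 1960, 1993], dtype="Int64")}
)
result = check_birth_year_consistency(people, titles)
print(result[["nconst", "min_knownfor_startYear", "age_at_first_knownfor", "internally_consistent"]])
```

On the real data it would be called as `check_birth_year_consistency(earliest_people, title_basics)`.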


## 4) Stream Processing: Wikimedia Events Platform

Goal: follow real-time events (wiki edits) for **5 entities** that each have a trackable page.

Proposed implementation (simple and robust):
- Source: the SSE endpoint `https://stream.wikimedia.org/v2/stream/recentchange`
- Filter: `page_title` ∈ set of pages (entities), on English Wikipedia
- Metrics (stored in SQLite), per page and per time window (e.g. 1 min):
  - `edits_total`
  - `unique_users`
  - `bot_edits`
- Alerting (required):
  - an **alert** is raised when a specific user edits a tracked page, when the size change exceeds a threshold, or when a page receives a burst of edits within one window
  - alerts are routed to a separate SQLite table (`alerts`)

📌 Note:
- This cell needs an internet connection at the time the notebook is run.
- For grading, the instructor can run the job for a few minutes and check that the tables are produced.
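The reader `iter_recentchange_events` in the next cell splits the SSE stream on blank lines and keeps the `data:` fields. Pulling that framing logic into a pure function makes it testable without a network connection. A minimal sketch following the Server-Sent Events format (a blank line ends an event, several `data:` lines in one event are joined with a newline, one leading space after the colon is dropped, other fields are ignored); `iter_sse_data` is a hypothetical name and the sample lines are made up:

```python
import json
from typing import Iterable, Iterator, List


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data payload of each Server-Sent Event found in `lines`."""
    data_lines: List[str] = []
    for line in lines:
        if line == "":
            # A blank line dispatches the event (if it carried any data).
            if data_lines:
                yield "\n".join(data_lines)
            data_lines = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # Drop exactly one leading space after the colon.
            data_lines.append(value[1:] if value.startswith(" ") else value)
        # Comment lines (":...") and other fields (event:, id:, retry:) are ignored.
    # An event not terminated by a blank line is discarded at end of stream.


stream = [
    ":ok",
    "",
    "event: message",
    "id: 42",
    'data: {"title": "France", "bot": false}',
    "",
    "data: line one",
    "data: line two",
    "",
    "data: never terminated",
]
payloads = list(iter_sse_data(stream))
first_event = json.loads(payloads[0])
print(len(payloads), first_event["title"])
```

With `requests`, the input would be the lines produced by `r.iter_lines(decode_unicode=True)`.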


In [None]:
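# --- Parameters: choose 5 entities with trackable English Wikipedia pages ---
#ENTITIES = ["United States", "France", "Donald Trump", "Wikipedia", "Elon Musk"]

# First listed director of the best comedy (empty string if title.crew has none;
# an empty string never matches a page title).
best_director_name = str(directors["primaryName"].iloc[0]) if len(directors) else ""

ENTITIES = [
    best_title,                        # Highest rated comedy movie (from the dataset)
    best_director_name,                # Director of that movie
    "Comedy film",                     # Genre (trackable concept)
    "Film director",                   # Abstract IMDB-related role
    "Academy Award for Best Picture",  # Film-related entity
]

def norm_title(s: str) -> str:
    """Normalize a page title for matching: underscores -> spaces, case-insensitive."""
    return (s or "").strip().replace("_", " ").casefold()

ENTITIES_NORM = {norm_title(x) for x in ENTITIES}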

# --- Imports ---
from pathlib import Path
import sqlite3, time, json
from datetime import datetime, timezone
import pandas as pd
import requests
from requests.exceptions import ChunkedEncodingError, ReadTimeout, ConnectionError as ReqConnectionError

# --- SQLite storage ---
DB_PATH = Path("wikimedia_metrics.sqlite")
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()

cur.execute("""
CREATE TABLE IF NOT EXISTS metrics_minute (
    window_start TEXT NOT NULL,
    page_title   TEXT NOT NULL,
    edits_total  INTEGER NOT NULL,
    unique_users INTEGER NOT NULL,
    bot_edits    INTEGER NOT NULL,
    PRIMARY KEY (window_start, page_title)
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS alerts (
    ts          TEXT NOT NULL,
    page_title  TEXT NOT NULL,
    user        TEXT,
    alert_type  TEXT NOT NULL,
    details     TEXT
)
""")
conn.commit()

# --- SSE stream reader (robust, with automatic reconnection) ---
def iter_recentchange_events(reconnect_sleep: float = 2.0):
    url = "https://stream.wikimedia.org/v2/stream/recentchange"
    headers = {
        "Accept": "text/event-stream",
        "User-Agent": "IMDB-Wikimedia-Stream-Project/1.0 (contact: your.email@example.com)",
    }

    while True:
        try:
            with requests.get(url, headers=headers, stream=True, timeout=60) as r:
                if r.status_code == 429:
                    # rate limited → wait a little and retry
                    time.sleep(5)
                    continue
                r.raise_for_status()

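                # Decode as UTF-8 so non-ASCII page titles compare correctly with the entity list
                r.encoding = "utf-8"
                buf = ""
                for line in r.iter_lines(decode_unicode=True):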
                    if line is None:
                        continue
                    if line == "":
                        if "data:" in buf:
                            data_lines = [l[5:].strip() for l in buf.splitlines() if l.startswith("data:")]
                            payload = "\n".join(data_lines)
                            yield payload
                        buf = ""
                    else:
                        buf += line + "\n"

        except (ChunkedEncodingError, ReadTimeout, ReqConnectionError):
            # SSE connection dropped → reconnect automatically
            time.sleep(reconnect_sleep)
            continue

def run_stream_job(
    duration_seconds: int = 600,
    window_seconds: int = 60,
    alert_user: str | None = None,
    alert_abs_size_change: int = 50_000,
    alert_burst_edits: int = 3,
    debug: bool = False,
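):
    """Consume the recentchange stream for about `duration_seconds` (checked each time an
    event arrives), insert alerts as they occur, and write per-window metrics at the end."""
    start = time.time()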
    agg = {}  # {window_start: {page_title: {edits, users_set, bot_edits}}}

    def window_start(ts_unix: float) -> str:
        w = int(ts_unix // window_seconds) * window_seconds
        return datetime.fromtimestamp(w, tz=timezone.utc).isoformat()

    for raw in iter_recentchange_events():
        if time.time() - start > duration_seconds:
            break

        try:
            evt = json.loads(raw)
        except json.JSONDecodeError:
            continue

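        # Keep only English Wikipedia edits and page creations (skip log/categorize events)
        domain = (evt.get("meta") or {}).get("domain")
        if domain != "en.wikipedia.org" or evt.get("type") not in ("edit", "new"):
            continue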

        page = evt.get("title")
        if not page:
            continue

        if norm_title(page) not in ENTITIES_NORM:
            if debug:
                print("ENWIKI:", page)
            continue

        ts = float(evt.get("timestamp", time.time()))
        wstart = window_start(ts)

        user = evt.get("user")
        bot = bool(evt.get("bot", False))
        length = evt.get("length") or {}
        size_change = None
        if isinstance(length, dict):
            old = length.get("old")
            new = length.get("new")
            if isinstance(old, int) and isinstance(new, int):
                size_change = new - old

        agg.setdefault(wstart, {}).setdefault(page, {"edits": 0, "users": set(), "bot_edits": 0})
        agg[wstart][page]["edits"] += 1
        if user:
            agg[wstart][page]["users"].add(user)
        if bot:
            agg[wstart][page]["bot_edits"] += 1

        # ALERT A: specific user
        if alert_user and user == alert_user:
            cur.execute(
                "INSERT INTO alerts(ts,page_title,user,alert_type,details) VALUES (?,?,?,?,?)",
                (datetime.fromtimestamp(ts, tz=timezone.utc).isoformat(), page, user, "USER_EDIT", "User edited tracked page")
            )

        # ALERT B: large size change
        if size_change is not None and abs(size_change) >= alert_abs_size_change:
            cur.execute(
                "INSERT INTO alerts(ts,page_title,user,alert_type,details) VALUES (?,?,?,?,?)",
                (datetime.fromtimestamp(ts, tz=timezone.utc).isoformat(), page, user, "LARGE_SIZE_CHANGE", f"size_change={size_change}")
            )

        conn.commit()

    # Flush + ALERT C (burst of edits within one window)
    rows = []
    for wstart, pages in agg.items():
        for page, m in pages.items():
            edits = int(m["edits"])
            uniq = int(len(m["users"]))
            bots = int(m["bot_edits"])
            rows.append((wstart, page, edits, uniq, bots))

            if edits >= alert_burst_edits:
                cur.execute(
                    "INSERT INTO alerts(ts,page_title,user,alert_type,details) VALUES (?,?,?,?,?)",
                    (wstart, page, None, "BURST_EDITS", f"edits_in_window={edits} window_seconds={window_seconds}")
                )

    cur.executemany(
        "INSERT OR REPLACE INTO metrics_minute(window_start,page_title,edits_total,unique_users,bot_edits) VALUES (?,?,?,?,?)",
        rows
    )
    conn.commit()

    return len(rows)

# Run (10 minutes)
written = run_stream_job(duration_seconds=600, window_seconds=60, debug=False)
print(f"✅ Job finished. Metric rows written/updated: {written}")
print(f"Base SQLite: {DB_PATH.resolve()}")

df_metrics = pd.read_sql_query("SELECT * FROM metrics_minute ORDER BY window_start DESC, edits_total DESC LIMIT 50", conn)
df_alerts = pd.read_sql_query("SELECT * FROM alerts ORDER BY ts DESC LIMIT 50", conn)
display(df_metrics)
display(df_alerts)
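`run_stream_job` mixes the alert rules with SQLite writes, so the rules can only be exercised against the live stream. A minimal sketch of the same three rules (USER_EDIT, LARGE_SIZE_CHANGE, BURST_EDITS per tumbling window) as a pure function over already-parsed edits, so they can be checked offline; the `Edit` record and `evaluate_alerts` are hypothetical names, and the thresholds mirror the job's defaults:

```python
from collections import Counter
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Edit:
    """One already-parsed change to a tracked page (hypothetical record type)."""
    ts: float                   # Unix timestamp of the change
    page: str                   # page title
    user: Optional[str]         # editor name, if present
    size_change: Optional[int]  # new length - old length, None if unknown


def window_start(ts: float, window_seconds: int = 60) -> int:
    """Start (Unix seconds) of the tumbling window that contains `ts`."""
    return int(ts // window_seconds) * window_seconds


def evaluate_alerts(
    edits: List[Edit],
    alert_user: Optional[str] = None,
    alert_abs_size_change: int = 50_000,
    alert_burst_edits: int = 3,
    window_seconds: int = 60,
) -> List[Tuple[str, str]]:
    """Return (alert_type, page) pairs for the three rules used by run_stream_job."""
    alerts: List[Tuple[str, str]] = []
    edits_per_window: Counter = Counter()
    for e in edits:
        # Rule A: a specific user edited a tracked page.
        if alert_user and e.user == alert_user:
            alerts.append(("USER_EDIT", e.page))
        # Rule B: the page size changed by at least the threshold, in either direction.
        if e.size_change is not None and abs(e.size_change) >= alert_abs_size_change:
            alerts.append(("LARGE_SIZE_CHANGE", e.page))
        edits_per_window[(window_start(e.ts, window_seconds), e.page)] += 1
    # Rule C: too many edits of one page inside one window (evaluated once all edits are seen).
    for (_, page), count in sorted(edits_per_window.items()):
        if count >= alert_burst_edits:
            alerts.append(("BURST_EDITS", page))
    return alerts


edits = [
    Edit(ts=0, page="France", user="Alice", size_change=120),
    Edit(ts=10, page="France", user="Bob", size_change=-60_000),
    Edit(ts=20, page="France", user="Alice", size_change=5),
    Edit(ts=70, page="France", user="Carol", size_change=10),  # falls in the next 60 s window
]
alerts = evaluate_alerts(edits, alert_user="Alice")
print(alerts)
```

Keeping the rules pure means the streaming loop only has to parse events into `Edit` records and persist whatever the function returns.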


## Stream Processing – Entity Selection

The five tracked entities come directly from the IMDB dataset or are closely related to the film domain:

- The highest rated comedy movie identified in the dataset
- The director of that movie
- Film-related concepts with stable, trackable Wikipedia pages (a genre, a role and an award)

This satisfies the project requirement that tracked entities originate from, or are clearly related to, the IMDB dataset. Pages are matched by exact (normalized) title on English Wikipedia, so an entity without an English Wikipedia page of that exact name simply produces no events.


## 5) Submission notes (Git repo + email)

- Participants: Mohammed MANOUNI, Elias ABDELMALEK, Wacim OUZAID
- How to run:
  1. Use an Anaconda-based kernel or a standard Jupyter kernel, create a temporary code cell running `%pip install -r requirements.txt`, and delete that cell once the requirements are installed.
  2. Check that the dependencies were installed correctly.
  3. Click `Run All` (the stream-processing cell runs for about 10 minutes).
- Expected files in the project folder:
  - `data/XXXXXXX.tsv.gz` (downloaded automatically; they can also be added manually if the download fails)
  - `wikimedia_metrics.sqlite` (generated by the stream-processing section)
