In [1]:
import pandas as pd
import numpy as np

In [5]:
movies = pd.read_csv("/home/antoine/workspace/mlops_reco_movies/airflow/data/raw/bronze/movies.csv")

movies.head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [7]:
# movies.py
from typing import Optional
import pandas as pd
import logging
import os
import numpy as np
from sklearn.preprocessing import MultiLabelBinarizer

logger = logging.getLogger(__name__)


def preprocess_movies(df: pd.DataFrame) -> pd.DataFrame:
    """
    Prétraite le dataset movies avec gestion MLOps :
    - Extraction robuste de l'année
    - Gestion des genres (one-hot encoding + liste)
    - Validation des données en sortie

    Args:
        genres_threshold: Seuil minimal d'occurrence pour garder un genre (évite le overfitting)
    """
    # Typage et validation initiale
    assert {"movieId", "title", "genres"}.issubset(df.columns), "Colonnes manquantes"
    df = df.astype({"movieId": "int32"}).rename(columns={"movieId": "movie_id"})

    try:
        # Extraction de l'année avec regex robuste
        df["year"] = (
            df["title"]
            .str.extract(r"(?:\(|\[)?(\d{4})(?:\)|\]| TV)?")[0]
            .astype(float)
            .astype("Int64")
        )
        # Gestion des valeurs manquantes, outliers, par médiane
        ANNEE_MIN = 1888
        ANNEE_MAX = 2025
        # Remplacer les années aberrantes par NaN
        df.loc[(df["year"] < ANNEE_MIN) | (df["year"] > ANNEE_MAX), "year"] = np.nan

        # Imputation par la médiane des valeurs valides
        median_year = df.loc[df["year"].between(ANNEE_MIN, ANNEE_MAX), "year"].median()
        df["year"] = df["year"].fillna(median_year)

        # Nettoyage du titre
        df["clean_title"] = df["title"].str.replace(r"\s*[\[(]\d{4}[\])]\s*", "", regex=True)

        # Gestion des genres avec seuillage
        df["genres"] = df["genres"].str.replace("(no genres listed)|^$", "Unknown", regex=True)
        genre_lists = df["genres"].str.split("|")
        df["genres_list"] = genre_lists
        df["genres"] = genre_lists.str.join(", ")

        # One-Hot Encoding
        mlb = MultiLabelBinarizer()
        genres_encoded = pd.DataFrame(
            mlb.fit_transform(genre_lists), columns=mlb.classes_, index=df.index
        )
        df = pd.concat([df, genres_encoded], axis=1)

    except Exception as e:
        logger.error(f"Erreur de prétraitement : {str(e)}")
        raise

    # Validation finale
    assert df["movie_id"].is_unique, "IDs de films dupliqués"
    assert df["genres"].notna().all(), "Genres manquants"
    logger.info(f"Prétraitement movies terminé. Films traités : {len(df)}")

    return df


In [8]:
movies_processed = preprocess_movies(movies)

movies_processed.head()

Unnamed: 0,movie_id,title,genres,year,clean_title,genres_list,(Unknown),Action,Adventure,Animation,...,Film-Noir,Horror,IMAX,Musical,Mystery,Romance,Sci-Fi,Thriller,War,Western
0,1,Toy Story (1995),"Adventure, Animation, Children, Comedy, Fantasy",1995,Toy Story,"[Adventure, Animation, Children, Comedy, Fantasy]",0,0,1,1,...,0,0,0,0,0,0,0,0,0,0
1,2,Jumanji (1995),"Adventure, Children, Fantasy",1995,Jumanji,"[Adventure, Children, Fantasy]",0,0,1,0,...,0,0,0,0,0,0,0,0,0,0
2,3,Grumpier Old Men (1995),"Comedy, Romance",1995,Grumpier Old Men,"[Comedy, Romance]",0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
3,4,Waiting to Exhale (1995),"Comedy, Drama, Romance",1995,Waiting to Exhale,"[Comedy, Drama, Romance]",0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
4,5,Father of the Bride Part II (1995),Comedy,1995,Father of the Bride Part II,[Comedy],0,0,0,0,...,0,0,0,0,0,0,0,0,0,0


In [6]:
genome_scores = pd.read_csv("/home/antoine/workspace/mlops_reco_movies/airflow/data/raw/bronze/genome-scores.csv")
genome_scores.head()

Unnamed: 0,movieId,tagId,relevance
0,1,1,0.025
1,1,2,0.025
2,1,3,0.05775
3,1,4,0.09675
4,1,5,0.14675


In [9]:
genome_tags = pd.read_csv("/home/antoine/workspace/mlops_reco_movies/airflow/data/raw/bronze/genome-tags.csv")
genome_tags.head()

Unnamed: 0,tagId,tag
0,1,007
1,2,007 (series)
2,3,18th century
3,4,1920s
4,5,1930s


In [14]:
from typing import Optional
import pandas as pd
import logging
import os
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import OrdinalEncoder
import spacy
import re


logger = logging.getLogger(__name__)

nlp = spacy.load("en_core_web_sm")


def clean_tag(tag: str) -> str:
    doc = nlp(tag)
    return " ".join(
        token.text.lower() for token in doc if not token.is_punct and not token.is_stop
    )


def preprocess_genome_tags(df: pd.DataFrame, min_term_freq: Optional[int] = 2) -> pd.DataFrame:
    """
    Prétraite le dataset genome-tags avec gestion MLOps :
    - Nettoyage sémantique des tags
    - Dédoublonnage par similarité textuelle
    - Optimisation pour l'embedding

    Args:
        min_term_freq: Seuil d'occurrence minimal pour conserver un terme (TF-IDF)
    """
    # Validation initiale
    assert {"tagId", "tag"}.issubset(df.columns), "Colonnes manquantes"
    df = df.astype({"tagId": "int32"})
    df = df.rename(columns={"tagId": "tag_id"})

    try:
        # Nettoyage des tags
        df["clean_tag"] = df["tag"].apply(clean_tag)

        # Gestion des doublons sémantiques
        if min_term_freq:
            vectorizer = TfidfVectorizer(min_df=min_term_freq)
            tfidf_matrix = vectorizer.fit_transform(df["clean_tag"])
            df = df.iloc[list(vectorizer.vocabulary_.values())].reset_index(drop=True)

        # Catégorisation automatique
        df["category"] = df["clean_tag"].apply(
            lambda x: (
                "decade"
                if (
                    re.search(r"\d{4}s?$", x)
                    or re.search(r"\d{1,2}(st|nd|rd|th) century", x, re.IGNORECASE)
                )
                else "concept"
            )
        )

        # Encodage ordinal des catégories
        encoder = OrdinalEncoder(categories=[df["category"].unique().tolist()])
        df["category_encoded"] = encoder.fit_transform(df[["category"]])

    except Exception as e:
        logger.error(f"Erreur de prétraitement : {str(e)}")
        raise

    # Validation finale
    assert df["tag_id"].is_unique, "IDs de tags dupliqués"
    assert df["clean_tag"].str.len().gt(0).all(), "Tags vides après nettoyage"
    logger.info(f"Prétraitement genome-tags terminé. Tags traités : {len(df)}")

    return df


from typing import Optional
import pandas as pd
import logging
import os
import numpy as np
from pandas.api.types import is_float_dtype

logger = logging.getLogger(__name__)

def preprocess_genome_scores(
    df: pd.DataFrame,
    tags_df: pd.DataFrame,
    min_relevance: Optional[float] = 0.2,
    max_memory_usage: Optional[int] = 1024
) -> pd.DataFrame:
    """
    Prétraite le dataset genome-scores avec gestion MLOps :
    - Optimisation mémoire pour les grands volumes
    - Filtrage des signaux faibles
    - Validation d'intégrité référentielle

    Args:
        min_relevance: Seuil minimal de pertinence (défaut: environ 0.01)
        max_memory_usage: Mémoire max en MB pour l'optimisation des types
    """
    # Configuration adaptive
    min_relevance = float(os.getenv("MIN_RELEVANCE", min_relevance))

    # Validation initiale
    assert {"movieId", "tagId", "relevance"}.issubset(df.columns), "Colonnes manquantes"

    try:
        # Optimisation mémoire
        df = df.astype({
            "movieId": "int32",
            "tagId": "int32",
            "relevance": "float32"
        }).rename(columns={"movieId": "movie_id", "tagId": "tag_id"})

        # Filtrage des valeurs non-informatives
        if min_relevance > 0:
            df = df[df["relevance"] >= min_relevance]

        # Normalisation adaptative
        if os.getenv("LOG_SCALE_RELEVANCE", "false").lower() == "true":
            df["relevance"] = np.log1p(df["relevance"])

        # Validation référentielle
        valid_tags = tags_df["tag_id"].unique()
        df = df[df["tag_id"].isin(valid_tags)]

        # Gestion des doublons complets
        df = df.drop_duplicates(["movie_id", "tag_id"])

        # Optimisation mémoire dynamique
        if max_memory_usage:
            for col in df.columns:
                if is_float_dtype(df[col]):
                    df[col] = pd.to_numeric(df[col], downcast="float")
                else:
                    df[col] = pd.to_numeric(df[col], downcast="integer")

    except Exception as e:
        logger.error(f"Erreur de prétraitement : {str(e)}")
        raise

    # Validation finale
    assert df.merge(tags_df, on="tag_id").notna().all().all(), "Références tagId invalides"
    logger.info(
        f"Prétraitement genome-scores terminé. "
        f"Lignes traitées : {len(df)} - Mémoire utilisée : {df.memory_usage().sum() / 1024**2:.1f}MB"
    )

    return df

In [15]:
tags_df = preprocess_genome_tags(genome_tags)
genome_scores_processed = preprocess_genome_scores(
    genome_scores, tags_df, min_relevance=0.01, max_memory_usage=1024
)
genome_scores_processed.head()

Unnamed: 0,movie_id,tag_id,relevance
0,1,1,0.025
1,1,2,0.025
2,1,3,0.05775
3,1,4,0.09675
4,1,5,0.14675


In [20]:
item_features = [
    (row['movie_id'],  # Premier élément du tuple : l'identifiant du film
     ['genre:' + g for g in row['genres_list']] +  # Liste des genres
     [f'tag:{tag_row["tag_id"]}@{tag_row["relevance"]:.2f}'  # Tags avec relevance formatée
      for tag_row in genome_scores_processed[genome_scores_processed['movie_id'] == row['movie_id']].to_dict('records')]
    )
    for _, row in movies_processed.iterrows()  # Itération sur chaque film
]


In [21]:
print(item_features[:5])  # Affichage des 5 premiers éléments pour vérification

[(1, ['genre:Adventure', 'genre:Animation', 'genre:Children', 'genre:Comedy', 'genre:Fantasy', 'tag:1@0.03', 'tag:2@0.03', 'tag:3@0.06', 'tag:4@0.10', 'tag:5@0.15', 'tag:6@0.22', 'tag:7@0.07', 'tag:8@0.26', 'tag:9@0.26', 'tag:10@0.03', 'tag:11@0.58', 'tag:12@0.12', 'tag:13@0.19', 'tag:15@0.04', 'tag:16@0.28', 'tag:18@0.11', 'tag:19@0.67', 'tag:20@0.18', 'tag:21@0.33', 'tag:22@0.28', 'tag:23@0.06', 'tag:24@0.02', 'tag:25@0.09', 'tag:26@0.08', 'tag:27@0.19', 'tag:28@0.07', 'tag:29@0.89', 'tag:30@0.68', 'tag:31@0.04', 'tag:32@0.23', 'tag:33@0.40', 'tag:34@0.04', 'tag:35@0.03', 'tag:36@0.33', 'tag:37@0.10', 'tag:38@0.01', 'tag:39@0.02', 'tag:40@0.01', 'tag:41@0.01', 'tag:42@0.02', 'tag:43@0.15', 'tag:44@0.04', 'tag:45@0.27', 'tag:46@0.26', 'tag:48@0.14', 'tag:49@0.32', 'tag:50@0.07', 'tag:51@0.27', 'tag:52@0.16', 'tag:53@0.23', 'tag:54@0.08', 'tag:55@0.18', 'tag:56@0.02', 'tag:57@0.02', 'tag:58@0.02', 'tag:59@0.14', 'tag:60@0.03', 'tag:61@0.54', 'tag:62@0.69', 'tag:63@0.93', 'tag:64@0.99',

In [35]:
ratings = pd.read_csv("/home/antoine/workspace/mlops_reco_movies/airflow/data/raw/bronze/ratings.csv")


In [36]:
# ratings.py
import pandas as pd
import logging
import os

logger = logging.getLogger(__name__)


def preprocess_ratings(df: pd.DataFrame) -> pd.DataFrame:
    """
    Prétraite le dataset ratings avec :
    - Filtrage configurable des utilisateurs
    - Typage optimisé pour le stockage
    - Calcul de métriques pour le monitoring
    """
    # Configuration via environnement
    user_threshold = int(os.getenv("USERID_THRESHOLD", 20000))
    sample_ratio = float(os.getenv("RATINGS_SAMPLE_RATIO", 1.0))

    try:
        # Typage et filtrage
        df = df.astype({"userId": "int32", "movieId": "int32"})
        df = df.rename(columns={"userId": "user_id", "movieId": "movie_id"})

        if sample_ratio < 1.0:
            df = df.sample(frac=sample_ratio, random_state=42)

        if user_threshold > 0:
            df = df[df["user_id"] < user_threshold]

        # Calcul de métriques
        user_stats = df.groupby("user_id")["rating"].agg(["count", "mean", "std"])
        movie_stats = df.groupby("movie_id")["rating"].agg(["count", "mean", "std"])

        # Enregistrement pour monitoring
        if os.getenv("LOG_STATS", "false").lower() == "true":
            logger.info(f"User stats:\n{user_stats.describe()}")
            logger.info(f"Movie stats:\n{movie_stats.describe()}")

    except Exception as e:
        logger.error(f"Erreur de prétraitement ratings : {str(e)}")
        raise

    # Validation
    assert df["user_id"].between(1, 200000).all(), "Valeurs userid hors plage"
    logger.info(f"Ratings traités : {len(df)} lignes")

    return df


In [37]:
ratings_processed = preprocess_ratings(ratings)
ratings_processed.head()

Unnamed: 0,user_id,movie_id,rating,timestamp
0,1,2,3.5,1112486027
1,1,29,3.5,1112484676
2,1,32,3.5,1112484819
3,1,47,3.5,1112484727
4,1,50,3.5,1112484580


In [47]:
ratings_processed['weight'] = np.where(ratings_processed['rating'] >= 4, 1.0, 0.2)  # Boost des ratings élevés

ratings_processed.head()

Unnamed: 0,user_id,movie_id,rating,timestamp,weight
0,1,2,3.5,1112486027,0.2
1,1,29,3.5,1112484676,0.2
2,1,32,3.5,1112484819,0.2
3,1,47,3.5,1112484727,0.2
4,1,50,3.5,1112484580,0.2


In [None]:

ratings_processed['user_id'] = ratings_processed['user_id'].astype('int32')
ratings_processed['movie_id'] = ratings_processed['movie_id'].astype('int32')

(interactions, weights) = dataset.build_interactions(
    ((row['user_id'], row['movie_id'], row['weight'])
     for _, row in ratings_processed.iterrows())
)


In [49]:
model = LightFM(
    no_components=128,
    loss='warp-kos',
    item_alpha=1e-6,
    learning_rate=0.08,
    max_sampled=20
)


In [None]:
# 1. Collecte exhaustive des features
all_genres = set()
for genres in movies_processed['genres_list']:
    all_genres.update([f'genre:{g}' for g in genres])

all_tags = set()
for _, row in genome_scores_processed.iterrows():
    all_tags.add(f'tag:{row["tag_id"]}@{row["relevance"]:.2f}')

# 2. Initialisation du Dataset avec tous les features
dataset = Dataset()
dataset.fit(
    users=ratings_processed['user_id'].unique(),
    items=ratings_processed['movie_id'].unique(),
    item_features=list(all_genres | all_tags)  # Union des genres et tags
)

# 3. Construction OPTIMISÉE des item_features
tags_by_movie = genome_scores_processed.groupby('movie_id').apply(
    lambda x: [f'tag:{row["tag_id"]}@{row["relevance"]:.2f}'
               for _, row in x.iterrows()]
).to_dict()

item_features_data = [
    (movie_id,
     [f'genre:{g}' for g in genres] +
     tags_by_movie.get(movie_id, []))
    for movie_id, genres in zip(
        movies_processed['movie_id'],
        movies_processed['genres_list']
    )
]

item_features = dataset.build_item_features(item_features_data)


ValueError: item id 395 not in item id mappings.

In [None]:
print(type(item_features))  # Doit afficher <class 'scipy.sparse.csr.csr_matrix'>
print(item_features.shape)  # Doit correspondre à (nombre_items, nombre_features)
