In [2]:
import pandas as pd
import os
from supabase import create_client, Client
from tqdm import tqdm
import numpy as np
from dotenv import load_dotenv

In [3]:
# Charger les variables d'environnement depuis le fichier .env
load_dotenv()

def connect_to_supabase() -> Client:
    """
    Établit une connexion à Supabase en utilisant les variables d'environnement.

    Returns:
        Client: Une instance du client Supabase.

    Raises:
        ValueError: Si les variables d'environnement SUPABASE_URL ou SUPABASE_KEY ne sont pas définies.
    """
    supabase_url = os.environ.get("SUPABASE_URL")
    supabase_key = os.environ.get("SUPABASE_KEY")

    if not all([supabase_url, supabase_key]):
        raise ValueError(
            "Les variables d'environnement SUPABASE_URL et SUPABASE_KEY doivent être définies."
        )

    # S'assurer que l'URL commence par http:// ou https://
    supabase_url = (
        f"https://{supabase_url}"
        if not supabase_url.startswith(("http://", "https://"))
        else supabase_url
    )

    return create_client(supabase_url, supabase_key)

In [6]:
supabase_url = os.environ.get("SUPABASE_URL")
supabase_key = os.environ.get("SUPABASE_KEY")
print(supabase_url)

http://localhost:8000


In [7]:
print(supabase_key)

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyAgCiAgICAicm9sZSI6ICJhbm9uIiwKICAgICJpc3MiOiAic3VwYWJhc2UtZGVtbyIsCiAgICAiaWF0IjogMTY0MTc2OTIwMCwKICAgICJleHAiOiAxNzk5NTM1NjAwCn0.dc_X5iR_VP_qT0zsiyj_I_OZ2T9FtRU2BBNWN8Bu4GE


In [8]:
def initialize_supabase_connection() -> Client:
    """
    Initialise la connexion à Supabase et gère les erreurs potentielles.

    Returns:
        Client: Une instance du client Supabase.
    """
    try:
        return connect_to_supabase()
    except ValueError as ve:
        print(f"Erreur lors de la connexion à Supabase : {ve}")
    except Exception as e:
        print(
            f"Une erreur inattendue s'est produite lors de la connexion à Supabase : {e}"
        )
    exit(1)

In [None]:
def load_data(csv_path: str, table_name: str, supabase: Client, expected_types: dict, dtype=None):
    """
    Charge les données d'un fichier CSV dans une table Supabase.

    Args:
        csv_path (str): Chemin vers le fichier CSV à charger.
        table_name (str): Nom de la table dans Supabase.
        supabase (Client): Instance du client Supabase.
        expected_types (dict): Types attendus pour les colonnes de la table.
        dtype (dict, optional): Dictionnaire des types de données pour pandas. Par défaut, None.
    """
    # Si aucun type n'est spécifié, utiliser les types attendus
    if dtype is None:
        dtype = expected_types

    # Modifier les types pour permettre les valeurs NA dans les colonnes d'entiers
    for col, typ in dtype.items():
        if typ == "int64":
            dtype[col] = "Int64"  # Utiliser le type nullable Int64 de pandas

    # Lire le nombre total de lignes dans le fichier CSV
    total_rows = len(pd.read_csv(csv_path))

    # Définir la taille des chunks pour le chargement par morceaux
    chunksize = 5000
    total_chunks = (total_rows // chunksize) + (1 if total_rows % chunksize != 0 else 0)

    # Charger le fichier CSV par morceaux et insérer dans Supabase
    for chunk in tqdm(
        pd.read_csv(csv_path, dtype=dtype, chunksize=chunksize),
        desc=f"Insertion des {total_rows} lignes de {table_name}",
        total=total_chunks,
        dynamic_ncols=True,
    ):
        # Remplacer les valeurs manquantes par None
        chunk = chunk.where(pd.notnull(chunk), None)

        # Convertir le DataFrame en liste de dictionnaires pour l'insertion
        data = chunk.to_dict(orient="records")

        # Préparer les données pour l'insertion en gérant les types
        data = [
            {
                k: (
                    float(v) if isinstance(v, (float, np.float64)) and not pd.isna(v)
                    else (v if not pd.isna(v) else None)
                )
                for k, v in record.items()
            }
            for record in data
        ]

        # Insérer ou mettre à jour les données dans Supabase
        supabase.table(table_name).upsert(data).execute()

# Initialiser la connexion à Supabase
supabase = initialize_supabase_connection()

In [11]:
data_dir = '/home/antoine/jul24_cmlops_reco_film/ml/data/processed'

In [12]:
# Configuration des chemins des fichiers et des types attendus pour chaque table
data_config = {
    "tables": {
        "movies": os.path.join(data_dir, "processed_movies.csv"),
        "links": os.path.join(data_dir, "processed_links.csv"),
        "ratings": os.path.join(data_dir, "processed_ratings.csv"),
        "users": os.path.join(data_dir, "users.csv")
    },
    "expected_types": {
        "movies": {
            "movieId": "int64",
            "title": "object",
            "genres": "object",
            "year": "int64",
            "rating": "float64",
        },
        "ratings": {
            "userId": "int64",
            "movieId": "int64",
            "rating": "float64",
            "timestamp": "int64",
            "bayesian_mean": "float64"
        },
        "links": {
            "movieId": "int64",
            "imdbId": "object",
            "tmdbId": "object"
        },
        "users": {
            "userId": "int64",
            "email": "object",
            "password": "object"
        },
    },
}

In [33]:
movies = pd.read_csv('/home/antoine/jul24_cmlops_reco_film/ml/data/processed/processed_movies.csv')

movies.head()

# # Lire le nombre total de lignes dans le fichier CSV
# total_rows = len(movies)

# # Définir la taille des chunks pour le chargement par morceaux
# chunksize = 5000
# total_chunks = (total_rows // chunksize) + (1 if total_rows % chunksize != 0 else 0)


Unnamed: 0,movieId,title,genres,year
0,1,Toy Story,"['Adventure', 'Animation', 'Children', 'Comedy...",1995
1,2,Jumanji,"['Adventure', 'Children', 'Fantasy']",1995
2,3,Grumpier Old Men,"['Comedy', 'Romance']",1995
3,4,Waiting to Exhale,"['Comedy', 'Drama', 'Romance']",1995
4,5,Father of the Bride Part II,['Comedy'],1995


In [14]:
print(data_config['tables'].items())

dict_items([('movies', '/home/antoine/jul24_cmlops_reco_film/ml/data/processed/processed_movies.csv'), ('links', '/home/antoine/jul24_cmlops_reco_film/ml/data/processed/processed_links.csv'), ('ratings', '/home/antoine/jul24_cmlops_reco_film/ml/data/processed/processed_ratings.csv'), ('users', '/home/antoine/jul24_cmlops_reco_film/ml/data/processed/users.csv')])


In [19]:
movies.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 27278 entries, 0 to 27277
Data columns (total 4 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   movieId  27278 non-null  int64 
 1   title    27278 non-null  object
 2   genres   27278 non-null  object
 3   year     27278 non-null  int64 
dtypes: int64(2), object(2)
memory usage: 852.6+ KB


In [3]:
ratings = pd.read_csv("/home/antoine/jul24_cmlops_reco_film/ml/data/processed/processed_ratings.csv")

ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp,bayesian_mean
0,1,2,3.5,1112486027,3.209414
1,1,29,3.5,1112484676,3.886141
2,1,32,3.5,1112484819,3.885546
3,1,47,3.5,1112484727,4.03785
4,1,50,3.5,1112484580,4.315561


In [32]:
links = pd.read_csv("/home/antoine/jul24_cmlops_reco_film/ml/data/processed/processed_links.csv")

links.head()

Unnamed: 0,movieId,imdbId,tmdbId
0,1,114709,862
1,2,113497,8844
2,3,113228,15602
3,4,114885,31357
4,5,113041,11862


In [22]:
users = pd.read_csv("/home/antoine/jul24_cmlops_reco_film/ml/data/processed/users.csv")

users.head()

Unnamed: 0,userId,email,password
0,1,user1@example.com,.4@@hd2K(}WF
1,2,user2@example.com,Cc=Cq2O;V#!&
2,3,user3@example.com,t}cwlqYDzxO1
3,4,user4@example.com,A5c=tVb)$E2X
4,5,user5@example.com,x|%u[AEVDv>?


In [24]:
def bayesienne_mean(df, M, C):
    """
    Calcule la moyenne bayésienne des notes d'un film.

    Args:
        df (pd.Series): La série de notes du film.
        M (float): La moyenne brute des notes des films.
        C (float): La moyenne de la quantité de notes.

    Returns:
        float: La moyenne bayésienne calculée.
    """
    moy_ba = (C * M + df.sum()) / (C + df.count())
    return moy_ba


def preprocessing_ratings(ratings_file) -> pd.DataFrame:
    """
    Lecture du fichier CSV des évaluations et application de la moyenne bayésienne.

    Args:
        ratings_file (str): Chemin vers le fichier CSV contenant les évaluations.

    Returns:
        pd.DataFrame: DataFrame contenant les évaluations traitées.
    """
    # Lire le fichier CSV
    df = pd.read_csv(ratings_file)
    print("Dataset ratings chargé")

    # Statistiques par film : quantité et moyenne des notes
    movies_stats = df.groupby('movieId').agg({'rating': ['count', 'mean']})
    movies_stats.columns = ['count', 'mean']

    # Calculer les moyennes nécessaires pour la moyenne bayésienne
    C = movies_stats['count'].mean()  # Moyenne de la quantité de notes
    M = movies_stats['mean'].mean()    # Moyenne brute des notes

    # Calculer la moyenne bayésienne par film
    movies_stats['bayesian_mean'] = movies_stats.apply(
        lambda x: bayesienne_mean(df[df['movieId'] == x.name]['rating'], M, C), axis=1)

    # Ajouter la colonne bayesian_mean au DataFrame original
    df = df.merge(movies_stats[['bayesian_mean']], on='movieId', how='left')

    print("Application de la moyenne bayésienne sur la colonne rating effectuée")

    # Définir le chemin vers le dossier 'processed' pour enregistrer le fichier traité
    output_dir = os.path.join("ml", "data", "processed")
    output_file = os.path.join(output_dir, "processed_ratings.csv")

    # Créer le dossier 'processed' s'il n'existe pas
    os.makedirs(output_dir, exist_ok=True)

    # Enregistrer le DataFrame en tant que fichier CSV
    try:
        df.to_csv(output_file, index=False)  # Enregistrer sans l'index
        print(f"Fichier enregistré avec succès sous {output_file}.")
    except Exception as e:
        print(f"Une erreur s'est produite lors de l'enregistrement du fichier : {e}")

    return df

In [25]:
ratings = pd.read_csv('/home/antoine/jul24_cmlops_reco_film/ml/data/raw/ratings.csv')

ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,2,3.5,1112486027
1,1,29,3.5,1112484676
2,1,32,3.5,1112484819
3,1,47,3.5,1112484727
4,1,50,3.5,1112484580


In [26]:
movies_stats = ratings.groupby('movieId').agg({'rating': ['count', 'mean']})
movies_stats.columns = ['count', 'mean']

# Calculer les moyennes nécessaires pour la moyenne bayésienne
C = movies_stats['count'].mean()  # Moyenne de la quantité de notes
M = movies_stats['mean'].mean()    # Moyenne brute des notes

# Calculer la moyenne bayésienne par film
movies_stats['bayesian_mean'] = movies_stats.apply(
    lambda x: bayesienne_mean(ratings[ratings['movieId'] == x.name]['rating'], M, C), axis=1)



In [28]:
# Ajouter la colonne bayesian_mean au DataFrame original
ratings = ratings.merge(movies_stats[['bayesian_mean']], on='movieId', how='left')

In [29]:
ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp,bayesian_mean
0,1,2,3.5,1112486027,3.209414
1,1,29,3.5,1112484676,3.886141
2,1,32,3.5,1112484819,3.885546
3,1,47,3.5,1112484727,4.03785
4,1,50,3.5,1112484580,4.315561
