In [1]:
from pymongo import MongoClient
import pandas as pd
# Read credentials from file
with open("../Database/credentials.txt", "r") as file:
    lines = file.readlines()
    username = lines[0].strip()
    password = lines[1].strip()
    host = lines[2].strip()
    database_name = lines[3].strip()

# Construct the MongoDB URI
uri = f"mongodb+srv://{username}:{password}@{host}"
client = MongoClient(uri)

#Connect to database
db = client[database_name]

#Load movies dataframe
movies = db["movies"]
movies = list(movies.find())
movies = pd.DataFrame(movies)
movies = movies.rename(columns = {'_id':'movieId'})

#Load ratings dataframe
ratings = db["ratings"]
ratings = list(ratings.find())
ratings = pd.DataFrame(ratings)
ratings = ratings.rename(columns = {'_id':'ratingId'})

In [2]:
movies.head()

Unnamed: 0,movieId,title,genres,year
0,51372,"""Great Performances"" Cats",['Musical'],1998
1,281280,"""Sr.""",['Documentary'],2022
2,136604,#1 Cheerleader Camp,"['Comedy', 'Drama']",2010
3,221850,#Alive,"['Action', 'Horror', 'Thriller']",2020
4,221310,#AnneFrank. Parallel Stories,['Documentary'],2019


In [3]:
ratings.head()

Unnamed: 0,ratingId,userId,movieId,rating,tstamp
0,17227,60834,1608,2.0,1997-09-17 17:53:58
1,17202,60834,1479,2.0,1997-09-17 17:53:58
2,17221,60834,1588,1.0,1997-09-17 17:55:19
3,17192,60834,1407,2.0,1997-09-17 17:56:19
4,17196,60834,1422,2.0,1997-09-17 17:57:33


In [4]:
ratings['userId'].min()

42170

In [5]:
# Calculate the number of ratings per user
ratings_per_user = ratings['userId'].value_counts()

# Find the minimum and maximum number of ratings per user
min_ratings = ratings_per_user.min()
max_ratings = ratings_per_user.max()
print(min_ratings, ',', max_ratings)


users_with_less_ratings = ratings_per_user[ratings_per_user < 20].shape[0]
print(f"Number of users with <20 ratings: {users_with_less_ratings}")

5 , 9930
Number of users with <20 ratings: 601


## Train model

### Preparing data division

We want a division that ensures that all users appear in training, test and validation. We can not use random split.

The split approach will be an 80/20 with minimum thresholds on 10 for training and 10 for test (or at least 5 only for training). Users with less than 20 will be inserted directly to training


In [6]:
import pandas as pd

# Function to split user data into training, validation, and test sets
def split_user_data(user_data):
    """
    Splits user data into training, validation, and test sets.
    Ensures users have at least `min_ratings` total ratings.
    """
    n = len(user_data)
    if 5 <= n < 20:
        train = user_data
        test = None
    elif 20 <= n < 50:
        # train = user_data
        train = user_data.iloc[-10:]
        test = user_data.iloc[:-10]
    else:
        # Proportional split: 80/20
        train_end = int(0.8 * n)
        train = user_data.iloc[:train_end]
        test = user_data.iloc[train_end:]
    return train, test

# Apply the splitting across all users
train_list, test_list = [], []

for user_id, user_data in ratings.groupby("userId"):
    user_data = user_data.sample(frac=1, random_state=42).reset_index(drop=True)
    train, test = split_user_data(user_data)
    if train is not None:
        train_list.append(train)
        test_list.append(test)

# Combine splits into dataframes
train_data = pd.concat(train_list)
test_data = pd.concat(test_list)

train_data.shape, test_data.shape


((1205706, 5), (311343, 5))

### Collaborative filtering

We prepare the data in matrix format required.

In [7]:
from scipy.sparse import coo_matrix, csr_matrix

# Unified mappings based on the entire ratings dataset
user_mapping = {u: i for i, u in enumerate(ratings['userId'].unique())}
movie_mapping = {m: i for i, m in enumerate(ratings['movieId'].unique())}

# Map user and movie IDs in training and test sets using the mappings
train_user_ids = train_data['userId'].map(user_mapping)
train_movie_ids = train_data['movieId'].map(movie_mapping)

test_user_ids = test_data['userId'].map(user_mapping)
test_movie_ids = test_data['movieId'].map(movie_mapping)

# Create sparse matrices for training and test sets
train_matrix = coo_matrix(
    (train_data['rating'], (train_user_ids, train_movie_ids)),
    shape=(len(user_mapping), len(movie_mapping))
).tocsr()

test_matrix = coo_matrix(
    (test_data['rating'], (test_user_ids, test_movie_ids)),
    shape=(len(user_mapping), len(movie_mapping))
).tocsr()

# Verify the results
print(f"Number of users in training matrix: {train_matrix.shape[0]}")
print(f"Number of items in training matrix: {train_matrix.shape[1]}")
print(f"Number of users in test matrix: {test_matrix.shape[0]}")
print(f"Number of items in test matrix: {test_matrix.shape[1]}")


Number of users in training matrix: 3993
Number of items in training matrix: 57079
Number of users in test matrix: 3993
Number of items in test matrix: 57079


In [8]:
# Check for zero interactions
train_user_interactions = train_matrix.tocsr().sum(axis=1).A1
test_user_interactions = test_matrix.tocsr().sum(axis=1).A1

print(f"Number of users with zero interactions in training set: {(train_user_interactions == 0).sum()}")
print(f"Number of users with zero interactions in testing set: {(test_user_interactions == 0).sum()}")


Number of users with zero interactions in training set: 0
Number of users with zero interactions in testing set: 601


In [9]:
import numpy as np
import pandas as pd
from scipy.sparse import csr_matrix, vstack
import implicit
import mlflow.pyfunc
class ALSModelWrapper(mlflow.pyfunc.PythonModel):
    def __init__(self, als_model, user_item_matrix, user_mapping, movie_mapping, factors, iterations, regularization):
        """
        Custom MLflow wrapper for ALS-based recommendations with ID mapping.
        """
        self.als_model = als_model
        self.user_item_matrix = user_item_matrix
        self.user_mapping = user_mapping
        self.movie_mapping = movie_mapping
        self.reverse_movie_mapping = {v: k for k, v in movie_mapping.items()}  # Reverse mapping for recommendations
        self.factors = factors
        self.iterations = iterations
        self.regularization = regularization

    def _get_user_index(self, user_id):
        """Get the matrix index for a given user ID."""
        if isinstance(user_id, np.ndarray):
            user_id = user_id.item()  # Convert numpy array to a scalar
        return self.user_mapping.get(user_id, None)

    def _get_movie_indices(self, movie_ids):
        """Convert movie IDs to matrix column indices."""
        return [self.movie_mapping[movie_id] for movie_id in movie_ids if movie_id in self.movie_mapping]

    def _get_movie_ids(self, movie_indices):
        """Convert matrix column indices back to movie IDs."""
        return [self.reverse_movie_mapping[i] for i in movie_indices]

    def _reinitialize_and_retrain(self):
        """Reinitialize the ALS model and retrain with the updated matrix."""
        self.als_model = implicit.als.AlternatingLeastSquares(
            factors=self.factors,
            regularization=self.regularization,
            iterations=self.iterations,
            random_state=42
        )
        self.als_model.fit(self.user_item_matrix)

    def retrain_with_updated_user(self, user_index, movie_ids, ratings):
        """Update a known user's ratings and retrain."""
        from scipy.sparse import csr_matrix

        # Map movie_ids to matrix column indices
        movie_indices = self._get_movie_indices(movie_ids)

        # Create a sparse vector for the new ratings
        new_ratings = csr_matrix((ratings, ([0] * len(movie_indices), movie_indices)),
                                 shape=(1, self.user_item_matrix.shape[1]))

        # Combine existing ratings with new ratings
        existing_ratings = self.user_item_matrix[user_index]
        updated_ratings = existing_ratings + new_ratings

        # Replace the user's row in the matrix
        self.user_item_matrix[user_index] = updated_ratings

        # Reinitialize and retrain the ALS model
        self._reinitialize_and_retrain()

    def retrain_with_new_user(self, movie_ids, ratings):
        """Add a new user to the matrix and retrain."""
        from scipy.sparse import csr_matrix, vstack

        # Map movie_ids to matrix column indices
        movie_indices = self._get_movie_indices(movie_ids)

        # Create a sparse vector for the new user ratings
        new_user_ratings = csr_matrix((ratings, ([0] * len(movie_indices), movie_indices)),
                                      shape=(1, self.user_item_matrix.shape[1]))

        # Add the new user to the matrix
        self.user_item_matrix = vstack([self.user_item_matrix, new_user_ratings])

        # Reinitialize and retrain the ALS model
        self._reinitialize_and_retrain()

    def predict(self, context, model_input):
        """
        Predict recommendations for a user.

        Parameters:
        - model_input: Input data in DataFrame or dictionary format.

        Returns:
        - List of recommended movie IDs.
        """
        # Convert DataFrame input to dictionary
        if isinstance(model_input, pd.DataFrame):
            model_input = model_input.iloc[0].to_dict()  # Take the first row

        # Extract fields from the input
        user_id = model_input.get("user_id", None)
        n_recommendations = model_input.get("n_recommendations", 10)
        movie_ids = model_input.get("user_interactions", None)
        ratings = model_input.get("ratings", None)  # New field for user ratings

        if user_id is not None:
            # Check if user_id exists in the mapping
            user_index = self._get_user_index(user_id)
            if user_index is not None:
                # Known user
                if movie_ids is not None and ratings is not None:
                    # Update existing user's row and retrain
                    self.retrain_with_updated_user(user_index, movie_ids, ratings)

                # Recommend for existing user
                recommendations, _ = self.als_model.recommend(
                    user_index,
                    self.user_item_matrix[user_index],
                    N=n_recommendations,
                )
                return self._get_movie_ids(recommendations)

        if movie_ids is not None and ratings is not None:
            # New user
            self.retrain_with_new_user(movie_ids, ratings)

            # Get the new user's ID (last row in the matrix)
            new_user_index = self.user_item_matrix.shape[0] - 1

            # Recommend for the new user
            recommendations, _ = self.als_model.recommend(
                new_user_index,
                self.user_item_matrix[new_user_index],
                N=n_recommendations,
            )
            return self._get_movie_ids(recommendations)

        # Raise error if input is invalid
        raise ValueError("Invalid input: Provide a valid 'user_id' or 'user_interactions' with 'ratings'.")


Now we have matrices with the ratings of the users, and each of them appear in training, test and validation.

In [10]:

import mlflow.pyfunc
import joblib
from implicit.als import AlternatingLeastSquares
from implicit.evaluation import mean_average_precision_at_k
from scipy.sparse import vstack, csr_matrix


# Configurar MLflow
mlflow.set_tracking_uri("http://localhost:5000")  

# Lista de combinaciones de hiperparámetros
hyperparameter_grid = [
    {"factors": 50, "iterations": 100, "regularization": 0.1},
    {"factors": 100, "iterations": 50, "regularization": 0.01},
    {"factors": 20, "iterations": 150, "regularization": 0.05}
]

# Entrenar y registrar múltiples modelos
for i, params in enumerate(hyperparameter_grid):
    with mlflow.start_run(run_name=f"ALS Model Run {i}"):
        # Extraer hiperparámetros
        factors = params["factors"]
        iterations = params["iterations"]
        regularization = params["regularization"]

        # Registrar los hiperparámetros en MLflow
        mlflow.log_param("factors", factors)
        mlflow.log_param("iterations", iterations)
        mlflow.log_param("regularization", regularization)

        # Entrenar el modelo
        als_model = AlternatingLeastSquares(
            factors=factors,
            iterations=iterations,
            regularization=regularization,
            random_state=42
        )
        als_model.fit(train_matrix)

        # Evaluar el modelo
        mapk = mean_average_precision_at_k(als_model, train_matrix, test_matrix, K=5)
        mlflow.log_metric("MAPK5", mapk)

        # Guardar el modelo localmente
        model_path = f"./mlruns/models/als_model_{i}.pkl"  # Usar una ruta válida dentro del contenedor
        joblib.dump(als_model, model_path)

        # Registrar el modelo en MLflow usando el wrapper
        registered_model_name = f"ALS_Recommender_Model_with_Predict_{i}"
        mlflow.pyfunc.log_model(
            artifact_path="model",
            python_model= ALSModelWrapper(als_model, train_matrix, user_mapping, movie_mapping,factors, iterations, regularization),
            registered_model_name=registered_model_name
        )

        print(f"Modelo {registered_model_name} registrado con MAP@5: {mapk:.4f}")


  check_blas_config()


  0%|          | 0/100 [00:00<?, ?it/s]

  0%|          | 0/3392 [00:00<?, ?it/s]

Successfully registered model 'ALS_Recommender_Model_with_Predict_0'.
2024/12/11 16:41:40 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: ALS_Recommender_Model_with_Predict_0, version 1
Created version '1' of model 'ALS_Recommender_Model_with_Predict_0'.


Modelo ALS_Recommender_Model_with_Predict_0 registrado con MAP@5: 0.5122
🏃 View run ALS Model Run 0 at: http://localhost:5000/#/experiments/0/runs/9677c9153b0d41e69d1f815174d82972
🧪 View experiment at: http://localhost:5000/#/experiments/0


  0%|          | 0/50 [00:00<?, ?it/s]

  0%|          | 0/3392 [00:00<?, ?it/s]

Successfully registered model 'ALS_Recommender_Model_with_Predict_1'.
2024/12/11 16:42:01 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: ALS_Recommender_Model_with_Predict_1, version 1
Created version '1' of model 'ALS_Recommender_Model_with_Predict_1'.


Modelo ALS_Recommender_Model_with_Predict_1 registrado con MAP@5: 0.5123
🏃 View run ALS Model Run 1 at: http://localhost:5000/#/experiments/0/runs/4351dbcbcb7546f6b2cec9483120b38d
🧪 View experiment at: http://localhost:5000/#/experiments/0


  0%|          | 0/150 [00:00<?, ?it/s]

  0%|          | 0/3392 [00:00<?, ?it/s]

Successfully registered model 'ALS_Recommender_Model_with_Predict_2'.
2024/12/11 16:42:33 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: ALS_Recommender_Model_with_Predict_2, version 1
Created version '1' of model 'ALS_Recommender_Model_with_Predict_2'.


Modelo ALS_Recommender_Model_with_Predict_2 registrado con MAP@5: 0.4835
🏃 View run ALS Model Run 2 at: http://localhost:5000/#/experiments/0/runs/12240741fcd74690a2300fe51488e4e0
🧪 View experiment at: http://localhost:5000/#/experiments/0


This next cell has to be used when added a new model, to format it in order that it can be opened in Docker, as the file path will be app/MLflow, and not the local folder where the model is trained.

In [12]:
import os
import re

mlruns_dir = "mlruns"
# This regex will match any path that starts with file:/// and eventually ends with /Agile/MLflow
# It captures 'file:///' followed by any number of characters until '/Agile/MLflow'
pattern = re.compile(r"file:///.*Agile\\MLflow")

new_prefix = "file:///app/MLflow"

for root, dirs, files in os.walk(mlruns_dir):
    for file in files:
        if file.endswith(".yaml") or file == "MLmodel":
            file_path = os.path.join(root, file)
            with open(file_path, 'r') as f:
                content = f.read()

            # Replace any occurrence of paths ending in /Agile/MLflow with /app/MLflow
            new_content = pattern.sub(new_prefix, content)

            if new_content != content:
                print(f"Updating paths in {file_path}")
                with open(file_path, 'w') as f:
                    f.write(new_content)
