In [37]:
# imports
from abc import abstractmethod
from datetime import datetime
from typing import List, Literal, Optional

import numpy as np
import pandas as pd
import scipy.stats as stats
from loguru import logger
from pydantic import BaseModel
from scipy.sparse import csr_matrix
from scipy.sparse import hstack
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm


In [38]:
class Recommender:
    def __init__(self):
        self.k = 3 # default
        self.user_id = None
        self.item_id = None
        self.similarity: Literal["cosine", "pearson"] = "cosine"  # default
        self.calculation_variant: Literal["weighted", "unweighted"] = "weighted"  # default
        self.data = None


    @abstractmethod
    def _preprocess_data(self):
        ...


    def _prepare_information(self, user_id: str, item_id: str, k: int, similarity: Literal["cosine", "pearson"] = "cosine", calculation_variant: Literal["weighted", "unweighted"] = "weighted") -> None:
        self.user_id = user_id
        self.item_id = item_id
        self.similarity = similarity
        self.calculation_variant = calculation_variant
        self.k = k

        if similarity == 'pearson' and self.data is not None:
            self.data['mean'] = self.data.mean(axis=1)


    @abstractmethod
    def predict(
            self,
            user_id: str,
            item_id: str,
            similarity: Optional[Literal['cosine', 'pearson']] = 'cosine',   # only for collaborative filtering
            calculation_variety: Optional[Literal['weighted', 'unweighted']] = 'weighted',  # only for collaborative filtering
            k: Optional[int] = 3,
            second_k_value: Optional[int] = None):
        ...

In [39]:
class CollaborativeFilteringRecommender(Recommender):
    def __init__(self, data: pd.DataFrame, mode: Literal['user', 'item'] = 'user', display_results_for_each_step: Optional[bool] = False) -> None:
        super().__init__()
        self.display_results_for_each_step = display_results_for_each_step
        self.original_data = data
        self.mode = mode
        self._preprocess_data()


    def _preprocess_data(self) -> None:
        self.original_data = self.original_data.set_index("user_ID")
        self.original_data.index = self.original_data.index.astype(str) # convert the index to string (due to error with int values)
        if self.mode == 'item':
            self.data = self.original_data.T  # transpose for item based
        else:
            self.data = self.original_data  # original for user based


    def _calculate_distance_and_indices(self, dataframe: pd.DataFrame) -> ([], []):
        knn = NearestNeighbors(metric="cosine", algorithm='brute')
        knn.fit(dataframe.values)
        distances, indices = knn.kneighbors(dataframe.values, n_neighbors=self.k + 1)

        if self.mode == 'item':
            index = dataframe.index.get_loc(self.item_id)
        else:
            index = dataframe.index.get_loc(self.user_id)

        similar_distances = distances[index, 1:]
        similar_indices = indices[index, 1:]

        return similar_distances, similar_indices

    def _calculate_similarities(self, similar_distances: np.ndarray) -> np.ndarray:
        similarity = [1 - x for x in similar_distances]
        similarity = [(y + 1) / 2 for y in similarity]
        return np.array(similarity)

    def _calculate_result(self, similarity: np.ndarray, ratings: np.ndarray) -> float:
        if self.calculation_variant == "weighted":
            mean = np.dot(ratings, similarity) / similarity.sum()
            return mean
        else:
            return float(np.mean(ratings))

    def _check_values(self) -> None:
        if self.mode == 'user':
            if self.user_id not in self.data.index:
                raise ValueError(f"User {self.user_id} nicht in Daten.")
            if self.item_id not in self.data.columns:
                raise ValueError(f"Item {self.item_id} nicht in Daten.")
        elif self.mode == 'item':
            if self.user_id not in self.original_data.index:
                raise ValueError(
                    f"User {self.user_id} nicht in Originaldaten.")
            if self.item_id not in self.data.index:
                raise ValueError(
                    f"Item {self.item_id} nicht in transponierten Daten.")

    def _process_item_based(self) -> pd.DataFrame:
        user_ratings = self.original_data.loc[self.user_id]

        # filter based on the item. Only the users that already gave a rating are relevant
        rated_items = user_ratings[user_ratings > 0.0].index.tolist()

        if not rated_items:
            raise ValueError(f"User {self.user_id} hat keine Items bewertet!")

        return self.data.loc[rated_items + [self.item_id]]

    def _process_user_based(self) -> pd.DataFrame:
        # filter based on the item. Only the users that already gave a rating are relevant
        relevant_df = self.data[self.data[self.item_id] > 0.0]

        # add the user we are looking for (due to non-existing rating this user where filtered out)
        return pd.concat([relevant_df, self.data.loc[[self.user_id]]])

    def _normalize_for_pearson(self, relevant_df: pd.DataFrame) -> pd.DataFrame:
        mean_values = relevant_df.mean(axis=1).to_numpy()
        relevant_df = relevant_df.sub(mean_values, axis=0)
        return relevant_df

    def predict(
            self,
            user_id: str,
            item_id: str,
            similarity: Literal['cosine', 'pearson'] = 'cosine',
            calculation_variety: Literal['weighted', 'unweighted'] = 'weighted',
            k: Optional[int] = 3,
            second_k_value: Optional[int] = None) -> float:
        self._prepare_information(user_id=user_id, item_id=item_id, similarity=similarity, calculation_variant=calculation_variety, k=k)
        self._check_values()

        if self.mode == 'item':
            relevant_df = self._process_item_based()
        else:
            relevant_df = self._process_user_based()

        if similarity == 'pearson':
            self._normalize_for_pearson(relevant_df)

        # make sure that there are no NaN values -> set NaN to 0.0
        relevant_df = relevant_df.fillna(0.0)
        similar_distances, similar_indices = self._calculate_distance_and_indices(dataframe=relevant_df)

        if self.mode == 'item':
            ratings = relevant_df.iloc[similar_indices][self.user_id].to_numpy()
        else:
            ratings = relevant_df.iloc[similar_indices][self.item_id].to_numpy()

        similarity = self._calculate_similarities(similar_distances)
        result = self._calculate_result(similarity, ratings)

        if self.display_results_for_each_step:
          self.explain(similar_indices, relevant_df, ratings, similarity, result)

        return result

    def explain(self, similar_indices, relevant_df, ratings, similarity, result) -> None:
        print("-" * 50)
        print(f"<mode: {self.mode}>")
        print(f"({self.calculation_variant}) Mittelwert: {result:.4f}")
        print(f"Metrik: {self.similarity}")
        print()
        print(f"k ({self.k}) ähnlichsten {'Items' if self.mode == 'item' else 'Nutzer'}:")
        df = pd.DataFrame({
            "ID": relevant_df.index[similar_indices],
            "rating": ratings,
            "similarity": similarity
        }).reset_index(drop=True)
        print(df.to_string(index=True, header=True))
        print("-" * 50)


In [40]:
class ContentBasedRecommender(Recommender):
    def __init__(self, item_profile: pd.DataFrame, user_ratings: pd.DataFrame) -> None:
        super().__init__()
        self.item_profile = item_profile
        self.user_ratings = user_ratings
        self.k = 3
        self.feature_matrix = None
        self._preprocess_data()

        # check if the features "budget", "revenue", "runtime" are relevant for the item/rating correlation
        self._check_features_correlation(features=["budget", "revenue", "runtime"])
        self._calculate_tfidf_matrix()


    def _preprocess_data(self):
        self.item_profile["item_ID"] = self.item_profile["item_ID"].astype(str)
        self.user_ratings["item_ID"] = self.user_ratings["item_ID"].astype(str)
        self.user_ratings["user_ID"] = self.user_ratings["user_ID"].astype(str)

    def _check_features_correlation(self, features: List[str]) -> None:
        irrelevant_features = []  #  list for irrelevant feature that will be removed

        for feature in features:
            if feature not in self.item_profile.columns:
                continue

            # combine item and user ratings
            merged_data = pd.merge(self.user_ratings, self.item_profile, on="item_ID")

            # convert to numeric
            feature_data = pd.to_numeric(merged_data[feature].fillna(0), errors="coerce")
            rating_data = pd.to_numeric(merged_data["rating"].fillna(0), errors="coerce")

            # calculate the correlation between the user rating and the feature
            correlation, p_value = stats.pearsonr(feature_data, rating_data)

            # check if the correlation is relevant / significant
            if abs(correlation) < 0.1 or p_value > 0.05:
                logger.debug(f"Feature '{feature}' does not have a sigificant correlation and will be ignored.")
                irrelevant_features.append(feature)
            else:
                logger.debug(f"Feature '{feature}' has a significant correlation: {correlation}")

        self.item_profile.drop(columns=irrelevant_features, inplace=True)


    def _safe_get_feature(self, feature_name):
        if feature_name in self.item_profile.columns:
            return self.item_profile[feature_name]
        else:
            return None

    def _calculate_tfidf_matrix(self) -> None:
        # optional but if the title is empty we set it as an empty string
        self.item_profile["title"] = self.item_profile["title"].fillna("")

        # use the TfidfVectorizer() to transform title into numerical feature
        title_vectorizer = TfidfVectorizer()
        title_features = title_vectorizer.fit_transform(self.item_profile["title"])

        # change genre columns in text by just extracting the word after '"Genre_"'
        genre_cols = [col for col in self.item_profile.columns if col.startswith("Genre_")]
        if genre_cols:
            self.item_profile["genre_text"] = self.item_profile[genre_cols].astype(int).apply(
                lambda row: " ".join([col.replace("Genre_", "") for col, val in row.items() if val == 1]), axis=1
            )
            genre_vectorizer = TfidfVectorizer()
            genre_features = genre_vectorizer.fit_transform(self.item_profile["genre_text"])
        else:
            genre_features = np.empty((len(self.item_profile), 0))

        # the language of the items transformed into one-hot-encoded-dummies
        language_dummies = pd.get_dummies(self.item_profile["original_language"], prefix="lang")

        # put runtime into three categories (short, medium, long)
        runtime_feature = self._safe_get_feature("runtime")
        if runtime_feature is not None:
            runtime_bucket = pd.qcut(runtime_feature, q=3, labels=["kurz", "mittel", "lang"])
            runtime_dummies = pd.get_dummies(runtime_bucket, prefix="runtime")
        else:
            runtime_dummies = pd.DataFrame(index=self.item_profile.index)

        # budget and include will be logarithmically transformed and then scaled
        numerical_features = []
        if "budget" in self.item_profile.columns:
            self.item_profile["log_budget"] = np.log1p(self.item_profile["budget"].fillna(0))
            numerical_features.append("log_budget")
        if "revenue" in self.item_profile.columns:
            self.item_profile["log_revenue"] = np.log1p(self.item_profile["revenue"].fillna(0))
            numerical_features.append("log_revenue")

        if numerical_features:
            scaler = StandardScaler()
            scaled_numericals = scaler.fit_transform(self.item_profile[numerical_features])
        else:
            scaled_numericals = np.empty((len(self.item_profile), 0))

        # create feature matrix
        self.feature_matrix = hstack([
            title_features,
            genre_features,
            language_dummies.values,
            runtime_dummies.values,
            scaled_numericals
        ])

        self.feature_matrix = csr_matrix(self.feature_matrix)

    def _check_values(self):
        if self.user_id not in self.user_ratings["user_ID"].values:
            raise ValueError(f"User-ID {self.user_id} not found.")

        if self.item_id not in self.item_profile["item_ID"].values:
            raise ValueError(f"Item-ID {self.item_id} not found.")


    def predict(
            self,
            user_id: str,
            item_id: str,
            similarity: Optional[Literal['cosine', 'pearson']] = 'cosine',  # only for collaborative filtering
            calculation_variety: Optional[Literal['weighted', 'unweighted']] = 'weighted', # only for collaborative filtering
            k: Optional[int] = 3,
            second_k_value: Optional[int] = None) -> float:

        # default function to save all the information
        self._prepare_information(user_id=user_id, item_id=item_id, k=k)

        # check if the values included in the dataframes
        self._check_values()

        # extract only the items, the user rated
        rated_items = self.user_ratings[self.user_ratings["user_ID"] == user_id]
        rated_item_ids = rated_items["item_ID"].values

        # this case can happen when k is greater than the rated items by the user
        if self.k > len(rated_item_ids):
            self.k = len(rated_item_ids)

        # extract the rated item indices from the item profile
        rated_item_indices = self.item_profile[self.item_profile["item_ID"].isin(rated_item_ids)].index

        # check if the user rated some items... if not then return 0.0
        if len(rated_item_indices) == 0:
            return 0.0

        # get the feature matrix that is calculated in the '_calculate_tfidf_matrix()'-Method
        filtered_matrix = self.feature_matrix[rated_item_indices]

        # default kNN usage like in the lecture with brute algorithm and cosine as metric
        knn = NearestNeighbors(metric="cosine", algorithm="brute")
        knn.fit(filtered_matrix)

        item_index = self.item_profile[self.item_profile["item_ID"] == item_id].index[0]
        distances, indices = knn.kneighbors(self.feature_matrix[item_index], n_neighbors=self.k + 1)  # k+1 because the item itself is also included

        # the similar item indices beginning with the first real neighbor
        similar_items = indices.flatten()[1:]
        similar_item_indices = rated_item_indices[similar_items]

        # extract for each item in the similar item indices list the rating and save it in the list
        similar_ratings = []
        for idx in similar_item_indices:
            similar_item_id = self.item_profile.iloc[idx]["item_ID"]
            user_rating = self.user_ratings[(self.user_ratings["user_ID"] == user_id) & (self.user_ratings["item_ID"] == similar_item_id)]
            if not user_rating.empty:
                similar_ratings.append(user_rating["rating"].values[0])

        # if the similar ratings is zero then we return a default 0.0
        if not similar_ratings:
            return 0.0

        # calculate the predicted rating based on the sum of ratings and len of ratings
        return sum(similar_ratings) / len(similar_ratings)

In [41]:
class HybridRecommender(Recommender):
    def __init__(self, data: pd.DataFrame, item_profile: pd.DataFrame, user_ratings: pd.DataFrame, mode: Literal['user', 'item'] = 'user', alpha: float = 0.5):
        super().__init__()
        self.collaborative_recommender = CollaborativeFilteringRecommender(data=data, mode=mode)
        self.content_based_recommender = ContentBasedRecommender(item_profile=item_profile, user_ratings=user_ratings)
        self.alpha = alpha

    def predict(
            self,
            user_id: str,
            item_id: str,
            similarity: Optional[Literal['cosine', 'pearson']] = 'cosine',  # only for collaborative filtering
            calculation_variety: Optional[Literal['weighted', 'unweighted']] = 'weighted', # only for collaborative filtering
            k: Optional[int] = 3,
            second_k_value: Optional[int] = 3):

        collaborative_prediction = self.collaborative_recommender.predict(
            user_id=user_id,
            item_id=item_id,
            similarity=similarity, # ignore that it can be NONE
            calculation_variety=calculation_variety, # ignore that it can be NONE
            k=k
        )

        content_based_prediction = self.content_based_recommender.predict(
            user_id=user_id,
            item_id=item_id,
            similarity=similarity,
            calculation_variety=calculation_variety,
            k=second_k_value
        )

        # combine both with alpha as weight
        combined_prediction = (self.alpha * collaborative_prediction) + ((1 - self.alpha) * content_based_prediction)
        return combined_prediction

In [42]:
class Test(BaseModel):
    name: str
    type: Literal["collaborative_filtering", "content_based", "hybrid"]
    mode: Optional[Literal["user", "item"]] = "item"
    k_value: int
    second_k_value: Optional[int] = 3
    metric: Optional[Literal["cosine", "pearson"]] = 'cosine'
    calculation_variety: Optional[Literal["weighted", "unweighted"]] = 'weighted'
    alpha: Optional[float] = 0.5


class TestResult(BaseModel):
    name: str
    type: Literal["collaborative_filtering", "content_based", "hybrid"]
    mode: Literal["user", "item"]
    k_value: int
    metric: Literal["cosine", "pearson"]
    calculation_variety: Literal["weighted", "unweighted"]
    alpha: float
    mae: float


class TestResults(BaseModel): # just for saving in a "pretty" form
    date: str
    num_tests: int
    best_test: TestResult
    results: List[TestResult]



class MAETester:
    def __init__(self, tests: List[Test], test_data_path: str, data_path: str, item_profile_path: str, user_ratings: str):
        self.tests = tests
        self.testdata = pd.read_csv(test_data_path)  # testdata (for evaluaton)
        self.item_profile = pd.read_csv(item_profile_path)
        self.user_ratings = pd.read_csv(user_ratings)
        self._prepare_data()
        self.data = pd.read_csv(data_path)  # trainings-data
        self.results: List[TestResult] = []


    def _prepare_data(self):
        self.testdata["user_ID"] = self.testdata["user_ID"].astype(str)
        self.testdata["item_ID"] = self.testdata["item_ID"].astype(str)


    def run_tests(self) -> pd.DataFrame:
        for test in self.tests:
            result = self._run_test(test)
            self.results.append(result)
            logger.success(f"Test abgeschlossen: {test.name}, MAE: {result.mae:.4f}\n")

        # display final results
        result_df = self._summarize_test_results()

        # save final results to file
        self._save_to_file()

        return result_df

    def _run_test(self, test: Test) -> TestResult:
        logger.info(f"Running test: {test.name}")

        if test.type == "content_based":
            recommender = ContentBasedRecommender(
                item_profile=self.item_profile,
                user_ratings=self.user_ratings,
            )
        elif test.type == "collaborative_filtering":
            recommender = CollaborativeFilteringRecommender(
                mode=test.mode, # ignore type (that this can be NONE)
                data=self.data,
            )
        elif test.type == "hybrid":
            recommender = HybridRecommender(
                data=self.data,
                item_profile=self.item_profile,
                user_ratings=self.user_ratings,
                mode=test.mode,  # ignore type (that this can be NONE)
                alpha=test.alpha,
            )
        else:
            raise ValueError(f"Unbekannter Recomendertyp: {test.type}")

        predictions = []
        actuals = []

        testdata_list = self.testdata.to_numpy()

        for row in tqdm(testdata_list, desc="Vorhersagen werden berechnet"):
            user_id: str = str(row[0])
            item_id: str = str(row[1])
            actual_rating = row[2]

            try:
                predicted_rating = recommender.predict(
                    user_id=user_id,
                    item_id=item_id,
                    similarity=test.metric,
                    calculation_variety=test.calculation_variety,
                    k=test.k_value,
                    second_k_value=test.second_k_value,
                )
                predictions.append(predicted_rating)
                actuals.append(actual_rating)
            except ValueError as e:
                logger.warning(f"Fehler bei der Vorhersage: {e}")

        mae = self._mean_absolute_error(actuals, predictions)

        return TestResult(
            name=test.name,
            type=test.type,
            mode=test.mode,
            k_value=test.k_value,
            metric=test.metric,
            calculation_variety=test.calculation_variety,
            alpha=test.alpha,
            mae=mae,
        )

    @staticmethod
    def _mean_absolute_error(actuals: List[float], predictions: List[float]) -> float:
        if not actuals or not predictions or len(actuals) != len(predictions):
            raise ValueError("Listen für tatsächliche und vorhergesagte Werte müssen gleich lang und nicht leer sein.")

        absolute_errors = [abs(a - p) for a, p in zip(actuals, predictions)]
        mae = sum(absolute_errors) / len(absolute_errors)
        return mae

    def _summarize_test_results(self) -> pd.DataFrame:
        if not self.results:
            logger.info("Keine Testergebnisse vorhanden.")
            return

        summary_df = pd.DataFrame([{
            "Testname": result.name,
            "Recomendertyp": result.type,
            "Modus": result.mode if result.type == "collaborative_filtering" else "/",
            "k-Wert": result.k_value,
            "Metrik": result.metric if result.type == "collaborative_filtering" else "/",
            "Berechnungsvariante": result.calculation_variety if result.type == "collaborative_filtering" else "/",
            "Alpha (weight)" : result.alpha if result.type == "hybrid" else "/",
            "MAE": result.mae
        } for result in self.results])

        print("-" * 50)
        print("Zusammenfassung der Testergebnisse:")
        print(summary_df.to_string(index=False))
        print("-" * 50)

        return summary_df


    def _save_to_file(self) -> None:
        if not self.results:
            logger.info("Keine Testergebnisse vorhanden, nichts zu speichern.")
            return
        date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        best_test = min(self.results, key=lambda result: result.mae)  # best results based on mae
        test_results = TestResults(
            date=date,
            num_tests=len(self.results),
            best_test=best_test,
            results=self.results
        )

        file_path = f"./outputs/testergebnis_{date.replace(':', '-')}.json"

        with open(file_path, "w", encoding="utf-8") as json_file:
            json_file.write(test_results.model_dump_json(indent=4))

        logger.success(f"Testergebnisse erfolgreich gespeichert.")




# Beginn der Aufrufe und Nutzung der Recommender

wir haben viele Testfälle durchgeführt (diese sind ganz unten zu finden) und uns dann aufgrund der MAE-Ergebnisse für folgenden Hybriden Recommender entschieden der sowohl CollaborativeFiltering als auch Conten-Based-Filtering berücksichtigt.

Um diesen Recommender zu testen stellen wir eine struktur bereit wobei lediglich der parameter `testdata` angepasst werden muss um den evaluationsdatensatz zu verwenden. Aufrgund der sehr nah beieinanderliegenden Ergebnisse für pearson und cosine stellen wir beide Testprofile zur Verfügung.

In [43]:
testdata_path = "./data/Testdaten_FlixNet.csv"

In [44]:
choosen_recommender_profile = [
    Test(name="HybridRecommender", type="hybrid", mode="user", k_value=5, second_k_value=14, metric="pearson", calculation_variety="weighted", alpha=0.5),
    Test(name="HybridRecommender", type="hybrid", mode="user", k_value=5, second_k_value=14, metric="cosine", calculation_variety="weighted", alpha=0.5)
]

In [45]:
tester = MAETester(
        tests=choosen_recommender_profile,
        test_data_path=testdata_path,
        data_path="./data/Bewertungsmatrix_FlixNet.csv",
        user_ratings="./data/Ratings_FlixNet.csv",
        item_profile_path="./data/Itemprofile_FlixNet.csv",
    )

FileNotFoundError: [Errno 2] No such file or directory: './data/Testdaten_FlixNet.csv'

In [None]:
result = tester.run_tests()
result # print result here