# Notebook Title


**Authors**: Ashkan Khademian, Ujunwa Edum\
**Project Part**: Part I\
**Course**: DATA.ML.360-2024-2025-1 Recommender Systems

# Foundations

## Introduction
In this notebook, we introduce a model-agnostic method for providing counterfactual explanations for group recommendations. The method is based on the idea of finding the smallest set of items that, when removed, results in a change in the recommendation list. The method is inspired by the idea of conciseness, which is considered to be the most important characteristic of an explanation.

## Install Requirements

Use the comment template for installing your packages that are not already present in the google colab environment.

In [1]:
# !pip install <package-name>

## Import Libararies

### Main Libraries

In [2]:
import typing
from time import sleep
from dataclasses import dataclass
from collections import defaultdict
from functools import cached_property

from tqdm import tqdm
import hdbscan
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from itertools import combinations
from sentence_transformers import SentenceTransformer
from part2_utils.predict_user_rating import transform_csv_dataframe_to_user_movies_matrix

  from .autonotebook import tqdm as notebook_tqdm


### Typing

In [3]:
from typing import *
from pandas.core.frame import DataFrame, Series

## Define Constants

In [4]:
UserIDType = int
MovieIDType = int

RatingType = float
PreferenceScoreType = float

MOVIE_DETAILS_DATASET = "data/ml-latest-small/movies.csv"
USER_TAGS_FOR_MOVIES_DATASET = "data/ml-latest-small/tags.csv"

RATINGS_DATASET = "data/ml-latest-small/ratings.csv"

# Counterfactual Explanation for Group Recommendations

## Data Classes

Group represents the group of users for which we want to provide recommendations.\
Explanation represents the movies that we want to provide explanations for.

In [5]:
@dataclass
class Group:
    users: List[UserIDType]

    def get_item_intensity(
            self,
            movie: MovieIDType,
            users_rated_movie: Callable[[UserIDType], List[MovieIDType]],
    ) -> float:
        return len([user for user in self.users if user in users_rated_movie(movie)])

    def __len__(self):
        return len(self.users)

    def __iter__(self):
        return iter(self.users)

In [6]:
@dataclass
class Explanation:
    movies: List[MovieIDType]

    def get_user_intensity(
            self,
            user: UserIDType,
            movies_rated_for_user: Callable[[UserIDType], List[MovieIDType]],
    ) -> float:
        return len([movie for movie in self.movies if movie in movies_rated_for_user(user)])

    def __len__(self):
        return len(self.movies)

    def __iter__(self):
        return iter(self.movies)

## Explanation Analysis

### Explanation Analyzer
The `ExplanationAnalyzer` class provides methods to analyze the explanation provided for a group of users. The class provides the following methods:
- `get_popularity`: Returns the popularity of the explanation.
- `get_normalized_popularity`: Returns the normalized popularity of the explanation.
- `get_fairness`: Returns the fairness of the explanation.
- `get_normalized_fairness`: Returns the normalized fairness of the explanation.
- `get_conciceness`: Returns the conciseness of the explanation.
- `calculate_explanation_score`: Returns the overall score of the explanation. The score is calculated as the average of the normalized popularity, normalized fairness, and conciseness.

In [7]:
class ExplanationAnalyzer:
    def __init__(
            self,
            group: Group,
            explanation: Explanation,
            users_rated_movie: Callable[[MovieIDType], List[UserIDType]],
            movies_rated_for_user: Callable[[UserIDType], List[MovieIDType]],
    ):
        self.group = group
        self.explanation = explanation
        self.users_rated_movie = users_rated_movie
        self.movies_rated_for_user = movies_rated_for_user

    def get_popularity(self) -> float:
        return sum(
            [
                self.group.get_item_intensity(movie, self.users_rated_movie)
                for movie in self.explanation
            ]
        ) / len(self.explanation)

    def get_normalized_popularity(self) -> float:
        return self.get_popularity() / len(self.group)

    def get_fairness(self) -> float:
        users_intensity_list = [
            self.explanation.get_user_intensity(user, self.movies_rated_for_user)
            for user in self.group
        ]
        return max(users_intensity_list) - min(users_intensity_list)

    def get_normalized_fairness(self) -> float:
        return self.get_fairness() / len(self.explanation)

    def get_conciceness(self) -> float:
        return 1 / len(self.explanation)

    def calculate_explanation_score(self) -> float:
        return (self.get_normalized_popularity() + self.get_normalized_fairness() + self.get_conciceness()) / 3


# Proposed Method

## Baseline
To generate explanations for a group, we first define the available search area, which is the items any group member interacts with:
- Since we consider conciseness to be the most essential characteristic of an explanation, we sequentially test each potential combination of items $p$.
- We remove these items from all group members’ interaction sets.
- if removal of the subset resulted in not seeing the item $p$ in the recommendation list, we consider the subset as a valid explanation.

if there exist multiple valid explanations, we select the one with the highest explanation score. The explanation score can be further calculated using three different metrics:
- Popularity: The average popularity of the items in the explanation.
- Fairness: The difference between the highest and lowest intensity of the items in the explanation.
- Conciseness: The inverse of the number of items in the explanation.
### Formulations
for a group $G$ and an explanation $E$:
- Item and User Intensity
$$
ItemIntensity(i) = |{u \in G ; i \in U(i)}|
$$
$$
UserIntensity(u) = |{i \in E ; u \in I(u)}|
$$
- Popularity
$$
Popularity(E) = \frac{\sum_{i \in E} ItemIntensity(i)}{|E|}
$$
- Fairness
$$
Fairness(E) = max(UserIntensity(u)) - min(UserIntensity(u))
$$
- Conciseness
$$
Conciseness(E) = \frac{1}{|E|}
$$
## Our Method
We confine the search space using three different strategies each called a `Filter`. the filters are further introduced in this notebook.
Then for the resulted explanations, we calculate the explanation score using this average formula:
$$
ExplanationScore(E) = \frac{1}{3}.(\frac{Popularity(E)}{|G|} + \frac{Fairness(E)}{|E|} + Conciseness(E))
$$

## Filters

### Interface

In [8]:
class MoviePreferencesFilterInterface:

    def __call__(self, preferences: List[MovieIDType]) -> List[MovieIDType]:
        pass

### Filter 1: Popularity Filter

`MoveGlobalPopularityFilter` removes the items that are not popular enough in the dataset. The popularity of an item is defined as the number of users who have interacted with it. The filter removes items with popularity below a certain percentile threshold.

In [9]:
class MoveGlobalPopularityFilter(MoviePreferencesFilterInterface):

    def __init__(self, user_movie_matrix: DataFrame, percentile_threshold: float = 0.3):
        self.user_movie_matrix = user_movie_matrix
        self.percentile_threshold = percentile_threshold

    @cached_property
    def movie_rating_counts(self):
        user_movie_matrix_without_user_column = self.user_movie_matrix.drop(columns=['userId'])
        movie_rating_counts = user_movie_matrix_without_user_column.notna().sum()
        return movie_rating_counts

    @cached_property
    def popularity_percentile_value(self):
        percentile_value = self.movie_rating_counts.quantile(self.percentile_threshold)
        return percentile_value

    def __call__(self, preferences: List[MovieIDType]) -> List[MovieIDType]:
        return [movie for movie in preferences if self.movie_rating_counts[movie] > self.popularity_percentile_value]

### Filter 2: Group Intensity Filter

`MovieGroupIntensityFilter` removes the items that are not intense enough for the group. The intensity of an item is defined as the number of group members who have interacted with it. The filter removes items with intensity below a certain threshold.

In [10]:
class MovieGroupIntensityFilter(MoviePreferencesFilterInterface):

    def __init__(
            self,
            group: Group,
            users_rated_movie: Callable[[UserIDType], List[MovieIDType]],
            intensity_threshold: Union[float, int] = 0.5
    ):
        self.group = group
        self.users_rated_movie = users_rated_movie
        if isinstance(intensity_threshold, float):
            self.intensity_threshold = len(group) * intensity_threshold
        else:
            self.intensity_threshold = intensity_threshold

    def __call__(self, preferences: List[MovieIDType]) -> List[MovieIDType]:
        return [
            movie
            for movie in preferences
            if self.group.get_item_intensity(movie, self.users_rated_movie) > self.intensity_threshold
        ]

### Filter 3: Preference Similar to Query Filter

#### Movie Clusters

`Encoder` is a class that encodes the movie descriptions using the SentenceTransformer model.\
`MovieDesciber` is a class that generates a description for a movie using its genres and user tags. These descriptions are further encoded using the `Encoder` class.

In [11]:
class Encoder:
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)

    def __call__(self, descriptive: str):
        embedding = self.model.encode(descriptive, convert_to_numpy=True)
        return embedding


class MovieDesciber:
    def __init__(self):
        self.movie_details = pd.read_csv(MOVIE_DETAILS_DATASET)
        self.user_tags_for_movies = pd.read_csv(USER_TAGS_FOR_MOVIES_DATASET)

    def __call__(self, movie_id: MovieIDType):
        genres = self.movie_details[self.movie_details['movieId'] == movie_id]['genres'].values[0]
        description_list = set(genre.lower() for genre in genres.split('|'))

        tags = self.user_tags_for_movies[self.user_tags_for_movies['movieId'] == movie_id]['tag'].tolist()
        tags_set = set(tag.lower() for tag in tags)
        description_list = description_list.union(tags_set)

        description_string = " ".join(description_list)
        return description_string

`MoviesClustering` is a class that clusters movies based on their descriptions. The class uses the HDBSCAN clustering algorithm to cluster the movies. The class provides the following methods:
- `cluster_movies`: Clusters the movies based on their descriptions (which is provided by `MovieDescriber`) and returns the clusters and the number of clusters.

In [12]:
class MoviesClustering:
    def __init__(self, movies: List[MovieIDType], min_cluster_size=None):
        self.movies = movies
        self.min_cluster_size = min_cluster_size
        self.movie_encoder = Encoder()
        self.movie_describer = MovieDesciber()

    def get_movie_encodings(self):
        movie_encodings = []
        for movie_id in self.movies:
            movie_description = self.movie_describer(movie_id)
            movie_encodings.append(self.movie_encoder(movie_description))
        return movie_encodings

    def get_hdbscan_kwargs(self):
        if self.min_cluster_size is not None:
            return {"min_cluster_size": self.min_cluster_size}
        return {}

    def cluster_movies(self, important_index: Optional[int] = None):
        movie_encodings = self.get_movie_encodings()
        encodings_array = np.array(movie_encodings)

        clusterer = hdbscan.HDBSCAN(**self.get_hdbscan_kwargs())
        labels = clusterer.fit_predict(encodings_array)

        # If the important movie is noise, adjust clustering
        if important_index is not None and labels[important_index] == -1:
            important_encoding = movie_encodings[important_index]
            # Find the nearest non-noise cluster
            cluster_indices = [i for i, label in enumerate(labels) if label != -1]
            nearest_cluster_index = min(
                cluster_indices,
                key=lambda i: np.linalg.norm(movie_encodings[i] - important_encoding)
            )
            labels[important_index] = labels[nearest_cluster_index]

        # Organize movie IDs into clusters
        clusters = {}
        for movie_id, label in zip(self.movies, labels):
            if label != -1:  # Ignore noise points (label -1)
                clusters.setdefault(label, []).append(movie_id)

        # Convert clusters dictionary to a list of clusters
        cluster_list = list(clusters.values())

        # Return the clusters and the number of clusters
        return cluster_list, len(cluster_list)

#### Filter Based on Movie Clustering

`MovieQueryClusterFilter` removes the items that are not similar enough to the query item. The filter uses the clustering of the movies based on their descriptions. The filter removes items that are not in the same cluster as the query item. 

In [13]:
class MovieQueryClusterFilter(MoviePreferencesFilterInterface):

    def __init__(self, query: MovieIDType, cluster_threshold: Optional[Union[float, int]] = None):
        self.query = query
        self.cluster_threshold = cluster_threshold

    def get_cluster_threshold(self, clustering_movies: List[MovieIDType]) -> int:
        if self.cluster_threshold is None or isinstance(self.cluster_threshold, int):
            return self.cluster_threshold
        return int(len(clustering_movies) * self.cluster_threshold)

    def get_clustering(self, clustering_movies: List[MovieIDType]) -> MoviesClustering:
        return MoviesClustering(clustering_movies, self.get_cluster_threshold(clustering_movies))

    def __call__(self, preferences: List[MovieIDType]) -> List[MovieIDType]:
        clustering_movies = preferences
        if not self.query in clustering_movies:
            clustering_movies.append(self.query)

        clustering = self.get_clustering(clustering_movies)
        clusters, _ = clustering.cluster_movies(important_index=clustering_movies.index(self.query))
        query_cluster = []
        for cluster in clusters:
            if self.query in cluster:
                query_cluster = cluster
                break

        assert query_cluster, "Don't know why the query cluster is empty"

        return [movie for movie in preferences if movie in query_cluster]

### Composite Filter

`CompositeFilter` is a class that combines multiple filters. This class adheres to the `MoviePreferencesFilterInterface` interface so that it can be treated like a simple filter.

In [14]:
class CompositeFilter(MoviePreferencesFilterInterface):

    def __init__(self, filters: List[MoviePreferencesFilterInterface]):
        self.filters = filters

    def apply_filter(
            self, preferences: List[MovieIDType], filter_: MoviePreferencesFilterInterface
    ) -> List[MovieIDType]:
        return filter_(preferences)

    def __call__(self, preferences: List[MovieIDType]) -> List[MovieIDType]:
        for filter_ in tqdm(self.filters, desc="Applying filters"):
            preferences = self.apply_filter(preferences, filter_)
        return preferences
    
class CompositeFilterWithCondition(CompositeFilter):

    def __init__(self, filters: List[MoviePreferencesFilterInterface], condition: Callable[[List[MovieIDType]], bool]):
        super().__init__(filters)
        self.condition = condition

    def apply_filter(
            self, preferences: List[MovieIDType], filter_: MoviePreferencesFilterInterface
    ) -> List[MovieIDType]:
        filtered_preferences = super().apply_filter(preferences, filter_)
        if self.condition(filtered_preferences):
            return filtered_preferences
        return preferences

## Implementation

`GroupRecommendationModel` is a class that provides recommendations for a group of users. Since Counterfactual Explanation for Group Recommendations is a model-agnostic model, we don't take into account the model's specifics. The class provides the following methods:
- `recommend`: Returns the recommendations for the group of users.

In [15]:
from black_box.model_v2 import GroupRecommendationModel
sample_group = [280, 528, 251, 43, 237, 149, 372, 114, 360, 366]
model = GroupRecommendationModel(sample_group)

this function returns the composite filter that combines the three filters.

In [16]:
def get_preferences_filter(
        group: Group,
        query: MovieIDType,
        user_movie_matrix: DataFrame,
        users_rated_movie: Callable[[MovieIDType],
        List[UserIDType]],
) -> MoviePreferencesFilterInterface:

    return CompositeFilterWithCondition(
        filters=[
            MovieGroupIntensityFilter(
                group=group,
                users_rated_movie=users_rated_movie,
            ),
            MoveGlobalPopularityFilter(
                user_movie_matrix=user_movie_matrix,
            ),
            MovieQueryClusterFilter(
                query=query,
            )
        ],
        condition=lambda filtered_preferences: len(filtered_preferences) > 5
    )

`get_preferences_for_group` returns the preferences (the union set of items interacted by the group users) for the group.

In [17]:
def get_preferences_for_group(
        group: Group, user_movie_matrix: DataFrame
) -> List[MovieIDType]:
    user_movie_matrix_limited = user_movie_matrix[user_movie_matrix['userId'].isin(group.users)]
    return list(user_movie_matrix_limited.dropna(axis=1, how='all').drop(columns=['userId']).columns)

`get_users_rated_movie` returns the users who have interacted with a specific movie.

In [18]:
def get_users_rated_movie(
        users: List[UserIDType],
        movies: List[MovieIDType],
        user_movie_matrix: DataFrame,
) -> Callable[[MovieIDType], List[UserIDType]]:
    user_movie_matrix_limited = user_movie_matrix[user_movie_matrix['userId'].isin(users)]
    user_movie_matrix_limited = user_movie_matrix_limited[["userId"] + movies]

    def users_rated_movie(movie):
        return list(user_movie_matrix_limited[user_movie_matrix_limited[movie].notna()]["userId"])

    return users_rated_movie

`get_movies_rated_for_user` returns the movies that a user has interacted with.

In [19]:
def get_movies_rated_for_user(
        users: List[UserIDType],
        movies: List[MovieIDType],
        user_movie_matrix: DataFrame,
) -> Callable[[UserIDType], List[MovieIDType]]:
    user_movie_matrix_limited = user_movie_matrix[user_movie_matrix['userId'].isin(users)]
    user_movie_matrix_limited = user_movie_matrix_limited[["userId"] + movies]

    def movies_rated_for_user(user):
        return list(
            user_movie_matrix_limited[user_movie_matrix_limited["userId"] == user].drop(
                columns=["userId"]
            ).dropna(axis=1, how='any').columns
        )

    return movies_rated_for_user

`find_optimum_explanation` finds the optimum explanation for a group of users.

In [20]:
def find_optimum_explanation(
        group_users: List[UserIDType], user_movie_matrix: DataFrame, query: MovieIDType,
        model: GroupRecommendationModel,
) -> Explanation:
    group = Group(group_users)

    preferences: List[MovieIDType] = get_preferences_for_group(group, user_movie_matrix)

    filter_: MoviePreferencesFilterInterface = get_preferences_filter(
        group, query, user_movie_matrix, get_users_rated_movie(
            group.users, preferences, user_movie_matrix
        )
    )
    filtered_preferences = filter_(preferences)

    explanations = []
    for r in reversed(range(1, len(filtered_preferences) + 1)):
        for combination in tqdm(
                list(combinations(filtered_preferences, r)),
                desc="checking combinations when r={}/{}".format(r, len(filtered_preferences))
        ):
            to_be_removed = list(combination)
            recommendation = model.recommend(to_be_removed)
            if query not in recommendation:
                explanation = Explanation(list(combination))
                explanations.append(explanation)
            else:
                if r == len(filtered_preferences):
                    raise ValueError("Even the full set is not a valid explanation")

    sleep(1)
    explanation_scores = []
    for explanation in tqdm(explanations, desc="calculating explanation scores"):
        explanation_scores.append(ExplanationAnalyzer(
            group=group,
            explanation=explanation,
            users_rated_movie=get_users_rated_movie(
                group.users, explanation.movies, user_movie_matrix
            ),
            movies_rated_for_user=get_movies_rated_for_user(
                group.users, explanation.movies, user_movie_matrix
            ),
        ).calculate_explanation_score())

    # find explanation with highest score
    best_explanation = explanations[explanation_scores.index(max(explanation_scores))]
    print("Number of preferences before filter: ", len(preferences))
    print("Number of found explanations: ", len(explanations))
    return best_explanation

### Example

In [21]:
recommended = model.recommend(preferences_to_exclude=None)
print("Recommendations for the group: ", recommended)

Recommendations for the group:  [6016, 2985, 48774, 1573, 2617, 2000, 6365, 6934, 6333, 786]


In [22]:
user_movie_matrix = transform_csv_dataframe_to_user_movies_matrix(pd.read_csv(RATINGS_DATASET))

In [23]:
result = find_optimum_explanation(
    sample_group,
    user_movie_matrix,
    query=786,
    model=model,
)

Applying filters: 100%|██████████| 3/3 [00:13<00:00,  4.34s/it]
checking combinations when r=10/10: 100%|██████████| 1/1 [00:00<00:00, 60.64it/s]
checking combinations when r=9/10: 100%|██████████| 10/10 [00:00<00:00, 82.89it/s]
checking combinations when r=8/10: 100%|██████████| 45/45 [00:00<00:00, 95.75it/s]
checking combinations when r=7/10: 100%|██████████| 120/120 [00:01<00:00, 96.11it/s]
checking combinations when r=6/10: 100%|██████████| 210/210 [00:02<00:00, 91.94it/s]
checking combinations when r=5/10: 100%|██████████| 252/252 [00:02<00:00, 93.74it/s]
checking combinations when r=4/10: 100%|██████████| 210/210 [00:02<00:00, 99.57it/s] 
checking combinations when r=3/10: 100%|██████████| 120/120 [00:01<00:00, 94.37it/s]
checking combinations when r=2/10: 100%|██████████| 45/45 [00:00<00:00, 93.82it/s]
checking combinations when r=1/10: 100%|██████████| 10/10 [00:00<00:00, 91.55it/s]
calculating explanation scores: 100%|██████████| 64/64 [00:06<00:00,  9.17it/s]

Number of preferences before filter:  641
Number of found explanations:  64





In [24]:
print("Optimum Explanation: ", result)

Optimum Explanation:  Explanation(movies=[172, 1488, 1515, 1626])
