# Wide-and-Deep Recommender with Text Embeddings

In this notebook, we demonstrate how to use text embeddings to build a movie recommender model. We extract embedding features from the movie titles and genres with a [pretrained Transformer model from HuggingFace](https://huggingface.co/sentence-transformers/paraphrase-MiniLM-L6-v2).
We then train a wide-and-deep model on top of the user and item features by using the [recommenders Python library](https://github.com/Microsoft/Recommenders).

![Wide-and-deep Movie Recommender](https://github.com/loomlike/recommenders_images/blob/main/wide_deep_movielens.png?raw=true)
<center>Wide-and-deep movie recommender architecture</center>

**Overall steps are as follows:**
1. Set up the environment
1. Prepare data
1. Extract text embeddings
1. Train and evaluate wide-and-deep recommender

## Background

The primary goal of a movie recommendation system is to predict, for each user, the movies that user is most likely to want to watch, and to surface only those.

Recommendation systems can use two main approaches: content-based filtering and collaborative filtering. The former approach makes recommendations based on the features of products, services, or content, as well as the user’s profile and preferences. The latter approach uses the preferences of other users who have similar tastes to the target user.

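A wide-and-deep model jointly trains two components: a *wide* linear model that memorizes specific user-item interactions through crossed features, and a *deep* neural network that generalizes through learned user and item embeddings. In this notebook, the text embeddings of the movie titles and genres are fed to the deep component as additional item features.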

In [None]:
# %pip install sentence-transformers

In [None]:
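import os

import plotly.express as px
import plotly.graph_objects as go

# Spark helpers used throughout the notebook (`F.col`, `Window.orderBy`, ...)
from pyspark.sql import Window
from pyspark.sql import functions as F

import recommenders
from recommenders.datasets import movielens
from recommenders.datasets.spark_splitters import spark_chrono_split
from recommenders.evaluation.spark_evaluation import (
    SparkRankingEvaluation,
    SparkRatingEvaluation,
)

from sentence_transformers import SentenceTransformer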

print(f"Recommenders version: {recommenders.__version__}")

Next, we set up a lakehouse to ingest the data into and to use for modeling.
1. Select **Add lakehouse** in the left pane, then select **Existing lakehouse** to open the Data hub dialog box.
1. Select the workspace and the lakehouse you intend to use with this example.
1. Once the lakehouse is added, it appears in the lakehouse pane of the notebook UI, where you can view the tables and files it stores.

In [None]:
TOP_K = 10

DATA_DIR = "Files/mladsdata/recommenders"  # Shortcut's subdir
if not os.path.exists(f"/lakehouse/default/{DATA_DIR}"):
    DATA_DIR = None
DATA_DIR

In [None]:
# Download the MovieLens data files with the `recommenders` utility and ingest them into the lakehouse.
DATA_DIR = "Tables"
DATA_SIZE = "100k"
LOCAL_PATH = "/lakehouse/default/Files/tmp"

ratings_df = movielens.load_spark_df(
    spark,
    size=DATA_SIZE,
    local_cache_path=LOCAL_PATH,
)
items_df = spark.createDataFrame(
    movielens.load_item_df(
        size=DATA_SIZE,
        local_cache_path=LOCAL_PATH,
        movie_col="itemID",
        title_col="title",
        genres_col="genres",
        year_col="year",
    )
).withColumn("genres", F.regexp_replace(F.col("genres"), "\\|", ", "))


In [None]:
display(ratings_df.cache().limit(5))
display(items_df.cache().limit(5))

### Visualize data

Here, we use the `plotly` library to visualize the data. You can also use Fabric's [built-in visualization tools](https://learn.microsoft.com/en-us/fabric/data-engineering/notebook-visualization).

In [None]:
ratings_pdf = ratings_df.toPandas()
fig = px.histogram(ratings_pdf, x="rating")
fig.update_layout(
    width=1000,
    height=500,
    bargap=0.1,
    title_text="Ratings distribution",
    xaxis_title="Rating",
    yaxis_title="Count",
)
fig.show()

In [None]:
user_ratings_count_pdf = (
    ratings_pdf.groupby("userID", as_index=False)
    .agg({"rating": "count"})
    .rename(columns={"rating": "n_ratings"})
)

fig = px.histogram(user_ratings_count_pdf, x="n_ratings")
fig.update_layout(
    width=1000,
    height=500,
    bargap=0.1,
    title_text="User distribution by number of movies rated",
    xaxis_title="Number of movies rated",
    yaxis_title="User count",
)
fig.show()

In [None]:
movie_ratings_count_pdf = (
    ratings_pdf.groupby("itemID", as_index=False)
    .agg({"rating": ["count", "mean", "std"]})
    .fillna(0)
)
movie_ratings_count_pdf.columns = ["itemID", "n_ratings", "avg_ratings", "std_ratings"]

fig = px.histogram(movie_ratings_count_pdf, x="n_ratings")
fig.update_layout(
    width=1000,
    height=500,
    bargap=0.1,
    title_text="Movie distribution by number of ratings",
    xaxis_title="Number of ratings",
    yaxis_title="Movie count",
)
fig.show()

In [None]:
df = (
    movie_ratings_count_pdf
    .sort_values("avg_ratings", ascending=False)
    .merge(items_df.toPandas(), on="itemID")
    .reset_index(drop=True)
)

fig = go.Figure(
    data=go.Bar(
        name="Average ratings",
        y=df["avg_ratings"],
        text=df["title"],
    ),
)

fig.add_trace(
    go.Bar(
        name="Number of reviews",
        y=df["n_ratings"],
        text=df["title"],
        yaxis="y2",
    ),
)

fig.update_layout(
    width=1000,
    height=500,
    title_text="Movies' average ratings and number of reviews",
    legend=dict(orientation="h"),
    yaxis=dict(
        title=dict(text="Average ratings"),
        side="left",
        range=[0, 6],
    ),
    yaxis2=dict(
        title=dict(text="Number of reviews"),
        side="right",
        range=[0, 600],
        overlaying="y",
        tickmode="sync",
    ),
)

fig.show()


This is also a long-tail distribution: a small number of movies are far more popular (have many more ratings) than the rest.
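
One way to quantify a long tail is to measure what share of all ratings goes to the most-rated movies. The following is a minimal sketch on made-up counts; the helper `head_share` and the numbers are hypothetical and are not taken from MovieLens.

```python
def head_share(rating_counts, head_fraction=0.2):
    """Return the fraction of all ratings received by the top `head_fraction` of items."""
    counts = sorted(rating_counts, reverse=True)
    n_head = max(1, int(len(counts) * head_fraction))
    return sum(counts[:n_head]) / sum(counts)


# Hypothetical number of ratings per movie (10 movies)
toy_counts = [500, 300, 40, 30, 30, 25, 25, 20, 20, 10]
share = head_share(toy_counts)
print(f"Top 20% of movies receive {share:.0%} of the ratings")
```

On the real data, the same calculation could be applied to the `n_ratings` column of `movie_ratings_count_pdf`.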

## Generate Text Embeddings from Movie Titles and Genres

Movie titles and genres are stored in the item table.

In [None]:
display(items_df.limit(5))

In [None]:
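from pyspark.ml.linalg import Vectors


class SentenceEmbedder:
    """Minimal stand-in for a Spark transformer: reads `prompt`, adds an `embedding` vector column."""

    def __init__(self, model_name):
        self.model = SentenceTransformer(model_name)

    def transform(self, df):
        # The item table is small, so we encode on the driver with pandas.
        pdf = df.toPandas()
        vectors = self.model.encode(pdf["prompt"].tolist())
        pdf["embedding"] = [Vectors.dense(v) for v in vectors]
        return spark.createDataFrame(pdf)


embedding = SentenceEmbedder("paraphrase-MiniLM-L6-v2")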


# Title embedding
if not os.path.exists(f"/lakehouse/default/{DATA_DIR}/title_embeddings"):
    title_embeddings_df = (
        embedding.transform(items_df.withColumnRenamed("title", "prompt"))
        .select(
            "itemID", "genres",  # We keep "genres" column so that we can join the genres embedding later
            F.col("prompt").alias("title"),
            F.col("embedding").alias("title_embedding"),
        )
    )
    title_embeddings_df.write.mode("overwrite").format("delta").save(f"{DATA_DIR}/title_embeddings")
else:
    title_embeddings_df = spark.read.format("delta").load(f"{DATA_DIR}/title_embeddings")

display(title_embeddings_df.cache().limit(5))

In [None]:
# Genres embedding
if not os.path.exists(f"/lakehouse/default/{DATA_DIR}/genres_embeddings"):
    genres_embeddings_df = (
        embedding.transform(items_df.select(F.col("genres").alias("prompt")).distinct())
        .select(
            F.col("prompt").alias("genres"),
            F.col("embedding").alias("genres_embedding"),
        )
    )
    genres_embeddings_df.write.mode("overwrite").format("delta").save(f"{DATA_DIR}/genres_embeddings")
else:
    genres_embeddings_df = spark.read.format("delta").load(f"{DATA_DIR}/genres_embeddings")

display(genres_embeddings_df.cache().limit(5))

### Visualize title and genres embeddings

In [None]:
import numpy as np
import pandas as pd
from sklearn.manifold import TSNE

In [None]:
# Assign sequential numbers to sorted genres so that similar genres have similar colors in the plot. 
genres_embeddings_plot_df = genres_embeddings_df.withColumn(
    "genres_color_label",
    F.row_number().over(Window.orderBy("genres"))
)

In [None]:
tsne = TSNE(n_components=2, perplexity=15, random_state=42, learning_rate="auto", init="pca")

In [None]:
title_embeddings_pdf = (
    title_embeddings_df.join(
        genres_embeddings_plot_df.select("genres", "genres_color_label"),
        on="genres",
        how="inner",
    ).toPandas()
)

title_embeddings_pdf[["x", "y"]] = tsne.fit_transform(
    np.vstack(
        title_embeddings_pdf["title_embedding"].apply(lambda x: x.toArray())
    )
)
fig = px.scatter(
    title_embeddings_pdf,
    x="x",
    y="y",
    color="genres_color_label",
    hover_name="title",
    hover_data=["genres"],
)

fig.update_coloraxes(showscale=False)
fig.update_layout(title="Movie title embeddings", width=1000, height=500)
fig.show()

In [None]:
# Color labels follow the alphabetical order of genres, so the points use a gradient color scale.
genres_embeddings_pdf = genres_embeddings_plot_df.toPandas()

genres_embeddings_pdf[["x", "y"]] = tsne.fit_transform(
    np.vstack(
        genres_embeddings_pdf["genres_embedding"].apply(lambda x: x.toArray())
    )
)
fig = px.scatter(
    genres_embeddings_pdf,
    x="x",
    y="y",
    color="genres_color_label",
    hover_name="genres",
)

fig.update_coloraxes(showscale=False)
fig.update_layout(title="Movie genres embeddings", width=1000, height=500)
fig.show()

## Build Recommender Models

In [None]:
# Get users
users_df = ratings_df.select("userID").distinct()
users_df.cache()

In [None]:
print(f"Total {items_df.count()} movies and {users_df.count()} users in the dataset")

In [None]:
# Split train and test
train_df, test_df = spark_chrono_split(
    ratings_df,
    ratio=0.7,
)

print(f"Number of training : test samples = {train_df.cache().count()} : {test_df.cache().count()}")
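
A chronological split divides each user's ratings by time, so the model trains on earlier interactions and is evaluated on later ones. The following is a minimal pure-Python sketch of that idea. The helper `chrono_split` and the toy rows are hypothetical; the library's `spark_chrono_split` operates on Spark DataFrames and may differ in details such as rounding.

```python
from collections import defaultdict


def chrono_split(rows, ratio=0.7):
    """Split (user, item, rating, timestamp) rows per user, oldest ratings into train."""
    by_user = defaultdict(list)
    for row in rows:
        by_user[row[0]].append(row)

    train, test = [], []
    for user_rows in by_user.values():
        user_rows.sort(key=lambda r: r[3])  # oldest first
        n_train = round(len(user_rows) * ratio)
        train.extend(user_rows[:n_train])
        test.extend(user_rows[n_train:])
    return train, test


# Hypothetical ratings: (userID, itemID, rating, timestamp)
toy_rows = [
    ("u1", "m1", 4.0, 10), ("u1", "m2", 3.0, 30), ("u1", "m3", 5.0, 20),
    ("u2", "m1", 2.0, 5), ("u2", "m4", 4.0, 15),
]
train_rows, test_rows = chrono_split(toy_rows)
print(train_rows)
print(test_rows)
```

Each user's most recent ratings end up in the test set, which mimics predicting future behavior from past behavior.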

#### Ranking prediction (Recommend top-K items)

In ranking prediction problems, we measure how well an algorithm might predict the top K most relevant items (in our case, top K highest movie ratings for a given user). We use NDCG@k, Precision@k, Recall@k, and MAP@k to evaluate the performance of the algorithm. All the metrics have values that range from 0 to 1 where 0 is poor and 1 is a perfect recommendation.
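
To make these metrics concrete, here is a minimal single-user sketch of Precision@k, Recall@k, and NDCG@k with binary relevance. The function name and the toy data are hypothetical, and the library's evaluators may differ in details (for example, how relevance is determined and how results are averaged over users).

```python
import math


def precision_recall_ndcg_at_k(recommended, relevant, k):
    """Compute Precision@k, Recall@k and binary-relevance NDCG@k for one user."""
    top_k = recommended[:k]
    hits = [1 if item in relevant else 0 for item in top_k]

    precision = sum(hits) / k
    recall = sum(hits) / len(relevant)

    # DCG discounts a hit at rank i (1-based) by log2(i + 1).
    dcg = sum(hit / math.log2(rank + 1) for rank, hit in enumerate(hits, start=1))
    # The ideal ranking places all relevant items (up to k) at the top.
    ideal_hits = min(len(relevant), k)
    idcg = sum(1 / math.log2(rank + 1) for rank in range(1, ideal_hits + 1))
    return precision, recall, dcg / idcg


# Hypothetical ranked recommendations and held-out relevant items for one user
recommended = ["m1", "m2", "m3", "m4", "m5"]
relevant = {"m1", "m3", "m9"}
precision, recall, ndcg = precision_recall_ndcg_at_k(recommended, relevant, k=5)
print(f"precision@5={precision:.3f}, recall@5={recall:.3f}, ndcg@5={ndcg:.3f}")
```

Two of the five recommendations are relevant, and two of the three relevant items are retrieved; NDCG additionally rewards placing the hits near the top of the list.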

In [None]:
# Cross join users and items for rating prediction
user_item_remove_seen_df = (
    users_df
    .crossJoin(items_df.select("itemID"))
    .join(train_df, on=["userID", "itemID"], how='left_anti')  # Remove seen items
)
user_item_remove_seen_df.cache()
display(user_item_remove_seen_df.orderBy("userID", "itemID").limit(5))
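
The cross join followed by the `left_anti` join builds every user-item pair and then drops the pairs already present in the training set. A minimal pure-Python sketch of the same logic, on hypothetical toy data:

```python
from itertools import product

users = ["u1", "u2"]
items = ["m1", "m2", "m3"]
seen = {("u1", "m1"), ("u2", "m3")}  # (user, item) pairs in the training set

# All user-item combinations, minus the ones the user has already rated
candidates = [pair for pair in product(users, items) if pair not in seen]
print(candidates)
```

The remaining pairs are the candidates the model scores to produce top-K recommendations.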

In [None]:
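# `als_model` is assumed to be an ALS model already fitted on `train_df`; its training cell is not shown here.
als_ranking_pred_df = als_model.transform(user_item_remove_seen_df)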
als_ranking_pred_df.cache()
display(als_ranking_pred_df.limit(5))

In [None]:
als_rank_eval = SparkRankingEvaluation(
    test_df,
    als_ranking_pred_df,
    k=TOP_K, 
    col_user="userID",
    col_item="itemID",
    col_rating="rating",
    col_prediction="prediction",
    relevancy_method="top_k",
)

# Collect the ALS metrics in a dict
als_metrics = {}
als_metrics["ndcg_at_k"] = als_rank_eval.ndcg_at_k()
als_metrics["precision_at_k"] = als_rank_eval.precision_at_k()
als_metrics["recall_at_k"] = als_rank_eval.recall_at_k()
als_metrics["map_at_k"] = als_rank_eval.map_at_k()

#### Rating prediction

For rating prediction, we use regression metrics, RMSE and MAE.
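
As a worked example of the two metrics, the sketch below computes RMSE and MAE by hand on a few made-up ratings. The helper names `toy_rmse` and `toy_mae` and the values are hypothetical.

```python
import math


def toy_rmse(y_true, y_pred):
    """Root mean squared error: penalizes large errors more heavily."""
    squared = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared) / len(squared))


def toy_mae(y_true, y_pred):
    """Mean absolute error: average magnitude of the errors."""
    absolute = [abs(t - p) for t, p in zip(y_true, y_pred)]
    return sum(absolute) / len(absolute)


# Hypothetical true and predicted ratings
true_ratings = [4.0, 3.0, 5.0, 2.0]
pred_ratings = [3.5, 3.0, 4.0, 3.0]
rmse_value = toy_rmse(true_ratings, pred_ratings)
mae_value = toy_mae(true_ratings, pred_ratings)
print(f"RMSE={rmse_value}, MAE={mae_value}")
```

Both metrics are in rating units, and lower is better; RMSE is never smaller than MAE on the same predictions.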

In [None]:
als_pred_rating_df = als_model.transform(test_df)
als_pred_rating_df.cache()

In [None]:
als_rating_eval = SparkRatingEvaluation(
    test_df,
    als_pred_rating_df,
    col_user="userID",
    col_item="itemID",
    col_rating="rating",
    col_prediction="prediction",
)

als_metrics["rmse"] = als_rating_eval.rmse()
als_metrics["mae"] = als_rating_eval.mae()

In [None]:
als_metrics

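Create feature DataFrames by joining each rating with its title and genres embeddings. The training code below expects them saved as parquet files (`train_features.parquet` and `test_features.parquet`).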

In [None]:
from pyspark.ml.functions import vector_to_array

train_features_df, test_features_df = [(
    df
    .join(
        title_embeddings_df.withColumn("title_embedding", vector_to_array("title_embedding")),
        on="itemID",
        how="inner")
    .join(
        genres_embeddings_df.withColumn("genres_embedding", vector_to_array("genres_embedding")),
        on="genres",
        how="inner")
    .select(
        "userID", "itemID", "rating",
        F.concat(F.col("title_embedding"), F.col("genres_embedding")).alias("features"),
    )
) for df in [train_df, test_df]]

In [None]:
display(train_features_df.limit(5))

In [None]:
import argparse
from tempfile import TemporaryDirectory

from azureml.core import Dataset, Run, Workspace

import mlflow
import pandas as pd
import tensorflow as tf

from recommenders.utils import tf_utils
import recommenders.models.wide_deep.wide_deep_utils as wide_deep
from recommenders.evaluation.python_evaluation import (
    ndcg_at_k,
    precision_at_k,
    recall_at_k,
    map_at_k,
    rmse,
    mae,
)

# Show TensorFlow INFO-level logs so that training progress is visible
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)

# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument('--top-k', type=int, help="Top k items to recommend")
parser.add_argument('--steps', type=int, help="Number of batch steps to train")
args = parser.parse_args()

# Parameters
TOP_K = args.top_k
SEED = 0
EXPORT_DIR_BASE = "models"
MODEL_TYPE = "wide_deep"
STEPS = args.steps
BATCH_SIZE = 32
# Wide (linear) model hyperparameters
LINEAR_OPTIMIZER = "adagrad"
LINEAR_OPTIMIZER_LR = 0.0621  # Learning rate
LINEAR_L1_REG = 0.0           # Regularization rate for FtrlOptimizer
LINEAR_L2_REG = 0.0
LINEAR_MOMENTUM = 0.0         # Momentum for MomentumOptimizer or RMSPropOptimizer
LINEAR_DIM = 1000
# DNN model hyperparameters
DNN_OPTIMIZER = "adadelta"
DNN_OPTIMIZER_LR = 0.1
DNN_L1_REG = 0.0           # Regularization rate for FtrlOptimizer
DNN_L2_REG = 0.0
DNN_MOMENTUM = 0.0         # Momentum for MomentumOptimizer or RMSPropOptimizer
DNN_HIDDEN_LAYERS = [512, 128, 64]
DNN_USER_DIM = 32          # userID embedding feature dimension
DNN_ITEM_DIM = 16          # itemID embedding feature dimension
DNN_DROPOUT = 0.8
DNN_BATCH_NORM = True      # to use batch normalization or not


run = Run.get_context()
ws = run.experiment.workspace
ds = ws.get_default_datastore()

# Download datasets prepared in default datastore.
# TODO - is there a way to load data directly?
data = Dataset.File.from_files(path=(ds, "movielens"))
data.download(target_path="./movielens", overwrite=True)

train_features_df = pd.read_parquet("./movielens/train_features.parquet")
test_features_df = pd.read_parquet("./movielens/test_features.parquet")

print(f"Num train : test = {len(train_features_df)} : {len(test_features_df)}")

# Unique items in the dataset
data_df = pd.concat([train_features_df, test_features_df])
users_df = data_df.drop_duplicates("userID")[["userID"]].reset_index(drop=True)
items_df = data_df.drop_duplicates("itemID")[["itemID", "features"]].reset_index(drop=True)

print("Total {} items and {} users in the dataset".format(len(items_df), len(users_df)))

# Define wide (linear) and deep (dnn) features
wide_columns, deep_columns = wide_deep.build_feature_columns(
    users=users_df["userID"].values,
    items=items_df["itemID"].values,
    user_col="userID",
    item_col="itemID",
    item_feat_col="features",
    crossed_feat_dim=LINEAR_DIM,
    user_dim=DNN_USER_DIM,
    item_dim=DNN_ITEM_DIM,
    item_feat_shape=len(train_features_df["features"][0]),
    model_type=MODEL_TYPE,
)

# Model checkpoint directory
tmp_dir = TemporaryDirectory()

# Build a model
model = wide_deep.build_model(
    model_dir=tmp_dir.name,
    wide_columns=wide_columns,
    deep_columns=deep_columns,
    linear_optimizer=tf_utils.build_optimizer(LINEAR_OPTIMIZER, LINEAR_OPTIMIZER_LR, **{
        'l1_regularization_strength': LINEAR_L1_REG,
        'l2_regularization_strength': LINEAR_L2_REG,
        'momentum': LINEAR_MOMENTUM,
    }),
    dnn_optimizer=tf_utils.build_optimizer(DNN_OPTIMIZER, DNN_OPTIMIZER_LR, **{
        'l1_regularization_strength': DNN_L1_REG,
        'l2_regularization_strength': DNN_L2_REG,
        'momentum': DNN_MOMENTUM,
    }),
    dnn_hidden_units=DNN_HIDDEN_LAYERS,
    dnn_dropout=DNN_DROPOUT,
    dnn_batch_norm=DNN_BATCH_NORM,
    log_every_n_iter=max(1, STEPS // 100),  # At least 1, so small STEPS values still work
    save_checkpoints_steps=max(1, STEPS // 10),
    seed=SEED,
)

# Define training input (sample feeding) function
train_input_fn = tf_utils.pandas_input_fn(
    df=train_features_df,
    y_col="rating",
    batch_size=BATCH_SIZE,
    num_epochs=None,  # Repeat the data indefinitely; training length is set by `steps=STEPS` below.
    shuffle=True,
    seed=SEED,
)

# Train
model.train(
    input_fn=train_input_fn,
    steps=STEPS
)

# Evaluate
metrics = {}

# Ratings
preds = [
    p['predictions'][0]  # model outputs are [{'predictions': array([3.072578], dtype=float32)}, ...]
    for p in model.predict(input_fn=tf_utils.pandas_input_fn(df=test_features_df))
]
preds_df = test_features_df.drop("rating", axis='columns')
preds_df["prediction"] = preds

for metric_name, metric_fn in {"rmse": rmse, "mae": mae}.items():
    metrics[metric_name] = metric_fn(
        rating_true=test_features_df,
        rating_pred=preds_df,
        col_user="userID",
        col_item="itemID",
        col_rating="rating",
        col_prediction="prediction",
    )

# Ranking, top-K
# cross join users and items
users_items_df = users_df.merge(
    items_df, how="cross"
)
# remove seen items
users_items_df = users_items_df.merge(
    train_features_df[["userID", "itemID"]], how="outer", on=["userID", "itemID"], indicator=True
)
users_items_df = users_items_df[users_items_df["_merge"] == "left_only"].drop(columns="_merge").reset_index(drop=True)

# Predict in chunks to avoid `ValueError: Cannot create a tensor proto whose content is larger than 2GB`
i_step = 10000
start_i = 0
ranking_preds = []
while start_i < len(users_items_df):
    end_i = min(start_i + i_step, len(users_items_df))
    ranking_preds.extend([
        p['predictions'][0]  # model outputs are [{'predictions': array([3.072578], dtype=float32)}, ...]
        for p in model.predict(
            input_fn=tf_utils.pandas_input_fn(
                df=users_items_df.iloc[start_i:end_i]
            )
        )
    ])
    start_i = end_i

ranking_preds_df = users_items_df.copy()
ranking_preds_df["prediction"] = ranking_preds

for metric_name, metric_fn in {
    "ndcg_at_k": ndcg_at_k,
    "precision_at_k": precision_at_k,
    "recall_at_k": recall_at_k,
    "map_at_k": map_at_k,
}.items():
    metrics[metric_name] = metric_fn(
        rating_true=test_features_df,
        rating_pred=ranking_preds_df,
        col_user="userID",
        col_item="itemID",
        col_rating="rating",
        col_prediction="prediction",
        k=TOP_K,
    )

print(metrics)

# Record metrics
mlflow.set_experiment(experiment_name="movielens")
with mlflow.start_run(run_name="wide-and-deep") as run:
    for metric_name, metric_value in metrics.items():
        mlflow.log_metric(metric_name, metric_value)