## Problem Statement
Aim of this project is to implement a Probabilistic Latent Semantic Model.<br>
In the first part of the notebook, we will use an expectation maximization algorithm to learn its parameters on the MovieLens dataset.<br>
Once we have made sure that the algorithm works correctly, we will see how to use the model in order to recommend some movies to users based on their interests.<br>
Last sections of the notebook contains some follow-up questions to drill-down on specific topics.

## Tools of the trade
This section contains all the functions you need to load the dataset, as well as useful imports for the next parts of the project.

In [0]:
!pip install -q mlflow

In [0]:
import os
import time
import zipfile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import mlflow

from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql import types as st
from pyspark.sql.functions import udf
from pyspark.storagelevel import StorageLevel

In [0]:
%matplotlib inline

## Setup Spark session

In [0]:
# This section configures hdfs root and how to load a spark session.
# Reading can be skipped during the first reading.

def load_spark_session():
  return (
    SparkSession
    .builder
    .appName("Dataset")
    .getOrCreate()
  )

In [0]:
ss = load_spark_session()
ss

## Prelimenary exercices
In these exercices we will implement matrix operations that will be usefull to run the PLSI algorithm afterwards.

#### `matrix_sum_rows` 
Takes a matrix (a column containing arrays of fixed length) and returns the sum of each row.

In [0]:
# Hint: https://stackoverflow.com/a/57448698/2015762
def matrix_sum_rows(col_name, length_of_array):
    return sum([sf.col(col_name)[i] for i in range(length_of_array)])

input_array = np.array([[1, 2, 3, 4], [40, 30, 20, 10]], dtype=float)
expected_output = input_array.sum(axis=1)
print('Input array')
print(input_array)
print('Expected output')
print(expected_output)
print('Obtained output')
(
    ss.sparkContext.parallelize(input_array.tolist()).map(lambda x: st.Row(matrix=x)).toDF()
    .withColumn('row_sum', matrix_sum_rows('matrix', 4))
).show(truncate=False)

#### `matrix_sum_columns`
Takes a matrix (a column containing arrays of fixed length) and returns the sum of each column.

In [0]:
# Hint: https://stackoverflow.com/a/54382990/2015762
def matrix_sum_columns(col_name, length_of_array):
    return sf.array(*[sf.sum(sf.col(col_name)[i]) for i in range(length_of_array)])

input_array = np.array([[1, 2, 3, 4], [40, 30, 20, 10]], dtype=float)
expected_output = input_array.sum(axis=0)
print('Input array')
print(input_array)
print('Expected output')
print(expected_output)
print('Obtained output')
(
    ss.sparkContext.parallelize(input_array.tolist()).map(lambda x: st.Row(matrix=x)).toDF()
    .select(matrix_sum_columns('matrix', 4).alias('col_sum'))
).show(truncate=False)

#### `matrix_normalize_rows`
Takes a matrix (a column containing arrays of fixed length) and returns the same matrix where the rows have been divded by their sum, such that each row sums to 1.

In [0]:
def matrix_normalize_rows(col_name, length_of_array):
    row_sums = matrix_sum_rows(col_name, length_of_array)
    return sf.array(*[sf.col(col_name)[i] / row_sums for i in range(length_of_array)])

input_array = np.array([[1, 2, 3, 4], [40, 30, 20, 10]], dtype=float)
expected_output = input_array / input_array.sum(axis=1).reshape(-1, 1)
print('Input array')
print(input_array)
print('Expected output')
print(expected_output)
print('Obtained output')
(
    ss.sparkContext.parallelize(input_array.tolist()).map(lambda x: st.Row(numbers=x)).toDF()
    .withColumn('normalized_elements', matrix_normalize_rows('numbers', 4))
).show(truncate=False)

#### `matrix_elementwise_product`
Takes two matrices and return their elementwise product (aka. Hadamard product)

In [0]:
def matrix_elementwise_product(col_name_1, col_name_2, length_of_array):
    return sf.array(*[sf.col(col_name_1)[i] * sf.col(col_name_2)[i] for i in range(length_of_array)])

input_array_1 = np.array([[1, 2, 3, 4], [40, 30, 20, 10]], dtype=float)
input_array_2 = np.array([[1, 2, 1, 2], [10, 20, 10, 20]], dtype=float)
expected_output = input_array_1 * input_array_2
print('Input array')
print(input_array_1)
print(input_array_2)
print('Expected output')
print(expected_output)
print('Obtained output')
(
    ss.sparkContext.parallelize(zip(input_array.tolist(), input_array_2.tolist())).map(lambda x: st.Row(numbers_1=x[0], numbers_2=x[1])).toDF()
    .withColumn('elementwise_products', matrix_elementwise_product('numbers_1', 'numbers_2', 4))
).show(truncate=False)

#### `matrix_elementwise_divide`
Takes two matrices and divide elementwise the first one by the second one.

In [0]:
def matrix_elementwise_divide(col_name_1, col_name_2, length_of_array):
    return sf.array(*[sf.col(col_name_1)[i] / sf.col(col_name_2)[i] for i in range(length_of_array)])

input_array_1 = np.array([[1, 2, 3, 4], [40, 30, 20, 10]], dtype=float)
input_array_2 = np.array([[1, 2, 1, 2], [10, 20, 10, 20]], dtype=float)
expected_output = input_array_1 / input_array_2
print('Input array')
print(input_array_1)
print(input_array_2)
print('Expected output')
print(expected_output)
print('Obtained output')
(
    ss.sparkContext.parallelize(zip(input_array.tolist(), input_array_2.tolist())).map(lambda x: st.Row(numbers_1=x[0], numbers_2=x[1])).toDF()
    .withColumn('elementwise_divided', matrix_elementwise_divide('numbers_1', 'numbers_2', 4))
).show(truncate=False)

## Load Movie Lens dataset

In [0]:
# Remove from cache all data from preliminary exercices
ss.catalog.clearCache()

In [0]:
# This section defines utility method to load our dataset.
# Reading can be skipped during the first reading.

import urllib
def download_dataset(dataset_path):
    os.makedirs(dataset_path, exist_ok=True)
    urllib.request.urlretrieve(
        "http://files.grouplens.org/datasets/movielens/ml-20m.zip", 
        "movie_lens_data/ml-20m.zip"
    )
    with zipfile.ZipFile("movie_lens_data/ml-20m.zip", "r") as zip_ref:
        zip_ref.extractall(dataset_path)

In [0]:
download_dataset(os.path.abspath('movie_lens_data'))

In [0]:
hdfs_prefix = 'file:///databricks/driver/'
movies_path = f"{hdfs_prefix}/movie_lens_data/ml-20m/movies.csv"
ratings_path= f"{hdfs_prefix}/movie_lens_data/ml-20m/ratings.csv"

In [0]:
movies_df = ss.read.options(header=True).csv(movies_path)
ratings_df = ss.read.options(header=True).csv(ratings_path)
positive_ratings_df = ratings_df.filter("rating>=3.5").cache()

In [0]:
movies_df.show(3)
ratings_df.show(3)

## PLSI

### Preprocess dataset

In [0]:
# Control the dataset size
min_positive_ratings_per_movie = 1000
selected_movies_df = (
    positive_ratings_df
    .groupBy('movieId')
    .count()
    .filter(sf.col('count') > min_positive_ratings_per_movie)
    .select('movieId')
)

# TASK 1: explain what the filter below does.  Why did we not use sample instead?
keep_user_every = 10
user_movies_interactions = (
    positive_ratings_df
    .join(selected_movies_df, on='movieId')
    .filter(sf.expr(f'PMOD(HASH(userId),{keep_user_every})')==0)
    .select('userId', 'movieId')
    .repartition(10, 'userId', 'movieId')
).cache()

In [0]:
user_movies_interactions.select(
    sf.count('*').alias('n_pairs'),
    sf.countDistinct('userId').alias('n_users'),
    sf.countDistinct('movieId').alias('n_movies'),
).show()

In [0]:
user_movies_interactions.groupby('userId', 'movieId').count().select(sf.max('count')).show()

With
* N the number of users u
* M the number of movies s
* L the number of latent classes z
* T number of user, movie interactions (each interaction (s_t, u_t) means user u_t liked movie s_t)

We suppose that the probability that a user will like a movie can be written in the form of a mixture model given by the equation:
$$
p(s|u) = \sum_{z=1}^L p(s|z) p(z|u)
$$
And we want to optimize the likelihood of the observed user interactions
$$
L = - \frac{1}{T} \sum_{1}^{T} \log p(s_t|u_t) = - \frac{1}{T} \sum_{1}^{T} \sum_{z=1}^L p(s_t|z) p(z|u_t)
$$
That can be done using an EM algorithm working as follow:

**E step**

For each interaction (u_t, s_t), compute for all z = 1, ..., L:
$$
p(z|(u_t, s_t)) = \frac{p(s_t|z) p(z|u_t)}{\sum_z p(s_t|z) p(z|u_t)}
$$

**M step**

Find each movie probability given a latent class
$$
p(s|z) = \frac{N(z, s)}{N(z)} 
\quad \text{where} \quad N(z, s) = \sum_s \sum_u p(z|(u, s)) 
\quad \text{and} \quad N(z) = \sum_s N(z, s)
$$
Find each latent class probability given each user.
$$
p(z|u) = \frac{\sum_s p(z|(u, s))}{\sum_z \sum_s p(z|(u, s))}
$$

We will have the following dataframes

* `count_z_s`: M rows, with columns  `movieId`, `N(z,s)`.
* `count_z`: 1 row, with column `N(z)`.
* `p_s_knowing_z`: M rows, with columns  `movieId`, `p(s|z)`. For a given z, the sum of p(s|z) equals 1.
* `p_z_knowing_u`: N rows, with columns `userId`, `p(z|u)`. For a given u, the sum of p(z|u) equals 1.
* `p_z_knowing_u_and_s`: N x M rows, with columns `userId`, `movieId`, `p(z|u,s)`.

In [0]:
def get_count_z(count_z_s, n_latent_classes):
    """Compute N(z) = sum_s N(z,s)
    """
    return (
        count_z_s
        .select(matrix_sum_columns('N(z,s)', n_latent_classes).alias('N(z)'))
    )
  
count_z_s = ss.sparkContext.parallelize([
    st.Row(**{"movieId":1, "N(z,s)": [1., 3., 4.]}),
    st.Row(**{"movieId":2, "N(z,s)": [4., 5., 0.]}),
]).toDF()
get_count_z(count_z_s, 3).show()

In [0]:
def get_count_z_s(p_z_knowing_u_and_s, n_latent_classes):
    """Compute N(z,s) = sum_u p(z|u,s)
    """
    return (
        p_z_knowing_u_and_s
        .groupby('movieId')
        .agg(matrix_sum_columns('p(z|u,s)', n_latent_classes).alias('N(z,s)'))
    )

In [0]:
def get_p_s_knowing_z(count_z_s, count_z, n_latent_classes):
    """Compute p(s|z) = N(z,s) / N(z)
    
    Hint: use crossJoin
    """
    
    return (
        count_z_s
        .crossJoin(count_z)
        .select(
            'movieId',
            matrix_elementwise_divide('N(z,s)', 'N(z)', n_latent_classes).alias('p(s|z)'),
        )
    )

In [0]:
def get_p_z_knowing_u(p_z_knowing_u_and_s, n_latent_classes):
    """Compute p(z|u) = sum_s p(z|u,s) / sum_z sum_s p(z|u,s)
    """
    
    sum_z_knowing_u = (
        p_z_knowing_u_and_s
        .groupby('userId')
        .agg(matrix_sum_columns('p(z|u,s)', n_latent_classes).alias('sum_s p(z|u,s)'))
    )
    return (
        sum_z_knowing_u
        .select(
            'userId',
            matrix_normalize_rows('sum_s p(z|u,s)', n_latent_classes).alias('p(z|u)'),
        )
    )

In [0]:
def get_p_z_knowing_u_and_s(observed_pairs, count_z_s, count_z, p_z_knowing_u, n_latent_classes):
    """For all pairs of observed (u, s)
    
    Compute p(z|u,s) = [N(z, s) / N(z) * p(z|u)] / sum_z [N(z, s) / N(z) * p(z|u)]
                     = [p(s|z) * p(z|u)] / sum_z [p(s|z) * p(z|u)]
    """
    p_s_knowing_z = get_p_s_knowing_z(count_z_s, count_z, n_latent_classes)
    
    sum_z_knowing_u_and_s = (
        observed_pairs
        .join(sf.broadcast(p_s_knowing_z), on='movieId')
        .join(sf.broadcast(p_z_knowing_u), on='userId')
        .select(
            'userId',
            'movieId',
            matrix_elementwise_product('p(s|z)', 'p(z|u)', n_latent_classes).alias('sum_z [p(s|z) * p(z|u)]'),
        )
    )
    
    return (
        sum_z_knowing_u_and_s
        .select(
            'userId',
            'movieId',
            matrix_normalize_rows('sum_z [p(s|z) * p(z|u)]', n_latent_classes).alias('p(z|u,s)'),
        )
    )

In [0]:
def log_likelihood(observed_pairs, count_z_s, count_z, p_z_knowing_u, n_latent_classes):
    """Compute the log likelihood of the observed pairs
    
    L = - 1 / T * sum_t log[ p(s|u) ]
      = - 1 / T * sum_t log[ sum_z p(s|z) * p(z|u) ]
    """
    p_s_knowing_z = get_p_s_knowing_z(count_z_s, count_z, n_latent_classes)
    return (
        observed_pairs
        .join(sf.broadcast(p_s_knowing_z), on='movieId')
        .join(sf.broadcast(p_z_knowing_u), on='userId')
        .select(matrix_elementwise_product('p(s|z)', 'p(z|u)', n_latent_classes).alias('p(s|z) * p(z|u)'))
        .select(matrix_sum_rows('p(s|z) * p(z|u)', n_latent_classes).alias('p(s|u)'))
        .select(sf.mean(sf.log('p(s|u)')).alias('log_likelihood'))
        .take(1)[0].log_likelihood
    )

In [0]:
def initialize_statistics(observed_pairs, n_latent_classes):
    p_z_knowing_u_and_s = (
        observed_pairs
        .withColumn('random_values', sf.array(*[sf.rand(seed=i) for i in range(n_latent_classes)]))
        .select('userId', 'movieId', matrix_normalize_rows('random_values', n_latent_classes).alias('p(z|u,s)'))
    ).cache()
    
    p_z_knowing_u = get_p_z_knowing_u(p_z_knowing_u_and_s, n_latent_classes)
    count_z_s = get_count_z_s(p_z_knowing_u_and_s, n_latent_classes)
    count_z = get_count_z(count_z_s, n_latent_classes)
    
    p_z_knowing_u_and_s.unpersist()
    return count_z_s, count_z, p_z_knowing_u

In [0]:
def add_checkpoint(df, should_checkpoint):
  if should_checkpoint:
    return df.localCheckpoint()
  else:
    return df

def run_plsi(observed_pairs, n_iterations, n_latent_classes, checkpoint_every=1):
    start_init_time = time.time()
    ss.sparkContext.setJobDescription("Initialization")

    count_z_s, count_z, p_z_knowing_u = initialize_statistics(observed_pairs, n_latent_classes)
    count_z_s, count_z, p_z_knowing_u = count_z_s.cache(), count_z.cache(), p_z_knowing_u.cache()
    llh = log_likelihood(observed_pairs, count_z_s, count_z, p_z_knowing_u, n_latent_classes)
    mlflow.log_metric(key="llh", value=llh, step=0)
    print(f'LLH: {llh:.10f}')
    
    end_init_time = time.time()
    print(f'Initialization: {end_init_time - start_init_time:.1f}s')
    
    for i in range(n_iterations):
        start_e_step = time.time()
        ss.sparkContext.setJobDescription(f"Iteration {i+1}: E-step")
        # E step
        p_z_knowing_u_and_s = get_p_z_knowing_u_and_s(observed_pairs, count_z_s, count_z, p_z_knowing_u, n_latent_classes).cache()
        p_z_knowing_u_and_s.count()
        count_z_s.unpersist(), count_z.unpersist(), p_z_knowing_u.unpersist()
        
        end_e_step = time.time()
        print(f'Iteration {i+1}: E-step: {end_e_step - start_e_step:.1f}s')
        
        ss.sparkContext.setJobDescription(f"Iteration {i+1}: M-step")
        # M step
        should_checkpoint = (i % checkpoint_every) == 0
        print('should_checkpoint', should_checkpoint)
        p_z_knowing_u = add_checkpoint(get_p_z_knowing_u(p_z_knowing_u_and_s, n_latent_classes), should_checkpoint).cache()
        count_z_s = add_checkpoint(get_count_z_s(p_z_knowing_u_and_s, n_latent_classes), should_checkpoint).cache()
        count_z = get_count_z(count_z_s, n_latent_classes).cache()
        
        llh = log_likelihood(observed_pairs, count_z_s, count_z, p_z_knowing_u, n_latent_classes)
        mlflow.log_metric(key="llh", value=llh, step=i+1)
        p_z_knowing_u_and_s.unpersist()
        
        end_m_step = time.time()
        print(f'Iteration {i+1}: M-step: {end_m_step - end_e_step:.1f}s')
        print(f'LLH: {llh:.10f}')
    
    count_z_s.unpersist(), count_z.unpersist(), p_z_knowing_u.unpersist()
    return get_p_s_knowing_z(count_z_s, count_z, n_latent_classes)

In [0]:
n_iterations = 15
n_latent_classes = 5

with mlflow.start_run():
    mlflow.log_param("n_iterations", n_iterations)
    mlflow.log_param("n_latent_classes", n_latent_classes)
    run_plsi(user_movies_interactions, n_iterations=n_iterations, n_latent_classes=n_latent_classes)

## Extensions

Extensions:
All _non optional_ extensions are rated over 15pts / 20. Select some _optional_ extensions as well to get extra points. Yes, maximal grade is 30/20, so you do not need to do all _optional_ extensions. Note that the extensions are more 

*Performance:* Try to improve the algorithm technical performance
1. Estimate the algorithm memory and computation requirements in terms of M, N and L. How does it scale with M, N, L and the number of EM steps?
2. If each steap takes longer than the previous one: Try using .cache() wisely.
3. Try to unpersist your dataframes when they become uneeded (look at the Storage tab in the Spark UI) (Optional + 2pts)
4. If after few steps (typically 5), your algorithm starts being much slower and you evventually obtain "java.lang.OutOfMemoryError: Java heap space", try using [.localCheckpoint()](https://spark.apache.org/docs/2.1.0/api/python/pyspark.html#pyspark.RDD.localCheckpoint). How does it differ from caching ? What are the benefits and the drawbacks ?

*Algorithm quality:* Make sure we are running things correctly
1. Compute the log likelihood across iterations ? Try for several values of L. You can log these values to mlflow.
2. Split randomly your ratings in two folds, train (80%) and test (20%). Train the algorithm on the train ratings and look for the likelihood on the test ratings ? How does it compare to the likelihood on the train ratings?
3. Compute the top movies per latent classes.
4. If we suppose each movie can be represented by p(s|z) ∈ R^L, pick few movies and look at their nearest neighbors. (Optional + 4pts)

*Recommender System (Optional + 6pts)*
1. How could this algorithm be used as a recommender system ? Formulate the prediction task: what would be the new movies yoou would you recommend to a user in your database ?
2. Compute [Precision and Recall](https://en.wikipedia.org/wiki/Precision_and_recall) for both train and test ratings.

*LDA (Optional + 3pts)*

Compare PLSI with LDA (implemented in [spark mllib](https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/mllib/clustering/LDA.html)) in terms of performance, quality and recommender system.