<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc" style="margin-top: 1em;"><ul class="toc-item"><li><span><a href="#Introduction" data-toc-modified-id="Introduction-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Introduction</a></span></li><li><span><a href="#Setup" data-toc-modified-id="Setup-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Setup</a></span></li><li><span><a href="#Libraries" data-toc-modified-id="Libraries-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Libraries</a></span></li><li><span><a href="#Movie-Category" data-toc-modified-id="Movie-Category-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Movie Category</a></span></li><li><span><a href="#User-Preference" data-toc-modified-id="User-Preference-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>User Preference</a></span></li></ul></div>

# Introduction

This notebook assigns each movie a popularity category (short head, long tail, or distant tail) and computes each user's preference for long-tail movies.

# Setup

In [1]:
%%capture
# Switch the working directory to the parent folder; %%capture swallows the
# path that %cd would otherwise print.
# NOTE(review): presumably needed so that `from src import data` resolves from
# the repository root - confirm. The path is relative, so re-running this cell
# without restarting the kernel moves up one more level.
%cd ..

In [2]:
# Base directory under which this notebook writes its parquet outputs
# (movie_categories.parquet and user_preference.parquet).
data_dir = "/tmp/ml-20m"

# Libraries

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window

In [4]:
from src import data

In [5]:
# Reuse the active Spark session if one exists, otherwise start a new one
# with default settings.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)

# Movie Category

In [6]:
# Load the ratings through the project's data helper. The path is derived from
# `data_dir` so that inputs and outputs always live under the same directory.
interactions_df = data.get_data(f"{data_dir}/ratings.csv")

In [7]:
# Print the first five rows as a quick check of the loaded columns.
interactions_df.show(5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      2|   3.5|2005-04-02 23:53:47|
|     1|     29|   3.5|2005-04-02 23:31:16|
|     1|     32|   3.5|2005-04-02 23:33:39|
|     1|     47|   3.5|2005-04-02 23:32:07|
|     1|     50|   3.5|2005-04-02 23:29:40|
+------+-------+------+-------------------+
only showing top 5 rows



In [8]:
# Movies that together account for this share of all ratings form the short head.
SHORT_HEAD_FRACTION = 0.80
# Movies with fewer ratings than this are labelled as the distant tail.
DISTANT_TAIL_MIN_RATINGS = 20

# Rank movies from most to least rated. movieId breaks ties so that movies with
# the same number of ratings always accumulate in the same order; without it the
# category of movies near the cut-off could change between evaluations.
popularity_order = [F.col("total_rating").desc(), F.col("movieId").asc()]

# window to sum the total number of ratings over all movies
# (no partitioning: Spark gathers the rows into a single partition, which is
# acceptable here because there is only one row per movie after the groupBy)
w1 = Window \
    .orderBy(*popularity_order) \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# window to do the cumulative sum of ratings, from the most popular movie
# down to the current row
w2 = Window \
    .orderBy(*popularity_order) \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

movie_categories = interactions_df \
    .groupBy("movieId") \
    .count() \
    .withColumnRenamed("count", "total_rating") \
    .withColumn("grand_total_ratings", F.sum("total_rating").over(w1)) \
    .withColumn("cumulative_total_ratings", F.sum("total_rating").over(w2)) \
    .withColumn("frac", F.expr("cumulative_total_ratings/grand_total_ratings")) \
    .orderBy(*popularity_order) \
    .withColumn(
        "category",
        # The distant-tail rule takes precedence over the head/tail split.
        F.when(F.col("total_rating") < DISTANT_TAIL_MIN_RATINGS, "distanttail")
         .when(F.col("frac") <= SHORT_HEAD_FRACTION, "shorthead")
         .otherwise("longtail")
    ) \
    .select("movieId", "category") \
    .cache()  # reused below for the parquet write and by later cells

movie_categories \
    .write.mode("overwrite").parquet(f"{data_dir}/movie_categories.parquet")

# User Preference

In [9]:
# Share of each user's short-head + long-tail ratings that went to long-tail
# movies. Distant-tail ratings are counted but excluded from the ratio.
user_preference = interactions_df \
    .join(movie_categories, "movieId", "left") \
    .groupBy("userId", "category") \
    .count() \
    .groupBy("userId") \
    .pivot("category", ["shorthead", "longtail", "distanttail"]) \
    .agg(F.sum("count")) \
    .na.fill(0) \
    .withColumn("longtail_pref", F.expr("""
    CASE WHEN longtail + shorthead > 0
         THEN round(longtail / (longtail + shorthead), 2)
    END
    """)) \
    .select("userId", "longtail_pref")
# Notes on the chain above:
# - The pivot values are listed explicitly so that all three count columns exist
#   even if a category is absent from the data, and so that Spark does not have
#   to run an extra job to discover the distinct categories.
# - Users with no short-head or long-tail ratings get a null preference instead
#   of a division by zero (which raises an error when ANSI mode is enabled).

user_preference \
    .write.mode("overwrite").parquet(f"{data_dir}/user_preference.parquet")