In [1]:
import pyspark

from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.ml.feature import OneHotEncoder

import numpy as np

import sys
import os
sys.path.insert(0, os.path.abspath('../'))
from utils import write_dataframe

In [2]:
min_ratings = 10
drop_missing_years = False

write_files = False
create_train_test = True
train_test_ratio = [.8,.2]
seed = 0

In [3]:
# Change the number of cores in this code block
# by setting `spark.master` to `local[n]` where
# n is the number of cores

conf = pyspark.SparkConf().setAll([('spark.master', 'local[4]'),
                                   ('spark.app.name', 'Basic Setup')])
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-05-30 01:16:54,853 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-05-30 01:16:55,519 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Load Data

Read data from the `ratings.csv` file

In [4]:
movies_df = spark.read.option("header",True).csv("file:///home/work/data/movies.csv")
movies_df.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



## Add Year column

In [5]:
#Extracting the year from Movie Title
movies_df = movies_df.withColumn('movieYear',regexp_extract(col('title'), '(.+)(\()([0-9]{4})(\))', 3))

In [6]:
movies_df.show()

+-------+--------------------+--------------------+---------+
|movieId|               title|              genres|movieYear|
+-------+--------------------+--------------------+---------+
|      1|    Toy Story (1995)|Adventure|Animati...|     1995|
|      2|      Jumanji (1995)|Adventure|Childre...|     1995|
|      3|Grumpier Old Men ...|      Comedy|Romance|     1995|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|     1995|
|      5|Father of the Bri...|              Comedy|     1995|
|      6|         Heat (1995)|Action|Crime|Thri...|     1995|
|      7|      Sabrina (1995)|      Comedy|Romance|     1995|
|      8| Tom and Huck (1995)|  Adventure|Children|     1995|
|      9| Sudden Death (1995)|              Action|     1995|
|     10|    GoldenEye (1995)|Action|Adventure|...|     1995|
|     11|American Presiden...|Comedy|Drama|Romance|     1995|
|     12|Dracula: Dead and...|       Comedy|Horror|     1995|
|     13|        Balto (1995)|Adventure|Animati...|     1995|
|     14

## Removing IMAX
Upon visual inspection, there were only two cases for IMAX
### Case 1: IMAX was only genre
* 1 movie where this was true
    * 4460,Encounter in the Third Dimension (1999),IMAX -> only movie with IMAX as only genre -> 27 ratings
Solution: Replace pattern: ",IMAX" -> ",(no genres listed)"
### Case 2: IMAX was last genre listed
* All other 194 instances of IMAX, IMAX was the last genre
Solution: Replace pattern: "|IMAX" -> ""

In [7]:
#Check IMAX genres in records
movies_df.filter(col("genres").contains("IMAX")).show()

+-------+--------------------+--------------------+---------+
|movieId|               title|              genres|movieYear|
+-------+--------------------+--------------------+---------+
|     33|Wings of Courage ...|Adventure|Romance...|     1995|
|     37|Across the Sea of...|    Documentary|IMAX|     1995|
|    150|    Apollo 13 (1995)|Adventure|Drama|IMAX|     1995|
|    364|Lion King, The (1...|Adventure|Animati...|     1994|
|    595|Beauty and the Be...|Animation|Childre...|     1991|
|   1797|      Everest (1998)|    Documentary|IMAX|     1998|
|   3159|Fantasia 2000 (1999)|Animation|Childre...|     1999|
|   4382|       Wolves (1999)|    Documentary|IMAX|     1999|
|   4445|T-Rex: Back to th...|Adventure|Documen...|     1998|
|   4453|Michael Jordan to...|    Documentary|IMAX|     2000|
|   4454|         More (1998)|Animation|Drama|S...|     1998|
|   4455|Thrill Ride: The ...|Adventure|Documen...|     1997|
|   4456|Haunted Castle (2...|Animation|Horror|...|     2001|
|   4457

In [8]:
# Case 1: IMAX is only genre
movies_df = movies_df.withColumn('genres', regexp_replace(col('genres'), '\\|IMAX', '')) \
                    .withColumn('genres', regexp_replace(col('genres'), 'IMAX', '(no genres listed)'))     

In [9]:
#check after IMAX genre removal 
print("Verifying there are no longer movies with IMAX genre")
movies_df.filter(col("genres").contains("IMAX")).show()

Verifying there are no longer movies with IMAX genre
+-------+-----+------+---------+
|movieId|title|genres|movieYear|
+-------+-----+------+---------+
+-------+-----+------+---------+



## Remove movies without genre info

In [10]:
tot_movies = movies_df.select('movieId').distinct().count()
tot_missing_genre = movies_df.filter(col("genres") == "(no genres listed)").count()
tot_with_genre = movies_df.filter(col("genres") != "(no genres listed)").count()
print(f"Unique movie Counts: {tot_movies}")
print(f"Total movies after removing missing genres: {tot_with_genre}")
print(f"Movies without genres: {tot_missing_genre}")

Unique movie Counts: 62423
Total movies after removing missing genres: 57360
Movies without genres: 5063


In [11]:
#Creating the flattened version of movies with genres
genre_cleaning_df = movies_df.select(col('movieId'),col('title'),col('movieYear'),explode(split("genres","\\|")).alias("genre"))

#Removing movies with (no genres listed)
genre_cleaning_df = genre_cleaning_df.filter(col("genres") != "(no genres listed)")

# How to deal with genres?
## One hot encoding
```
Action           001
Action|Adventure 010
Drama            100
Dram|Action      110
```
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html#:~:text=A%20one%2Dhot%20encoder%20that,0.0%2C%201.0%2C%200.0%5D%20

In [12]:
genre_list = np.array(genre_cleaning_df.select(col('genre')).distinct().collect()).squeeze().tolist()
genre_map = {g: i for i, g in enumerate(genre_list)}

# https://stackoverflow.com/questions/42980704/pyspark-create-new-column-with-mapping-from-a-dict

#This UDF converts the genre_id to mapped numerical value
def translate(mapping):
    def translate_(col):
        return mapping.get(col)
    return udf(translate_, IntegerType())

genre_cleaning_df = genre_cleaning_df.withColumn("genre_id", translate(genre_map)("genre"))

In [13]:
ohe = OneHotEncoder(inputCol='genre_id', outputCol='ohe_test', dropLast=False)

ohe_model = ohe.fit(genre_cleaning_df)
transformed_df = ohe_model.transform(genre_cleaning_df)
transformed_df.show()

                                                                                

+-------+--------------------+---------+---------+--------+---------------+
|movieId|               title|movieYear|    genre|genre_id|       ohe_test|
+-------+--------------------+---------+---------+--------+---------------+
|      1|    Toy Story (1995)|     1995|Adventure|       3| (18,[3],[1.0])|
|      1|    Toy Story (1995)|     1995|Animation|      10|(18,[10],[1.0])|
|      1|    Toy Story (1995)|     1995| Children|      15|(18,[15],[1.0])|
|      1|    Toy Story (1995)|     1995|   Comedy|      14|(18,[14],[1.0])|
|      1|    Toy Story (1995)|     1995|  Fantasy|       7| (18,[7],[1.0])|
|      2|      Jumanji (1995)|     1995|Adventure|       3| (18,[3],[1.0])|
|      2|      Jumanji (1995)|     1995| Children|      15|(18,[15],[1.0])|
|      2|      Jumanji (1995)|     1995|  Fantasy|       7| (18,[7],[1.0])|
|      3|Grumpier Old Men ...|     1995|   Comedy|      14|(18,[14],[1.0])|
|      3|Grumpier Old Men ...|     1995|  Romance|       1| (18,[1],[1.0])|
|      4|Wai

In [14]:
# note that using Sparse and Dense Vectors from ml.linalg. There are other Sparse/Dense vectors in spark.

#This is user defined function to convert the sparse vector to array for easy parsing
def sparse_to_array(v):
  v = DenseVector(v)
  new_array = list([float(x) for x in v])
  return new_array

sparse_to_array_udf = F.udf(sparse_to_array, T.ArrayType(T.FloatType()))


In [15]:
genre_cleaning_df = transformed_df.withColumn('ohe_array', sparse_to_array_udf('ohe_test')) \
    .select("movieId","title","movieYear","genre","ohe_array") \
    .groupBy("movieId","title","movieYear") \
    .agg( 
        concat_ws('|',collect_list(col("genre"))).alias("genres"),
        F.array(*[F.sum(F.col('ohe_array')[i]) for i in range(len(genre_map))]).alias('dense_ohe_feature')
    )

In [16]:
genre_cleaning_df.show(5, truncate=False)

[Stage 21:>                                                         (0 + 1) / 1]

+-------+-------------------------------------+---------+-------------------------+------------------------------------------------------------------------------------------+
|movieId|title                                |movieYear|genres                   |dense_ohe_feature                                                                         |
+-------+-------------------------------------+---------+-------------------------+------------------------------------------------------------------------------------------+
|10     |GoldenEye (1995)                     |1995     |Action|Adventure|Thriller|[0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0]|
|1000   |Curdled (1996)                       |1996     |Crime                    |[1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]|
|100003 |Up in Smoke (1957)                   |1957     |Comedy                   |[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0

                                                                                

In [17]:
genre_cleaning_df = genre_cleaning_df.select(['movieId','title','movieYear','genres'] + [genre_cleaning_df.dense_ohe_feature[x] for x in range(len(genre_map))])

*To get the corresponding genre key*

In [18]:
def get_keys_from_value(d, val):
    return [k for k, v in d.items() if v == val][0]

*Renaming all the genre columns*

In [19]:
for i in range(len(genre_map)):
    old_column = "dense_ohe_feature[{}]".format(i)
    new_column = get_keys_from_value(genre_map,i)
    genre_cleaning_df = genre_cleaning_df.withColumnRenamed(old_column,new_column)
genre_cleaning_df.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- movieYear: string (nullable = true)
 |-- genres: string (nullable = false)
 |-- Crime: double (nullable = true)
 |-- Romance: double (nullable = true)
 |-- Thriller: double (nullable = true)
 |-- Adventure: double (nullable = true)
 |-- Drama: double (nullable = true)
 |-- War: double (nullable = true)
 |-- Documentary: double (nullable = true)
 |-- Fantasy: double (nullable = true)
 |-- Mystery: double (nullable = true)
 |-- Musical: double (nullable = true)
 |-- Animation: double (nullable = true)
 |-- Film-Noir: double (nullable = true)
 |-- Horror: double (nullable = true)
 |-- Western: double (nullable = true)
 |-- Comedy: double (nullable = true)
 |-- Children: double (nullable = true)
 |-- Action: double (nullable = true)
 |-- Sci-Fi: double (nullable = true)



## Add average movie ratings

In [20]:
ratings_df = spark.read.csv("file:///home/work/data/ratings.csv", inferSchema=True, header=True)
ratings_df.printSchema()



root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



                                                                                

In [21]:
genre_cleaning_df.createOrReplaceTempView("m")
ratings_df.createOrReplaceTempView("r")

In [22]:
avg_ratings = spark.sql("SELECT r.movieId, AVG(r.rating) as avg_rating FROM r GROUP BY r.movieId HAVING COUNT(r.userId) >= {} ORDER BY avg_rating DESC".format(min_ratings))
avg_ratings.createOrReplaceTempView("avg_r")

In [23]:
# join average ratings with movie table
final_df_cols = ["m.`" + c + "`" if '-' in c else "m." + c for c in genre_cleaning_df.columns]

final_df_cols.insert(2, "avg_r.avg_rating")
final_df_cols = ", ".join(final_df_cols)

# put together query
query = "SELECT " + final_df_cols + " FROM m JOIN avg_r ON m.movieId == avg_r.movieId"

# run query
cleaned_df = spark.sql(query)

if drop_missing_years:
    # Drop missing movie years
    cleaned_df = cleaned_df.filter(cleaned_df.movieYear != "")
    
cleaned_df.createOrReplaceTempView("c")

In [24]:
tot_ratings = spark.sql("SELECT r.movieId FROM r INNER JOIN c ON r.movieId == c.movieId").count()
tot_movies = cleaned_df.select('movieID').distinct().count()
tot_genre_combos = cleaned_df.select('genres').distinct().count()
print(f"Total distinct ratings after cleaning: {tot_ratings}")
print(f"Total distinct movies after cleaning: {tot_movies}")
print(f"Total distinct genres after cleaning: {tot_genre_combos}")



Total distinct ratings after cleaning: 24873798
Total distinct movies after cleaning: 24009
Total distinct genres after cleaning: 1232


                                                                                

In [25]:
if write_files and create_train_test:
    # Split cleaned data into train and test
    train, test = cleaned_df.randomSplit(train_test_ratio, seed=seed)
    print(f"Train observations: {train.count()} Test observations: {test.count()}")
    
    # write train file
    train.coalesce(1).write.csv('file:///home/work/data/kmeans_movies_train', header=True)
    
    # write test file
    test.coalesce(1).write.csv('file:///home/work/data/kmeans_movies_test', header=True)
elif write_files:
    cleaned_df.coalesce(1).write.csv('file:///home/work/data/cleaned_movies', header=True)

                                                                                

Train observations: 19199 Test observations: 4793


                                                                                

In [26]:
spark.stop()