# Let's build regression models to predict movie revenue and vote averages

### Interesting datasets for our problem : 
- metadata_df
- credits_df
- ratings_df (on a second time, to predict user rating for the movies they have not rated for)

# Running PySpark on Colab

In [153]:
 # install Java
! apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
! wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
! tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# install findspark using pip
! pip install -q findspark

# ! pip install pyspark

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [None]:
# ! pip install pandas-datareader

# # We have to restart the runtime after these two execution command
# ! pip install --upgrade pandas
# ! pip install --upgrade pandas-datareader

In [155]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Reading the data and proceed some useful transformations

In [9]:
# Loading dataframes - data can be found here : https://www.kaggle.com/datasets/rounakbanik/the-movies-dataset

metadata_df = spark.read.format("com.databricks.spark.csv") \
    .options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
    .csv('/content/drive/MyDrive/Colab_files/csv_files/movies_metadata.csv')

keywords_df = spark.read.format("com.databricks.spark.csv") \
    .options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
    .csv('/content/drive/MyDrive/Colab_files/csv_files/keywords.csv')

credits_df = spark.read.format("com.databricks.spark.csv") \
    .options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
    .csv('/content/drive/MyDrive/Colab_files/csv_files/credits.csv')

links_df = spark.read.format("com.databricks.spark.csv") \
    .options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
    .csv('/content/drive/MyDrive/Colab_files/csv_files/links.csv')

ratings_df = spark.read.format("com.databricks.spark.csv") \
    .options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
    .csv('/content/drive/MyDrive/Colab_files/csv_files/ratings_small.csv')

In [10]:
print(ratings_df.columns)
print(metadata_df.columns)

['userId', 'movieId', 'rating', 'timestamp']
['adult', 'belongs_to_collection', 'budget', 'genres', 'homepage', 'id', 'imdb_id', 'original_language', 'original_title', 'overview', 'popularity', 'poster_path', 'production_companies', 'production_countries', 'release_date', 'revenue', 'runtime', 'spoken_languages', 'status', 'tagline', 'title', 'video', 'vote_average', 'vote_count']


In [154]:
import pyspark

# Shape

def sparkShape(dataFrame) :
    return (dataFrame.count(), len(dataFrame.columns))
pyspark.sql.dataframe.DataFrame.shape = sparkShape

# Some preprocessing yet again

In [47]:
# Read CSV file into a DataFrame

# joined_df = spark.read.options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
#                                        .csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df.csv")

joined_df = spark.read.options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
                                       .csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df.csv")

In [48]:
print(joined_df.shape())
joined_df.show(5, truncate=False)
# joined_df.columns

(42497, 30)
+-----+---------+------+---------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+----------+------------+-------+-------+--------+------------------+-----+------------+----------+---------+----+-------+------------+--------------------------+---+---------------+-------+-------+-------+------+-------+------+----------+
|adult|budget   |id_ori|imdb_id  |original_language|original_title    |overview                                                                                                                      |popularity|release_date|revenue|runtime|status  |title             |video|vote_average|vote_count|movie_age|year|name   |genres_value|production_countries_value|id |director       |actor_1|actor_2|actor_3|userId|movieId|rating|timestamp |
+-----+---------+------+---------+-----------------+------------------+-------------------------------------------

In [49]:
# Separate the columns into categorial, numerical & targets ones

def preprocessing(df) :

  categorial_features_cols = ['movieId', 'userId', 'original_language', 'original_title', 'overview',
                              'name', 'genres_value', 'production_countries_value', 
                              'director', 'actor_1', 'actor_2', 'actor_3']
  numeric_features_cols = ['budget', 'popularity', 'runtime', 'vote_count', 'movie_age', 'year', 'rating']
  target_cols = ['revenue', 'vote_average']
  # other_cols = ['adult', 'id_ori', 'imdb_id', 'release_date', 'status', 'title', 'video', 'id', 'timestamp']
  
  # features_cols = categorial_features_cols + numeric_features_cols
  cols = categorial_features_cols + numeric_features_cols + target_cols

  df = df.select([col for col in cols]) # joined_df.select(*features_cols)

  return df


sparkDF = preprocessing(joined_df)

print(sparkDF.shape())
sparkDF.show(5, truncate=False)

(42497, 21)
+-------+------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+-------+------------+--------------------------+---------------+-------+-------+-------+---------+----------+-------+----------+---------+----+------+-------+------------+
|movieId|userId|original_language|original_title    |overview                                                                                                                      |name   |genres_value|production_countries_value|director       |actor_1|actor_2|actor_3|budget   |popularity|runtime|vote_count|movie_age|year|rating|revenue|vote_average|
+-------+------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+-------+------------+--------------------------+---------------+-------+-------+-------+---------+------

In [50]:
import pandas as pd
from pyspark.sql.functions import col



# Reindex & rename some columns
def reindex_and_rename(sparkDF) :

  categorial_features_cols = ['movieId', 'userId', 'original_language', 'original_title', 'overview',
                              'name', 'genres_value', 'production_countries_value', 
                              'director', 'actor_1', 'actor_2', 'actor_3']
  numeric_features_cols = ['budget', 'popularity', 'runtime', 'vote_count', 'movie_age', 'year', 'rating']
  target_cols = ['revenue', 'vote_average']
  # other_cols = ['adult', 'id_ori', 'imdb_id', 'release_date', 'status', 'title', 'video', 'id', 'timestamp']

  cols = categorial_features_cols + numeric_features_cols + target_cols

  # From Spark DataFrame to Pandas DataFrame
  pandasDF = sparkDF.toPandas()

  # Change the position of columns
  pandasDF = pandasDF.reindex(columns=cols) 
  sparkDF = sparkDF.select(*cols)

  # Rename some columns
  pandasDF.rename(columns = {'name':'collection_name', 
                            'genres_value':'genres', 
                            'production_countries_value':'production_countries'}, inplace=True)
  sparkDF = sparkDF.withColumnRenamed('name', 'collection_name') \
                  .withColumnRenamed('genres_value', 'genres') \
                  .withColumnRenamed('production_countries_value', 'production_countries') \

  return pandasDF, sparkDF


pandasDF, sparkDF = reindex_and_rename(sparkDF)

# print(pandasDF.head(5))
sparkDF.show(5, truncate=False)

+-------+------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+---------------+------+------------------------+---------------+-------+-------+-------+---------+----------+-------+----------+---------+----+------+-------+------------+
|movieId|userId|original_language|original_title    |overview                                                                                                                      |collection_name|genres|production_countries    |director       |actor_1|actor_2|actor_3|budget   |popularity|runtime|vote_count|movie_age|year|rating|revenue|vote_average|
+-------+------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+---------------+------+------------------------+---------------+-------+-------+-------+---------+----------+-------

The '*genres*' column consists of multiple values. It needs to be decomposed into multiple columns, one value for each column.

In [51]:
# Implement a function that converts the values in the 'genres' column into a dictionary of values

def to_dict(row) :
  if type(row['genres']) is str :
    string = "{'"
    lst = row['genres'].split('; ')
    for i in range(len(lst)) : # lst[i] = 'Action'    
      string += str(i)+"':'"+lst[i]+"','"
    string = string[:-2]
    string += "}"

    return string


print(pandasDF['genres'].head(3))

pandasDF['genres'] = pandasDF.apply(lambda row: to_dict(row), axis=1)

print(pandasDF['genres'].head(3))

0    Comedy
1    Comedy
2    Comedy
Name: genres, dtype: object
0    {'0':'Comedy'}
1    {'0':'Comedy'}
2    {'0':'Comedy'}
Name: genres, dtype: object


Let's split the new '*genres*' column into several columns with PySpark

In [53]:
from pyspark.sql.types import StructType, StringType, IntegerType, MapType, ArrayType, StructField
from pyspark.sql.functions import count, when, col, isnan, from_json, to_json


# From Pandas DataFrame to Spark DataFrame
sparkDF = spark.createDataFrame(pandasDF)

schema = MapType(StringType(), StringType(), True)
mask = from_json(sparkDF.genres, schema) # from_json(col('genres').cast("string"), schema)
sparkDF = sparkDF.withColumn("genres", mask)


print(sparkDF.shape())
sparkDF.show(5, truncate=False)

(42497, 21)
+-------+------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+---------------+-------------+------------------------+---------------+-------+-------+-------+---------+----------+-------+----------+---------+----+------+-------+------------+
|movieId|userId|original_language|original_title    |overview                                                                                                                      |collection_name|genres       |production_countries    |director       |actor_1|actor_2|actor_3|budget   |popularity|runtime|vote_count|movie_age|year|rating|revenue|vote_average|
+-------+------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+---------------+-------------+------------------------+---------------+-------+-------+---

In [54]:
from pyspark.sql.functions import from_json, explode, map_keys


# Retrieve distinct keys from JSON object
distinct_keys_df = sparkDF.select(explode(map_keys(col('genres')))) \
                          .distinct()

# Convert key collection object into a list
key_list = list(map(lambda row: row[0], distinct_keys_df.collect()))

# Extract values based on keys and store them in separate columns
key_cols = map(lambda f: sparkDF['genres'].getItem(f).alias(str(f)), key_list)

sparkDF = sparkDF.select(*sparkDF.columns, *key_cols)


# df.printSchema()
print(sparkDF.shape())
sparkDF.show(5, truncate=False)

(42497, 28)
+-------+------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+---------------+-------------+------------------------+---------------+-------+-------+-------+---------+----------+-------+----------+---------+----+------+-------+------------+----+------+----+----+----+----+----+
|movieId|userId|original_language|original_title    |overview                                                                                                                      |collection_name|genres       |production_countries    |director       |actor_1|actor_2|actor_3|budget   |popularity|runtime|vote_count|movie_age|year|rating|revenue|vote_average|3   |0     |5   |6   |1   |4   |2   |
+-------+------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+---------------+

In [55]:
import numpy as np


# Some other preprocessing operations

def new_preprocessing(sparkDF) :

  # Drop the useless 'genres' dictionary column
  sparkDF = sparkDF.drop('genres')

  # Rename new 'genres' columns which have been created
  sparkDF = sparkDF.withColumnRenamed('0', 'genre_1') \
                  .withColumnRenamed('1', 'genre_2') \
                  .withColumnRenamed('2', 'genre_3') \
                  .withColumnRenamed('3', 'genre_4') \
                  .withColumnRenamed('4', 'genre_5') \
                  .withColumnRenamed('5', 'genre_6') \
                  .withColumnRenamed('6', 'genre_7') \


  categorial_features_cols = ['movieId', 'userId', 'original_language', 'original_title', 'overview',
                              'collection_name', 'production_countries', 
                              'director', 'actor_1', 'actor_2', 'actor_3', 
                              'genre_1', 'genre_2', 'genre_3', 'genre_4', 'genre_5', 'genre_6', 'genre_7']
  numeric_features_cols = ['budget', 'popularity', 'runtime', 'vote_count', 'movie_age', 'year', 'rating']
  target_cols = ['revenue', 'vote_average']


  cols = categorial_features_cols + numeric_features_cols + target_cols

  # Replace cells which contains null values by 'np.nan' ones
  cols = sparkDF.columns
  sparkDF = sparkDF.na.fill(value=np.nan, subset=[*cols])

  # Change position of columns
  sparkDF = sparkDF.select(*cols)

  return sparkDF


sparkDF = new_preprocessing(sparkDF)


print(sparkDF.shape())
sparkDF.show(5, truncate=False)

(42497, 27)
+-------+------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+---------------+------------------------+---------------+-------+-------+-------+---------+----------+-------+----------+---------+----+------+-------+------------+-------+-------+-------+-------+-------+-------+-------+
|movieId|userId|original_language|original_title    |overview                                                                                                                      |collection_name|production_countries    |director       |actor_1|actor_2|actor_3|budget   |popularity|runtime|vote_count|movie_age|year|rating|revenue|vote_average|genre_4|genre_1|genre_6|genre_7|genre_2|genre_5|genre_3|
+-------+------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+------

Handle data type issues on numerical columns: remove cells containing strings

In [56]:
pandasDF_ = sparkDF.toPandas()
pandasDF_.tail(2)
# pandasDF_.columns

Unnamed: 0,movieId,userId,original_language,original_title,overview,collection_name,production_countries,director,actor_1,actor_2,...,rating,revenue,vote_average,genre_4,genre_1,genre_6,genre_7,genre_2,genre_5,genre_3
42495,2295,587,en,Clerks II,A calamity at Dante and Randall's shops sends ...,Clerks Collection,United States of America,Kevin Smith,,,...,4.0,26888376.0,6.9,,Comedy,,,,,
42496,2295,564,en,Clerks II,A calamity at Dante and Randall's shops sends ...,Clerks Collection,United States of America,Kevin Smith,,,...,4.0,26888376.0,6.9,,Comedy,,,,,


In [58]:
num_cols = ['budget', 'popularity', 'runtime', 'vote_count', 'movie_age', 'year', 'rating', 'revenue', 'vote_average']

print(pandasDF_.shape)
pandasDF_[num_cols].head(2)

(42497, 27)


Unnamed: 0,budget,popularity,runtime,vote_count,movie_age,year,rating,revenue,vote_average
0,2000000.0,9.241748,125.0,756,82.0,1940,2.0,11000000.0,8.1
1,2000000.0,9.241748,125.0,756,82.0,1940,3.5,11000000.0,8.1


In [59]:
import re


# Implement the function that removes a row if a numerical column contains a string
def drop_row_if_bad_type(df, col) :
  for i in df.index :
    print(i)
    df[col] = df[col].apply(str)
    # If the cell does not start with one or more numeric characters
    if re.search('^\s*[^0-9]', df.loc[i, col]) :
      df = df.drop(labels=[i], axis=0)

  return df


# print(len(pandasDF_))

numeric_columns = ['budget', 'popularity', 'runtime', 'vote_count', 'movie_age', 'year', 'rating', 'revenue', 'vote_average']
for col in numeric_columns : 
  pandasDF_ = drop_row_if_bad_type(pandasDF_, col)
  
# print(len(pandasDF_))

[1;30;43mLe flux de sortie a été tronqué et ne contient que les 5000 dernières lignes.[0m
37498
37499
37500
37501
37502
37503
37504
37505
37506
37507
37508
37509
37510
37511
37512
37513
37514
37515
37516
37517
37518
37519
37520
37521
37522
37523
37524
37525
37526
37527
37528
37529
37530
37531
37532
37533
37534
37535
37536
37537
37538
37539
37540
37541
37542
37543
37544
37545
37546
37547
37548
37549
37550
37551
37552
37553
37554
37555
37556
37557
37558
37559
37560
37561
37562
37563
37564
37565
37566
37567
37568
37569
37570
37571
37572
37573
37574
37575
37576
37577
37578
37579
37580
37581
37582
37583
37584
37585
37586
37587
37588
37589
37590
37591
37592
37593
37594
37595
37596
37597
37598
37599
37600
37601
37602
37603
37604
37605
37606
37607
37608
37609
37610
37611
37612
37613
37614
37615
37616
37617
37618
37619
37620
37621
37622
37623
37624
37625
37626
37627
37628
37629
37630
37631
37632
37633
37634
37635
37636
37637
37638
37639
37640
37641
37642
37643
37644
37645
37646
37647
37648
37

In [62]:
pandasDF_.tail(3)

Unnamed: 0,movieId,userId,original_language,original_title,overview,collection_name,production_countries,director,actor_1,actor_2,...,rating,revenue,vote_average,genre_4,genre_1,genre_6,genre_7,genre_2,genre_5,genre_3
42494,4169,294,en,Breach,Eric O'Neill; a computer specialist who wants ...,no_data,United States of America,Billy Ray,Chris Cooper,Ryan Phillippe,...,4.5,33231264.0,6.5,History,Drama,,,Thriller,,Crime
42495,2295,587,en,Clerks II,A calamity at Dante and Randall's shops sends ...,Clerks Collection,United States of America,Kevin Smith,,,...,4.0,26888376.0,6.9,,Comedy,,,,,
42496,2295,564,en,Clerks II,A calamity at Dante and Randall's shops sends ...,Clerks Collection,United States of America,Kevin Smith,,,...,4.0,26888376.0,6.9,,Comedy,,,,,


In [63]:
print(pandasDF_.shape)

(42496, 27)


In [64]:
pandasDF_.to_csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df_.csv", index=False, sep=",")

# 2. Build regression models to predict movie revenue and vote averages

In [70]:
import pandas as pd

pandasDF_ = pd.read_csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df_.csv", header='infer', sep=",")

# print(pandasDF_.dtypes)

# Change data type
numeric_features_cols = ['budget', 'popularity', 'runtime', 'vote_count', 'movie_age', 'rating', 'revenue', 'vote_average']
categorial_features_cols = list(set(pandasDF_.columns) - set(numeric_features_cols))


def convert_column_to_string(df, column_name) :
    """
    Convert column values to strings if they are numeric
    """
    if df[column].dtype != 'string' :
      df[column_name] = df[column_name].astype("string")
    return df

def convert_column_to_string_(df, column_name) :
    """
    Convert column values to strings if they are numeric
    """
    df[column_name] = df[column_name].apply(str)
    return df

def replace_null_values(df, value) :
    """
    Replace all null values in the dataframe with the given value.
    """
    df = df.fillna(value)
    # df = df.replace('<NA>', value)
    return df


for column in categorial_features_cols :
  pandasDF_ = convert_column_to_string(pandasDF_, column)
  pandasDF_ = convert_column_to_string_(pandasDF_, column)
pandasDF_ = replace_null_values(pandasDF_, 'no_data')


# print(pandasDF_.dtypes)
pandasDF_.head(3)

  exec(code_obj, self.user_global_ns, self.user_ns)


Unnamed: 0,movieId,userId,original_language,original_title,overview,collection_name,production_countries,director,actor_1,actor_2,...,rating,revenue,vote_average,genre_4,genre_1,genre_6,genre_7,genre_2,genre_5,genre_3
0,914,628,en,The Great Dictator,Dictator Adenoid Hynkel tries to expand his em...,no_data,United States of America,Charlie Chaplin,,,...,2.0,11000000.0,8.1,,Comedy,,,,,
1,914,621,en,The Great Dictator,Dictator Adenoid Hynkel tries to expand his em...,no_data,United States of America,Charlie Chaplin,,,...,3.5,11000000.0,8.1,,Comedy,,,,,
2,914,620,en,The Great Dictator,Dictator Adenoid Hynkel tries to expand his em...,no_data,United States of America,Charlie Chaplin,,,...,4.5,11000000.0,8.1,,Comedy,,,,,


In [71]:
pandasDF_.to_csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df_.csv", index=False, sep=",")

In [72]:
# pandasDF_ = pd.read_csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df_.csv", header='infer', sep=",") # pose des problèmes pour convertir ce DF en pyspark DF

sparkDF = spark.read.options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
                    .csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df_.csv")

In [73]:
# sparkDF = spark.createDataFrame(pandasDF_)
print(sparkDF.shape())
sparkDF.show(5, truncate=False)

(42496, 27)
+-------+------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+---------------+------------------------+---------------+-------+-------+-------+---------+----------+-------+----------+---------+----+------+-------+------------+-------+-------+-------+-------+-------+-------+-------+
|movieId|userId|original_language|original_title    |overview                                                                                                                      |collection_name|production_countries    |director       |actor_1|actor_2|actor_3|budget   |popularity|runtime|vote_count|movie_age|year|rating|revenue|vote_average|genre_4|genre_1|genre_6|genre_7|genre_2|genre_5|genre_3|
+-------+------+-----------------+------------------+------------------------------------------------------------------------------------------------------------------------------+------

## 2.1. Let's try with PySpark & MLlib *`RandomForestRegressor`* model

In [74]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler #, OneHotEncoderEstimator # doesn't work
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql.functions import col
from tqdm import tqdm


sparkDF = spark.read.options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
                    .csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df_.csv")


# The code below shows how to load data, split categorical and numerical variables, 
# transform categorical variables into numerical variables, normalize the data and train a regression model with PySpark


# Select the columns to be used as input variables and as targets
# Separate categorical and numerical variables
# categorial_features_cols = ['director', 'actor_1', 'actor_2', 'actor_3', 'genre_1', 'genre_2', 'genre_3', 'genre_4', 'genre_5', 'genre_6', 'genre_7']
categorial_features_cols = ['genre_1', 'genre_2']
numeric_features_cols = ['budget', 'popularity', 'runtime', 'vote_count', 'movie_age', 'year', 'rating', 'vote_average', 'revenue']
target_cols = ['vote_average', 'revenue']


###########################################################################################################################

# Convert string columns to numerical columns
str_indexers = [StringIndexer(inputCol=col, outputCol="{}_index".format(col)) for col in categorial_features_cols]
for indexer in str_indexers :
  sparkDF = indexer.fit(sparkDF).transform(sparkDF)


# Transform/encode categorical columns into numerical columns
encoders = [OneHotEncoder(inputCol=idx.getOutputCol(), 
                          outputCol="{}_encoded".format(idx.getInputCol())) for idx in str_indexers]
for encoder in encoders :
  sparkDF = encoder.fit(sparkDF).transform(sparkDF)

# sparkDF.show(2, truncate=False)

# Combine numerical and categorical variables into a single vector
assembler = VectorAssembler(inputCols=[col+"_encoded" for col in categorial_features_cols] + numeric_features_cols, outputCol="features")
# sparkDF = assembler.transform(sparkDF) # not necessary as there is a pipeline below

# --> Assemble then normalize, because 'inputCol' of StandardScaler takes a string as a parameter and not a list

# Normalizing the (numericals) variables
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
# sparkDF = scaler.fit(sparkDF).transform(sparkDF) # not necessary as there is a pipeline below

# sparkDF.show(2, truncate=False)


###########################################################################################################################

# Split data into training and test sets (80% for training and 20% for testing)
split = sparkDF.randomSplit([0.8, 0.2], seed=42)
(train, test) = split[0], split[1]

# Define a regression model
reg_model_1 = RandomForestRegressor(featuresCol="scaled_features", labelCol='vote_average', maxBins=150)
reg_model_2 = RandomForestRegressor(featuresCol="scaled_features", labelCol="revenue", maxBins=150)

# Assemble the preprocessing steps and the regression algorithm into a pipeline
# pipeline = Pipeline(stages=encoders+ [assembler, scaler, model]) # does not work because already encoded above :
# -> 'IllegalArgumentException: requirement failed: Column genre_1_encoded already exists.'
pipeline_1 = Pipeline(stages=[assembler, scaler, reg_model_1])
pipeline_2 = Pipeline(stages=[assembler, scaler, reg_model_2])

# Train the regression model on the training data
# reg_model = model.fit(train) # if no pipeline
model_1 = pipeline_1.fit(train)
model_2 = pipeline_2.fit(train)

# Create predictions using the model.
# y_pred = reg_model.transform(test) # if no pipeline
predictions_1 = model_1.transform(test)
predictions_2 = model_2.transform(test)

predictions_1.select(col("prediction"), col("vote_average")).show(5, truncate=False)
predictions_2.select(col("prediction"), col("revenue")).filter(col('revenue') != 0).show(5, truncate=False)


###########################################################################################################################

# Evaluate the model on the test data
evaluator_1 = RegressionEvaluator(predictionCol="prediction", labelCol="vote_average", metricName="r2")
evaluator_2 = RegressionEvaluator(predictionCol="prediction", labelCol="revenue", metricName="r2")

# R2, the coefficient of determination
print("R2 on vote average : ", evaluator_1.evaluate(predictions_1), " | R2 on revenue : ", evaluator_2.evaluate(predictions_2))

# Root Mean Square Error
rmse_1 = evaluator_1.evaluate(predictions_1, {evaluator_1.metricName: 'rmse'})
rmse_2 = evaluator_2.evaluate(predictions_2, {evaluator_2.metricName: 'rmse'})
print("RMSE on vote average : ", rmse_1, " | RMSE on revenue : ", rmse_2)

# Mean Square Error
mse_1 = evaluator_1.evaluate(predictions_1, {evaluator_1.metricName: 'mse'})
mse_2 = evaluator_2.evaluate(predictions_2, {evaluator_2.metricName: 'mse'})
print("MSE on vote average : ", mse_1, " | MSE on revenue : ", mse_2)

# Mean Absolute Error
mae_1 = evaluator_1.evaluate(predictions_1, {evaluator_1.metricName: 'mae'})
mae_2 = evaluator_2.evaluate(predictions_2, {evaluator_2.metricName: 'mae'})
print("MAE on vote average : ", mae_1, " | MAE on revenue : ", mae_2)

+-----------------+------------+
|prediction       |vote_average|
+-----------------+------------+
|7.048100322349313|7.1         |
|7.048100322349313|7.1         |
|7.048100322349313|7.1         |
|7.048100322349313|7.1         |
|7.048100322349313|7.1         |
+-----------------+------------+
only showing top 5 rows

+--------------------+---------+
|prediction          |revenue  |
+--------------------+---------+
|2.1094947839870833E7|4300000.0|
|2.1094947839870833E7|4300000.0|
|2.1094947839870833E7|4300000.0|
|2.1094947839870833E7|4300000.0|
|2.1094947839870833E7|4300000.0|
+--------------------+---------+
only showing top 5 rows

R2 on vote average :  0.9523723051483397  | R2 on revenue :  0.9838207379921022
RMSE on vote average :  0.2038433436610161  | RMSE on revenue :  22288898.04129224
MSE on vote average :  0.04155210875490312  | MSE on revenue :  496794975895121.0
MAE on vote average :  0.13478047259946557  | MAE on revenue :  11560698.515856942


## 2.2. Let's try with PySpark & MLlib *`GBTRegressor`* model

In [75]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, StandardScaler, VectorAssembler #, OneHotEncoderEstimator # doesn't work
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql.functions import col
from tqdm import tqdm


sparkDF = spark.read.options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
                    .csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df_.csv")


# The code below shows how to load data, split categorical and numerical variables, 
# transform categorical variables into numerical variables, normalize the data and train a regression model with PySpark


# Select the columns to be used as input variables and as targets
# Separate categorical and numerical variables
# categorial_features_cols = ['director', 'actor_1', 'actor_2', 'actor_3', 'genre_1', 'genre_2', 'genre_3', 'genre_4', 'genre_5', 'genre_6', 'genre_7']
categorial_features_cols = ['genre_1', 'genre_2']
numeric_features_cols = ['budget', 'popularity', 'runtime', 'vote_count', 'movie_age', 'year', 'rating', 'vote_average', 'revenue']
target_cols = ['vote_average', 'revenue']


###########################################################################################################################

# Convert string columns to numerical columns
str_indexers = [StringIndexer(inputCol=col, outputCol="{}_index".format(col)) for col in categorial_features_cols]
for indexer in str_indexers :
  sparkDF = indexer.fit(sparkDF).transform(sparkDF)


# Transform/encode categorical columns into numerical columns
encoders = [OneHotEncoder(inputCol=idx.getOutputCol(), 
                          outputCol="{}_encoded".format(idx.getInputCol())) for idx in str_indexers]
for encoder in encoders :
  sparkDF = encoder.fit(sparkDF).transform(sparkDF)

# sparkDF.show(2, truncate=False)

# Combine numerical and categorical variables into a single vector
assembler = VectorAssembler(inputCols=[col+"_encoded" for col in categorial_features_cols] + numeric_features_cols, outputCol="features")
# sparkDF = assembler.transform(sparkDF) # not necessary as there is a pipeline below

# --> Assemble then normalize, because 'inputCol' of StandardScaler takes a string as a parameter and not a list

# Normalizing the (numericals) variables
scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withStd=True, withMean=True)
# sparkDF = scaler.fit(sparkDF).transform(sparkDF) # not necessary as there is a pipeline below

# sparkDF.show(2, truncate=False)


###########################################################################################################################

# Split data into training and test sets (80% for training and 20% for testing)
split = sparkDF.randomSplit([0.8, 0.2], seed=42)
(train, test) = split[0], split[1]

# Define a regression model
reg_model_1 = GBTRegressor(featuresCol="scaled_features", labelCol='vote_average', maxBins=150)
reg_model_2 = GBTRegressor(featuresCol="scaled_features", labelCol="revenue", maxBins=150)

# Assemble the preprocessing steps and the regression algorithm into a pipeline
# pipeline = Pipeline(stages=encoders+ [assembler, scaler, model]) # does not work because already encoded above :
# -> 'IllegalArgumentException: requirement failed: Column genre_1_encoded already exists.'
pipeline_1 = Pipeline(stages=[assembler, scaler, reg_model_1])
pipeline_2 = Pipeline(stages=[assembler, scaler, reg_model_2])

# Train the regression model on the training data
# reg_model = model.fit(train) # if no pipeline
model_1 = pipeline_1.fit(train)
model_2 = pipeline_2.fit(train)

# Create predictions using the model.
# y_pred = reg_model.transform(test) # if no pipeline
predictions_1 = model_1.transform(test)
predictions_2 = model_2.transform(test)

predictions_1.select(col("prediction"), col("vote_average")).show(5, truncate=False)
predictions_2.select(col("prediction"), col("revenue")).filter(col('revenue') != 0).show(5, truncate=False)


###########################################################################################################################

# Evaluate the model on the test data
evaluator_1 = RegressionEvaluator(predictionCol="prediction", labelCol="vote_average", metricName="r2")
evaluator_2 = RegressionEvaluator(predictionCol="prediction", labelCol="revenue", metricName="r2")

# R2, the coefficient of determination
print("R2 on vote average : ", evaluator_1.evaluate(predictions_1), " | R2 on revenue : ", evaluator_2.evaluate(predictions_2))

# Root Mean Square Error
rmse_1 = evaluator_1.evaluate(predictions_1, {evaluator_1.metricName: 'rmse'})
rmse_2 = evaluator_2.evaluate(predictions_2, {evaluator_2.metricName: 'rmse'})
print("RMSE on vote average : ", rmse_1, " | RMSE on revenue : ", rmse_2)

# Mean Square Error
mse_1 = evaluator_1.evaluate(predictions_1, {evaluator_1.metricName: 'mse'})
mse_2 = evaluator_2.evaluate(predictions_2, {evaluator_2.metricName: 'mse'})
print("MSE on vote average : ", mse_1, " | MSE on revenue : ", mse_2)

# Mean Absolute Error
mae_1 = evaluator_1.evaluate(predictions_1, {evaluator_1.metricName: 'mae'})
mae_2 = evaluator_2.evaluate(predictions_2, {evaluator_2.metricName: 'mae'})
print("MAE on vote average : ", mae_1, " | MAE on revenue : ", mae_2)

+-----------------+------------+
|prediction       |vote_average|
+-----------------+------------+
|7.100054324179656|7.1         |
|7.100054324179656|7.1         |
|7.100054324179656|7.1         |
|7.100054324179656|7.1         |
|7.100054324179656|7.1         |
+-----------------+------------+
only showing top 5 rows

+----------------+---------+
|prediction      |revenue  |
+----------------+---------+
|5385536.09813839|4300000.0|
|5385536.09813839|4300000.0|
|5385536.09813839|4300000.0|
|5385536.09813839|4300000.0|
|5385536.09813839|4300000.0|
+----------------+---------+
only showing top 5 rows

R2 on vote average :  0.999611100903611  | R2 on revenue :  0.999654785002428
RMSE on vote average :  0.01841981314459533  | RMSE on revenue :  3255772.6536084097
MSE on vote average :  0.00033928951628180685  | MSE on revenue :  10600055571984.346
MAE on vote average :  0.010253769747871021  | MAE on revenue :  1730130.3359182288


# 3. Collaborative filtering to build a movie recommendation system with two functions :

## 3.1. Suggest top N movies similar to a given movie title

### 3.1.a. With *`ALS`* algorithm

In [None]:
from pyspark.sql.functions import col


sparkDF = spark.read.options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
                    .csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df_.csv")

sparkDF.show(2, truncate=False)

+-------+------+-----------------+--------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+------------------------+------------+------------+-------------------+------------+--------+---------------+-------+-------+-------+-------+-------+------+----------+-------+----------+---------+------+-----------+------------+
|movieId|userId|original_language|original_title      |overview                                                                                                                                                                                                                                                                                                                       |colle

In [258]:
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, udf, struct, arrays_zip, explode, split, substring, concat, lit, regexp_replace
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, MapType, ArrayType


sparkDF = spark.read.options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
                    .csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df_.csv")


def top_N_similar_movies(df, movie_title) :

  # Define the ALS model
  als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")

  # Fit the model to the data
  model = als.fit(df)

  # Define the movie title for which you want to get recommendations
  # movie_title = "The Thirteenth Floor"

  # Generate top 10 user recommendations for a given movie title
  # Get the movie_id for the given movie title
  movie_id = sparkDF.where(col("original_title") == movie_title) \
                  .select(als.getItemCol()).distinct() # .select("movieId")

  # Use the model to generate top N movie recommendations
  top_N = 3

  # Get the top N recommended_movies DataFrame
  recommended_movies = model.recommendForItemSubset(movie_id, top_N)
  # recommended_movies.show(5, truncate=False)

  ##############################################################################

  # # Create a sub DataFrame, composed of the movieId & rating, columns from the recommended_movies DataFrame
  # sub_recommended_movies = recommended_movies.select("recommendations.movieId", "recommendations.rating")
  # schema = StructType([StructField('movieId', IntegerType(), nullable=True), 
  #                      StructField('rating', DoubleType(), nullable=True)])
  # extract_val = udf(lambda x: (x[0], x[1]), schema)
  # sub_recommended_movies = recommended_movies.withColumn("recommendations", extract_val("recommendations")) \
  #                                            .select(col("movieId"), col("recommendations.movieId"), col("recommendations.rating"))
  sub_recommended_movies = recommended_movies.withColumn('recommendations', explode('recommendations'))

  ##############################################################################

  sub_recommended_movies = sub_recommended_movies.withColumn("recommendations", col("recommendations").cast("string")) \
                                                .withColumn('recommendations', regexp_replace('recommendations', ']', '')) \
                                                .withColumn('recommendations', substring('recommendations', 2, 50))

  # Use the split function to separate the values in the "recommendations" column using the comma as separator
  split_col = split(col("recommendations"), ",")

  # Create two new columns using the values of the split column
  sub_recommended_movies = sub_recommended_movies.withColumn("userId", split_col.getItem(0)) \
                                                .withColumn("rating", split_col.getItem(1))

  # Change the column types to match the expected types for userId and rating
  sub_recommended_movies = sub_recommended_movies.withColumn("userId", col("userId").cast("int")) \
                                                .withColumn("rating", col("rating").cast("float"))

  ##############################################################################

  # Display the final result
  sub_recommended_movies.show(5, truncate=False)

  return sub_recommended_movies

sub_recommended_movies = top_N_similar_movies(sparkDF, "The Thirteenth Floor")

+-------+---------------+------+---------+
|movieId|recommendations|userId|rating   |
+-------+---------------+------+---------+
|1090   |231, 7.4616146 |231   |7.4616146|
|1090   |364, 7.455324  |364   |7.455324 |
|1090   |64, 7.0616345  |64    |7.0616345|
+-------+---------------+------+---------+



In [259]:
# Join with the original movie data to get the movie titles for the recommended movies
recommended_movies = sub_recommended_movies.join(sparkDF, sub_recommended_movies.movieId == sparkDF.movieId) \
                                            .select("movieId", "original_title", "vote_count", "vote_average", "age", "rating", "overview")

# Show the top N recommended movies
recommended_movies.show(5, truncate=False)

AnalysisException: ignored

In [None]:
# recommended_movies.select(col("recommendations")).show(5, truncate=False)
# recommended_movies.select(col("recommendations").getItem(0)).show(5, truncate=False)
# recommended_movies.select(col("recommendations").getItem(0)).collect()[0][0][0]

+----------------------------------+
|recommendations                   |
+----------------------------------+
|[[477, 6.865376], [112, 6.310051]]|
+----------------------------------+

+------------------+
|recommendations[0]|
+------------------+
|[477, 6.865376]   |
+------------------+



477

### 3.1.b. With K-*`Means`* algorithm

In [156]:
# sparkDF = spark.read.options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
#                     .csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df_.csv")

sparkDF = spark.read.format("com.databricks.spark.csv") \
                    .options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
                    .csv('/content/drive/MyDrive/Colab_files/csv_files/ratings_small.csv')


In [157]:
# Limitation of the number of lines to limit the computation time
sparkDF = sparkDF.limit(1000)

print(sparkDF.shape())
sparkDF.show(5, truncate=False)

(1000, 4)
+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |31     |2.5   |1260759144|
|1     |1029   |3.0   |1260759179|
|1     |1061   |3.0   |1260759182|
|1     |1129   |2.0   |1260759185|
|1     |1172   |4.0   |1260759205|
+------+-------+------+----------+
only showing top 5 rows



Then we use the *K-Means* algorithm in order to cluster the users in the *user-movie matrix*

In [158]:
from pyspark.sql.functions import col

# Group the data by viewer ID
groupedDF = sparkDF.groupBy('userId')

# Apply the pivot function to group the data by movie ID
pivotedDF = groupedDF.pivot('movieId').agg({'rating': 'first'})

# Fill the null values with 0
pivotedDF = pivotedDF.na.fill(0)

In [159]:
print(pivotedDF.shape())
pivotedDF.show(8, truncate=False)

(15, 670)
+------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---

In [160]:
# from pyspark.mllib.clustering import KMeans
from pyspark.ml.clustering import KMeans
from pyspark.mllib.linalg import Vectors
from pyspark.ml.evaluation import ClusteringEvaluator 
from pyspark.ml.feature import VectorAssembler


# Transform the dataframe into a vector format
assembler = VectorAssembler(inputCols=pivotedDF.columns, outputCol="features")
matrixDF = assembler.transform(pivotedDF)

print(matrixDF.shape())
matrixDF.show(8, truncate=False)


(15, 671)
+------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---

In [161]:
from tqdm import tqdm

# Define the number of clusters
nb_clusters = 3

# Fit the model
kmeans = KMeans().setK(nb_clusters).setSeed(1)
for _ in tqdm(range(100)) :
  model = kmeans.fit(matrixDF)

# Using the model to make predictions
user_cluster_preds = model.transform(matrixDF)

100%|██████████| 100/100 [12:47<00:00,  7.68s/it]


In [162]:
user_cluster_preds.drop("features").show(8, truncate=False)

+------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---

In [163]:
from pyspark.sql.functions import col

# Displaying of the number of predicted data points for each cluster
counts_by_cluster = user_cluster_preds.groupBy("prediction").count()
counts_by_cluster.show()

+----------+-----+
|prediction|count|
+----------+-----+
|         0|   13|
|         1|    1|
|         2|    1|
+----------+-----+



In [164]:
evaluator = ClusteringEvaluator()

# Compute the silhouette score
silhouette = evaluator.evaluate(user_cluster_preds)
print("Silhouette using squared Euclidean distance as the measure : ", 
      silhouette)

Silhouette using squared Euclidean distance as the measure :  0.32075675585428937


Then we use the *K-Means* algorithm in order to cluster the movies in the *user-movie matrix*

In [165]:
from pyspark.sql.functions import col

# Group the data by movie ID
groupedDF_ = sparkDF.groupBy('movieId')

# Apply the pivot function to group the data by user ID
pivotedDF_ = groupedDF_.pivot('userId').agg({'rating': 'first'})

# Fill the null values with 0
pivotedDF_ = pivotedDF_.na.fill(0)

In [166]:
print(pivotedDF_.shape())
pivotedDF_.show(5, truncate=False)

(669, 16)
+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|movieId|1  |2  |3  |4  |5  |6  |7  |8  |9  |10 |11 |12 |13 |14 |15 |
+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|31     |2.5|0.0|0.0|0.0|0.0|0.0|3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|1029   |3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|1061   |3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|1129   |2.0|0.0|0.0|0.0|0.0|0.0|3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
|1172   |4.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|
+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
only showing top 5 rows



In [167]:
# from pyspark.mllib.clustering import KMeans
from pyspark.ml.clustering import KMeans
from pyspark.mllib.linalg import Vectors
from pyspark.ml.evaluation import ClusteringEvaluator 
from pyspark.ml.feature import VectorAssembler


# Transform the dataframe into a vector format
assembler = VectorAssembler(inputCols=pivotedDF_.columns, outputCol="features")
matrixDF_ = assembler.transform(pivotedDF_)

print(matrixDF_.shape())
matrixDF_.show(5, truncate=False)

(669, 17)
+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-----------------------------+
|movieId|1  |2  |3  |4  |5  |6  |7  |8  |9  |10 |11 |12 |13 |14 |15 |features                     |
+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-----------------------------+
|31     |2.5|0.0|0.0|0.0|0.0|0.0|3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|(16,[0,1,7],[31.0,2.5,3.0])  |
|1029   |3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|(16,[0,1],[1029.0,3.0])      |
|1061   |3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|(16,[0,1],[1061.0,3.0])      |
|1129   |2.0|0.0|0.0|0.0|0.0|0.0|3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|(16,[0,1,7],[1129.0,2.0,3.0])|
|1172   |4.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|(16,[0,1],[1172.0,4.0])      |
+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+-----------------------------+
only showing top 5 rows



In [168]:
from tqdm import tqdm

# Define the number of clusters
nb_clusters_ = 10

# Fit train the model
kmeans_ = KMeans().setK(nb_clusters).setSeed(1)
for _ in tqdm(range(100)) :
  model_ = kmeans_.fit(matrixDF_)

# Using the model to make predictions
movies_cluster_preds = model_.transform(matrixDF_)

100%|██████████| 100/100 [01:09<00:00,  1.44it/s]


In [169]:
movies_cluster_preds.drop("features").show(10, truncate=False)

+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----------+
|movieId|1  |2  |3  |4  |5  |6  |7  |8  |9  |10 |11 |12 |13 |14 |15 |prediction|
+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+----------+
|31     |2.5|0.0|0.0|0.0|0.0|0.0|3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0         |
|1029   |3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0         |
|1061   |3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0         |
|1129   |2.0|0.0|0.0|0.0|0.0|0.0|3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0         |
|1172   |4.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0         |
|1263   |2.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0         |
|1287   |2.0|0.0|0.0|0.0|0.0|0.0|4.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0         |
|1293   |2.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0         |
|1339   |3.5|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0         |
|1343   |2.0|0.0|0.0|0.0|0.0

In [170]:
from pyspark.sql.functions import col

# Displaying of the number of predicted data points for each cluster
counts_by_cluster_ = movies_cluster_preds.groupBy("prediction").count()
counts_by_cluster_.show()

+----------+-----+
|prediction|count|
+----------+-----+
|         0|  617|
|         1|   29|
|         2|   23|
+----------+-----+



In [171]:
evaluator_ = ClusteringEvaluator()

# Compute the silhouette score
silhouette_ = evaluator_.evaluate(movies_cluster_preds)
print("Silhouette using squared Euclidean distance as the measure : ", 
      silhouette_)

Silhouette using squared Euclidean distance as the measure :  0.9839773342849435


- > Once we have grouped the users and items into clusters, we can provide movie recommendations to a specific user by suggesting to him movies, that are well-liked by other users, in the same cluster as the intended user

In [129]:

def N_movies_related_to_user(user_id, number_of_movies) :

  # Determine the cluster to which the target user belongs to
  user_cluster = user_cluster_preds.filter(user_cluster_preds.userId==user_id) \
                                     .select("prediction").take(1)[0][0]

  # Identify the movies that are commonly preferred by users within the same cluster as the target user
  cluster_ = movies_cluster_preds.filter(movies_cluster_preds.prediction == user_cluster)
  # movies_cluster_preds.drop('features').show(5, truncate=False)

  # Obtain the movieId from the movie cluster
  cluster_movies = cluster_.select('movieId')

  # Select movies that the user has not interacted with 
  # and join the ratings dataframe with the movies dataframe
  user_movies = sparkDF.filter(sparkDF.userId!=user_id).select('movieId')
  cluster_movies_ = cluster_movies.subtract(user_movies)

  # Combine cluster_movies (popular movies among user's cluster) and the ratings dataframe by linking them on the movieId column
  cluster_movies = cluster_movies.join(sparkDF, 'movieId')
  # cluster_movies.show(5, truncate=False)

  # Group the movies by movieId and compute the average rating for each movie
  cluster_movies = cluster_movies.groupBy("movieId").agg({"rating": "avg"})

  # Display the average rating for each movie grouped by movieId"
  cluster_items = cluster_movies.orderBy(col("avg(rating)").desc())
  cluster_items.show(number_of_movies)


N_movies_related_to_user(5, 15)

+-------+-----------+
|movieId|avg(rating)|
+-------+-----------+
|  33166|        5.0|
|  30707|        4.5|
|  34162|        4.5|
|  40819|        4.5|
|  30749|        4.5|
|  41569|        4.0|
|  33679|        4.0|
|  35836|        4.0|
|  41566|        4.0|
|  27369|        3.5|
|  44191|        3.5|
|  30793|        3.5|
+-------+-----------+



## 3.2. Predict user rating for the movies they have not rated for

### 3.2.a. With *`ALS`* algorithm

In [228]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import col


# sparkDF = spark.read.options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
#                     .csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df_.csv")

sparkDF = spark.read.format("com.databricks.spark.csv") \
                  .options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
                  .csv('/content/drive/MyDrive/Colab_files/csv_files/ratings_small.csv')


# Training of the model
def train_test_eval_model(sparkDF) :

  # Split the data into training and test sets (20% held out for testing)
  train, test = sparkDF.randomSplit([0.8, 0.2])

  # Define the ALS algorithm
  als = ALS(maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")

  # Fit the model to the training data
  model = als.fit(train)

  # Use the model to predict user ratings for movies they have not rated
  preds = model.transform(test)

  # Evaluate the model on the test data
  evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

  # Root Mean Squared Error (RMSE)
  rmse = evaluator.evaluate(preds)
  print("Root Mean Squared Error : ", str(rmse))

  # R2, the coefficient of determination
  r2 = evaluator.evaluate(preds, {evaluator.metricName: 'r2'})
  print("R2 : ", r2)

  return model, preds, rmse, r2


model = train_test_eval_model(sparkDF)[0]

Root Mean Squared Error :  1.1646260376054205
R2 :  -0.23081007205516713


In [238]:
# Generate top N movies recommendations for each user
def recommend_movies_(model, userId, nb_of_recommendations) :
    user_recommendations = model.recommendForAllUsers(nb_of_recommendations)
    recommendations = user_recommendations.filter(user_recommendations.userId == userId) \
                                          .select("recommendations")

    return recommendations.first()[0]


# Making recommendations for a user
userId = 3
nb_of_recommendations = 15
recommendations = recommend_movies_(model, userId, nb_of_recommendations)

# Display the recommendations
for rec in recommendations :
    print("Movie ID :", rec.movieId, " | Rating :", rec.rating)


Movie ID : 3420  | Rating : 6.726067066192627
Movie ID : 1735  | Rating : 6.2897138595581055
Movie ID : 1177  | Rating : 6.285809516906738
Movie ID : 1332  | Rating : 6.1861042976379395
Movie ID : 59387  | Rating : 6.121302604675293
Movie ID : 1286  | Rating : 5.950160980224609
Movie ID : 2278  | Rating : 5.8833441734313965
Movie ID : 2111  | Rating : 5.878601551055908
Movie ID : 4299  | Rating : 5.86716890335083
Movie ID : 33004  | Rating : 5.850200653076172
Movie ID : 88129  | Rating : 5.848258018493652
Movie ID : 2240  | Rating : 5.8331522941589355
Movie ID : 64969  | Rating : 5.793621063232422
Movie ID : 5881  | Rating : 5.792594909667969
Movie ID : 327  | Rating : 5.764148712158203


- > In the '*rating*' field, there are the ratings that the user could give to a given movie.



### 3.2.b. With *`K-Means`* algorithm

In [223]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
from tqdm import tqdm

# sparkDF = spark.read.options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
#                     .csv("/content/drive/MyDrive/Colab_files/csv_files/joined_df_.csv")

sparkDF = spark.read.format("com.databricks.spark.csv") \
                  .options(multiline="True", header="True", inferSchema="True", delimiter=',', escape="\"") \
                  .csv('/content/drive/MyDrive/Colab_files/csv_files/ratings_small.csv')

# Limitation of the number of lines to limit the computation time
sparkDF = sparkDF.limit(1000)


def user_rating_preds(sparkDF) : 

  # Specify the columns to be used as input features (userId, movieId, and rating)
  sparkDF = sparkDF.select("userId", "movieId", "rating")

  # Convert the dataframe into a vector format
  assembler = VectorAssembler(inputCols=sparkDF.columns, outputCol="features")
  newDF = assembler.transform(sparkDF)

  # Initiate the KMeans clustering algorithm with setting the number of clusters
  nb_clusters = 5
  kmeans = KMeans().setK(nb_clusters).setSeed(1)

  # Train the model on the data
  for _ in tqdm(range(100)) :
    model = kmeans.fit(newDF)

  # Generate predictions for unrated movies
  preds = model.transform(newDF)
  preds.show(10, truncate=False)

  # Group the predictions by userId
  preds = preds.groupBy("userId").agg({"prediction": "mean"})

  # Rename the columns
  preds = preds.select("userId", "avg(prediction)")

  # Display the predictions
  preds.show(10, truncate=False)

  return preds


user_rating_preds(sparkDF)

100%|██████████| 100/100 [01:05<00:00,  1.52it/s]


+------+-------+------+----------------+----------+
|userId|movieId|rating|features        |prediction|
+------+-------+------+----------------+----------+
|1     |31     |2.5   |[1.0,31.0,2.5]  |0         |
|1     |1029   |3.0   |[1.0,1029.0,3.0]|0         |
|1     |1061   |3.0   |[1.0,1061.0,3.0]|0         |
|1     |1129   |2.0   |[1.0,1129.0,2.0]|0         |
|1     |1172   |4.0   |[1.0,1172.0,4.0]|0         |
|1     |1263   |2.0   |[1.0,1263.0,2.0]|0         |
|1     |1287   |2.0   |[1.0,1287.0,2.0]|0         |
|1     |1293   |2.0   |[1.0,1293.0,2.0]|0         |
|1     |1339   |3.5   |[1.0,1339.0,3.5]|0         |
|1     |1343   |2.0   |[1.0,1343.0,2.0]|0         |
+------+-------+------+----------------+----------+
only showing top 10 rows

+------+------------------+
|userId|avg(prediction)   |
+------+------------------+
|1     |0.2               |
|2     |0.0               |
|3     |0.9215686274509803|
|4     |0.0196078431372549|
|5     |1.54              |
|6     |1.0           

DataFrame[userId: int, avg(prediction): double]

- > This code generates predictions for movies that users have not yet rated, organizes the predictions by user ID, alters the column names, and displays the predictions.