# <span style="font-size: 1em">Spark</span><span style="font-size: 0.8em"> Assignment</span>
<h3>Big Data Systems 2022-2023</h3>
<h5>M.Sc. In Business Analytics (Part Time) 2022-2024 at Athens University of Economics and Business (A.U.E.B.)</h5>
<hr>

> Student: Panagiotis G. Vaidomarkakis<br />
> Student I.D.: p2822203<br />
> Tutor: Thanasis Vergoulis<br />
> Due Date: 15/04/2023

## Table Of Contents:
* [Importing Libraries](#first-bullet)
* [$1^{st}$ Question](#q1)
* [$2^{nd}$ Question](#q2)
* [$3^{rd}$ Question](#q3)
* [$4^{th}$ Question](#q4)

## Importing Libraries <a class="anchor" id="first-bullet"></a>
In the following cells we import all the libraries needed to run the rest of the notebook. <br> First, we check whether the machine running this Jupyter Notebook has the required libraries installed; any that are missing are installed automatically with pip before being imported:

In [1]:
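import importlib
import subprocess
import sys

def install_library(lib):
    try:
        importlib.import_module(lib)
        print(f'{lib} is already installed.')
    except ImportError:
        print(f'{lib} is not installed. Installing now...')
        # pip installs top-level packages, so strip any submodule suffix (e.g. 'pyspark.sql' -> 'pyspark')
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', lib.split('.')[0]])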

libraries = ['pyspark','pyspark.sql','pyspark.ml.feature', 'pyspark.ml.evaluation']

for lib in libraries:
    install_library(lib)

pyspark is already installed.
pyspark.sql is already installed.
pyspark.ml.feature is already installed.
pyspark.ml.evaluation is already installed.


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import array, col, mean, when, isnan, count, ntile, floor, round, max
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

## $1^{st}$ Question <a class="anchor" id="q1"></a>
Use the *json()* function to load the dataset.<br>
After that, prepare the feature vectors.

In [3]:
# Create a SparkSession
spark = SparkSession.builder.appName('Loading JSON Data').getOrCreate()

In [4]:
# Load the JSON data as a DataFrame using the json() function
json_movie = spark.read.json('movie.json')
json_movie.show(5)

+---------------+--------------------+---------+--------------------+--------------------+------------------+--------------------+--------------------+-----------------+---------+------+-------+--------------------+--------------------+------------+-------+----+
|_corrupt_record|              actors|countries|         description|           directors|             genre|            imdb_url|             img_url|        languages|metascore|rating|runtime|             tagline|               title|users_rating|  votes|year|
+---------------+--------------------+---------+--------------------+--------------------+------------------+--------------------+--------------------+-----------------+---------+------+-------+--------------------+--------------------+------------+-------+----+
|              [|                null|     null|                null|                null|              null|                null|                null|             null|     null|  null|   null|                n
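
The `_corrupt_record` column holding a lone `[` in the output above suggests that the file is a JSON array spread over several lines, while `spark.read.json` by default expects one JSON document per line. The sketch below uses only the standard `json` module and a made-up two-movie sample (not the real `movie.json`) to illustrate why a line-oriented parse of such a file produces corrupt lines while parsing the file as a whole does not. The helper `parse_line_by_line` is hypothetical and only mimics the idea; it is not Spark's parser.

```python
import json

# Hypothetical sample: a JSON array spread over several lines.
sample = """[
{"title": "A", "users_rating": "6.6"},
{"title": "B", "users_rating": "2.4"}
]"""

def parse_line_by_line(text):
    """Mimic a line-oriented reader: each line must be a complete JSON document."""
    parsed, corrupt = [], []
    for line in text.splitlines():
        try:
            parsed.append(json.loads(line))
        except json.JSONDecodeError:
            corrupt.append(line)
    return parsed, corrupt

parsed, corrupt = parse_line_by_line(sample)
whole = json.loads(sample)  # parse the entire text as one document

print("corrupt lines:", corrupt)
print("records when parsed as one document:", len(whole))
```

In PySpark, a multi-line file is usually read with `spark.read.json('movie.json', multiLine=True)`; whether that applies here depends on how the file is actually laid out.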

In [5]:
# Keep only the columns used for the prediction and drop rows in which every selected column is null
json_movie_in = json_movie.select("metascore", "runtime", "genre", "languages", "users_rating").dropna(how="all")
json_movie_in.show(5)
json_movie_in.printSchema()

+---------+-------+------------------+--------------------+------------+
|metascore|runtime|             genre|           languages|users_rating|
+---------+-------+------------------+--------------------+------------+
|       44| 92 min| [Comedy, Romance]|           [English]|         6.6|
|     null| 91 min|[Horror, Thriller]|           [English]|         2.4|
|       61|103 min|          [Comedy]|   [English, German]|         7.8|
|       73|128 min| [Crime, Thriller]|           [English]|         7.3|
|       66| 97 min|          [Comedy]|[English, German,...|         7.0|
+---------+-------+------------------+--------------------+------------+
only showing top 5 rows

root
 |-- metascore: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- genre: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- languages: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- users_rating: string (nullable = true)



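Now I need to keep only the first genre and the first language of each movie. Note that the cell below assigns the two names the wrong way round: `first_language` ends up holding the first genre and `first_genre` the first language, as the output shows. Both columns are used only as categorical features, so the model is unaffected, but the two labels should be read as swapped in the rest of the notebook.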

In [6]:
json_movie_in = json_movie_in.withColumn("first_language", col('genre')[0]) \
       .withColumn("first_genre", col('languages')[0]) \
       .drop("languages", "genre")
json_movie_in.show(5)
json_movie_in.printSchema()

+---------+-------+------------+--------------+-----------+
|metascore|runtime|users_rating|first_language|first_genre|
+---------+-------+------------+--------------+-----------+
|       44| 92 min|         6.6|        Comedy|    English|
|     null| 91 min|         2.4|        Horror|    English|
|       61|103 min|         7.8|        Comedy|    English|
|       73|128 min|         7.3|         Crime|    English|
|       66| 97 min|         7.0|        Comedy|    English|
+---------+-------+------------+--------------+-----------+
only showing top 5 rows

root
 |-- metascore: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- users_rating: string (nullable = true)
 |-- first_language: string (nullable = true)
 |-- first_genre: string (nullable = true)



Now we need to fix the schema so that each column has the correct type.

In [7]:
json_movie_in = json_movie_in.withColumn("metascore", col("metascore").cast("int"))
json_movie_in = json_movie_in.withColumn("runtime", col("runtime").substr(1, 3).cast("int"))
json_movie_in = json_movie_in.withColumn("users_rating", col("users_rating").cast("float"))
json_movie_in.printSchema()

root
 |-- metascore: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- users_rating: float (nullable = true)
 |-- first_language: string (nullable = true)
 |-- first_genre: string (nullable = true)



Now that the columns have the right types, we count the NULL values in each column to decide how to handle them.

In [8]:
json_movie_in.select([count(when(col(c).isNull(), c)).alias(c) for c in json_movie_in.columns]).show()

+---------+-------+------------+--------------+-----------+
|metascore|runtime|users_rating|first_language|first_genre|
+---------+-------+------------+--------------+-----------+
|    52447|  12176|           2|          1252|          0|
+---------+-------+------------+--------------+-----------+



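metascore and runtime contain many NULL values (52,447 and 12,176 respectively), so dropping those rows outright would discard most of the data. Instead, we impute them; as a first step we bin the numeric columns into coarse groups.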

In [9]:
json_movie_in = json_movie_in.withColumn("users_rating_group", floor(json_movie_in["users_rating"] * 2))  # multiply by 2 and floor to get some group number
json_movie_in.show(5)

+---------+-------+------------+--------------+-----------+------------------+
|metascore|runtime|users_rating|first_language|first_genre|users_rating_group|
+---------+-------+------------+--------------+-----------+------------------+
|       44|     92|         6.6|        Comedy|    English|                13|
|     null|     91|         2.4|        Horror|    English|                 4|
|       61|    103|         7.8|        Comedy|    English|                15|
|       73|    128|         7.3|         Crime|    English|                14|
|       66|     97|         7.0|        Comedy|    English|                14|
+---------+-------+------------+--------------+-----------+------------------+
only showing top 5 rows



In [10]:
json_movie_in = json_movie_in.withColumn("runtime_group", (json_movie_in.runtime / 5).cast("integer"))  # divide by 5 to get some group number
json_movie_in.show(5)

+---------+-------+------------+--------------+-----------+------------------+-------------+
|metascore|runtime|users_rating|first_language|first_genre|users_rating_group|runtime_group|
+---------+-------+------------+--------------+-----------+------------------+-------------+
|       44|     92|         6.6|        Comedy|    English|                13|           18|
|     null|     91|         2.4|        Horror|    English|                 4|           18|
|       61|    103|         7.8|        Comedy|    English|                15|           20|
|       73|    128|         7.3|         Crime|    English|                14|           25|
|       66|     97|         7.0|        Comedy|    English|                14|           19|
+---------+-------+------------+--------------+-----------+------------------+-------------+
only showing top 5 rows



In [11]:
json_movie_in = json_movie_in.withColumn("metascore_group", (json_movie_in.metascore / 5).cast("integer"))  # divide by 5 to get group number
json_movie_in.show(5)

+---------+-------+------------+--------------+-----------+------------------+-------------+---------------+
|metascore|runtime|users_rating|first_language|first_genre|users_rating_group|runtime_group|metascore_group|
+---------+-------+------------+--------------+-----------+------------------+-------------+---------------+
|       44|     92|         6.6|        Comedy|    English|                13|           18|              8|
|     null|     91|         2.4|        Horror|    English|                 4|           18|           null|
|       61|    103|         7.8|        Comedy|    English|                15|           20|             12|
|       73|    128|         7.3|         Crime|    English|                14|           25|             14|
|       66|     97|         7.0|        Comedy|    English|                14|           19|             13|
+---------+-------+------------+--------------+-----------+------------------+-------------+---------------+
only showing top 5 rows
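
The three binning rules used in the cells above can be checked by hand. The following plain-Python sketch reproduces the arithmetic on the first three rows shown in the output; the function names are illustrative and not part of the notebook.

```python
import math

def rating_group(users_rating):
    """Half-point bins: floor(rating * 2); None stays None."""
    return None if users_rating is None else math.floor(users_rating * 2)

def five_unit_group(value):
    """Bins of width 5: integer part of value / 5; None stays None."""
    return None if value is None else int(value / 5)

# (metascore, runtime, users_rating) for the first three rows shown above
rows = [(44, 92, 6.6), (None, 91, 2.4), (61, 103, 7.8)]
for metascore, runtime, rating in rows:
    print(rating_group(rating), five_unit_group(runtime), five_unit_group(metascore))
```

Rows whose source value is missing get a missing group as well, which matters for the imputation step that follows.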

In [12]:
# calculate the mean of each group for metascore, runtime, and users_rating (and the max value of first_language per group)
metascore_means = json_movie_in.groupBy(['first_language', 'first_genre', 'users_rating_group', 'runtime_group']).agg(mean('metascore').alias('meta_mean'))
runtime_means = json_movie_in.groupBy(['first_language', 'first_genre', 'users_rating_group', 'metascore_group']).agg(mean('runtime').alias('runtime_mean'))
users_rating_means = json_movie_in.groupBy(['first_language', 'first_genre', 'metascore_group', 'runtime_group']).agg(mean('users_rating').alias('rating_mean'))
first_language_means = json_movie_in.groupBy(['first_genre', 'metascore_group', 'runtime_group','users_rating_group']).agg(max('first_language').alias('language_max'))

# join the means dataframes with the original dataframe
json_movie_in = json_movie_in.join(metascore_means, on=['first_language', 'first_genre', 'users_rating_group', 'runtime_group'], how='left')
json_movie_in = json_movie_in.join(runtime_means, on=['first_language', 'first_genre', 'users_rating_group', 'metascore_group'], how='left')
json_movie_in = json_movie_in.join(users_rating_means, on=['first_language', 'first_genre', 'metascore_group', 'runtime_group'], how='left')
json_movie_in = json_movie_in.join(first_language_means, on=['first_genre', 'metascore_group', 'runtime_group','users_rating_group'], how='left')

# fill missing values in metascore, runtime, and users_rating with the mean of the corresponding group (first_language with the group max)
json_movie_in = json_movie_in.withColumn('metascore', when(json_movie_in['metascore'].isNull(), json_movie_in['meta_mean']).otherwise(json_movie_in['metascore']))
json_movie_in = json_movie_in.withColumn('runtime', when(json_movie_in['runtime'].isNull(), json_movie_in['runtime_mean']).otherwise(json_movie_in['runtime']))
json_movie_in = json_movie_in.withColumn('users_rating', when(json_movie_in['users_rating'].isNull(), json_movie_in['rating_mean']).otherwise(json_movie_in['users_rating']))
json_movie_in = json_movie_in.withColumn('first_language', when(json_movie_in['first_language'].isNull(), json_movie_in['language_max']).otherwise(json_movie_in['first_language']))

# drop all the helper columns that are no longer needed
json_movie_in = json_movie_in.drop('meta_mean', 'runtime_mean', 'rating_mean','metascore_group', 'runtime_group', 'users_rating_group', 'language_max')

# restore the column types after imputation (the filled-in group means are doubles)
json_movie_in = json_movie_in.withColumn("users_rating", round("users_rating", 1))
json_movie_in = json_movie_in.withColumn("metascore", col("metascore").cast("int"))
json_movie_in = json_movie_in.withColumn("runtime", col("runtime").substr(1, 3).cast("int"))
json_movie_in.show(5)

+-----------+--------------+---------+-------+------------+
|first_genre|first_language|metascore|runtime|users_rating|
+-----------+--------------+---------+-------+------------+
|    English|        Comedy|       61|    103|         7.8|
|    English|       Fantasy|       64|     87|         5.3|
|    English|         Crime|       73|    128|         7.3|
|    English|        Comedy|       66|     97|         7.0|
|    English|        Comedy|       44|     92|         6.6|
+-----------+--------------+---------+-------+------------+
only showing top 5 rows



In [13]:
json_movie_in.select([count(when(col(c).isNull(), c)).alias(c) for c in json_movie_in.columns]).show()

+-----------+--------------+---------+-------+------------+
|first_genre|first_language|metascore|runtime|users_rating|
+-----------+--------------+---------+-------+------------+
|          0|          1251|    25335|  11657|           2|
+-----------+--------------+---------+-------+------------+



In the steps above we imputed missing values with group means rather than a single global mean, because movies differ in genre, language, runtime, metascore and user rating. We grouped the rows by genre and language together with the binned versions of the other numeric columns, and used each group's mean to fill the missing metascore, runtime and users_rating values (for the missing categorical values we used the group's maximum). The table above shows the NULLs that remain after this step. Trying to fill the rest as well would not give good results, so we now drop the remaining rows that contain NULLs.
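
A minimal plain-Python sketch of the group-mean idea is given below. It is a simplification of the Spark cell: it uses fewer grouping keys and hypothetical data, and `impute_with_group_mean` is an illustrative name. It also shows one reason why some NULLs survive: a group in which no row has a known value offers nothing to average.

```python
from collections import defaultdict

def impute_with_group_mean(rows, target, keys):
    """Fill None in `target` with the mean of rows sharing the same `keys`."""
    sums = defaultdict(lambda: [0.0, 0])  # group -> [running total, count]
    for row in rows:
        if row[target] is not None:
            acc = sums[tuple(row[k] for k in keys)]
            acc[0] += row[target]
            acc[1] += 1
    filled = []
    for row in rows:
        new_row = dict(row)
        group = tuple(row[k] for k in keys)
        if new_row[target] is None and group in sums:
            total, n = sums[group]
            new_row[target] = total / n
        filled.append(new_row)
    return filled

# Hypothetical rows, not taken from the dataset
movies = [
    {"genre": "Comedy", "rating_group": 13, "metascore": 44},
    {"genre": "Comedy", "rating_group": 13, "metascore": 50},
    {"genre": "Comedy", "rating_group": 13, "metascore": None},  # has donors in its group
    {"genre": "Horror", "rating_group": 4, "metascore": None},   # no donor in its group
]
result = impute_with_group_mean(movies, "metascore", ["genre", "rating_group"])
print([m["metascore"] for m in result])
```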

In [14]:
json_movie_in = json_movie_in.dropna()
json_movie_in.show(5)

+-----------+--------------+---------+-------+------------+
|first_genre|first_language|metascore|runtime|users_rating|
+-----------+--------------+---------+-------+------------+
|    English|        Comedy|       25|     89|         3.1|
|    English|         Crime|       45|    114|         5.9|
|    English|        Comedy|       13|     89|         3.1|
|    English|        Comedy|       25|     88|         3.4|
|    English|        Comedy|       21|     89|         3.3|
+-----------+--------------+---------+-------+------------+
only showing top 5 rows



In [15]:
json_movie_in.select([count(when(col(c).isNull(), c)).alias(c) for c in json_movie_in.columns]).show()
json_movie_in.count()

+-----------+--------------+---------+-------+------------+
|first_genre|first_language|metascore|runtime|users_rating|
+-----------+--------------+---------+-------+------------+
|          0|             0|        0|      0|           0|
+-----------+--------------+---------+-------+------------+



36661

We are left with about 36.7K observations instead of 62K, with part of the missing values filled using the means of similar movies. Had we dropped all rows with NULLs at the start, fewer than 10K observations would have remained for the model.

## $2^{nd}$ Question <a class="anchor" id="q2"></a>
Preparation of the training and testing datasets (85%-15%)

In [16]:
trainDF, testDF = json_movie_in.randomSplit([0.85, 0.15], seed=42)

print(trainDF.cache().count()) # Cache because accessing training data multiple times

print(testDF.count())

31098
5488
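
The split sizes above are close to, but not exactly, 85% and 15% because the rows are assigned to the two sets by random sampling rather than by cutting at a fixed position. The sketch below is a plain-Python analogue of that idea (one independent random draw per row), not Spark's implementation; `random_split` and the row count are illustrative.

```python
import random

def random_split(rows, train_fraction, seed):
    """Send each row to train with probability `train_fraction`, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(10_000))  # stand-in for the DataFrame rows
train, test = random_split(rows, 0.85, seed=42)
print(len(train), len(test), round(len(train) / len(rows), 3))
```

Fixing the seed makes the split repeatable on the same input, which is why the notebook passes `seed=42`.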


We now handle the categorical variables (genre and language) by transforming them with one-hot encoding.

In [17]:
# We determine which of the columns are categorical.
categoricalCols = ['first_genre','first_language']

# The next two objects are estimators: calling fit() on them returns a transformer that we later apply to the dataset.
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=[x + "Index" for x in categoricalCols], handleInvalid="keep") 
encoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols])

stringIndexerModel = stringIndexer.fit(trainDF)
stringIndexerModel.transform(trainDF).show(7)

+-----------+--------------+---------+-------+------------+----------------+-------------------+
|first_genre|first_language|metascore|runtime|users_rating|first_genreIndex|first_languageIndex|
+-----------+--------------+---------+-------+------------+----------------+-------------------+
|    English|        Comedy|       41|     94|         7.1|             0.0|                0.0|
|    English|        Comedy|       41|    107|         7.3|             0.0|                0.0|
|    English|        Comedy|       42|     97|         7.4|             0.0|                0.0|
|    English|        Comedy|       44|    103|         7.1|             0.0|                0.0|
|    English|        Comedy|       44|    107|         7.1|             0.0|                0.0|
|    English|        Comedy|       44|    110|         7.0|             0.0|                0.0|
|    English|         Drama|       85|     98|         7.5|             0.0|                1.0|
+-----------+--------------+---------+-------+------------+----------------+-------------------+
only showing top 7 rows
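
To make the two encoding steps concrete, here is a simplified plain-Python model of them. It assumes that labels are indexed by descending frequency (which matches the 0.0 assigned to the most common values in the output above) and that labels unseen during fitting go to an extra bucket, as `handleInvalid="keep"` requests. The exact vector layout produced by Spark's `OneHotEncoder` may differ, so treat the sizes here as an assumption; the genre list is made up.

```python
from collections import Counter

def fit_string_index(values):
    """Map each label to an index, most frequent label first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, size):
    """Binary vector of length `size` with a single 1 at `index` (all zeros if out of range)."""
    return [1 if i == index else 0 for i in range(size)]

train_genres = ["Comedy", "Comedy", "Comedy", "Drama", "Drama", "Crime"]  # hypothetical
index = fit_string_index(train_genres)
unseen = len(index)  # extra bucket for labels not seen during fit

for label in ["Comedy", "Drama", "Crime", "Western"]:
    i = index.get(label, unseen)
    print(label, i, one_hot(i, size=len(index)))
```

In this sketch the unseen bucket maps to an all-zero vector, so every known label keeps its own position.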

We now assemble all the input variables into a single feature vector.

In [18]:
# This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.
numericCols = ['metascore', 'runtime']
assemblerInputs = [c + "OHE" for c in categoricalCols] + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

## $3^{rd}$ Question <a class="anchor" id="q3"></a>
Train the model

In [19]:
# define the linear regression estimator (it is fitted as the last stage of the pipeline in the next question)
lr = LinearRegression(featuresCol="features", labelCol="users_rating", maxIter=10, regParam=0.3, elasticNetParam=0.8)
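
In Spark ML's linear regression, `regParam` sets the overall regularization strength and `elasticNetParam` mixes the L1 and L2 penalties (0 corresponds to ridge, 1 to lasso). The sketch below computes the penalty term as I understand it from the Spark documentation, `regParam * (elasticNetParam * L1 + (1 - elasticNetParam) / 2 * L2^2)`, for a made-up weight vector; it illustrates the formula only and does not fit a model.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """Penalty added to the squared-error loss for a given weight vector."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_squared)

weights = [1.0, -2.0]  # hypothetical coefficients
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.8))  # notebook settings
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=1.0))  # pure L1 (lasso)
print(elastic_net_penalty(weights, reg_param=0.3, elastic_net_param=0.0))  # pure L2 (ridge)
```

With `elasticNetParam=0.8` most of the penalty comes from the L1 term, which tends to push some coefficients to exactly zero.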

## $4^{th}$ Question <a class="anchor" id="q4"></a>
Evaluate the accuracy of the model using $R^{2}$

In [20]:
# Define the pipeline based on the stages created in previous steps.
pipeline = Pipeline(stages=[stringIndexer, encoder, vecAssembler, lr])

# Fit the pipeline on the training data to obtain the pipeline model.
pipelineModel = pipeline.fit(trainDF)

# Apply the pipeline model to the test dataset to predict the user rating of each sample.
predDF = pipelineModel.transform(testDF)
predDF.select('features', 'users_rating','prediction').show(5)

+--------------------+------------+-----------------+
|            features|users_rating|       prediction|
+--------------------+------------+-----------------+
|(63,[14,44,61,62]...|         5.6|5.562481871145322|
|(63,[4,44,61,62],...|         6.6|5.562481871145322|
|(63,[43,61,62],[1...|         7.2|5.393450589052653|
|(63,[0,44,61,62],...|         2.9|3.998942511788143|
|(63,[0,44,61,62],...|         2.5|3.998942511788143|
+--------------------+------------+-----------------+
only showing top 5 rows



In [21]:
# evaluate the accuracy of the model using the test dataset
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="users_rating", metricName="r2")
r2 = evaluator.evaluate(predDF)
print("R-squared on test data = {:.2f}%".format(r2*100))

R-squared on test data = 62.47%
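
For reference, $R^{2}$ compares the model's squared error with that of always predicting the mean label: $R^{2} = 1 - SS_{res} / SS_{tot}$. The sketch below computes it in plain Python for the five test rows printed earlier; because it uses only those five rows, its result is not expected to match the value reported for the full test set.

```python
def r_squared(labels, predictions):
    """R^2 = 1 - (residual sum of squares) / (total sum of squares)."""
    mean_label = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for y, p in zip(labels, predictions))
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    return 1 - ss_res / ss_tot

# users_rating and prediction for the five rows shown in the output of cell 20
labels = [5.6, 6.6, 7.2, 2.9, 2.5]
predictions = [5.562481871145322, 5.562481871145322, 5.393450589052653,
               3.998942511788143, 3.998942511788143]
print("R-squared on the five sample rows = {:.2f}%".format(r_squared(labels, predictions) * 100))
```

A value of 1 means perfect predictions, and 0 means the model does no better than predicting the mean rating for every movie.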
