In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Build the SparkSession for this notebook (an already-running session is reused).
spark = (
    SparkSession.builder
    .appName("Jupyter Notebook")
    .getOrCreate()
)

spark

In [3]:
# Load the raw movie records from the JSON file into a DataFrame.
# NOTE(review): the first row of the preview in the next cell is entirely null —
# possibly a record the reader could not parse; verify the file format.
data = spark.read.json(path="movie.json")

In [4]:
# Restrict the raw data to the columns used by the regression:
# the rating (later the label) plus the four candidate predictors.
LR_data = data.select(
    'users_rating',
    'metascore',
    'runtime',
    'genre',
    'languages',
)
LR_data.show(n=5)

+------------+---------+-------+------------------+-----------------+
|users_rating|metascore|runtime|             genre|        languages|
+------------+---------+-------+------------------+-----------------+
|        null|     null|   null|              null|             null|
|         6.6|       44| 92 min| [Comedy, Romance]|        [English]|
|         2.4|     null| 91 min|[Horror, Thriller]|        [English]|
|         7.8|       61|103 min|          [Comedy]|[English, German]|
|         7.3|       73|128 min| [Crime, Thriller]|        [English]|
+------------+---------+-------+------------------+-----------------+
only showing top 5 rows



In [5]:
from pyspark.sql.functions import expr
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType

# Clean the raw columns in one pass:
#   - runtime:   strip the " min" suffix so only the number remains
#   - genre:     keep only the first entry of the array
#   - languages: keep only the first entry of the array
LR_data = (
    LR_data
    .withColumn("runtime", expr("replace(runtime, ' min', '')"))
    .withColumn("genre", col("genre")[0])
    .withColumn("languages", col("languages")[0])
)

In [6]:
# Cast the numeric columns to numeric types:
# ratings and metascore become doubles, runtime becomes an integer.
LR_data = (
    LR_data
    .withColumn("users_rating", col("users_rating").cast("double"))
    .withColumn("metascore", col("metascore").cast("double"))
    .withColumn("runtime", col("runtime").cast("integer"))
)

In [7]:
# Preview the cleaned and cast columns (first 20 rows, the default).
LR_data.show(n=20)

+------------+---------+-------+---------+---------+
|users_rating|metascore|runtime|    genre|languages|
+------------+---------+-------+---------+---------+
|        null|     null|   null|     null|     null|
|         6.6|     44.0|     92|   Comedy|  English|
|         2.4|     null|     91|   Horror|  English|
|         7.8|     61.0|    103|   Comedy|  English|
|         7.3|     73.0|    128|    Crime|  English|
|         7.0|     66.0|     97|   Comedy|  English|
|         5.3|     64.0|     87|  Fantasy|  English|
|         6.2|     51.0|    128|   Action|  English|
|         8.0|     69.0|    143|   Action|  English|
|         8.3|     70.0|    126|    Drama|  English|
|         8.3|     84.0|    149|Adventure|  English|
|         8.3|     65.0|    170|    Crime|  English|
|         7.6|     78.0|    102|   Comedy|  English|
|         5.3|     45.0|     90|    Drama|  English|
|         8.3|     76.0|    116|    Drama|  English|
|         7.4|     63.0|    161|Adventure|  En

In [8]:
# The preview above contains a row that is null in every column.
# Count how many rows are missing a runtime value.
LR_data.where(col("runtime").isNull()).count()

12169

In [9]:
# Drop rows whose genre or language is missing; only rows where both
# categorical columns are present are kept.
LR_data = LR_data.filter(
    col("genre").isNotNull() & col("languages").isNotNull()
)

In [10]:
# To avoid losing a lot of information, missing values in the three
# continuous variables are imputed instead of dropping the rows.
from pyspark.ml.feature import Imputer

# Median imputation; every input column gets a matching "imputed_*" output column.
# NOTE(review): users_rating is used as the regression label further down, and the
# imputer is fitted before the train/test split — confirm both are intended.
imputer = Imputer(
    strategy="median",
    inputCols=["users_rating", "runtime", "metascore"],
    outputCols=["imputed_users_rating", "imputed_runtime", "imputed_metascore"],
)
movie_dataset = imputer.fit(LR_data).transform(LR_data)

In [11]:
# Preview the original columns side by side with their imputed counterparts.
movie_dataset.show(n=20)

+------------+---------+-------+---------+---------+--------------------+---------------+-----------------+
|users_rating|metascore|runtime|    genre|languages|imputed_users_rating|imputed_runtime|imputed_metascore|
+------------+---------+-------+---------+---------+--------------------+---------------+-----------------+
|         6.6|     44.0|     92|   Comedy|  English|                 6.6|             92|             44.0|
|         2.4|     null|     91|   Horror|  English|                 2.4|             91|             53.0|
|         7.8|     61.0|    103|   Comedy|  English|                 7.8|            103|             61.0|
|         7.3|     73.0|    128|    Crime|  English|                 7.3|            128|             73.0|
|         7.0|     66.0|     97|   Comedy|  English|                 7.0|             97|             66.0|
|         5.3|     64.0|     87|  Fantasy|  English|                 5.3|             87|             64.0|
|         6.2|     51.0|    

In [12]:
# Keep only the columns needed for modelling: the imputed numeric columns
# and the two categorical columns.
movie_dataset = movie_dataset.select(
    'imputed_users_rating',
    'imputed_runtime',
    'imputed_metascore',
    'genre',
    'languages',
)

In [13]:
# Preview the modelling dataset (first 20 rows, the default).
movie_dataset.show(n=20)

+--------------------+---------------+-----------------+---------+---------+
|imputed_users_rating|imputed_runtime|imputed_metascore|    genre|languages|
+--------------------+---------------+-----------------+---------+---------+
|                 6.6|             92|             44.0|   Comedy|  English|
|                 2.4|             91|             53.0|   Horror|  English|
|                 7.8|            103|             61.0|   Comedy|  English|
|                 7.3|            128|             73.0|    Crime|  English|
|                 7.0|             97|             66.0|   Comedy|  English|
|                 5.3|             87|             64.0|  Fantasy|  English|
|                 6.2|            128|             51.0|   Action|  English|
|                 8.0|            143|             69.0|   Action|  English|
|                 8.3|            126|             70.0|    Drama|  English|
|                 8.3|            149|             84.0|Adventure|  English|

In [14]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# String indexer and one-hot encoder for the two categorical columns.
# The indexer writes "<column>Index" columns; with handleInvalid="keep",
# invalid values are kept rather than raising an error.
stringIndexer = StringIndexer(
    inputCols=["genre", "languages"],
    outputCols=[name + "Index" for name in ["genre", "languages"]],
    handleInvalid="keep",
)
# The encoder consumes the indexer's output columns and writes
# "<column>_One_Hot_Encoder" columns.
encoder = OneHotEncoder(
    inputCols=stringIndexer.getOutputCols(),
    outputCols=[name + "_One_Hot_Encoder" for name in ["genre", "languages"]],
)



In [15]:
# Vector assembler: combine the one-hot encoded categorical columns and the
# two imputed numeric predictors into a single "features" column.
assemblerInput = [
    *(encoded + "_One_Hot_Encoder" for encoded in ["genre", "languages"]),
    'imputed_runtime',
    'imputed_metascore',
]
vecAss = VectorAssembler(outputCol="features", inputCols=assemblerInput)

In [16]:
# Display the list of input columns passed to the VectorAssembler.
assemblerInput

['genre_One_Hot_Encoder',
 'languages_One_Hot_Encoder',
 'imputed_runtime',
 'imputed_metascore']

In [17]:
from pyspark.ml.regression import LinearRegression

# Linear regression estimator: predicts the (imputed) user rating
# from the assembled "features" vector.
lr = LinearRegression(
    labelCol="imputed_users_rating",
    featuresCol="features",
)

In [18]:
# The data are almost ready for the linear regression; split them into
# 80% training and 20% test rows, with a fixed seed for the split.
train_data, test_data = movie_dataset.randomSplit(weights=[0.8, 0.2], seed=100)

In [19]:
# Number of observations available for training the model (48579 in the recorded run).
train_data.count()

48579

In [20]:
# Number of observations held out for testing the model (12227 in the recorded run).
test_data.count()

12227

In [21]:
# Fit the complete workflow on the training split and score the test split.
# Pipeline is already imported in the cell that defines the string indexer and
# encoder, so the import is not repeated here.

# Chain the stages in execution order:
# index categoricals -> one-hot encode -> assemble features -> linear regression.
pipeline = Pipeline(stages=[stringIndexer, encoder, vecAss, lr])

# Fit every stage on the training data only.
Model = pipeline.fit(train_data)

# Apply the fitted pipeline to the test data; the result carries the model's
# output alongside the input columns.
DF_PREDICTIONS = Model.transform(test_data)

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the test-set predictions against the label using the R2 metric.
regEvaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="imputed_users_rating",
    predictionCol="prediction",
)

r2 = regEvaluator.evaluate(DF_PREDICTIONS)
# R2 on the test data is about 15%
r2

0.1503208001214773