## PySpark ML Discussion in Tutorial5

In [2]:
# Create SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PysparkML").getOrCreate()
spark

In [3]:
# Read the file from csv
df =spark.read.csv("top_4000_movies_data.csv",header=True,inferSchema=True)
df.show()

+------------+--------------------+-----------------+--------------+---------------+
|Release Date|         Movie Title|Production Budget|Domestic Gross|Worldwide Gross|
+------------+--------------------+-----------------+--------------+---------------+
|   4/23/2019|   Avengers: Endgame|        400000000|     858373000|     2797800564|
|   5/20/2011|Pirates of the Ca...|        379000000|     241071802|     1045713802|
|   4/22/2015|Avengers: Age of ...|        365000000|     459005868|     1395316979|
|  12/16/2015|Star Wars Ep. VII...|        306000000|     936662225|     2064615817|
|   4/25/2018|Avengers: Infinit...|        300000000|     678815482|     2044540523|
|   5/24/2007|Pirates of the Ca...|        300000000|     309420425|      960996492|
|  11/13/2017|      Justice League|        300000000|     229024295|      655945209|
|   10/6/2015|             Spectre|        300000000|     200074175|      879500760|
|  12/18/2019|Star Wars: The Ri...|        275000000|     5152025

In [4]:
df.printSchema()

root
 |-- Release Date: string (nullable = true)
 |-- Movie Title: string (nullable = true)
 |-- Production Budget: integer (nullable = true)
 |-- Domestic Gross: integer (nullable = true)
 |-- Worldwide Gross: long (nullable = true)



In [7]:
df.columns

['Release Date',
 'Movie Title',
 'Production Budget',
 'Domestic Gross',
 'Worldwide Gross']

In [17]:
# "VectorAssembler" is a transformer that combines a given list of columns into a single vector column.
from pyspark.ml.feature import VectorAssembler
feature_assembler = VectorAssembler(inputCols= ["Production Budget","Domestic Gross"],outputCol="VectorAssembler",handleInvalid="error")
output=feature_assembler.transform(df)
output.show()

+------------+--------------------+-----------------+--------------+---------------+--------------------+
|Release Date|         Movie Title|Production Budget|Domestic Gross|Worldwide Gross|     VectorAssembler|
+------------+--------------------+-----------------+--------------+---------------+--------------------+
|   4/23/2019|   Avengers: Endgame|        400000000|     858373000|     2797800564|   [4.0E8,8.58373E8]|
|   5/20/2011|Pirates of the Ca...|        379000000|     241071802|     1045713802|[3.79E8,2.4107180...|
|   4/22/2015|Avengers: Age of ...|        365000000|     459005868|     1395316979|[3.65E8,4.5900586...|
|  12/16/2015|Star Wars Ep. VII...|        306000000|     936662225|     2064615817|[3.06E8,9.3666222...|
|   4/25/2018|Avengers: Infinit...|        300000000|     678815482|     2044540523|[3.0E8,6.78815482E8]|
|   5/24/2007|Pirates of the Ca...|        300000000|     309420425|      960996492|[3.0E8,3.09420425E8]|
|  11/13/2017|      Justice League|        300

In [21]:
output.columns

['Release Date',
 'Movie Title',
 'Production Budget',
 'Domestic Gross',
 'Worldwide Gross',
 'VectorAssembler']

In [26]:
finalized_data=output.select("Production Budget","VectorAssembler")
finalized_data.show()

+-----------------+--------------------+
|Production Budget|     VectorAssembler|
+-----------------+--------------------+
|        400000000|   [4.0E8,8.58373E8]|
|        379000000|[3.79E8,2.4107180...|
|        365000000|[3.65E8,4.5900586...|
|        306000000|[3.06E8,9.3666222...|
|        300000000|[3.0E8,6.78815482E8]|
|        300000000|[3.0E8,3.09420425E8]|
|        300000000|[3.0E8,2.29024295E8]|
|        300000000|[3.0E8,2.00074175E8]|
|        275000000|[2.75E8,5.1520254...|
|        275000000|[2.75E8,2.1376751...|
|        263700000|[2.637E8,7.305867...|
|        263000000|[2.63E8,3.3036019...|
|        262000000|[2.62E8,6.2018138...|
|        260000000|[2.6E8,5.43638043E8]|
|        260000000|[2.6E8,2.00821936E8]|
|        258000000|[2.58E8,3.3653030...|
|        250000000|[2.5E8,4.08084349E8]|
|        250000000|[2.5E8,3.02089278E8]|
|        250000000|[2.5E8,2.58241522E8]|
|        250000000|[2.5E8,2.55119788E8]|
+-----------------+--------------------+
only showing top

In [27]:
#Linear Regression
from pyspark.ml.regression import LinearRegression

#train, test spilt
train_data,test_data= finalized_data.randomSplit([0.75,0.25])

#Train the linear regression model
regression = LinearRegression(featuresCol="VectorAssembler",labelCol ="Production Budget" )
regression = regression.fit(train_data)

In [28]:
# Coefficients
regression.coefficients

DenseVector([1.0, 0.0])

In [29]:
#Intercept
regression.intercept

0.0

In [30]:
#Prediction
pred_results = regression.evaluate(test_data)

In [36]:
pred_results.predictions.show()

+-----------------+--------------------+----------+
|Production Budget|     VectorAssembler|prediction|
+-----------------+--------------------+----------+
|          9500000|[9500000.0,115155.0]| 9500000.0|
|          9500000|[9500000.0,5.7176...| 9500000.0|
|          9800000|[9800000.0,1.6044...| 9800000.0|
|         10000000|         [1.0E7,0.0]|     1.0E7|
|         10000000|         [1.0E7,0.0]|     1.0E7|
|         10000000|         [1.0E7,0.0]|     1.0E7|
|         10000000|         [1.0E7,0.0]|     1.0E7|
|         10000000|      [1.0E7,6422.0]|     1.0E7|
|         10000000|      [1.0E7,9069.0]|     1.0E7|
|         10000000|     [1.0E7,17472.0]|     1.0E7|
|         10000000|    [1.0E7,137221.0]|     1.0E7|
|         10000000|    [1.0E7,221805.0]|     1.0E7|
|         10000000|   [1.0E7,1562800.0]|     1.0E7|
|         10000000|   [1.0E7,1799322.0]|     1.0E7|
|         10000000|   [1.0E7,2000000.0]|     1.0E7|
|         10000000|   [1.0E7,2185266.0]|     1.0E7|
|         10

In [37]:
pred_results.meanAbsoluteError,pred_results.meanSquaredError

(0.0, 0.0)