In [1]:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
# Obtain the Spark entry points. getOrCreate() reuses an already running
# context/session, so re-executing this cell in a live kernel no longer fails
# with "Cannot run multiple SparkContexts at once" the way SparkContext() does.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

In [2]:
# Load the preprocessed training CSV from HDFS: the first line is read as the
# header and column types are inferred from the file contents.
csv_reader = spark.read.options(header='True', inferSchema=True)
data = csv_reader.csv('hdfs://hadoop-master:9000/preprocess_data_to_train_model.csv')

In [3]:
# Print the column names and inferred types of the freshly loaded DataFrame.
data.printSchema()

root
 |-- title: string (nullable = true)
 |-- year: double (nullable = true)
 |-- rating: double (nullable = true)
 |-- runtime: double (nullable = true)
 |-- kind: integer (nullable = true)
 |-- color_info: integer (nullable = true)
 |-- sound_mix: integer (nullable = true)
 |-- director_name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- director_id: string (nullable = true)
 |-- cast_names: string (nullable = true)
 |-- cast_ids: string (nullable = true)
 |-- votes: string (nullable = true)
 |-- country: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- age: string (nullable = true)
 |-- number_cast: string (nullable = true)



In [4]:
from pyspark.sql.types import IntegerType
# These columns were inferred as strings when the CSV was read; replace each
# one, in place, with its integer-typed version.
# NOTE(review): values that cannot be parsed as integers presumably become
# null after the cast -- confirm against the raw file.
for int_column in ("votes", "country", "day", "month", "age", "number_cast"):
    data = data.withColumn(int_column, data[int_column].cast(IntegerType()))

In [5]:
# Print the schema again to check the column types after the integer casts.
data.printSchema()

root
 |-- title: string (nullable = true)
 |-- year: double (nullable = true)
 |-- rating: double (nullable = true)
 |-- runtime: double (nullable = true)
 |-- kind: integer (nullable = true)
 |-- color_info: integer (nullable = true)
 |-- sound_mix: integer (nullable = true)
 |-- director_name: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- director_id: string (nullable = true)
 |-- cast_names: string (nullable = true)
 |-- cast_ids: string (nullable = true)
 |-- votes: integer (nullable = true)
 |-- country: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- number_cast: integer (nullable = true)



In [6]:
# Bare expression: the notebook displays the DataFrame repr (column names and
# types only); no rows are shown.
data

DataFrame[title: string, year: double, rating: double, runtime: double, kind: int, color_info: int, sound_mix: int, director_name: string, genre: string, director_id: string, cast_names: string, cast_ids: string, votes: int, country: int, day: int, month: int, age: int, number_cast: int]

In [7]:
from pyspark.ml.feature import VectorAssembler
# Numeric columns packed, in this order, into the single 'features' vector.
feature_columns = [
    'year', 'runtime', 'kind', 'color_info', 'sound_mix',
    'votes', 'country', 'day', 'month', 'age', 'number_cast',
]
vectorAssembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

# "skip" makes the assembler filter out rows it considers invalid instead of
# raising an error during transform.
vectorAssembler.setHandleInvalid("skip")
assembled_df = vectorAssembler.transform(data)

# Keep only the model input vector and the regression target.
movie_df = assembled_df.select('features', 'rating')
# movie_df.show()

In [8]:
# 70/30 train/test split. A fixed seed pins the random assignment of rows so
# the split -- and every metric computed from it below -- does not change from
# one run of the notebook to the next (without a seed a new random one is
# drawn on every call).
splits = movie_df.randomSplit([0.7, 0.3], seed=42)
train_df, test_df = splits

In [9]:
from pyspark.ml.regression import LinearRegression
# Linear regression on the assembled feature vector with 'rating' as label;
# regParam and elasticNetParam configure the regularisation of the fit.
lr = LinearRegression(
    featuresCol='features',
    labelCol='rating',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train_df)

# Report the fitted parameters (one coefficient per entry of the feature vector).
print("Coefficients: {}".format(lr_model.coefficients))
print("Intercept: {}".format(lr_model.intercept))

Coefficients: [-0.00010049872996590219,0.0019438997220908706,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.00010049872997920101,0.0]
Intercept: 5.951667884894286


In [10]:
# Fit statistics of the linear model on the training split.
trainingSummary = lr_model.summary
train_rmse = trainingSummary.rootMeanSquaredError
train_r2 = trainingSummary.r2
print("RMSE: {:f}".format(train_rmse))
print("r2: {:f}".format(train_r2))

RMSE: 1.217123
r2: 0.017396


In [11]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the test split and preview a few predictions next to the true rating.
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction", "rating", "features").show(5)

# R^2 of the linear model's predictions on the test split.
lr_evaluator = RegressionEvaluator(
    predictionCol="prediction", labelCol="rating", metricName="r2")
test_r2 = lr_evaluator.evaluate(lr_predictions)
print("R Squared (R2) on test data = {:g}".format(test_r2))

+-----------------+------+--------------------+
|       prediction|rating|            features|
+-----------------+------+--------------------+
|5.851842617929861|   5.9|[1894.0,40.0,1.0,...|
|5.907747640073246|   6.1|[1906.0,70.0,1.0,...|
|6.011311242846537|   7.4|[1913.0,124.0,1.0...|
|5.875037264840231|   6.4|[1914.0,54.0,1.0,...|
|5.876981164562322|   5.8|[1914.0,55.0,1.0,...|
+-----------------+------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.0188672


In [12]:
# Evaluate the fitted linear model on the test split and report its RMSE.
test_result = lr_model.evaluate(test_df)
test_rmse = test_result.rootMeanSquaredError
print("Root Mean Squared Error (RMSE) on test data = {:g}".format(test_rmse))

Root Mean Squared Error (RMSE) on test data = 1.2127


In [13]:
# Optimiser diagnostics from the training summary: how many iterations ran and
# the objective value recorded at each of them.
iteration_count = trainingSummary.totalIterations
objective_trace = trainingSummary.objectiveHistory
print("numIterations: %d" % (iteration_count,))
print("objectiveHistory: {}".format(objective_trace))

# Residuals on the training data (show() prints the first 20 rows).
trainingSummary.residuals.show()

numIterations: 9
objectiveHistory: [0.5, 0.49986712396105876, 0.49930976210576794, 0.499299483259487, 0.4992960925746888, 0.499296016806366, 0.49929601293854287, 0.4992960128340995, 0.4992960128307556, 0.49929601283065606]
+--------------------+
|           residuals|
+--------------------+
|0.026303642502024438|
|-0.08710265809266637|
|  0.4962633585818814|
|  1.1247617376998242|
|  1.1111544396451878|
|  0.5645008463150072|
|  0.5645008463150072|
|  0.5645008463150072|
| 0.05866914714873417|
|  1.0547813477045525|
|  1.0431179493720073|
| -0.1789250642844129|
|  -0.684756763450685|
|  1.0152432365493151|
| 0.21135543710513272|
|-0.19253236233904847|
| -0.5275225573366846|
|  0.3705335429412244|
|   0.232939331508077|
|  0.6251637326197139|
+--------------------+
only showing top 20 rows





In [14]:
# Apply the linear model to the test split and list the first 20 predictions
# (the default row count of show()) alongside the true rating.
predictions = lr_model.transform(test_df)
prediction_view = predictions.select("prediction", "rating", "features")
prediction_view.show(20)

+------------------+------+--------------------+
|        prediction|rating|            features|
+------------------+------+--------------------+
| 5.851842617929861|   5.9|[1894.0,40.0,1.0,...|
| 5.907747640073246|   6.1|[1906.0,70.0,1.0,...|
| 6.011311242846537|   7.4|[1913.0,124.0,1.0...|
| 5.875037264840231|   6.4|[1914.0,54.0,1.0,...|
| 5.876981164562322|   5.8|[1914.0,55.0,1.0,...|
| 5.921690858170412|   6.4|[1914.0,78.0,1.0,...|
| 5.933354256502957|   6.9|[1914.0,84.0,1.0,...|
| 5.945017654835502|   6.3|[1914.0,90.0,1.0,...|
| 5.859285069603559|   6.9|[1915.0,46.0,1.0,...|
|  5.88455576599074|   6.0|[1915.0,59.0,1.0,...|
|5.9098264623779215|   6.9|[1915.0,72.0,1.0,...|
| 6.588247465387635|   7.3|[1915.0,421.0,1.0...|
| 5.866859671031977|   6.4|[1916.0,50.0,1.0,...|
| 5.866859671031977|   6.6|[1916.0,50.0,1.0,...|
| 5.886298668252886|   6.9|[1916.0,60.0,1.0,...|
| 6.037922846575974|   6.5|[1916.0,138.0,1.0...|
| 5.870546473016214|   6.1|[1917.0,52.0,1.0,...|
| 5.872490372738305|

In [15]:
from pyspark.ml.regression import DecisionTreeRegressor
# Fit a decision-tree regressor on the training split and score the test split.
dt = DecisionTreeRegressor(labelCol='rating', featuresCol='features')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)

# RMSE on the test split (RegressionEvaluator is imported in an earlier cell).
dt_evaluator = RegressionEvaluator(
    metricName="rmse", predictionCol="prediction", labelCol="rating")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = {:g}".format(rmse))

Root Mean Squared Error (RMSE) on test data = 1.05904


In [16]:
# Importance of each feature in the fitted decision tree. Indices are
# positions in the assembled 'features' vector, i.e. the order of the
# VectorAssembler inputCols (0 = year, 1 = runtime, ..., 10 = number_cast).
dt_model.featureImportances

SparseVector(11, {0: 0.0383, 1: 0.3271, 3: 0.0886, 5: 0.2397, 6: 0.156, 9: 0.1402, 10: 0.01})

In [17]:
# Fetch the first row (returned as a list of Row objects) to inspect one
# assembled feature vector together with its rating.
movie_df.take(1)


[Row(features=DenseVector([1894.0, 40.0, 1.0, 0.0, 4.0, 191.0, 136.0, 0.0, 8.0, 127.0, 3.0]), rating=5.9)]

In [18]:
from pyspark.ml.regression import GBTRegressor
# Fit a gradient-boosted-trees regressor (at most 10 boosting iterations) on
# the training split, then preview its predictions on the test split.
gbt = GBTRegressor(
    featuresCol='features',
    labelCol='rating',
    maxIter=10,
)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_preview = gbt_predictions.select('prediction', 'rating', 'features')
gbt_preview.show(5)

+------------------+------+--------------------+
|        prediction|rating|            features|
+------------------+------+--------------------+
| 6.072570351339152|   5.9|[1894.0,40.0,1.0,...|
| 6.653217690548708|   6.1|[1906.0,70.0,1.0,...|
| 7.054937696948859|   7.4|[1913.0,124.0,1.0...|
| 6.056904834370278|   6.4|[1914.0,54.0,1.0,...|
|6.0366864610484186|   5.8|[1914.0,55.0,1.0,...|
+------------------+------+--------------------+
only showing top 5 rows



In [19]:
# RMSE of the gradient-boosted model on the test split
# (RegressionEvaluator is imported in an earlier cell).
gbt_evaluator = RegressionEvaluator(
    metricName="rmse", predictionCol="prediction", labelCol="rating")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = {:g}".format(rmse))

Root Mean Squared Error (RMSE) on test data = 1.03105
