In [98]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.types import DoubleType
from pyspark.ml.evaluation import RegressionEvaluator

In [60]:
# Obtain the notebook's SparkSession (reusing a live one if present),
# registered under the application name 'BD_Assignment3'.
spark = (
    SparkSession.builder
    .appName('BD_Assignment3')
    .getOrCreate()
)

In [61]:
# Load Movies.csv, taking column names from its first row.
# No schema inference is requested, so every column is read as a string.
movies_reader = spark.read.option("header", True)
df = movies_reader.csv('Movies.csv')

In [62]:
# Print the column names and types of the freshly loaded frame,
# then preview its first five rows.
df.printSchema()
df.show(5)

root
 |-- Year: string (nullable = true)
 |-- Length: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- Actor: string (nullable = true)
 |-- Actress: string (nullable = true)
 |-- Director: string (nullable = true)
 |-- Popularity: string (nullable = true)
 |-- Awards: string (nullable = true)
 |-- Image: string (nullable = true)

+----+------+--------------------+------+-----------------+-------------+---------------+----------+------+----------------+
|Year|Length|               Title| Genre|            Actor|      Actress|       Director|Popularity|Awards|           Image|
+----+------+--------------------+------+-----------------+-------------+---------------+----------+------+----------------+
|1990|   111|Tie Me Up! Tie Me...|Comedy|  BanderasAntonio|AbrilVictoria| AlmodóvarPedro|        68|    No|NicholasCage.png|
|1991|   113|          High Heels|Comedy|       BoséMiguel|AbrilVictoria| AlmodóvarPedro|        68|    No|Nichol

In [63]:
# List the column names so the ones to discard can be picked out.
print(df.columns)


['Year', 'Length', 'Title', 'Genre', 'Actor', 'Actress', 'Director', 'Popularity', 'Awards', 'Image']


In [64]:
# Remove columns that none of the later cells shown here reference.
unused_cols = ["Image", "Awards", "Title"]
df = df.drop(*unused_cols)

In [65]:
# Count the null entries in every column and show them as a one-row table.
null_count_exprs = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(*null_count_exprs).show()

+----+------+-----+-----+-------+--------+----------+
|Year|Length|Genre|Actor|Actress|Director|Popularity|
+----+------+-----+-----+-------+--------+----------+
|   0|    67|    2|    8|    378|     253|         6|
+----+------+-----+-----+-------+--------+----------+



In [66]:
# Mean-impute the nulls in Popularity and Length.
# Both means come from a single aggregation pass and are applied with a
# single fillna call; the two columns are independent, so the result is
# the same as filling them one after the other.
# NOTE(review): Length is used as the regression label further down
# (labelCol="Length"); mean-imputing a target value is worth confirming.
mean_popularity, mean_Length = df.select(mean("Popularity"), mean("Length")).first()
df = df.fillna({"Popularity": mean_popularity, "Length": mean_Length})

In [67]:
# Drop rows missing any of the categorical columns that are indexed and
# one-hot encoded later, then re-run the per-column null count to confirm
# nothing is left.
categorical_cols = ["Genre", "Actor", "Actress", "Director"]
df = df.dropna(subset=categorical_cols)

remaining_nulls = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in df.columns
]
df.select(*remaining_nulls).show()

+----+------+-----+-----+-------+--------+----------+
|Year|Length|Genre|Actor|Actress|Director|Popularity|
+----+------+-----+-----+-------+--------+----------+
|   0|     0|    0|    0|      0|       0|         0|
+----+------+-----+-----+-------+--------+----------+



In [74]:
# Remove the index/vector columns that the encoding pipeline below produces.
# Presumably a guard so the pipeline cell can be re-run on an already
# transformed frame -- confirm.
derived_cols = [
    f"{name}{suffix}"
    for suffix in ("Index", "Vec")
    for name in ("Genre", "Actor", "Actress", "Director")
]
df = df.drop(*derived_cols)


In [75]:
# One StringIndexer per categorical column; each writes to "<column>Index".
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}Index")
    for name in ("Genre", "Actor", "Actress", "Director")
]


In [76]:
# One OneHotEncoder per indexed column: reads "<column>Index" and
# writes the encoded vector to "<column>Vec".
encoders = [
    OneHotEncoder(inputCol=f"{name}Index", outputCol=f"{name}Vec")
    for name in ("Genre", "Actor", "Actress", "Director")
]

In [77]:
# Chain all indexers followed by all encoders, fit the chain on the full
# frame, and append the resulting index/vector columns to it.
encoding_stages = indexers + encoders
pipeline = Pipeline(stages=encoding_stages)
encoding_model = pipeline.fit(df)
df = encoding_model.transform(df)

In [80]:
# Remove any existing "features" column; the VectorAssembler cell that
# follows writes a column of that name. Presumably a re-run guard -- confirm.
assembled_col = "features"
df = df.drop(assembled_col)

In [81]:
# Concatenate the four one-hot vectors into a single "features" vector.
# Year and Popularity are not part of the assembled inputs.
encoded_cols = ["GenreVec", "ActorVec", "ActressVec", "DirectorVec"]
assembler = VectorAssembler(inputCols=encoded_cols, outputCol="features")
df = assembler.transform(df)

In [82]:
# The raw categorical strings are no longer needed once their encoded
# counterparts exist, so drop them.
raw_categoricals = ("Genre", "Actor", "Actress", "Director")
df = df.drop(*raw_categoricals)

In [83]:
# Preview the first five rows with the index, vector and features columns.
df.show(5)

+----+------+----------+----------+----------+------------+-------------+--------------+-----------------+----------------+-----------------+--------------------+
|Year|Length|Popularity|GenreIndex|ActorIndex|ActressIndex|DirectorIndex|      GenreVec|         ActorVec|      ActressVec|      DirectorVec|            features|
+----+------+----------+----------+----------+------------+-------------+--------------+-----------------+----------------+-----------------+--------------------+
|1990|   111|        68|       1.0|      60.0|        94.0|         25.0|(14,[1],[1.0])| (597,[60],[1.0])|(663,[94],[1.0])| (652,[25],[1.0])|(1926,[1,74,705,1...|
|1991|   113|        68|       1.0|     208.0|        94.0|         25.0|(14,[1],[1.0])|(597,[208],[1.0])|(663,[94],[1.0])| (652,[25],[1.0])|(1926,[1,222,705,...|
|1983|   104|        79|       5.0|     155.0|        53.0|         11.0|(14,[5],[1.0])|(597,[155],[1.0])|(663,[53],[1.0])| (652,[11],[1.0])|(1926,[5,169,664,...|
|1979|   122|         

In [91]:
# Length was loaded as a string; convert it to double so it can serve as
# the numeric label for the regressors below.
length_as_double = col("Length").cast("double")
df = df.withColumn("Length", length_as_double)

In [92]:
# Confirm the column types after the cast (Length should now be double).
df.printSchema()

root
 |-- Year: string (nullable = true)
 |-- Length: double (nullable = true)
 |-- Popularity: string (nullable = false)
 |-- GenreIndex: double (nullable = false)
 |-- ActorIndex: double (nullable = false)
 |-- ActressIndex: double (nullable = false)
 |-- DirectorIndex: double (nullable = false)
 |-- GenreVec: vector (nullable = true)
 |-- ActorVec: vector (nullable = true)
 |-- ActressVec: vector (nullable = true)
 |-- DirectorVec: vector (nullable = true)
 |-- features: vector (nullable = true)



In [93]:
# 80/20 random train/test split with a fixed seed.
split_weights = [0.8, 0.2]
split_seed = 1234
train_data, test_data = df.randomSplit(split_weights, seed=split_seed)

In [94]:
# Three regressors sharing the same feature vector and the same label.
shared_columns = {"featuresCol": "features", "labelCol": "Length"}
lr = LinearRegression(**shared_columns)
dt = DecisionTreeRegressor(**shared_columns)
rf = RandomForestRegressor(**shared_columns)

In [95]:
# Fit each estimator on the training split, in the order lr, dt, rf.
lr_model, dt_model, rf_model = (
    estimator.fit(train_data) for estimator in (lr, dt, rf)
)

In [96]:
# Score the held-out split with each fitted model; transform appends a
# "prediction" column to the test rows.
lr_predictions, dt_predictions, rf_predictions = (
    fitted.transform(test_data) for fitted in (lr_model, dt_model, rf_model)
)

In [97]:
# Preview five scored rows per model (linear, tree, forest -- in that order).
for scored in (lr_predictions, dt_predictions, rf_predictions):
    scored.show(5)

+----+------------------+----------+----------+----------+------------+-------------+--------------+-----------------+-----------------+-----------------+--------------------+------------------+
|Year|            Length|Popularity|GenreIndex|ActorIndex|ActressIndex|DirectorIndex|      GenreVec|         ActorVec|       ActressVec|      DirectorVec|            features|        prediction|
+----+------------------+----------+----------+----------+------------+-------------+--------------+-----------------+-----------------+-----------------+--------------------+------------------+
|1924|              95.0|        74|       0.0|     418.0|       583.0|         34.0|(14,[0],[1.0])|(597,[418],[1.0])|(663,[583],[1.0])| (652,[34],[1.0])|(1926,[0,432,1194...|102.32069228034689|
|1926|              66.0|        76|       3.0|     241.0|       219.0|          1.0|(14,[3],[1.0])|(597,[241],[1.0])|(663,[219],[1.0])|  (652,[1],[1.0])|(1926,[3,255,830,...| 99.90920069500048|
|1931|              74.0|

In [99]:
# Evaluator comparing the "prediction" column against the "Length" label;
# it starts out configured for RMSE.
evaluator = RegressionEvaluator(
    labelCol="Length",
    predictionCol="prediction",
    metricName="rmse",
)

In [100]:
# Compute RMSE for each model on the test predictions.
# The shared `evaluator` is switched to "r2" in place by a later cell, so the
# metric is requested explicitly on every call via a param override. That
# keeps these variables holding RMSE even if this cell is re-run after the
# evaluator's metricName has been changed.
rmse_override = {evaluator.metricName: "rmse"}
lr_rmse = evaluator.evaluate(lr_predictions, rmse_override)
dt_rmse = evaluator.evaluate(dt_predictions, rmse_override)
rf_rmse = evaluator.evaluate(rf_predictions, rmse_override)


In [101]:
# Report the RMSE of each model, one per line.
rmse_report = (
    f"Linear Regression RMSE: {lr_rmse}",
    f"Decision Tree RMSE: {dt_rmse}",
    f"Random Forest RMSE: {rf_rmse}",
)
print(*rmse_report, sep="\n")


Linear Regression RMSE: 43.59517010029684
Decision Tree RMSE: 39.17585883067662
Random Forest RMSE: 39.449560302306324


In [102]:
# Switch the shared evaluator from RMSE to R² in place. The call returns the
# evaluator itself, which is why its repr appears as this cell's output.
# NOTE(review): because this mutates `evaluator`, any evaluate() call that
# relies on the evaluator's current metricName yields R² from here on.
evaluator.setMetricName("r2")

RegressionEvaluator_56033b4e02da

In [103]:
# Compute R² for each model on the test predictions.
# The metric is requested explicitly through a param override instead of
# relying on the evaluator's current metricName, so this cell produces R²
# regardless of which cells ran before it or in what order.
r2_override = {evaluator.metricName: "r2"}
lr_r2 = evaluator.evaluate(lr_predictions, r2_override)
dt_r2 = evaluator.evaluate(dt_predictions, r2_override)
rf_r2 = evaluator.evaluate(rf_predictions, r2_override)

In [104]:
# Report the R² of each model, one per line.
r2_report = (
    f"Linear Regression R²: {lr_r2}",
    f"Decision Tree R²: {dt_r2}",
    f"Random Forest R²: {rf_r2}",
)
print(*r2_report, sep="\n")

Linear Regression R²: -0.2285509100400256
Decision Tree R²: 0.00790455202835838
Random Forest R²: -0.006006389045799088


In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Best-effort teardown of Spark: stop the SparkContext, then the SparkSession.
# Each step is attempted independently so a failure in one does not prevent
# the other. `Exception` is caught instead of using a bare `except:` so that
# KeyboardInterrupt / SystemExit still propagate, and the failure is reported
# rather than silently discarded.
try:
    sc = SparkContext.getOrCreate()
    sc.stop()
except Exception as exc:
    print(f"Could not stop SparkContext: {exc!r}")

try:
    spark = SparkSession.builder.getOrCreate()
    spark.stop()
except Exception as exc:
    print(f"Could not stop SparkSession: {exc!r}")
