# Extract data from MovieLens dataset


In [3]:
# First, we have to create a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MovieLens").getOrCreate()

In [4]:
# We will use PySpark to read movies.csv and ratings.csv
movie = spark.read.csv("movies.csv", header=True, inferSchema=True)
rating = spark.read.csv("ratings.csv", header=True, inferSchema=True)

In [5]:
# Print the first 5 movies from the movies dataset
movie.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [6]:
# Print the first 5 ratings
rating.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      1|   4.0|1225734739|
|     1|    110|   4.0|1225865086|
|     1|    158|   4.0|1225733503|
|     1|    260|   4.5|1225735204|
|     1|    356|   5.0|1225735119|
+------+-------+------+----------+
only showing top 5 rows



In [7]:
# Let's consider the shape of the movie and rating dataset
print("Shape of the movie dataset: ", (movie.count(), len(movie.columns)))
print("Shape of the rating dataset: ", (rating.count(), len(rating.columns)))

Shape of the movie dataset:  (86537, 3)
Shape of the rating dataset:  (1599874, 4)


# Transform the data

Now, let's join the ratings and movies dataset together

In [8]:
# Inner join the rating and movie
movie_rating = rating.join(movie, on="movieId", how="inner")

In [9]:
# Print the first 5 records of the new joined dataset
movie_rating.show(5)

+-------+------+------+----------+--------------------+--------------------+
|movieId|userId|rating| timestamp|               title|              genres|
+-------+------+------+----------+--------------------+--------------------+
|      1|     1|   4.0|1225734739|    Toy Story (1995)|Adventure|Animati...|
|    110|     1|   4.0|1225865086|   Braveheart (1995)|    Action|Drama|War|
|    158|     1|   4.0|1225733503|       Casper (1995)|  Adventure|Children|
|    260|     1|   4.5|1225735204|Star Wars: Episod...|Action|Adventure|...|
|    356|     1|   5.0|1225735119| Forrest Gump (1994)|Comedy|Drama|Roma...|
+-------+------+------+----------+--------------------+--------------------+
only showing top 5 rows



In [10]:
# Now, we split the genres into multiple rows
from pyspark.sql.functions import split, explode
movie_rating = movie_rating.withColumn("genres", explode(split("genres", "\\|")))

In [11]:
movie_rating.show(10)

+-------+------+------+----------+-----------------+---------+
|movieId|userId|rating| timestamp|            title|   genres|
+-------+------+------+----------+-----------------+---------+
|      1|     1|   4.0|1225734739| Toy Story (1995)|Adventure|
|      1|     1|   4.0|1225734739| Toy Story (1995)|Animation|
|      1|     1|   4.0|1225734739| Toy Story (1995)| Children|
|      1|     1|   4.0|1225734739| Toy Story (1995)|   Comedy|
|      1|     1|   4.0|1225734739| Toy Story (1995)|  Fantasy|
|    110|     1|   4.0|1225865086|Braveheart (1995)|   Action|
|    110|     1|   4.0|1225865086|Braveheart (1995)|    Drama|
|    110|     1|   4.0|1225865086|Braveheart (1995)|      War|
|    158|     1|   4.0|1225733503|    Casper (1995)|Adventure|
|    158|     1|   4.0|1225733503|    Casper (1995)| Children|
+-------+------+------+----------+-----------------+---------+
only showing top 10 rows



In [12]:
# Convert the timestamp to datetime
from pyspark.sql.functions import from_unixtime
movie_rating = movie_rating.withColumn("timestamp", from_unixtime("timestamp"))

In [13]:
movie_rating.show(10)

+-------+------+------+-------------------+-----------------+---------+
|movieId|userId|rating|          timestamp|            title|   genres|
+-------+------+------+-------------------+-----------------+---------+
|      1|     1|   4.0|2008-11-03 17:52:19| Toy Story (1995)|Adventure|
|      1|     1|   4.0|2008-11-03 17:52:19| Toy Story (1995)|Animation|
|      1|     1|   4.0|2008-11-03 17:52:19| Toy Story (1995)| Children|
|      1|     1|   4.0|2008-11-03 17:52:19| Toy Story (1995)|   Comedy|
|      1|     1|   4.0|2008-11-03 17:52:19| Toy Story (1995)|  Fantasy|
|    110|     1|   4.0|2008-11-05 06:04:46|Braveheart (1995)|   Action|
|    110|     1|   4.0|2008-11-05 06:04:46|Braveheart (1995)|    Drama|
|    110|     1|   4.0|2008-11-05 06:04:46|Braveheart (1995)|      War|
|    158|     1|   4.0|2008-11-03 17:31:43|    Casper (1995)|Adventure|
|    158|     1|   4.0|2008-11-03 17:31:43|    Casper (1995)| Children|
+-------+------+------+-------------------+-----------------+---

In [14]:
# Remove timestamp column to finalize the dataset
movie_rating = movie_rating.drop("timestamp")

In [15]:
movie_rating.show(10)

+-------+------+------+-----------------+---------+
|movieId|userId|rating|            title|   genres|
+-------+------+------+-----------------+---------+
|      1|     1|   4.0| Toy Story (1995)|Adventure|
|      1|     1|   4.0| Toy Story (1995)|Animation|
|      1|     1|   4.0| Toy Story (1995)| Children|
|      1|     1|   4.0| Toy Story (1995)|   Comedy|
|      1|     1|   4.0| Toy Story (1995)|  Fantasy|
|    110|     1|   4.0|Braveheart (1995)|   Action|
|    110|     1|   4.0|Braveheart (1995)|    Drama|
|    110|     1|   4.0|Braveheart (1995)|      War|
|    158|     1|   4.0|    Casper (1995)|Adventure|
|    158|     1|   4.0|    Casper (1995)| Children|
+-------+------+------+-----------------+---------+
only showing top 10 rows



In [17]:
# Print the shape of movie_rating dataset
print("Shape of the movie_rating dataset: ", (movie_rating.count(), len(movie_rating.columns)))

Shape of the movie_rating dataset:  (4358974, 5)


In [16]:
# Export the final dataset to a single CSV file
movie_rating.coalesce(1).write.csv("movie_ratings.csv", header=True)