# ETL Processes

### Import all required libraries and packages

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, round

In [2]:
spark = SparkSession.builder.appName("ChristmasMovies").getOrCreate()

### Read CSV files into DataFrames

In [3]:
df1 = spark.read.format("csv").option("header", True).load("data/ratings.csv")
df1.show(10, truncate = False)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|1     |1      |4.0   |964982703|
|1     |3      |4.0   |964981247|
|1     |6      |4.0   |964982224|
|1     |47     |5.0   |964983815|
|1     |50     |5.0   |964982931|
|1     |70     |3.0   |964982400|
|1     |101    |5.0   |964980868|
|1     |110    |4.0   |964982176|
|1     |151    |5.0   |964984041|
|1     |157    |5.0   |964984100|
+------+-------+------+---------+
only showing top 10 rows



In [4]:
df2 = spark.read.format("csv").option("header", True).load("data/links.csv")
df2.show(10, truncate = False)

+-------+-------+------+
|movieId|imdbId |tmdbId|
+-------+-------+------+
|1      |0114709|862   |
|2      |0113497|8844  |
|3      |0113228|15602 |
|4      |0114885|31357 |
|5      |0113041|11862 |
|6      |0113277|949   |
|7      |0114319|11860 |
|8      |0112302|45325 |
|9      |0114576|9091  |
|10     |0113189|710   |
+-------+-------+------+
only showing top 10 rows



In [5]:
df3 = spark.read.format("csv").option("header", True).load("data/movies.csv")
df3.show(10, truncate = False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
|6      |Heat (1995)                       |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                    |Comedy|Romance                             |
|8      |Tom and Huck (1995)               |Adventure|Children                         |
|9      |Sudden Death

### Perform various operations on DataFrames

In [6]:
df11 = df1.groupBy("movieId").agg(avg("rating").alias("avg_rating"))
df11.show(10, truncate = False)

+-------+------------------+
|movieId|avg_rating        |
+-------+------------------+
|296    |4.197068403908795 |
|1090   |3.984126984126984 |
|115713 |3.9107142857142856|
|3210   |3.4761904761904763|
|88140  |3.546875          |
|829    |2.6666666666666665|
|2088   |2.5               |
|2294   |3.2444444444444445|
|4821   |3.1               |
|48738  |3.975             |
+-------+------------------+
only showing top 10 rows



In [7]:
df_movie = df11.join(df2, df11.movieId == df2.movieId, "inner") \
               .join(df3, df11.movieId == df3.movieId, "inner") \
               .where((col("avg_rating") > 3) & (col("title").like("%Christmas%"))) \
               .select(df11.movieId, round(df11.avg_rating, 2).alias("avg_rating"), df2.imdbId, df3.title)
df_movie.show(10, truncate = False)

+-------+----------+-------+-----------------------------------------------------------------+
|movieId|avg_rating|imdbId |title                                                            |
+-------+----------+-------+-----------------------------------------------------------------+
|2423   |3.57      |0097958|Christmas Vacation (National Lampoon's Christmas Vacation) (1989)|
|1099   |3.38      |0029992|Christmas Carol, A (1938)                                        |
|3988   |3.05      |0170016|How the Grinch Stole Christmas (a.k.a. The Grinch) (2000)        |
|72692  |5.0       |0238414|Mickey's Once Upon a Christmas (1999)                            |
|8492   |3.25      |0044008|Christmas Carol, A (Scrooge) (1951)                              |
|147372 |3.75      |4050552|Doctor Who: Last Christmas (2014)                                |
|117368 |3.5       |0484439|The Madagascar Penguins in a Christmas Caper (2005)              |
|117922 |3.5       |2100546|Ice Age: A Mammoth Chr

In [8]:
df_movie.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- title: string (nullable = true)



Convert "movieId" from string to integer

In [9]:
df_movie2 = df_movie.withColumn("movieId", df_movie.movieId.cast("int"))
df_movie2.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- title: string (nullable = true)



In [10]:
df_movie3 = df_movie2.orderBy("movieId")
df_movie3.show(10, truncate = False)

+-------+----------+-------+-----------------------------------------------------------------+
|movieId|avg_rating|imdbId |title                                                            |
+-------+----------+-------+-----------------------------------------------------------------+
|551    |3.55      |0107688|Nightmare Before Christmas, The (1993)                           |
|1099   |3.38      |0029992|Christmas Carol, A (1938)                                        |
|2083   |3.74      |0104940|Muppet Christmas Carol, The (1992)                               |
|2423   |3.57      |0097958|Christmas Vacation (National Lampoon's Christmas Vacation) (1989)|
|2804   |3.97      |0085334|Christmas Story, A (1983)                                        |
|3675   |4.19      |0047673|White Christmas (1954)                                           |
|3988   |3.05      |0170016|How the Grinch Stole Christmas (a.k.a. The Grinch) (2000)        |
|5980   |3.5       |0071222|Black Christmas (1974)

### Write df_movie3 to a CSV file

In [11]:
df_movie3.write.format("csv").option("header", "true").save("df_movie")