In [1]:
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession

os.environ['AWS_ACCESS_KEY_ID'] = 'AKIAVM4CLAILYNCUGLC3'
os.environ['AWS_SECRET_ACCESS_KEY'] = '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'

spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

# 1. Read raw data from dataset

In [2]:
imdb_movies_metadata = spark.read.csv("s3a://minhha4-capstone-data-source/imdb_metadata/title.basics.tsv", header=True, sep="\t", inferSchema=True)

In [3]:
imdb_ratings = spark.read.json("s3a://minhha4-capstone-data-source/imdb_ratings/sample.json", multiLine=True)

In [4]:
movielens_metadata = spark.read.csv("s3a://minhha4-capstone-data-source/movielens/movies_metadata.csv", header=True, inferSchema=True)

In [5]:
movielens_ratings = spark.read.csv("s3a://minhha4-capstone-data-source/movielens/ratings.csv", header=True, inferSchema=True)

# 2. Load data into movies table

In [6]:
movies_df = imdb_movies_metadata.select(["tconst", "primaryTitle", "originalTitle", "startYear", "endYear", "runtimeMinutes"])

## 2.1 Rename columns

### 2.1.1 Rename column tconst to imdb_id

In [7]:
movies_df = movies_df.withColumnRenamed("tconst", "imdb_id")

### 2.1.2 Rename column primaryTitle to primary_title

In [8]:
movies_df = movies_df.withColumnRenamed("primaryTitle", "primary_title")

### 2.1.3 Rename column originalTitle to original_title

In [9]:
movies_df = movies_df.withColumnRenamed("originalTitle", "original_title")

### 2.1.4 Rename column startYear to start_year

In [10]:
movies_df = movies_df.withColumnRenamed("startYear", "start_year")

### 2.1.5 Rename column endYear to end_year

In [11]:
movies_df = movies_df.withColumnRenamed("endYear", "end_year")

### 2.1.6 Rename column runtimeMinutes to runtime_minutes

In [12]:
movies_df = movies_df.withColumnRenamed("runtimeMinutes", "runtime_minutes")

## 2.2 Remove any row that has imdb_id or original_title or start_year is null

In [13]:
movies_df = movies_df.filter(movies_df.imdb_id.isNotNull())

In [14]:
movies_df = movies_df.filter(movies_df.original_title.isNotNull())

In [15]:
movies_df = movies_df.filter(movies_df.start_year.isNotNull())

## 2.3 Remove any row that has duplicate imdb_id or duplicate combination of original_title and start_year

In [16]:
movies_df = movies_df.dropDuplicates(["imdb_id"])

In [17]:
movies_df = movies_df.dropDuplicates(["original_title", "start_year"])

## 2.4 Cast data type

### 2.4.1 Cast `start_year` to int

In [18]:
from py4j.protocol import Py4JJavaError

In [19]:
try:
    movies_df = movies_df.withColumn("start_year", movies_df["start_year"].cast("integer"))
except Py4JJavaError:
    print("start_year column has wrong data type")

### 2.4.2 Cast `end_year` to int

In [20]:
try:
    movies_df = movies_df.withColumn("end_year", movies_df["end_year"].cast("integer"))
except Py4JJavaError:
    print("end_year column has wrong data type")

### 2.4.3 Cast `runtime_minutes` to int

In [21]:
try:
    movies_df = movies_df.withColumn("runtime_minutes", movies_df["runtime_minutes"].cast("integer"))
except Py4JJavaError:
    print("runtime_minutes column has wrong data type")

### 2.4.4 Convert runtime_minutes = 0 to null

In [22]:
from pyspark.sql.functions import when
movies_df = movies_df.withColumn("runtime_minutes", \
    when(movies_df.runtime_minutes == 0, None) \
    .otherwise(movies_df.runtime_minutes))

## 2.5 Data quality checks for `movies_df`

### 2.5.1 Remove any row that has `start_year > end_year`

In [23]:
movies_df = movies_df.select("*").where("start_year <= end_year or end_year is null")

### 2.5.2 Remove any row that has `runtime_minutes < 0`

In [24]:
movies_df = movies_df.filter(movies_df.runtime_minutes >= 0)

### 2.5.3 Check if every `imdb_id` exists in `imdb_movies_metadata`

In [25]:
movies_df.join(imdb_movies_metadata, movies_df.imdb_id == imdb_movies_metadata.tconst, "leftanti").count()

0

## 2.6 Repartition data by start_year to improve performance

In [26]:
movies_df = movies_df.repartition(movies_df.start_year)

## 2.7 Print movies_df

In [27]:
movies_df.printSchema()

root
 |-- imdb_id: string (nullable = true)
 |-- primary_title: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- start_year: integer (nullable = true)
 |-- end_year: integer (nullable = true)
 |-- runtime_minutes: integer (nullable = true)



In [28]:
movies_df.show()

+---------+--------------------+--------------------+----------+--------+---------------+
|  imdb_id|       primary_title|      original_title|start_year|end_year|runtime_minutes|
+---------+--------------------+--------------------+----------+--------+---------------+
|tt1850332|An Artist Looks a...|An Artist Looks a...|      1959|    null|             14|
|tt0051372|               Araya|               Araya|      1959|    null|             90|
|tt0371557| To Be a Millionaire|Baekman jangjaga ...|      1959|    null|            100|
|tt9140438|Bargeld lacht - D...|Bargeld lacht - D...|      1959|    null|             40|
|tt0637624|      Boy on a Fence|      Boy on a Fence|      1959|    null|             30|
|tt1581368|       Bulldog Bully|       Bulldog Bully|      1959|    null|              6|
|tt1580833| Deep Freeze Squeeze| Deep Freeze Squeeze|      1959|    null|              6|
|tt1798961|Double Barrel Dou...|Double Barrel Dou...|      1959|    null|             30|
|tt1666239

In [29]:
count_movies = movies_df.count()

In [30]:
count_movies

2016245

# 3. Load data into users table

## 3.1 Select users from each platform and load into users table

In [31]:
imdb_ratings.createOrReplaceTempView("imdb_ratings")

In [32]:
movielens_ratings.createOrReplaceTempView("movielens_ratings")

Because we have two different data sources, we need to use union the two tables to get the full dataset. And because we use select distinct, we do not need to check for duplicate users later.

In [33]:
users_df = spark.sql("""
    SELECT DISTINCT
        userId AS platform_id,
        'movielens' AS platform
    FROM movielens_ratings
    WHERE userId IS NOT NULL
    UNION
    SELECT DISTINCT
        review_id AS platform_id,
        'imdb' AS platform
    FROM imdb_ratings
    WHERE review_id IS NOT NULL
""")

## 3.2 Add id to each users

In [34]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col

In [35]:
users_df = users_df.withColumn("id", row_number().over(Window.orderBy(col("platform_id"))))

## 3.3 Data quality checks for `users_df`

### 3.3.1 Check if there is any row that has `platform_id` is null

In [36]:
users_df.filter("platform_id is null").count()

0

### 3.3.2 Check if platform is either imdb or movielens

In [37]:
users_df.filter((users_df.platform != "imdb") & (users_df.platform != "movielens")).count()

0

### 3.3.3 Check if there is any row that has `id` is null

In [38]:
users_df.filter("id is null").count()

0

### 3.3.4 Check if every row has valid `platform_id` when joining with data source

In [39]:
valid_movielens = users_df.filter(users_df.platform == 'movielens')

In [40]:
valid_movielens.join(movielens_ratings, valid_movielens.platform_id == movielens_ratings.userId, "leftanti").count()

0

In [41]:
valid_imdb = users_df.filter(users_df.platform == 'imdb')

In [42]:
valid_imdb.join(imdb_ratings, valid_imdb.platform_id == imdb_ratings.review_id, "leftanti").count()

0

## 3.3 Print users_df

In [43]:
users_df.printSchema()

root
 |-- platform_id: string (nullable = true)
 |-- platform: string (nullable = false)
 |-- id: integer (nullable = true)



In [44]:
users_df.show()

+-----------+---------+---+
|platform_id| platform| id|
+-----------+---------+---+
|          1|movielens|  1|
|         10|movielens|  2|
|        100|movielens|  3|
|       1000|movielens|  4|
|      10000|movielens|  5|
|     100000|movielens|  6|
|     100001|movielens|  7|
|     100002|movielens|  8|
|     100003|movielens|  9|
|     100004|movielens| 10|
|     100005|movielens| 11|
|     100006|movielens| 12|
|     100007|movielens| 13|
|     100008|movielens| 14|
|     100009|movielens| 15|
|      10001|movielens| 16|
|     100010|movielens| 17|
|     100011|movielens| 18|
|     100012|movielens| 19|
|     100013|movielens| 20|
+-----------+---------+---+
only showing top 20 rows



In [45]:
count_users = users_df.count()

In [46]:
count_users

370894

# 4. Load data into timestamp table

In [47]:
timestamp_df = spark.sql("""
    SELECT DISTINCT
        timestamp,
        DATE_FORMAT(timestamp, 'yyyy') AS year,
        DATE_FORMAT(timestamp, 'MM') AS month,
        DATE_FORMAT(timestamp, 'dd') AS day
    FROM (
        SELECT DISTINCT
            DATE_FORMAT(FROM_UNIXTIME(timestamp), 'yyyy-MM-dd') AS timestamp
        FROM movielens_ratings
        WHERE timestamp is not null
        UNION
        SELECT DISTINCT
            DATE_FORMAT(review_date, 'yyyy-MM-dd') AS timestamp
        FROM imdb_ratings
        WHERE review_date is not null
    )
""")

## 4.1 Repartition data by year and month to improve performance

In [48]:
timestamp_df = timestamp_df.repartition(timestamp_df.year, timestamp_df.month)

## 4.2 Data quality checks for `timestamp_df`

### 4.2.1 Check for null timestamp

In [49]:
timestamp_df.filter("timestamp is null").count()

1

In [50]:
timestamp_df = timestamp_df.filter("timestamp is not null")

### 4.2.2 Check for duplicate timestamp

In [51]:
count_timestamp = timestamp_df.count()

In [52]:
timestamp_df.select("timestamp").distinct().count() == count_timestamp

True

### 4.2.3 Check data extraction correctly from timestamp to day, month, year

In [53]:
timestamp_df.createOrReplaceTempView("timestamp")

In [54]:
extraction_df = spark.sql("""
    SELECT count(count_correct_year), count(count_correct_month), count(count_correct_day)
    FROM
        (SELECT
            DATE_FORMAT(timestamp.timestamp, 'yyyy') = year AS count_correct_year,
            DATE_FORMAT(timestamp.timestamp, 'MM') = month AS count_correct_month,
            DATE_FORMAT(timestamp.timestamp, 'dd') = day AS count_correct_day
        FROM
            timestamp)
    WHERE count_correct_year = true AND count_correct_month = true AND count_correct_day = true
""")

In [55]:
extraction_df.show()

+-------------------------+--------------------------+------------------------+
|count(count_correct_year)|count(count_correct_month)|count(count_correct_day)|
+-------------------------+--------------------------+------------------------+
|                     7773|                      7773|                    7773|
+-------------------------+--------------------------+------------------------+



In [56]:
count_timestamp

7773

## 4.2 Print timestamp_df

In [57]:
timestamp_df.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- day: string (nullable = true)



In [58]:
timestamp_df.show()

+----------+----+-----+---+
| timestamp|year|month|day|
+----------+----+-----+---+
|2016-05-21|2016|   05| 21|
|1997-03-13|1997|   03| 13|
|1997-03-28|1997|   03| 28|
|2016-05-16|2016|   05| 16|
|2015-09-16|2015|   09| 16|
|2015-09-01|2015|   09| 01|
|2016-05-27|2016|   05| 27|
|2016-05-08|2016|   05| 08|
|1997-03-08|1997|   03| 08|
|2015-09-07|2015|   09| 07|
|2015-09-08|2015|   09| 08|
|2016-05-19|2016|   05| 19|
|2015-09-10|2015|   09| 10|
|2016-05-06|2016|   05| 06|
|1997-03-15|1997|   03| 15|
|2015-09-22|2015|   09| 22|
|2016-05-14|2016|   05| 14|
|1997-03-07|1997|   03| 07|
|2016-05-26|2016|   05| 26|
|2016-05-25|2016|   05| 25|
+----------+----+-----+---+
only showing top 20 rows



In [59]:
count_timestamp

7773

# 5. Load data into ratings table

In [60]:
movies_df.createOrReplaceTempView("movies")

In [61]:
movielens_metadata.createOrReplaceTempView("movielens_metadata")

In [62]:
users_df.createOrReplaceTempView("users")

Movies information in imdb_ratings does not have imdb id, each movie is a combination of original title and start year, so we need to split them to find movie by original title and start year

In [63]:
imdb_ratings_split_movies_and_years = spark.sql("""
    SELECT
        rating,
        review_date,
        review_id,
        SPLIT(movie, '[()]')[0] AS original_title,
        SUBSTR(SPLIT(movie, '[()]')[1], 1, 4) AS year
    FROM imdb_ratings
    WHERE movie IS NOT NULL
""")

In [64]:
imdb_ratings_split_movies_and_years.createOrReplaceTempView("imdb_ratings_split_movies_and_years")

In [65]:
imdb_ratings_split_movies_and_years.show()

+------+------------+---------+--------------------+----+
|rating| review_date|review_id|      original_title|year|
+------+------------+---------+--------------------+----+
|     8|24 July 2005|rw1133942|  Kill Bill: Vol. 2 |2004|
|  null|24 July 2005|rw1133943|Journey to the Un...|1968|
|     9|24 July 2005|rw1133946|         The Island |2005|
|     3|24 July 2005|rw1133948|Win a Date with T...|2004|
|    10|24 July 2005|rw1133949|Saturday Night Li...|2000|
|    10|24 July 2005|rw1133950|        Outlaw Star |1998|
|    10|24 July 2005|rw1133952|        The Aviator |2004|
|     9|24 July 2005|rw1133953|Star Wars: Episod...|1999|
|     3|24 July 2005|rw1133954|The Amityville Ho...|2005|
|     6|24 July 2005|rw1133955|      Flying Tigers |1942|
|     6|24 July 2005|rw1133956|Phantasm III: Lor...|1994|
|     1|24 July 2005|rw1133957|The Truth About C...|2002|
|     1|24 July 2005|rw1133958|      Trainspotting |1996|
|     3|24 July 2005|rw1133959|         Feardotcom |2002|
|     7|24 Jul

In [66]:
ratings_df = spark.sql("""
    SELECT
        m2.imdb_id AS movie_id,
        u.id AS user_id,
        DATE_FORMAT(FROM_UNIXTIME(r1.timestamp), 'yyyy-MM-dd') AS rating_date,
        r1.rating AS score
    FROM movielens_ratings r1
    INNER JOIN movielens_metadata m1
        ON r1.movieId = m1.id
    INNER JOIN movies m2
        ON m1.imdb_id = m2.imdb_id
    INNER JOIN users u
        ON r1.userId = u.platform_id
    WHERE u.platform = 'movielens' AND r1.rating IS NOT NULL
    UNION
    SELECT
        m1.imdb_id AS movie_id,
        u.id AS user_id,
        DATE_FORMAT(review_date, 'yyyy-MM-dd') AS rating_date,
        (m2.rating / 2) AS score
    FROM movies m1
    INNER JOIN imdb_ratings_split_movies_and_years m2
        ON m1.original_title = m2.original_title AND m1.start_year = m2.year
    INNER JOIN users u
        ON m2.review_id = u.platform_id
    WHERE u.platform = 'imdb' AND m2.rating IS NOT NULL
""")

In [67]:
ratings_df.printSchema()

root
 |-- movie_id: string (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating_date: string (nullable = true)
 |-- score: double (nullable = true)



In [68]:
ratings_df.show()

+---------+-------+-----------+-----+
| movie_id|user_id|rating_date|score|
+---------+-------+-----------+-----+
|tt0762114| 211232| 1999-12-15|  2.0|
|tt0047034| 211232| 1999-12-15|  3.0|
|tt0083511|  71669| 2005-05-24|  2.0|
|tt0810868| 213321| 2000-03-21|  5.0|
|tt0317219| 214621| 2000-01-25|  5.0|
|tt0106743| 218678| 2011-09-25|  3.0|
|tt0119141| 230421| 2005-03-22|  3.5|
|tt0046250| 230421| 2005-03-25|  3.0|
|tt0371246| 230421| 2005-03-22|  3.5|
|tt0109045| 230421| 2005-03-22|  3.5|
|tt0097493| 233721| 2004-06-18|  3.5|
|tt0081323| 233721| 2004-08-11|  3.0|
|tt0077869| 246821| 2000-11-23|  5.0|
|tt0051745| 248598| 2002-07-14|  4.0|
|tt0119114|   2293| 1999-11-02|  4.0|
|tt0102368|   9081| 1999-12-11|  3.0|
|tt0113101|   9081| 1999-12-11|  4.0|
|tt0107286|  11482| 1996-12-16|  4.0|
|tt0099423|  20647| 2000-05-26|  4.0|
|tt0078346|  63636| 2000-06-07|  4.0|
+---------+-------+-----------+-----+
only showing top 20 rows



In [69]:
count_ratings = ratings_df.count()

In [70]:
count_ratings

10346460

## 5.1. Data quality checks for `ratings_df`

In [71]:
ratings_df \
    .join(users_df, ratings_df.user_id == users_df.id, 'leftanti').count() 

0

In [72]:
ratings_df \
    .join(movies_df, ratings_df.movie_id == movies_df.imdb_id, 'leftanti').count()

0

In [73]:
ratings_df \
    .join(timestamp_df, ratings_df.rating_date == timestamp_df.timestamp, 'leftanti').count()

0

In [74]:
ratings_df.filter((ratings_df.score < 0) | (ratings_df.score > 5)).count()

0

In [75]:
ratings_df.filter("score is null").count()

0

# 6. Save data to files

In [76]:
movies_df.coalesce(1).write.csv("./destination/movies")

In [77]:
users_df.coalesce(1).write.csv("./destination/users")

In [79]:
timestamp_df.coalesce(1).write.csv("./destination/timestamp")

In [80]:
ratings_df.coalesce(1).write.csv("./destination/ratings")

# 7. Example queries

In [81]:
ratings_df.createOrReplaceTempView("ratings")

Top 5 movies that have the highest score

In [83]:
max_avg_score = spark.sql("""
    SELECT r.movie_id, m.original_title, m.start_year, AVG(score) AS average_score
    FROM ratings r
    INNER JOIN movies m
        ON r.movie_id = m.imdb_id
    GROUP BY 1, 2, 3
    ORDER BY average_score DESC
    LIMIT 5
""")

In [84]:
max_avg_score.show()

+---------+--------------------+----------+-------------+
| movie_id|      original_title|start_year|average_score|
+---------+--------------------+----------+-------------+
|tt1714176|         Yellow Rock|      2011|          5.0|
|tt1745862|        Phil Spector|      2013|          5.0|
|tt0081299|Palermo oder Wolf...|      1980|          5.0|
|tt2066176|         Any Day Now|      2012|          5.0|
|tt0250690|             Refugee|      2000|          5.0|
+---------+--------------------+----------+-------------+



Top 5 movies that have the highest number of rating users

In [88]:
max_rating_users = spark.sql("""
    SELECT r.movie_id, m.original_title, m.start_year, COUNT(*) AS count_ratings
    FROM ratings r
    INNER JOIN movies m
        ON r.movie_id = m.imdb_id
    GROUP BY 1, 2, 3
    ORDER BY count_ratings DESC
    LIMIT 5
""")

In [89]:
max_rating_users.show()

+---------+--------------------+----------+-------------+
| movie_id|      original_title|start_year|count_ratings|
+---------+--------------------+----------+-------------+
|tt0120753|The Million Dolla...|      2000|        91082|
|tt0069293|            Solyaris|      1972|        84078|
|tt0026029|        The 39 Steps|      1935|        77045|
|tt0265343|     Monsoon Wedding|      2001|        74355|
|tt0110729|  Once Were Warriors|      1994|        67662|
+---------+--------------------+----------+-------------+



Top 5 users watch the most number of movies

In [91]:
top_watching_users = spark.sql("""
    SELECT u.id, u.platform_id, u.platform, COUNT(*) AS count_watched_movies
    FROM users u
    INNER JOIN ratings r
        ON u.id = r.user_id
    GROUP BY 1, 2, 3
    ORDER BY count_watched_movies DESC
    LIMIT 5
""")

In [92]:
top_watching_users.show()

+------+-----------+---------+--------------------+
|    id|platform_id| platform|count_watched_movies|
+------+-----------+---------+--------------------+
|210690|      45811|movielens|                3359|
|255998|       8659|movielens|                2788|
| 88660|     179792|movielens|                2378|
|  8582|     107720|movielens|                1912|
|189030|     270123|movielens|                1877|
+------+-----------+---------+--------------------+

