Bronze Layer

In [0]:
%python
from pyspark.sql import SparkSession, functions as Fs
from pyspark.sql.functions import to_date, col, udf, from_unixtime, trim, regexp_replace, split, regexp_extract

# Reuse the active Spark session, or create one named "movie_rating".
spark = SparkSession.builder.appName("movie_rating").getOrCreate()


def _read_csv_head(path):
    """Load a CSV that has a header row, infer column types, keep at most 500 rows."""
    reader = spark.read.format("csv").options(header="true", inferSchema="true")
    return reader.load(path).limit(500)


# Raw (bronze) inputs: movie catalogue and user ratings.
movie_df = _read_csv_head("/Volumes/workspace/movie_rating/raw/movies.csv")
rating_df = _read_csv_head("/Volumes/workspace/movie_rating/raw/ratings.csv")


Silver Layer

In [0]:
# drop nulls
movie_df = movie_df.na.drop()
rating_df = rating_df.na.drop()

# drop 0 ratings
rating_df = (
    rating_df
    .filter(rating_df.rating > 0)
    .withColumn("rating_date", from_unixtime(col('timestamp')).cast("date"))
    .drop("timestamp")
)
movie_df = (
    movie_df
    .withColumn("title_trim", trim(col('title')))
    .withColumn("year", regexp_extract(col('title_trim'), r"\((\d{4})\)$", 1).cast('int'))
    .withColumn("title_clean", regexp_extract(col('title_trim'), r"^(.*?)(?:\s\(\d{4}\))?$", 1))
    .withColumn('genres_array', split(col('genres'), r"\|"))
    .drop('title', 'title_trim')
    .withColumnRenamed('title_clean', 'title')
)


In [0]:
# Save in Bronze layer 
rating_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("/Volumes/workspace/movie_rating/bronze/ratings_cleaned")
movie_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save("/Volumes/workspace/movie_rating/bronze/movies_cleaned")

Gold Layer

In [0]:
# Reload the persisted cleaned tables from Delta so the following cells work
# from what was actually written, then preview five rows of each.
rating_df_cleaned = spark.read.load(
    "/Volumes/workspace/movie_rating/bronze/ratings_cleaned",
    format="delta",
)
movie_df_cleaned = spark.read.load(
    "/Volumes/workspace/movie_rating/bronze/movies_cleaned",
    format="delta",
)
for reloaded_df in (rating_df_cleaned, movie_df_cleaned):
    reloaded_df.show(5)

+------+-------+------+-----------+
|userId|movieId|rating|rating_date|
+------+-------+------+-----------+
|     1|      1|   4.0| 2000-07-30|
|     1|      3|   4.0| 2000-07-30|
|     1|      6|   4.0| 2000-07-30|
|     1|     47|   5.0| 2000-07-30|
|     1|     50|   5.0| 2000-07-30|
+------+-------+------+-----------+
only showing top 5 rows
+-------+--------------------+----+--------------------+--------------------+
|movieId|              genres|year|               title|        genres_array|
+-------+--------------------+----+--------------------+--------------------+
|      1|Adventure|Animati...|1995|           Toy Story|[Adventure, Anima...|
|      2|Adventure|Childre...|1995|             Jumanji|[Adventure, Child...|
|      3|      Comedy|Romance|1995|    Grumpier Old Men|   [Comedy, Romance]|
|      4|Comedy|Drama|Romance|1995|   Waiting to Exhale|[Comedy, Drama, R...|
|      5|              Comedy|1995|Father of the Bri...|            [Comedy]|
+-------+-------------------

In [0]:
# Attach every rating to its movie. The inner join keeps only movieIds that
# appear in both tables, giving one row per (movie, rating) pair.
merged_df = movie_df_cleaned.join(
    other=rating_df_cleaned,
    on='movieId',
    how='inner',
)
merged_df.show(n=5)

+-------+--------------------+----+----------------+--------------------+------+------+-----------+
|movieId|              genres|year|           title|        genres_array|userId|rating|rating_date|
+-------+--------------------+----+----------------+--------------------+------+------+-----------+
|      1|Adventure|Animati...|1995|       Toy Story|[Adventure, Anima...|     1|   4.0| 2000-07-30|
|      3|      Comedy|Romance|1995|Grumpier Old Men|   [Comedy, Romance]|     1|   4.0| 2000-07-30|
|      6|Action|Crime|Thri...|1995|            Heat|[Action, Crime, T...|     1|   4.0| 2000-07-30|
|     21|Comedy|Crime|Thri...|1995|      Get Shorty|[Comedy, Crime, T...|     4|   3.0| 2001-04-10|
|     31|               Drama|1995| Dangerous Minds|             [Drama]|     3|   0.5| 2011-05-27|
+-------+--------------------+----+----------------+--------------------+------+------+-----------+
only showing top 5 rows


In [0]:

# Per-movie statistics: mean rating and number of ratings.
aggregated_df = (
    merged_df
    .groupBy('movieId')
    .agg(
        Fs.avg('rating').alias('avg_rating'),
        Fs.count('rating').alias('total_rating'),
    )
)

# Join the statistics back onto the merged frame, sort by rating count
# (descending) and keep movies with fewer than 50 ratings.
# NOTE(review): merged_df holds one row per (movie, rating) pair, so this join
# repeats each movie's statistics once per rating — confirm the gold table is
# meant to be rating-grained rather than one row per movie.
# NOTE(review): the filter keeps total_rating < 50; confirm that "<" rather
# than ">=" is the intended direction for this threshold.
final_df = (
    aggregated_df
    .join(merged_df, on='movieId', how='inner')
    .orderBy('total_rating', ascending=False)
)
final_df = final_df.filter(final_df.total_rating < 50)

# Preview the movie-level columns.
preview_columns = ['movieId', 'title', 'year', 'avg_rating', 'total_rating']
final_df.select(*preview_columns).show(5)

# Persist as the gold Delta table, replacing both data and schema.
(
    final_df.write
    .format('delta')
    .mode('overwrite')
    .option("overwriteSchema", "true")
    .save('/Volumes/workspace/movie_rating/gold/movie_ratings_delta')
)

+-------+--------------------+----+----------+------------+
|movieId|               title|year|avg_rating|total_rating|
+-------+--------------------+----+----------+------------+
|    333|           Tommy Boy|1995|       4.5|           2|
|    527|    Schindler's List|1993|      2.75|           2|
|     47|Seven (a.k.a. Se7en)|1995|       3.5|           2|
|    260|Star Wars: Episod...|1977|       5.0|           2|
|    441|  Dazed and Confused|1993|       2.5|           2|
+-------+--------------------+----+----------+------------+
only showing top 5 rows


Quality Checks

In [0]:
# Row-count comparison: the in-memory movie_df versus the movie table
# reloaded from Delta.
for count_label, counted_df in (
    ("Bronze movies count: ", movie_df),
    ("Silver movies count: ", movie_df_cleaned),
):
    print(count_label, counted_df.count())

Bronze movies count:  500
Silver movies count:  500


In [0]:
# Number of rows in the reloaded ratings table whose rating is null.
rating_df_cleaned.where(Fs.col("rating").isNull()).count()

0