In [0]:
# Data source 'MovieLens 20M movie ratings. Stable benchmark dataset. 20 million ratings and 465,000 tag applications applied to 27,000 movies by 138,000 users' can be download from  https://grouplens.org/datasets/movielens/20m/.

# Reading Movie Data
df_movie = spark.read.format("com.databricks.spark.csv").option("header", "true").load("/mnt/PLEASE_ENTER_YOUR_OWN_MOUNT_POINT_NAME/movies.csv")
display(df_movie)

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


In [0]:
#Reading Rating Data

df_rating = spark.read.format("com.databricks.spark.csv").option("header", "true").load("/mnt/PLEASE_ENTER_YOUR_OWN_MOUNT_POINT_NAME/ratings.csv")
display(df_rating)

userId,movieId,rating,timestamp
1,2,3.5,1112486027
1,29,3.5,1112484676
1,32,3.5,1112484819
1,47,3.5,1112484727
1,50,3.5,1112484580
1,112,3.5,1094785740
1,151,4.0,1094785734
1,223,4.0,1112485573
1,253,4.0,1112484940
1,260,4.0,1112484826


In [0]:
#Trying Regex with 2 Cases

import re
x = "halo (1990)"
y = "City of Lost Children, The (Cité des enfants perdus, La) (1995)"
hehe = re.search('\(([0-9]{4})', y).group(1)
# hehe = re.search('/[(][0-9]{4}[)]/g', x)
print(hehe)

In [0]:
#Making Year Functions with Regex

import re

def insertYear(title):
  year = re.search('\(([0-9]{4})', title)
  if year is not None:
    year = year.group(1)
  else:
    year = 0

  return int(year)

In [0]:
# Registering Functions
sqlContext.udf.register("yearCleansing", insertYear)

In [0]:
# Using The Functions

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

year_udf = udf(insertYear, IntegerType())

df_movie_updated = df_movie.select("movieId",
                                   "title",
                                   "genres",
                                   year_udf("title").alias("year"))

In [0]:
# Writing The Result to Parquet

df_movie_updated.write.parquet("/mnt/PLEASE_ENTER_YOUR_OWN_MOUNT_POINT_NAME/movie/movie_cleaned.pq")

In [0]:
# Reading Parquet Data to Verify Result

dff = spark.read.parquet("/mnt/PLEASE_ENTER_YOUR_OWN_MOUNT_POINT_NAME/movie/movie_cleaned.pq")
dff.createOrReplaceTempView("mov")

In [0]:
# Displaying The Results

display(dff)

movieId,title,genres,year
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,1995
2,Jumanji (1995),Adventure|Children|Fantasy,1995
3,Grumpier Old Men (1995),Comedy|Romance,1995
4,Waiting to Exhale (1995),Comedy|Drama|Romance,1995
5,Father of the Bride Part II (1995),Comedy,1995
6,Heat (1995),Action|Crime|Thriller,1995
7,Sabrina (1995),Comedy|Romance,1995
8,Tom and Huck (1995),Adventure|Children,1995
9,Sudden Death (1995),Action,1995
10,GoldenEye (1995),Action|Adventure|Thriller,1995


In [0]:
# Reading Rating Data

df_rating = spark.read.format("com.databricks.spark.csv").option("header", "true").load("/mnt/PLEASE_ENTER_YOUR_OWN_MOUNT_POINT_NAME/ratings.csv")
df_rating.createOrReplaceTempView("rating")

In [0]:
# Inserting Rating Data to Movie Table

df_new_mov = spark.sql("""
WITH NEW AS (
select CAST(m.movieId AS INTEGER), title, genres, year, round(avg(rating),2) as avg_rating
from mov m, rating r
where m.movieId = r.movieId
group by m.movieId, title, genres, year)

select * 
from new
order by movieId ASC
""")
display(df_new_mov)

movieId,title,genres,year,avg_rating
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,1995,3.92
2,Jumanji (1995),Adventure|Children|Fantasy,1995,3.21
3,Grumpier Old Men (1995),Comedy|Romance,1995,3.15
4,Waiting to Exhale (1995),Comedy|Drama|Romance,1995,2.86
5,Father of the Bride Part II (1995),Comedy,1995,3.06
6,Heat (1995),Action|Crime|Thriller,1995,3.83
7,Sabrina (1995),Comedy|Romance,1995,3.37
8,Tom and Huck (1995),Adventure|Children,1995,3.14
9,Sudden Death (1995),Action,1995,3.0
10,GoldenEye (1995),Action|Adventure|Thriller,1995,3.43


In [0]:
# Writing The Result to Parquet

df_new_mov.write.parquet("/mnt/PLEASE_ENTER_YOUR_OWN_MOUNT_POINT_NAME/movie/movie_cleaned_fix.pq")

In [0]:
# Reading The Result
dfnew = spark.read.parquet("/mnt/PLEASE_ENTER_YOUR_OWN_MOUNT_POINT_NAME/movie/movie_cleaned_fix.pq")
dfnew.createOrReplaceTempView("movie_new")

In [0]:
# Verifying The Result
display(dfnew)

movieId,title,genres,year,avg_rating
102248,Holy Flame of the Martial World (Wu lin sheng huo jin)(1983),Action|Adventure|Comedy|Fantasy,1983,3.0
102250,"Web of Death, The (1976)",Action|Adventure|Fantasy|Horror,1976,3.0
102252,Legendary Weapons of China (1982),Action|Adventure,1982,4.0
102263,Ju-on: White Ghost (2009),Horror,2009,3.0
102265,Ju-on: Black Ghost (2009),Horror,2009,3.0
102267,Daniel Deronda (2002),Drama|Romance,2002,3.8
102269,"South Shaolin Master, The (Nan quan wang) (1984)",Action|Adventure|Drama,1984,4.0
102273,Cold Steel (1987),Action|Thriller,1987,2.0
102275,"Afflicted, The (2010)",Horror|Thriller,2010,3.33
102278,Pawn (2013),Crime|Thriller,2013,3.09


In [0]:
%sql
-- Searching for Data That Have 0 Year or If genre = no genres listed

select * from movie_new where (genres != '(no genres listed)' and year != 0) OR (genres != '(no genres listed)' and year == 0) order by year asc

movieId,title,genres,year,avg_rating
40697,Babylon 5,Sci-Fi,0,3.87
79607,"Millions Game, The (Das Millionenspiel)",Action|Drama|Sci-Fi|Thriller,0,3.13
112406,Brazil: In the Shadow of the Stadiums,Documentary,0,3.83
126438,Two: The Story of Roman & Nyro,Documentary|Drama,0,3.0
115133,Tatort: Im Schmerz geboren,Crime,0,4.0
113190,Slaying the Badger,Documentary,0,4.0
128734,Polskie gówno,Comedy|Musical,0,4.5
87442,"Bicycle, Spoon, Apple (Bicicleta, cullera, poma)",Documentary,0,3.0
115685,National Theatre Live: Frankenstein,Drama|Fantasy,0,3.43
128612,Body/Cialo,Comedy|Drama|Mystery,0,3.25


In [0]:
%sql
-- Updating Movies that Have 0 Year or Having a Wrong Year
select movieId, 
       title, 
       genres, 
       CASE
       WHEN movieId = 126438 THEN 2013
       WHEN movieId = 40697 THEN 1994
       WHEN movieId = 113190 THEN 2014
       WHEN movieId = 87442 THEN 2010
       WHEN movieId = 115133 THEN 2014
       WHEN movieId = 79607 THEN 1970
       WHEN movieId = 115685 THEN 2011
       WHEN movieId = 128612 THEN 2015
       WHEN movieId = 112406 THEN 2014
       WHEN movieId = 128734 THEN 2014
       WHEN movieId = 107155 THEN 1952
       WHEN movieId = 4311 THEN 1998
       ELSE year
       END AS year, 
       avg_rating
from movie_new
order by 1 ASC

movieId,title,genres,year,avg_rating
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,1995,3.92
2,Jumanji (1995),Adventure|Children|Fantasy,1995,3.21
3,Grumpier Old Men (1995),Comedy|Romance,1995,3.15
4,Waiting to Exhale (1995),Comedy|Drama|Romance,1995,2.86
5,Father of the Bride Part II (1995),Comedy,1995,3.06
6,Heat (1995),Action|Crime|Thriller,1995,3.83
7,Sabrina (1995),Comedy|Romance,1995,3.37
8,Tom and Huck (1995),Adventure|Children,1995,3.14
9,Sudden Death (1995),Action,1995,3.0
10,GoldenEye (1995),Action|Adventure|Thriller,1995,3.43


In [0]:
# Updating Movies that Have 0 Year or Having a Wrong Year Manually

df_cleanse_terakhir_bismillah = spark.sql("""
select movieId, 
       title, 
       genres, 
       CASE
       WHEN movieId = 126438 THEN 2013
       WHEN movieId = 40697 THEN 1994
       WHEN movieId = 113190 THEN 2014
       WHEN movieId = 87442 THEN 2010
       WHEN movieId = 115133 THEN 2014
       WHEN movieId = 79607 THEN 1970
       WHEN movieId = 115685 THEN 2011
       WHEN movieId = 128612 THEN 2015
       WHEN movieId = 112406 THEN 2014
       WHEN movieId = 128734 THEN 2014
       WHEN movieId = 107155 THEN 1952
       WHEN movieId = 4311 THEN 1998
       ELSE year
       END AS year, 
       avg_rating
from movie_new
order by 1 ASC
""")

In [0]:
df_cleanse_terakhir_bismillah.write.parquet("/mnt/PLEASE_ENTER_YOUR_OWN_MOUNT_POINT_NAME/movie/movie_fixed.pq")

In [0]:
# Splitting Multiple Genres Movies To A Single Genre With Multiple Tuple

from pyspark.sql import functions as F
dfix = spark.read.parquet("/mnt/PLEASE_ENTER_YOUR_OWN_MOUNT_POINT_NAME/movie/movie_fixed.pq");
dfMovies = dfix.withColumn('genres', (F.split(dfix.genres,'\|')))
dfMovies = dfMovies.withColumn('genres', (F.explode(dfMovies.genres)))
dfMovies.write.parquet("/mnt/PLEASE_ENTER_YOUR_OWN_MOUNT_POINT_NAME/movie/movie_split.pq")

In [0]:
# Displaying The Result

display(dfMovies)

movieId,title,genres,year,avg_rating
102248,Holy Flame of the Martial World (Wu lin sheng huo jin)(1983),Action,1983,3.0
102248,Holy Flame of the Martial World (Wu lin sheng huo jin)(1983),Adventure,1983,3.0
102248,Holy Flame of the Martial World (Wu lin sheng huo jin)(1983),Comedy,1983,3.0
102248,Holy Flame of the Martial World (Wu lin sheng huo jin)(1983),Fantasy,1983,3.0
102250,"Web of Death, The (1976)",Action,1976,3.0
102250,"Web of Death, The (1976)",Adventure,1976,3.0
102250,"Web of Death, The (1976)",Fantasy,1976,3.0
102250,"Web of Death, The (1976)",Horror,1976,3.0
102252,Legendary Weapons of China (1982),Action,1982,4.0
102252,Legendary Weapons of China (1982),Adventure,1982,4.0
