# Silver

In [0]:
# SECURITY: a storage account key used to be hardcoded on the next line. Never commit
# secrets to a notebook: read the key from a Databricks secret scope instead, and rotate
# the key that was exposed. The scope and key names below are placeholders.
# configs = {"fs.azure.account.key.storage001de.blob.core.windows.net": dbutils.secrets.get(scope="<secret-scope>", key="<storage-account-key-name>")}

# # create mount to silver
# dbutils.fs.mount(
# 	source = "wasbs://silver@storage001de.blob.core.windows.net/",
# 	mount_point = "/mnt/datalake/silver",
# 	extra_configs = configs
# )

In [0]:
# Storage path where the silver Delta table is written
movies_metadata_silverLocation = "/mnt/datalake/silver/cleaned_movies_metadata"

# Metastore table names: the bronze table read by this notebook and the silver table it registers
movies_metadata_bronzeTable = "bronze.movies_metadata_cs"
movies_metadata_silverTable = "silver.movies_metadata"

In [0]:
# Load the bronze movies metadata table as the starting DataFrame
movies_metadata_bronze = spark.table(movies_metadata_bronzeTable)

In [0]:
# Row count of the raw bronze data, as a baseline for the cleaning steps
movies_metadata_bronze.count()

# Cleaning

#### Drop rows where IMDB ID is null

In [0]:
from pyspark.sql.functions import col

# Keep only the rows that carry an IMDB id
cleaned_movies_metadata = movies_metadata_bronze.where(
    col('imdb_id').isNotNull()
)
cleaned_movies_metadata.count()

#### Drop duplicates based on IMDB ID, title, release date and overview

In [0]:
# Remove duplicate rows, comparing only the identifying columns.
# dropDuplicates returns a new DataFrame, so the result is re-assigned.
cleaned_movies_metadata = cleaned_movies_metadata.dropDuplicates(
    subset=['imdb_id', 'title', 'release_date', 'overview']
)
cleaned_movies_metadata.count()

#### Select and reorder columns

In [0]:
# Inspect the column names and types before selecting the columns to keep
cleaned_movies_metadata.printSchema()

In [0]:
# Keep only the columns used from here on, in this order
cleaned_movies_metadata = cleaned_movies_metadata.select([
    'id', 'imdb_id', 'title', 'genres', 'release_date',
    'original_language', 'overview', 'tagline',
    'production_companies', 'production_countries',
    'budget', 'revenue', 'popularity', 'vote_average', 'vote_count',
])

#### Drop rows with invalid id values

In [0]:
# Discard rows whose id cannot be cast to an integer (the cast yields null for them)
cleaned_movies_metadata = cleaned_movies_metadata.where(
    col('id').cast('int').isNotNull()
)
cleaned_movies_metadata.count()

#### Drop rows which have null values in the JSON columns - "production_companies", "production_countries", "genres"

In [0]:
# Discard rows with a null in any of the JSON-formatted columns
cleaned_movies_metadata = cleaned_movies_metadata.dropna(
    subset=["production_companies", "production_countries", "genres"]
)
cleaned_movies_metadata.count()

#### Many rows have the value '[]' in these columns. We replace '[]' with an 'Unknown' placeholder entry to avoid any JSON parsing issues.

In [0]:
# 'production_countries', 'production_companies' and 'genres' hold JSON-formatted text.
# Many rows contain an empty list '[]'; swap it for a one-element 'Unknown' placeholder
# to avoid JSON parsing issues later on.
from pyspark.sql.functions import when
cleaned_movies_metadata = (
    cleaned_movies_metadata
    .withColumn(
        'genres',
        when(col('genres') == '[]', "[{'id': 0, 'name': 'Unknown'}]")
        .otherwise(col('genres')),
    )
    .withColumn(
        'production_companies',
        when(col('production_companies') == '[]', "[{'name': 'Unknown', 'id': 0}]")
        .otherwise(col('production_companies')),
    )
    .withColumn(
        'production_countries',
        when(col('production_countries') == '[]', "[{'iso_3166_1': 'Unknown', 'name': 'Unknown'}]")
        .otherwise(col('production_countries')),
    )
)

#### Drop rows where vote average or vote count is null

In [0]:
from pyspark.sql.functions import count, mean

# Keep only the rows where both 'vote_average' and 'vote_count' cast to a non-null int

cleaned_movies_metadata = cleaned_movies_metadata.where(
    col('vote_average').cast('int').isNotNull()
    & col('vote_count').cast('int').isNotNull()
)
cleaned_movies_metadata.count()

#### Keep rows with vote_average between 0 and 10

In [0]:
# Keep ratings inside the 0-10 range, both bounds included
cleaned_movies_metadata = cleaned_movies_metadata.where(
    (col('vote_average') >= 0) & (col('vote_average') <= 10)
)
cleaned_movies_metadata.count()

#### Check Null count

In [0]:
from pyspark.sql.functions import count, isnan

# Count the null/NaN values of every column (timestamp and boolean columns are skipped).
# .show() runs the query and displays the counts; print() on a DataFrame only outputs
# its schema string without computing anything.
cleaned_movies_metadata.select([
    count(when(col(c).isNull() | isnan(c), 'True')).alias(c)
    for c, c_type in cleaned_movies_metadata.dtypes
    if c_type not in ('timestamp', 'boolean')
]).show()

#### Change column data type

In [0]:
# Cast the numeric columns to their proper types.
# - revenue is cast to long: values above 2,147,483,647 do not fit a 32-bit integer
#   and would otherwise be lost by the cast.
# - popularity: nulls are replaced with a numeric 0.0. Using the string "0" here makes
#   Spark coerce the whole column back to string, undoing the float cast.
# NOTE(review): casting vote_average to integer truncates the decimals of the rating;
# confirm this is intended.
cleaned_movies_metadata = (
    cleaned_movies_metadata
    .withColumn('budget', col('budget').cast('integer'))
    .withColumn('popularity', col('popularity').cast('float'))
    .withColumn('revenue', col('revenue').cast('long'))
    .withColumn('vote_average', col('vote_average').cast('integer'))
    .withColumn('vote_count', col('vote_count').cast('integer'))
    .withColumn(
        'popularity',
        when(col('popularity').isNull(), 0.0).otherwise(col('popularity')).cast('float'),
    )
)


In [0]:
# Row count after all cleaning and casting steps
cleaned_movies_metadata.count()

##### credit & keywords bronze

In [0]:
# credits = spark.read.table("bronze.credits_cs")
# keywords = spark.read.table("bronze.keywords_cs")

In [0]:
# credits.show()
# credits.count()

In [0]:
# keywords.show(5)
# keywords.count()

#### Convert id to int

In [0]:
# keywords = keywords.withColumn("id", col("id").cast("int"))
# credits = credits.withColumn("id", col("id").cast("int"))

#### Drop null

In [0]:
# from pyspark.sql.functions import col
# credits = credits.dropna(subset=["id"])
# keywords = keywords.dropna(subset=["id"])


#### Drop duplicates

In [0]:
# credits = credits.dropDuplicates(["id"])
# keywords = keywords.dropDuplicates(["id"])


In [0]:
# print(credits.count())
# print(keywords.count())

In [0]:
# credits.show()
# keywords.show()

In [0]:
# cleaned_movies_metadata = cleaned_movies_metadata.join(credits, on='id', how='left')
# cleaned_movies_metadata = cleaned_movies_metadata.join(keywords, on='id', how='left')

In [0]:
# cleaned_movies_metadata.show()

#### Create silver table

In [0]:
# Final schema check before the data is written to the silver table
cleaned_movies_metadata.printSchema()

In [0]:
 
from delta.tables import *

# Write the cleaned data to the silver location in Delta format.
# The incremental MERGE below is disabled: the table is fully overwritten
# (schema included) by the write at the end of this cell.
# NOTE(review): if the merge is re-enabled, the Delta API method is
# `whenNotMatchedInsertAll`, not `WhenNoMatchedInsertAll`.
# check if the silver location contains the delta table
# if (DeltaTable.isDeltaTable(spark, movies_metadata_silverLocation)):
#     DeltaTable.forPath(spark, movies_metadata_silverLocation).alias("target").merge(
#         source = cleaned_movies_metadata.alias("src"),
#         condition = "target.id = src.id"
#     ) \
#     .whenMatchedUpdate(
#         condition = "src.imdb_id = target.imdb_id",
#         set = {
#             "genres" : "src.genres",
#             "production_companies": "src.production_companies",
#         }
#     ).WhenNoMatchedInsertAll().execute()
# else:
cleaned_movies_metadata.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save(movies_metadata_silverLocation)

In [0]:
# Create the silver schema if it does not exist yet, then register the Delta files at
# the silver location as an external table (skipped when the table already exists)

spark.sql("CREATE SCHEMA IF NOT EXISTS silver")
spark.sql(f"CREATE EXTERNAL TABLE IF NOT EXISTS {movies_metadata_silverTable} USING delta LOCATION '{movies_metadata_silverLocation}'")



In [0]:
# Maintenance for the Delta table

# To optimize the performance of the Delta table, we run two commands:
# 1. optimize(): compacts the data into fewer files. optimize() only returns a builder;
#    executeCompaction() is the call that actually runs the compaction.
# 2. vacuum(): removes the old versions of the data. This reduces the storage overhead
#    but limits the versions we can go back to.
movies_metadata_DataDelta = DeltaTable.forName(spark, movies_metadata_silverTable)

# Run the maintenance only when none of the latest 30 commits is a vacuum
if movies_metadata_DataDelta.history(30).filter("operation = 'VACUUM START'").count() == 0:
    movies_metadata_DataDelta.optimize().executeCompaction()
    movies_metadata_DataDelta.vacuum()

In [0]:
%sql
-- Show the commit history of the silver table
DESCRIBE HISTORY silver.movies_metadata;

In [0]:
%sql
-- Display the rows of the silver table
select * from silver.movies_metadata

### Get movies that are not in the movie-mysql bronze table

In [0]:
# movie_silver = spark.read.table('bronze.movie')

In [0]:
# # Assuming you have two DataFrames: df1 and df2
# # And they have a common id column: 'id'

# # Get ids from df1 that are not in df2
# ids_in_df1_not_in_df2 = cleaned_movies_metadata.select('imdb_id').subtract(movie_silver.select('imdbId'))

# # Print the result
# ids_in_df1_not_in_df2.show()

In [0]:
# ids_in_df1_not_in_df2.count()

In [0]:

# # Perform a inner join on the 'imdb' column
# cleaned_movies_metadata = cleaned_movies_metadata.join(ids_in_df1_not_in_df2, cleaned_movies_metadata.imdb_id == ids_in_df1_not_in_df2.imdb_id, 'inner')
# cleaned_movies_metadata = cleaned_movies_metadata.drop(ids_in_df1_not_in_df2.imdb_id)

# # Show the filtered DataFrame
# cleaned_movies_metadata.show(1)

In [0]:
# cleaned_movies_metadata.select('production_companies').show(1)

# Machine Learning

## Simple Content-based

#### Convert genres into a list of genre names

In [0]:
# from pyspark.sql.functions import udf
# from pyspark.sql.types import ArrayType, StringType
# from ast import literal_eval
# import ast

# def extract_genre_names(genres_string):
#     # Convert the string representation of the list to a list
#     genres_list = literal_eval(genres_string)

#     # Check if the conversion result is a list
#     if isinstance(genres_list, list):
#         # If it is a list, extract 'name' from each item and return the list of names
#         genre_names = [item['name'] for item in genres_list]
#         return genre_names
#     else:
#         # If it is not a list, return an empty list
#         return []
# extract_name_udf = udf(extract_genre_names, ArrayType(StringType()))

# # Apply the UDF to the 'genres' column
# cleaned_movies_metadata = cleaned_movies_metadata.withColumn('genres', extract_name_udf(cleaned_movies_metadata['genres']))

In [0]:
# display(cleaned_movies_metadata)

#### Get year

In [0]:
from pyspark.sql.functions import col, year, to_date

# Parse 'release_date' with the yyyy-MM-dd pattern and store its year in a new 'year' column
cleaned_movies_metadata = cleaned_movies_metadata.withColumn(
    'year',
    year(to_date(col('release_date'), 'yyyy-MM-dd')),
)


#### Calculate m & C

In [0]:
# C: the mean of `vote_average` over all movies.
C = cleaned_movies_metadata.agg({'vote_average': 'mean'}).first()[0]

# m: the minimum number of votes a movie needs in order to be considered
# (the 95th percentile of 'vote_count', computed with a relative error of 0).
m = cleaned_movies_metadata.approxQuantile('vote_count', [0.95], 0)[0]
print(m, C)

In [0]:
# Movies with at least m votes, restricted to the columns used for the chart
qualified = (
    cleaned_movies_metadata
    .where(col('vote_count') >= m)
    .select(['title', 'year', 'vote_count', 'vote_average', 'popularity', 'genres'])
)

# Print the shape (row count, column count) of the DataFrame
print((qualified.count(), len(qualified.columns)))

## Check the 250 movies with the highest weighted rating.

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Weighted rating of a movie from its vote count and its average rating
def weighted_rating(v, R):
    """Return v/(v+m) * R + m/(v+m) * C.

    v is the vote count and R the average rating of the movie; m and C are
    read from the notebook-level variables of the same name.
    """
    total = v + m
    return (v / total * R) + (m / total * C)

# Wrap weighted_rating in a Spark UDF that returns a float
udf_weighted_rating = udf(weighted_rating, FloatType())

# Add the weighted rating of every qualified movie as a new column 'wr'
qualified = qualified.withColumn(
    'wr',
    udf_weighted_rating(col('vote_count'), col('vote_average')),
)

# Order by 'wr' from highest to lowest and keep the first 250 rows
qualified = qualified.orderBy(col('wr').desc()).limit(250)

qualified.show(5)

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, array_contains
from pyspark.sql.window import Window

def build_chart(genre_input, percentile=0.85):
    """Build the chart of the best-rated movies of one genre.

    Movies of `cleaned_movies_metadata` matching `genre_input` are scored with the
    weighted rating wr = v/(v+m) * R + m/(v+m) * C, where v is `vote_count`, R is
    `vote_average`, C is the mean `vote_average` of the genre and m is the
    `percentile` quantile of `vote_count` within the genre.

    Args:
        genre_input: name of the genre to chart, e.g. 'Romance'.
        percentile: quantile of `vote_count` used as the minimum number of votes.

    Returns:
        A DataFrame with columns title, year, vote_count, vote_average, popularity,
        wr and rank, holding the movies whose rank is at most 10 (ties share a
        rank, so more than 10 rows are possible). It is empty when no movie
        matches the genre.
    """
    # `genres` is an array only if it was converted to a list of names; otherwise it
    # still holds the raw text, e.g. "[{'id': 0, 'name': 'Unknown'}]", on which
    # array_contains cannot be used. Match the genre accordingly.
    genres_type = dict(cleaned_movies_metadata.dtypes)['genres']
    if genres_type.startswith('array'):
        genre_match = array_contains(col('genres'), genre_input)
    else:
        genre_match = col('genres').contains(f"'name': '{genre_input}'")
    df = cleaned_movies_metadata.filter(genre_match)

    # Mean of `vote_average` within the genre.
    C = df.agg({'vote_average': 'mean'}).collect()[0][0]

    # Minimum number of votes a movie needs to be considered (the `percentile` quantile).
    quantiles = df.approxQuantile('vote_count', [percentile], 0)
    if C is None or not quantiles:
        # No movie matches the genre: use neutral values so an empty chart is returned
        # instead of failing on the missing quantile.
        C, m = 0.0, 0.0
    else:
        m = quantiles[0]
    qualified = df.filter(df['vote_count'] >= m).select(['title', 'year', 'vote_count', 'vote_average', 'popularity'])

    # Weighted rating (`wr`) of every qualified movie.
    qualified = qualified.withColumn('wr', (col('vote_count')/(col('vote_count')+m) * col('vote_average')) + (m/(m+col('vote_count')) * C))

    # Rank the movies by `wr` and keep those ranked in the top 10.
    window = Window.orderBy(col('wr').desc())
    qualified = qualified.withColumn('rank', F.rank().over(window)).filter(col('rank') <= 10)

    return qualified

In [0]:
# build_chart('Romance').show()

### Movie Description Based Recommender

Let us first try to build a recommender using movie descriptions and taglines. We do not have a quantitative metric to judge our machine's performance so this will have to be done qualitatively.

# Content based
I will build two Content Based Recommenders based on:
* Movie Overviews and Taglines
* Movie Cast, Crew, Keywords and Genre