In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

In [0]:
# Load the top-rated movies CSV into a DataFrame.
# The first row is treated as the header and column types are inferred.

path = "/FileStore/tables/DataAnalytics/top_rated_movies.csv"
movies_df = spark.read.load(path, format="csv", header=True, inferSchema=True)
movies_df.display()

In [0]:
# id - int
# title - string
# overview - string
# release_date - date
# popularity - float
# vote_average - float
# vote_count - int

In [0]:
# Keep only rows whose release_date is made up solely of digits and the
# separators ':', '/' and '-'.
# The value is tested directly with rlike (a partial regex match) instead of
# substituting a "9999-01-01" sentinel and filtering on it, so a genuine
# "9999-01-01" value is no longer discarded and the column is left untouched.
# Rows with a null release_date are dropped, since a null condition is not true.
cleaned_movies_df = (
    movies_df
    .filter(~F.col("release_date").rlike("[^0-9:/-]"))
)
cleaned_movies_df.display()

In [0]:
# Show the column names of the cleaned DataFrame.
cleaned_movies_df.columns

In [0]:
# Project the seven movie columns with explicit types.
# popularity is rounded to three decimal places before being cast to float;
# the alias is applied last so the output column is still named "popularity".
cleaned_movies_df1 = cleaned_movies_df.select(
    F.col("id").cast("int"),
    F.col("title").cast("string"),
    F.col("overview").cast("string"),
    F.col("release_date").cast("date"),
    F.round(F.col("popularity"), 3).cast("float").alias("popularity"),
    F.col("vote_average").cast("float"),
    F.col("vote_count").cast("int"),
)
display(cleaned_movies_df1)

In [0]:
# Count how many movies share each release_date.
movie_count = (
    cleaned_movies_df1
    .groupBy("release_date")
    .agg(F.count(F.col("id")).alias("no_of_movies_released"))
)
movie_count.display()

In [0]:
# Recompute the per-date movie counts, then show only the release_date with
# the highest count (limit(1) keeps a single row even if several dates tie).
movie_count = cleaned_movies_df1.groupBy("release_date").agg(
    F.count("id").alias("no_of_movies_released")
)
display(movie_count.orderBy("no_of_movies_released", ascending=False).limit(1))

In [0]:
# Derive the calendar parts of release_date as separate columns:
# year, month, and day of month.
cleaned_movies_df1 = (
    cleaned_movies_df1
    .withColumn("year", F.year("release_date"))
    .withColumn("month", F.month("release_date"))
    .withColumn("day", F.dayofmonth("release_date"))
)
cleaned_movies_df1.display()

In [0]:
# movie_reach - Hit if 6 <= vote_average < 7, Super Hit if 7 <= vote_average < 8, Blockbuster if vote_average >= 8, otherwise Average

In [0]:
# Label each movie with a "movie_reach" category based on vote_average:
#   vote_average >= 8       -> "BlockBosterHit"
#   7 <= vote_average < 8   -> "SuperHit"
#   6 <= vote_average < 7   -> "Hit"
#   anything else (or null) -> "Average"
# The when() branches are evaluated in order, so checking the highest
# threshold first yields contiguous half-open bands. Inclusive ranges such as
# between(6, 6.9999) / between(7, 7.9999) would leave gaps just below 7 and 8,
# and values falling in those gaps would be mislabelled "Average".
movie_reach_df = (
    cleaned_movies_df1
    .withColumn(
        "movie_reach",
        F.when(F.col("vote_average") >= 8, "BlockBosterHit")
        .when(F.col("vote_average") >= 7, "SuperHit")
        .when(F.col("vote_average") >= 6, "Hit")
        .otherwise("Average")
    )
)
display(movie_reach_df)

In [0]:
# year listOfmovies
# 2020 ["F1", "Race", "Gang", "SpiderMan1"]

In [0]:
# Gather the titles released in each year into one list per year
# (collect_list keeps duplicate titles).
list_of_movies_df = movie_reach_df.groupBy("year").agg(
    F.collect_list("title").alias("list_of_movies")
)
list_of_movies_df.display()

In [0]:
# Gather the distinct titles released in each year
# (collect_set keeps each title only once).
list_of_movies_df = movie_reach_df.groupBy("year").agg(
    F.collect_set("title").alias("list_of_movies")
)
list_of_movies_df.display()