In [0]:
spark

In [0]:
df1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/rohitjoshi9july@gmail.com/TMDB_all_movies.csv")

In [0]:
class Main:
    def __init__(self):
        self.spark = spark
        self.last_processed_id = 0
        self.schema = None

    def read_parquet(self, path,partition_col):
        try:
            logger.info(f"Reading file from path:{path}")
            df = self.spark.read.option("parititionColumn",partition_col).option("mode","DROPMALFORMED").parquet(path, inferSchema=True)
            return df
        except Exception as e:
            logger.error(f"Encountered exception while reading file: {path}: {e}")
            if True:
                logger.info(f"No data present at source [Infer schema issue]: {path}")
                return None
            else:
                #raise Exception("Error while reading file.")
                return None

    def save_details(self):
        with open("schema.json","w") as f:
            if self.schema:
                f.write(self.schema.json())
            f.close()

        with open("last_processed_data.txt","w") as f:

            f.write(f"last_processed_id={self.last_processed_id}")
            f.close()

    def initialize(self):
        try:
            with open("schema.json","r") as f:
                self.schema = StructType.fromJson(json.loads(f.read()))
                f.close()

            with open("last_processed_data.txt","r") as f:
                data = f.read()
                self.last_processed_id = int(data.split("=")[1])

            logger.info("Initialised schema and last_processed_id!")
        except Exception as e:
            self.last_processed_id = 0
            logger.error(f"Exception occured during intialization: {e}")

    def process(self, source,outputPath):
        
        try:
        #load data from source
            df = self.spark.read.format("csv").option("header",True).option("delimeter",',').option("inferSchema",True).load(source)

            print(f"Total Entries in data:{df.count()}")
            print(df.printSchema())
            print("*"*50)
            logger.info(f"Last processed id: {self.last_processed_id}")
            max_id = df.agg({"id": "max"}).collect()[0]["max(id)"]
            logger.info(f"Max id to be processed: {max_id}")

            #data type casting
            df = df.withColumn("vote_average",col("vote_average").cast("float"))\
                    .withColumn("vote_count",col("vote_count").cast("integer")) \
                    .withColumn("release_date",to_date("release_date")) \
                    .withColumn("revenue", col("revenue").cast("float"))\
                    .withColumn("budget", col("budget").cast("float"))\
                    .withColumn("runtime", col("runtime").cast("integer"))\
                    .withColumn("popularity",col("popularity").cast("float"))

            df = df.withColumn("year",year(col("release_date"))) \
                    .withColumn("month",month(col("release_date")))
            print(df.printSchema())

            #partitioning
            df = df.repartition(col("year"))
            schema = self.schema if self.schema else df.schema
            existing_df = self.read_parquet(outputPath,"year")
            if not existing_df:
                existing_df = self.spark.createDataFrame([], schema)

            # merged_df = existing_df.union(cleaned_df)
            new_df = df.join(existing_df, df["id"] == existing_df["id"], "left_anti")
            print(f"Rows needs to be processed: {new_df.count()}")

            new_df.write.mode("append") \
                .partitionBy("year") \
                .parquet(outputPath)

            logger.info(f"Processed data successfully with max_id: {max_id}")

            #update final details
            self.schema = new_df.schema
            self.last_processed_id = int(max_id)

        except Exception as e:
            logger.info(f"Encountered exception while processing data: {e}")
            raise Exception("Encountered exception in data processing")
        finally:
            self.save_details()
            # self.exit_session()

    # def exit_session(self):
    #     if self.spark:
    #         self.spark.stop()

In [0]:
from pyspark.sql.functions import *
import logging
import json

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [0]:


obj = Main()
source = "dbfs:/FileStore/shared_uploads/rohitjoshi9july@gmail.com/TMDB_all_movies.csv"
destination = "s3a://tmdb-movies-datalake/movie-ratings-data/"
obj.initialize()
obj.process(source, destination)

In [0]:
df = spark.read.format("csv").option("header",True).option("delimeter",',').option("inferSchema",True).load(source)

In [0]:
#EDA
print(f"Total Entries in original data: {df.count()}")

print("*"*50)

print(f"Schema of original data: {df.schema}")

print("*"*50)



Total Entries in original data: 908174
**************************************************
Schema of original data: StructType([StructField('id', StringType(), True), StructField('title', StringType(), True), StructField('vote_average', StringType(), True), StructField('vote_count', StringType(), True), StructField('status', StringType(), True), StructField('release_date', StringType(), True), StructField('revenue', StringType(), True), StructField('runtime', StringType(), True), StructField('budget', StringType(), True), StructField('imdb_id', StringType(), True), StructField('original_language', StringType(), True), StructField('original_title', StringType(), True), StructField('overview', StringType(), True), StructField('popularity', StringType(), True), StructField('tagline', StringType(), True), StructField('genres', StringType(), True), StructField('production_companies', StringType(), True), StructField('production_countries', StringType(), True), StructField('spoken_languages',

In [0]:
# Count the null values in the original df
for col_name in df.columns:
    null_count = df.filter(col(col_name).isNull()).count()
    print(f"Column '{col_name}' has {null_count} null values.")


Column 'id' has 0 null values.
Column 'title' has 124 null values.
Column 'vote_average' has 177 null values.
Column 'vote_count' has 175 null values.
Column 'status' has 187 null values.
Column 'release_date' has 86386 null values.
Column 'revenue' has 198 null values.
Column 'runtime' has 195 null values.
Column 'budget' has 193 null values.
Column 'imdb_id' has 355135 null values.
Column 'original_language' has 259 null values.
Column 'original_title' has 231 null values.
Column 'overview' has 153629 null values.
Column 'popularity' has 347 null values.
Column 'tagline' has 735508 null values.
Column 'genres' has 260615 null values.
Column 'production_companies' has 473791 null values.
Column 'production_countries' has 346323 null values.
Column 'spoken_languages' has 346883 null values.
Column 'cast' has 300356 null values.
Column 'director' has 167398 null values.
Column 'director_of_photography' has 671060 null values.
Column 'writers' has 466194 null values.
Column 'producers' h

In [0]:
print(f"Distinct status in original data: {df.select('status').distinct().count()}")
print("\n")
print(f"Distinct genres in original data: {df.select('genres').distinct().count()}")
print("\n")
print(f"Distinct production companies in original data: {df.select('production_companies').distinct().count()}")


print("\n"*4)
df.show(3)

Distinct status in original data: 74


Distinct genres in original data: 42205


Distinct production companies in original data: 210603





+---+-------------------+------------+----------+--------+------------+---------+-------+---------+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------------+--------------------+--------------------+------------------+
| id|              title|vote_average|vote_count|  status|release_date|  revenue|runtime|   budget|  imdb_id|original_language|      original_title|            overview|popularity|             tagline|              genres|production_companies|production_countries|    spoken_languages|                cast|            director|director_of_photography|             writers|           producers|    music_composer|
+---+-------------------+--------

In [0]:
#data cleaning
df = df.dropna(subset = ["release_date","revenue","budget"])
df = df.withColumn("vote_average",col("vote_average").cast("float"))\
                    .withColumn("vote_count",col("vote_count").cast("integer")) \
                    .withColumn("release_date",to_date("release_date")) \
                    .withColumn("revenue", col("revenue").cast("float"))\
                    .withColumn("budget", col("budget").cast("float"))\
                    .withColumn("runtime", col("runtime").cast("integer"))\
                    .withColumn("year",year(col("release_date"))) \
                    .withColumn("month",month(col("release_date"))) \
                    .withColumn("popularity",col("popularity").cast("float"))


df = df.withColumn("year", col("year").cast("string")).withColumn("month",format_string("%02d", col("month")))

status_list= ["Released","Planned","In Production","Post Production","Cancelled","Rumoured","NA"]
cleaned_df = df.repartition(col("month")).withColumn("genres",regexp_replace(col("genres"), "[^a-zA-Z\s,]", "")) \
                                    .withColumn("production_companies",regexp_replace(col("production_companies"),"[^a-zA-Z\s,.]","")) \
                                    .withColumn("status",when(lower(trim(col("status"))).isin([s.lower() for s in status_list]), trim(col("status"))).otherwise("Others"))  \
                                    .withColumn("production_companies",trim(lower(col("production_companies"))))
                                       
cleaned_df = cleaned_df.filter(cleaned_df["release_date"].isNotNull() & cleaned_df["revenue"].isNotNull() & cleaned_df["budget"].isNotNull())\
            .select("id","status","genres","release_date","runtime","year","month","budget","revenue","vote_count","vote_average","title","production_companies")
cleaned_df = cleaned_df.fillna({"genres":"NA","title":"","production_companies":"NA"})

print(f"Total entries in cleaned data: {cleaned_df.count()}")

print(f"Distinct status in cleaned data: {cleaned_df.select('status').distinct().count()}")
print("\n")
print(f"Distinct genres in cleaned data: {cleaned_df.select('genres').distinct().count()}")
print("\n")
print(f"Distinct production companies in cleaned data: {cleaned_df.select('production_companies').distinct().count()}")


Total entries in cleaned data: 821655
Distinct status in cleaned data: 5


Distinct genres in cleaned data: 37339


Distinct production companies in cleaned data: 197887


In [0]:
# Count the null values in the cleaned df
for col_name in cleaned_df.columns:
    null_count = cleaned_df.filter(col(col_name).isNull()).count()
    print(f"Column '{col_name}' has {null_count} null values.")


Column 'id' has 0 null values.
Column 'status' has 0 null values.
Column 'genres' has 0 null values.
Column 'release_date' has 0 null values.
Column 'year' has 0 null values.
Column 'month' has 0 null values.
Column 'budget' has 0 null values.
Column 'revenue' has 0 null values.
Column 'vote_count' has 0 null values.
Column 'vote_average' has 0 null values.
Column 'title' has 0 null values.
Column 'production_companies' has 0 null values.


In [0]:
#genre level data
keywords = ["action", "thriller", "crime", "comedy", "drama", "romance", "horror", "adventure", "documentary","tv movie", "fantasy","mystery","history","science fiction","family", "war","western","animation","na"]
genre_df = cleaned_df.withColumn("genres",lower(col("genres"))) \
                     .withColumn("unique_genres", explode(split(col("genres"),", "))).drop("genres").withColumn("genres",trim(col("unique_genres"))).drop("unique_genres").filter(col("genres") \
                     .isin(keywords))
yoy_genre_df = genre_df.groupBy("year","genres").agg(count(col("id")).alias("total_movies"), sum(col("budget")).alias("total_budget"), sum(col("revenue")).alias("total_revenue"), corr(col("budget"),col("revenue")).alias("budget_revenue_correlation"))

mby_genre_df = genre_df.groupBy("month","genres").agg(count(col("id")).alias("total_movies"), sum(col("budget")).alias("total_budget"), sum(col("revenue")).alias("total_revenue"), corr(col("budget"),col("revenue")).alias("budget_revenue_correlation"))

genre_agg_df = genre_df.groupBy("genres").agg(count(col("id")).alias("total_movies"), sum(col("budget")).alias("total_budget"), sum(col("revenue")).alias("total_revenue"), corr(col("budget"),col("revenue")).alias("budget_revenue_correlation"))



In [0]:
### YOY Growth by genre 
yoy_genre_df.orderBy(col("total_movies").desc()).show(5,False)

+----+------+------------+-------------+-------------+--------------------------+
|year|genres|total_movies|total_budget |total_revenue|budget_revenue_correlation|
+----+------+------------+-------------+-------------+--------------------------+
|2023|drama |10379       |3.205250455E9|5.187245185E9|0.5452831224775868        |
|2021|na    |9633        |2.8775614E7  |1.0724283E7  |-1.0442661562615438E-4    |
|2022|drama |9341        |2.419772397E9|6.347856944E9|0.5708785585644467        |
|2020|na    |8940        |7768524.0    |1.6222448E7  |0.01936607266319677       |
|2022|na    |8560        |2.45766676E8 |2476071.0    |-2.3654401538571047E-5    |
+----+------+------------+-------------+-------------+--------------------------+
only showing top 5 rows



In [0]:
## Month By Month growth on genre
mby_genre_df.orderBy(col("total_movies").desc()).show(5,False)

+-----+-----------+------------+---------------+---------------+--------------------------+
|month|genres     |total_movies|total_budget   |total_revenue  |budget_revenue_correlation|
+-----+-----------+------------+---------------+---------------+--------------------------+
|1    |na         |61141       |1.18082897E8   |3.0759825E7    |0.00867625480197731       |
|1    |documentary|33264       |1.1813026E8    |4.71938344E8   |0.15570897260607067       |
|1    |drama      |31095       |4.265659792E9  |7.050643651E9  |0.6219397323544973        |
|10   |drama      |19643       |1.0371387046E10|1.8590852115E10|0.4991922665788824        |
|9    |drama      |18495       |1.1388057498E10|2.079154531E10 |0.6348540681218942        |
+-----+-----------+------------+---------------+---------------+--------------------------+
only showing top 5 rows



In [0]:
## Genre based data
genre_agg_df.orderBy(col("total_budget").desc(), col("total_revenue").desc(), col("budget_revenue_correlation").desc()).show(truncate=False)

+---------------+------------+----------------+----------------+--------------------------+
|genres         |total_movies|total_budget    |total_revenue   |budget_revenue_correlation|
+---------------+------------+----------------+----------------+--------------------------+
|action         |38376       |1.10890885074E11|2.88051581018E11|0.7932478178052217        |
|drama          |199305      |9.9885740128E10 |2.22429344714E11|0.6597071451636347        |
|adventure      |20204       |9.7977276483E10 |2.87793872669E11|0.7700461652986312        |
|comedy         |121427      |8.7868395574E10 |2.40146420408E11|0.7144258044435013        |
|thriller       |39907       |6.7381790094E10 |1.53336549423E11|0.7456152318251615        |
|science fiction|17368       |5.3828660124E10 |1.44895560343E11|0.7419260747644612        |
|fantasy        |18887       |4.9591241724E10 |1.36047528284E11|0.7083470554989598        |
|family         |24641       |4.5573316988E10 |1.33409390853E11|0.77484138266871

In [0]:
#check samples of genre based df
genre_df.filter(col("genres")=="na").show(5,False)

+------+--------+------------+-------+----+-----+------+-------+----------+------------+-------------------+--------------------+------+
|id    |status  |release_date|runtime|year|month|budget|revenue|vote_count|vote_average|title              |production_companies|genres|
+------+--------+------------+-------+----+-----+------+-------+----------+------------+-------------------+--------------------+------+
|32841 |Released|1959-01-01  |0      |1959|1    |0.0   |0.0    |0         |0.0         |Naadodikal         |                    |na    |
|35070 |Released|1959-11-14  |173    |1959|11   |0.0   |0.0    |4         |6.5         |Khovanshchina      |NA                  |na    |
|74353 |Released|1959-01-01  |11     |1959|1    |0.0   |0.0    |6         |6.8         |Pátio              |NA                  |na    |
|103098|Released|1959-01-01  |101    |1959|1    |0.0   |0.0    |0         |0.0         |15 under the canvas|NA                  |na    |
|3923  |Released|1990-11-06  |27     |199

In [0]:
print(f"Rows in genre df: {genre_df.count()}")
print("*"*50)
temp = genre_df.filter((col("budget")==float(0)) & (col("revenue")==float(0))).count()
print(f"Entries where budget and revenue is 0: {temp}")

Rows in genre df: 1015167
**************************************************
Entries where budget and revenue is 0: 923724


In [0]:
#production companies growth
from pyspark.sql.window import Window
pc_growth_df = cleaned_df.groupBy("year").agg(countDistinct(col("production_companies")).alias("prod_companies_count"))
pc_window =  Window.orderBy("year").rowsBetween(Window.unboundedPreceding,-1)
pc_growth_df = pc_growth_df.withColumn("prev_yr_pc_count", sum("prod_companies_count").over(pc_window)).withColumn("prev_yr_pc_count",when(col("prev_yr_pc_count").isNull(),0).otherwise(col("prev_yr_pc_count")))

pc_growth_df = pc_growth_df.withColumn("pc_yoy_growth_count",col("prod_companies_count")+col("prev_yr_pc_count"))

pc_growth_df.orderBy(col("year")).show()

+----+--------------------+----------------+-------------------+
|year|prod_companies_count|prev_yr_pc_count|pc_yoy_growth_count|
+----+--------------------+----------------+-------------------+
|1865|                   1|               0|                  1|
|1874|                   1|               1|                  2|
|1878|                   1|               2|                  3|
|1882|                   1|               3|                  4|
|1885|                   1|               4|                  5|
|1887|                   1|               5|                  6|
|1888|                   3|               6|                  9|
|1889|                   1|               9|                 10|
|1890|                   4|              10|                 14|
|1891|                   2|              14|                 16|
|1892|                   2|              16|                 18|
|1893|                   2|              18|                 20|
|1894|                   

In [0]:
#production_companies_growth_descending
pc_growth_df.orderBy(col("year").desc()).show()

+----+--------------------+----------------+-------------------+
|year|prod_companies_count|prev_yr_pc_count|pc_yoy_growth_count|
+----+--------------------+----------------+-------------------+
|2115|                   1|           32548|              32549|
|2074|                   1|           32547|              32548|
|2064|                   1|           32546|              32547|
|2031|                   1|           32545|              32546|
|2030|                   1|           32544|              32545|
|2029|                   2|           32542|              32544|
|2028|                   1|           32541|              32542|
|2027|                   3|           32538|              32541|
|2026|                   6|           32532|              32538|
|2025|                  20|           32512|              32532|
|2024|                 475|           32037|              32512|
|2023|                1206|           30831|              32037|
|2022|                110

In [0]:
#finding outliers:

runtime_otl = cleaned_df.filter((col("runtime")>300) | (col("runtime")<0))
print(f"Entries with runtime greater than 300 or less than 0: {runtime_otl.count()}")

#cleaning runtime outliers via anti join
cleaned_df = cleaned_df.join(runtime_otl,runtime_otl.id==cleaned_df.id, how="left_anti")

vote_avg_otl = cleaned_df.filter((col("vote_average")>float(90)) | (col("vote_average")<float(0)))
print(f"Entries with vote average greater than 90 or less than 0: {vote_avg_otl.count()}")

#cleaning runtime outliers via anti join
cleaned_df = cleaned_df.join(runtime_otl,vote_avg_otl.id==cleaned_df.id, how="left_anti")


budget_revenue_otl = cleaned_df.filter((col("budget")<float(0)) | (col("revenue")<float(0)))
print(f"Entries with budget or revenue less than 0: {budget_revenue_otl.count()}")

#cleaning budget and revenue outliers via anti join
cleaned_df = cleaned_df.join(runtime_otl, budget_revenue_otl.id==cleaned_df.id, how="left_anti")

#
print(f"Entries in cleaned df after remove outliers: {cleaned_df.count()}")



Entries with runtime greater than 300 or less than 0: 1360
Entries with vote average greater than 90 or less than 0: 0
Entries with budget or revenue less than 0: 1
Entries in cleaned df after remove outliers: 820295


In [0]:
#schema of cleaned data
cleaned_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- genres: string (nullable = false)
 |-- release_date: date (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = false)
 |-- budget: float (nullable = true)
 |-- revenue: float (nullable = true)
 |-- vote_count: integer (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- title: string (nullable = false)
 |-- production_companies: string (nullable = false)



In [0]:
#write cleaned data to a s3 bucket
FILTERED_DATA_S3_PATH = "s3a://tmdb-movies-datalake/movie-ratings-filtered/"
cleaned_df.write.mode("overwrite").partitionBy("month").parquet(FILTERED_DATA_S3_PATH)
print(f"successfully loaded filtered data in s3 path: {FILTERED_DATA_S3_PATH}")

successfully loaded filtered data in s3 path: s3a://tmdb-movies-datalake/movie-ratings-filtered/


In [0]:
cleaned_df.withColumn("genres_len",length(col("genres"))).orderBy(col("genres_len").desc()).show(10,False)

+-------+--------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+-------+----+-----+------+-------+----------+------------+-----------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------

In [0]:
cleaned_df.withColumn("pc_len",length(col("production_companies"))).orderBy(col("pc_len").desc()).show(10,False)

+-------+--------+----------------------------------+------------+-------+----+-----+---------+---------+----------+------------+-----------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|id     |status  |genres                            |release_date|runtime|year|month

In [0]:
cleaned_df.select("production_companies").distinct().count()

Out[31]: 197996

In [0]:

#production company filtering 
keywords = ["action", "thriller", "crime", "comedy", "drama", "romance", "horror", "adventure", "documentary","tv movie", "fantasy","mystery","history","science fiction","family", "war","western","na","", "animation"]

pc1 = cleaned_df.withColumn("pc",explode(split(col("production_companies"),", "))).withColumn("pc",trim(lower(col("pc")))).filter(~col("pc").isin(keywords)).filter(length(col("pc"))>5).filter(regexp_like(col("production_company"),'^[a-zA-Z]+(\.[a-zA-Z]+)*$'))

ws = Window.partitionBy("production_company").orderBy("year")
pc_ranked_df = pc1.withColumn("pc_rank", dense_rank().over(ws))
pc_ranked_df = pc_ranked_df.filter(col("pc_rank") == 1)
pc_distinct_df = pc_ranked_df.groupBy("year").agg(countDistinct("pc").alias("pc_distinct"))


window = Window.orderBy("year").rowsBetween(Window.unboundedPreceding, -1)

pc_distinct_df = pc_distinct_df.withColumn("prev_yr_pc_count", sum("pc_distinct").over(pc_window)).withColumn("prev_yr_pc_count",when(col("prev_yr_pc_count").isNull(),0).otherwise(col("prev_yr_pc_count")))

pc_yoy_df = pc_distinct_df.withColumn("pc_yoy_growth_count",col("pc_distinct")+col("prev_yr_pc_count"))

pc_yoy_df.orderBy(col("year").desc()).show(truncate=False)


+----+-----------+----------------+-------------------+
|year|pc_distinct|prev_yr_pc_count|pc_yoy_growth_count|
+----+-----------+----------------+-------------------+
|2074|1          |338113          |338114             |
|2031|2          |338111          |338113             |
|2029|3          |338108          |338111             |
|2027|8          |338100          |338108             |
|2026|42         |338058          |338100             |
|2025|196        |337862          |338058             |
|2024|6224       |331638          |337862             |
|2023|17442      |314196          |331638             |
|2022|16249      |297947          |314196             |
|2021|15169      |282778          |297947             |
|2020|13943      |268835          |282778             |
|2019|14867      |253968          |268835             |
|2018|13589      |240379          |253968             |
|2017|12813      |227566          |240379             |
|2016|11881      |215685          |227566       

In [0]:
temp  = pc1.groupBy("pc").agg(count("*").alias("count")).orderBy(col("count").desc())
temp.show()

+--------------------+-----+
|                  pc|count|
+--------------------+-----+
|warner bros. pict...| 2955|
|   columbia pictures| 2741|
|   metrogoldwynmayer| 2685|
|           paramount| 2400|
|            onf  nfb| 2391|
|  universal pictures| 2352|
|        toei company| 2109|
|nikkatsu corporation| 1693|
|      th century fox| 1603|
|             mosfilm| 1375|
|        esk televize| 1282|
|            shochiku| 1176|
|  rko radio pictures| 1136|
|       france  cinma| 1009|
|    france tlvisions|  993|
|walt disney produ...|  953|
|filmov studio bar...|  926|
|             lenfilm|  826|
|   republic pictures|  785|
|          daiei film|  782|
+--------------------+-----+
only showing top 20 rows



In [0]:
temp.select("pc").count()

Out[43]: 155055

In [0]:
pc1.groupBy("pc").agg(count("*").alias("count")).filter(col("count")>5).select("pc").count()

Out[47]: 15020

In [0]:
#pc more detailed filtering 
ws = Window.partitionBy("production_companies").orderBy("year")
pc_ranked_df = cleaned_df.withColumn("production_companies", lower(col("production_companies")))
pc_ranked_df = pc_ranked_df.withColumn("pc_rank", dense_rank().over(ws))
pc_ranked_df = pc_ranked_df.filter(col("pc_rank") == 1)


pc_df = pc_ranked_df.groupBy("year").agg(countDistinct(col("production_companies")).alias("pc_count_by_year"))

pc_df.orderBy(col("pc_count_by_year").desc()).show()



+----+----------------+
|year|pc_count_by_year|
+----+----------------+
|2023|           10473|
|2022|            9628|
|2021|            9213|
|2019|            8845|
|2020|            8106|
|2018|            7985|
|2017|            7597|
|2016|            6883|
|2015|            6647|
|2014|            6085|
|2013|            5491|
|2012|            4760|
|2011|            4450|
|2009|            4121|
|2010|            4033|
|2008|            3918|
|2007|            3817|
|2006|            3687|
|2005|            3412|
|2004|            3091|
+----+----------------+
only showing top 20 rows



In [0]:
path = "s3a://tmdb-movies-datalake/movie-ratings-data/"
tdf = obj.read_parquet(path,"year")

if tdf:
    print(f"Entries in data: {tdf.count()}")

    print(f"columns in raw data: {tdf.columns}")
else:
    print("Error reading data")


INFO:__main__:Reading file from path:s3a://tmdb-movies-datalake/movie-ratings-data/


Entries in data: 908174
columns in raw data: ['id', 'title', 'vote_average', 'vote_count', 'status', 'release_date', 'revenue', 'runtime', 'budget', 'imdb_id', 'original_language', 'original_title', 'overview', 'popularity', 'tagline', 'genres', 'production_companies', 'production_countries', 'spoken_languages', 'cast', 'director', 'director_of_photography', 'writers', 'producers', 'music_composer', 'month', 'year']


In [0]:
#trasnformations

def transform(df):
    df = df.dropna(subset = ["release_date","revenue","budget"])
    df = df.withColumn("vote_average",col("vote_average").cast("float"))\
                        .withColumn("vote_count",col("vote_count").cast("integer")) \
                        .withColumn("release_date",to_date("release_date")) \
                        .withColumn("revenue", col("revenue").cast("float"))\
                        .withColumn("budget", col("budget").cast("float"))\
                        .withColumn("runtime", col("runtime").cast("integer"))\
                        .withColumn("year",year(col("release_date"))) \
                        .withColumn("month",month(col("release_date"))) \
                        .withColumn("popularity",col("popularity").cast("float"))


    df = df.withColumn("year", col("year").cast("string")).withColumn("month",format_string("%02d", col("month")))

    status_list= ["Released","Planned","In Production","Post Production","Cancelled","Rumoured","NA"]
    cleaned_df = df.repartition(col("month")).withColumn("genres",regexp_replace(col("genres"), "[^a-zA-Z\s,]", "")) \
                                        .withColumn("production_companies",regexp_replace(col("production_companies"),"[^a-zA-Z\s,.]","")) \
                                        .withColumn("status",when(lower(trim(col("status"))).isin([s.lower() for s in status_list]), trim(col("status"))).otherwise("Others"))  \
                                        .withColumn("production_companies",trim(lower(col("production_companies"))))
                                        
    cleaned_df = cleaned_df.filter(cleaned_df["release_date"].isNotNull() & cleaned_df["revenue"].isNotNull() & cleaned_df["budget"].isNotNull())\
                .select("id","status","genres","release_date","runtime","year","month","budget","revenue","vote_count","vote_average","title","production_companies")
    cleaned_df = cleaned_df.fillna({"genres":"NA","title":"","production_companies":"NA"})

    #cleaning outliers
    runtime_otl = cleaned_df.filter((col("runtime")>300) | (col("runtime")<0))
    #cleaning runtime outliers via anti join
    cleaned_df = cleaned_df.join(runtime_otl,runtime_otl.id==cleaned_df.id, how="left_anti")
    vote_avg_otl = cleaned_df.filter((col("vote_average")>float(90)) | (col("vote_average")<float(0)))
    #cleaning runtime outliers via anti join
    cleaned_df = cleaned_df.join(runtime_otl,vote_avg_otl.id==cleaned_df.id, how="left_anti")
    budget_revenue_otl = cleaned_df.filter((col("budget")<float(0)) | (col("revenue")<float(0)))
    #cleaning budget and revenue outliers via anti join
    cleaned_df = cleaned_df.join(runtime_otl, budget_revenue_otl.id==cleaned_df.id, how="left_anti")

    return cleaned_df




In [0]:
cleaned_df = transform(tdf)

In [0]:
print(f"Entries in cleaned df: {cleaned_df.count()}")
print(f"Unique dates in cleaned df :{cleaned_df.select('release_date').distinct().count()}")


Entries in cleaned df: 820295
Unique dates in cleaned df :42387


In [0]:
cleaned_df.show(5)

+------+--------+--------------------+------------+-------+----+-----+------+------------+----------+------------+--------------------+--------------------+
|    id|  status|              genres|release_date|runtime|year|month|budget|     revenue|vote_count|vote_average|               title|production_companies|
+------+--------+--------------------+------------+-------+----+-----+------+------------+----------+------------+--------------------+--------------------+
|346698|Released|   Comedy, Adventure|  2023-07-19|    114|2023|   07|1.45E8| 1.4456384E9|      7775|         7.1|              Barbie|luckychap enterta...|
|496450|Released|Animation, Fantas...|  2023-07-05|    107|2023|   07| 8.6E7|      4.01E7|       772|         7.7|Miraculous: Ladyb...|the awakening pro...|
|457332|Released|Action, Adventure...|  2023-07-06|    103|2023|   07| 8.0E7|         0.0|       877|       6.983|       Hidden Strike|talent internatio...|
|509447|Released|Horror, Fantasy, ...|  2023-07-14|     92

In [0]:
from pyspark.sql.window import Window
ws = Window.partitionBy("production_companies").orderBy("year")
pc_ranked_df = cleaned_df.withColumn("production_companies", lower(col("production_companies"))).filter(col("production_companies")!="na")
pc_ranked_df = pc_ranked_df.withColumn("pc_rank", dense_rank().over(ws))
pc_ranked_df = pc_ranked_df.filter(col("pc_rank") == 1)

pc_df = pc_ranked_df.groupBy("year").agg(countDistinct(col("production_companies")).alias("pc_count_by_year"))

window = Window.orderBy("year").rowsBetween(Window.unboundedPreceding, -1)
pc_distinct_df = pc_df.withColumn("prev_yr_pc_count", sum("pc_count_by_year").over(window)).withColumn("prev_yr_pc_count",when(col("prev_yr_pc_count").isNull(),0).otherwise(col("prev_yr_pc_count")))

pc_yoy_df = pc_distinct_df.withColumn("pc_yoy_growth_count",col("pc_count_by_year")+col("prev_yr_pc_count"))


+----+----------------+
|year|pc_count_by_year|
+----+----------------+
|2023|           10473|
|2022|            9628|
|2021|            9213|
|2019|            8845|
|2020|            8106|
|2018|            7985|
|2017|            7597|
|2016|            6883|
|2015|            6647|
|2014|            6085|
|2013|            5491|
|2012|            4760|
|2011|            4450|
|2009|            4121|
|2010|            4033|
|2008|            3918|
|2007|            3817|
|2006|            3687|
|2005|            3412|
|2004|            3091|
+----+----------------+
only showing top 20 rows



In [0]:

pc_yoy_df.orderBy(col("pc_yoy_growth_count").asc()).show()


+----+----------------+----------------+-------------------+
|year|pc_count_by_year|prev_yr_pc_count|pc_yoy_growth_count|
+----+----------------+----------------+-------------------+
|1887|               1|               0|                  1|
|1888|               1|               1|                  2|
|1890|               3|               2|                  5|
|1894|               7|               5|                 12|
|1895|               8|              12|                 20|
|1896|              32|              20|                 52|
|1897|              23|              52|                 75|
|1898|              17|              75|                 92|
|1899|              24|              92|                116|
|1900|              18|             116|                134|
|1901|               9|             134|                143|
|1902|               8|             143|                151|
|1903|              12|             151|                163|
|1904|              11| 

In [0]:
pc_yoy_df.orderBy(col("pc_yoy_growth_count").desc()).show()

+----+----------------+----------------+-------------------+
|year|pc_count_by_year|prev_yr_pc_count|pc_yoy_growth_count|
+----+----------------+----------------+-------------------+
|2028|               1|          197650|             197651|
|2027|               2|          197648|             197650|
|2026|              14|          197634|             197648|
|2025|              71|          197563|             197634|
|2024|            2916|          194647|             197563|
|2023|           10473|          184174|             194647|
|2022|            9628|          174546|             184174|
|2021|            9213|          165333|             174546|
|2020|            8106|          157227|             165333|
|2019|            8845|          148382|             157227|
|2018|            7985|          140397|             148382|
|2017|            7597|          132800|             140397|
|2016|            6883|          125917|             132800|
|2015|            6647| 

In [0]:
#YOY budget
ws = Window.orderBy("year").rowsBetween(Window.unboundedPreceding, -1)
yoy_budget = cleaned_df.groupby("year").agg(sum(col("budget")).alias("yearly_budget")).withColumn("yoy_budget",sum(col("yearly_budget")).over(ws))

yoy_budget.orderBy(col("year").asc()).show(100)

#yoy revenue
yoy_revenue = cleaned_df.groupby("year").agg(sum(col("revenue")).alias("yearly_revenue")).withColumn("yoy_revenue",sum(col("yearly_revenue")).over(ws))

yoy_revenue.orderBy(col("year").asc()).show(100)                                                                                 
                                                           

+----+-------------+-------------+
|year|yearly_budget|   yoy_budget|
+----+-------------+-------------+
|1865|          0.0|         null|
|1874|          0.0|          0.0|
|1878|        153.0|          0.0|
|1882|          0.0|        153.0|
|1885|          0.0|        153.0|
|1887|          0.0|        153.0|
|1888|          0.0|        153.0|
|1889|          0.0|        153.0|
|1890|          0.0|        153.0|
|1891|          0.0|        153.0|
|1892|          0.0|        153.0|
|1893|          0.0|        153.0|
|1894|          0.0|        153.0|
|1895|          0.0|        153.0|
|1896|          0.0|        153.0|
|1897|          0.0|        153.0|
|1898|          0.0|        153.0|
|1899|          0.0|        153.0|
|1900|          1.0|        153.0|
|1901|          0.0|        154.0|
|1902|       5985.0|        154.0|
|1903|        150.0|       6139.0|
|1904|       7500.0|       6289.0|
|1905|         37.0|      13789.0|
|1906|          0.0|      13826.0|
|1907|        500.0|

In [0]:
yoy_budget.orderBy(col("year").desc()).show(50)

+----+---------------+----------------+
|year|  yearly_budget|      yoy_budget|
+----+---------------+----------------+
|2115|            0.0|2.90206110073E11|
|2074|            0.0|2.90206110073E11|
|2064|            0.0|2.90206110073E11|
|2031|          6.0E8|2.89606110073E11|
|2030|            0.0|2.89606110073E11|
|2029|          5.0E8|2.89106110073E11|
|2028|            0.0|2.89106110073E11|
|2027|       1.0025E7|2.89096085073E11|
|2026|      9108000.0|2.89086977073E11|
|2025|  1.411367772E9|2.87675609301E11|
|2024|   4.13858052E9|2.83537028781E11|
|2023|1.1104715036E10|2.72432313745E11|
|2022|  9.399428229E9|2.63032885516E11|
|2021|  8.121946606E9| 2.5491093891E11|
|2020|  4.607772234E9|2.50303166676E11|
|2019|  9.854952311E9|2.40448214365E11|
|2018|1.0056755153E10|2.30391459212E11|
|2017|1.0816426538E10|2.19575032674E11|
|2016|1.2003904151E10|2.07571128523E11|
|2015|1.0058613175E10|1.97512515348E11|
|2014|1.0592040864E10|1.86920474484E11|
|2013|1.0977803358E10|1.75942671126E11|
