In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import*
from pyspark.sql.window import Window
from pyspark.sql.functions import*
# Reuse the active Spark session if one exists, otherwise start a new one
# registered under this notebook's application name.
spark = (
    SparkSession.builder
    .appName('Neflix_data')
    .getOrCreate()
)


In [0]:
# Explicit column layout for raw_titles.csv. Fields are added in file order and
# every field is nullable (the default for StructType.add).
raw_titles_schema = (
    StructType()
    .add("id", StringType())                    # unique title ID
    .add("title", StringType())                 # title of the media
    .add("type", StringType())                  # type of content
    .add("release_year", IntegerType())         # release year
    .add("age_certification", StringType())     # age rating
    .add("runtime", IntegerType())              # runtime in minutes
    .add("genres", StringType())                # genre list kept as raw text
    .add("production_countries", StringType())  # production countries kept as raw text
    .add("seasons", IntegerType())              # number of seasons
    .add("imdb_id", StringType())               # IMDb ID
    .add("imdb_score", DecimalType(3, 1))       # IMDb score, one decimal place
    .add("imdb_votes", IntegerType())           # number of IMDb votes
)

# Load the titles file with the schema above; the first line of the file is a header.
raw_titles_df = (
    spark.read
    .schema(raw_titles_schema)
    .option("header", True)
    .csv("/FileStore/tables/raw_titles.csv")
)
#raw_titles_df.show()



In [0]:
# Explicit column layout for raw_credits.csv; every field is nullable.
raw_credits_schema = (
    StructType()
    .add("person_id", IntegerType())  # unique person ID
    .add("id", StringType())          # ID of the title the credit belongs to
    .add("name", StringType())        # person's name
    .add("character", StringType())   # character played
    .add("role", StringType())        # role on the title
)

# Load the credits file with the schema above; the first line of the file is a header.
raw_credits_df = (
    spark.read
    .schema(raw_credits_schema)
    .option("header", True)
    .csv("/FileStore/tables/raw_credits.csv")
)

In [0]:
# Explicit column layout for Best_Shows_Netflix.csv; every field is nullable.
best_shows_schema = (
    StructType()
    .add("title", StringType())                # title of the show
    .add("release_year", IntegerType())        # release year
    .add("score", DecimalType(4, 2))           # score, two decimal places
    .add("number_of_votes", IntegerType())     # number of votes
    .add("duration", IntegerType())            # duration in minutes
    .add("number_of_seasons", IntegerType())   # number of seasons
    .add("main_genre", StringType())           # main genre
    .add("main_production", StringType())      # main production country
)

# Load the best-shows file with the schema above; the first line of the file is a header.
best_shows_df = (
    spark.read
    .schema(best_shows_schema)
    .option("header", True)
    .csv("/FileStore/tables/Best_Shows_Netflix.csv")
)

In [0]:
# Explicit column layout for Best_Show_by_Year_Netflix.csv; every field is nullable.
best_show_by_year_schema = (
    StructType()
    .add("title", StringType())                # title of the show
    .add("release_year", IntegerType())        # release year
    .add("score", DecimalType(4, 2))           # score, two decimal places
    .add("number_of_seasons", IntegerType())   # number of seasons
    .add("main_genre", StringType())           # main genre
    .add("main_production", StringType())      # main production country
)

# Load the best-show-by-year file with the schema above; the first line is a header.
best_show_by_year_df = (
    spark.read
    .schema(best_show_by_year_schema)
    .option("header", True)
    .csv("/FileStore/tables/Best_Show_by_Year_Netflix.csv")
)

In [0]:
# Explicit column layout for Best_Movies_Netflix.csv; every field is nullable.
best_movies_schema = (
    StructType()
    .add("title", StringType())              # title of the movie
    .add("release_year", IntegerType())      # release year
    .add("score", DecimalType(4, 2))         # score, two decimal places
    .add("number_of_votes", IntegerType())   # number of votes
    .add("duration", IntegerType())          # duration in minutes
    .add("main_genre", StringType())         # main genre
    .add("main_production", StringType())    # main production country
)

# Load the best-movies file with the schema above; the first line of the file is a header.
best_movies_df = (
    spark.read
    .schema(best_movies_schema)
    .option("header", True)
    .csv("/FileStore/tables/Best_Movies_Netflix.csv")
)

In [0]:
# Explicit column layout for Best_Movie_by_Year_Netflix.csv; every field is nullable.
best_movie_by_year_schema = (
    StructType()
    .add("title", StringType())            # title of the movie
    .add("release_year", IntegerType())    # release year
    .add("score", DecimalType(4, 2))       # score, two decimal places
    .add("main_genre", StringType())       # main genre
    .add("main_production", StringType())  # main production country
)

# Load the best-movie-by-year file with the schema above; the first line is a header.
best_movie_by_year_df = (
    spark.read
    .schema(best_movie_by_year_schema)
    .option("header", True)
    .csv("/FileStore/tables/Best_Movie_by_Year_Netflix.csv")
)

In [0]:
# Keep a single best-movie row per release year before counting.
best_movie_by_year_df = best_movie_by_year_df.dropDuplicates(['release_year'])

# Most frequent production country among the yearly best movies.
production_tally = best_movie_by_year_df.groupBy("main_production").count()
top_production_and_genre_df = (
    production_tally
    .orderBy(col("count").desc())
    .select("main_production")
    .limit(1)
)

# Most frequent main genre among the yearly best movies.
genre_tally = best_movie_by_year_df.groupBy("main_genre").count()
top_genre = (
    genre_tally
    .orderBy(col("count").desc())
    .select("main_genre")
    .limit(1)
)

# Both sides hold at most one row, so the cross join simply places the two
# answers side by side.
top_production_and_genre_df = top_production_and_genre_df.crossJoin(top_genre)

top_production_and_genre_df.show()

+---------------+----------+
|main_production|main_genre|
+---------------+----------+
|             US|     drama|
+---------------+----------+



In [0]:
# Check for duplicate (title, release_year) records and keep exactly one of each.
#
# row_number() needs a total order inside every partition to be repeatable.
# Ordering only by a partition column leaves all duplicates tied, so the kept
# row could change between runs. The order below prefers the record with the
# most votes, then the highest score, then falls back to the remaining columns;
# rows still tied after that are identical in every selected column.
window = Window.partitionBy("title", "release_year").orderBy(
    col("number_of_votes").desc_nulls_last(),
    col("score").desc_nulls_last(),
    col("duration").desc_nulls_last(),
    col("main_genre").asc_nulls_last(),
    col("main_production").asc_nulls_last(),
)

# rn == 1 marks the record kept for each (title, release_year); rn > 1 are duplicates.
best_movies_check_duplicates_df = best_movies_df.withColumn("rn", row_number().over(window))

best_movies_df = (
    best_movies_check_duplicates_df
    .filter(col("rn") == 1)
    .select(
        "title",
        "release_year",
        "score",
        "number_of_votes",
        "duration",
        "main_genre",
        "main_production",
    )
)
display(best_movies_df.limit(2))

title,release_year,score,number_of_votes,duration,main_genre,main_production
13th,2016,8.2,34914,100,documentary,US
14 Peaks: Nothing Is Impossible,2021,7.8,22858,101,documentary,US


In [0]:
# movies information
#
# Builds one row per best movie, enriched with its catalogue details and a
# "cast_&_crew" map of role -> comma-separated names.

# Match on title AND release year: matching on title alone also pairs a best
# movie with any other catalogue entry that merely shares its name.
# NOTE(review): assumes release_year agrees between Best_Movies_Netflix.csv and
# raw_titles.csv for the same title - confirm against the data.
# An inner join replaces the previous left join + "id is not null" filter; the
# explicit filter stays to drop catalogue rows whose id itself is missing.
movie_details = (
    best_movies_df
    .join(raw_titles_df, on=["title", "release_year"], how="inner")
    .filter(col("id").isNotNull())
    .select(
        "id", "title", "release_year", "duration", "imdb_id", "imdb_score",
        "number_of_votes", "genres", "main_production", "age_certification",
    )
    .dropDuplicates(["id"])
)

# Aggregate credits straight from the credits table (restricted to the matched
# movies) instead of exploding the movie rows to one row per credit and
# collapsing them again afterwards.
movie_credits = (
    raw_credits_df
    .join(movie_details.select("id"), on="id", how="left_semi")
    .filter(col("role").isNotNull())  # a map key cannot be null
)

names_per_role = movie_credits.groupBy("id", "role").agg(
    concat_ws(", ", collect_list("name")).alias("names")
)

# Collect (role, names) as a single struct so each role stays paired with its
# own names, rather than relying on two independent lists lining up.
movies_map = names_per_role.groupBy("id").agg(
    map_from_entries(collect_list(struct("role", "names"))).alias("cast_&_crew")
)

# Inner join: as before, movies without any credit are not part of the result.
movies = (
    movie_details
    .join(movies_map, on="id", how="inner")
    .select(
        "id", "title", "release_year", "duration", "imdb_id", "imdb_score",
        "number_of_votes", "genres", "main_production", "age_certification",
        "cast_&_crew",
    )
    .orderBy(col("imdb_score").desc())
)
display(movies.limit(2))



id,title,release_year,duration,imdb_id,imdb_score,number_of_votes,genres,main_production,age_certification,cast_&_crew
tm853783,David Attenborough: A Life on Our Planet,2020,83,tt11989890,9.0,31180,['documentation'],GB,PG,"Map(ACTOR -> Max Hughes, David Attenborough, DIRECTOR -> Alastair Fothergill, Jonathan Hughes, Keith Scholey)"
tm92641,Inception,2010,148,tt1375666,8.8,2268288,"['scifi', 'music', 'thriller', 'action']",GB,PG-13,"Map(ACTOR -> Daniel Girondeaud, Shannon Welles, Lisa Reynolds, Andrew Pleavin, Felix Scott, Michael Gaston, Peter Basham, Nicole Pulliam, Alex Lombard, Jill Maddrell, Carl Gilliard, Natasha Beaumont, Jack Murray, Adam Cole, Mark Fleischmann, Helena Cullinan, Magnus Nolan, Tai-Li Lee, Marc Raducci, Jean-Michel Dagory, Nicolas Clerc, Virgile Bramly, Silvie Laguna, Coralie Dedykere, Tim Kelleher, Russ Fega, Miranda Nolan, Ryan Hayward, Earl Cameron, Yuji Okumoto, Johnathan Geare, Claire Geare, Taylor Geare, Tohoru Masamune, Talulah Riley, Lukas Haas, Michael Caine, Pete Postlethwaite, Marion Cotillard, Tom Berenger, Cillian Murphy, Dileep Rao, Elliot Page, Tom Hardy, Ken Watanabe, Joseph Gordon-Levitt, Leonardo DiCaprio, DIRECTOR -> Christopher Nolan)"


In [0]:
# Highly appreciated movie genre trends across years.
#
# For every release year, find the genre(s) with the most entries in the
# best-movies list and attach them to that year's best movie.

# Number of best movies per (release_year, main_genre).
movie_genre_counts = (
    best_movies_df
    .groupBy("release_year", "main_genre")
    .agg(count("main_genre").alias("no_of_best_movies"))
)

# Rank genres WITHIN each year by their count. The earlier version numbered
# rows inside each (release_year, main_genre) partition, which kept one row per
# genre and therefore reported every genre of the year as its "top" genre.
window_genre = Window.partitionBy("release_year").orderBy(col("no_of_best_movies").desc())

# rank() (not row_number()) keeps all genres tied for first place, which is why
# the result column is a list.
accepted_genere = (
    movie_genre_counts
    .withColumn("rn", rank().over(window_genre))
    .filter(col("rn") == 1)
    .groupBy("release_year")
    .agg(collect_list("main_genre").alias("top_movie_genre_of_the_year"))
    .orderBy("release_year")
)

# Right join keeps every year present in the best-movies list, even when the
# by-year table has no entry for it.
year_wise_best_df = (
    best_movie_by_year_df
    .join(accepted_genere, on="release_year", how="right")
    .orderBy("release_year")
)


display(year_wise_best_df.limit(2))



release_year,title,score,main_genre,main_production,top_movie_genre_of_the_year
1954,White Christmas,7.5,romance,US,List(romance)
1961,The Guns of Navarone,7.5,war,US,List(war)


In [0]:
# Highly appreciated show genre trends across years.
#
# For every release year, find the genre(s) with the most entries in the
# best-shows list and attach them to that year's best show.

# Number of best shows per (release_year, main_genre).
show_genre_counts = (
    best_shows_df
    .groupBy("release_year", "main_genre")
    .agg(count("main_genre").alias("no_of_best_shows"))
)

# Rank genres WITHIN each year by their count. The earlier version numbered
# rows inside each (release_year, main_genre) partition, which kept one row per
# genre and therefore reported every genre of the year as its "top" genre.
window_genre_shows = Window.partitionBy("release_year").orderBy(col("no_of_best_shows").desc())

# rank() (not row_number()) keeps all genres tied for first place, which is why
# the result column is a list.
accepted_genere_shows = (
    show_genre_counts
    .withColumn("rn", rank().over(window_genre_shows))
    .filter(col("rn") == 1)
    .groupBy("release_year")
    .agg(collect_list("main_genre").alias("top_show_genre_of_the_year"))
    .orderBy("release_year")
)

# Right join keeps every year present in the best-shows list, even when the
# by-year table has no entry for it.
year_wise_best_shows_df = (
    best_show_by_year_df
    .join(accepted_genere_shows, on="release_year", how="right")
    .orderBy("release_year")
)


display(year_wise_best_shows_df.limit(2))



release_year,title,score,number_of_seasons,main_genre,main_production,top_show_genre_of_the_year
1969,Monty Python's Flying Circus,8.8,4,comedy,GB,List(comedy)
1989,Seinfeld,8.9,9,comedy,US,List(comedy)
