In [223]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, ArrayType, LongType, DateType

# Define the schema
schema = StructType([
    StructField("rowno", IntegerType(), True),
    StructField("id", IntegerType(), True),
    StructField("original_language", StringType(), True),
    StructField("original_title", StringType(), True),
    StructField("popularity", FloatType(), True),
    StructField("release_date", DateType(), True),
    StructField("vote_average", FloatType(), True),
    StructField("vote_count", IntegerType(), True),
    StructField("genre", StringType(), True),
    StructField("overview", StringType(), True),
    StructField("revenue", LongType(), True),
    StructField("runtime", IntegerType(), True),
    StructField("tagline", StringType(), True)
])


spark = SparkSession.builder.appName('Movie Analysis').getOrCreate()

In [224]:
movies_df = spark.read.csv('Top_10000_Movies.csv', header = True, schema=schema )

In [225]:
movies_df.head(5)

[Row(rowno=66, id=19995, original_language='en', original_title='Avatar', popularity=374.0589904785156, release_date=None, vote_average=7.5, vote_count=24280, genre="['Action', 'Adventure', 'Fantasy', 'Science Fiction']", overview='In the 22nd century, a paraplegic Marine is dispatched to the moon Pandora on a unique mission, but becomes torn between following orders and protecting an alien civilization.', revenue=2847246203, runtime=162, tagline='Enter the World of Pandora.'),
 Row(rowno=167, id=299534, original_language='en', original_title='Avengers: Endgame', popularity=193.86000061035156, release_date=None, vote_average=8.300000190734863, vote_count=19434, genre="['Adventure', 'Science Fiction', 'Action']", overview="After the devastating events of Avengers: Infinity War, the universe is in ruins due to the efforts of the Mad Titan, Thanos. With the help of remaining allies, the Avengers must assemble once more in order to undo Thanos' actions and restore order to the universe onc

In [226]:
movies_df.printSchema()

root
 |-- rowno: integer (nullable = true)
 |-- id: integer (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- release_date: date (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: integer (nullable = true)
 |-- genre: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- revenue: long (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- tagline: string (nullable = true)



In [227]:
# Count rows and inspect the first few rows
print(f"Total Rows: {movies_df.count()}")
movies_df.show(5)


Total Rows: 10036
+-----+------+-----------------+--------------------+----------+------------+------------+----------+--------------------+--------------------+----------+-------+--------------------+
|rowno|    id|original_language|      original_title|popularity|release_date|vote_average|vote_count|               genre|            overview|   revenue|runtime|             tagline|
+-----+------+-----------------+--------------------+----------+------------+------------+----------+--------------------+--------------------+----------+-------+--------------------+
|   66| 19995|               en|              Avatar|   374.059|        NULL|         7.5|     24280|['Action', 'Adven...|In the 22nd centu...|2847246203|    162|Enter the World o...|
|  167|299534|               en|   Avengers: Endgame|    193.86|        NULL|         8.3|     19434|['Adventure', 'Sc...|After the devasta...|2797800564|    181|Part of the journ...|
|  489|   597|               en|             Titanic|    94.23

In [228]:
movies_df.createOrReplaceTempView("Movies")

In [229]:
#List the top ten highest-grossing films together with their titles and earnings.
topMovies = spark.sql("""SELECT original_title, revenue
FROM movies
ORDER BY revenue DESC
LIMIT 10
""")
topMovies.show()

+--------------------+----------+
|      original_title|   revenue|
+--------------------+----------+
|              Avatar|2847246203|
|   Avengers: Endgame|2797800564|
|             Titanic|2187463944|
|Star Wars: The Fo...|2068223624|
|Avengers: Infinit...|2046239637|
|      Jurassic World|1671713208|
|       The Lion King|1667635327|
|        The Avengers|1518815515|
|           Furious 7|1515047671|
|           Frozen II|1450026933|
+--------------------+----------+



In [230]:
# #Display movies in descending order by popularity score.
popular_movies = spark.sql("""
    SELECT original_title, popularity
    FROM movies
    ORDER BY popularity DESC
""")
popular_movies.show()


+------------------------------------+----------+
|                      original_title|popularity|
+------------------------------------+----------+
|                Venom: Let There ...|  5401.308|
|                            Eternals|  3365.535|
|                                Dune|  2911.423|
|                     Army of Thieves|  2552.437|
|                            Free Guy|   1850.47|
|                 Gunpowder Milkshake|  1453.423|
|                Shang-Chi and the...|   1327.18|
|                               Venom|  1212.352|
|                     American Badger|  1148.822|
|劇場版 七つの大罪 光に呪われし者たち|  1108.815|
|                W lesie dziś nie ...|  1089.852|
|                Snake Eyes: G.I. ...|  1052.222|
|                           The Vault|   1046.86|
|                 The Addams Family 2|  1031.821|
|                     Halloween Kills|  1023.024|
|                    W jak morderstwo|   957.866|
|                Peçanha Contra o ...|   943.619|
|        劇場版「鬼滅の刃

In [231]:
# Calculate the total number of movies produced in each language.
movies_per_language = spark.sql("""
    SELECT original_language, COUNT(*) AS total_movies
    FROM movies
    GROUP BY original_language
    ORDER BY total_movies DESC
""")
movies_per_language.show()


+-----------------+------------+
|original_language|total_movies|
+-----------------+------------+
|               en|        7798|
|               ja|         609|
|               es|         417|
|               fr|         293|
|               ko|         140|
|               it|         104|
|               zh|         103|
|               cn|          79|
|               de|          78|
|               ru|          69|
|               pt|          55|
|               hi|          30|
|               da|          30|
|               no|          24|
|             NULL|          23|
|               sv|          22|
|               pl|          19|
|               nl|          19|
|               th|          15|
|               id|          15|
+-----------------+------------+
only showing top 20 rows



In [232]:
# Fetch movies’ names with the longest runtime 
longest_runtime_movies = spark.sql("""
    SELECT original_title, runtime
    FROM movies
    WHERE runtime IS NOT NULL
    ORDER BY runtime DESC
""")
longest_runtime_movies.show()


+--------------------+----------+
|      original_title|   runtime|
+--------------------+----------+
|         Toy Story 4|1073394593|
|The Hunger Games:...| 847423452|
|           Inception| 825532764|
|  Gone with the Wind| 402352579|
|Spider-Man: Into ...| 375540831|
|        A Bug's Life| 363258859|
|             Top Gun| 356830601|
|           Enchanted| 340487652|
|Inglourious Basterds| 321455689|
|        Ghostbusters| 296187079|
|         The Tourist| 278731369|
|     Dumb and Dumber| 247275374|
|              8 Mile| 242875078|
|Gone in Sixty Sec...| 237202299|
|            The Help| 216639112|
|             Everest| 203427584|
|Snow White and th...| 184925486|
| In the Line of Fire| 176997168|
|       Evan Almighty| 174440724|
|          The Island| 162949164|
+--------------------+----------+
only showing top 20 rows



In [233]:
# Determine the Vote Count Per Language
vote_count_per_language = spark.sql("""
    SELECT original_language, SUM(vote_count) AS total_votes
    FROM movies
    GROUP BY original_language
    ORDER BY total_votes DESC
""")
vote_count_per_language.show()


+-----------------+-----------+
|original_language|total_votes|
+-----------------+-----------+
|               en|   12308900|
|               ja|     192866|
|               fr|     185384|
|               es|     106373|
|               it|      79523|
|               ko|      69952|
|               de|      40024|
|               cn|      28625|
|               zh|      23190|
|               ru|      16890|
|               sv|      16034|
|               pt|      13600|
|               da|      12564|
|               hi|      10409|
|               pl|       9085|
|               no|       6753|
|               id|       5570|
|               tr|       4954|
|               th|       4945|
|               nl|       3226|
+-----------------+-----------+
only showing top 20 rows



In [234]:
# Determine the Vote Average Per Language
vote_average_per_language = spark.sql("""
    SELECT original_language, AVG(vote_average) AS average_votes
    FROM movies
    GROUP BY original_language
    ORDER BY average_votes DESC
""")
vote_average_per_language.show()


+-----------------+------------------+
|original_language|     average_votes|
+-----------------+------------------+
|               ar| 8.300000190734863|
|               ms| 7.699999809265137|
|               tr|7.2200000286102295|
|               la| 7.099999904632568|
|               ta| 6.949999809265137|
|               da| 6.870000012715658|
|               ja| 6.829228249285217|
|               ko| 6.735714306150164|
|               nb| 6.699999809265137|
|               te|6.6333333651224775|
|               hi| 6.626666673024496|
|               cn| 6.602531650398351|
|               de| 6.515384625165891|
|               zh|6.4951456333827045|
|               eu| 6.450000047683716|
|               es| 6.410311754944799|
|               ur| 6.400000095367432|
|               is|6.3999998569488525|
|               ru| 6.373913066974585|
|               it| 6.345192336119139|
+-----------------+------------------+
only showing top 20 rows



In [238]:
# Determine the TOP 10 Original Titles with Largest Tagline 
largest_tagline = spark.sql("""
    SELECT original_title, tagline, LENGTH(tagline) AS tagline_length
    FROM movies
    WHERE tagline IS NOT NULL
    ORDER BY tagline_length DESC
    LIMIT 10
""")
largest_tagline.show()


+--------------------+--------------------+--------------+
|      original_title|             tagline|tagline_length|
+--------------------+--------------------+--------------+
|        Dracula 3000| they see a tape ...|           254|
| Singing in Oblivion| though passersby...|           247|
|          Stay Alive| the kids begin t...|           242|
|        Woman on Top| a sultry enchant...|           219|
|I Walked with a Z...|See this strange;...|           217|
|          Moonwalker| and then a movie...|           213|
|        Modern Times|He stands alone a...|           206|
|       Monkey Shines|Once there was a ...|           204|
|        Galaxy Quest| aliens under att...|           182|
|      The Messengers|There is evidence...|           182|
+--------------------+--------------------+--------------+



In [239]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

In [240]:
#Determine the Highest Revenue Film in Each Year
highest_revenue_per_year = spark.sql("""
    SELECT m.release_year, m.original_title, m.revenue
    FROM (
        SELECT release_year, MAX(revenue) AS max_revenue
        FROM movies_with_year
        WHERE revenue IS NOT NULL
        GROUP BY release_year
    ) AS max_revenue_per_year
    JOIN movies_with_year m
    ON m.release_year = max_revenue_per_year.release_year AND m.revenue = max_revenue_per_year.max_revenue
""")
highest_revenue_per_year.show()

+------------+--------------------+----------+
|release_year|      original_title|   revenue|
+------------+--------------------+----------+
|        2009|              Avatar|2847246203|
|        2019|   Avengers: Endgame|2797800564|
|        1997|             Titanic|2187463944|
|        2015|Star Wars: The Fo...|2068223624|
|        2018|Avengers: Infinit...|2046239637|
|        2012|        The Avengers|1518815515|
|        2011|Harry Potter and ...|1341511219|
|        2017|Star Wars: The La...|1332539889|
|        2013|              Frozen|1274219009|
|        2016|Captain America: ...|1153296293|
|        2003|The Lord of the R...|1118888979|
|        2010|         Toy Story 3|1066969703|
|        2006|Pirates of the Ca...|1065659812|
|        2008|     The Dark Knight|1004558444|
|        2001|Harry Potter and ...| 976475550|
|        2007|Pirates of the Ca...| 961000000|
|        2014|The Hobbit: The B...| 956019788|
|        2004|             Shrek 2| 928760770|
|        2002

In [243]:
from pyspark.sql.functions import year, to_date

# Extract the release year and add it as a new column
df = df.withColumn("release_year", year(to_date(df["release_date"], "MM/dd/yyyy")))

# Register the DataFrame as a temporary SQL view
df.createOrReplaceTempView("movies_with_year")

# Replace `2021` with the desired year
specific_year = 2021
movies_in_year = spark.sql(f"""
    SELECT original_title, release_date, genre, revenue, runtime, vote_average
    FROM movies_with_year
    WHERE release_year = {specific_year}
""")

# Show the result
movies_in_year.show()


+--------------------+------------+--------------------+---------+-------+------------+
|      original_title|release_date|               genre|  revenue|runtime|vote_average|
+--------------------+------------+--------------------+---------+-------+------------+
|                  F9|   5/19/2021|['Action', 'Crime...|721077945|    143|         7.4|
|      No Time to Die|   9/29/2021|['Adventure', 'Ac...|667000000|    163|         7.4|
|Zack Snyder's Jus...|   3/18/2021|['Action', 'Adven...|657000000|    242|         8.4|
|   Godzilla vs. Kong|   3/24/2021|['Action', 'Adven...|467863133|    113|         7.9|
|Shang-Chi and the...|    9/1/2021|['Action', 'Adven...|427480601|    132|         7.7|
|Venom: Let There ...|   9/30/2021|['Science Fiction...|424000000|     97|         6.8|
|         Black Widow|    7/7/2021|['Action', 'Adven...|378328978|    134|         7.7|
|                Dune|   9/15/2021|['Action', 'Adven...|331116356|    155|         8.0|
|            Free Guy|   8/11/20

In [244]:
#Results Summary generated files.
topMovies.coalesce(1).write.csv("topmovies.csv", header=True, mode='overwrite')
popular_movies.coalesce(1).write.csv("popularmovies.csv", header=True, mode='overwrite')
movies_per_language.coalesce(1).write.csv("movielang.csv", header=True, mode='overwrite')
longest_runtime_movies.coalesce(1).write.csv("longrun.csv", header=True, mode='overwrite')
vote_count_per_language.coalesce(1).write.csv("votecount.csv", header=True, mode='overwrite')
vote_average_per_language.coalesce(1).write.csv("voteaverage.csv", header=True, mode='overwrite')
largest_tagline.coalesce(1).write.csv("largesttag.csv", header=True, mode='overwrite')
highest_revenue_per_year.coalesce(1).write.csv("highrevenue.csv", header=True, mode='overwrite')
movies_in_year.coalesce(1).write.csv("moviesinayear.csv", header=True, mode='overwrite')
