### Set up PySpark SQL and read the files

In [7]:
from pyspark.sql import SparkSession
# Obtain the Spark session for this notebook: getOrCreate() hands back the
# active session when one exists, otherwise it builds a new one with this app name.
spark = (
    SparkSession.builder
    .appName("SQL_MovieMaster")
    .getOrCreate()
)

In [8]:
# Load Movie_Master.csv: the first row supplies the column names and
# column types are inferred from the data.
movie_master = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("Movie_Master.csv")
)
# Preview the loaded rows.
movie_master.show()

+-------+-------------+---------+-----------+-------+-----------+------------------------------+
|MovieID|Production_ID|MovieName|ReleaseYear|RunTime|Certificate|Displayed_on_Number_of_screens|
+-------+-------------+---------+-----------+-------+-----------+------------------------------+
|      1|        10050|       M1|       1972|    175|          A|                           160|
|      2|        10051|       M2|       1994|    142|        U/A|                           140|
|      3|        10052|       M3|       2009|    172|          U|                           310|
|      4|        10053|       M4|       2017|    160|          U|                           450|
|      5|        10054|       M5|       1993|    121|        U/A|                           220|
|      6|        10054|       M6|       2018|    160|          U|                           135|
|      7|        10050|       M7|       1988|    159|        U/A|                           142|
|      8|        10051|       

In [9]:
# Load Production_Companies.csv: the first row supplies the column names and
# column types are inferred from the data.
production_company = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("Production_Companies.csv")
)
# Preview the loaded rows.
production_company.show()

+-------------+--------------+------------------+---------------+
|Prodcution_ID|ProductionName|workforce_strength|no_of_investors|
+-------------+--------------+------------------+---------------+
|        10050|            P1|               128|              6|
|        10051|            P2|               223|              5|
|        10052|            P3|               110|              3|
|        10053|            P4|               161|              6|
|        10054|            P5|               344|              7|
+-------------+--------------+------------------+---------------+



In [10]:
# Load Movie_Award.csv: the first row supplies the column names and
# column types are inferred from the data.
movie_award = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("Movie_Award.csv")
)
# Preview the loaded rows (show() prints only the top 20 by default).
movie_award.show()

+--------+--------------------+
|Movie_ID|               Award|
+--------+--------------------+
|       1|               Oscar|
|       1|        Golden Globe|
|       1|         Bafta Award|
|       2|               Oscar|
|       2|        Golden Globe|
|       2|         Bafta Award|
|       3|National Film Awards|
|       3|      Academy Awards|
|       3|            Filmfare|
|       4|National Film Awards|
|       4|      Academy Awards|
|       4|            Filmfare|
|       5|               Oscar|
|       5|        Golden Globe|
|       6|National Film Awards|
|       6|      Academy Awards|
|       6|            Filmfare|
|       7|        Golden Globe|
|       8|        Golden Globe|
|       9|National Film Awards|
+--------+--------------------+
only showing top 20 rows


In [11]:
# Load Rating_Agency.csv: the first row supplies the column names and
# column types are inferred from the data.
rating_agency = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("Rating_Agency.csv")
)
# Preview the loaded rows.
rating_agency.show()

+---------+---------------+
|Rating_ID|  Rating_Agency|
+---------+---------------+
|     7777|           IMDB|
|     8888|Rotten Tomatoes|
|     9999|     Metacritic|
+---------+---------------+



In [12]:
# Load Movies_Rating.csv: the first row supplies the column names and
# column types are inferred from the data.
movies_rating = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("Movies_Rating.csv")
)
# Preview the loaded rows (show() prints only the top 20 by default).
movies_rating.show()

+---------+------+---------+
|Movies_ID|Rating|Rating_ID|
+---------+------+---------+
|        1|     6|     7777|
|        1|     9|     8888|
|        1|     9|     9999|
|        2|     7|     7777|
|        2|     7|     8888|
|        2|     7|     9999|
|        3|    10|     7777|
|        3|     8|     8888|
|        3|     8|     9999|
|        4|     6|     7777|
|        4|    10|     8888|
|        4|     7|     9999|
|        5|     6|     7777|
|        5|     6|     8888|
|        5|    10|     9999|
|        6|     8|     7777|
|        6|     6|     8888|
|        6|     8|     9999|
|        7|    10|     7777|
|        7|     9|     8888|
+---------+------+---------+
only showing top 20 rows


In [13]:
# Load Movie_Finance.csv: the first row supplies the column names and
# column types are inferred from the data.
movie_finance = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("Movie_Finance.csv")
)
# Preview the loaded rows (show() prints only the top 20 by default).
movie_finance.show()

+-------+------------+-------------+
|MovieID|MovieRevenue|MovieLanguage|
+-------+------------+-------------+
|      1|          67|      English|
|      1|          33|        Hindi|
|      2|          39|      English|
|      2|          32|        Hindi|
|      3|          45|      Chinese|
|      3|          34|       French|
|      3|          43|        Hindi|
|      3|          36|      Spanish|
|      4|          39|      English|
|      4|          42|        Hindi|
|      4|          34|      Chinese|
|      5|          44|        Hindi|
|      5|          42|      Chinese|
|      5|          30|       French|
|      5|          31|      Spanish|
|      6|          30|      English|
|      6|          42|        Hindi|
|      7|          30|      English|
|      7|          42|        Hindi|
|      7|          41|      Chinese|
+-------+------------+-------------+
only showing top 20 rows


In [14]:
from pyspark.sql.functions import count, sum, max, min, countDistinct, mean, avg, round, length, stddev,kurtosis, first, last, skewness, collect_list, variance, corr, length

### Section B1:
**Display the maximum revenue (in millions) generated from each language from each production company.**

In [15]:
# Maximum revenue per (production company, language):
#   movie_master -> production_company  (which company produced each movie)
#   movie_master -> movie_finance       (revenue rows, one per movie/language)
# "Prodcution_ID" is spelled exactly as the header appears in Production_Companies.csv.
# NOTE: `max` here is pyspark.sql.functions.max (imported above), not the builtin.
result_B1 = (
    movie_master
    .join(
        production_company,
        on=movie_master["Production_ID"] == production_company["Prodcution_ID"],
        how="inner",
    )
    .join(
        movie_finance,
        on=movie_master["MovieID"] == movie_finance["MovieID"],
        how="inner",
    )
    .groupBy(production_company["ProductionName"], movie_finance["MovieLanguage"])
    .agg(max(movie_finance["MovieRevenue"]).alias("MaxRevenue"))
    .orderBy("ProductionName", "MovieLanguage")
)

# show() prints only 20 rows by default, so pass the full row count to display every group.
row_count = result_B1.count()
result_B1.show(n=row_count, truncate=False)

+--------------+-------------+----------+
|ProductionName|MovieLanguage|MaxRevenue|
+--------------+-------------+----------+
|P1            |Chinese      |41        |
|P1            |English      |67        |
|P1            |German       |43        |
|P1            |Hindi        |42        |
|P1            |Philipino    |30        |
|P1            |Spanish      |40        |
|P2            |Chinese      |36        |
|P2            |English      |39        |
|P2            |German       |33        |
|P2            |Hindi        |44        |
|P2            |Spanish      |35        |
|P3            |Chinese      |45        |
|P3            |French       |34        |
|P3            |German       |42        |
|P3            |Hindi        |43        |
|P3            |Spanish      |45        |
|P4            |Chinese      |34        |
|P4            |English      |39        |
|P4            |Hindi        |42        |
|P5            |Chinese      |42        |
|P5            |English      |30  



---


### Section B2:
**Display the name of the production company, name of the rating agency and average rating given by rating agency for movies of each production company.**

**Note:** I removed the redundant trailing space from the `Rating` column name in the original Movies_Rating.csv dataset to ensure data consistency, so the column can be referenced as "Rating" instead of "Rating ".

In [16]:
# Average rating per (production company, rating agency):
#   movie_master  -> production_company  (which company produced each movie)
#   movie_master  -> movies_rating       (the ratings each movie received)
#   movies_rating -> rating_agency       (resolve Rating_ID to the agency name)
# "Prodcution_ID" is spelled exactly as the header appears in Production_Companies.csv.
result_B2 = (
    movie_master
    .join(
        production_company,
        on=movie_master["Production_ID"] == production_company["Prodcution_ID"],
        how="inner",
    )
    .join(
        movies_rating,
        on=movie_master["MovieID"] == movies_rating["Movies_ID"],
        how="inner",
    )
    .join(
        rating_agency,
        on=movies_rating["Rating_ID"] == rating_agency["Rating_ID"],
        how="inner",
    )
    .groupBy(production_company["ProductionName"], rating_agency["Rating_Agency"])
    .mean("Rating")  # result column is named "avg(Rating)"
    .orderBy("ProductionName", "Rating_Agency")
)

result_B2.show(truncate=False)

+--------------+---------------+-----------------+
|ProductionName|Rating_Agency  |avg(Rating)      |
+--------------+---------------+-----------------+
|P1            |IMDB           |8.333333333333334|
|P1            |Metacritic     |8.333333333333334|
|P1            |Rotten Tomatoes|8.0              |
|P2            |IMDB           |7.5              |
|P2            |Metacritic     |6.5              |
|P2            |Rotten Tomatoes|8.0              |
|P3            |IMDB           |9.5              |
|P3            |Metacritic     |9.0              |
|P3            |Rotten Tomatoes|7.0              |
|P4            |IMDB           |6.0              |
|P4            |Metacritic     |7.0              |
|P4            |Rotten Tomatoes|10.0             |
|P5            |IMDB           |7.0              |
|P5            |Metacritic     |9.0              |
|P5            |Rotten Tomatoes|6.0              |
+--------------+---------------+-----------------+

