In [1]:
from pyspark.sql import SparkSession

In [2]:
spark=SparkSession.builder.appName("TMDB Project").getOrCreate()

24/05/06 11:20:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [3]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,FloatType,DateType,ArrayType

In [4]:
TMDB_Schema=StructType([StructField("budget",IntegerType(),False),
                        StructField('genres', StringType(), nullable=False),
                        StructField('homepage', StringType(), nullable=True),
                        StructField('id', IntegerType(), nullable=False),
                        StructField('keywords', StringType(), nullable=False),
                        StructField('original_language', StringType(), nullable=False),
                        StructField('original_title', StringType(), nullable=False),
                        StructField('overview', StringType(), nullable=True),
                        StructField('popularity', FloatType(), nullable=False),
                        StructField('production_companies', StringType(), nullable=False),
                        StructField('production_countries', StringType(),  nullable=True), 
                        StructField('release_date', DateType(), nullable=True),
                        StructField('revenue', IntegerType(), nullable=False),
                        StructField('runtime', StringType(),  nullable=True),
                        StructField('spoken_languages', StringType(), nullable=False),
                        StructField('status', StringType(), nullable=False),
                        StructField('tagline', StringType(), nullable=True),
                        StructField('title', StringType(), nullable=False),
                        StructField('vote_average', FloatType(), nullable=False),
                        StructField('vote_count', IntegerType(), nullable=False)])

In [5]:
#Reading the data from the HDFS an
df = spark.read.csv("hdfs://localhost:9000/filmData.csv", schema=TMDB_Schema ,header=True,quote='"',escape='"', multiLine=True)

In [6]:
#showing the first five rows to ensure the data have been read correctly 
df.show(5)

                                                                                

+---------+--------------------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+----------+
|   budget|              genres|            homepage|    id|            keywords|original_language|      original_title|            overview|popularity|production_companies|production_countries|release_date|   revenue|runtime|    spoken_languages|  status|             tagline|               title|vote_average|vote_count|
+---------+--------------------+--------------------+------+--------------------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+------------+----------+-------+--------------------+--------+--------------------+--------------------+------------+----------+
|237000000|[{"id": 28, "name...

## 1. Finding The most popular film in each original language

#### 1.1 Using SQL expression

In [7]:
df.createOrReplaceTempView('Popular_films')

PopularFilms_df=spark.sql('''WITH CTE AS (
    SELECT id, title, original_language,popularity, RANK() OVER(PARTITION BY original_language ORDER BY popularity desc) AS popularityInEachCategory
    FROM Popular_films
)
SELECT original_language , id as Film_ID, title, popularity
FROM CTE 
WHERE popularityInEachCategory = 1;''')

In [8]:
PopularFilms_df.show(truncate=False)

[Stage 3:>                                                          (0 + 1) / 1]

+-----------------+-------+------------------------------+----------+
|original_language|Film_ID|title                         |popularity|
+-----------------+-------+------------------------------+----------+
|af               |868    |Tsotsi                        |2.504169  |
|ar               |159037 |The Square                    |4.892203  |
|cn               |365222 |Ip Man 3                      |19.167377 |
|cs               |12555  |I Served the King of England  |2.387463  |
|da               |9029   |What Happens in Vegas         |38.100487 |
|de               |582    |The Lives of Others           |34.938175 |
|el               |38810  |Dogtooth                      |28.858238 |
|en               |211672 |Minions                       |875.5813  |
|es               |1417   |Pan's Labyrinth               |90.80941  |
|fa               |60243  |A Separation                  |12.049373 |
|fr               |194    |Amélie                        |73.720245 |
|he               |8

                                                                                

#### 1.2 Using dataFrame Transformasions

In [9]:
from pyspark.sql.functions import max,first

# Group by original_language and find the maximum popularity in each group
max_popularity_df = df.groupBy("original_language").agg(max("popularity").alias("max_popularity"),first("id").alias("Film_id"),first("title").alias("Title"))


In [10]:
max_popularity_df.show(truncate=False)

[Stage 6:>                                                          (0 + 1) / 1]

+-----------------+--------------+-------+----------------------------+
|original_language|max_popularity|Film_id|Title                       |
+-----------------+--------------+-------+----------------------------+
|af               |2.504169      |868    |Tsotsi                      |
|ar               |4.892203      |159037 |The Square                  |
|cn               |19.167377     |365222 |Ip Man 3                    |
|cs               |2.387463      |357837 |Dancin' It's On             |
|da               |38.100487     |1951   |Manderlay                   |
|de               |34.938175     |53953  |The Tooth Fairy             |
|el               |28.858238     |38810  |Dogtooth                    |
|en               |875.5813      |19995  |Avatar                      |
|es               |90.80941      |293644 |Top Cat Begins              |
|fa               |12.049373     |60421  |Circumstance                |
|fr               |73.720245     |2395   |Asterix at the Olympic

                                                                                

#### Save the result of the most Popular films

In [11]:
# Save PySpark DataFrame to a CSV file locally
PopularFilms_df.coalesce(1).write.csv('file:///usr/local/spark/popular_film_per_lan.csv', header=True, mode='overwrite')

                                                                                

## 2. Create a pre-aggregated table for the genres using PySpark.

In [12]:
#show the json colomn to ensure the column has been defined correctly
df.select('genres').show(3,truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------+
|genres                                                                                                                                |
+--------------------------------------------------------------------------------------------------------------------------------------+
|[{"id": 28, "name": "Action"}, {"id": 12, "name": "Adventure"}, {"id": 14, "name": "Fantasy"}, {"id": 878, "name": "Science Fiction"}]|
|[{"id": 12, "name": "Adventure"}, {"id": 14, "name": "Fantasy"}, {"id": 28, "name": "Action"}]                                        |
|[{"id": 28, "name": "Action"}, {"id": 12, "name": "Adventure"}, {"id": 80, "name": "Crime"}]                                          |
+--------------------------------------------------------------------------------------------------------------------------------------+
only showing top 3 rows



In [13]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import ArrayType

# Define the schema for the JSON strings
schema = ArrayType(StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True)
]))

In [14]:
# Apply the transformation ,that make a column with a  Array struct type
df = df.withColumn("genres_array", from_json(col("genres"), schema))

In [15]:
df.select("genres","genres_array").show(1,truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------+
|genres                                                                                                                                |genres_array                                                          |
+--------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------+
|[{"id": 28, "name": "Action"}, {"id": 12, "name": "Adventure"}, {"id": 14, "name": "Fantasy"}, {"id": 878, "name": "Science Fiction"}]|[{28, Action}, {12, Adventure}, {14, Fantasy}, {878, Science Fiction}]|
+--------------------------------------------------------------------------------------------------------------------------------------+--------------------------------

                                                                                

In [16]:
from pyspark.sql.functions import explode

# Explode the array to create a new row for each genre
df_expanded = df.select(explode("genres_array").alias("genre"))
df_expanded.show(5,truncate=False)

+----------------------+
|genre                 |
+----------------------+
|{28, Action}          |
|{12, Adventure}       |
|{14, Fantasy}         |
|{878, Science Fiction}|
|{12, Adventure}       |
+----------------------+
only showing top 5 rows



In [17]:
#seprate the elements of genres_array(genre) column into seperated columns
df_expanded =df_expanded.withColumn("genre_id",df_expanded['genre.id']).withColumn("genre_name",df_expanded['genre.name'])

In [18]:
genre_counts =df_expanded.select("genre_id","genre_name").groupBy("genre_id","genre_name").count().withColumnRenamed("count","No_of_Films_contain_This_Type")
genre_counts.show()

[Stage 13:>                                                         (0 + 1) / 1]

+--------+---------------+-----------------------------+
|genre_id|     genre_name|No_of_Films_contain_This_Type|
+--------+---------------+-----------------------------+
|     878|Science Fiction|                          535|
|      28|         Action|                         1154|
|      35|         Comedy|                         1722|
|    9648|        Mystery|                          348|
|   10769|        Foreign|                           34|
|      36|        History|                          197|
|      27|         Horror|                          519|
|   10751|         Family|                          513|
|      16|      Animation|                          234|
|      18|          Drama|                         2297|
|   10749|        Romance|                          894|
|      14|        Fantasy|                          424|
|   10770|       TV Movie|                            8|
|      37|        Western|                           82|
|   10752|            War|     

                                                                                

### Save the results of genres aggregations

In [19]:
# Write the result to a new CSV file on HDFS.
genre_counts.write.csv("hdfs://localhost:9000//Genres_Aggregations.csv", header=True)

                                                                                

# **<center>The End</center>**