# **DATA LOADING**

In [1]:
# Mount Google Drive under /content/drive; the dataset archive is read from there below.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!mkdir -p /content/dataset
!unzip -q /content/drive/MyDrive/Big\ Data/Spark/dataset.zip -d /content/dataset

# **INSTALLAZIONE PY-SPARK**

In [3]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=fa2bef717a8affb2bb63e7f8e282307abce7c3e60840eef526f6d72cc918b78b
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [4]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext

In [5]:
# Verify the Spark version running on the virtual cluster.
# Only the major version is checked: a substring test such as `"3." in version`
# would also accept versions like "2.3.1".
sc = SparkContext.getOrCreate()
assert sc.version.startswith("3."), "Verify that the cluster Spark's version is 3.x"
print("Spark version:", sc.version)

Spark version: 3.5.1


In [6]:
# Create the Spark session on top of the SparkContext obtained above
spark = SparkSession(sc)
print(spark)

<pyspark.sql.session.SparkSession object at 0x79ab60633340>


# Dataset

In [7]:
import os

from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *

import requests
from bs4 import BeautifulSoup

In [8]:
# Dataset locations
genres_path = '/content/dataset/genres'
imdb_path = '/content/dataset/imdb/movies.csv'
imdb_path_output = '/content/dataset/output/movies.csv'

# One CSV file per genre inside the genres directory
action_path = f"{genres_path}/Action.csv"
adventure_path = f"{genres_path}/Adventure.csv"
animation_path = f"{genres_path}/Animation.csv"
biography_path = f"{genres_path}/Biography.csv"
comedy_path = f"{genres_path}/Comedy.csv"
drama_path = f"{genres_path}/Drama.csv"
fantasy_path = f"{genres_path}/Fantasy.csv"
history_path = f"{genres_path}/History.csv"
horror_path = f"{genres_path}/Horror.csv"
music_path = f"{genres_path}/Music.csv"
mystery_path = f"{genres_path}/Mystery.csv"
romance_path = f"{genres_path}/Romance.csv"
sci_fi_path = f"{genres_path}/Sci-Fi.csv"
sport_path = f"{genres_path}/Sport.csv"
thriller_path = f"{genres_path}/Thriller.csv"
war_path = f"{genres_path}/War.csv"

## IMDB DATASET

In [9]:
# Directory that receives the normalized copy of the IMDb CSV (see imdb_path_output)
!mkdir -p /content/dataset/output

In [10]:
# Make the formatting of the raw IMDb CSV uniform before handing it to Spark.
# The substitutions are applied in this exact order: each one operates on the
# result of the previous one.
replacements = [
    ('""', "'"),
    ("['", ""),
    ("']", ""),
    ("',", ","),
    (", ", ","),
    (",'", ","),
]

# Read every line of the source file. The encoding is stated explicitly so the
# result does not depend on the locale of the machine (the data contains
# non-ASCII titles and names).
with open(imdb_path, 'r', encoding='utf-8') as file:
    lines = file.readlines()

for old, new in replacements:
    lines = [line.replace(old, new) for line in lines]

# Write the normalized lines to the output CSV
with open(imdb_path_output, 'w', encoding='utf-8') as file:
    file.writelines(lines)

In [11]:
# IMDb dataset: load the normalized CSV, taking the column names from the
# header row and letting Spark infer the column types.
df_imdb_movies = (
    spark.read
    .options(inferSchema="true", header="true")
    .csv(imdb_path_output)
)

df_imdb_movies.printSchema()

root
 |-- title: string (nullable = true)
 |-- year: string (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- certificate: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- director: string (nullable = true)
 |-- stars: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- metascore: double (nullable = true)
 |-- votes: integer (nullable = true)
 |-- gross: double (nullable = true)



In [12]:
# Scalar columns and the Spark type each of them is cast to
scalar_casts = {
    "year": "integer",
    "rating": "float",
    "metascore": "float",
    "votes": "integer",
    "gross": "float"
}
for name, target_type in scalar_casts.items():
    df_imdb_movies = df_imdb_movies.withColumn(name, col(name).cast(target_type))

# Comma-separated text columns that are turned into arrays of strings
columns_to_convert = ["genre","director","stars"]
for array_name in columns_to_convert:
    df_imdb_movies = df_imdb_movies.withColumn(array_name, split(col(array_name), ","))

# Remove duplicated movies
df_imdb_movies = df_imdb_movies.distinct()

# Check the resulting schema
df_imdb_movies.printSchema()

root
 |-- title: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- runtime: integer (nullable = true)
 |-- certificate: string (nullable = true)
 |-- genre: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- director: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- stars: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- rating: float (nullable = true)
 |-- metascore: float (nullable = true)
 |-- votes: integer (nullable = true)
 |-- gross: float (nullable = true)



In [13]:
# Discard every movie whose year is NULL (needed by the operations that follow)
df_imdb_movies = df_imdb_movies.where(col("year").isNotNull())

## GENRE DATASET

In [14]:
# GENRE DATASET

# Paths of all the per-genre CSV files
file_paths = [
    action_path, adventure_path, animation_path, biography_path,
    comedy_path, drama_path, fantasy_path, history_path,
    horror_path, music_path, mystery_path, romance_path,
    sci_fi_path, sport_path, thriller_path, war_path
]

# Load every file (header row, inferred types) ...
genre_frames = [
    spark.read.csv(path, header=True, inferSchema=True)
    for path in file_paths
]

# ... and stack them positionally, in list order, into a single DataFrame
df_genre_movies = genre_frames[0]
for frame in genre_frames[1:]:
    df_genre_movies = df_genre_movies.union(frame)

# Check the schema of the combined DataFrame
df_genre_movies.printSchema()

root
 |-- name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- movie_rated: string (nullable = true)
 |-- run_length: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- num_raters: integer (nullable = true)
 |-- num_reviews: integer (nullable = true)
 |-- review_url: string (nullable = true)



In [15]:
# Convert the textual duration into total minutes so it can be read as an int.
# regexp_extract yields an empty string when the pattern does not match, and
# casting an empty string to int gives NULL. Adding the two parts directly
# would therefore turn a duration with only an hours part or only a minutes
# part into NULL. A missing part is counted as 0; a value with neither part
# stays NULL.
hours = regexp_extract(col("run_length"), r"(\d+)h", 1).cast("int")  # hours part, NULL if absent
minutes = regexp_extract(col("run_length"), r"(\d+)min", 1).cast("int")  # minutes part, NULL if absent
df_genre_movies = df_genre_movies.withColumn(
    "run_length",
    when(hours.isNotNull() | minutes.isNotNull(),
         coalesce(hours, lit(0)) * 60 + coalesce(minutes, lit(0)))
)

# Column holding the ';'-separated genre list
column_to_convert = "genres"

# Strip spaces, split on ';' and drop empty entries to obtain an array of genre names
df_genre_movies = df_genre_movies.withColumn(column_to_convert, regexp_replace(col(column_to_convert),r" ",""))
df_genre_movies = df_genre_movies.withColumn(column_to_convert, split(col(column_to_convert), ";"))
df_genre_movies = df_genre_movies.withColumn(column_to_convert, array_remove(col(column_to_convert), ""))

# Remove duplicated movies
df_genre_movies = df_genre_movies.distinct()

# Check the resulting schema
df_genre_movies.printSchema()

root
 |-- name: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- movie_rated: string (nullable = true)
 |-- run_length: integer (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- release_date: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- num_raters: integer (nullable = true)
 |-- num_reviews: integer (nullable = true)
 |-- review_url: string (nullable = true)



# OPERATIONS

## Troviamo tutti i film comuni tra i dataset

In [16]:
# Number of rows in each cleaned dataset (IMDb first, then genres)
imdb_rows = df_imdb_movies.count()
genre_rows = df_genre_movies.count()
print(imdb_rows)
print(genre_rows)

9184
1338


In [17]:
# A movie is common to both datasets when title and year coincide
same_title = df_imdb_movies["title"] == df_genre_movies["name"]
same_year = df_imdb_movies["year"] == df_genre_movies["year"]

df_common_movies = (
    df_imdb_movies
    .join(df_genre_movies, same_title & same_year)
    .select(df_imdb_movies["title"], df_imdb_movies["year"])
    .distinct()
)

df_common_movies.show()
print(df_common_movies.count())

+--------------------+----+
|               title|year|
+--------------------+----+
|    The Caine Mutiny|1954|
|         Crazy Heart|2009|
|   Dog Day Afternoon|1975|
|The Secret Life o...|2013|
|       The Godfather|1972|
|          Shark Tale|2004|
|The Greatest Showman|2017|
|     Velvet Goldmine|1998|
|  American History X|1998|
|    Independence Day|1996|
|The Passion of Jo...|1928|
|         The Pianist|2002|
|  The Color of Money|1986|
|For Love of the Game|1999|
|          Cinderella|1950|
|Star Wars: Episod...|2017|
|            Insomnia|2002|
|The Devil Wears P...|2006|
|      Shall We Dance|2004|
|       The Evil Dead|1981|
+--------------------+----+
only showing top 20 rows

973


## Troviamo tutti i film tra i dataset

In [18]:
# (title, year) pairs of both datasets, stacked and de-duplicated
imdb_keys = df_imdb_movies.select('title', 'year')
genre_keys = df_genre_movies.select('name', 'year')
df_all_movies = imdb_keys.union(genre_keys).distinct()

df_all_movies.show()
print(df_all_movies.count())

+--------------------+----+
|               title|year|
+--------------------+----+
|         Money Train|1995|
|    The Caine Mutiny|1954|
|      In the Bedroom|2001|
|Everything Everyw...|2022|
|The Midnight Meat...|2008|
|         Çiçek Abbas|1982|
|  And Then We Danced|2019|
|       Smoke Signals|1998|
|     The Hate U Give|2018|
|Fear City: A Fami...|1994|
|             Beckett|2021|
|          Stake Land|2010|
|Balls Out: Gary t...|2009|
|         Crazy Heart|2009|
|A Home at the End...|2004|
|     Valhalla Rising|2009|
|   Riders of Justice|2020|
|The Siege of Jado...|2016|
|        Belle Epoque|1992|
|             Sleeper|1973|
+--------------------+----+
only showing top 20 rows

9337


## Film più votati positivamente

In [19]:
# Best rated movies: attach the rating of each source to every (title, year)
# pair and combine them into a single rating.

# IMDb rating, under a source-specific column name
rating_imdb = df_imdb_movies.select("title","year","rating")
rating_imdb = rating_imdb.withColumnRenamed("rating", "rating_imdb")

# Plain equality on both keys. A NULL year never satisfies the equality, so no
# extra NULL guard is needed, and keeping both comparisons as simple equalities
# lets Spark use title and year together as join keys.
imdb_match = ((df_all_movies["title"] == rating_imdb["title"])
              & (df_all_movies["year"] == rating_imdb["year"]))
df_all_movies_voted = df_all_movies.join(rating_imdb, imdb_match, "left_outer")\
                                   .select(df_all_movies["title"],df_all_movies["year"],rating_imdb["rating_imdb"])\
                                   .distinct()

# Rating from the genre dataset, joined in the same way
rating_genre = df_genre_movies.select("name","year","rating")
rating_genre = rating_genre.withColumnRenamed("rating", "rating_genre")
genre_match = ((df_all_movies_voted["title"] == rating_genre["name"])
               & (df_all_movies_voted["year"] == rating_genre["year"]))
df_all_movies_voted = df_all_movies_voted.join(rating_genre, genre_match, "left_outer")\
                                         .select(df_all_movies["title"],df_all_movies["year"],
                                                 rating_imdb["rating_imdb"],rating_genre["rating_genre"])\
                                         .distinct()

# Final rating: mean of the two sources when both exist, otherwise whichever
# one is available; rounded to one decimal in both cases.
# (`round` here is the Spark column function brought in by the star import.)
df_all_movies_voted = df_all_movies_voted.withColumn("rating", when(col("rating_imdb").isNotNull()
                                                             & col("rating_genre").isNotNull(),
                                                             round((col("rating_imdb")+col("rating_genre"))/lit(2),1))\
                                                            .otherwise(round(coalesce(col("rating_imdb"), col("rating_genre")), 1)))

df_all_movies_voted = df_all_movies_voted.select("title","year","rating")
top_10 = df_all_movies_voted.orderBy(desc("rating")).limit(10)

top_10.show()

+--------------------+----+------+
|               title|year|rating|
+--------------------+----+------+
|The Shawshank Red...|1994|   9.3|
|     The Chaos Class|1975|   9.2|
|       The Godfather|1972|   9.2|
|Ramayana: The Leg...|1993|   9.2|
|               Daman|2022|   9.1|
|        12 Angry Men|1957|   9.0|
|The Godfather: Pa...|1974|   9.0|
|The Godfather Par...|1974|   9.0|
|         Mirror Game|2016|   9.0|
|    Schindler's List|1993|   9.0|
+--------------------+----+------+



## Film Peggiori

In [20]:
# Worst rated movies. An ascending sort places NULLs first by default in Spark,
# so NULL ratings are explicitly sorted last: a movie without any rating must
# not be reported as one of the worst.
flop_10 = df_all_movies_voted.orderBy(col("rating").asc_nulls_last()).limit(10)
flop_10.show()

+--------------------+----+------+
|               title|year|rating|
+--------------------+----+------+
|         Spice World|1997|   3.5|
|       The Love Guru|2008|   3.8|
|Fifty Shades of Grey|2015|   4.1|
|In the Land of Bl...|2011|   4.5|
|              Driven|2001|   4.6|
|The Twilight Saga...|2009|   4.7|
|      The Big Bounce|2004|   4.9|
|           Boat Trip|2002|   4.9|
|Temptation: Confe...|2013|   4.9|
|Did You Hear Abou...|2009|   4.9|
+--------------------+----+------+



## Film votato da più utenti

In [21]:
# Movie voted by the largest number of users.
# Start from the key columns of every known movie (no placeholder "voters"
# column is needed: the real one is computed at the end).
df_voters = df_all_movies.select("*")

# Number of votes according to the IMDb dataset
df_voters_imdb = df_imdb_movies.select("title","year","votes")
df_voters_imdb = df_voters_imdb.withColumnRenamed("votes", "voters_imdb")
df_voters = df_voters.join(df_voters_imdb, (df_voters["title"]==df_voters_imdb["title"])
                     &(df_voters["year"]==df_voters_imdb["year"]), "left_outer")\
                     .select(df_voters["title"],df_voters["year"],df_voters_imdb["voters_imdb"]).distinct()

# Number of raters according to the genre dataset: averaged per (name, year)
# so that each pair contributes a single value, then truncated to int
df_voters_genre = df_genre_movies.select("name","year","num_raters")
df_voters_genre = df_voters_genre.groupBy("name","year").avg("num_raters")
df_voters_genre = df_voters_genre.withColumn("voters_genre", col("avg(num_raters)").cast("int"))
df_voters_genre = df_voters_genre.select("name","year","voters_genre")
df_voters = df_voters.join(df_voters_genre, (df_voters["title"]==df_voters_genre["name"])
                     &(df_voters["year"]==df_voters_genre["year"]), "left_outer")\
                     .select(df_voters["title"],df_voters["year"],df_voters_imdb["voters_imdb"],df_voters_genre["voters_genre"])\
                     .distinct()

# Keep the larger of the two counts (greatest ignores NULL inputs)
df_voters = df_voters.withColumn("voters", greatest(col("voters_imdb"),col("voters_genre")))\
                     .select("title","year","voters")

most_voted = df_voters.orderBy(desc("voters")).limit(1)

most_voted.show()

+--------------------+----+-------+
|               title|year| voters|
+--------------------+----+-------+
|The Shawshank Red...|1994|2780534|
+--------------------+----+-------+



Un'alternativa di esecuzione della query precedente, che non prevede l'ordinamento del dataframe finale ma l'individuazione del valore massimo della colonna "voters", è la seguente:

In [22]:
# Same query as the previous cell, but the winner is found by computing the
# maximum of "voters" instead of sorting the whole DataFrame.
# Start from the key columns of every known movie (no placeholder "voters"
# column is needed: the real one is computed below).
df_voters = df_all_movies.select("*")

# Number of votes according to the IMDb dataset
df_voters_imdb = df_imdb_movies.select("title","year","votes")
df_voters_imdb = df_voters_imdb.withColumnRenamed("votes", "voters_imdb")
df_voters = df_voters.join(df_voters_imdb, (df_voters["title"]==df_voters_imdb["title"])
                     &(df_voters["year"]==df_voters_imdb["year"]), "left_outer")\
                     .select(df_voters["title"],df_voters["year"],df_voters_imdb["voters_imdb"]).distinct()

# Number of raters according to the genre dataset: averaged per (name, year)
# so that each pair contributes a single value, then truncated to int
df_voters_genre = df_genre_movies.select("name","year","num_raters")
df_voters_genre = df_voters_genre.groupBy("name","year").avg("num_raters")
df_voters_genre = df_voters_genre.withColumn("voters_genre", col("avg(num_raters)").cast("int"))
df_voters_genre = df_voters_genre.select("name","year","voters_genre")
df_voters = df_voters.join(df_voters_genre, (df_voters["title"]==df_voters_genre["name"])
                     &(df_voters["year"]==df_voters_genre["year"]), "left_outer")\
                     .select(df_voters["title"],df_voters["year"],df_voters_imdb["voters_imdb"],df_voters_genre["voters_genre"])\
                     .distinct()

# Keep the larger of the two counts (greatest ignores NULL inputs)
df_voters = df_voters.withColumn("voters", greatest(col("voters_imdb"),col("voters_genre")))

# Highest vote count as a plain Python value. `max` is the Spark aggregate
# brought in by the star import, not the Python builtin. The scalar gets its
# own name so that `most_voted` only ever refers to a DataFrame.
max_voters = df_voters.select("title","year","voters").select(max("voters")).collect()[0][0]

# Every movie that reaches that count
most_voted = df_voters.select('title','year','voters').where(col('voters')==max_voters)

most_voted.show()

+--------------------+----+-------+
|               title|year| voters|
+--------------------+----+-------+
|The Shawshank Red...|1994|2780534|
+--------------------+----+-------+



## Film recensito da più utenti

In [23]:
# Average number of reviews per (name, year); the result has the columns
# name, year and "avg(num_reviews)"
avg_reviews = df_genre_movies.groupBy('name', 'year').avg('num_reviews')

# Keep only the entry with the highest average
most_reviewed = avg_reviews.orderBy(desc("avg(num_reviews)")).limit(1)

most_reviewed.show()

+-----+----+----------------+
| name|year|avg(num_reviews)|
+-----+----+----------------+
|Joker|2019|         10279.0|
+-----+----+----------------+



## Rating Ponderato

In [24]:
# Weighted rating: the rating of each movie is multiplied by log10 of its
# number of voters, so that, for equal ratings, more voted movies rank higher.
same_movie = ((df_all_movies_voted['title'] == df_voters['title'])
              & (df_all_movies_voted['year'] == df_voters['year']))
df_ratio = (
    df_all_movies_voted
    .join(df_voters, same_movie)
    .select(df_all_movies_voted['title'], df_all_movies_voted['year'], 'voters', 'rating')
)

weighted = round(df_ratio['rating'] * log10(col("voters")), 1)
df_ratio = df_ratio.withColumn("weighted_rating", weighted)

best_movies = (
    df_ratio
    .select('title', 'year', 'weighted_rating', 'rating', 'voters')
    .orderBy(desc('weighted_rating'))
    .limit(10)
)
best_movies.show(truncate=False)

+-------------------------------------------------+----+---------------+------+-------+
|title                                            |year|weighted_rating|rating|voters |
+-------------------------------------------------+----+---------------+------+-------+
|The Shawshank Redemption                         |1994|59.9           |9.3   |2780534|
|The Dark Knight                                  |2008|58.0           |9.0   |2758250|
|The Godfather                                    |1972|57.8           |9.2   |1935895|
|The Lord of the Rings: The Return of the King    |2003|56.5           |9.0   |1905920|
|Pulp Fiction                                     |1994|56.3           |8.9   |2133402|
|Inception                                        |2010|56.2           |8.8   |2448697|
|Fight Club                                       |1999|55.8           |8.8   |2215896|
|Forrest Gump                                     |1994|55.7           |8.8   |2162613|
|Schindler's List               

## Quanti film per genere

In [25]:
# One row per (movie, genre) pair
df_imdb_exploded = df_imdb_movies.withColumn("genre", explode("genre"))

# Distinct values of the exploded column
df_unique_genre = df_imdb_exploded.select("genre").distinct()

# Bring the genre names to the driver with a single collect(). Calling
# collect() once per row would launch a Spark job for every genre and relies on
# the row order being identical across jobs.
lista_genres = [row[0] for row in df_unique_genre.collect()]

print(lista_genres)

['Crime', 'Romance', 'Thriller', 'Adventure', 'Drama', 'War', 'Family', 'Fantasy', 'History', 'Mystery', 'Musical', 'Animation', 'Music', 'Film-Noir', 'Horror', 'Western', 'Biography', 'Comedy', 'Action', 'Sport', 'Sci-Fi']


In [26]:
# Number of IMDb movies whose genre array contains each genre
film_x_genre = {
    genre_name: df_imdb_movies.filter(array_contains(col("genre"), genre_name))
                              .select('title','year','genre')
                              .count()
    for genre_name in lista_genres
}

print(film_x_genre)

{'Crime': 1940, 'Romance': 1616, 'Thriller': 1455, 'Adventure': 1556, 'Drama': 5366, 'War': 220, 'Family': 435, 'Fantasy': 661, 'History': 325, 'Mystery': 952, 'Musical': 128, 'Animation': 462, 'Music': 261, 'Film-Noir': 59, 'Horror': 1079, 'Western': 107, 'Biography': 626, 'Comedy': 3414, 'Action': 2281, 'Sport': 191, 'Sci-Fi': 619}


In [27]:
# Rows of the genre dataset that have no (title, year) match among the common
# movies (left anti join).
df_diff = df_genre_movies.join(df_common_movies, (df_genre_movies["name"]==df_common_movies["title"])
                            & (df_genre_movies["year"]==df_common_movies["year"]), how="left_anti")

# For each genre in lista_genres, add the number of those rows to the count
# computed from the IMDb dataset.
# NOTE(review): film_x_genre is updated in place, so running this cell more than
# once adds the genre-only counts again each time.
for elem in lista_genres:
    genre_genre = df_diff.filter(array_contains(col("genres"), elem))\
                                .select('name','year','genres').count()
    film_x_genre[elem] += genre_genre

print(film_x_genre)

{'Crime': 1957, 'Romance': 1649, 'Thriller': 1489, 'Adventure': 1596, 'Drama': 5460, 'War': 236, 'Family': 435, 'Fantasy': 676, 'History': 336, 'Mystery': 971, 'Musical': 128, 'Animation': 473, 'Music': 279, 'Film-Noir': 59, 'Horror': 1099, 'Western': 108, 'Biography': 651, 'Comedy': 3465, 'Action': 2312, 'Sport': 206, 'Sci-Fi': 645}


## Media voti per genere

In [28]:
# One row per (movie, genre) pair for the movies found only in the genre dataset
df_diff_exploded = df_diff.withColumn("genre", explode("genres"))

# Stack the exploded tables of both sources
imdb_genre_ratings = df_imdb_exploded.select('genre', 'rating')
diff_genre_ratings = df_diff_exploded.select('genre', 'rating')
df_exploded = imdb_genre_ratings.union(diff_genre_ratings)

by_genre = df_exploded.groupBy("genre")

# Mean rating per genre, rounded to one decimal
df_mean_genre = by_genre.agg(round(avg('rating'), 1).alias("rating"))

# Number of movies per genre
film_count_per_genre = by_genre.agg(count('*').alias('film_count'))

# Put mean rating and movie count side by side
df_mean_genre_with_count = df_mean_genre.join(film_count_per_genre, 'genre', 'inner')

df_mean_genre_with_count.show()

+---------+------+----------+
|    genre|rating|film_count|
+---------+------+----------+
|    Crime|   6.8|      1957|
|  Romance|   6.7|      1649|
| Thriller|   6.6|      1489|
|Adventure|   6.7|      1596|
|    Drama|   7.0|      5460|
|      War|   7.3|       236|
|   Family|   6.6|       435|
|  Fantasy|   6.5|       676|
|  History|   7.1|       336|
|  Mystery|   6.6|       971|
|  Musical|   7.0|       128|
|Animation|   7.0|       473|
|    Music|   6.9|       279|
|Film-Noir|   7.7|        59|
|   Horror|   6.2|      1099|
|  Western|   7.2|       108|
|Biography|   7.1|       651|
|   Comedy|   6.6|      3465|
|   Action|   6.6|      2312|
|    Sport|   6.8|       206|
+---------+------+----------+
only showing top 20 rows



## Quanto dura in media un film

In [29]:
# Mean duration (in minutes) over the genre dataset, rounded to one decimal
df_time = df_genre_movies.agg(round(avg('run_length'), 1).alias("run_length"))

# On average a movie lasts about 2h, i.e. 120min
df_time.show()

+----------+
|run_length|
+----------+
|     120.5|
+----------+



## Quanto dura in media un film per genere

In [30]:
# One row per (movie, genre) pair
df_genre_exploded = df_genre_movies.withColumn("genre", explode("genres"))

# Mean duration per genre, sorted from longest to shortest on the exact mean
# and only then rounded to one decimal for display
df_time_genre = (
    df_genre_exploded
    .groupBy('genre')
    .avg('run_length')
    .orderBy(desc('avg(run_length)'))
    .select('genre', round(col('avg(run_length)'), 1).alias('run_length'))
)

df_time_genre.show()

+---------+----------+
|    genre|run_length|
+---------+----------+
|  Western|     151.5|
|  History|     139.6|
|Biography|     132.2|
|      War|     130.8|
|    Drama|     128.5|
|    Crime|     127.7|
|   Action|     125.5|
|   Sci-Fi|     124.3|
|Adventure|     123.7|
|  Fantasy|     121.1|
|  Mystery|     120.3|
| Thriller|     119.4|
|  Romance|     119.4|
|    Sport|     118.2|
|    Music|     111.3|
|   Horror|     107.1|
|   Comedy|     105.6|
|Animation|      96.0|
+---------+----------+



## Top Film per cadenza decennale

In [31]:
# Number of movies kept for each decade
k = 5

# Decade of each movie: the year rounded down to a multiple of ten
df_decade = df_all_movies_voted.withColumn("decade", (floor(col("year") / 10) * 10).cast("int"))

# Rank the movies inside each decade from the highest rating down
window_spec = Window.partitionBy("decade").orderBy(desc("rating"))
ranked = df_decade.select('title', 'year', 'rating', 'decade')\
                  .withColumn("row_number", row_number().over(window_spec))

# Keep the first k of every decade and drop the helper columns
df_top_decade = ranked.where(col("row_number") <= k).drop("row_number", "decade")

df_top_decade.show()

+--------------------+----+------+
|               title|year|rating|
+--------------------+----+------+
|         Intolerance|1916|   7.7|
|Broken Blossoms o...|1919|   7.2|
|The Birth of a Na...|1915|   6.2|
|        Sherlock Jr.|1924|   8.2|
|The Passion of Jo...|1928|   8.1|
|          The Circus|1928|   8.1|
|       The Gold Rush|1925|   8.1|
|         The General|1926|   8.1|
|         City Lights|1931|   8.5|
|        Modern Times|1936|   8.5|
|I Am a Fugitive f...|1932|   8.2|
|All Quiet on the ...|1930|   8.1|
|  Gone with the Wind|1939|   8.1|
|It's a Wonderful ...|1946|   8.6|
|          Casablanca|1942|   8.5|
|  The Great Dictator|1940|   8.4|
|Children of Paradise|1945|   8.3|
|        Citizen Kane|1941|   8.3|
|        12 Angry Men|1957|   9.0|
|       Seven Samurai|1954|   8.6|
+--------------------+----+------+
only showing top 20 rows



Un'alternativa all'approccio precedente, per risolvere questa query, è la seguente:

In [32]:
# Alternative approach: one filtered and sorted query per decade.
# The loop variable is deliberately not called `year`: in a notebook it stays
# defined after the loop and would replace the `year` function made available
# by `from pyspark.sql.functions import *`.
df_year = []
for decade_start in range(1910, 2030, 10):
    in_decade = (col('year') >= decade_start) & (col('year') < (decade_start + 10))
    df_year.append(df_all_movies_voted.filter(in_decade)
                                      .orderBy(desc('rating')).limit(5))
    df_year[-1].show(truncate=False)

+----------------------------------------------+----+------+
|title                                         |year|rating|
+----------------------------------------------+----+------+
|Intolerance                                   |1916|7.7   |
|Broken Blossoms or the Yellow Man and the Girl|1919|7.2   |
|The Birth of a Nation                         |1915|6.2   |
+----------------------------------------------+----+------+

+--------------------------+----+------+
|title                     |year|rating|
+--------------------------+----+------+
|Sherlock Jr.              |1924|8.2   |
|The Passion of Joan of Arc|1928|8.1   |
|The Circus                |1928|8.1   |
|The Gold Rush             |1925|8.1   |
|Sunrise                   |1927|8.1   |
+--------------------------+----+------+

+---------------------------------+----+------+
|title                            |year|rating|
+---------------------------------+----+------+
|City Lights                      |1931|8.5   |
|Modern Ti

## Quale decennio ha dato i film migliori mediamente e in assoluto

In [33]:
# Mean rating of each decade (one decimal), best decade first
df_10_mean = (
    df_decade
    .groupBy('decade')
    .agg(round(avg('rating'), 1).alias('rating'))
    .orderBy(desc('rating'))
)
df_10_mean.show()

+------+------+
|decade|rating|
+------+------+
|  1920|   7.8|
|  1930|   7.6|
|  1940|   7.6|
|  1950|   7.5|
|  1960|   7.5|
|  1970|   7.2|
|  1910|   7.0|
|  1980|   6.8|
|  1990|   6.7|
|  2020|   6.6|
|  2000|   6.6|
|  2010|   6.6|
+------+------+



In [34]:
# Highest rating reached in each decade (one decimal), best decade first.
# `max` is the Spark aggregate made available by the star import.
df_10_max = (
    df_decade
    .groupBy('decade')
    .agg(round(max('rating'), 1).alias('rating'))
    .orderBy(desc('rating'))
)
df_10_max.show()

+------+------+
|decade|rating|
+------+------+
|  1990|   9.3|
|  1970|   9.2|
|  2020|   9.1|
|  1950|   9.0|
|  2000|   9.0|
|  2010|   9.0|
|  1960|   8.8|
|  1980|   8.8|
|  1940|   8.6|
|  1930|   8.5|
|  1920|   8.2|
|  1910|   7.7|
+------+------+



## Attori che hanno fatto più film

In [35]:
# One row per (movie, actor) pair
df_imdb_exploded_actors = df_imdb_movies.withColumn("actor", explode("stars"))

# Number of movies per actor, most prolific first
df_actors = (
    df_imdb_exploded_actors
    .groupBy("actor")
    .agg(count("*").alias("movies_number"))
    .orderBy(desc("movies_number"))
)

# The ten actors with the most movies
df_stars = df_actors.limit(10)
df_stars.show()

+-----------------+-------------+
|            actor|movies_number|
+-----------------+-------------+
|   Robert De Niro|           62|
|Samuel L. Jackson|           60|
|     Nicolas Cage|           60|
|     Bruce Willis|           54|
|        Tom Hanks|           49|
|   Clint Eastwood|           49|
|   Morgan Freeman|           48|
|    Nicole Kidman|           47|
|      Jackie Chan|           45|
|      Johnny Depp|           45|
+-----------------+-------------+



## Attori più presenti che hanno partecipato ai film più apprezzati

In [36]:
# Actors of the ten best rated movies: match top_10 with the exploded
# (movie, actor) rows on title and year.
df_most_appreciated = top_10.join(df_imdb_exploded_actors, (top_10["title"]==df_imdb_exploded_actors["title"])
                                 & (top_10["year"]==df_imdb_exploded_actors["year"]))\
                                  .select(top_10["title"], top_10["year"], top_10["rating"],
                                         df_imdb_exploded_actors["actor"])

# Rename so the column does not clash with df_actors["actor"] in the next join
df_most_appreciated = df_most_appreciated.withColumnRenamed("actor","top_actor")

# Attach to each of those actors the total number of movies counted in df_actors
df_most_appreciated = df_most_appreciated.join(df_actors, df_most_appreciated["top_actor"]==df_actors["actor"])\
                                         .select(df_most_appreciated["title"], df_actors["actor"],
                                                 df_most_appreciated["rating"], df_actors["movies_number"])

# Ten (movie, actor) rows with the highest movie count
df_film_stars = df_most_appreciated.orderBy(desc("movies_number")).limit(10)
df_film_stars.show(truncate=False)

+------------------------+--------------+------+-------------+
|title                   |actor         |rating|movies_number|
+------------------------+--------------+------+-------------+
|The Godfather Part II   |Robert De Niro|9.0   |62           |
|The Shawshank Redemption|Morgan Freeman|9.3   |48           |
|Schindler's List        |Liam Neeson   |9.0   |42           |
|The Godfather           |Al Pacino     |9.2   |35           |
|The Godfather Part II   |Al Pacino     |9.0   |35           |
|Schindler's List        |Ralph Fiennes |9.0   |30           |
|Schindler's List        |Ben Kingsley  |9.0   |30           |
|The Godfather Part II   |Robert Duvall |9.0   |29           |
|The Godfather           |Diane Keaton  |9.2   |26           |
|The Godfather Part II   |Diane Keaton  |9.0   |26           |
+------------------------+--------------+------+-------------+



## Attori e registi con più collaborazioni

In [37]:
# Keep only the first listed director of every movie
df_imdb_directors = df_imdb_movies.withColumn("director", col("director")[0])

# One row per (director, actor) pair of each movie
df_directors_actors = df_imdb_directors.withColumn("actor", explode("stars"))

# Count the movies of each pair, ignore people paired with themselves and keep
# the ten most frequent collaborations
df_couple = (
    df_directors_actors
    .groupBy("director", "actor")
    .agg(count("*").alias("movies_number"))
    .where(col("actor") != col("director"))
    .orderBy(desc("movies_number"))
    .limit(10)
)
df_couple.show()

+---------------+------------------+-------------+
|       director|             actor|movies_number|
+---------------+------------------+-------------+
| Bobby Farrelly|    Peter Farrelly|           11|
| Akira Kurosawa|    Toshirô Mifune|           11|
|    Woody Allen|        Mia Farrow|           11|
|      John Ford|        John Wayne|           10|
|Martin Scorsese|    Robert De Niro|            9|
|      Joel Coen|        Ethan Coen|            9|
|     Ethan Coen|         Joel Coen|            8|
| Ingmar Bergman|     Max von Sydow|            8|
| Ingmar Bergman|Gunnar Björnstrand|            7|
|  Anthony Russo|         Joe Russo|            7|
+---------------+------------------+-------------+



## Parole più ricorrenti nei Commenti della top 10

In [38]:
# Distinct (name, year, rating, review_url) combinations of the genre dataset:
# the grouping collapses repeated rows and the count itself is discarded.
df_tweet = df_genre_movies.groupBy('name', 'year', 'rating', 'review_url').agg(count('*'))\
                            .select('name', 'year', 'rating', 'review_url')
# Ten entries with the highest rating; their review pages are downloaded below
df_tweet_top_10 = df_tweet.orderBy(desc('rating')).limit(10)
df_tweet_top_10.show()

+--------------------+----+------+--------------------+
|                name|year|rating|          review_url|
+--------------------+----+------+--------------------+
|The Shawshank Red...|1994|   9.3|https://www.imdb....|
|       The Godfather|1972|   9.2|https://www.imdb....|
|     The Dark Knight|2008|   9.0|https://www.imdb....|
|The Godfather: Pa...|1974|   9.0|https://www.imdb....|
|    Schindler's List|1993|   8.9|https://www.imdb....|
|The Lord of the R...|2003|   8.9|https://www.imdb....|
|        12 Angry Men|1957|   8.9|https://www.imdb....|
|        Pulp Fiction|1994|   8.9|https://www.imdb....|
|     The Mountain II|2016|   8.9|https://www.imdb....|
|The Lord of the R...|2001|   8.8|https://www.imdb....|
+--------------------+----+------+--------------------+



In [39]:
# Function that downloads the reviews found at a URL
def get_review_from_url(url, timeout=10):
    """Download a review page and return its review text as a list of words.

    The text of every ``<div class="text show-more__control">`` element is
    joined, stripped of basic punctuation, lower-cased and split on whitespace.

    Args:
        url: address of the review page.
        timeout: seconds to wait for the server before giving up.

    Returns:
        The list of words found in the reviews. An empty list is returned when
        the request fails or the server does not answer with status 200, so
        callers can always iterate over the result as a list of words (a
        sentinel string would be iterated character by character).
    """
    try:
        response = requests.get(url, timeout=timeout)
    except requests.RequestException:
        # Network problem, invalid URL or timeout: nothing to analyse
        return []

    if response.status_code != 200:
        return []

    soup = BeautifulSoup(response.content, 'html.parser')
    comments = soup.find_all('div', class_='text show-more__control')
    text = ' '.join(comment.text.strip() for comment in comments)

    # Remove the punctuation marks that would otherwise stick to the words
    for symbol in ",;:()!?.":
        text = text.replace(symbol, "")

    return text.lower().split()

In [40]:
# Stop words ignored when counting review words (one whitespace-separated
# token per entry; order and the repeated "that" are kept as-is).
englishStopWords = (
    "and to the into on of by or in a with that she it i "
    "you he we they her his its this that at as for not so "
    "do is was are have has an my - but be film movie one "
    "from it's me where"
).split()

In [41]:
# For each of the ten top-rated rows, fetch its review page and keep the five
# most frequent words that are not stop words.
from collections import Counter

top_tweets = df_tweet_top_10.select('name', 'year', 'rating', 'review_url').collect()

top_review_rows = []

for tweet in top_tweets:
    words = get_review_from_url(tweet['review_url'])
    # get_review_from_url may hand back a plain string as a failure marker;
    # iterating over it would count single characters, so treat it as "no words".
    if isinstance(words, str):
        words = []

    # Counter counts in one pass (no repeated list lookups).
    word_counts = Counter(word for word in words if word not in englishStopWords)

    # most_common sorts by count descending; words with equal counts keep the
    # order in which they were first seen.
    top_five_words = dict(word_counts.most_common(5))

    top_review_rows.append((tweet['name'], tweet['year'], tweet['rating'], top_five_words))

# Build the Spark DataFrame once from all rows instead of chaining one union
# per movie; the dict column is inferred as a map column.
df_top_review = spark.createDataFrame(top_review_rows,
                                      ['name', 'year', 'rating', "top_words"])

# Show the DataFrame
df_top_review.show(truncate=False)

+-------------------------------------------------+----+------+---------------------------------------------------------------------+
|name                                             |year|rating|top_words                                                            |
+-------------------------------------------------+----+------+---------------------------------------------------------------------+
|The Shawshank Redemption                         |1994|9.3   |{all -> 43, about -> 30, shawshank -> 54, prison -> 31, andy -> 33}  |
|The Godfather                                    |1972|9.2   |{all -> 37, family -> 33, just -> 29, godfather -> 40, much -> 30}   |
|The Dark Knight                                  |2008|9.0   |{all -> 30, dark -> 37, best -> 35, joker -> 38, batman -> 45}       |
|The Godfather: Part II                           |1974|9.0   |{michael -> 49, vito -> 42, godfather -> 46, first -> 48, part -> 46}|
|Schindler's List                                 |1993|8.9   

## Parole più ricorrenti nei commenti dei 10 film con il rating più basso

In [42]:
# Ten rows with the lowest rating (ascending sort).
df_tweet_flop_10 = df_tweet.sort(asc('rating')).limit(10)
df_tweet_flop_10.show()

+--------------------+----+------+--------------------+
|                name|year|rating|          review_url|
+--------------------+----+------+--------------------+
|         Spice World|1997|   3.5|https://www.imdb....|
|       The Love Guru|2008|   3.8|https://www.imdb....|
|Fifty Shades of Grey|2015|   4.1|https://www.imdb....|
|In the Land of Bl...|2011|   4.5|https://www.imdb....|
|              Driven|2001|   4.6|https://www.imdb....|
|The Twilight Saga...|2009|   4.7|https://www.imdb....|
|The Twilight Saga...|2011|   4.9|https://www.imdb....|
|The Twilight Saga...|2010|   5.0|https://www.imdb....|
|       The Happening|2008|   5.0|https://www.imdb....|
|      Happy New Year|2014|   5.0|https://www.imdb....|
+--------------------+----+------+--------------------+



In [43]:
# For each of the ten lowest-rated rows, fetch its review page and keep the
# five most frequent words that are not stop words.
from collections import Counter

flop_tweets = df_tweet_flop_10.select('name', 'year', 'rating', 'review_url').collect()

flop_review_rows = []

for tweet in flop_tweets:
    words = get_review_from_url(tweet['review_url'])
    # get_review_from_url may hand back a plain string as a failure marker;
    # iterating over it would count single characters, so treat it as "no words".
    if isinstance(words, str):
        words = []

    # Counter counts in one pass (no repeated list lookups).
    word_counts = Counter(word for word in words if word not in englishStopWords)

    # most_common sorts by count descending; words with equal counts keep the
    # order in which they were first seen.
    flop_five_words = dict(word_counts.most_common(5))

    flop_review_rows.append((tweet['name'], tweet['year'], tweet['rating'], flop_five_words))

# Build the Spark DataFrame once from all rows instead of chaining one union
# per movie; the dict column is inferred as a map column.
df_flop_review = spark.createDataFrame(flop_review_rows,
                                       ['name', 'year', 'rating', "flop_words"])

# Show the DataFrame
df_flop_review.show(truncate=False)

+-----------------------------------------+----+------+----------------------------------------------------------------+
|name                                     |year|rating|flop_words                                                      |
+-----------------------------------------+----+------+----------------------------------------------------------------+
|Spice World                              |1997|3.5   |{their -> 24, girls -> 39, just -> 22, were -> 21, spice -> 35} |
|The Love Guru                            |2008|3.8   |{guru -> 55, myers -> 59, mike -> 41, just -> 39, funny -> 39}  |
|Fifty Shades of Grey                     |2015|4.1   |{about -> 47, no -> 40, what -> 45, book -> 39, who -> 36}      |
|In the Land of Blood and Honey           |2011|4.5   |{about -> 51, war -> 83, bosnian -> 35, what -> 38, story -> 40}|
|Driven                                   |2001|4.6   |{all -> 25, racing -> 35, race -> 22, just -> 24, there -> 23}  |
|The Twilight Saga: New Moon    