## 

# 🎬 Movies Data Pipeline
Ce notebook contient un pipeline de traitement des données du fichier `TMDB_all_movies.csv`, dans le cadre d'un projet de data engineering.

## 1. Ingestion

In [1]:
from pyspark.sql import SparkSession
import pyspark
from pyspark.sql.functions import count, col, when, array, split, size, sum as _sum, row_number, mean, year, udf
from pyspark.sql.window import Window
from pyspark.ml.feature import CountVectorizer, StandardScaler, VectorAssembler, Tokenizer, StopWordsRemover, HashingTF, IDF, PCA, Normalizer
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors, DenseVector, VectorUDT
from pyspark.sql.types import DoubleType
from pyspark.ml.functions import vector_to_array

In [2]:
spark = SparkSession.builder.appName("Movie recommender").config("spark.driver.memory", "8g").getOrCreate()

df = spark.read.csv("../data/TMDB_all_movies.csv", header=True, inferSchema=True, sep=",", quote='"', escape='"', multiLine=True)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/14 13:03:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [3]:
# ne garder que les 100 000 premières lignes de df
# df = df.limit(500000)

In [4]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: double (nullable = true)
 |-- status: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: double (nullable = true)
 |-- runtime: double (nullable = true)
 |-- budget: double (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- tagline: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- cast: string (nullable = true)
 |-- director: string (nullable = true)
 |-- director_of_photography: string (nullable = true)
 |-- writers: string (nullable = true)
 |-- producers: string (nu

In [5]:
# Type de chaque colonne :
    # vote_average: float
    # vote_count: int
    # release_date: date
    # revenue: float
    # runtime: float
    # budget: float
    # popularity: float
    # genres, production_countries, production_companies, spoken, cast, director, writers: string / one-hot encoded ?

cols_to_check = ["title", "original_title", "overview", "release_date", "genres", "production_countries", "production_companies", "spoken_languages", "cast", "director", "writers"]
df = df.withColumn(
    "completeness_score",
    sum([when(col(c).isNotNull(), 1).otherwise(0) for c in cols_to_check])
)

df = df.withColumn("vote_average", df["vote_average"].try_cast("double").try_cast("float")) \
    .withColumn("vote_count", df["vote_count"].try_cast("double").try_cast("int")) \
    .withColumn("release_date", df["release_date"].try_cast("date")) \
    .withColumn("runtime", df["runtime"].try_cast("double").try_cast("float")) \
    .withColumn("budget", df["budget"].try_cast("double").try_cast("float")) \
    .withColumn("popularity", df["popularity"].try_cast("double").try_cast("float")) \
    .withColumn("genres_array", when(col("genres").isNotNull(), split(col("genres"), ",\\s*")).otherwise(array())) \
    .withColumn("production_countries_array", when(col("production_countries").isNotNull(), split(col("production_countries"), ",\\s*")).otherwise(array())) \
    .withColumn("production_companies_array", when(col("production_companies").isNotNull(), split(col("production_companies"), ",\\s*")).otherwise(array())) \
    .withColumn("spoken_languages_array", when(col("spoken_languages").isNotNull(), split(col("spoken_languages"), ",\\s*")).otherwise(array())) \
    .withColumn("cast_array", when(col("cast").isNotNull(), split(col("cast"), ",\\s*")).otherwise(array())) \
    .withColumn("director_array", when(col("director").isNotNull(), split(col("director"), ",\\s*")).otherwise(array())) \
    .withColumn("writers_array", when(col("writers").isNotNull(), split(col("writers"), ",\\s*")).otherwise(array())) \
    .withColumn("release_year", year("release_date").try_cast("double").try_cast("int")) \
    .withColumn("original_language_array", when(col("original_language").isNotNull(), split(col("original_language"), ",\\s*")).otherwise(array()))

df = df.drop("genres", "production_countries", "production_companies", "spoken_languages", "cast", "director", "writers", "original_language")

df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: float (nullable = true)
 |-- vote_count: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- release_date: date (nullable = true)
 |-- revenue: double (nullable = true)
 |-- runtime: float (nullable = true)
 |-- budget: float (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: float (nullable = true)
 |-- tagline: string (nullable = true)
 |-- director_of_photography: string (nullable = true)
 |-- producers: string (nullable = true)
 |-- music_composer: string (nullable = true)
 |-- imdb_rating: double (nullable = true)
 |-- imdb_votes: double (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- completeness_score: integer (nullable = false)
 |-- genres_array: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- production_countrie

## 2. Exploration

In [6]:
# df.show(5, truncate=False)

In [7]:
# null values by columns
# df.select([pyspark.sql.functions.count(pyspark.sql.functions.when(pyspark.sql.functions.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [8]:
# overview null but vote_count > 5

# df.filter(
#     (df["overview"].isNull()) & (df["vote_count"] > 5)
# ).count()

# df.filter(
#     (df["overview"].isNull()) & (df["vote_count"] > 5)
# ).show(100, truncate=False)

In [9]:
# genres null but vote_count > 10
# df.filter(
#     (size(col("genres_array")) == 0) & (df["vote_count"] > 10)
# ).count()

# genres null
# df.filter(
#     size(col("genres_array")) == 0
# ).show(100, truncate=False)

In [10]:
# get duplicate titles
# duplicate_titles = (
#     df.groupBy("title") \
#       .agg(count("*").alias("count"))\
#       .filter(col("count") > 1)\
# )

# duplicate_titles.show(100)

In [11]:
# get duplicate titles with all columns
# duplicate_titles_full = (
#     df.join(duplicate_titles, "title") \
#       .select(df["title"], df["release_year"], df["completeness_score"], df["original_language_array"], df["original_title"], df["overview"], df["genres_array"], df["production_countries_array"], df["production_companies_array"], df["spoken_languages_array"], df["cast_array"], df["writers_array"], df["director_array"], duplicate_titles["count"])
# )
# duplicate_titles_full.show(100, truncate=False)

In [12]:
# df.select("completeness_score").describe().show()

In [13]:
# rows with completeness_score < 4
# df.filter(
#     df["completeness_score"] < 4
# ).show(100, truncate=False)

In [14]:
# count rows who have duplicate titles and same release_date, with not null release_date
# duplicate_titles_count = (
#     df.groupBy("title", "release_date") \
#         .agg(count("*").alias("count"))\
#         .filter((col("count") > 1) & (col("release_date").isNotNull()))\
# )
# duplicate_titles_count.show(100, truncate=False)

In [15]:
# release_date null
# df.filter(
#     df["release_date"].isNull()
# ).show(100, truncate=False)

In [16]:
# count rows with release_date, production_companies, production_countries, spoken_languages, cast, director and writers null
# df.filter(
#     df.release_date.isNull() &
#     df.overview.isNull() &
#     (size(col("production_companies_array")) == 0) &
#     (size(col("production_countries_array")) == 0) &
#     (size(col("spoken_languages_array")) == 0) &
#     (size(col("cast_array")) == 0) &
#     (size(col("director_array")) == 0) &
#     (size(col("writers_array")) == 0) &
#     (size(col("genres_array")) == 0)
# ).count()

In [17]:
# df.select("popularity", "vote_count", "vote_average", "runtime", "budget").describe().show()

In [18]:
# count values = 0 in popularity, vote_count, vote_average, revenue, runtime, budget
# df.select(
#     _sum(when(col("popularity") == 0, 1).otherwise(0)).alias("popularity_0"),
#     _sum(when(col("vote_count") == 0, 1).otherwise(0)).alias("vote_count_0"),
#     _sum(when(col("vote_average") == 0, 1).otherwise(0)).alias("vote_average_0"),
#     _sum(when(col("runtime") == 0, 1).otherwise(0)).alias("runtime_0"),
#     _sum(when(col("budget") == 0, 1).otherwise(0)).alias("budget_0")
# ).show()

In [19]:
# df.select("original_language_array").distinct().count()

## 3. Nettoyage

In [20]:
# enlever toutes les lignes qui ne sont pas en released
df = df.filter(df["status"] == "Released")
# df.select("status").distinct().show()

In [21]:
# drop columns that are not useful for the calculation
df = df.drop("status", "imdb_id", "tagline", "director_of_photography", "producers", "imdb_rating", "imdb_votes", "music_composer", "revenue")
# df.show(5, truncate=False)

In [22]:
# drop rows that have no title
df = df.filter(df["title"].isNotNull() & (df["title"] != ""))

In [23]:
# replace null values in overview with empty string and release_year with median year
median_year = df.approxQuantile("release_year", [0.5], 0.01)[0]
df = df.fillna({
    'overview': '',
    'release_year': median_year
})

                                                                                

In [24]:
# drop rows who have completeness_score < 4
df = df.filter(df["completeness_score"] >= 4)

In [25]:
# drop rows with release_date, production_companies, production_countries, spoken_languages, cast, director, writers, overview and genres null
df = df.filter(
    ~(
        df.release_date.isNull() &
        df.overview.isNull() &
        (size(col("production_companies_array")) == 0) &
        (size(col("production_countries_array")) == 0) &
        (size(col("spoken_languages_array")) == 0) &
        (size(col("cast_array")) == 0) &
        (size(col("director_array")) == 0) &
        (size(col("writers_array")) == 0) &
        (size(col("genres_array")) == 0)
    )
)

In [26]:
# vérifier les doublons de titres avec release_date, et supprimer la ligne avec le completeness_score le plus bas
window = Window.partitionBy("title", "release_date").orderBy(col("completeness_score").desc())

# Garder la ligne la plus complète
df = df.withColumn("row_num", row_number().over(window)) \
       .filter(col("row_num") == 1) \
       .drop("row_num")

In [27]:
# vérifier les doublons de titres avec overview, et supprimer la ligne avec le completeness_score le plus bas
window = Window.partitionBy("title", "overview").orderBy(col("completeness_score").desc())

# Garder la ligne la plus complète
df = df.withColumn("row_num", row_number().over(window)) \
       .filter(col("row_num") == 1) \
       .drop("row_num", "completeness_score")

In [28]:
# set la moyenne de runtime si runtime = 0
mean_runtime = df.select(mean("runtime")).first()[0]
df = df.withColumn("runtime", when(col("runtime") == 0, mean_runtime).otherwise(col("runtime")))

                                                                                

In [29]:
# null values by columns 2
# df.select([pyspark.sql.functions.count(pyspark.sql.functions.when(pyspark.sql.functions.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

In [30]:
# # split df to put id, title, oiginal_title, poster_path in a separate df
# df_id_title = df.select("id", "title", "original_title", "poster_path", "genres_array", "production_countries_array", "production_companies_array", "spoken_languages_array", "cast_array", "director_array", "writers_array", "vote_count", "vote_average", "budget", "release_date", "runtime", "original_language_array", "popularity", "overview")
# df = df.drop("title", "original_title", "poster_path", "vote_count", "vote_average", "budget", "revenue", "release_date")

In [31]:
# df.show(5, truncate=True)

## 4. Transformation

In [32]:
# use CountVectorizer to vectorize the genres, production_countries, production_companies, spoken_languages, cast, director and writers columns
vectorizer_genres = CountVectorizer(inputCol="genres_array", outputCol="genres_vector")
vectorizer_production_countries = CountVectorizer(inputCol="production_countries_array", outputCol="production_countries_vector")
vectorizer_production_companies = CountVectorizer(inputCol="production_companies_array", outputCol="production_companies_vector")
vectorizer_spoken_languages = CountVectorizer(inputCol="spoken_languages_array", outputCol="spoken_languages_vector")
vectorizer_cast = CountVectorizer(inputCol="cast_array", outputCol="cast_vector")
vectorizer_director = CountVectorizer(inputCol="director_array", outputCol="director_vector")
vectorizer_writers = CountVectorizer(inputCol="writers_array", outputCol="writers_vector")
vectorizer_original_language = CountVectorizer(inputCol="original_language_array", outputCol="original_language_vector")

In [33]:
# 1. Tokenisation
tokenizer = Tokenizer(inputCol="overview", outputCol="overview_tokens")

# 2. Suppression des mots vides
remover = StopWordsRemover(inputCol="overview_tokens", outputCol="overview_filtered")

# 3. TF
hashingTF = HashingTF(inputCol="overview_filtered", outputCol="overview_tf", numFeatures=10000)

# 4. IDF
idf = IDF(inputCol="overview_tf", outputCol="overview_vector")

In [34]:
runtime_assembler = VectorAssembler(inputCols=["runtime"], outputCol="runtime_vec")
runtime_scaler = StandardScaler(inputCol="runtime_vec", outputCol="runtime_scaled")
popularity_assembler = VectorAssembler(inputCols=["popularity"], outputCol="popularity_vec")
popularity_scaler = StandardScaler(inputCol="popularity_vec", outputCol="popularity_scaled")
release_year_assembler = VectorAssembler(inputCols=["release_year"], outputCol="release_year_vec", handleInvalid="keep")
release_year_scaler = StandardScaler(inputCol="release_year_vec", outputCol="release_year_scaled")

In [51]:
pipeline = Pipeline(stages=[
    tokenizer,
    remover,
    hashingTF,
    idf,
    vectorizer_genres,
    vectorizer_production_countries,
    vectorizer_production_companies,
    vectorizer_spoken_languages,
    vectorizer_cast,
    vectorizer_director,
    vectorizer_writers,
    vectorizer_original_language,
    runtime_assembler,
    runtime_scaler,
    popularity_assembler,
    popularity_scaler,
    release_year_assembler,
    release_year_scaler
])
pipeline_model = pipeline.fit(df)
df_vec = pipeline_model.transform(df)

                                                                                

In [None]:
# df_vec = df_vec.drop(
#     "genres_array",
#     "production_countries_array",
#     "production_companies_array",
#     "spoken_languages_array",
#     "cast_array",
#     "director_array",
#     "writers_array",
#     "original_language_array",
#     "runtime",
#     "popularity",
#     "release_year",
#     "runtime_vec",
#     "popularity_vec",
#     "release_year_vec",
#     "overview_tokens",
#     "overview_filtered",
#     "overview_tf",
#     "overview"
# )

In [37]:
# list 1000 release_year_scaled
# df_vec.select("release_year_scaled").show(10, truncate=True)

 
# df_vec.show(5, truncate=True)

In [52]:
final_assembler = VectorAssembler(
    inputCols=[
        "genres_vector",
        "production_countries_vector",
        "production_companies_vector",
        "spoken_languages_vector",
        "cast_vector",
        "director_vector",
        "writers_vector",
        "original_language_vector",
        # "runtime_scaled",
        # "popularity_scaled",
        "release_year_scaled",
        "overview_vector"
    ],
    outputCol="content_features"
)

df_vec_assembler = final_assembler.transform(df_vec)

In [53]:
# Étape 2 : normaliser les vecteurs pour le produit scalaire = similarité cosinus
normalizer = Normalizer(inputCol="content_features", outputCol="norm_features", p=2.0)
df_norm = normalizer.transform(df_vec_assembler)

In [54]:
# Étape 3 : extraire le vecteur cible
# 4248 Scary movie 2
# 424 La liste de Schindler
# 238 Le Parrain
# 62 2001 l'odyssée de l'espace
target_id = 238
target_vector_row = df_norm.filter(col("id") == target_id).select("norm_features").first()

if target_vector_row is None:
    raise ValueError(f"Aucun film avec id={target_id}")

target_vector = target_vector_row["norm_features"]

                                                                                

In [55]:
# UDF pour calculer la similarité cosinus avec le vecteur cible (produit scalaire car normalisé)
@udf(returnType=DoubleType())
def cosine_similarity_udf(vec):
    return float(vec.dot(target_vector))

df_with_similarity = df_norm.withColumn("cosine_similarity", cosine_similarity_udf(col("content_features")))

In [56]:
# Étape 4 : trier et récupérer les 10 films les plus similaires (en excluant le film lui-même)
top_20 = df_with_similarity.filter(col("id") != target_id) \
    .orderBy(col("cosine_similarity").desc()) \
    .select("id", "title", "cosine_similarity", "popularity", "vote_count", "release_date", "genres_array") \
    .limit(20)

top_20.show(truncate=False)

25/07/14 13:21:26 WARN DAGScheduler: Broadcasting large task binary with size 62.7 MiB

+-------+---------------------------------------------------------------+-----------------+----------+----------+------------+-------------------------+
|id     |title                                                          |cosine_similarity|popularity|vote_count|release_date|genres_array             |
+-------+---------------------------------------------------------------+-----------------+----------+----------+------------+-------------------------+
|312430 |Ringo Starr 70th Birthday Bash Radio City NYC 7-07-10          |80.53628161188904|0.661     |0         |2010-07-07  |[Music]                  |
|65187  |Pachinko Queen Explosion                                       |73.65197826438158|1.586     |0         |2007-01-10  |[Drama]                  |
|1322784|Mario Puzo's The Godfather, Coda: The Death of Michael Corleone|72.28483307657797|1.96      |0         |2020-12-04  |[Crime, Drama]           |
|587855 |Celebrating 50 Years of the Monaro                             |71.830217

                                                                                

In [70]:
popular = top_20.orderBy(col("popularity").desc()).limit(1).toJSON()
underground = top_20.orderBy(col("popularity").asc()).limit(1).toJSON()
newest = top_20.orderBy(col("release_date").desc()).limit(1).toJSON()



In [None]:
popular.collect()

NameError: name 'popular' is not defined

In [72]:
underground.collect()

25/07/14 13:50:27 WARN DAGScheduler: Broadcasting large task binary with size 62.7 MiB
25/07/14 13:52:52 WARN DAGScheduler: Broadcasting large task binary with size 62.6 MiB
                                                                                

['{"id":1030339,"title":"Pachinko Horrifying Urban Legend","cosine_similarity":71.64944693363117,"popularity":0.006,"vote_count":0,"release_date":"2013-01-01","genres_array":["Horror","Comedy","Mystery"]}']

In [73]:
newest.collect()

25/07/14 13:53:02 WARN DAGScheduler: Broadcasting large task binary with size 62.7 MiB
25/07/14 13:55:18 WARN DAGScheduler: Broadcasting large task binary with size 62.6 MiB
                                                                                

['{"id":1300790,"title":"The Supernatural Sweet Shop: The Movie","cosine_similarity":70.80015759784196,"popularity":0.9137,"vote_count":2,"release_date":"2024-12-13","genres_array":["Fantasy","Mystery"]}']

In [None]:
# spark.stop()

## 5. Chargement en base