Test Spark PMN

Subject
You work for Data's Madness, a company specializing in data engineering. A new client has
just come on board and you have been asked to take care of them: movieMax, a company that
sells films online, which has data but does not know how to process it. Your role is to
develop a Spark job that addresses their needs.

They would like a report on their product sales.

In [10]:
import pyspark
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder \
    .appName("TEST_SPARK_PMN") \
    .master("local[*]") \
    .getOrCreate()


Create a DataFrame from the CSV file

In [4]:
film_df = spark.read.option("delimiter", ";").csv("data/film.csv", header=True, inferSchema=True)

film_df.show(truncate=False)

+----+------+--------------------------------------+-------+-----------------------+-------------------+------------------------+----------+------+-------------------+
|Year|Length|Title                                 |Subject|Actor                  |Actress            |Director                |Popularity|Awards|*Image             |
+----+------+--------------------------------------+-------+-----------------------+-------------------+------------------------+----------+------+-------------------+
|INT |INT   |STRING                                |CAT    |CAT                    |CAT                |CAT                     |INT       |BOOL  |STRING             |
|1990|111   |Tie Me Up! Tie Me Down!               |Comedy |Banderas, Antonio      |Abril, Victoria    |Almodóvar, Pedro        |68        |No    |NicholasCage.png   |
|1991|113   |High Heels                            |Comedy |Bosé, Miguel           |Abril, Victoria    |Almodóvar, Pedro        |68        |No    |NicholasCage.
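
Accented names in this dataset, such as Almodóvar or Bosé, only display correctly if the CSV is decoded with the right character set. The encoding of data/film.csv is not stated here, so the sketch below assumes ISO-8859-1 purely for illustration. It uses plain Python, with no Spark session, to show what happens when Latin-1 bytes are decoded as UTF-8.

```python
# Plain-Python illustration; no Spark session is needed.
name = "Almodóvar, Pedro"

# Assumption: the file on disk is ISO-8859-1 (Latin-1) encoded.
raw_bytes = name.encode("iso-8859-1")

# Decoding those bytes as UTF-8 fails on the accented byte; with
# errors="replace" it becomes U+FFFD, the replacement character.
wrong = raw_bytes.decode("utf-8", errors="replace")

# Decoding with the matching charset restores the original text.
right = raw_bytes.decode("iso-8859-1")

print(wrong)
print(right)
```

If the file does turn out to be Latin-1, the CSV reader's `encoding` option (for example `spark.read.option("encoding", "ISO-8859-1")`) is one way to tell Spark which charset to use.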

Drop the Image column

In [5]:
df_cleaned = film_df.drop("*Image")
df_cleaned.show(truncate=False)

+----+------+--------------------------------------+-------+-----------------------+-------------------+------------------------+----------+------+
|Year|Length|Title                                 |Subject|Actor                  |Actress            |Director                |Popularity|Awards|
+----+------+--------------------------------------+-------+-----------------------+-------------------+------------------------+----------+------+
|INT |INT   |STRING                                |CAT    |CAT                    |CAT                |CAT                     |INT       |BOOL  |
|1990|111   |Tie Me Up! Tie Me Down!               |Comedy |Banderas, Antonio      |Abril, Victoria    |Almodóvar, Pedro        |68        |No    |
|1991|113   |High Heels                            |Comedy |Bosé, Miguel           |Abril, Victoria    |Almodóvar, Pedro        |68        |No    |
|1983|104   |Dead Zone, The                        |Horror |Walken, Christopher    |Adams, Brooke      |Cronenbe

Remove the row of types

In [6]:
# The first data row holds type names (INT, STRING, ...) rather than a film; drop it
film_df_filted = df_cleaned.filter(df_cleaned["Year"].cast("string") != "INT")
film_df_filted.show(truncate=False)

+----+------+--------------------------------------+-------+-----------------------+-------------------+------------------------+----------+------+
|Year|Length|Title                                 |Subject|Actor                  |Actress            |Director                |Popularity|Awards|
+----+------+--------------------------------------+-------+-----------------------+-------------------+------------------------+----------+------+
|1990|111   |Tie Me Up! Tie Me Down!               |Comedy |Banderas, Antonio      |Abril, Victoria    |Almodóvar, Pedro        |68        |No    |
|1991|113   |High Heels                            |Comedy |Bosé, Miguel           |Abril, Victoria    |Almodóvar, Pedro        |68        |No    |
|1983|104   |Dead Zone, The                        |Horror |Walken, Christopher    |Adams, Brooke      |Cronenberg, David       |79        |No    |
|1979|122   |Cuba                                  |Action |Connery, Sean          |Adams, Brooke      |Lester, 

- Create a new column "Credits" with the value:
"[Title] : a [Actor] and [Actress] film's, directed by [Director]"

In [7]:
from pyspark.sql import functions as F
df_with_credits = film_df_filted.withColumn(
    "Credits",
    F.concat(
        F.col("Title"), 
        F.lit(" : a "), 
        F.col("Actor"), 
        F.lit(" and "), 
        F.col("Actress"), 
        F.lit(" film's, directed by "), 
        F.col("Director")
    )
)
df_with_credits.show(truncate=False)

+----+------+--------------------------------------+-------+-----------------------+-------------------+------------------------+----------+------+--------------------------------------------------------------------------------------------------------------------+
|Year|Length|Title                                 |Subject|Actor                  |Actress            |Director                |Popularity|Awards|Credits                                                                                                             |
+----+------+--------------------------------------+-------+-----------------------+-------------------+------------------------+----------+------+--------------------------------------------------------------------------------------------------------------------+
|1990|111   |Tie Me Up! Tie Me Down!               |Comedy |Banderas, Antonio      |Abril, Victoria    |Almodóvar, Pedro        |68        |No    |Tie Me Up! Tie Me Down! : a Banderas, Antonio and Abril, V
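
Spark's concat returns NULL as soon as one of its arguments is NULL, so a film with a missing Actor, Actress or Director ends up with a NULL Credits value. The sketch below models that behaviour in plain Python and shows one possible workaround: substituting a placeholder before concatenating. The helper names and the "unknown" placeholder are hypothetical, chosen only for this illustration.

```python
from typing import Optional


def concat_like_spark(*parts: Optional[str]) -> Optional[str]:
    """Model of Spark SQL concat(): any None input makes the result None."""
    if any(p is None for p in parts):
        return None
    return "".join(parts)


def build_credits(title, actor, actress, director, placeholder="unknown"):
    """Hypothetical helper: replace missing values, then concatenate."""
    def fill(value):
        return placeholder if value is None else value

    return concat_like_spark(
        fill(title), " : a ", fill(actor), " and ", fill(actress),
        " film's, directed by ", fill(director),
    )


# A row with no actress listed
row = ("Spiders", "De Vogy, Carl", None, "Lang, Fritz")

naive = concat_like_spark(
    row[0], " : a ", row[1], " and ", row[2], " film's, directed by ", row[3]
)
filled = build_credits(*row)

print(naive)
print(filled)
```

In PySpark, wrapping each column in `F.coalesce(F.col(...), F.lit("unknown"))` before calling `F.concat` should give the same effect. Whether a placeholder is acceptable in the report is a decision for the client.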

Sort films from oldest to most recent

In [11]:
film_df_sorted = df_with_credits.orderBy("Year", ascending=True)
film_df_sorted.show(truncate=False)
#film_df_sorted.write.option("header", "true").csv("data/film_df_sorted.csv")


+----+------+---------------------------------------+---------------+--------------------+----------------+--------------------+----------+------+--------------------------------------------------------------------------------------------------------------+
|Year|Length|Title                                  |Subject        |Actor               |Actress         |Director            |Popularity|Awards|Credits                                                                                                       |
+----+------+---------------------------------------+---------------+--------------------+----------------+--------------------+----------+------+--------------------------------------------------------------------------------------------------------------+
|1920|137   |Spiders                                |Drama          |De Vogy, Carl       |NULL            |Lang, Fritz         |29        |No    |NULL                                                                            

Sort films by popularity

From least to most popular

In [12]:
from pyspark.sql.functions import col
# Cast to int so the sort is numeric: with the type row present in the CSV,
# inferSchema is likely to have read Popularity as a string
film_df_sorted_popularity = df_with_credits.orderBy(col("Popularity").cast("int").asc())
film_df_sorted_popularity.show(truncate=False)
#film_df_sorted_popularity.write.option("header", "true").csv("data/film_df_sorted_popularity.csv")

+----+------+---------------------------+--------+--------------------+-------------------+-------------------+----------+------+-------------------------------------------------------------------------------------------------+
|Year|Length|Title                      |Subject |Actor               |Actress            |Director           |Popularity|Awards|Credits                                                                                          |
+----+------+---------------------------+--------+--------------------+-------------------+-------------------+----------+------+-------------------------------------------------------------------------------------------------+
|1953|61    |White Lightning            |NULL    |Clements, Stanley   |Blondell, Gloria   |Bernds, Edward     |NULL      |No    |White Lightning : a Clements, Stanley and Blondell, Gloria film's, directed by Bernds, Edward    |
|1927|62    |Drop Kick, The             |Drama   |Barthelmess, Richard|Kent, Barbara    

Films longer than 2 hours

In [13]:
# Length is in minutes; keep films strictly longer than 120 minutes
film_df_2h = df_with_credits.select("Year", "Length", "Title").filter(df_with_credits["Length"] > 120)
film_df_2h.show(truncate=False)
#film_df_2h.write.option("header", "true").csv("data/film_df_2h.csv")

+----+------+------------------------------+
|Year|Length|Title                         |
+----+------+------------------------------+
|1979|122   |Cuba                          |
|1983|140   |Octopussy                     |
|1990|149   |Camille Claudel               |
|1982|188   |Fanny and Alexander           |
|1976|150   |Lindbergh Kidnapping Case, The|
|1988|127   |Colors                        |
|1967|130   |Casino Royale                 |
|1979|121   |Ten                           |
|1966|190   |Hawaii                        |
|1966|125   |Torn Curtain                  |
|1965|172   |Sound of Music, The           |
|1984|140   |Tartuffe                      |
|1992|286   |Tommy                         |
|1926|126   |Don Juan                      |
|1988|141   |Thunderball                   |
|1974|128   |Murder on the Orient Express  |
|1977|136   |Spy Who Loved Me, The         |
|1980|124   |Elephant Man, The             |
|1962|134   |A Very Private Affair         |
|1989|126 

Filter films by genre (2 genres of your choice ⇒ 2 DataFrames)

In [14]:
film_df_action = df_with_credits.filter(df_with_credits["Subject"] == "Action")
film_df_action.show()
#film_df_action.write.option("header", "true").csv("data/film_df_action.csv")

+----+------+--------------------+-------+------------------+--------------------+--------------------+----------+------+--------------------+
|Year|Length|               Title|Subject|             Actor|             Actress|            Director|Popularity|Awards|             Credits|
+----+------+--------------------+-------+------------------+--------------------+--------------------+----------+------+--------------------+
|1979|   122|                Cuba| Action|     Connery, Sean|       Adams, Brooke|     Lester, Richard|         6|    No|Cuba : a Connery,...|
|1983|   140|           Octopussy| Action|      Moore, Roger|         Adams, Maud|          Glen, John|        68|    No|Octopussy : a Moo...|
|1984|   101|        Target Eagle| Action|    Connors, Chuck|         Adams, Maud|Loma, José Antoni...|        14|    No|Target Eagle : a ...|
|1981|   116|Raiders of the Lo...| Action|    Ford, Harrison|        Allen, Karen|   Spielberg, Steven|         8|    No|Raiders of the Lo...|

In [15]:
film_df_drama= df_with_credits.filter(df_with_credits["Subject"] == "Drama")
film_df_drama.show()
#film_df_drama.write.option("header", "true").csv("data/film_df_drama.csv")

+----+------+--------------------+-------+--------------------+--------------------+--------------------+----------+------+--------------------+
|Year|Length|               Title|Subject|               Actor|             Actress|            Director|Popularity|Awards|             Credits|
+----+------+--------------------+-------+--------------------+--------------------+--------------------+----------+------+--------------------+
|1978|    94|      Days of Heaven|  Drama|       Gere, Richard|       Adams, Brooke|    Malick, Terrence|        14|    No|Days of Heaven : ...|
|1989|    99|American Angels: ...|  Drama|   Bergen, Robert D.|        Adams, Trudy|  Sebastian, Beverly|        28|    No|American Angels: ...|
|1985|   104|              Subway|  Drama|Lambert, Christopher|    Adjani, Isabelle|         Besson, Luc|         6|    No|Subway : a Lamber...|
|1990|   149|     Camille Claudel|  Drama|   Depardieu, Gérard|    Adjani, Isabelle|      Nuytten, Bruno|        32|    No|Camille

The actor who has appeared in the most films

In [16]:
actor_filted = df_with_credits.groupBy("Actor").agg(F.count("Title").alias("num_films"))
actor_film_nb1= actor_filted.orderBy("num_films", ascending=False)

actor_film_nb1.show(1)

+-----------+---------+
|      Actor|num_films|
+-----------+---------+
|Wayne, John|       81|
+-----------+---------+
only showing top 1 row



The director who has won the most awards

In [17]:
awards_filted = df_with_credits.filter(col("Awards") == "Yes")
df_grouped = awards_filted.groupBy("Director").agg(
    F.count("Awards").alias("total_awards") 
)
best_director = df_grouped.orderBy("total_awards", ascending=False)

best_director.show(1) 

+---------------+------------+
|       Director|total_awards|
+---------------+------------+
|Bergman, Ingmar|           8|
+---------------+------------+
only showing top 1 row



The most popular film that has won an award

In [18]:
film_awards_filtered = df_with_credits.filter(col("Awards") == "Yes")
# Cast so that popularity is compared numerically rather than as text
film_best_popularity_awards = film_awards_filtered.orderBy(col("Popularity").cast("int").desc())
film_best_popularity_awards.show(1)

+----+------+-------------+-------+---------------+-------------+---------------+----------+------+--------------------+
|Year|Length|        Title|Subject|          Actor|      Actress|       Director|Popularity|Awards|             Credits|
+----+------+-------------+-------+---------------+-------------+---------------+----------+------+--------------------+
|1985|   161|Out of Africa|  Drama|Redford, Robert|Streep, Meryl|Pollack, Sydney|        88|   Yes|Out of Africa : a...|
+----+------+-------------+-------+---------------+-------------+---------------+----------+------+--------------------+
only showing top 1 row




- The film genres that have not won any award

In [None]:
from pyspark.sql import functions as F

# Keep the films that did not win an award
df_no_awards = df_with_credits.filter(F.col("Awards") == "No")

# Group by genre and count the films without an award
df_genres_no_awards = df_no_awards.groupBy("Subject").count()

# Display the result
df_genres_no_awards.show(truncate=False)


+---------------+-----+
|Subject        |count|
+---------------+-----+
|Crime          |1    |
|Romance        |1    |
|Adventure      |3    |
|NULL           |2    |
|Drama          |542  |
|War            |32   |
|Fantasy        |1    |
|Mystery        |105  |
|Music          |39   |
|Science Fiction|34   |
|Horror         |55   |
|Short          |1    |
|Western        |115  |
|Comedy         |354  |
|Action         |205  |
|Westerns       |6    |
+---------------+-----+
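
The counts above give, for each genre, the number of films that did not win an award. A genre can appear in that table and still contain award winners: Drama is listed, yet "Out of Africa" is a Drama with Awards set to Yes. To isolate genres with no award at all, one option is to count the awarded films per genre and keep the genres where that count is zero. The sketch below shows the idea in plain Python on made-up rows; the sample data and the function name are hypothetical.

```python
from collections import defaultdict

# Hypothetical sample rows: (Subject, Awards)
rows = [
    ("Drama", "No"), ("Drama", "Yes"),
    ("Action", "No"), ("Action", "No"),
    ("Comedy", "Yes"),
    ("Short", "No"),
]


def genres_without_awards(records):
    """Return the genres in which no film has Awards == "Yes"."""
    awarded = defaultdict(int)
    for subject, awards in records:
        # Every genre gets an entry, even if its awarded count stays at 0
        awarded[subject] += 1 if awards == "Yes" else 0
    return sorted(genre for genre, n in awarded.items() if n == 0)


result = genres_without_awards(rows)
print(result)
```

The same logic could be written in PySpark by grouping on Subject, summing a 1/0 flag built with `F.when(F.col("Awards") == "Yes", 1).otherwise(0)`, and filtering on a sum of zero.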



Part 2

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df_genres_popularity = df_with_credits.select("Title", "Subject", "Popularity").filter(col("Popularity").isNotNull())
window_spec = Window.partitionBy("Subject").orderBy(F.col("Popularity").desc())

# Apply the rank() function within each genre
df_genres_popularity_ranked = df_genres_popularity.withColumn("Classement", F.rank().over(window_spec))

df_genres_popularity_ranked.show(truncate=False)


+-------------------------------+-------+----------+----------+
|Title                          |Subject|Popularity|Classement|
+-------------------------------+-------+----------+----------+
|Wild Times                     |NULL   |75        |1         |
|Lionheart                      |Action |9         |1         |
|Tango & Cash                   |Action |9         |1         |
|Boxing Babes                   |Action |9         |1         |
|Dead-Bang                      |Action |9         |1         |
|Destroyer                      |Action |87        |5         |
|Choice of Arms                 |Action |87        |5         |
|Superman, The Movie            |Action |87        |5         |
|Star Trek V: The Final Frontier|Action |87        |5         |
|B. A. D. Cats                  |Action |87        |5         |
|Russkies                       |Action |87        |5         |
|For Your Eyes Only             |Action |86        |11        |
|Prince & the Pauper, The       |Action 
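
In the output above, Action films with popularity 9 are ranked ahead of those with 87. That is what text ordering produces ("9" sorts after "87" character by character), which suggests that Popularity is being compared as a string rather than as a number. The plain-Python sketch below models RANK() over a descending order and contrasts the two interpretations. The function name and the sample values are assumptions for illustration.

```python
def rank_desc(values, key):
    """Model of SQL RANK() in descending order.

    Ties share a rank, and the rank after a tie skips ahead
    by the number of tied rows.
    """
    ordered = sorted(values, key=key, reverse=True)
    ranks = []
    for i, value in enumerate(ordered):
        if i > 0 and key(value) == key(ordered[i - 1]):
            ranks.append((value, ranks[-1][1]))  # same rank as the previous row
        else:
            ranks.append((value, i + 1))  # position in the ordering, 1-based
    return ranks


popularity = ["87", "9", "86", "87", "9"]

as_text = rank_desc(popularity, key=str)    # string comparison
as_number = rank_desc(popularity, key=int)  # numeric comparison

print(as_text)
print(as_number)
```

If numeric ranking is what the client expects, ordering the window by `F.col("Popularity").cast("int").desc()` is one likely fix. The gaps in the Classement column (1, 5, 11) are normal for rank(); `F.dense_rank()` would number the groups consecutively instead.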

Number of films per director

In [21]:
from pyspark.sql import functions as F

df_director_films_count = df_with_credits.select("Director", "Title")
df_director_films_count = df_director_films_count.groupBy("Director").count()
df_director_films_count.show(truncate=False)


+-------------------+-----+
|Director           |count|
+-------------------+-----+
|Rossellini, Roberto|4    |
|Totten, Robert     |1    |
|Dearden, Basil     |2    |
|Paris, Jerry       |1    |
|Mizrahi, Moshe     |1    |
|Zwick, Edward      |1    |
|Jackson, Mick      |1    |
|Keach, James       |1    |
|Altman, Robert     |2    |
|Fitzmaurice, George|1    |
|Logan, Joshua      |3    |
|Ward, David S.     |1    |
|Arnold, Newt       |1    |
|Szwarc, Jeannot    |1    |
|Glen, John         |3    |
|Gilbert, Lewis     |4    |
|Litvak, Anatole    |2    |
|Feferman, Linda    |1    |
|Comeau, Alain      |1    |
|Brooks, Richard    |2    |
+-------------------+-----+
only showing top 20 rows



Create a UDF to convert "Title" to uppercase

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the function that converts a title to uppercase
def to_case(Title):
    if Title is not None:
        return Title.upper()  # Convert to uppercase
    return None  # A missing title stays None

# Register the function as a UDF
to_case_udf = udf(to_case, StringType())

# Apply the UDF to the "Title" column to create a new column "Title_Maj"
df_with_case_title = df_with_credits.withColumn("Title_Maj", to_case_udf("Title"))


df_with_case_title.show(truncate=False)


+----+------+--------------------------------------+-------+-----------------------+-------------------+------------------------+----------+------+--------------------------------------------------------------------------------------------------------------------+--------------------------------------+
|Year|Length|Title                                 |Subject|Actor                  |Actress            |Director                |Popularity|Awards|Credits                                                                                                             |Title_Maj                             |
+----+------+--------------------------------------+-------+-----------------------+-------------------+------------------------+----------+------+--------------------------------------------------------------------------------------------------------------------+--------------------------------------+
|1990|111   |Tie Me Up! Tie Me Down!               |Comedy |Banderas, Antonio      |Abri
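
The function wrapped by udf() is ordinary Python, so its logic, including the handling of missing titles, can be checked without a Spark session. A minimal sketch, reusing the same function body on a few sample titles:

```python
def to_case(title):
    """Same logic as the UDF body: uppercase a title, pass None through."""
    if title is not None:
        return title.upper()
    return None


samples = ["Tie Me Up! Tie Me Down!", "Dead Zone, The", None]
results = [to_case(s) for s in samples]

print(results)
```

Checking the plain function first can make UDF errors easier to diagnose, since exceptions raised inside a UDF surface only when Spark executes the job.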

Another, faster method

In [None]:
from pyspark.sql.functions import upper

# Use the built-in upper() function to convert "Title" to uppercase;
# built-in functions run inside Spark and avoid sending each row to Python
df_with_uppercase_title = df_with_credits.withColumn("Title_uppercase", upper("Title"))

# Display the result
df_with_uppercase_title.select("Title", "Title_uppercase").show(truncate=False)


+--------------------------------------+--------------------------------------+
|Title                                 |Title_uppercase                       |
+--------------------------------------+--------------------------------------+
|Tie Me Up! Tie Me Down!               |TIE ME UP! TIE ME DOWN!               |
|High Heels                            |HIGH HEELS                            |
|Dead Zone, The                        |DEAD ZONE, THE                        |
|Cuba                                  |CUBA                                  |
|Days of Heaven                        |DAYS OF HEAVEN                        |
|Octopussy                             |OCTOPUSSY                             |
|Target Eagle                          |TARGET EAGLE                          |
|American Angels: Baptism of Blood, The|AMERICAN ANGELS: BAPTISM OF BLOOD, THE|
|Subway                                |SUBWAY                                |
|Camille Claudel                       |