In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, explode, col,when,avg, count

import plotly.express as px

In [17]:
# Créer une session Spark
spark = SparkSession.builder \
    .appName("PySpark HDFS Example") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .getOrCreate()

In [18]:
df = spark.read.csv("hdfs://localhost:9000/user/hadoop/imdb-movies-dataset.csv", header=True, inferSchema=True)


In [19]:
df.show(20000)

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [20]:
# Convertir les colonnes 'Metascore' et 'Rank' en type numérique si nécessaire
df = df.withColumn("Metascore", col("Metascore").cast("float"))
df = df.withColumn("Rating", col("Rating").cast("float"))

# Création de la nouvelle colonne en utilisant une expression
df = df.withColumn("average_rank", ((col("Metascore") / 10) + col("Rating")) / 2)
df.show(20000)

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



In [21]:
df= df.dropna()
average_by_certificate = df.groupBy("Certificate").avg("average_rank").orderBy("Certificate")
average_by_certificate_pandas = average_by_certificate.toPandas()
# Création du graphique à barres
fig = px.bar(average_by_certificate_pandas, x='Certificate', y='avg(average_rank)',
             title='Moyenne de average_rank par Certificate',
             labels={'avg(average_rank)': 'Moyenne de average_rank'})

# Affichage du graphique
fig.show()

In [22]:
# Création de catégories de durée
df = df.withColumn("Duration_Category", 
                   when(col("Duration (min)") <= 90, "<= 90")
                   .when((col("Duration (min)") > 90) & (col("Duration (min)") <= 120), "90 - 120")
                   .otherwise("> 120"))

# Calcul de la moyenne de average_rank pour chaque catégorie de durée
average_by_duration = df.groupBy("Duration_Category").agg(avg("average_rank").alias("avg_average_rank")).orderBy("Duration_Category")

# Convertir en DataFrame Pandas
average_by_duration_pandas = average_by_duration.toPandas()

# Création du graphique à barres
fig = px.bar(average_by_duration_pandas, x='Duration_Category', y='avg_average_rank',
             title='Moyenne de average_rank par Durée des Films',
             labels={'avg_average_rank': 'Moyenne de average_rank', 'Duration_Category': 'Durée (min)'})

# Affichage du graphique
fig.show()

In [36]:
# Séparation des genres et explosion des listes de genres en lignes individuelles
df_genres = df.withColumn("Genre", explode(split(col("Genre"), ",\s*")))

# Comptage des occurrences de chaque genre
genre_counts = df_genres.groupBy("Genre").count().orderBy("count", ascending=False)

# Convertir en DataFrame Pandas
genre_counts_pandas = genre_counts.toPandas()

# Création du graphique avec Plotly
import plotly.express as px

fig = px.bar(genre_counts_pandas, x='Genre', y='count', 
             title='Comptage des Genres',
             labels={'count': 'Nombre de films', 'Genre': 'Genre'})

# Affichage du graphique
fig.show()



invalid escape sequence '\s'


invalid escape sequence '\s'


invalid escape sequence '\s'



In [33]:
# Calcul de la moyenne de Rating et du nombre de films par Director
director_stats = df.groupBy("Director").agg(
    avg("Rating").alias("avg_rating"),
    count("Title").alias("film_count")
).orderBy("avg_rating", ascending=False)
director_stats = director_stats.filter(col("film_count") > 2).filter(col("avg_rating")>8).orderBy("avg_rating", ascending=False)

# Convertir en DataFrame Pandas
director_stats_pandas = director_stats.toPandas()
fig = px.bar(director_stats_pandas, x='Director', y='avg_rating',
             hover_data=['film_count'],
             title='Moyenne de Rating par Director et Nombre de Films',
             labels={'avg_rating': 'Moyenne de Rating', 'Director': 'Directeur', 'film_count': 'Nombre de Films'})

# Personnalisation du graphique pour améliorer la lisibilité
fig.update_layout(xaxis={'categoryorder': 'total descending'},
                  xaxis_title='Directeur',
                  yaxis_title='Moyenne de Rating',
                  hovermode='x unified')

# Affichage du graphique
fig.show()

In [38]:
# Séparation des genres et explosion des listes de genres en lignes individuelles
df_genres = df.withColumn("Genre", explode(split(col("Genre"), ",\s*")))

# Calcul de la moyenne de Rating pour chaque genre
average_rating_by_genre = df_genres.groupBy("Genre").agg(
    avg("Rating").alias("avg_rating")
).orderBy("avg_rating", ascending=False)

# Convertir en DataFrame Pandas
average_rating_by_genre_pandas = average_rating_by_genre.toPandas()

fig = px.bar(average_rating_by_genre_pandas, x='Genre', y='avg_rating',
             title='Moyenne de Rating par Genre',
             labels={'avg_rating': 'Moyenne de Rating', 'Genre': 'Genre'})

# Personnalisation du graphique pour améliorer la lisibilité
fig.update_layout(xaxis={'categoryorder': 'total descending'},
                  xaxis_title='Genre',
                  yaxis_title='Moyenne de Rating',
                  hovermode='x unified')

# Affichage du graphique
fig.show()


invalid escape sequence '\s'


invalid escape sequence '\s'


invalid escape sequence '\s'



In [44]:
# Séparation des acteurs et explosion des listes d'acteurs en lignes individuelles
df_cast = df.withColumn("Actor", explode(split(col("Cast"), ",\s*")))

# Calcul de la moyenne de Rating et du nombre de films par acteur
average_rating_by_actor = df_cast.groupBy("Actor").agg(
    avg("Rating").alias("avg_rating"),
    count("Title").alias("film_count")
)

# Filtrer les acteurs ayant joué dans plus de 2 films
average_rating_by_actor_filtered = average_rating_by_actor.filter(col("film_count") > 10).orderBy("avg_rating", ascending=False)

# Convertir en DataFrame Pandas
average_rating_by_actor_pandas = average_rating_by_actor_filtered.toPandas()

fig = px.bar(average_rating_by_actor_pandas, x='Actor', y='avg_rating',
             hover_data=['film_count'],
             title='Moyenne de Rating par Acteur (plus de 10 films)',
             labels={'avg_rating': 'Moyenne de Rating', 'Actor': 'Acteur', 'film_count': 'Nombre de Films'})

# Personnalisation du graphique pour améliorer la lisibilité
fig.update_layout(xaxis={'categoryorder': 'total descending'},
                  xaxis_title='Acteur',
                  yaxis_title='Moyenne de Rating',
                  hovermode='x unified')

# Affichage du graphique
fig.show()


invalid escape sequence '\s'


invalid escape sequence '\s'


invalid escape sequence '\s'



In [45]:
spark.stop()