<a href="https://colab.research.google.com/github/Arwa601/Projet_Big_Data_Spark/blob/main/Projet_Big_Data.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install pyspark




In [None]:
# Importer les bibliothèques nécessaires
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Étape 1 : Créer une SparkSession


# Check if a Spark session already exists and stop it
if 'spark' in globals() and spark is not None:
    spark.stop()
    print("Existing Spark session stopped.")

# Create a new Spark session
spark = SparkSession.builder \
    .appName("Movie Metadata Classification") \
    .getOrCreate()
print("New Spark session created.")

New Spark session created.


Charger le jeu de données

In [None]:
# Étape 2 : Charger le jeu de données

file_path = "movies_metadata.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

In [None]:
# Étape 3 : Explorer les données
df.show(5)  # Afficher les 5 premières lignes
df.printSchema()  # Afficher le schéma des données

+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------+--------------------+-----------------+
|adult|belongs_to_collection|  budget|              genres|            homepage|   id|  imdb_id|original_language|      original_title|            overview|popularity|         poster_path|production_companies|production_countries|        release_date|             revenue|             runtime|    spoken_languages|  status|             tagline|               title|   video|        vote_average|       vote_count|
+-----+---------------------+--------+--------------------+--------------------+-----+---------+-----------------+--------------------+--------------------+----------+-----

In [None]:
df.columns


['adult',
 'belongs_to_collection',
 'budget',
 'genres',
 'homepage',
 'id',
 'imdb_id',
 'original_language',
 'original_title',
 'overview',
 'popularity',
 'poster_path',
 'production_companies',
 'production_countries',
 'release_date',
 'revenue',
 'runtime',
 'spoken_languages',
 'status',
 'tagline',
 'title',
 'video',
 'vote_average',
 'vote_count']

Preprocessing

In [None]:
# Étape 3 : Convertir les colonnes numériques en DoubleType
from pyspark.sql.types import DoubleType

colonnes_a_converter = ["budget", "popularity", "revenue", "runtime", "vote_average", "vote_count"]
for colonne in colonnes_a_converter:
    if colonne in df.columns:
        df = df.withColumn(colonne, df[colonne].cast(DoubleType()))



Label :Adult
contenu adapté ou non pour un public général

In [None]:
# Étape 4 : Créer une colonne binaire pour le label
from pyspark.sql.functions import when
df = df.withColumn("label", when(df["adult"] == "True", 1).otherwise(0))

Indexer et Encoder les colonnes catégorielles


Assemblage des caractéristiques :regroupe toutes les colonnes numériques utilisées comme caractéristiques
Configurer le classifieur:  Un DecisionTreeClassifier est configuré, prenant "label" comme colonne cible et "features" comme vecteur de caractéristiques. Ce modèle d’arbre de décision va tenter de prédire la variable cible en fonction des caractéristiques fournies.
Créer un Pipeline:Le Pipeline est construit pour automatiser l’ensemble des étapes de traitement des données et d’entraînement du modèle.

In [None]:
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Étape 5 : Indexer et encoder les colonnes catégorielles avec gestion des valeurs inconnues
string_cols = ["genres", "original_language"]
indexers = [StringIndexer(inputCol=col, outputCol=col + "_index", handleInvalid="skip") for col in string_cols]
encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=indexer.getOutputCol() + "_encoded") for indexer in indexers]

# Étape 6 : Assembler les caractéristiques
feature_columns = ["budget", "popularity", "revenue", "runtime", "vote_average", "vote_count"] + \
                  [encoder.getOutputCol() for encoder in encoders]

assembler = VectorAssembler(inputCols=feature_columns, outputCol="features", handleInvalid="skip")

# Étape 7 : Configurer le classifieur
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Étape 8 : Créer un Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, dt])

# Étape 9 : Diviser les données en ensembles d'entraînement et de test
train_df, test_df = df.randomSplit([0.8, 0.2], seed=1234)

StringIndexer et OneHotEncoder : utilisées pour transformer des colonnes catégorielles (comme "genres" et "original_language") en une représentation numérique, ce qui est nécessaire pour l'entraînement du modèle.
VectorAssembler : combine les colonnes de caractéristiques en une seule colonne vectorielle, indispensable pour que les modèles Spark traitent correctement les données.
DecisionTreeClassifier : le modèle de classification utilisé pour prédire la cible, qui est également fourni par PySpark.

In [None]:
# Vérifier les colonnes du DataFrame d'entraînement
train_df.printSchema()

root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: double (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: double (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: double (nullable = true)
 |-- runtime: double (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: double (nu

Entraînement du modèle

In [None]:
# Étape 10 : Entraîner le modèle
model = pipeline.fit(train_df)

Prédiction et Evaluation

In [None]:
# Effectuer des prédictions sur l'ensemble de test
predictions = model.transform(test_df)

# Évaluer les performances du modèle
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# Afficher l'accuracy du modèle
print(f"L'accuracy du modèle est de : {accuracy:.2f}")

# Arrêter la session Spark
spark.stop()

L'accuracy du modèle est de : 1.00


In [None]:
# Évaluer les performances du modèle avec F1 Score
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
f1_score = evaluator_f1.evaluate(predictions)
print(f"Le F1 Score du modèle est de : {f1_score:.2f}")


NameError: name 'MulticlassClassificationEvaluator' is not defined