In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Analyse de données ETL") \
    .getOrCreate()

# Chargement des données
df = spark.read.csv('./jeu_donnees_etl_5000_lignes.csv', header=True, inferSchema=True)

# Affichage des premières lignes
df.show(5)  # équivalent de df.head() en Pandas

# Informations générales (schéma et types)
df.printSchema()

# Statistiques descriptives
df.describe().show()

# Nombre de lignes et de colonnes
print(f"Nombre de lignes : {df.count()}")
print(f"Nombre de colonnes : {len(df.columns)}")


+----------+-----------+---------------+-------------+----------+
|ID_produit|Nom_produit|Quantite_vendue|Prix_unitaire|Date_vente|
+----------+-----------+---------------+-------------+----------+
|         1|    Chemise|           10.0|         25.0|2022-01-05|
|         2|   Pantalon|            8.0|         35.0|2022-01-06|
|         3| Chaussures|           NULL|         50.0|2022-01-07|
|         4|    Cravate|           12.0|         15.0|2022-01-08|
|         5|       Robe|           15.0|         45.0|2022-01-09|
+----------+-----------+---------------+-------------+----------+
only showing top 5 rows

root
 |-- ID_produit: integer (nullable = true)
 |-- Nom_produit: string (nullable = true)
 |-- Quantite_vendue: double (nullable = true)
 |-- Prix_unitaire: double (nullable = true)
 |-- Date_vente: string (nullable = true)

+-------+------------------+-----------+------------------+------------------+------------+
|summary|        ID_produit|Nom_produit|   Quantite_vendue|    

In [84]:
# Nombre de lignes avant suppression
nb_lignes_avant = df.count()

# Suppression des doublons
df = df.dropDuplicates()

# Nombre de lignes après suppression
nb_lignes_apres = df.count()

# Affichage du résultat
print(f"Nombre de lignes avant suppression : {nb_lignes_avant}")
print(f"Nombre de lignes après suppression : {nb_lignes_apres}")
print(f"Nombre de doublons supprimés : {nb_lignes_avant - nb_lignes_apres}")


Nombre de lignes avant suppression : 5266
Nombre de lignes après suppression : 4965
Nombre de doublons supprimés : 301


In [85]:
from pyspark.sql.functions import col, isnan, when, count

# Supprimer les lignes où 'Quantite_vendue' est manquante
df = df.filter(col("Quantite_vendue").isNotNull())

# Convertir 'Quantite_vendue' en entier (si elle ne l'est pas déjà)
df = df.withColumn("Quantite_vendue", col("Quantite_vendue").cast("int"))

# Vérifier le nombre de valeurs nulles dans la colonne 'Quantite_vendue'
nb_nan = df.select(count(when(col("Quantite_vendue").isNull(), True)).alias("nb_nan")).collect()[0]["nb_nan"]
print(f"Le nombre de NaN dans la colonne 'Quantite_vendue' est : {nb_nan}")


Le nombre de NaN dans la colonne 'Quantite_vendue' est : 0


In [86]:

df.printSchema()
df.show(5)

root
 |-- ID_produit: integer (nullable = true)
 |-- Nom_produit: string (nullable = true)
 |-- Quantite_vendue: integer (nullable = true)
 |-- Prix_unitaire: double (nullable = true)
 |-- Date_vente: string (nullable = true)

+----------+-----------+---------------+-------------+------------+
|ID_produit|Nom_produit|Quantite_vendue|Prix_unitaire|  Date_vente|
+----------+-----------+---------------+-------------+------------+
|        37|    Chemise|             10|         25.0|  2022-01-05|
|        65|       Robe|             15|         45.0|  2022-01-09|
|        64|    Cravate|             18|        87.41|  31/10/2024|
|        56|       Robe|              8|        23.35|invalid_date|
|        42|     Montre|              4|        40.91|invalid_date|
+----------+-----------+---------------+-------------+------------+
only showing top 5 rows



In [87]:
from pyspark.sql.functions import col, when

# Fonction pour appliquer la winsorisation sur une colonne
def winsorize_column(df, column_name):
    # Calcul des quartiles
    q1, q3 = df.approxQuantile(column_name, [0.25, 0.75], 0.01)
    iqr = q3 - q1
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr

    # Clipping des valeurs aberrantes
    return df.withColumn(
        column_name,
        when(col(column_name) < lower, lower)
        .when(col(column_name) > upper, upper)
        .otherwise(col(column_name))
    )

# Appliquer sur les deux colonnes
df = winsorize_column(df, "Quantite_vendue")
df = winsorize_column(df, "Prix_unitaire")


In [88]:
from pyspark.sql.functions import col, when, count

# Vérifier les doublons restants (lignes identiques)
doublons_restants = df.count() - df.dropDuplicates().count()

# Compter les valeurs manquantes pour chaque colonne
valeurs_manquantes = df.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df.columns
]).collect()[0].asDict()

# Vérifier les valeurs négatives
quantite_negative = df.filter(col("Quantite_vendue") < 0).count()
prix_negative = df.filter(col("Prix_unitaire") < 0).count()

# Résumé sous forme de dictionnaire
validation = {
    "doublons_restants": doublons_restants,
    "valeurs_manquantes": valeurs_manquantes,
    "quantite_negative": quantite_negative,
    "prix_negative": prix_negative
}

# Afficher les résultats
for cle, valeur in validation.items():
    print(f"{cle} : {valeur}")

doublons_restants : 6
valeurs_manquantes : {'ID_produit': 0, 'Nom_produit': 10, 'Quantite_vendue': 0, 'Prix_unitaire': 1195, 'Date_vente': 0}
quantite_negative : 0
prix_negative : 0


In [89]:
from pyspark.sql.functions import col, when, count

# 1. Supprimer les lignes où 'Nom_produit' ou 'Prix_unitaire' sont nulles
df = df.dropna(subset=['Nom_produit', 'Prix_unitaire'])

# 2. Validation des données

# Doublons restants
doublons_restants = df.count() - df.dropDuplicates().count()

# Valeurs manquantes par colonne
valeurs_manquantes = df.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df.columns
]).collect()[0].asDict()

# Quantités négatives
quantite_negative = df.filter(col("Quantite_vendue") < 0).count()

# Prix négatifs
prix_negative = df.filter(col("Prix_unitaire") < 0).count()

# Résumé
validation = {
    "doublons_restants": doublons_restants,
    "valeurs_manquantes": valeurs_manquantes,
    "quantite_negative": quantite_negative,
    "prix_negative": prix_negative
}

# Affichage
for cle, valeur in validation.items():
    print(f"{cle} : {valeur}")


doublons_restants : 0
valeurs_manquantes : {'ID_produit': 0, 'Nom_produit': 0, 'Quantite_vendue': 0, 'Prix_unitaire': 0, 'Date_vente': 0}
quantite_negative : 0
prix_negative : 0


In [90]:
from pyspark.sql.functions import col
import logging

logging.basicConfig(level=logging.ERROR)

try:
    df = df.withColumn("Valeur_totale", col("Quantite_vendue") * col("Prix_unitaire"))
except Exception as e:
    logging.error(f"Erreur lors du calcul de la colonne 'Valeur_totale': {e}")


In [91]:
from pyspark.sql.functions import col

df = df.withColumn("Montant_total", col("Quantite_vendue") * col("Prix_unitaire"))


In [92]:
from pyspark.sql.functions import min as spark_min, max as spark_max, col

# Calcul des min et max avec alias différents
stats = df.agg(
    spark_min("Montant_total").alias("min_montant"),
    spark_max("Montant_total").alias("max_montant")
).collect()[0]

montant_min = stats["min_montant"]
montant_max = stats["max_montant"]

# Création de la colonne normalisée
df = df.withColumn(
    "Montant_total_normalise",
    (col("Montant_total") - montant_min) / (montant_max - montant_min)
)


In [93]:
from pyspark.sql.functions import sum as spark_sum

df_agg = df.groupBy("Nom_produit") \
           .agg(spark_sum("Valeur_totale").alias("Vente_totale"))


In [94]:
from pyspark.sql.functions import col, when, count

# Recalcul des validations finales

doublons_restants = df.count() - df.dropDuplicates().count()

valeurs_manquantes = df.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df.columns
]).collect()[0].asDict()

quantite_negative = df.filter(col("Quantite_vendue") < 0).count()
prix_negative = df.filter(col("Prix_unitaire") < 0).count()

documentation_resume = {
    "Traitements effectués": [
        "Suppression des doublons",
        "Imputation des valeurs manquantes avec la médiane",
        "Winsorisation des valeurs aberrantes",
        "Ajout de la colonne 'Valeur_totale'",
        "Agrégation des ventes par produit"
    ],
    "Validation finale": {
        "doublons_restants": doublons_restants,
        "valeurs_manquantes": valeurs_manquantes,
        "quantite_negative": quantite_negative,
        "prix_negative": prix_negative
    }
}

# Affichage lisible
for cle, valeur in documentation_resume.items():
    print(f"{cle} :")
    print( valeur)
    print()


Traitements effectués :
['Suppression des doublons', 'Imputation des valeurs manquantes avec la médiane', 'Winsorisation des valeurs aberrantes', "Ajout de la colonne 'Valeur_totale'", 'Agrégation des ventes par produit']

Validation finale :
{'doublons_restants': 0, 'valeurs_manquantes': {'ID_produit': 0, 'Nom_produit': 0, 'Quantite_vendue': 0, 'Prix_unitaire': 0, 'Date_vente': 0, 'Valeur_totale': 0, 'Montant_total': 0, 'Montant_total_normalise': 0}, 'quantite_negative': 0, 'prix_negative': 0}



In [95]:
df.show(5)  # Affiche les 5 premières lignes sous forme tabulaire


+----------+-----------+---------------+-------------+------------+------------------+------------------+-----------------------+
|ID_produit|Nom_produit|Quantite_vendue|Prix_unitaire|  Date_vente|     Valeur_totale|     Montant_total|Montant_total_normalise|
+----------+-----------+---------------+-------------+------------+------------------+------------------+-----------------------+
|        37|    Chemise|           10.0|         25.0|  2022-01-05|             250.0|             250.0|    0.13262669828487156|
|        65|       Robe|           15.0|         45.0|  2022-01-09|             675.0|             675.0|     0.3580920853691532|
|        64|    Cravate|           18.0|        87.41|  31/10/2024|1573.3799999999999|1573.3799999999999|     0.8346887781898048|
|        56|       Robe|            8.0|        23.35|invalid_date|             186.8|             186.8|    0.09909866895845602|
|        42|     Montre|            4.0|        40.91|invalid_date|            163.64|    

In [97]:

df.printSchema()


root
 |-- ID_produit: integer (nullable = true)
 |-- Nom_produit: string (nullable = true)
 |-- Quantite_vendue: double (nullable = true)
 |-- Prix_unitaire: double (nullable = true)
 |-- Date_vente: string (nullable = true)
 |-- Valeur_totale: double (nullable = true)
 |-- Montant_total: double (nullable = true)
 |-- Montant_total_normalise: double (nullable = true)

