In [20]:
# Utiliser le code suivant pour récuperer le 
import os
print("JAVA_HOME =", os.environ.get("JAVA_HOME"))

JAVA_HOME = /opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home


In [21]:
import os
os.environ['JAVA_HOME'] = '/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home'
import findspark
findspark.init()


In [None]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Creation de my_spark
my_spark = SparkSession.builder.appName("my_spark").getOrCreate()

# Print my_spark
print(my_spark)

# Lecture du fichier csv:
ventes_df = my_spark.read.csv("ventes.csv", header=True, inferSchema=True)

ventes_df.show()


<pyspark.sql.session.SparkSession object at 0x10bc667b0>
+--------------+-----------+-----------+-------------+-----------------+------------------+---------------+---------------+---------------+-------------+---------------------+-------------------+---------+--------------+
|id_transaction| client_nom| client_age| client_ville| produit_nom     | produit_categorie| produit_marque| prix_catalogue| magasin_nom   | magasin_type| magasin_region      |         date      | quantite| montant_total|
+--------------+-----------+-----------+-------------+-----------------+------------------+---------------+---------------+---------------+-------------+---------------------+-------------------+---------+--------------+
|           1.0| Alice     |       25.0| Paris       | Ordinateur      | Informatique     | Dell          |          800.0| Boutique Lyon | Physique    |  Auvergne-Rhône-A...|2023-03-12 00:00:00|        2|              |
|           2.0| Bob       |       34.0| Lyon        | Smar

['id_transaction',
 ' client_nom',
 ' client_age',
 ' client_ville',
 ' produit_nom     ',
 ' produit_categorie',
 ' produit_marque',
 ' prix_catalogue',
 ' magasin_nom   ',
 ' magasin_type',
 ' magasin_region      ',
 ' date      ',
 ' quantite',
 ' montant_total']

## Nettoyage des données avec PySpark

### <span style=color:orange > Convertir toutes les valeurs de type string en lowercase.

In [72]:
from pyspark.sql.functions import col, trim, lower, year

string_cols = [colonne for colonne, data_type in ventes_df.dtypes if data_type == 'string']

for c in string_cols:
    ventes_df = ventes_df.withColumn(c, lower(trim(col(c))))

In [None]:
# Vérification :

ventes_df.show(5)

+--------------+-----------+-----------+-------------+-----------------+------------------+---------------+---------------+---------------+-------------+---------------------+-------------------+---------+--------------+
|id_transaction| client_nom| client_age| client_ville| produit_nom     | produit_categorie| produit_marque| prix_catalogue| magasin_nom   | magasin_type| magasin_region      |         date      | quantite| montant_total|
+--------------+-----------+-----------+-------------+-----------------+------------------+---------------+---------------+---------------+-------------+---------------------+-------------------+---------+--------------+
|           1.0|      alice|       25.0|        paris|       ordinateur|      informatique|           dell|          800.0|  boutique lyon|     physique| auvergne-rhône-alpes|2023-03-12 00:00:00|        2|              |
|           2.0|        bob|       34.0|         lyon|       smartphone|        téléphonie|          apple|         

### <span style=color:orange > Eliminez les lignes où au moins une colonne est manquante

In [None]:
# Nombre total de lignes avant le traitement:
print(f"Total lignes : {ventes_df.count()}")

Total lignes : 495


In [None]:
# Liste des colonnes à tester (toutes sauf la dernière)
colonnes = ventes_df.columns[:-1]

# Condition : au moins une colonne est null ou vide
condition = None
for c in colonnes:
    cond = (col(c).isNull()) | (trim(col(c)) == "")
    condition = cond if condition is None else (condition | cond)

# Appliquer le filtre
ventes_df.filter(condition).show(truncate=False)



+--------------+-----------+-----------+-------------+-----------------+------------------+---------------+---------------+---------------+-------------+---------------------+-------------------+---------+--------------+
|id_transaction| client_nom| client_age| client_ville| produit_nom     | produit_categorie| produit_marque| prix_catalogue| magasin_nom   | magasin_type| magasin_region      | date              | quantite| montant_total|
+--------------+-----------+-----------+-------------+-----------------+------------------+---------------+---------------+---------------+-------------+---------------------+-------------------+---------+--------------+
|12.0          |emma       |31.0       |toulouse     |casque audio     |accessoires       |sony           |150.0          |boutique lyon  |physique     |auvergne-rhône-alpes |2023-02-19 00:00:00|         |              |
|58.0          |emma       |31.0       |toulouse     |tablette         |informatique      |samsung        |600.0    

In [None]:
ventes_df = ventes_df.na.drop(subset= colonnes) # Supprime que les valeurs Null
ventes_df = ventes_df.filter(~ condition) # Garde toutes les lignes qui ne correspondent pas à ma condition

In [46]:
# Nombre total de lignes après le traitement des veleurs null ou vide:
print(f"Total lignes après la suppression des Null ou vide : {ventes_df.count()}")

Total lignes après la suppression des Null ou vide : 487


### <span style=color:orange > Eliminez les lignes où la date d'achat est extravagante

<span style=color:yellow >Normalisation des Nom des colonnes et renommage de la colonne date en date_vente pour éviter les conflits avec 
les fonctions de pyspark

In [57]:
ventes_df = ventes_df.select([col(c).alias(c.strip()) for c in ventes_df.columns]) # Supprime les espaces 
ventes_df = ventes_df.withColumnRenamed("date", "date_vente")

In [58]:
ventes_df.columns

['id_transaction',
 'client_nom',
 'client_age',
 'client_ville',
 'produit_nom',
 'produit_categorie',
 'produit_marque',
 'prix_catalogue',
 'magasin_nom',
 'magasin_type',
 'magasin_region',
 'date_vente',
 'quantite',
 'montant_total']

In [70]:
# Enregistrement du DataFrame comme une vue temporaire pour pouvoir l'interroger avec SQL :
ventes_df.createOrReplaceTempView("ventes")

# Visualisation des annomalies dans la colonne date :
my_spark.sql("""
    SELECT 
        YEAR(date_vente) AS annee_ventes,
    COUNT(*) AS nb_ventes
    FROM ventes
    GROUP BY YEAR(date_vente)
    ORDER BY annee_ventes
""").show()

+------------+---------+
|annee_ventes|nb_ventes|
+------------+---------+
|        1632|        1|
|        1702|        1|
|        1742|        1|
|        1745|        1|
|        1821|        1|
|        1852|        1|
|        2023|      481|
+------------+---------+



In [73]:
# Nombre total de lignes avant le traitement des dates:
print(f"Total lignes avant le traitement des dates : {ventes_df.count()}")

Total lignes avant le traitement des dates : 487


In [None]:
ventes_df = ventes_df.filter(year(col("date_vente")) == 2023)

In [75]:
# Nombre total de lignes après le traitement des dates:
print(f"Total lignes après le traitement des dates : {ventes_df.count()}")

Total lignes après le traitement des dates : 481


### <span style=color:orange > Globalement, élimination des lignes où les cellules présentent des anomalies 

In [76]:
ventes_df.printSchema()

root
 |-- id_transaction: double (nullable = true)
 |-- client_nom: string (nullable = true)
 |-- client_age: double (nullable = true)
 |-- client_ville: string (nullable = true)
 |-- produit_nom: string (nullable = true)
 |-- produit_categorie: string (nullable = true)
 |-- produit_marque: string (nullable = true)
 |-- prix_catalogue: double (nullable = true)
 |-- magasin_nom: string (nullable = true)
 |-- magasin_type: string (nullable = true)
 |-- magasin_region: string (nullable = true)
 |-- date_vente: timestamp (nullable = true)
 |-- quantite: string (nullable = true)
 |-- montant_total: string (nullable = true)



### <span style=color:orange >Si l'âge est négatif, le mettre en positif.

In [84]:
ventes_df.createOrReplaceTempView("ventes")
my_spark.sql("""
    SELECT *
    FROM ventes
    WHERE  client_age < 15 OR client_age > 70;
""").show()

+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+-------------------+--------+-------------+
|id_transaction|client_nom|client_age|client_ville|     produit_nom|produit_categorie|produit_marque|prix_catalogue|   magasin_nom|magasin_type|      magasin_region|         date_vente|quantite|montant_total|
+--------------+----------+----------+------------+----------------+-----------------+--------------+--------------+--------------+------------+--------------------+-------------------+--------+-------------+
|          35.0|   charlie|     -29.0|   marseille|      smartphone|       téléphonie|         apple|        1200.0|boutique paris|    physique|       île-de-france|2023-03-03 00:00:00|       5|             |
|         145.0|     alice|     -25.0|       paris|    casque audio|      accessoires|          sony|         150.0| boutique lyon|    physique|auvergne-rhône-alpes

In [86]:
ventes_df.filter(ventes_df["client_age"]< 15).show()

+--------------+----------+----------+------------+------------+-----------------+--------------+--------------+--------------+------------+--------------------+-------------------+--------+-------------+
|id_transaction|client_nom|client_age|client_ville| produit_nom|produit_categorie|produit_marque|prix_catalogue|   magasin_nom|magasin_type|      magasin_region|         date_vente|quantite|montant_total|
+--------------+----------+----------+------------+------------+-----------------+--------------+--------------+--------------+------------+--------------------+-------------------+--------+-------------+
|          35.0|   charlie|     -29.0|   marseille|  smartphone|       téléphonie|         apple|        1200.0|boutique paris|    physique|       île-de-france|2023-03-03 00:00:00|       5|             |
|         145.0|     alice|     -25.0|       paris|casque audio|      accessoires|          sony|         150.0| boutique lyon|    physique|auvergne-rhône-alpes|2023-06-03 00:00:00

In [87]:
from pyspark.sql.functions import abs as spark_abs

ventes_df = ventes_df.withColumn("client_age", spark_abs("client_age"))


In [91]:
ventes_df = ventes_df.filter(ventes_df["client_age"]< 100)

In [None]:
# Vérification :
ventes_df.filter((ventes_df["client_age"]< 15) | (ventes_df["client_age"] > 50)).show()