In [7]:
# Import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace

# Create SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Catalogue") \
    .getOrCreate()

# Read CSV into DataFrame
df = spark.read.csv("./data/Catalogue.csv", header=True, inferSchema=True)

# Display DataFrame
df.show()

+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+
|    marque|           nom|puissance|   longueur|nbPlaces|nbPortes|couleur|occasion| prix|
+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|  blanc|   false|50500|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   noir|   false|50500|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|  rouge|   false|50500|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   gris|    true|35350|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   bleu|    true|35350|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   gris|   false|50500|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   bleu|   false|50500|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|  rouge|    true|35350|

In [8]:
from pyspark.sql.functions import regexp_replace, col

# Define a regular expression pattern to replace � with e
replacement_rule = regexp_replace(col("marque"), "�", "e")

# Apply the replacement rule to the marque column
cleaned_df = df.withColumn("marque", replacement_rule)

# Apply the replacement rule to the longueur column
cleaned_df = cleaned_df.withColumn("longueur", regexp_replace(col("longueur"), "�", "e"))

# Display the cleaned DataFrame
cleaned_df.show()


+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+
|    marque|           nom|puissance|   longueur|nbPlaces|nbPortes|couleur|occasion| prix|
+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+
|     Volvo|        S80 T6|      272|tres longue|       5|       5|  blanc|   false|50500|
|     Volvo|        S80 T6|      272|tres longue|       5|       5|   noir|   false|50500|
|     Volvo|        S80 T6|      272|tres longue|       5|       5|  rouge|   false|50500|
|     Volvo|        S80 T6|      272|tres longue|       5|       5|   gris|    true|35350|
|     Volvo|        S80 T6|      272|tres longue|       5|       5|   bleu|    true|35350|
|     Volvo|        S80 T6|      272|tres longue|       5|       5|   gris|   false|50500|
|     Volvo|        S80 T6|      272|tres longue|       5|       5|   bleu|   false|50500|
|     Volvo|        S80 T6|      272|tres longue|       5|       5|  rouge|    true|35350|

In [6]:

# Define a regular expression pattern to replace é with e
replacement_rule = regexp_replace(col("marque"), "�", "e") #df.withColumn("marque", regexp_replace(col("marque"), "�", "e"))
cleaned_df = df.withColumn("marque", replacement_rule)
# Display the cleaned DataFrame
cleaned_df.show()

+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+
|    marque|           nom|puissance|   longueur|nbPlaces|nbPortes|couleur|occasion| prix|
+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|  blanc|   false|50500|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   noir|   false|50500|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|  rouge|   false|50500|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   gris|    true|35350|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   bleu|    true|35350|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   gris|   false|50500|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|   bleu|   false|50500|
|     Volvo|        S80 T6|      272|tr�s longue|       5|       5|  rouge|    true|35350|

In [9]:
# Define the path for the cleaned CSV file
output_path = "./Cleaned/catalogue"

# Save the cleaned DataFrame to a new CSV file
cleaned_df.write.csv(output_path, header=True, mode="overwrite")

# Display a message to confirm the save
print("Cleaned DataFrame has been saved to:", output_path)

Cleaned DataFrame has been saved to: ./Cleaned/catalogue


In [3]:
from pyspark.sql.functions import col

# Iterate over each column
for column_name in df.columns:
    # Filter rows with null or empty values in the column
    empty_values_df = df.filter((col(column_name) == "") | col(column_name).isNull())
    
    # Check if there are any empty values in the column
    if empty_values_df.count() > 0:
        print("There are empty values in the column:", column_name)
        # Display the rows with empty values
        empty_values_df.show()
    else:
        print("There are no empty values in the column:", column_name)


There are no empty values in the column: marque
There are no empty values in the column: nom
There are no empty values in the column: puissance
There are no empty values in the column: longueur
There are no empty values in the column: nbPlaces
There are no empty values in the column: nbPortes
There are no empty values in the column: couleur
There are no empty values in the column: occasion
There are no empty values in the column: prix


In [4]:
from pyspark.sql import Window
from pyspark.sql.functions import lag, when, col, lit

# Variables catégoriques : remplacer les valeurs manquantes par les valeurs les plus fréquentes
for col_name in [col for col, dtype in df.dtypes if dtype == 'string']:
    mode_value = df.groupBy(col_name).count().orderBy("count", ascending=False).first()[0]
    df = df.fillna(mode_value, subset=[col_name])

# Variables numériques : remplacer les valeurs manquantes par la valeur précédente dans la même colonne
for col_name in [col for col, dtype in df.dtypes if dtype != 'string']:
    df = df.withColumn(col_name, when(col(col_name).isNull(), lag(col_name).over(Window.orderBy(lit(0)))).otherwise(col_name))

# Afficher le DataFrame après le remplissage
df.show()



AnalysisException: [DATATYPE_MISMATCH.DATA_DIFF_TYPES] Cannot resolve "CASE WHEN (occasion IS NULL) THEN lag(occasion, 1, NULL) OVER (ORDER BY 0 ASC NULLS FIRST ROWS BETWEEN -1 FOLLOWING AND -1 FOLLOWING) ELSE occasion END" due to data type mismatch: Input to `casewhen` should all be the same type, but it's ["BOOLEAN", "STRING"].;
'Project [marque#298, nom#333, puissance#413, longueur#368, nbPlaces#424, nbPortes#435, couleur#403, CASE WHEN isnull(occasion#24) THEN lag(occasion#24, -1, null) windowspecdefinition(0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) ELSE occasion END AS occasion#446, prix#25]
+- Project [marque#298, nom#333, puissance#413, longueur#368, nbPlaces#424, nbPortes#435, couleur#403, occasion#24, prix#25]
   +- Project [marque#298, nom#333, puissance#413, longueur#368, nbPlaces#424, couleur#403, occasion#24, prix#25, nbPortes#22, _we0#436, CASE WHEN isnull(nbPortes#22) THEN cast(_we0#436 as string) ELSE nbPortes END AS nbPortes#435]
      +- Window [lag(nbPortes#22, -1, null) windowspecdefinition(0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#436], [0 ASC NULLS FIRST]
         +- Project [marque#298, nom#333, puissance#413, longueur#368, nbPlaces#424, couleur#403, occasion#24, prix#25, nbPortes#22]
            +- Project [marque#298, nom#333, puissance#413, longueur#368, nbPlaces#424, nbPortes#22, couleur#403, occasion#24, prix#25]
               +- Project [marque#298, nom#333, puissance#413, longueur#368, nbPortes#22, couleur#403, occasion#24, prix#25, nbPlaces#21, _we0#425, CASE WHEN isnull(nbPlaces#21) THEN cast(_we0#425 as string) ELSE nbPlaces END AS nbPlaces#424]
                  +- Window [lag(nbPlaces#21, -1, null) windowspecdefinition(0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#425], [0 ASC NULLS FIRST]
                     +- Project [marque#298, nom#333, puissance#413, longueur#368, nbPortes#22, couleur#403, occasion#24, prix#25, nbPlaces#21]
                        +- Project [marque#298, nom#333, puissance#413, longueur#368, nbPlaces#21, nbPortes#22, couleur#403, occasion#24, prix#25]
                           +- Project [marque#298, nom#333, longueur#368, nbPlaces#21, nbPortes#22, couleur#403, occasion#24, prix#25, puissance#19, _we0#414, CASE WHEN isnull(puissance#19) THEN cast(_we0#414 as string) ELSE puissance END AS puissance#413]
                              +- Window [lag(puissance#19, -1, null) windowspecdefinition(0 ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS _we0#414], [0 ASC NULLS FIRST]
                                 +- Project [marque#298, nom#333, longueur#368, nbPlaces#21, nbPortes#22, couleur#403, occasion#24, prix#25, puissance#19]
                                    +- Project [marque#298, nom#333, puissance#19, longueur#368, nbPlaces#21, nbPortes#22, coalesce(couleur#23, cast(blanc as string)) AS couleur#403, occasion#24, prix#25]
                                       +- Project [marque#298, nom#333, puissance#19, coalesce(longueur#20, cast(longue as string)) AS longueur#368, nbPlaces#21, nbPortes#22, couleur#23, occasion#24, prix#25]
                                          +- Project [marque#298, coalesce(nom#18, cast(Polo 1.2 6V as string)) AS nom#333, puissance#19, longueur#20, nbPlaces#21, nbPortes#22, couleur#23, occasion#24, prix#25]
                                             +- Project [coalesce(marque#17, cast(Volkswagen as string)) AS marque#298, nom#18, puissance#19, longueur#20, nbPlaces#21, nbPortes#22, couleur#23, occasion#24, prix#25]
                                                +- Relation [marque#17,nom#18,puissance#19,longueur#20,nbPlaces#21,nbPortes#22,couleur#23,occasion#24,prix#25] csv


In [22]:
df.show()

+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+
|    marque|           nom|puissance|   longueur|nbPlaces|nbPortes|couleur|occasion| prix|
+----------+--------------+---------+-----------+--------+--------+-------+--------+-----+
|     Volvo|        S80 T6|puissance|tr�s longue|nbPlaces|nbPortes|  blanc|   false|50500|
|     Volvo|        S80 T6|puissance|tr�s longue|nbPlaces|nbPortes|   noir|   false|50500|
|     Volvo|        S80 T6|puissance|tr�s longue|nbPlaces|nbPortes|  rouge|   false|50500|
|     Volvo|        S80 T6|puissance|tr�s longue|nbPlaces|nbPortes|   gris|    true|35350|
|     Volvo|        S80 T6|puissance|tr�s longue|nbPlaces|nbPortes|   bleu|    true|35350|
|     Volvo|        S80 T6|puissance|tr�s longue|nbPlaces|nbPortes|   gris|   false|50500|
|     Volvo|        S80 T6|puissance|tr�s longue|nbPlaces|nbPortes|   bleu|   false|50500|
|     Volvo|        S80 T6|puissance|tr�s longue|nbPlaces|nbPortes|  rouge|    true|35350|

In [5]:
from pyspark.sql.functions import col

# Iterate over each column
for column_name in df.columns:
    # Filter rows with null or empty values in the column
    empty_values_df = df.filter((col(column_name) == "") | col(column_name).isNull())
    
    # Check if there are any empty values in the column
    if empty_values_df.count() > 0:
        print("There are empty values in the column:", column_name)
        # Display the rows with empty values
        empty_values_df.show()
    else:
        print("There are no empty values in the column:", column_name)

There are no empty values in the column: marque
There are no empty values in the column: nom
There are no empty values in the column: puissance
There are no empty values in the column: longueur
There are no empty values in the column: nbPlaces
There are no empty values in the column: nbPortes
There are no empty values in the column: couleur
There are no empty values in the column: occasion
There are no empty values in the column: prix


In [43]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Créer la session Spark
spark = SparkSession.builder \
    .appName("Count Null Rows") \
    .getOrCreate()



# Initialiser le compteur
total_null_rows = 0

# Parcourir toutes les colonnes du DataFrame
for col_name in df.columns:
    # Compter le nombre de lignes avec des valeurs nulles dans cette colonne
    null_rows_count = df.filter(df[col_name].isNull()).count()
    # Ajouter au compteur total
    total_null_rows += null_rows_count
    # Afficher le nombre de lignes avec des valeurs nulles dans cette colonne
    print("Nombre de lignes avec des valeurs nulles dans la colonne", col_name, ":", null_rows_count)

# Afficher le nombre total de lignes avec des valeurs nulles dans le DataFrame
print("Nombre total de lignes avec des valeurs nulles dans le DataFrame :", total_null_rows)


Nombre de lignes avec des valeurs nulles dans la colonne marque : 0
Nombre de lignes avec des valeurs nulles dans la colonne nom : 0
Nombre de lignes avec des valeurs nulles dans la colonne puissance : 0
Nombre de lignes avec des valeurs nulles dans la colonne longueur : 0
Nombre de lignes avec des valeurs nulles dans la colonne nbPlaces : 0
Nombre de lignes avec des valeurs nulles dans la colonne nbPortes : 0
Nombre de lignes avec des valeurs nulles dans la colonne couleur : 0
Nombre de lignes avec des valeurs nulles dans la colonne occasion : 0
Nombre de lignes avec des valeurs nulles dans la colonne prix : 0
Nombre total de lignes avec des valeurs nulles dans le DataFrame : 0
