# Data Preprocessing and Cleaning
The goal of this notebook is to clean and preprocess the data before further processing and analysis.

### Instructions:

Use Spark to clean and preprocess the data. Key steps include:
- Handling missing values.
- Removing duplicates.
- Normalizing data formats (e.g., date formats, categorical variables).
- Filtering irrelevant data.

In [66]:
!pip install "pyspark[sql]"



In [67]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, to_timestamp, trim


## 1. Data loading

First, we create a Spark session.

In [68]:
# Create a Spark session
spark = SparkSession.builder.appName("BigDataProject").getOrCreate()

In [69]:
# Load CSV data
file_path = "../ecommerce_data_with_trends.csv"
data = spark.read.csv(file_path, header=True, inferSchema=True)

# Show schema, first lines and total number of initial rows
data.printSchema()
data.show(5)
data.count()

root
 |-- transaction_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- city: string (nullable = true)
 |-- customer_type: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_amount: double (nullable = true)

+--------------------+--------------------+-----------+--------------+-----------------+-------------+--------------------+--------------------+------+--------+------------+
|      transaction_id|           timestamp|customer_id| customer_name|             city|customer_type|        product_name|            category| price|quantity|total_amount|
+--------------------+--------------------+-----------+--------------+-----------------+-------------+--------------------+--------------------+------+--------+------------+
|TX_89

1000000

## 2. Data Preprocessing

In [70]:
# Drop rows where any of the key columns is NULL
data_cleaned = data.dropna(subset=["transaction_id", "timestamp", "customer_id", "total_amount"])

# Check the remaining row count
print(f"Number of rows after filtering NULL values: {data_cleaned.count()}")


Number of rows after filtering NULL values: 1000000
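To make the semantics of `dropna(subset=...)` concrete, here is a plain-Python model (not Spark code) run on a few made-up records: with the default `how="any"`, a row is dropped as soon as any column listed in `subset` is null, while nulls in other columns are kept. The helpers `drop_missing` and `null_counts` and the sample records are hypothetical. Counting nulls per column beforehand (in Spark, a common idiom is `count(when(col(c).isNull(), c))` per column) shows which columns actually need handling.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Optional[Any]]

def drop_missing(rows: List[Row], subset: List[str]) -> List[Row]:
    """Model of dropna(how="any", subset=subset): keep a row only if every
    column in `subset` is non-null (a missing key also counts as null)."""
    return [r for r in rows if all(r.get(c) is not None for c in subset)]

def null_counts(rows: List[Row], columns: List[str]) -> Dict[str, int]:
    """Count null (None) values per column."""
    return {c: sum(1 for r in rows if r.get(c) is None) for c in columns}

# Hypothetical sample records, not taken from the real dataset
rows = [
    {"transaction_id": "TX_1", "customer_id": 1, "city": None, "total_amount": 10.0},
    {"transaction_id": "TX_2", "customer_id": None, "city": "Paris", "total_amount": 5.0},
    {"transaction_id": None, "customer_id": 3, "city": "Lyon", "total_amount": 7.5},
]

print(null_counts(rows, ["transaction_id", "customer_id", "city", "total_amount"]))
# → {'transaction_id': 1, 'customer_id': 1, 'city': 1, 'total_amount': 0}

# "city" is not in the subset, so TX_1 survives despite its null city
kept = drop_missing(rows, ["transaction_id", "customer_id", "total_amount"])
print([r["transaction_id"] for r in kept])  # → ['TX_1']
```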


In [72]:
# Remove rows that are identical across all columns
data_cleaned = data_cleaned.dropDuplicates()

# Check the remaining row count
print(f"Number of rows after removing duplicates: {data_cleaned.count()}")


Number of rows after removing duplicates: 1000000
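`dropDuplicates()` without arguments treats two rows as duplicates only if every column matches, so two records sharing a `transaction_id` but differing in any other field both survive. Passing a column list, for example `dropDuplicates(["transaction_id"])`, keeps one row per key instead. The plain-Python model below shows both variants; unlike Spark, which does not guarantee which duplicate is kept, it keeps the first one it sees. The `drop_duplicates` helper and the sample rows are hypothetical.

```python
from typing import Any, Dict, List, Optional, Sequence

Row = Dict[str, Any]

def drop_duplicates(rows: List[Row], subset: Optional[Sequence[str]] = None) -> List[Row]:
    """Model of dropDuplicates(subset): keep one row per distinct key."""
    seen = set()
    result: List[Row] = []
    for r in rows:
        # Whole-row key when no subset is given, otherwise only the listed columns
        cols = sorted(r) if subset is None else list(subset)
        key = tuple((c, r.get(c)) for c in cols)
        if key not in seen:
            seen.add(key)
            result.append(r)
    return result

rows = [
    {"transaction_id": "TX_1", "customer_id": 1, "total_amount": 10.0},
    {"transaction_id": "TX_1", "customer_id": 1, "total_amount": 10.0},  # exact copy
    {"transaction_id": "TX_1", "customer_id": 1, "total_amount": 12.0},  # same id, other amount
    {"transaction_id": "TX_2", "customer_id": 2, "total_amount": 4.0},
]

full = drop_duplicates(rows)                       # like dropDuplicates()
by_id = drop_duplicates(rows, ["transaction_id"])  # like dropDuplicates(["transaction_id"])
print(len(full), len(by_id))  # → 3 2
```

Which variant fits depends on whether `transaction_id` is meant to be unique in the source data.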


In [74]:
# Keep only transactions with a strictly positive total amount
data_cleaned = data_cleaned.filter(data_cleaned["total_amount"] > 0)

# Check the remaining row count
print(f"Number of rows after filtering non-positive amounts: {data_cleaned.count()}")

Number of rows after filtering non-positive amounts: 1000000
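In Spark SQL a comparison involving a null value yields null, and `filter` keeps only the rows where the condition is true, so `total_amount > 0` also discards rows whose `total_amount` is null (here those were already removed by the `dropna` step). The plain-Python model below mirrors that three-valued logic; `spark_gt` and `keep_positive` are hypothetical helper names and the amounts are made up.

```python
from typing import List, Optional

def spark_gt(value: Optional[float], threshold: float) -> Optional[bool]:
    """Model of SQL `value > threshold`: a null input gives a null result."""
    if value is None:
        return None
    return value > threshold

def keep_positive(amounts: List[Optional[float]]) -> List[Optional[float]]:
    """Model of filter(total_amount > 0): only rows whose predicate is True
    survive; both False and null (None) predicates are dropped."""
    return [a for a in amounts if spark_gt(a, 0) is True]

amounts = [19.99, 0.0, -5.0, None, 3.5]
print(keep_positive(amounts))  # → [19.99, 3.5]
```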


## 3. Data Cleaning

In [76]:
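# Enforce the timestamp type (inferSchema already read this column as a timestamp, see the schema above)
data_cleaned = data_cleaned.withColumn("timestamp", to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss"))

# Check the first values
data_cleaned.select("timestamp").show(5)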

+--------------------+
|           timestamp|
+--------------------+
|2023-10-30 03:29:...|
|2023-10-30 03:59:...|
|2023-10-30 04:15:...|
|2023-10-30 07:27:...|
|2023-10-30 07:53:...|
+--------------------+
only showing top 5 rows
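Spark's datetime pattern `yyyy-MM-dd HH:mm:ss` corresponds roughly to Python's `%Y-%m-%d %H:%M:%S`. The plain-Python model below parses strings with such a pattern and returns `None` on a mismatch; what Spark itself does on a mismatch depends on its configuration (ANSI mode, `spark.sql.legacy.timeParserPolicy`), so this is only an illustration of the pattern, not of Spark's error handling. A value with a fractional-seconds part does not match a seconds-only pattern. In the output above, `show()` truncates each value to 20 characters, which suggests the stored timestamps are longer than the 19-character `yyyy-MM-dd HH:mm:ss` form. The `parse_ts` helper and the sample strings are hypothetical.

```python
from datetime import datetime
from typing import Optional

# Rough Python equivalent of the Spark pattern "yyyy-MM-dd HH:mm:ss"
PY_PATTERN = "%Y-%m-%d %H:%M:%S"

def parse_ts(value: str, pattern: str = PY_PATTERN) -> Optional[datetime]:
    """Parse `value` with `pattern`; return None instead of raising on a mismatch."""
    try:
        return datetime.strptime(value, pattern)
    except ValueError:
        return None

print(parse_ts("2023-10-30 03:29:12"))         # → 2023-10-30 03:29:12
print(parse_ts("2023-10-30 03:29:12.345678"))  # → None (fractional part left over)
print(parse_ts("2023-10-30 03:29:12.345678", "%Y-%m-%d %H:%M:%S.%f"))
# → 2023-10-30 03:29:12.345678
```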



In [78]:
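# Normalize customer_type: strip surrounding spaces, then lowercase (e.g. " B2C" -> "b2c")
data_cleaned = data_cleaned.withColumn("customer_type", lower(trim(data_cleaned["customer_type"])))

# Check the first values
data_cleaned.select("customer_type").show(5)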

+-------------+
|customer_type|
+-------------+
|          b2c|
|          b2b|
|          b2b|
|          b2c|
|          b2c|
+-------------+
only showing top 5 rows
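In recent Spark versions, `trim` removes only leading and trailing space characters (not tabs or newlines), and `lower` then lowercases the result, so a plain-Python model is `s.strip(" ").lower()`. Because this normalization runs after `dropDuplicates()`, two rows that differ only in the spacing or casing of `customer_type` were still treated as distinct by the deduplication step. The sketch below shows how many distinct values collapse once normalized; the `normalize_type` helper and the sample values are hypothetical.

```python
from typing import Optional

def normalize_type(value: Optional[str]) -> Optional[str]:
    """Model of lower(trim(col)): strip leading/trailing ' ' only (like Spark's
    trim), then lowercase. Nulls stay null."""
    if value is None:
        return None
    return value.strip(" ").lower()

raw = [" B2C", "b2c", "B2B ", "b2b", "\tb2c", None]
print([normalize_type(v) for v in raw])
# → ['b2c', 'b2c', 'b2b', 'b2b', '\tb2c', None]

# Distinct non-null values before vs after normalization
distinct_before = {v for v in raw if v is not None}
distinct_after = {normalize_type(v) for v in raw if v is not None}
print(len(distinct_before), len(distinct_after))  # → 5 3
```

The tab-prefixed value is left untouched, which is why it remains a separate category after normalization.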



## 4. Export processed and cleaned data

In [80]:
output_path = "../preprocessed_data"
# coalesce(1) merges all partitions so Spark writes a single part file inside the output directory
data_cleaned.coalesce(1).write.csv(output_path, header=True, mode="overwrite")

print(f"Preprocessed and cleaned data saved at {output_path}")


Preprocessed and cleaned data saved at ../preprocessed_data
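`DataFrame.write.csv` writes a directory, not a single file: even with `coalesce(1)`, `../preprocessed_data` is a folder holding one `part-*.csv` file plus marker files such as `_SUCCESS`. A small standard-library helper can locate that part file for downstream tools that expect a single CSV path. `find_single_part_file` is a hypothetical name, and the demo fakes the folder layout in a temporary directory instead of running Spark.

```python
import glob
import os
import tempfile

def find_single_part_file(output_dir: str) -> str:
    """Return the path of the only part-*.csv file in output_dir.

    Raises ValueError if there is not exactly one (e.g. coalesce(1) was not used).
    """
    parts = sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file in {output_dir}, found {len(parts)}")
    return parts[0]

# Demo: fake the directory layout Spark produces (file names are illustrative)
with tempfile.TemporaryDirectory() as tmp:
    out = os.path.join(tmp, "preprocessed_data")
    os.makedirs(out)
    for name in ["_SUCCESS", "part-00000-example-c000.csv"]:
        open(os.path.join(out, name), "w").close()
    found = os.path.basename(find_single_part_file(out))
    print(found)  # → part-00000-example-c000.csv
```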
