In [1]:
# findspark.init() must run before `pyspark` is imported (next cell): it makes the
# local Spark installation's Python package importable from this kernel
# (presumably located via SPARK_HOME — confirm on the target machine).
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import re
from delta import configure_spark_with_delta_pip


In [3]:
# Raw monthly event exports (October and November 2019), read together in one pass.
paths = [f"dataset/2019-{month}.csv" for month in ("Oct", "Nov")]

In [4]:
# Local Spark session with Delta Lake support.
builder = (
    SparkSession.builder
    .appName("EcommerceAnalysis")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.shuffle.partitions", "200")
    # Pin the session time zone. event_time strings end with a literal " UTC" that
    # to_timestamp() only matches as quoted text, so the wall-clock value is interpreted
    # in the session time zone. Without this, the stored instants, the DQ time window
    # and the derived event_date would depend on the host's local zone (incl. DST).
    .config("spark.sql.session.timeZone", "UTC")
    # Delta Lake: SQL extension + catalog so that format("delta") works.
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip adds the Delta package matching the pip-installed
# delta-spark version to the builder before the session is created.
spark = configure_spark_with_delta_pip(builder).getOrCreate()


26/01/26 19:20:04 WARN Utils: Your hostname, onizuka-Latitude-5500 resolves to a loopback address: 127.0.1.1; using 192.168.0.167 instead (on interface wlo1)
26/01/26 19:20:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/onizuka/.ivy2/cache
The jars for the packages stored in: /home/onizuka/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6395345b-8d3e-4e5b-8f85-605ed957374a;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.4.0 in central
	found io.delta#delta-storage;2.4.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 165ms :: artifacts dl 6ms
	:: modules in use:
	io.delta#delta-core_2.12;2.4.0 from central in [default]
	io.delta#delta-storage;2.4.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   0  

In [7]:
# Explicit schema so Spark does not infer types from the large CSVs.
# event_time is kept as a string here and parsed into a timestamp later on.
schema = StructType([
    StructField(name, data_type, nullable=True)
    for name, data_type in (
        ("event_time", StringType()),
        ("event_type", StringType()),
        ("product_id", LongType()),
        ("category_id", LongType()),
        ("category_code", StringType()),
        ("brand", StringType()),
        ("price", DoubleType()),
        ("user_id", LongType()),
        ("user_session", StringType()),
    )
])

In [8]:
# Read both monthly files in one pass, using the header row and the explicit schema.
df = (
    spark.read
    .option("header", True)
    .schema(schema)
    .csv(paths)
)
print(f"Les fichiers sont chargés. Nombre de partitions : {df.rdd.getNumPartitions()}")

Les fichiers sont chargés. Nombre de partitions : 110


In [9]:
# Merge both months and persist them once as Parquet (columnar, typed); the next
# cell reads this copy back instead of re-parsing the CSVs.
df.write.format("parquet").mode("overwrite").save("dataset/full_data.parquet")

                                                                                

In [10]:
# Continue from the Parquet copy of the merged raw data.
df = spark.read.format("parquet").load("dataset/full_data.parquet")

In [11]:
# Sanity check: total number of rows loaded.
print("Données chargées : {} lignes.".format(df.count()))

Données chargées : 109950743 lignes.


## NORMALISATION

In [12]:
def normalize_columns(df):
    """Return ``df`` with every column renamed to a lowercase snake_case identifier.

    Each name is lowercased, every run of dots/whitespace becomes a single
    underscore, and any remaining non-word character is removed.
    NOTE(review): names that collide after cleaning are not detected.
    """
    def _clean(name):
        snake = re.sub(r'[.\s]+', '_', name.lower())
        return re.sub(r'[^\w]', '', snake)

    return df.toDF(*[_clean(name) for name in df.columns])

In [13]:
# Normalize column names, lowercase brand, and derive main_category
# (the segment of category_code before the first ".").
df = normalize_columns(df)
print("Nouveaux noms de colonnes :", df.columns)

df = (
    df
    .withColumn("brand", F.lower(F.col("brand")))
    # Derive from the trimmed + lowercased code so main_category agrees with the
    # lower/trim cleaning applied to category_code later; otherwise stray whitespace
    # or capitals would survive in main_category only.
    # split() takes a Java regex, so the dot is escaped; the raw string avoids the
    # invalid-escape warning Python emits for a plain "\." literal.
    .withColumn(
        "main_category",
        F.split(F.lower(F.trim(F.col("category_code"))), r"\.").getItem(0),
    )
)

df.limit(5).select("brand", "category_code", "main_category").show()

Nouveaux noms de colonnes : ['event_time', 'event_type', 'product_id', 'category_id', 'category_code', 'brand', 'price', 'user_id', 'user_session']
+--------+--------------------+-------------+
|   brand|       category_code|main_category|
+--------+--------------------+-------------+
|elenberg|appliances.kitche...|   appliances|
|   intel|computers.compone...|    computers|
|  irobot|appliances.enviro...|   appliances|
| lucente|                null|         null|
| samsung|electronics.smart...|  electronics|
+--------+--------------------+-------------+



## Conversion des types (casting)

In [14]:
# Peek at the raw event_time strings before casting.
df.select(F.col("event_time")).show(n=5)

+--------------------+
|          event_time|
+--------------------+
|2019-11-17 08:43:...|
|2019-11-17 08:43:...|
|2019-11-17 08:43:...|
|2019-11-17 08:43:...|
|2019-11-17 08:43:...|
+--------------------+
only showing top 5 rows



In [15]:
# Parse event_time strings into timestamps. In the pattern, 'UTC' is quoted literal
# text: it must be present, but it is not used as a zone, so the wall-clock value is
# interpreted in the session time zone. Unparsed rows end up null and are dropped by
# the event_time DQ rule below.
df = df.withColumn(
    "event_time",
    F.to_timestamp(F.col("event_time"), "yyyy-MM-dd HH:mm:ss 'UTC'"),
)

In [16]:
# Trim surrounding whitespace and lowercase the categorical text columns,
# leaving every other column (and the column order) untouched.
cols_to_clean = ["event_type", "category_code", "brand"]
df = df.select([
    F.lower(F.trim(F.col(name))).alias(name) if name in cols_to_clean else F.col(name)
    for name in df.columns
])

In [17]:
# Confirm the column types after the timestamp cast and text cleaning above
# (event_time should now be a timestamp).
df.printSchema()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: long (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_session: string (nullable = true)
 |-- main_category: string (nullable = true)



## Data Quality Rules

In [18]:
# Data-quality rules. Allowed event types:
VALID_EVENT_TYPES = ["view", "cart", "remove_from_cart", "purchase"]

# Time window (covers Oct 2019 → Apr 2020): start inclusive, end exclusive.
start_ts = F.to_timestamp(F.lit("2019-10-01 00:00:00"), "yyyy-MM-dd HH:mm:ss")
end_ts = F.to_timestamp(F.lit("2020-05-01 00:00:00"), "yyyy-MM-dd HH:mm:ss")

# Each rule is applied as its own filter, in this order.
dq_rules = [
    # only known event types
    F.col("event_type").isin(VALID_EVENT_TYPES),
    # price present and strictly positive
    F.col("price").isNotNull() & (F.col("price") > 0),
    # event_time present and inside the window
    F.col("event_time").isNotNull() & (F.col("event_time") >= start_ts) & (F.col("event_time") < end_ts),
    # user_id and product_id present
    F.col("user_id").isNotNull() & F.col("product_id").isNotNull(),
]
for rule in dq_rules:
    df = df.filter(rule)



In [19]:
# (Optional) calendar date of each event, handy as a partition column.
df = df.withColumn("event_date", F.to_date(F.col("event_time")))

print(f"✅ Après DQ Rules : {df.count()}")
(
    df.groupBy("event_type")
    .count()
    .show()
)


                                                                                

✅ Après DQ Rules : 109693982




+----------+---------+
|event_type|    count|
+----------+---------+
|  purchase|  1659788|
|      view|104083306|
|      cart|  3950888|
+----------+---------+



                                                                                

## Gestion des valeurs manquantes (NULL)

In [20]:
# Summary statistics for every string column: count is the number of non-null
# values; mean/stddev are not meaningful for text.
string_columns = [name for name, dtype in df.dtypes if dtype == "string"]
df.select(*string_columns).describe().show()

26/01/26 19:26:11 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+----------+-------------------+--------+--------------------+-------------+
|summary|event_type|      category_code|   brand|        user_session|main_category|
+-------+----------+-------------------+--------+--------------------+-------------+
|  count| 109693982|           74413417|94619500|           109693970|     74413417|
|   mean|      null|               null|     NaN|                null|         null|
| stddev|      null|               null|     NaN|                null|         null|
|    min|      cart|    accessories.bag|  a-case|00000042-3e3f-42f...|  accessories|
|    max|      view|stationery.cartrige|   zyxel|fffffde2-4522-4b4...|   stationery|
+-------+----------+-------------------+--------+--------------------+-------------+



                                                                                

In [21]:
print("Audit des NULL avant nettoyage :")
# One aggregate per column: count() skips nulls, so counting a value that is only
# emitted when the column is null yields that column's null count.
df.agg(*[F.count(F.when(F.col(name).isNull(), name)).alias(name) for name in df.columns]).show()

Audit des NULL avant nettoyage :




+----------+----------+----------+-----------+-------------+--------+-----+-------+------------+-------------+----------+
|event_time|event_type|product_id|category_id|category_code|   brand|price|user_id|user_session|main_category|event_date|
+----------+----------+----------+-----------+-------------+--------+-----+-------+------------+-------------+----------+
|         0|         0|         0|          0|     35280565|15074482|    0|      0|          12|     35280565|         0|
+----------+----------+----------+-----------+-------------+--------+-----+-------+------------+-------------+----------+



                                                                                

In [22]:
# Replace missing category/brand values with a placeholder label instead of dropping rows.
# NOTE(review): "Unknown" is capitalized while these text columns were lowercased earlier;
# confirm downstream consumers expect this exact label before changing it.
for name in ("category_code", "main_category", "brand"):
    df = df.withColumn(name, F.coalesce(F.col(name), F.lit("Unknown")))

In [23]:
# Drop the events that have no session id (only a handful in the recorded null audit).
df = df.where(F.col("user_session").isNotNull())

In [24]:
print("Nouveau comptage après correction :")
# After the placeholder fill, the two category counts should equal the total row count.
df.agg(
    F.count("event_type").alias("Total"),
    F.count("category_code").alias("Cat_Code_Clean"),
    F.count("main_category").alias("Main_Cat_Clean"),
).show()

Nouveau comptage après correction :




+---------+--------------+--------------+
|    Total|Cat_Code_Clean|Main_Cat_Clean|
+---------+--------------+--------------+
|109693970|     109693970|     109693970|
+---------+--------------+--------------+



                                                                                

In [25]:
# --- Final check: null count per column after cleaning ---
print("Audit des NULL après nettoyage :")
df.agg(*[
    F.count(F.when(F.col(column).isNull(), F.lit(1))).alias(column)
    for column in df.columns
]).show()

Audit des NULL après nettoyage :




+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+-------------+----------+
|event_time|event_type|product_id|category_id|category_code|brand|price|user_id|user_session|main_category|event_date|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+-------------+----------+
|         0|         0|         0|          0|            0|    0|    0|      0|           0|            0|         0|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+-------------+----------+



                                                                                

In [26]:
# Keep one row per (event_time, event_type, user_id, user_session, product_id).
# Among duplicates of that key, which row's other columns survive is not specified.
df = df.drop_duplicates(subset=["event_time", "event_type", "user_id", "user_session", "product_id"])

In [27]:
# Re-run the string-column summary after cleaning and de-duplication.
string_columns = [
    field.name for field in df.schema.fields if field.dataType.simpleString() == "string"
]
df.select(string_columns).describe().show()



+-------+----------+-------------------+---------+--------------------+-------------+
|summary|event_type|      category_code|    brand|        user_session|main_category|
+-------+----------+-------------------+---------+--------------------+-------------+
|  count| 109563324|          109563324|109563324|           109563324|    109563324|
|   mean|      null|               null|      NaN|                null|         null|
| stddev|      null|               null|      NaN|                null|         null|
|    min|      cart|            Unknown|  Unknown|00000042-3e3f-42f...|      Unknown|
|    max|      view|stationery.cartrige|    zyxel|fffffde2-4522-4b4...|   stationery|
+-------+----------+-------------------+---------+--------------------+-------------+



                                                                                

## Stockage des données (couche Silver, format Delta)

In [28]:
# Location of the Silver-layer Delta table (relative to the working directory).
silver_delta_path = "/".join(["data_lake", "silver", "events_clean_delta"])


In [31]:
# Write the cleaned events as a Delta table partitioned by event_date (overwrite mode).
(
    df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("event_date")
    .save(silver_delta_path)
)

print("✅ SILVER DELTA stocké : " + silver_delta_path)




✅ SILVER DELTA stocké : data_lake/silver/events_clean_delta


                                                                                