In [804]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import yaml
import os
from pathlib import Path
from IPython.display import display, Markdown
from pyspark.sql.utils import AnalysisException
from pyspark.sql.types import BooleanType, DoubleType, StringType
from pyspark.sql.functions import udf, col, when, explode, row_number, sum, to_date
from pyspark.sql.window import Window
from pathlib import Path
import pprint as pp

In [805]:
spark = SparkSession.builder.appName("Test").getOrCreate()
print("‚úì Spark fonctionne")

‚úì Spark fonctionne


In [806]:
PROJECT_ROOT = Path("/home/abdeldpro/cours/Esther_brief/migration_pandas_pyspark")

config_file = PROJECT_ROOT / "settings.yaml"

with config_file.open('r') as file:
    config = yaml.safe_load(file)

print("‚úì Configuration charg√©e")
pprint.pprint(config)

‚úì Configuration charg√©e
{'business_rules': {'exclude_inactive_customers': True,
                    'exclude_negative_prices': True,
                    'payment_status': 'paid'},
 'csv_encoding': 'utf-8',
 'csv_float_format': '%.2f',
 'csv_sep': ',',
 'db_path': './data/sales.db',
 'input_dir': './data/march-input',
 'input_files': {'customers': 'customers.csv',
                 'orders_pattern': 'orders_*.json',
                 'refunds': 'refunds.csv'},
 'output_columns': ['date',
                    'city',
                    'channel',
                    'orders_count',
                    'unique_customers',
                    'items_sold',
                    'gross_revenue_eur',
                    'refunds_eur',
                    'net_revenue_eur'],
 'output_dir': './data/out'}


In [807]:
# Pr√©parer les chemins
input_dir = PROJECT_ROOT / config['input_dir'].lstrip('./')
output_dir = PROJECT_ROOT / config['output_dir'].lstrip('./')

print(f"üìÅ Dossier d'entr√©e : {input_dir}")
print(f"üìÅ Existe ? {input_dir.exists()}")

print(f"\nüìÅ Dossier de sortie : {output_dir}")

# Cr√©er le dossier de sortie
output_dir.mkdir(parents=True, exist_ok=True)
print(f"‚úì Dossier de sortie pr√™t")

üìÅ Dossier d'entr√©e : /home/abdeldpro/cours/Esther_brief/migration_pandas_pyspark/data/march-input
üìÅ Existe ? True

üìÅ Dossier de sortie : /home/abdeldpro/cours/Esther_brief/migration_pandas_pyspark/data/out
‚úì Dossier de sortie pr√™t


In [808]:
# Chemin du fichier customers
customers_path = input_dir / config['input_files']['customers']

print(f"üìÑ Chargement de : {customers_path.name}")

# Charger avec Spark
df_customers = spark.read.csv(
    str(customers_path),
    header=True,
    sep=config['csv_sep'],
    encoding=config['csv_encoding'],
    inferSchema=True
)

print(f"‚úì {df_customers.count()} clients charg√©s")
print("\nüìã Structure des colonnes :")
df_customers.printSchema()

print("\nüìÑ Aper√ßu des donn√©es :")
df_customers.show(3, truncate=False)

üìÑ Chargement de : customers.csv
‚úì 800 clients charg√©s

üìã Structure des colonnes :
root
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- city: string (nullable = true)
 |-- is_active: string (nullable = true)


üìÑ Aper√ßu des donn√©es :
+-----------+----------+---------+-----------------+--------+---------+
|customer_id|first_name|last_name|email            |city    |is_active|
+-----------+----------+---------+-----------------+--------+---------+
|C0001      |User1     |Test1    |user1@example.com|Nantes  |yes      |
|C0002      |User2     |Test2    |user2@example.com|Toulouse|yes      |
|C0003      |User3     |Test3    |user3@example.com|Bordeaux|y        |
+-----------+----------+---------+-----------------+--------+---------+
only showing top 3 rows


In [809]:
# V√©rifier ce qu'il y a dans input_dir
print(f"input_dir = {input_dir}")
print(f"Type : {type(input_dir)}")

# Lister les fichiers orders
print("\nüìÇ Fichiers orders pr√©sents :")
for fichier in input_dir.glob("orders_*.json"):
    print(f"  - {fichier.name}")

input_dir = /home/abdeldpro/cours/Esther_brief/migration_pandas_pyspark/data/march-input
Type : <class 'pathlib.PosixPath'>

üìÇ Fichiers orders pr√©sents :
  - orders_2025-03-08.json
  - orders_2025-03-19.json
  - orders_2025-03-25.json
  - orders_2025-03-18.json
  - orders_2025-03-26.json
  - orders_2025-03-02.json
  - orders_2025-03-01.json
  - orders_2025-03-21.json
  - orders_2025-03-24.json
  - orders_2025-03-20.json
  - orders_2025-03-14.json
  - orders_2025-03-05.json
  - orders_2025-03-06.json
  - orders_2025-03-17.json
  - orders_2025-03-10.json
  - orders_2025-03-04.json
  - orders_2025-03-22.json
  - orders_2025-03-13.json
  - orders_2025-03-15.json
  - orders_2025-03-09.json
  - orders_2025-03-07.json
  - orders_2025-03-31.json
  - orders_2025-03-12.json
  - orders_2025-03-23.json
  - orders_2025-03-16.json
  - orders_2025-03-30.json
  - orders_2025-03-28.json
  - orders_2025-03-27.json
  - orders_2025-03-03.json
  - orders_2025-03-11.json
  - orders_2025-03-29.json


In [810]:
# ========================================
# CONSOLIDATION DE TOUS LES FICHIERS ORDERS DU MOIS
# √ânonc√© : "orders_YYYY-MM-DD.json (commandes et lignes d'articles, un fichier par jour)"
# Boucle sur tous les jours du mois de mars (1 √† 31)
# ========================================

liste_orders = []

for day in range(1, 32): 
    order_path = os.path.join(input_dir, f"orders_2025-03-{day:02d}.json")
    if not os.path.exists(order_path):
        continue
    else:
        order = spark.read.json(order_path, multiLine=True)
        liste_orders.append(order)

if liste_orders:
    from functools import reduce
    orders = reduce(lambda df1, df2: df1.union(df2), liste_orders)
else:
    orders = None

In [811]:
# V√©rifier si le DataFrame existe
if orders is not None:
    orders.printSchema()
    orders.show(5, truncate=False)
    print("Nombre total de commandes consolid√©es :", orders.count())
else:
    print("Aucun fichier orders trouv√© pour ce mois.")


root
 |-- channel: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- qty: long (nullable = true)
 |    |    |-- sku: string (nullable = true)
 |    |    |-- unit_price: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- payment_status: string (nullable = true)

+-------+-------------------+-----------+---------------------------------------------------------+-------------+--------------+
|channel|created_at         |customer_id|items                                                    |order_id     |payment_status|
+-------+-------------------+-----------+---------------------------------------------------------+-------------+--------------+
|app    |2025-03-01 20:36:44|C0793      |[{4, SKU001, 24.9}]                                      |O202503010001|pending       |
|web    |2025-03-01 11:30:49|C0676      |[{4

In [812]:
# ========================================
# CHARGEMENT DES REMBOURSEMENTS
# √ânonc√© : "refunds.csv (historique des remboursements)"
# ========================================

refunds_path = os.path.join(input_dir, "refunds.csv")
try:
    refunds = spark.read.csv(refunds_path, header=True, inferSchema=True)
except AnalysisException:
    refunds = None
    print(f"Fichier non trouv√© : {refunds_path}")


In [813]:
refunds.show(5, truncate=False)

+---------+-------------+------+----------+-------------------+
|refund_id|order_id     |amount|reason    |created_at         |
+---------+-------------+------+----------+-------------------+
|R000001  |O202503010089|error |delay     |2025-03-01 14:03:41|
|R000002  |O202503010038|-8.89 |gesture   |2025-03-01 22:16:56|
|R000003  |O202503010008|again |item_issue|2025-03-01 20:06:25|
|R000004  |O202503010073|-2.47 |coupon    |2025-03-01 20:02:46|
|R000005  |O202503010005|-3.83 |gesture   |2025-03-01 09:58:15|
+---------+-------------+------+----------+-------------------+
only showing top 5 rows


In [814]:
# ========================================
# FONCTION POUR STANDARDISER LES BOOL√âENS
# Les donn√©es peuvent arriver dans diff√©rents formats : "true", 1, "yes", etc.
# Cette fonction les transforme tous en bool√©en Python (True/False)
# ========================================

def controle_bool(v):
    if isinstance(v, bool): return v
    if isinstance(v, (int, float)): return bool(v)
    if v is None: return False
    s = str(v).strip().lower()
    return s in ("1","true","yes","y","t")


In [815]:
# ========================================
# NETTOYAGE DES DONN√âES CLIENTS
# Standardisation du champ is_active et des types de colonnes
# ========================================

controle_bool_udf = F.udf(controle_bool, BooleanType())

customers = (
    df_customers
        .withColumn("is_active", controle_bool_udf(df_customers["is_active"]))
        .withColumn("customer_id", df_customers["customer_id"].cast(StringType()))
        .withColumn("city", df_customers["city"].cast(StringType()))
)

In [816]:
# ========================================
# NETTOYAGE DES REMBOURSEMENTS
# √ânonc√© : "Agr√©ger les remboursements par commande, avec des montants n√©gatifs"
# Conversion des montants en num√©rique et gestion des erreurs
# ========================================

# refunds = (
#     refunds
#         .withColumn(
#             "amount",
#             when(col("amount").cast(DoubleType()).isNull(), 0.0)
#             .otherwise(col("amount").cast(DoubleType()))
#         )
#         .withColumn("created_at", col("created_at").cast(StringType()))
# )


refunds_clean = (
        refunds.withColumn("amount", F.expr("try_cast(amount AS double)"))
        .na.fill({"amount": 0.0})
        .select("order_id", "amount")
    )

In [817]:
refunds_clean.show(5, truncate=False)

+-------------+------+
|order_id     |amount|
+-------------+------+
|O202503010089|0.0   |
|O202503010038|-8.89 |
|O202503010008|0.0   |
|O202503010073|-2.47 |
|O202503010005|-3.83 |
+-------------+------+
only showing top 5 rows


In [818]:
# ========================================
# FILTRAGE DES COMMANDES PAY√âES
# √ânonc√© : "Conserver uniquement les commandes pay√©es (payment_status = 'paid')"
# ========================================

ln_initial = orders.count()
orders = orders.filter(col("payment_status") == "paid")
ln_final = orders.count()

In [819]:
# Etude de la structure avant explode
orders.select("items").printSchema()
orders.select(explode("items")).printSchema()

root
 |-- items: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- qty: long (nullable = true)
 |    |    |-- sku: string (nullable = true)
 |    |    |-- unit_price: double (nullable = true)

root
 |-- col: struct (nullable = true)
 |    |-- qty: long (nullable = true)
 |    |-- sku: string (nullable = true)
 |    |-- unit_price: double (nullable = true)



In [820]:
# ========================================
# EXPLOSION DES LIGNES D'ARTICLES
# Chaque commande contient plusieurs articles (dans une liste "items")
# On "√©clate" cette liste pour avoir une ligne par article
# ========================================

orders2 = orders.withColumn("item", explode(col("items")))
orders2 = orders2.select(
    *[col(c) for c in orders2.columns if c != "items"],
    col("item.qty").alias("item_qty"),
    col("item.sku").alias("item_sku"),
    col("item.unit_price").alias("item_unit_price")
).drop("items")

In [821]:
# ========================================
# REJET DES ARTICLES √Ä PRIX N√âGATIF
# √ânonc√© : "√âcarter toute ligne d'article avec prix unitaire n√©gatif (et consigner ces rejets)"
# ========================================

orders2 = orders2.drop("item")

neg_items = orders2.filter(col("item_unit_price") < 0)
n_neg = neg_items.count()
if n_neg > 0:
    rejects_path = os.path.join(output_dir, "rejects_items.csv")

    neg_items.coalesce(1).write.mode("overwrite").csv(
        rejects_path, 
        header=True,
        encoding=enc
    )
orders2 = orders2.filter(col("item_unit_price") >= 0)

orders2.show(5, truncate=False)

+-------+-------------------+-----------+-------------+--------------+--------+--------+---------------+
|channel|created_at         |customer_id|order_id     |payment_status|item_qty|item_sku|item_unit_price|
+-------+-------------------+-----------+-------------+--------------+--------+--------+---------------+
|web    |2025-03-01 11:30:49|C0676      |O202503010001|paid          |5       |SKU005  |12.5           |
|web    |2025-03-01 07:27:00|C0642      |O202503010003|paid          |1       |SKU014  |5.0            |
|web    |2025-03-01 22:29:42|C0571      |O202503010005|paid          |1       |SKU001  |2.5            |
|web    |2025-03-01 09:24:19|C0704      |O202503010006|paid          |1       |SKU039  |9.9            |
|web    |2025-03-01 09:24:19|C0704      |O202503010006|paid          |4       |SKU037  |15.0           |
+-------+-------------------+-----------+-------------+--------------+--------+--------+---------------+
only showing top 5 rows


In [822]:
# ========================================
# D√âDUPLICATION DES COMMANDES
# √ânonc√© : "D√©dupliquer sur order_id (garder la premi√®re occurrence)"
# On trie par date de cr√©ation et on garde la premi√®re occurrence par order_id
# ========================================

before = orders2.count()
window = Window.partitionBy("order_id").orderBy("created_at")
orders3 = orders2.withColumn("row_num", row_number().over(window))
orders3 = orders3.filter(col("row_num") == 1)
orders3 = orders3.drop("row_num")
after = orders3.count()

print(f"D√©doublonnage : {before - after} commandes supprim√©es")

D√©doublonnage : 4385 commandes supprim√©es


In [823]:
# ========================================
# CALCUL DU REVENU BRUT PAR COMMANDE
# Calcul : quantit√© √ó prix unitaire, puis agr√©gation par commande
# ========================================

orders3 = orders3.withColumn(
    "line_gross", 
    col("item_qty") * col("item_unit_price")
)

per_order = orders3.groupBy(
    "order_id", 
    "customer_id", 
    "channel", 
    "created_at"
).agg(
    sum("item_qty").alias("items_sold"),
    sum("line_gross").alias("gross_revenue_eur")
)

per_order.show(5, truncate=False)

+-------------+-----------+-------+-------------------+----------+-----------------+
|order_id     |customer_id|channel|created_at         |items_sold|gross_revenue_eur|
+-------------+-----------+-------+-------------------+----------+-----------------+
|O202503010001|C0676      |web    |2025-03-01 11:30:49|5         |62.5             |
|O202503010003|C0642      |web    |2025-03-01 07:27:00|1         |5.0              |
|O202503010005|C0571      |web    |2025-03-01 22:29:42|1         |2.5              |
|O202503010006|C0704      |web    |2025-03-01 09:24:19|1         |9.9              |
|O202503010007|C0464      |app    |2025-03-01 15:50:48|1         |24.9             |
+-------------+-----------+-------+-------------------+----------+-----------------+
only showing top 5 rows


In [824]:
# ========================================
# EXCLUSION DES CLIENTS INACTIFS
# √ânonc√© : "Exclure les clients inactifs (is_active = false)"
# On fait une jointure avec la table customers et on filtre sur is_active = True
# ========================================

len_init = per_order.count()
per_order = per_order.join(
    customers.select("customer_id", "city", "is_active"),
    on="customer_id",
    how="left"
)
per_order = per_order.filter(col("is_active") == True)
len_after = per_order.count()

print(f"Clients inactifs exclus : {len_init - len_after}")

Clients inactifs exclus : 340


In [825]:
# ========================================
# STANDARDISATION DES DATES
# Conversion de diff√©rents formats de date en format ISO (YYYY-MM-DD)
# ========================================

per_order = per_order.withColumn(
    "order_date",
    to_date(col("created_at"))
)

per_order.schema["order_date"].dataType

DateType()

In [826]:
# ========================================
# AGR√âGATION DES REMBOURSEMENTS PAR COMMANDE
# √ânonc√© : "Agr√©ger les remboursements par commande, avec des montants n√©gatifs"
# ========================================

# S√©curisation des montants de remboursement + agr√©gation
refunds_sum = (
    refunds
        .withColumn(
            "amount",
            F.expr("try_cast(amount AS double)")   # -> renvoie null si 'error'
        )
        .fillna({"amount": 0.0})                  # invariant : amount doit √™tre num√©rique
        .groupBy("order_id")
        .agg(F.sum("amount").alias("refunds_eur"))
)

per_order = (
    per_order
        .join(refunds_sum, on="order_id", how="left")
        .fillna({"refunds_eur": 0.0})
)



In [827]:
per_order.show()

+-------------+-----------+-------+-------------------+----------+------------------+---------+---------+----------+-------------------+
|     order_id|customer_id|channel|         created_at|items_sold| gross_revenue_eur|     city|is_active|order_date|        refunds_eur|
+-------------+-----------+-------+-------------------+----------+------------------+---------+---------+----------+-------------------+
|O202503010001|      C0676|    web|2025-03-01 11:30:49|         5|              62.5|Marseille|     true|2025-03-01|                0.0|
|O202503010003|      C0642|    web|2025-03-01 07:27:00|         1|               5.0| Toulouse|     true|2025-03-01|              -3.13|
|O202503010005|      C0571|    web|2025-03-01 22:29:42|         1|               2.5| Toulouse|     true|2025-03-01|             -35.42|
|O202503010007|      C0464|    app|2025-03-01 15:50:48|         1|              24.9|   Nantes|     true|2025-03-01|                0.0|
|O202503010008|      C0317|    app|2025-0

In [828]:
# Chemin vers la base SQLite
import sqlite3

db_path = "sales.db"  # ou "data/sales.db" si tu veux un sous-dossier

# S√©lection des colonnes PySpark
per_order_save = per_order.select(
    "order_id", "customer_id", "city", "channel",
    "order_date", "items_sold", "gross_revenue_eur"
)

# Conversion en Pandas
per_order_pd = per_order_save.toPandas()

# Sauvegarde dans SQLite
conn = sqlite3.connect(db_path)
per_order_pd.to_sql("orders_clean", conn, if_exists="replace", index=False)
conn.close()

In [829]:
# ========================================
# SAUVEGARDE DANS SQLITE : TABLE orders_clean
# √ânonc√© : "Une base SQLite sales.db comprenant : orders_clean (d√©tails nettoy√©s par commande)"
# ========================================

# import sqlite3

# # On s√©lectionne les colonnes comme en Pandas
# per_order_save = per_order.select(
#     "order_id", "customer_id", "channel", "created_at", 
#     "items_sold", "gross_revenue_eur", "city", "is_active", "order_date", "refunds_eur"
# )

# # Conversion en Pandas
# per_order_pd = per_order_save.toPandas()

# # Connexion SQLite
# conn = sqlite3.connect(db_path)
# per_order_pd.to_sql("orders_clean", conn, if_exists="replace", index=False)
# conn.close()
