In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F

# Local Spark session with master "local[*]"; getOrCreate() returns the
# already-active session when there is one instead of building a new one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

## Partie 1 : Lecture des CSV et exploration

In [None]:
# Load the cleaned client and order CSV files (header row, inferred column
# types) and print how many rows each one contains.
try:
    clients = spark.read.option("header", "true").option("inferSchema", True).csv("data/clients_clean.csv")
    commandes = spark.read.option("header", "true").option("inferSchema", True).csv("data/commandes_clean.csv")

    print(f"Clients : {clients.count()}")
    print(f"Commandes : {commandes.count()}")
except Exception as e:
    print(f"Erreur lors du chargement des données nettoyées: {e}")
    # Re-raise instead of swallowing: every later cell needs `clients` and
    # `commandes`, so carrying on after a failed load would only surface as
    # unrelated NameErrors further down the notebook.
    raise


In [None]:
# Print the schema of each DataFrame: clients first, then orders.
for _df in (clients, commandes):
    _df.printSchema()

In [None]:
# Preview the first 10 rows of each DataFrame: clients first, then orders.
for _df in (clients, commandes):
    _df.show(10)

## Partie 3 : Statistiques avec Spark SQL

Création des vues temporaires et exécution des requêtes SQL

In [None]:
# Register both DataFrames as temporary views so the SQL queries below can
# refer to them by name.
for _nom_vue, _df in (("clients", clients), ("commandes", commandes)):
    _df.createOrReplaceTempView(_nom_vue)

In [None]:
# 1. Total revenue (EUR): sum of the non-null order amounts, rounded to
#    two decimals.
requete_ca_total = """
    SELECT 
        ROUND(SUM(amount), 2) as chiffre_affaires_total_eur
    FROM commandes
    WHERE amount IS NOT NULL
"""
ca_total = spark.sql(requete_ca_total)
ca_total.show()

In [None]:
# 2. Revenue per client segment, highest revenue first. The inner join
#    keeps only orders whose client_id exists in the clients view.
requete_ca_par_segment = """
    SELECT 
        c.segment,
        ROUND(SUM(cmd.amount), 2) as chiffre_affaires_eur,
        COUNT(cmd.order_id) as nombre_commandes
    FROM clients c
    INNER JOIN commandes cmd ON c.client_id = cmd.client_id
    WHERE cmd.amount IS NOT NULL
    GROUP BY c.segment
    ORDER BY chiffre_affaires_eur DESC
"""
ca_par_segment = spark.sql(requete_ca_par_segment)
ca_par_segment.show()

In [None]:
# 3. Average basket: mean of the non-null order amounts, rounded to two
#    decimals.
requete_panier_moyen = """
    SELECT 
        ROUND(AVG(amount), 2) as panier_moyen_eur
    FROM commandes
    WHERE amount IS NOT NULL
"""
panier_moyen = spark.sql(requete_panier_moyen)
panier_moyen.show()

In [None]:
# 4. Top 10 shipping cities by number of orders, with the revenue of each.
#    Rows without a shipping city are excluded.
top_villes = spark.sql("""
    SELECT 
        city_shipping,
        COUNT(*) as nombre_commandes,
        ROUND(SUM(amount), 2) as chiffre_affaires_eur
    FROM commandes
    WHERE city_shipping IS NOT NULL
    GROUP BY city_shipping
    -- city_shipping is a tie-breaker: without it, cities with the same
    -- order count could enter or leave the top 10 from one run to the next.
    ORDER BY nombre_commandes DESC, city_shipping ASC
    LIMIT 10
""")
top_villes.show()

In [None]:
# 5. Share (%) of orders whose status is one of 'cancelled', 'returned',
#    'canceled' or 'refunded', relative to all orders.
requete_taux_annulation = """
    SELECT 
        COUNT(*) as total_commandes,
        SUM(CASE WHEN status IN ('cancelled', 'returned', 'canceled', 'refunded') THEN 1 ELSE 0 END) as commandes_annulees,
        ROUND(
            SUM(CASE WHEN status IN ('cancelled', 'returned', 'canceled', 'refunded') THEN 1 ELSE 0 END) * 100.0 / COUNT(*), 
            2
        ) as taux_annulation_pct
    FROM commandes
"""
taux_annulation = spark.sql(requete_taux_annulation)
taux_annulation.show()

In [None]:
# 6. Clients without any order: LEFT JOIN from clients, then keep the rows
#    where no order matched (the order-side client_id is NULL).
requete_clients_sans_commande = """
    SELECT 
        c.client_id,
        c.nom,
        c.prenom,
        c.email,
        c.segment
    FROM clients c
    LEFT JOIN commandes cmd ON c.client_id = cmd.client_id
    WHERE cmd.client_id IS NULL
    ORDER BY c.client_id
"""
clients_sans_commande = spark.sql(requete_clients_sans_commande)
clients_sans_commande.show()
print(f"Nombre de clients sans commande: {clients_sans_commande.count()}")

In [None]:
# 7. Orphan orders: orders whose client_id has no match in the clients view.
#    LEFT JOIN from orders, then keep the rows where no client matched.
commandes_orphelines = spark.sql("""
    SELECT 
        cmd.order_id,
        cmd.client_id,
        cmd.order_date,
        cmd.amount,
        cmd.status
    FROM commandes cmd
    LEFT JOIN clients c ON cmd.client_id = c.client_id
    WHERE c.client_id IS NULL
    ORDER BY cmd.order_id
""")
commandes_orphelines.show()
# The label names what is actually counted (orphan orders, not all orders).
print(f"Nombre de commandes orphelines : {commandes_orphelines.count()}")

## Partie 4 : Statistiques équivalentes avec DataFrame API


In [None]:
# 1. Total revenue (EUR) with the DataFrame API: sum of the non-null order
#    amounts, rounded to two decimals.
ca_total_df = (
    commandes
    .where(F.col("amount").isNotNull())
    .agg(
        F.round(F.sum("amount"), 2).alias("chiffre_affaires_total_eur"),
    )
)
ca_total_df.show()

In [None]:
# 2. Revenue and order count per client segment, highest revenue first.
ca_segment_df = (
    clients
    .join(commandes, on="client_id", how="inner")
    .where(F.col("amount").isNotNull())
    .groupBy("segment")
    .agg(
        F.round(F.sum("amount"), 2).alias("chiffre_affaires_eur"),
        F.count("order_id").alias("nombre_commandes"),
    )
    .orderBy(F.col("chiffre_affaires_eur").desc())
)
ca_segment_df.show()

In [None]:
# 3. Average basket: mean of the non-null order amounts, rounded to two
#    decimals.
panier_moyen_df = (
    commandes
    .where(F.col("amount").isNotNull())
    .agg(
        F.round(F.avg("amount"), 2).alias("panier_moyen_eur"),
    )
)
panier_moyen_df.show()

In [None]:
# 4. Top 10 shipping cities by number of orders, with the revenue of each.
#    Rows without a shipping city are excluded.
top_villes_df = (
    commandes
    .filter(F.col("city_shipping").isNotNull())
    .groupBy("city_shipping")
    .agg(
        F.count("*").alias("nombre_commandes"),
        F.round(F.sum("amount"), 2).alias("chiffre_affaires_eur"),
    )
    # city_shipping is a tie-breaker: without it, cities with the same order
    # count could enter or leave the top 10 from one run to the next.
    .orderBy(F.desc("nombre_commandes"), F.asc("city_shipping"))
    .limit(10)
)
top_villes_df.show()

In [None]:
# 5. Share (%) of orders whose status is one of "cancelled", "returned",
#    "canceled" or "refunded", relative to all orders.
_est_annulee = F.col("status").isin("cancelled", "returned", "canceled", "refunded")

taux_annulation_df = (
    commandes
    .agg(
        F.count("*").alias("total_commandes"),
        F.sum(F.when(_est_annulee, 1).otherwise(0)).alias("commandes_annulees"),
    )
    # The percentage is derived from the two aggregated columns and appended
    # as the third column.
    .withColumn(
        "taux_annulation_pct",
        F.round(F.col("commandes_annulees") * 100.0 / F.col("total_commandes"), 2),
    )
)
taux_annulation_df.show()

In [None]:
# 6. Clients without any order.
#    A left anti join keeps the clients whose client_id has no match in
#    `commandes`. Unlike "left join + order_id IS NULL", the result depends
#    only on the join key, so a client whose orders happen to have a null
#    order_id is not reported as having no order.
clients_sans_commande_df = (
    clients
    .join(commandes, "client_id", "left_anti")
    .select("client_id", "nom", "prenom", "email", "segment")
    .distinct()
    .orderBy("client_id")
)
clients_sans_commande_df.show()
print(f"Nombre: {clients_sans_commande_df.count()}")

In [None]:
# 7. Orphan orders: orders whose client_id has no match in `clients`.
#    A left anti join tests the join key itself. The previous
#    "left join + nom IS NULL" filter would also flag the orders of any
#    existing client whose `nom` is missing.
commandes_orphelines_df = (
    commandes
    .join(clients, "client_id", "left_anti")
    .select("order_id", "client_id", "order_date", "amount", "status")
    .orderBy("order_id")
)
commandes_orphelines_df.show()
print(f"Nombre: {commandes_orphelines_df.count()}")

## Partie 5 : Jointures

In [None]:
# INNER JOIN: clients x orders => revenue, number of orders and number of
# distinct ordering clients per (segment, country), highest revenue first.
inner_join_stats = (
    clients
    .join(commandes, "client_id", "inner")
    .filter(F.col("amount").isNotNull())
    .groupBy("segment", "country")
    .agg(
        # Fixed typo: `F.um` does not exist and raised AttributeError.
        F.round(F.sum("amount"), 2).alias("chiffre_affaires_eur"),
        F.count("order_id").alias("nombre_commandes"),
        F.countDistinct("client_id").alias("nombre_clients_actifs"),
    )
    .orderBy(F.desc("chiffre_affaires_eur"))
)

inner_join_stats.show(20)

In [None]:
# LEFT JOIN: one row per client with its number of orders and revenue;
# clients without any order get 0 for both.
_nb_commandes = F.count("order_id")  # count() skips NULLs, i.e. unmatched rows
_montant_ou_zero = F.when(F.col("amount").isNotNull(), F.col("amount")).otherwise(0)

left_join_clients = (
    clients
    .join(commandes, on="client_id", how="left")
    .groupBy(
        "client_id", "nom", "prenom", "email",
        "segment", "country", "is_vip",
    )
    .agg(
        _nb_commandes.alias("nombre_commandes"),
        F.round(F.sum(_montant_ou_zero), 2).alias("chiffre_affaires_eur"),
    )
    .orderBy(F.col("chiffre_affaires_eur").desc())
)

left_join_clients.show(20)

In [None]:
# RIGHT JOIN: keep every order, with the client columns filled in when the
# client exists. Compare with the LEFT JOIN to spot orphan orders.
right_join_commandes = (
    clients
    .join(commandes, on="client_id", how="right")
    .select(
        "order_id", "client_id", "nom", "prenom",
        "order_date", "amount", "status", "segment",
    )
    .orderBy("order_id")
)

right_join_commandes.show(20)
print(f"Total commandes dans le RIGHT JOIN: {right_join_commandes.count()}")

# Orphan orders are the rows where the client side is empty.
# NOTE(review): this uses `nom IS NULL` as the "no client matched" signal,
# which assumes every real client has a non-null `nom` — confirm.
commandes_orphelines_right = right_join_commandes.where(F.col("nom").isNull())
print(f"Commandes orphelines avec RIGHT JOIN: {commandes_orphelines_right.count()}")

In [None]:
# FULL OUTER JOIN: keep every client and every order, so that rows without
# a counterpart on the other side can be detected in both directions.
full_outer_join = clients.join(commandes, on="client_id", how="full_outer")

# Clients without orders: the order side is empty (order_id is NULL).
clients_sans_commandes_full = (
    full_outer_join
    .where(F.col("order_id").isNull())
    .select("client_id", "nom", "prenom", "segment")
    .distinct()
)
print(f"Clients sans commandes: {clients_sans_commandes_full.count()}")

# Orders without clients: the client side is empty.
# NOTE(review): `nom IS NULL` is used as the "no client matched" signal,
# which assumes every real client has a non-null `nom` — confirm.
commandes_sans_clients_full = (
    full_outer_join
    .where(F.col("nom").isNull())
    .select("order_id", "client_id", "order_date", "amount", "status")
    .distinct()
)
print(f"Commandes sans clients: {commandes_sans_clients_full.count()}")

for _df in (clients_sans_commandes_full, commandes_sans_clients_full):
    _df.show(5)

In [None]:
# LEFT ANTI JOIN: clients that have no order with status "completed".
commandes_completed = commandes.where(F.col("status") == F.lit("completed"))

clients_sans_completed = (
    clients
    .join(commandes_completed, on="client_id", how="left_anti")
    .select("client_id", "nom", "prenom", "email", "segment", "is_vip")
    .orderBy("client_id")
)

clients_sans_completed.show(20)
print(f"Nombre de clients sans commande completed: {clients_sans_completed.count()}")

In [None]:
# Stop the Spark session created at the top of the notebook.
spark.stop()
