In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit, count, countDistinct, avg, sum, max as spark_max, round, dayofweek, hour, row_number, datediff
from pyspark.sql import Window
from pyspark import StorageLevel
import pandas as pd

In [15]:
# Create (or reuse) the Spark session shared by every function in this notebook.
spark = (
    SparkSession.builder
    .appName("E-Amazing Data Processing")
    .config("spark.driver.memory", "2g")
    .config("spark.memory.fraction", "0.6")
    .getOrCreate()
)

In [16]:

# Path of the Parquet file this notebook reads: despite its name, it is the
# input handed to load_and_prepare_data() in main().
output_path = "./data/filtered_df_output.parquet"

In [17]:

def load_and_prepare_data(path):
    """Read a Parquet dataset and make sure ``event_time`` is a timestamp.

    Args:
        path: Location of the Parquet file/directory to read.

    Returns:
        The loaded DataFrame with its ``event_time`` column cast to
        ``timestamp``; all other columns are left untouched.
    """
    raw_events = spark.read.parquet(path)
    event_time_as_ts = col("event_time").cast("timestamp")
    return raw_events.withColumn("event_time", event_time_as_ts)

In [18]:

def add_additional_columns(df):
    """Derive calendar features from ``event_time``.

    Args:
        df: DataFrame holding an ``event_time`` timestamp column.

    Returns:
        ``df`` plus two columns:
        ``event_day_of_week`` (Spark ``dayofweek``: 1 = Sunday ... 7 = Saturday)
        and ``event_hour`` (hour of the day extracted by Spark ``hour``).
    """
    event_time = col("event_time")
    with_weekday = df.withColumn("event_day_of_week", dayofweek(event_time))
    return with_weekday.withColumn("event_hour", hour(event_time))

In [19]:
def compute_user_stats(df, period):
    """Aggregate per-user activity counters for one time window.

    Args:
        df: Event DataFrame already restricted to the window of interest; it
            must expose ``user_id``, ``event_type``, ``user_session`` and
            ``price``.
        period: Label appended to every produced column name (e.g. ``"2m"``).

    Returns:
        One row per ``user_id`` with the number of views, carts, distinct
        sessions and purchases, plus the average purchase price rounded to two
        decimals. The four aggregates are combined with inner joins, so a user
        who has no view, no cart or no purchase event in ``df`` is absent from
        the result.
    """
    def _events_by_user(event_type):
        # Events of a single type, grouped per user and ready to aggregate.
        return df.filter(col("event_type") == event_type).groupBy("user_id")

    view_counts = _events_by_user("view").agg(
        count("*").alias(f"number_of_views_{period}")
    )
    cart_counts = _events_by_user("cart").agg(
        count("*").alias(f"number_of_carts_{period}")
    )
    session_counts = df.groupBy("user_id").agg(
        countDistinct("user_session").alias(f"number_of_sessions_{period}")
    )
    purchase_stats = _events_by_user("purchase").agg(
        count("*").alias(f"count_products_{period}"),
        round(avg("price"), 2).alias(f"avg_price_{period}"),
    )

    # Join order fixes the column order of the output: views, carts, sessions, purchases.
    combined = view_counts
    for part in (cart_counts, session_counts, purchase_stats):
        combined = combined.join(part, "user_id")
    return combined

In [20]:
def compute_other_stats(df, now):
    """Compute recency, spend and cart-abandonment figures per user.

    Args:
        df: Event DataFrame with ``user_id``, ``event_type``, ``event_time``
            and ``price`` columns.
        now: Reference date/timestamp against which the number of days since
            the last purchase is measured.

    Returns:
        A 3-tuple of DataFrames, each keyed by ``user_id``:

        * ``days_since_last_purchase`` - columns ``last_purchase`` (latest
          purchase ``event_time``) and ``days_since_last_purchase``
          (``datediff(now, last_purchase)``); purchasers only.
        * ``total_purchase_value`` - sum of ``price`` over purchase events,
          rounded to two decimals; purchasers only.
        * ``cart_abandonments`` - ``cart_count - purchase_count`` when positive,
          otherwise 0; one row per user having at least one cart event.
    """
    # Purchase events feed three of the aggregates below; filter them once.
    purchases = df.filter(col("event_type") == "purchase")

    last_purchase = purchases.groupBy("user_id").agg(
        spark_max("event_time").alias("last_purchase")
    )
    days_since_last_purchase = last_purchase.withColumn(
        "days_since_last_purchase", datediff(lit(now), col("last_purchase"))
    )

    total_purchase_value = purchases.groupBy("user_id").agg(
        round(sum("price"), 2).alias("total_purchase_value")
    )

    # Cart abandonments: carts that were not followed by a purchase.
    cart_events = (
        df.filter(col("event_type") == "cart")
        .groupBy("user_id")
        .agg(count("*").alias("cart_count"))
    )
    purchase_events = purchases.groupBy("user_id").agg(
        count("*").alias("purchase_count")
    )

    cart_abandonments = (
        cart_events.join(purchase_events, "user_id", "left")
        # The left join leaves purchase_count NULL for users who never bought.
        # Comparing against NULL yields NULL, which used to fall through to the
        # ``otherwise(0)`` branch and report zero abandonments for users who
        # abandoned every cart. Treat "no purchase row" as zero purchases.
        .fillna(0, subset=["purchase_count"])
        .withColumn(
            "cart_abandonments",
            when(
                col("cart_count") > col("purchase_count"),
                col("cart_count") - col("purchase_count"),
            ).otherwise(0),
        )
        .select("user_id", "cart_abandonments")
    )

    return days_since_last_purchase, total_purchase_value, cart_abandonments

In [21]:

def _top_value_per_user(counts, count_col, value_col):
    """Keep, for each ``user_id``, the row of ``counts`` with the highest ``count_col``.

    Ties on ``count_col`` are broken by ascending ``value_col`` (NULLs last) so
    that the selected row is the same on every run.
    """
    ranking = Window.partitionBy("user_id").orderBy(
        col(count_col).desc(), col(value_col).asc_nulls_last()
    )
    return (
        counts.withColumn("rank", row_number().over(ranking))
        .filter(col("rank") == 1)
        .drop("rank")
    )


def compute_user_preferences_and_activity(df):
    """Determine each user's favourite brand/category and busiest time slots.

    Args:
        df: Event DataFrame with ``user_id``, ``event_type``, ``brand``,
            ``category_code``, ``event_hour`` and ``event_day_of_week``
            (1 = Sunday ... 7 = Saturday) columns.

    Returns:
        A 4-tuple of DataFrames, each with one row per ``user_id``:

        * ``preferred_brand`` - most purchased brand (string); purchasers only.
        * ``preferred_category`` - most purchased ``category_code`` (string);
          purchasers only.
        * ``most_active_time`` - part of the day with the most events:
          ``morning`` (05-11h), ``afternoon`` (12-17h), ``evening`` (18-23h),
          ``night`` (any other hour, including a NULL hour).
        * ``most_active_day`` - French name of the weekday with the most
          events (NULL if the weekday number is not in 1..7).
    """
    purchases = df.filter(col("event_type") == "purchase")

    # Brand bought most often by each user.
    brand_counts = purchases.groupBy("user_id", "brand").agg(
        count("*").alias("brand_purchases")
    )
    most_purchased_brand = _top_value_per_user(
        brand_counts, "brand_purchases", "brand"
    ).select("user_id", col("brand").cast("string").alias("preferred_brand"))

    # Category bought most often by each user.
    category_counts = purchases.groupBy("user_id", "category_code").agg(
        count("*").alias("category_purchases")
    )
    most_purchased_category = _top_value_per_user(
        category_counts, "category_purchases", "category_code"
    ).select("user_id", col("category_code").cast("string").alias("preferred_category"))

    # Part of the day during which each user is most active.
    # ``between`` is inclusive on both ends: the morning range must stop at 11,
    # otherwise hour 12 is captured by "morning" and never reaches "afternoon".
    time_of_day = (
        when(col("event_hour").between(5, 11), "morning")
        .when(col("event_hour").between(12, 17), "afternoon")
        .when(col("event_hour").between(18, 23), "evening")
        .otherwise("night")
    )
    time_counts = (
        df.withColumn("time_of_day", time_of_day)
        .groupBy("user_id", "time_of_day")
        .agg(count("*").alias("time_activity"))
    )
    most_active_time = _top_value_per_user(
        time_counts, "time_activity", "time_of_day"
    ).select("user_id", col("time_of_day").cast("string").alias("most_active_time"))

    # Weekday on which each user is most active.
    day_counts = df.groupBy("user_id", "event_day_of_week").agg(
        count("*").alias("day_activity")
    )
    most_active_day = _top_value_per_user(
        day_counts, "day_activity", "event_day_of_week"
    ).select("user_id", col("event_day_of_week").alias("most_active_day"))

    # Replace the weekday number by its French name (1 = Sunday ... 7 = Saturday).
    most_active_day = most_active_day.withColumn(
        "most_active_day",
        when(col("most_active_day") == 1, "dimanche")
        .when(col("most_active_day") == 2, "lundi")
        .when(col("most_active_day") == 3, "mardi")
        .when(col("most_active_day") == 4, "mercredi")
        .when(col("most_active_day") == 5, "jeudi")
        .when(col("most_active_day") == 6, "vendredi")
        .when(col("most_active_day") == 7, "samedi")
    )

    return most_purchased_brand, most_purchased_category, most_active_time, most_active_day

In [22]:


def main(input_path=None, result_path="./data/user_stats_df_output.parquet"):
    """Build the per-user statistics table, display it and save it as Parquet.

    Args:
        input_path: Parquet dataset of events to process. Defaults to the
            module-level ``output_path`` (which, despite its name, is the file
            this pipeline reads).
        result_path: Destination of the resulting Parquet dataset; any existing
            data at that location is overwritten.

    Raises:
        ValueError: If the input holds no non-null ``event_time``, in which
            case no reference date can be derived.

    Notes:
        The period statistics and the purchase-based statistics are combined
        with inner joins, so only users present in every one of them appear in
        the output; preferences and activity columns are left-joined.
    """
    if input_path is None:
        input_path = output_path

    # Load the events and add the derived day-of-week / hour columns.
    filtered_df = add_additional_columns(load_and_prepare_data(input_path))

    # The most recent event is used as "now" so that results do not depend on
    # the date at which the notebook is executed.
    now = filtered_df.select(spark_max("event_time")).collect()[0][0]
    if now is None:
        raise ValueError(
            f"No event_time value found in {input_path!r}; cannot derive the reference date."
        )

    # Per-period statistics over the last 2, 5 and 7 months, joined on user_id.
    stats_all = None
    for months in (2, 5, 7):
        period_start = now - pd.DateOffset(months=months)
        period_df = filtered_df.filter(col("event_time") >= lit(period_start))
        period_stats = compute_user_stats(period_df, f"{months}m")
        stats_all = period_stats if stats_all is None else stats_all.join(period_stats, "user_id")

    # Recency, total spend and cart abandonments over the whole dataset.
    days_since_last_purchase, total_purchase_value, cart_abandonments = compute_other_stats(filtered_df, now)

    # Preferred brand/category and busiest time slots over the whole dataset.
    preferred_brand, preferred_category, most_active_time, most_active_day = compute_user_preferences_and_activity(filtered_df)

    user_stats_df = (
        stats_all.join(days_since_last_purchase, "user_id")
        .join(total_purchase_value, "user_id")
        .join(cart_abandonments, "user_id", "left")
        .join(preferred_brand, "user_id", "left")
        .join(preferred_category, "user_id", "left")
        .join(most_active_time, "user_id", "left")
        .join(most_active_day, "user_id", "left")
    )

    # Both show() and write are actions: without caching, the whole chain of
    # aggregations and joins would be executed once for each of them.
    user_stats_df = user_stats_df.persist(StorageLevel.MEMORY_AND_DISK)
    try:
        # Display a preview of the final DataFrame.
        user_stats_df.show()

        # Save the final DataFrame as Parquet, replacing any previous output.
        user_stats_df.write.mode("overwrite").parquet(result_path)
    finally:
        user_stats_df.unpersist()

In [23]:

# Run the whole pipeline when this code is executed as the main module.
if __name__ == "__main__":
    main()


                                                                                

+---------+------------------+------------------+---------------------+-----------------+------------+------------------+------------------+---------------------+-----------------+------------+------------------+------------------+---------------------+-----------------+------------+-------------------+------------------------+--------------------+-----------------+---------------+--------------------+----------------+---------------+
|  user_id|number_of_views_2m|number_of_carts_2m|number_of_sessions_2m|count_products_2m|avg_price_2m|number_of_views_5m|number_of_carts_5m|number_of_sessions_5m|count_products_5m|avg_price_5m|number_of_views_7m|number_of_carts_7m|number_of_sessions_7m|count_products_7m|avg_price_7m|      last_purchase|days_since_last_purchase|total_purchase_value|cart_abandonments|preferred_brand|  preferred_category|most_active_time|most_active_day|
+---------+------------------+------------------+---------------------+-----------------+------------+------------------+-

                                                                                