In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Create (or reuse) the Spark session used by every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("EcomDatPipeline")
    .getOrCreate()
)

In [0]:
# Read the bronze-layer users table (Delta format) into userDF.
userDF = (
    spark.read
    .format("Delta")
    .load("/mnt/delta/tables/bronze/users")
)

In [0]:
# Quick look at the raw users data (first 20 rows, the default for show()).
userDF.show(n=20)

+--------------+----+-----------+-----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+-----------------+---------+-----------------+----------------+----------------+
|identifierHash|type|countryCode|    country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasProfilePicture|seniority|seniorityAsMonths|seniorityAsYears|websiteLongevity|
+--------------+----+-----------+-----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+-----------------+---------+-----------------+----------------+----------------+
|    1920991140|user|         us| Etats-Unis|      en|                3|              8|      

In [0]:
# Inspect the country codes on their own (first 20 rows).
userDF.select("countrycode").show(n=20)

+-----------+
|countrycode|
+-----------+
|         us|
|         se|
|         it|
|         dk|
|         de|
|         it|
|         dk|
|         fr|
|         fr|
|         fr|
|         fr|
|         fr|
|         ca|
|         de|
|         gb|
|         us|
|         us|
|         au|
|         us|
|         us|
+-----------+
only showing top 20 rows



In [0]:
# Preview of upper-cased country codes.
# NOTE: the transformed frame is only displayed; it is not assigned back to
# userDF, so later cells still see the original values.
(
    userDF
    .withColumn("countrycode", upper(col("countrycode")))
    .show(n=20)
)

+--------------+----+-----------+-----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+-----------------+---------+-----------------+----------------+----------------+
|identifierHash|type|countrycode|    country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasProfilePicture|seniority|seniorityAsMonths|seniorityAsYears|websiteLongevity|
+--------------+----+-----------+-----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+-----------------+---------+-----------------+----------------+----------------+
|    1920991140|user|         US| Etats-Unis|      en|                3|              8|      

In [0]:
from pyspark.sql import functions as F

# Derive a readable language label from the language code.
# The comparison is done on the upper-cased code: the bronze data stores the
# codes in lowercase (e.g. 'en'), and Spark string equality is case-sensitive,
# so comparing the raw value against 'EN' / 'FR' would send every row to 'other'.
# A null language still falls through to 'other', as before.
language_code = F.upper(F.col("language"))
userDF = userDF.withColumn(
    "language_full",
    F.when(language_code == "EN", "ENGLISH")
     .when(language_code == "FR", "FRENCH")
     .otherwise("other"),
)


In [0]:
# Replace the raw gender value with a readable label:
# values starting with "M" -> "Male", starting with "F" -> "Female",
# anything else (including null) -> "other".
gender_label = (
    when(col("gender").startswith("M"), "Male")
    .when(col("gender").startswith("F"), "Female")
    .otherwise("other")
)
userDF = userDF.withColumn("gender", gender_label)

In [0]:
# Collapse the title variants Mme / Ms / Mrs into "Ms" in a new column.
# NOTE(review): the pattern is case-sensitive and unanchored, so it rewrites
# any occurrence inside the value and ignores lowercase titles - confirm this
# matches the values actually stored in civilitytitle.
userDF = userDF.withColumn(
    "civilitytitle_clean",
    regexp_replace(col("civilitytitle"), "(Mme|Ms|Mrs)", "Ms"),
)

In [0]:
# Express the days since the last login in years (365-day year).
# The users table displayed earlier in this notebook does not list a
# daysSinceLastLogin column, in which case referencing it would abort the whole
# pipeline with an AnalysisException. Guard the lookup (case-insensitively,
# matching Spark's default column resolution) and fall back to a typed null
# column so the silver schema stays stable.
if "dayssincelastlogin" in [name.lower() for name in userDF.columns]:
    userDF = userDF.withColumn("years_since_last_login", col("dayssincelastlogin") / 365)
else:
    print("WARNING: column 'dayssincelastlogin' not found in userDF; "
          "'years_since_last_login' is filled with nulls.")
    userDF = userDF.withColumn("years_since_last_login", lit(None).cast(DoubleType()))

In [0]:
# Account age in years (365-day year, 2 decimals). `round` here is the
# pyspark.sql.functions version brought in by the star import, not the builtin.
userDF = userDF.withColumn("account_age_years", round(col("seniority") / 365, 2))

# Bucket the account age:
#   < 1 year  -> "New"
#   1-3 years -> "Intermediate"
#   >= 3      -> "Experienced"
# The last bucket is an explicit condition rather than `.otherwise(...)` so
# that rows with a null age get a null group instead of being mislabelled
# "Experienced". Conditions are evaluated in order, so `< 3` already implies
# `>= 1` at that point.
userDF = userDF.withColumn(
    "account_age_group",
    when(col("account_age_years") < 1, "New")
    .when(col("account_age_years") < 3, "Intermediate")
    .when(col("account_age_years") >= 3, "Experienced"),
)

In [0]:
# Stamp every row with the calendar year at execution time.
userDF = userDF.withColumn(
    "current_year",
    year(current_date()),
)

In [0]:
# Build a compact descriptor of the form
#   <gender>_<countrycode>_<first 3 chars of cleaned title>_<language_full>
# concat() yields null when any of its inputs is null.
descriptor_parts = [
    col("gender"),
    lit("_"),
    col("countrycode"),
    lit("_"),
    expr("substring(civilitytitle_clean,1,3)"),
    lit("_"),
    col("language_full"),
]
userDF = userDF.withColumn("user_descriptor", concat(*descriptor_parts))

In [0]:
# Boolean flag: True when the raw civility title is longer than 10 characters.
userDF = userDF.withColumn(
    "flag_long_title",
    length(col("civilitytitle")) > 10,
)

In [0]:
# Enforce numeric types on the users table before writing to silver.

# Follower / following counts -> integer
for int_name in ("socialnbfollowers", "socialnbfollows"):
    userDF = userDF.withColumn(int_name, col(int_name).cast(IntegerType()))

# Pass rate and seniority measures -> decimal(10, 2)
for dec_name in ("productspassrate", "seniorityasmonths", "seniorityasyears"):
    userDF = userDF.withColumn(dec_name, col(dec_name).cast(DecimalType(10, 2)))

In [0]:
# Persist the enriched users table to the silver layer, replacing any previous run.
(
    userDF.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/delta/tables/silver/user")
)

In [0]:

# Read the bronze-layer buyers table (Delta format) into buyersDF.
buyersDF = (
    spark.read
    .format("delta")
    .load("/mnt/delta/tables/bronze/buyers")
)

In [0]:
# Buyer count columns that must be stored as integers.
# (This list is reused by a later cell to fill nulls with 0.)
integer_columns = [
    'buyers',
    'topbuyers',
    'femalebuyers',
    'malebuyers',
    'topfemalebuyers',
    'topmalebuyers',
    'totalproductsbought',
    'totalproductswished',
    'totalproductsliked',
    'toptotalproductsbought',
    'toptotalproductswished',
    'toptotalproductsliked',
]

# Cast each listed column in place.
int_type = IntegerType()
for int_col in integer_columns:
    buyersDF = buyersDF.withColumn(int_col, col(int_col).cast(int_type))

In [0]:

# Buyer ratio / mean columns that must be stored as decimal(10, 2).
decimal_columns = [
    'topbuyerratio',
    'femalebuyersratio',
    'topfemalebuyersratio',
    'boughtperwishlistratio',
    'boughtperlikeratio',
    'topboughtperwishlistratio',
    'topboughtperlikeratio',
    'meanproductsbought',
    'meanproductswished',
    'meanproductsliked',
    'topmeanproductsbought',
    'topmeanproductswished',
    'topmeanproductsliked',
    'meanofflinedays',
    'topmeanofflinedays',
    'meanfollowers',
    'meanfollowing',
    'topmeanfollowers',
    'topmeanfollowing',
]

# Cast each listed column in place.
dec_type = DecimalType(10, 2)
for dec_col in decimal_columns:
    buyersDF = buyersDF.withColumn(dec_col, col(dec_col).cast(dec_type))

In [0]:

# Title-case the country names.
buyersDF = buyersDF.withColumn("country", initcap(col("country")))

# Replace nulls in the integer count columns with 0 (one fillna call for all of them).
buyersDF = buyersDF.fillna({int_name: 0 for int_name in integer_columns})

# Bought-per-wishlist ratio above which a country counts as highly engaged.
high_engagement_threshold = 0.5

buyersDF = (
    buyersDF
    # Female buyers relative to male buyers; the +1 keeps the denominator non-zero.
    .withColumn(
        "female_to_male_ratio",
        round(col("femalebuyers") / (col("malebuyers") + 1), 2),
    )
    # Wished products relative to bought products; the +1 keeps the denominator non-zero.
    .withColumn(
        "wishlist_to_purchase_ratio",
        round(col("totalproductswished") / (col("totalproductsbought") + 1), 2),
    )
    # True when the bought-per-wishlist ratio exceeds the threshold; null ratios give False.
    .withColumn(
        "high_engagement",
        when(col("boughtperwishlistratio") > high_engagement_threshold, True)
        .otherwise(False),
    )
    # True when the overall female-buyer ratio exceeds the ratio among top buyers;
    # False otherwise (including when either ratio is null).
    .withColumn(
        "growing_female_market",
        when(col("femalebuyersratio") > col("topfemalebuyersratio"), True)
        .otherwise(False),
    )
)

In [0]:
# Persist the enriched buyers table to the silver layer, replacing any previous run.
(
    buyersDF.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/delta/tables/silver/buyers")
)

In [0]:
# Read the bronze-layer sellers table (Delta format) into sellersDF.
sellersDF = (
    spark.read
    .format("delta")
    .load("/mnt/delta/tables/bronze/sellers")
)

In [0]:
# Enforce numeric types on the sellers table.
# Each entry is (column name, target type); the columns are cast in place,
# in the order listed.
seller_int = IntegerType()
seller_dec = DecimalType(10, 2)
seller_cast_spec = [
    ("nbsellers", seller_int),
    ("meanproductssold", seller_dec),
    ("meanproductslisted", seller_dec),
    ("meansellerpassrate", seller_dec),
    ("totalproductssold", seller_int),
    ("totalproductslisted", seller_int),
    ("meanproductsbought", seller_dec),
    ("meanproductswished", seller_dec),
    ("meanproductsliked", seller_dec),
    ("totalbought", seller_int),
    ("totalwished", seller_int),
    ("totalproductsliked", seller_int),
    ("meanfollowers", seller_dec),
    ("meanfollows", seller_dec),
    ("percentofappusers", seller_dec),
    ("percentofiosusers", seller_dec),
    ("meanseniority", seller_dec),
]

for seller_col, seller_type in seller_cast_spec:
    sellersDF = sellersDF.withColumn(seller_col, col(seller_col).cast(seller_type))

In [0]:


# Title-case country names and upper-case the sex code.
sellersDF = (
    sellersDF
    .withColumn("country", initcap(col("country")))
    .withColumn("sex", upper(col("sex")))
)

# Bucket markets by number of sellers:
#   < 500 -> "Small", 500-1999 -> "Medium", >= 2000 -> "Large".
# The last bucket is an explicit condition rather than `.otherwise(...)`:
# a null nbsellers (e.g. a value that could not be cast to integer) now yields
# a null category instead of being mislabelled "Large". Conditions are
# evaluated in order, so `< 2000` already implies `>= 500` at that point.
sellersDF = sellersDF.withColumn(
    "seller_size_category",
    when(col("nbsellers") < 500, "Small")
    .when(col("nbsellers") < 2000, "Medium")
    .when(col("nbsellers") >= 2000, "Large"),
)

# Products listed per seller, as an indicator of seller activity (2 decimals).
sellersDF = sellersDF.withColumn(
    "mean_products_listed_per_seller",
    round(col("totalproductslisted") / col("nbsellers"), 2),
)

# Flag markets whose mean seller pass rate exceeds 0.75; null rates are "Normal".
# NOTE(review): this flag is computed BEFORE the null imputation below, so
# imputed rows always stay "Normal" - confirm that is intended.
# NOTE(review): 0.75 assumes the rate is a 0-1 fraction - confirm the scale.
sellersDF = sellersDF.withColumn(
    "high_seller_pass_rate",
    when(col("meansellerpassrate") > 0.75, "High").otherwise("Normal"),
)

# Average pass rate across all rows (rounded to 2 decimals), pulled to the driver.
# A global aggregate always returns exactly one row.
mean_pass_rate = (
    sellersDF
    .select(round(avg("meansellerpassrate"), 2).alias("avg_pass_rate"))
    .first()["avg_pass_rate"]
)

# Fill missing pass rates with that average.
sellersDF = sellersDF.withColumn(
    "meansellerpassrate",
    when(col("meansellerpassrate").isNull(), mean_pass_rate)
    .otherwise(col("meansellerpassrate")),
)

# Persist the enriched sellers table to the silver layer, replacing any previous run.
sellersDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/sellers")

In [0]:
# Read the bronze-layer countries table (Delta format) into countriesDF.
# NOTE(review): the path segment is spelled "countires" - confirm it matches
# the location the bronze job actually writes to before renaming anything.
countriesDF = (
    spark.read
    .format("delta")
    .load("/mnt/delta/tables/bronze/countires")
)

In [0]:
# Enforce numeric types on the countries table.
# Each entry is (column name, target type); the columns are cast in place,
# in the order listed.
country_int = IntegerType()
country_dec = DecimalType(10, 2)
country_cast_spec = [
    ("sellers", country_int),
    ("topsellers", country_int),
    ("topsellerratio", country_dec),
    ("femalesellersratio", country_dec),
    ("topfemalesellersratio", country_dec),
    ("femalesellers", country_int),
    ("malesellers", country_int),
    ("topfemalesellers", country_int),
    ("topmalesellers", country_int),
    ("countrysoldratio", country_dec),
    ("bestsoldratio", country_dec),
    ("toptotalproductssold", country_int),
    ("totalproductssold", country_int),
    ("toptotalproductslisted", country_int),
    ("totalproductslisted", country_int),
    ("topmeanproductssold", country_dec),
    ("topmeanproductslisted", country_dec),
    ("meanproductssold", country_dec),
    ("meanproductslisted", country_dec),
    ("meanofflinedays", country_dec),
    ("topmeanofflinedays", country_dec),
    ("meanfollowers", country_dec),
    ("meanfollowing", country_dec),
    ("topmeanfollowers", country_dec),
    ("topmeanfollowing", country_dec),
]

for country_col, country_type in country_cast_spec:
    countriesDF = countriesDF.withColumn(country_col, col(country_col).cast(country_type))

In [0]:
# Title-case the country names.
countriesDF = countriesDF.withColumn("country", initcap(col("country")))

# Share of top sellers among all sellers (2 decimals).
countriesDF = countriesDF.withColumn(
    "top_seller_ratio",
    round(col("topsellers") / col("sellers"), 2),
)

# Flag countries where the female-seller ratio exceeds 0.5; null ratios give False.
countriesDF = countriesDF.withColumn(
    "high_female_seller_ratio",
    when(col("femalesellersratio") > 0.5, True).otherwise(False),
)

# Performance indicator: products sold by top sellers relative to products
# they listed; the +1 keeps the denominator non-zero.
countriesDF = countriesDF.withColumn(
    "performance_indicator",
    round(col("toptotalproductssold") / (col("toptotalproductslisted") + 1), 2),
)

# Flag countries whose performance indicator exceeds the threshold.
performance_threshold = 0.8
countriesDF = countriesDF.withColumn(
    "high_performance",
    when(col("performance_indicator") > performance_threshold, True).otherwise(False),
)

# Bucket countries by mean offline days:
#   < 30 -> "Highly Active", 30-59 -> "Moderately Active", >= 60 -> "Low Activity".
# The last bucket is an explicit condition rather than `.otherwise(...)`:
# a null meanofflinedays now yields a null activity level instead of being
# mislabelled "Low Activity". Conditions are evaluated in order, so `< 60`
# already implies `>= 30` at that point.
countriesDF = countriesDF.withColumn(
    "activity_level",
    when(col("meanofflinedays") < 30, "Highly Active")
    .when(col("meanofflinedays") < 60, "Moderately Active")
    .when(col("meanofflinedays") >= 60, "Low Activity"),
)

In [0]:
# Persist the enriched countries table to the silver layer, replacing any previous run.
(
    countriesDF.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/delta/tables/silver/countries")
)