In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

In [0]:
# Reuse the active Spark session if one exists, otherwise start a new one.
spark = (
    SparkSession.builder
    .appName("EcommerceDataPipeline")
    .getOrCreate()
)

# Delta locations for the users table: bronze is read from, silver is written to.
users_bronze = "/mnt/delta/tables/bronze/users"  # source
users_silver = "/mnt/delta/tables/silver/users"  # target

In [0]:
# Load the bronze users table and upper-case the country codes.
# Reads from `users_bronze` (set in the config cell) instead of repeating the
# literal path, so the source location is defined in exactly one place.
userDF = (
    spark.read.format("delta")
    .load(users_bronze)
    .withColumn("countrycode", upper(col("countrycode")))
)

In [0]:
# Map language codes to readable names:
#   'en' -> 'English', 'fr' -> 'French', anything else (including null) -> 'Other'.
language_code = col("language")
userDF = userDF.withColumn(
    "language",
    when(language_code == "en", "English")
    .when(language_code == "fr", "French")
    .otherwise("Other"),
)


In [0]:
# Normalize civility titles: every occurrence of "Mmr", "Mrs" or "Ms" becomes "Ms".
# The pattern is unanchored and case-sensitive.
# NOTE(review): "Mmr" looks like a typo (perhaps "Mme"?) — confirm against the bronze data.
civility_title_pattern = "(Mmr|Mrs|Ms)"
userDF = userDF.withColumn(
    "civilitytitle",
    regexp_replace(col("civilitytitle"), civility_title_pattern, "Ms"),
)

In [0]:
# Derive "years_since_last_login" by dividing "daysSinceLastLogin" by 365.
# This uses the column as it is at this point, before the later int cast / null fill.
days_per_year = 365
userDF = userDF.withColumn(
    "years_since_last_login",
    col("daysSinceLastLogin") / days_per_year,
)

In [0]:
# Account age in years (seniority / 365, so seniority is presumably in days — TODO
# confirm), rounded to 2 decimals, then bucketed:
#   < 1 year       -> "New"
#   1 to < 3 years -> "Intermediate"
#   >= 3 years     -> "Experienced"
# Fix: a null seniority now yields a null group. Previously every comparison on a
# null was not true, so such rows fell through to `otherwise` and were labeled
# "Experienced".
userDF = userDF.withColumn(
    "accounts_age_years",
    round(col("seniority") / 365, 2),
)

account_age = col("accounts_age_years")
userDF = userDF.withColumn(
    "accounts_age_group",
    when(account_age.isNull(), lit(None).cast("string"))
    .when(account_age < 1, "New")
    # values < 1 were already handled above, so only the upper bound is needed here
    .when(account_age < 3, "Intermediate")
    .otherwise("Experienced"),
)


In [0]:
# Flag civility titles longer than 10 characters.
# Fix: the comparison must wrap the length, i.e. length(title) > 10. The previous
# form length(col(...) > 10) measured the length of a boolean comparison, not of the title.
# NOTE(review): flag_long_title is now a boolean column instead of an integer; an
# existing silver users table created with the old type will need a schema update.
userDF = userDF.withColumn("flag_long_title", length(col("civilitytitle")) > 10)

In [0]:
# Cast user columns to their silver types, in the listed order (one withColumn each).
# NOTE(review): identifierHash is cast to a 32-bit int and is the MERGE key below; if
# the source hashes are 64-bit values they will not fit — confirm and consider "long".
user_column_types = [
    ("hasAnyApp", "boolean"),
    ("hasAndroidApp", "boolean"),
    ("hasIosApp", "boolean"),
    ("hasProfilePicture", "boolean"),
    ("identifierHash", "int"),
    ("socialNbFollowers", "int"),
    ("socialNbFollows", "int"),
    ("socialProductsLiked", "int"),
    ("productsListed", "int"),
    ("productsSold", "int"),
    ("productsWished", "int"),
    ("productsBought", "int"),
    ("productsPassRate", "Decimal(10,2)"),
    ("seniorityAsMonths", "Decimal(10,2)"),
    ("seniorityAsYears", "Decimal(10,2)"),
]

for user_col, user_type in user_column_types:
    userDF = userDF.withColumn(user_col, col(user_col).cast(user_type))


In [0]:
# Cast daysSinceLastLogin to int and replace nulls with 0.
# (A non-null value that fails the int cast still ends up null.)
days_col = col("daysSinceLastLogin")
userDF = userDF.withColumn(
    "daysSinceLastLogin",
    when(days_col.isNull(), 0).otherwise(days_col.cast("int")),
)

# NOTE(review): the MERGE into silver matches on identifierHash; a Delta merge can fail
# when several source rows match the same target row, so de-duplicating may be needed:
#userDF = userDF.drop_duplicates(['identifierHash'])


In [0]:
# Initial load of the silver users table: write only when no Delta table exists at
# `users_silver` yet. On later runs the MERGE below performs the upsert.
# Fix: appending unconditionally re-inserted every user on each run, duplicating rows in
# silver, and it made the MERGE's not-matched insert branch unreachable because every
# source row had just been appended. ("overwriteSchema" only applies to overwrite mode,
# so it had no effect on the append and is dropped.)
if not DeltaTable.isDeltaTable(spark, users_silver):
    userDF.write.format("delta").mode("append").save(users_silver)


In [0]:
# Handles on the silver users table: a DataFrame over it (not referenced by the cells
# shown here) and a DeltaTable instance used by the MERGE below.
silver_users_path = users_silver
user_silver_df = spark.read.format("delta").load(silver_users_path)
target_table = DeltaTable.forPath(spark, silver_users_path)


In [0]:
# Upsert the cleaned users into silver, keyed on identifierHash:
#   * matched rows whose daysSinceLastLogin changed take the source value, and the
#     derived years_since_last_login is refreshed from the same source row so the two
#     columns stay consistent (previously it was left stale);
#   * source rows with no match are inserted with all columns.
# `<=>` is Spark SQL's null-safe equality, so a null on either side still counts as a
# change instead of silently skipping the update.
(
    target_table.alias("target")
    .merge(
        userDF.alias("source"),
        "target.identifierHash = source.identifierHash",
    )
    .whenMatchedUpdate(
        condition="NOT (target.daysSinceLastLogin <=> source.daysSinceLastLogin)",
        set={
            "daysSinceLastLogin": "source.daysSinceLastLogin",
            "years_since_last_login": "source.years_since_last_login",
        },
    )
    .whenNotMatchedInsertAll()
    .execute()
)

In [0]:
# Load the bronze buyers table.
buyers_bronze = "/mnt/delta/tables/bronze/buyers"
buyersDF = spark.read.format("delta").load(buyers_bronze)

In [0]:
# Buyer columns to cast to int; names absent from the bronze schema are skipped.
# `integer_columns` is reused later for null-filling.
integer_columns = [
    "buyers", "topbuyers", "femalebuyers", "malebuyers",
    "topfemalebuyers", "totalproductsbought", "totalproductswished",
    "totalproductsliked", "toptotalproductsbought",
    "toptotalproductswished", "toptotalproductsliked",
]

present_integer_columns = [name for name in integer_columns if name in buyersDF.columns]
for column_name in present_integer_columns:
    buyersDF = buyersDF.withColumn(column_name, buyersDF[column_name].cast("int"))

In [0]:
# Buyer ratio/mean columns to cast to a fixed-point decimal; absent names are skipped.
# Fix: use decimal(10,2), matching the sellers and countries casts. decimal(5,2) holds
# at most 999.99, and under Spark's default (non-ANSI) casting an out-of-range value
# becomes null rather than raising. Mean columns such as meanfollowers or
# meanofflinedays could exceed that range.
decimal_coloumns = [
    "topbuyerratio", "femalebuyersratio", "topfemalebuyersratio", "boughtperwishlistratio",
    "boughtperlikeratio", "topboughtperwishlistratio", "topboughtperlikeratio",
    "meanproductsbought", "meanproductswished", "meanproductsliked",
    "topmeanproductsbought", "topmeanproductswished", "topmeanproductsliked",
    "meanofflinedays", "topmeanofflinedays", "meanfollowers", "meanfollowing",
    "topmeanfollowers", "topmeanfollowing",
]

for column_name in decimal_coloumns:
    if column_name in buyersDF.columns:
        buyersDF = buyersDF.withColumn(column_name, buyersDF[column_name].cast("decimal(10,2)"))

In [0]:
# Title-case country names (initcap upper-cases the first letter of each word).
buyersDF = buyersDF.withColumn("country", initcap(col("country")))

# Replace nulls with 0 in the integer columns using a single fillna call.
# Fix: only columns present in the frame are included, matching the guarded casts
# above. A fillna dict key that names a missing column raises an error.
buyersDF = buyersDF.fillna(
    {name: 0 for name in integer_columns if name in buyersDF.columns}
)

# Female-to-male buyer ratio; +1 in the denominator avoids division by zero.
buyersDF = buyersDF.withColumn(
    "female_to_male_ratio",
    round(col("femalebuyers") / (col("malebuyers") + 1), 2),
)

# Market potential: products wished relative to products bought (+1 avoids /0).
buyersDF = buyersDF.withColumn(
    "market_potential",
    round(col("totalproductswished") / (col("totalproductsbought") + 1), 2),
)

# Tag countries whose top-buyer bought-per-wishlist ratio exceeds the threshold.
# A null ratio yields False.
high_engagement_threshold = 0.5
buyersDF = buyersDF.withColumn(
    "high_engagement",
    when(col("topboughtperwishlistratio") > high_engagement_threshold, True).otherwise(False),
)

# Flag markets where the female buyer ratio exceeds the top-buyer female ratio (null -> False).
buyersDF = buyersDF.withColumn(
    "growing_female_market",
    when(col("femalebuyersratio") > col("topfemalebuyersratio"), True).otherwise(False),
)


In [0]:
# Make column names unique. No columns are removed here: a column whose name already
# appeared earlier in the schema is renamed to "<name>_<position>" (e.g. a repeated
# "x" at index 38 becomes "x_38").
# The column list is read once. Calling buyersDF.columns inside the comprehension
# re-fetched the schema for every element, which is quadratic.
# NOTE(review): a generated "<name>_<position>" could itself collide with an existing
# column name; this is not checked.
buyer_columns = buyersDF.columns
unique_buyer_columns = [
    f"{name}_{position}" if name in buyer_columns[:position] else name
    for position, name in enumerate(buyer_columns)
]
buyersDF = buyersDF.toDF(*unique_buyer_columns)


In [0]:
# Persist the cleaned buyers table to silver, replacing both data and schema.
# Fix: drop the helper columns *before* writing. The drop in the following cell runs
# after this write, so without this line those columns reached the silver table.
# Dropping a name that is not present is a no-op.
buyersDF = buyersDF.drop("high_engagement_38", "column1", "column2")
buyersDF.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("/mnt/delta/tables/silver/buyers")

In [0]:
# Remove the helper columns from the in-memory buyers frame. This affects buyersDF
# from here on; it does not change anything already written to silver.
# Names that are not present are ignored.
columns_to_drop = ["high_engagement_38", "column1", "column2"]
buyersDF = buyersDF.drop(*columns_to_drop)


In [0]:
# Load the bronze sellers table.
sellers_bronze = "/mnt/delta/tables/bronze/sellers"
sellersDF = spark.read.format("delta").load(sellers_bronze)

In [0]:
# Cast seller columns to their silver types, in the listed order.
# Column references are taken from the frame as loaded from bronze.
seller_column_types = {
    "nbsellers": IntegerType(),
    "meanproductssold": DecimalType(10, 2),
    "meanproductslisted": DecimalType(10, 2),
    "meansellerpassrate": DecimalType(10, 2),
    "totalproductssold": IntegerType(),
    "totalproductslisted": IntegerType(),
    "meanproductsbought": DecimalType(10, 2),
    "meanproductswished": DecimalType(10, 2),
    "meanproductsliked": DecimalType(10, 2),
    "totalbought": IntegerType(),
    "totalwished": IntegerType(),
    "totalproductsliked": IntegerType(),
    "meanfollowers": DecimalType(10, 2),
    "meanfollows": DecimalType(10, 2),
    "percentofappusers": DecimalType(10, 2),
    "percentofiosusers": DecimalType(10, 2),
    "meanseniority": DecimalType(10, 2),
}

bronze_sellersDF = sellersDF
for seller_col, seller_type in seller_column_types.items():
    sellersDF = sellersDF.withColumn(seller_col, bronze_sellersDF[seller_col].cast(seller_type))


In [0]:
# Drop column1 and column2 (names that are not present are ignored).
sellersDF = sellersDF.drop(*["column1", "column2"])

In [0]:
# Title-case country names and upper-case the sex values.
sellersDF = (
    sellersDF
    .withColumn("country", initcap(col("country")))
    .withColumn("sex", upper(col("sex")))
)

# Size category by number of sellers: < 500 "Small", 500 to < 2000 "Medium", else "Big".
# Fix: a null nbsellers now yields a null category instead of falling through to "Big".
sellers_count = col("nbsellers")
sellersDF = sellersDF.withColumn(
    "selling_size_category",
    when(sellers_count.isNull(), lit(None).cast("string"))
    .when(sellers_count < 500, "Small")
    .when(sellers_count < 2000, "Medium")
    .otherwise("Big"),
)

# Mean products listed divided by the number of sellers, rounded to 2 decimals.
# A null or zero nbsellers gives null (or an error if ANSI mode is enabled).
# NOTE(review): meanproductslisted already looks like a per-seller mean; confirm the
# extra division by nbsellers is intended.
sellersDF = sellersDF.withColumn(
    "meanproductslistedperseller",
    round(col("meanproductslisted") / col("nbsellers"), 2),
)

# Fill missing pass rates with the overall average pass rate (rounded to 2 decimals).
# Fix: impute BEFORE classifying. Previously the classification ran while the rate was
# still null, so those rows always landed in "normal" regardless of the imputed value.
mean_pass_rate = (
    sellersDF
    .select(round(avg("meansellerpassrate"), 2).alias("avg_pass_rate"))
    .first()["avg_pass_rate"]
)
sellersDF = sellersDF.withColumn(
    "meansellerpassrate",
    when(col("meansellerpassrate").isNull(), mean_pass_rate)
    .otherwise(col("meansellerpassrate")),
)

# Label pass rates: >= 0.75 -> "High", otherwise "normal".
sellersDF = sellersDF.withColumn(
    "highpassrate",
    when(col("meansellerpassrate") >= 0.75, "High").otherwise("normal"),
)

In [0]:
# Persist the sellers table to silver (overwrite; mergeSchema lets new columns be added).
sellers_silver = "/mnt/delta/tables/silver/sellers"
(
    sellersDF.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save(sellers_silver)
)

In [0]:
# Load the bronze countries table, cast each listed column to its silver type (in the
# listed order), then drop column1 and column2.
countries_bronze = "/mnt/delta/tables/bronze/countries"
countriesDF = spark.read.format("delta").load(countries_bronze)

country_int_type = "int"
country_decimal_type = "decimal(10, 2)"
country_column_types = [
    ("sellers", country_int_type),
    ("topsellers", country_int_type),
    ("topsellerratio", country_decimal_type),
    ("femalesellersratio", country_decimal_type),
    ("topfemalesellersratio", country_decimal_type),
    ("femalesellers", country_int_type),
    ("malesellers", country_int_type),
    ("topfemalesellers", country_int_type),
    ("topmalesellers", country_int_type),
    ("countrysoldratio", country_decimal_type),
    ("bestsoldratio", country_decimal_type),
    ("toptotalproductssold", country_int_type),
    ("totalproductssold", country_int_type),
    ("toptotalproductslisted", country_int_type),
    ("totalproductslisted", country_int_type),
    ("topmeanproductssold", country_decimal_type),
    ("topmeanproductslisted", country_decimal_type),
    ("meanproductssold", country_decimal_type),
    ("meanproductslisted", country_decimal_type),
    ("meanofflinedays", country_decimal_type),
    ("topmeanofflinedays", country_decimal_type),
    ("meanfollowers", country_decimal_type),
    ("meanfollowing", country_decimal_type),
    ("topmeanfollowers", country_decimal_type),
    ("topmeanfollowing", country_decimal_type),
]

for country_col, country_type in country_column_types:
    countriesDF = countriesDF.withColumn(country_col, col(country_col).cast(country_type))

countriesDF = countriesDF.drop("column1", "column2")


In [0]:
# Title-case country names.
countriesDF = countriesDF.withColumn("country", initcap(col("country")))

# Share of top sellers among all sellers, rounded to 2 decimals.
# NOTE(review): a `topsellerratio` column is already cast above — confirm both are needed.
countriesDF = countriesDF.withColumn(
    "top_seller_ratio",
    round(col("topsellers") / col("sellers"), 2),
)

# Flag countries whose female seller ratio exceeds 0.5 (null -> False).
countriesDF = countriesDF.withColumn(
    "high_female_seller_ratio",
    when(col("femalesellersratio") > 0.5, True).otherwise(False),
)

# Performance indicator: mean products sold / mean products listed, rounded to 2 decimals.
countriesDF = countriesDF.withColumn(
    "performance_indicator",
    round(col("meanproductssold") / col("meanproductslisted"), 2),
)

# Flag countries whose performance indicator exceeds the threshold (null -> False).
High_performace_threshold = 0.7
countriesDF = countriesDF.withColumn(
    "high_performance",
    when(col("performance_indicator") > High_performace_threshold, True).otherwise(False),
)

# Activity level from mean offline days: > 30 -> "High", > 10 up to 30 -> "Medium",
# otherwise (including null) "Low".
# Fix: a value of exactly 30 matched neither the "> 30" nor the "< 30" branch and fell
# through to "Low"; it is now "Medium".
# NOTE(review): more offline days would normally mean *less* activity, so these labels
# may be inverted — confirm the intended meaning before relying on this column.
offline_days = col("meanofflinedays")
countriesDF = countriesDF.withColumn(
    "activity_level",
    when(offline_days > 30, "High")
    .when(offline_days > 10, "Medium")
    .otherwise("Low"),
)


In [0]:
# Persist the countries table to silver (overwrite; mergeSchema lets new columns be added).
countries_silver = "/mnt/delta/tables/silver/countries"
(
    countriesDF.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .save(countries_silver)
)