In [0]:
from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [0]:
spark = SparkSession.builder.appName("ecom-data-pipeline").getOrCreate()
spark

In [0]:
userDf = spark.read.format("delta").load("/mnt/delta/tables/bronze/users")

In [0]:
userDf = userDf.withColumn("countrycode",upper(col("countrycode")))

In [0]:
userDf.select(col("countrycode")).show(5)

+-----------+
|countrycode|
+-----------+
|         US|
|         DE|
|         SE|
|         TR|
|         FR|
+-----------+
only showing top 5 rows



In [0]:
userDf = userDf.withColumn("language_full",
                           expr("CASE WHEN language = 'EN' THEN 'English'" + "WHEN language='FR' THEN 'French'" + "ELSE 'Other' END")
                           )

In [0]:
userDf = userDf.withColumn("gender",
                           when(col("gender").startswith("M"), "Male")
                           .when(col("gender").startswith("F"), "Female")
                           .otherwise("Other")
                           )

In [0]:
userDf = userDf.withColumn("civilitytitle_clean",
                           regexp_replace(col("civilitytitle"), "(Mme|Ms|Mrs)", "Ms")
                           )

In [0]:
# userDf = userDf.withColumn("")
userDf = userDf.withColumn("years_since_last_login",col("dayssincelastlogin")/365)

In [0]:
# Calculate age of account in years and categorize into 'account_age_group'
userDf = userDf.withColumn("account_age_years",round(col("seniority")/365,2))
userDf = userDf.withColumn("account_age_group", when(col("account_age_years") < 1, "New")
                           .when((col("account_age_years") >= 1) & (col("account_age_years") <3), "Intermediate")
                           .otherwise("Experienced")
                           )

In [0]:
userDf = userDf.withColumn("current_year",year(current_date()))

In [0]:
userDf = userDf.withColumn("user_descriptor",
                           concat(col("gender"),lit("_"),col("countrycode"),lit("_"),
                                  expr("substring(civilitytitle_clean,1,3)"),lit("_"),
                                  col("language_full")
                                  )
                           )

In [0]:
userDf.select(col("user_descriptor")).show(5)

+-------------------+
|    user_descriptor|
+-------------------+
|Female_US_mrs_Other|
|Female_DE_mrs_Other|
|   Male_SE_mr_Other|
|Female_TR_mrs_Other|
|   Male_FR_mr_Other|
+-------------------+
only showing top 5 rows



In [0]:
userDf = userDf.withColumn("flag_long_title",length(col("civilitytitle"))>10)

In [0]:
userDf = userDf.withColumn("hasanyapp",col("hasanyapp").cast("boolean"))
userDf = userDf.withColumn("hasandroidapp",col("hasandroidapp").cast("boolean"))
userDf = userDf.withColumn("hasiosapp",col("hasiosapp").cast("boolean"))
userDf = userDf.withColumn("hasprofilepicture",col("hasprofilepicture").cast("boolean"))
userDf = userDf.withColumn("socialnbfollowers",col("socialnbfollowers").cast(IntegerType()))
userDf = userDf.withColumn("socialnbfollows",col("socialnbfollows").cast(IntegerType()))
userDf = userDf.withColumn("productspassrate",col("productspassrate").cast(DecimalType(10,2)))
userDf = userDf.withColumn("seniorityAsMonths",col("seniorityAsMonths").cast(DecimalType(10,2)))
userDf = userDf.withColumn("seniorityAsYears",col("seniorityAsYears").cast(DecimalType(10,2)))

In [0]:
userDf.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/users")

In [0]:
##Buyers

buyersDf = spark.read.format("delta").load("/mnt/delta/tables/bronze/buyers")

In [0]:
integer_columns = [
    "buyers", "topbuyers", "femalebuyers", "malebuyers", 
    "topfemalebuyers", "topmalebuyers", "totalproductsbought", 
    "totalproductswished", "totalproductsliked", 
    "toptotalproductsbought", "toptotalproductswished", 
    "toptotalproductsliked"
]

for column_name in integer_columns: 
    buyersDf = buyersDf.withColumn(column_name, col(column_name).cast(IntegerType()))

In [0]:
decimal_columns = [
    "topbuyerratio", "femalebuyersratio", "topfemalebuyersratio", 
    "boughtperwishlistratio", "boughtperlikeratio", 
    "topboughtperwishlistratio", "topboughtperlikeratio", 
    "meanproductsbought", "meanproductswished", 
    "meanproductsliked", "topmeanproductsbought", 
    "topmeanproductswished", "topmeanproductsliked", 
    "meanofflinedays", "topmeanofflinedays", 
    "meanfollowers", "meanfollowing", 
    "topmeanfollowers", "topmeanfollowing"
]

for column_name in decimal_columns:
    buyersDf = buyersDf.withColumn(column_name, col(column_name).cast(DecimalType(10, 2)))


In [0]:
# Normalize country names
buyersDf = buyersDf.withColumn("country",initcap(col("country")))
for col_name in integer_columns:
    buyersDf = buyersDf.fillna(0, [col_name])


In [0]:
# calculate the ration of female to male buyers
buyersDf = buyersDf.withColumn("female_to_male_ratio",round(col("femalebuyers")/(col("malebuyers")+1),2))

# determine the market potential by comparing wishlist and purchases
buyersDf = buyersDf.withColumn("wishlist_to_purchase_ratio",round(col("totalproductswished") / (col("totalproductsbought") + 1), 2))


# Tag countries with a high enagagement ratio
high_engagement_thresold = 0.5
buyersDf = buyersDf.withColumn("high_engagement",
                               when(col("boughtperwishlistratio") > high_engagement_thresold,True))

buyersDf = buyersDf.withColumn("growing_female_market",
                               when(col("femalebuyersratio") > col("topfemalebuyersratio"),True).otherwise(False)
                               )

In [0]:
buyersDf.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/buyers")

In [0]:
# Sellers
sellersDf = spark.read.format("delta").load("/mnt/delta/tables/bronze/sellers")


In [0]:
# Define the schema with columns and their target types
columns_to_cast = {
    "country": "string",
    "sex": "string",
    "nbsellers": "integer",
    "meanproductssold": "decimal(10, 2)",  # Adjust precision and scale as needed
    "meanproductslisted": "decimal(10, 2)",
    "meansellerpassrate": "decimal(10, 2)",
    "totalproductssold": "integer",
    "totalproductslisted": "integer",
    "meanproductsbought": "decimal(10, 2)",
    "meanproductswished": "decimal(10, 2)",
    "meanproductsliked": "decimal(10, 2)",
    "totalbought": "integer",
    "totalwished": "integer",
    "totalproductsliked": "integer",
    "meanfollowers": "decimal(10, 2)",
    "meanfollows": "decimal(10, 2)",
    "percentofappusers": "decimal(10, 2)",
    "percentofiosusers": "decimal(10, 2)",
    "meanseniority": "decimal(10, 2)"
}

# Construct a list of SQL-style casting expressions
cast_expressions = [f"CAST({col} AS {dtype}) AS {col}" for col, dtype in columns_to_cast.items()]

# Apply all casts using selectExpr
sellersDf = sellersDf.selectExpr(*cast_expressions)

# Show the schema and data to verify
sellersDf.printSchema()



root
 |-- country: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- nbsellers: integer (nullable = true)
 |-- meanproductssold: decimal(10,2) (nullable = true)
 |-- meanproductslisted: decimal(10,2) (nullable = true)
 |-- meansellerpassrate: decimal(10,2) (nullable = true)
 |-- totalproductssold: integer (nullable = true)
 |-- totalproductslisted: integer (nullable = true)
 |-- meanproductsbought: decimal(10,2) (nullable = true)
 |-- meanproductswished: decimal(10,2) (nullable = true)
 |-- meanproductsliked: decimal(10,2) (nullable = true)
 |-- totalbought: integer (nullable = true)
 |-- totalwished: integer (nullable = true)
 |-- totalproductsliked: integer (nullable = true)
 |-- meanfollowers: decimal(10,2) (nullable = true)
 |-- meanfollows: decimal(10,2) (nullable = true)
 |-- percentofappusers: decimal(10,2) (nullable = true)
 |-- percentofiosusers: decimal(10,2) (nullable = true)
 |-- meanseniority: decimal(10,2) (nullable = true)



In [0]:
# Normalize country names and gender values
sellersDf = sellersDf.withColumn("country",initcap(col("country")))\
    .withColumn("sex",upper(col("sex")))

# Add a column to categorize the number of sellers 
sellersDf = sellersDf.withColumn("seller_size_category",
                                 when(col("nbsellers")<500,"Small") \
                                 .when((col("nbsellers")>=500) & (col("nbsellers")<2000),"Medium")\
                                     .otherwise("Large")
                                )

# Calculate the mean products listed per seller as an indicator of seller activity
sellersDf = sellersDf.withColumn("mean_products_listed_per_seller",
                                 round(col("totalproductslisted") / col("nbsellers"),2)
                                 )


# Identity markets with higher seller pass rate 

sellersDf = sellersDf.withColumn("high_sellers_pass_rate",
                                 when(col("meansellerpassrate") > 0.75,"High")\
                                     .otherwise("Normal")
                                 )


mean_pass_rate = sellersDf.select(round(avg("meansellerpassrate"),2).alias("avg_pass_rate")).collect()[0]["avg_pass_rate"]

sellersDf = sellersDf.withColumn("meansellerpassrate",
                                 when(col("meansellerpassrate").isNull(),mean_pass_rate).otherwise(col("meansellerpassrate"))
                                 )



In [0]:
sellersDf.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/sellers")

In [0]:
# Country
countriesDf = spark.read.format("delta").load("/mnt/delta/tables/bronze/countries")

In [0]:
# Define the schema with columns and their target types
columns_to_cast = {
    "country": "string",
    "sellers": "integer",
    "topsellers": "integer",
    "topsellerratio": "decimal(10, 2)",  # Adjust precision and scale as needed
    "femalesellersratio": "decimal(10, 2)",
    "topfemalesellersratio": "decimal(10, 2)",
    "femalesellers": "integer",
    "malesellers": "integer",
    "topfemalesellers": "integer",
    "topmalesellers": "integer",
    "countrysoldratio": "decimal(10, 2)",
    "bestsoldratio": "decimal(10, 2)",
    "toptotalproductssold": "integer",
    "totalproductssold": "integer",
    "toptotalproductslisted": "integer",
    "totalproductslisted": "integer",
    "topmeanproductssold": "decimal(10, 2)",
    "topmeanproductslisted": "decimal(10, 2)",
    "meanproductssold": "decimal(10, 2)",
    "meanproductslisted": "decimal(10, 2)",
    "meanofflinedays": "decimal(10, 2)",
    "topmeanofflinedays": "decimal(10, 2)",
    "meanfollowers": "decimal(10, 2)",
    "meanfollowing": "decimal(10, 2)",
    "topmeanfollowers": "decimal(10, 2)",
    "topmeanfollowing": "decimal(10, 2)"
}

# Construct SQL-style casting expressions
cast_expressions = [f"CAST({col} AS {dtype}) AS {col}" for col, dtype in columns_to_cast.items()]

# Apply all casts using selectExpr
countriesDf = countriesDf.selectExpr(*cast_expressions)

# Show the schema and data to verify
countriesDf.printSchema()



root
 |-- country: string (nullable = true)
 |-- sellers: integer (nullable = true)
 |-- topsellers: integer (nullable = true)
 |-- topsellerratio: decimal(10,2) (nullable = true)
 |-- femalesellersratio: decimal(10,2) (nullable = true)
 |-- topfemalesellersratio: decimal(10,2) (nullable = true)
 |-- femalesellers: integer (nullable = true)
 |-- malesellers: integer (nullable = true)
 |-- topfemalesellers: integer (nullable = true)
 |-- topmalesellers: integer (nullable = true)
 |-- countrysoldratio: decimal(10,2) (nullable = true)
 |-- bestsoldratio: decimal(10,2) (nullable = true)
 |-- toptotalproductssold: integer (nullable = true)
 |-- totalproductssold: integer (nullable = true)
 |-- toptotalproductslisted: integer (nullable = true)
 |-- totalproductslisted: integer (nullable = true)
 |-- topmeanproductssold: decimal(10,2) (nullable = true)
 |-- topmeanproductslisted: decimal(10,2) (nullable = true)
 |-- meanproductssold: decimal(10,2) (nullable = true)
 |-- meanproductslisted: de

In [0]:
countriesDf = countriesDf.withColumn("country", initcap(col("country")))

# Calculating the ration of top sellers to total sellers
countriesDf = countriesDf.withColumn("top_seller_ratio", round(col("topsellers") / col("sellers"),2))

# countries with high ration of female sellers
countriesDf = countriesDf.withColumn("high_female_seller_ration",
                                     when(col("femalesellersratio") > 0.5, True).otherwise(False))
                                    

# Adding a performance indicator based on the sold/listed ratio
countriesDf = countriesDf.withColumn("performance_indicator",
                                     round(col("totalproductssold") / (col("totalproductslisted")+1),2))


# Flag countries with exceptionally high performance 
performance_thresold = 0.8
countriesDf = countriesDf.withColumn("high_performance",
                                     when(col("performance_indicator") > performance_thresold, True).otherwise(False))

countriesDf = countriesDf.withColumn("activity_level",
                                     when(col("meanofflinedays") < 30, "Highly Active")
                                     .when((col("meanofflinedays") >=30) & (col("meanofflinedays") < 60), "Moderately Active")
                                     .otherwise("Low Activity"))


countriesDf.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/countries")
                                                               