In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Obtain the Spark session (application name "EcomDataPipeline") used by the cells below
spark = (
    SparkSession.builder
    .appName("EcomDataPipeline")
    .getOrCreate()
)

In [0]:
# Load the users table from the bronze Delta layer
userDF = spark.read.load(
    "/mnt/delta/tables/bronze/users",
    format="delta",
)

In [0]:
# Normalize country codes to uppercase (the column is overwritten in place)
userDF = userDF.withColumn(
    "countrycode",
    upper(col("countrycode")),
)

In [0]:
# Map language codes to readable language names in a new column, language_full.
# Written with the Column API (when/otherwise), like the notebook's other
# derived columns, instead of a SQL CASE string assembled with "+": the
# original fragments were joined without separating whitespace
# ("...'French'WHEN language..."), which only parsed because a closing quote
# happens to end the token.
# Any code other than fr/en/es, including a NULL language, maps to 'Other'.
userDF = userDF.withColumn(
    "language_full",
    when(col("language") == "fr", "French")
    .when(col("language") == "en", "English")
    .when(col("language") == "es", "Spanish")
    .otherwise("Other"),
)

In [0]:
# Collapse gender values into three buckets using the first letter:
# starts with 'M' -> 'Male', starts with 'F' -> 'Female', everything else
# (including NULL) -> 'Other'. The match is case-sensitive.
gender_raw = col("gender")
userDF = userDF.withColumn(
    "gender",
    when(gender_raw.startswith("M"), "Male")
    .when(gender_raw.startswith("F"), "Female")
    .otherwise("Other"),
)

In [0]:
# Clean civilitytitle into civilitytitle_clean: every occurrence of
# "Mme", "Ms" or "Mrs" is rewritten to "Ms"; other text is left untouched.
civility_pattern = "(Mme|Ms|Mrs)"
userDF = userDF.withColumn(
    "civilitytitle_clean",
    regexp_replace(col("civilitytitle"), civility_pattern, "Ms"),
)

In [0]:
# Derive years_since_last_login by dividing dayssincelastlogin by 365
userDF = userDF.withColumn(
    "years_since_last_login",
    col("dayssincelastlogin") / 365,
)

In [0]:
# Calculate the account age in years (seniority / 365, rounded to 2 decimals;
# presumably seniority is expressed in days - TODO confirm) and categorize it
# into account_age_group:
#   < 1 year        -> "New"
#   1 to < 3 years  -> "Intermediate"
#   anything else   -> "Experienced" (this also catches NULL ages)
account_age = col("account_age_years")
userDF = (
    userDF
    .withColumn("account_age_years", round(col("seniority") / 365, 2))
    .withColumn(
        "account_age_group",
        when(account_age < 1, "New")
        .when((account_age >= 1) & (account_age < 3), "Intermediate")
        .otherwise("Experienced"),
    )
)

In [0]:
# Add the year of the current date as current_year, for comparison
userDF = userDF.withColumn(
    "current_year",
    year(current_date()),
)

In [0]:
# Build user_description by joining, with "_" separators:
#   gender, countrycode, the first 3 characters of civilitytitle_clean, language_full
description_parts = [
    col("gender"),
    lit("_"),
    col("countrycode"),
    lit("_"),
    expr("substring(civilitytitle_clean, 1, 3)"),
    lit("_"),
    col("language_full"),
]
userDF = userDF.withColumn("user_description", concat(*description_parts))

In [0]:
# Casting data types
# Each group of columns shares one target type, so the casts are driven by
# loops instead of one copy-pasted statement per column.

# App / profile-picture flags -> boolean
for flag_column in ("hasanyapp", "hasandroidapp", "hasiosapp", "hasprofilepicture"):
    userDF = userDF.withColumn(flag_column, col(flag_column).cast("boolean"))

# Social counters -> integer
for counter_column in ("socialnbfollowers", "socialnbfollows"):
    userDF = userDF.withColumn(counter_column, col(counter_column).cast(IntegerType()))

# Pass rate and seniority measures -> decimal(10,2)
for measure_column in ("productspassrate", "seniorityasmonths", "seniorityasyears"):
    userDF = userDF.withColumn(measure_column, col(measure_column).cast(DecimalType(10, 2)))

# dayssincelastlogin -> integer, defaulting to 0.
# coalesce() is applied to the *result* of the cast: the previous
# when(isNotNull, cast).otherwise(0) only replaced values that were NULL
# before the cast, so a non-null value that the cast turns into NULL stayed
# NULL in the output.
userDF = userDF.withColumn(
    "dayssincelastlogin",
    coalesce(col("dayssincelastlogin").cast(IntegerType()), lit(0)),
)

In [0]:
# Write the transformed users table to the silver Delta layer, overwriting existing data
userDF.write.save(
    "/mnt/delta/tables/silver/users",
    format="delta",
    mode="overwrite",
)

In [0]:
# Load the buyers table from the bronze Delta layer
buyerDF = spark.read.load(
    "/mnt/delta/tables/bronze/buyers",
    format="delta",
)

In [0]:
# Buyer count / total columns that are cast to integer.
# The list keeps the name integer_columns because the null-filling step in a
# later cell reads it.
integer_columns = [
    'buyers',
    'topbuyers',
    'femalebuyers',
    'malebuyers',
    'topfemalebuyers',
    'topmalebuyers',
    'totalproductsbought',
    'totalproductswished',
    'totalproductsliked',
    'toptotalproductsbought',
    'toptotalproductswished',
    'toptotalproductsliked',
]

# Replace each listed column, in place, with its integer-cast version
integer_type = IntegerType()
for int_column in integer_columns:
    buyerDF = buyerDF.withColumn(int_column, col(int_column).cast(integer_type))


In [0]:
# Buyer ratio / mean columns that are cast to decimal(10,2)
decimal_columns = [
    'topbuyerratio',
    'femalebuyersratio',
    'topfemalebuyersratio',
    'boughtperwishlistratio',
    'boughtperlikeratio',
    'topboughtperwishlistratio',
    'topboughtperlikeratio',
    'meanproductsbought',
    'meanproductswished',
    'meanproductsliked',
    'topmeanproductsbought',
    'topmeanproductswished',
    'topmeanproductsliked',
    'meanofflinedays',
    'topmeanofflinedays',
    'meanfollowers',
    'meanfollowing',
    'topmeanfollowers',
    'topmeanfollowing',
]

# Replace each listed column, in place, with its decimal-cast version
decimal_type = DecimalType(10, 2)
for dec_column in decimal_columns:
    buyerDF = buyerDF.withColumn(dec_column, col(dec_column).cast(decimal_type))

In [0]:
# Normalize country names (first letter of each word upper-cased)
buyerDF = buyerDF.withColumn("country", initcap(col("country")))

# Fill NULLs in the integer columns with 0, in a single fillna call instead of
# one call per column
buyerDF = buyerDF.fillna({int_column: 0 for int_column in integer_columns})

# Calculate the ratio of female to male buyers.
# The fillna above can leave 0 in the divisor. A zero divisor is turned into
# NULL first, so the ratio is NULL for those rows whether or not the session
# runs in ANSI mode (where dividing by zero raises an error instead).
# NOTE(review): the column name "female_to_make_ratio" looks like a typo for
# "female_to_male_ratio"; it is kept as-is because readers of the silver
# buyers table may already depend on it.
male_buyers_or_null = when(col("malebuyers") != 0, col("malebuyers"))
buyerDF = buyerDF.withColumn(
    "female_to_make_ratio",
    col("femalebuyers") / male_buyers_or_null,
)

# Determine the market potential by comparing wishlist and purchases
# (same zero-divisor guard as above; rounded to 2 decimals)
products_bought_or_null = when(col("totalproductsbought") != 0, col("totalproductsbought"))
buyerDF = buyerDF.withColumn(
    "wishlist_to_purchase_ratio",
    round(col("totalproductswished") / products_bought_or_null, 2),
)

# Tag countries with high engagement ratio: True when boughtperwishlistratio
# exceeds the threshold, False otherwise (a NULL ratio yields False)
high_engagement_threshold = 0.5
buyerDF = buyerDF.withColumn(
    "high_engagement",
    when(col("boughtperwishlistratio") > high_engagement_threshold, True).otherwise(False),
)

In [0]:
# Write the transformed buyers table to the silver Delta layer, overwriting existing data
buyerDF.write.save(
    "/mnt/delta/tables/silver/buyers",
    format="delta",
    mode="overwrite",
)

In [0]:
# Load the sellers table from the bronze Delta layer
sellersDF = spark.read.load(
    "/mnt/delta/tables/bronze/sellers",
    format="delta",
)

In [0]:
# Casting data types: the columns are grouped by target type and cast in place.

# Counts and totals -> integer
seller_integer_columns = [
    "nbsellers",
    "totalproductssold",
    "totalproductslisted",
    "totalbought",
    "totalwished",
    "totalproductsliked",
]

# Means, pass rate and percentages -> decimal(10,2)
seller_decimal_columns = [
    "meanproductssold",
    "meanproductslisted",
    "meansellerpassrate",
    "meanproductsbought",
    "meanproductswished",
    "meanproductsliked",
    "meanfollowers",
    "meanfollows",
    "percentofappusers",
    "percentofiosusers",
    "meanseniority",
]

for seller_column in seller_integer_columns:
    sellersDF = sellersDF.withColumn(seller_column, col(seller_column).cast(IntegerType()))

for seller_column in seller_decimal_columns:
    sellersDF = sellersDF.withColumn(seller_column, col(seller_column).cast(DecimalType(10, 2)))

In [0]:
# Normalize country names (initcap) and gender values (uppercase)
sellersDF = (
    sellersDF
    .withColumn("country", initcap(col("country")))
    .withColumn("sex", upper(col("sex")))
)

# Add a column to categorize the number of sellers:
#   < 500 -> "Small", 500 to < 2000 -> "Medium", anything else -> "Large"
#   (a NULL nbsellers also lands in "Large")
sellersDF = sellersDF.withColumn(
    "seller_size_category",
    when(col("nbsellers") < 500, "Small")
    .when((col("nbsellers") >= 500) & (col("nbsellers") < 2000), "Medium")
    .otherwise("Large"),
)

# Calculate the mean number of products listed per seller.
# A zero seller count is turned into NULL before dividing, so the result is
# NULL for those rows instead of raising a division-by-zero error when the
# session runs in ANSI mode.
sellers_or_null = when(col("nbsellers") != 0, col("nbsellers"))
sellersDF = sellersDF.withColumn(
    "mean_products_listed",
    round(col("totalproductslisted") / sellers_or_null, 2),
)

# Impute NULL values in meansellerpassrate with the column mean (rounded to 2
# decimals). This runs BEFORE the high-pass-rate flag below: previously the
# flag was derived from the un-imputed column, so a row could be written with
# an imputed pass rate above 0.75 and high_seller_pass_rate = False.
# If every value is NULL the mean is None and the column stays NULL.
mean_pass_rate = (
    sellersDF
    .select(round(avg("meansellerpassrate"), 2).alias("avg_pass_rate"))
    .first()["avg_pass_rate"]
)
sellersDF = sellersDF.withColumn(
    "meansellerpassrate",
    when(col("meansellerpassrate").isNull(), mean_pass_rate)
    .otherwise(col("meansellerpassrate")),
)

# Identify markets with a higher seller pass rate (> 0.75), using the imputed
# value; rows whose pass rate is still NULL are flagged False
sellersDF = sellersDF.withColumn(
    "high_seller_pass_rate",
    when(col("meansellerpassrate") > 0.75, True).otherwise(False),
)



In [0]:
# Write the transformed sellers table to the silver Delta layer, overwriting existing data
sellersDF.write.save(
    "/mnt/delta/tables/silver/sellers",
    format="delta",
    mode="overwrite",
)

In [0]:
# Load the countries table from the bronze Delta layer
countriesDF = spark.read.load(
    "/mnt/delta/tables/bronze/countries",
    format="delta",
)

In [0]:
# Change data types: the columns are grouped by target type and cast in place.

# Seller counts and product totals -> integer
country_integer_columns = [
    "sellers",
    "topsellers",
    "femalesellers",
    "malesellers",
    "topfemalesellers",
    "topmalesellers",
    "toptotalproductssold",
    "totalproductssold",
    "toptotalproductslisted",
    "totalproductslisted",
]

# Ratios and means -> decimal(10,2)
country_decimal_columns = [
    "topsellerratio",
    "femalesellersratio",
    "topfemalesellersratio",
    "countrysoldratio",
    "bestsoldratio",
    "topmeanproductssold",
    "topmeanproductslisted",
    "meanproductssold",
    "meanproductslisted",
    "meanofflinedays",
    "topmeanofflinedays",
    "meanfollowers",
    "meanfollowing",
    "topmeanfollowers",
    "topmeanfollowing",
]

for country_column in country_integer_columns:
    countriesDF = countriesDF.withColumn(country_column, col(country_column).cast(IntegerType()))

for country_column in country_decimal_columns:
    countriesDF = countriesDF.withColumn(country_column, col(country_column).cast(DecimalType(10, 2)))

In [0]:
# Add a performance indicator based on the sold-to-listed ratio, rounded to
# 2 decimals. The denominator is totalproductslisted + 1 (presumably to keep
# the division defined when nothing is listed - confirm).
sold_to_listed = col("totalproductssold") / (col("totalproductslisted") + 1)
countriesDF = countriesDF.withColumn("performance_indicator", round(sold_to_listed, 2))

# Flag countries with exceptionally high performance (indicator above the threshold).
# NOTE: the identifier below is misspelled ("perfromance"); the name is kept
# unchanged in case other cells reference it.
perfromance_threshold = 0.8
is_high_performance = col("performance_indicator") > perfromance_threshold
countriesDF = countriesDF.withColumn(
    "high_performance",
    when(is_high_performance, True).otherwise(False),
)

# Add an activity level derived from meanofflinedays:
#   < 30 -> "Highly Active", 30 to < 60 -> "Moderately Active",
#   anything else (including NULL) -> "Low Activity"
mean_offline_days = col("meanofflinedays")
countriesDF = countriesDF.withColumn(
    "activity_level",
    when(mean_offline_days < 30, "Highly Active")
    .when((mean_offline_days >= 30) & (mean_offline_days < 60), "Moderately Active")
    .otherwise("Low Activity"),
)


In [0]:
# Write the transformed countries table to the silver Delta layer, overwriting existing data
countriesDF.write.save(
    "/mnt/delta/tables/silver/countries",
    format="delta",
    mode="overwrite",
)