In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, round

# Spark session shared by every cell below; getOrCreate() reuses an already-active session if there is one.
spark = (
    SparkSession.builder
    .appName("EcomDataPipeline")
    .getOrCreate()
)

In [0]:
# Bronze -> silver cleaning for the users table.
user_df = spark.read.format("delta").load("/mnt/delta/tables/bronze/users")

# Normalize country codes to uppercase.
user_df = user_df.withColumn("countrycode", upper(col("countrycode")))

# Map language codes to full names. The code is compared via upper() so lower/mixed-case
# codes ('en', 'Fr') are recognised too; unknown or NULL codes become 'Other'.
user_df = user_df.withColumn(
    "language_full",
    expr(
        "CASE WHEN upper(language) = 'EN' THEN 'English' "
        "WHEN upper(language) = 'FR' THEN 'French' "
        "ELSE 'Other' END"
    ),
)

# Correct data-entry variations in 'gender' by looking at the first letter only.
# trim + upper makes the check tolerant of leading spaces and lower-case entries; NULL -> 'Other'.
gender_norm = upper(trim(col("gender")))
user_df = user_df.withColumn(
    "gender",
    when(gender_norm.startswith("M"), "Male")
    .when(gender_norm.startswith("F"), "Female")
    .otherwise("Other"),
)

# Unify the honorifics Mme / Ms / Mrs into 'Ms' (case-sensitive, unanchored regex match).
user_df = user_df.withColumn(
    "civilitytitle_clean",
    regexp_replace("civilitytitle", "(Mme|Ms|Mrs)", "Ms"),
)

# Account age in years, rounded to 2 decimals.
# NOTE(review): assumes `seniority` is expressed in days — confirm against the bronze schema.
user_df = user_df.withColumn("account_age_years", round(col("seniority") / 365, 2))

# Bucket account age: <1 New, 1-3 Intermediate, >=3 Experienced.
# A NULL age stays NULL instead of silently falling through to 'Experienced'.
account_age = col("account_age_years")
user_df = user_df.withColumn(
    "account_age_group",
    when(account_age.isNull(), lit(None).cast("string"))
    .when(account_age < 1, "New")
    .when(account_age < 3, "Intermediate")
    .otherwise("Experienced"),
)

# Add a column with the current year for comparison.
user_df = user_df.withColumn("current_year", year(current_date()))

# Combine strings into a descriptor "<gender>_<countrycode>_<title[:3]>_<language>".
# NOTE: concat returns NULL when any part is NULL (e.g. NULL countrycode or civilitytitle).
user_df = user_df.withColumn(
    "user_descriptor",
    concat(
        col("gender"), lit("_"),
        col("countrycode"), lit("_"),
        expr("substring(civilitytitle_clean, 1, 3)"), lit("_"),
        col("language_full"),
    ),
)

# Flag raw civility titles longer than 10 characters.
user_df = user_df.withColumn("flag_long_title", length(col("civilitytitle")) > 10)

# Cast columns to their proper data types.
user_df = (
    user_df
    .withColumn("hasprofilepicture", col("hasprofilepicture").cast("boolean"))
    .withColumn("socialnbfollowers", col("socialnbfollowers").cast(IntegerType()))
    .withColumn("socialnbfollows", col("socialnbfollows").cast(IntegerType()))
    .withColumn("productspassrate", col("productspassrate").cast(DecimalType(10, 2)))
    .withColumn("seniorityasmonths", col("seniorityasmonths").cast(DecimalType(10, 2)))
    .withColumn("seniorityasyears", col("seniorityasyears").cast(DecimalType(10, 2)))
)

In [0]:
# Persist the cleaned users table to the silver layer, replacing whatever Delta table is there.
(
    user_df.write
    .mode("overwrite")
    .format("delta")
    .save("/mnt/delta/tables/silver/users")
)

In [0]:
# Load the bronze buyers table and cast its count / ratio columns to numeric types.
buyer_df = spark.read.format("delta").load("/mnt/delta/tables/bronze/buyers")

# Count-like columns -> int (the next cell also reuses this list to zero-fill NULLs).
integer_columns = [
    'buyers', 'topbuyers', 'femalebuyers', 'malebuyers',
    'totalproductsbought', 'totalproductswished', 'totalproductsliked',
    'toptotalproductsbought', 'toptotalproductswished', 'toptotalproductsliked',
]

# Ratio / mean columns -> float.
float_columns = [
    'topbuyerratio', 'femalebuyersratio', 'topfemalebuyersratio',
    'boughtperwishlistratio', 'boughtperlikeratio',
    'topboughtperwishlistratio', 'topboughtperlikeratio',
    'meanproductsbought', 'meanproductswished', 'meanproductsliked',
    'topmeanproductsbought', 'topmeanproductswished', 'topmeanproductsliked',
    'meanofflinedays', 'topmeanofflinedays',
    'meanfollowers', 'meanfollowing', 'topmeanfollowers', 'topmeanfollowing',
]

# Apply the casts group by group: every integer column first, then every float column.
for column_names, target_type in ((integer_columns, IntegerType()), (float_columns, FloatType())):
    for column_name in column_names:
        buyer_df = buyer_df.withColumn(column_name, col(column_name).cast(target_type))

In [0]:
# Normalize country names to title case.
buyer_df = buyer_df.withColumn("country", initcap(col("country")))

# Treat missing counts as 0 (integer_columns is defined in the previous cell).
buyer_df = buyer_df.fillna(0, subset=integer_columns)

# Ratio of female to male buyers. Counts were zero-filled above, so the denominator can be 0:
# divide only when it is non-zero and leave the ratio NULL otherwise (no ANSI divide-by-zero error).
buyer_df = buyer_df.withColumn(
    "female_to_male_ratio",
    when(col("malebuyers") != 0, round(col("femalebuyers") / col("malebuyers"), 2)),
)

# Market potential: products wished per product bought (NULL when nothing was bought).
buyer_df = buyer_df.withColumn(
    "wishlist_to_purchase_ratio",
    when(
        col("totalproductsbought") != 0,
        round(col("totalproductswished") / col("totalproductsbought"), 2),
    ),
)

# Tag countries whose bought-per-wishlist ratio exceeds the threshold; a NULL ratio counts as False.
high_engagement_threshold = 0.5
buyer_df = buyer_df.withColumn(
    "high_engagement",
    when(col("boughtperwishlistratio") > high_engagement_threshold, True).otherwise(False),
)

# Flag markets where the overall female buyer ratio exceeds the ratio among top buyers.
# NOTE(review): this is a snapshot comparison, not a trend over time — confirm the column
# name 'growing_female_market' reflects the intended meaning.
buyer_df = buyer_df.withColumn(
    "growing_female_market",
    when(col("femalebuyersratio") > col("topfemalebuyersratio"), True).otherwise(False),
)

In [0]:
# Persist the enriched buyers table to the silver layer (overwrites any existing Delta table there).
buyer_writer = buyer_df.write.format("delta").mode("overwrite")
buyer_writer.save("/mnt/delta/tables/silver/buyers")

In [0]:
# Load the bronze sellers table.
seller_df = spark.read.format("delta").load("/mnt/delta/tables/bronze/sellers")

# Target data type per column, applied in this order.
seller_casts = [
    ("nbsellers", IntegerType()),
    ("meanproductssold", DecimalType(10, 2)),
    ("meanproductslisted", DecimalType(10, 2)),
    ("meansellerpassrate", DecimalType(10, 2)),
    ("totalproductssold", IntegerType()),
    ("totalproductslisted", IntegerType()),
    ("meanproductsbought", DecimalType(10, 2)),
    ("meanproductswished", DecimalType(10, 2)),
    ("meanproductsliked", DecimalType(10, 2)),
    ("totalbought", IntegerType()),
    ("totalwished", IntegerType()),
    ("totalproductsliked", IntegerType()),
    ("meanfollowers", DecimalType(10, 2)),
    ("meanfollows", DecimalType(10, 2)),
    ("percentofappusers", DecimalType(10, 2)),
    ("percentofiosusers", DecimalType(10, 2)),
    ("meanseniority", DecimalType(15, 4)),
]

for seller_col, seller_type in seller_casts:
    seller_df = seller_df.withColumn(seller_col, col(seller_col).cast(seller_type))


In [0]:
# Normalize country names (title case) and gender codes (upper case).
seller_df = (
    seller_df
    .withColumn("country", initcap(col("country")))
    .withColumn("sex", upper(col("sex")))
)

# Impute missing seller pass rates with the dataset mean (rounded to 2 decimals) BEFORE any
# flag is derived from this column, so imputed rows are classified by their filled-in value.
# collect() runs a Spark job; mean_pass_rate is None when every pass rate is NULL, in which
# case coalesce leaves the column unchanged.
mean_pass_rate = (
    seller_df
    .select(round(avg("meansellerpassrate"), 2).alias("avg_pass_rate"))
    .collect()[0]["avg_pass_rate"]
)
seller_df = seller_df.withColumn(
    "meansellerpassrate",
    coalesce(col("meansellerpassrate"), lit(mean_pass_rate)),
)

# Categorize markets by number of sellers: <500 Small, 500-1999 Medium, >=2000 Large.
# A NULL seller count stays NULL rather than being labelled 'Large'.
seller_count = col("nbsellers")
seller_df = seller_df.withColumn(
    "seller_size_category",
    when(seller_count.isNull(), lit(None).cast("string"))
    .when(seller_count < 500, "Small")
    .when(seller_count < 2000, "Medium")
    .otherwise("Large"),
)

# Mean products listed per seller as an indicator of seller activity; NULL when there are
# no sellers (avoids a divide-by-zero error under ANSI mode).
seller_df = seller_df.withColumn(
    "mean_products_listed_per_seller",
    when(seller_count != 0, round(col("totalproductslisted") / seller_count, 2)),
)

# Label markets whose (imputed) mean seller pass rate exceeds 0.75.
# NOTE(review): assumes the pass rate is a 0-1 fraction rather than a percentage — confirm.
seller_df = seller_df.withColumn(
    "high_seller_pass_rate",
    when(col("meansellerpassrate") > 0.75, "High").otherwise("Normal"),
)

In [0]:
# Persist the enriched sellers table to the silver layer (full overwrite of the Delta table).
sellers_silver_path = "/mnt/delta/tables/silver/sellers"
seller_df.write.mode("overwrite").format("delta").save(sellers_silver_path)

In [0]:
# Load the bronze countries table.
country_df = spark.read.format("delta").load("/mnt/delta/tables/bronze/countries")

# Target types shared by the casts below.
int_type = IntegerType()
dec_type = DecimalType(10, 2)

# (column, type) pairs, applied in this order.
country_casts = (
    ("sellers", int_type),
    ("topsellers", int_type),
    ("topsellerratio", dec_type),
    ("femalesellersratio", dec_type),
    ("topfemalesellersratio", dec_type),
    ("femalesellers", int_type),
    ("malesellers", int_type),
    ("topfemalesellers", int_type),
    ("topmalesellers", int_type),
    ("countrysoldratio", dec_type),
    ("bestsoldratio", dec_type),
    ("toptotalproductssold", int_type),
    ("totalproductssold", int_type),
    ("toptotalproductslisted", int_type),
    ("totalproductslisted", int_type),
    ("topmeanproductssold", dec_type),
    ("topmeanproductslisted", dec_type),
    ("meanproductssold", dec_type),
    ("meanproductslisted", dec_type),
    ("meanofflinedays", dec_type),
    ("topmeanofflinedays", dec_type),
    ("meanfollowers", dec_type),
    ("meanfollowing", dec_type),
    ("topmeanfollowers", dec_type),
    ("topmeanfollowing", dec_type),
)

for cast_col, cast_type in country_casts:
    country_df = country_df.withColumn(cast_col, col(cast_col).cast(cast_type))

# Normalize country names to title case.
country_df = country_df.withColumn("country", initcap(col("country")))

# Ratio of top sellers to total sellers; NULL when a country has no sellers (avoids a
# divide-by-zero error under ANSI mode).
# NOTE(review): the bronze table already carries a 'topsellerratio' column — confirm both are needed.
country_df = country_df.withColumn(
    "top_seller_ratio",
    when(col("sellers") != 0, round(col("topsellers") / col("sellers"), 2)),
)

# Flag countries where more than half of the sellers are female (a NULL ratio counts as False).
country_df = country_df.withColumn(
    "high_female_seller_ratio",
    when(col("femalesellersratio") > 0.5, True).otherwise(False),
)

# Performance indicator: top products sold per top product listed. The +1 in the
# denominator keeps the division defined when nothing was listed.
country_df = country_df.withColumn(
    "performance_indicator",
    round(col("toptotalproductssold") / (col("toptotalproductslisted") + 1), 2),
)

# Flag countries with exceptionally high performance (a NULL indicator counts as False).
performance_threshold = 0.8
country_df = country_df.withColumn(
    "high_performance",
    when(col("performance_indicator") > performance_threshold, True).otherwise(False),
)

# Activity level from mean offline days: <30 Highly Active, 30-59 Moderately Active, >=60 Low.
# A NULL value stays NULL instead of being labelled 'Low Activity'.
offline_days = col("meanofflinedays")
country_df = country_df.withColumn(
    "activity_level",
    when(offline_days.isNull(), lit(None).cast("string"))
    .when(offline_days < 30, "Highly Active")
    .when(offline_days < 60, "Moderately Active")
    .otherwise("Low Activity"),
)


In [0]:
# Persist the enriched countries table to the silver layer (full overwrite of the Delta table).
countries_silver_path = "/mnt/delta/tables/silver/countries"
country_df.write.mode("overwrite").format("delta").save(countries_silver_path)