In [0]:
# Importing all the necessory libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime, timezone

spark = SparkSession.builder.appName("EcomDataPipeline").getOrCreate()

In [0]:
# Checking for Mounting

for m in dbutils.fs.mounts():
    print(m.mountPoint, "->", m.source)

try:
    dbutils.fs.unmount("/mnt/ecomdata1")
    print("Unmounted /mnt/ecomdata1")
except Exception as e:
    print("Nothing to unmount or already gone:", e)


/mnt/lz2 -> abfss://landing-zone-2@ecomadlsatul.dfs.core.windows.net
/databricks-datasets -> databricks-datasets
/Volumes -> UnityCatalogVolumes
/mnt/ecomdata -> abfss://landing-zone-2@ecomadlsatul.dfs.core.windows.net
/databricks/mlflow-tracking -> databricks/mlflow-tracking
/databricks-results -> databricks-results
/databricks/mlflow-registry -> databricks/mlflow-registry
/Volume -> DbfsReserved
/volumes -> DbfsReserved
/ -> DatabricksRoot
/volume -> DbfsReserved
Nothing to unmount or already gone: An error occurred while calling o462.unmount.
: java.rmi.RemoteException: java.lang.IllegalArgumentException: requirement failed: Directory not mounted: /mnt/ecomdata1; nested exception is: 
	java.lang.IllegalArgumentException: requirement failed: Directory not mounted: /mnt/ecomdata1
	at com.databricks.backend.daemon.data.client.BaseDbfsClient.send0(BaseDbfsClient.scala:161)
	at com.databricks.backend.daemon.data.client.BaseDbfsClient.sendIdempotent(BaseDbfsClient.scala:69)
	at com.databr

In [0]:
def ensure_mount():
    mnt_point = "/mnt/lz2"
    mounts = [m.mountPoint for m in dbutils.fs.mounts()]
    if mnt_point in mounts:
        print(f"✅ Already mounted: {mnt_point}")
        return

    configs = {
        "fs.azure.account.auth.type": "OAuth",
        "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id": "b2b8369b-7480-4898-ba87-2104ce9fc28a",         
        "fs.azure.account.oauth2.client.secret": "jCi8Q~TFDdTI-urHYBBqvXe4kXGR~2h6AviHIcr-", 
        "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/1d1656e8-380d-4ea4-947e-f4f84ba4fe61/oauth2/token" 
    }

    dbutils.fs.mount(
        source="abfss://landing-zone-2@ecomadlsatul.dfs.core.windows.net",
        mount_point=mnt_point,
        extra_configs=configs
    )
    print(f"✅ Mounted landing-zone-2 at {mnt_point}")

ensure_mount()

✅ Already mounted: /mnt/lz2


In [0]:
# -----------------------------
# 1) Define paths 
# -----------------------------
BASE          = "/mnt/lz2"
RAW_BASE      = f"{BASE}/to_process_data"      
BRONZE_BASE   = f"{BASE}/delta/tables/bronze"
SILVER_BASE   = f"{BASE}/delta/tables/silver"
GOLD_BASE     = f"{BASE}/delta/tables/gold"

RAW_USERS     = f"{RAW_BASE}/users_data"
RAW_BUYERS    = f"{RAW_BASE}/buyers_data"
RAW_SELLERS   = f"{RAW_BASE}/sellers_data"
RAW_COUNTRIES = f"{RAW_BASE}/countries_data"

BR_USERS      = f"{BRONZE_BASE}/users"
BR_BUYERS     = f"{BRONZE_BASE}/buyers"
BR_SELLERS    = f"{BRONZE_BASE}/sellers"
BR_COUNTRIES  = f"{BRONZE_BASE}/countries"

SL_USERS      = f"{SILVER_BASE}/users"
SL_BUYERS     = f"{SILVER_BASE}/buyers"
SL_SELLERS    = f"{SILVER_BASE}/sellers"
SL_COUNTRIES  = f"{SILVER_BASE}/countries"

GOLD_OBT      = f"{GOLD_BASE}/ecom_one_big_table"


dbutils.widgets.text("file_name", "")  
file_name = dbutils.widgets.get("file_name").strip()
SRC_USERS = f"{RAW_USERS}/{file_name}" if file_name else RAW_USERS

In [0]:
# ============================================================
# 2) BRONZE — read parquet from to_process_data/* and write Delta
# ============================================================

# Users (Bronze)
userDF = (spark.read.format("parquet")
          .option("header","true").option("inferSchema","true")
          .load(SRC_USERS))
(userDF.write.format("delta").mode("overwrite").save(BR_USERS))

# Buyers (Bronze)
buyersDF = (spark.read.format("parquet")
            .option("header","true").option("inferSchema","true")
            .load(RAW_BUYERS))
(buyersDF.write.format("delta").mode("overwrite").save(BR_BUYERS))

# Sellers (Bronze)
sellersDF = (spark.read.format("parquet")
             .option("header","true").option("inferSchema","true")
             .load(RAW_SELLERS))
(sellersDF.write.format("delta").mode("overwrite").save(BR_SELLERS))

# Countries (Bronze)
countriesDF = (spark.read.format("parquet")
               .option("header","true").option("inferSchema","true")
               .load(RAW_COUNTRIES))
(countriesDF.write.format("delta").mode("overwrite").save(BR_COUNTRIES))


In [0]:


# ============================================================
# 3) SILVER — your original transformations (paths switched to /mnt/lz2)
# ============================================================

spark = SparkSession.builder.appName("EcommerceDataPipeline").getOrCreate()

# ----- Users (Silver) -----

# Reading User Table
usersDF = spark.read.format("delta").load(BR_USERS)

# Normalize country code to uppercase
usersDF = usersDF.withColumn("countryCode" , upper(col("countryCode")))

# Handling multiple languages elegently with 'expr' and 'case when'
usersDF = usersDF.withColumn(
    "language_full",
    expr("CASE WHEN language = 'EN' THEN 'ENGLISH' "
         "WHEN language = 'FR' THEN 'FRENCH' "
         "ELSE 'Other' END ")
)

# Correcting potential data entry errors in gender column
usersDF = usersDF.withColumn(
    "gender",
    when(col("gender").startswith("M"), "Male")
    .when(col("gender").startswith("F"), "Female")
    .otherwise("Other")
)

# Using 'regexp_replace' to clean 'civilitytitle' values
usersDF = usersDF.withColumn("civilitytitle_clean",
                             regexp_replace("civilitytitle", "(Mme|Ms|Mrs)", "Ms"))


#Derive new column 'Year_since_last_login' from 'daysSinceLastLogin'
usersDF = usersDF.withColumn("Year_since_last_login", col("daysSinceLastLogin")/365)

# calculate age of account in years and categorize into 'account_age_group'
usersDF = usersDF.withColumn("account_age_years", round(col("seniority") / 365 ,2)) \
                 .withColumn("account_age_group",
                             when(col("account_age_years") < 1 , "New")
                             .when((col("account_age_years") >=1) & (col("account_age_years") < 3) , "Intermediate")
                             .otherwise("Experienced"))

# Add a column with the current year for comarison 
usersDF = usersDF.withColumn("current_year", year(current_date()))

# Creatively combining strings to forma a unique user descriptor
usersDF = usersDF.withColumn(
    "user_descriptor",
    concat(col("gender"), lit("_"),
           col("countrycode"), lit("_"),
           expr("substring(civilitytitle_clean,1,3)"), lit("_"),
           col("language_full"))
)

# Flag long title 
usersDF = usersDF.withColumn("flag_long_title", length(col("civilitytitle_clean")) > 10)

# Casting
usersDF = usersDF.withColumn("hasAnyApp" , col("hasAnyApp").cast("boolean")) \
                 .withColumn("hasAndroidApp" , col("hasAndroidApp").cast("boolean")) \
                 .withColumn("hasIosApp" , col("hasIosApp").cast("boolean")) \
                 .withColumn("hasProfilePicture" , col("hasProfilePicture").cast("boolean"))

usersDF = usersDF.withColumn("socialNbFollowers", col("socialNbFollowers").cast(IntegerType())) \
                 .withColumn("socialNbFollows" , col("socialNbFollows").cast(IntegerType())) \
                 .withColumn("productsPassRate" , col("productsPassRate").cast(DecimalType(10,2))) \
                 .withColumn("seniorityAsMonths" , col("seniorityAsMonths").cast(DecimalType(10,2))) \
                 .withColumn("seniorityAsYears" , col("seniorityAsYears").cast(DecimalType(10,2)))

usersDF = usersDF.withColumn(
    "daysSinceLastLogin",
    when(col("daysSinceLastLogin").isNotNull(), col("daysSinceLastLogin").cast(IntegerType())).otherwise(0)
)

usersDF.select("gender").show()

(usersDF.write.format("delta").mode("overwrite").save(SL_USERS))

+------+
|gender|
+------+
|Female|
|Female|
|  Male|
|Female|
|  Male|
|Female|
|Female|
|Female|
|Female|
|  Male|
|Female|
|Female|
|Female|
|Female|
|Female|
|Female|
|Female|
|Female|
|Female|
|  Male|
+------+
only showing top 20 rows


In [0]:
# ----- Buyers (Silver) -----

# Reading buyers table
buyersDF = spark.read.format("delta").load(BR_BUYERS)

# Casting Integer Columns
integer_columns = [
    'buyers', 'topbuyers', 'femalebuyers', 'malebuyers',
    'topfemalebuyers', 'topmalebuyers', 'totalproductsbought',
    'totalproductswished', 'totalproductsliked', 'toptotalproductsbought',
    'toptotalproductswished', 'toptotalproductsliked'
]
for c in integer_columns:
    buyersDF = buyersDF.withColumn(c, col(c).cast(IntegerType()))

# Casting Decimal columns
decimal_columns = [
    'topbuyerratio', 'femalebuyersratio', 'topfemalebuyersratio',
    'boughtperwishlistratio', 'boughtperlikeratio', 'topboughtperwishlistratio',
    'topboughtperlikeratio', 'meanproductsbought', 'meanproductswished',
    'meanproductsliked', 'topmeanproductsbought', 'topmeanproductswished',
    'topmeanproductsliked', 'meanofflinedays', 'topmeanofflinedays',
    'meanfollowers', 'meanfollowing', 'topmeanfollowers', 'topmeanfollowing'
]
for c in decimal_columns:
    buyersDF = buyersDF.withColumn(c, col(c).cast(DecimalType(10,2)))

# Normalize country name 
buyersDF = buyersDF.withColumn("country", initcap(col("country")))

for c in integer_columns:
    buyersDF = buyersDF.fillna({c: 0})

# Calculate the ratio of female to male buyers
buyersDF = buyersDF.withColumn("female_to_make_ratio",
                               round(col("femalebuyers")/(col("malebuyers")+1),2))

# Determine the market potential by comparing wishlist and purchases
buyersDF = buyersDF.withColumn("wishlist_to_purchase_ratio",
                               round(col("totalproductswished")/(col("totalproductsbought")+1),2))

# Tag countries with highes engagement ratio 
buyersDF = buyersDF.withColumn("high_engagement",
                               when(col("boughtperwishlistratio") > 0.5, True).otherwise(False))

# Flag market with incresing female buyers participation
buyersDF = buyersDF.withColumn("growing_female_market",
                               when(col("femalebuyersratio") > col("topfemalebuyersratio"), True).otherwise(False))

(buyersDF.write.format("delta").mode("overwrite").save(SL_BUYERS))

In [0]:
# ----- Sellers (Silver) -----

# Reading sellers table
sellersDF = spark.read.format("delta").load(BR_SELLERS)
sellersDF = sellersDF \
    .withColumn("nbsellers", col("nbsellers").cast(IntegerType())) \
    .withColumn("meanproductssold", col("meanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("meanproductslisted", col("meanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meansellerpassrate", col("meansellerpassrate").cast(DecimalType(10, 2))) \
    .withColumn("totalproductssold", col("totalproductssold").cast(IntegerType())) \
    .withColumn("totalproductslisted", col("totalproductslisted").cast(IntegerType())) \
    .withColumn("meanproductsbought", col("meanproductsbought").cast(DecimalType(10, 2))) \
    .withColumn("meanproductswished", col("meanproductswished").cast(DecimalType(10, 2))) \
    .withColumn("meanproductsliked", col("meanproductsliked").cast(DecimalType(10, 2))) \
    .withColumn("totalbought", col("totalbought").cast(IntegerType())) \
    .withColumn("totalwished", col("totalwished").cast(IntegerType())) \
    .withColumn("totalproductsliked", col("totalproductsliked").cast(IntegerType())) \
    .withColumn("meanfollowers", col("meanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("meanfollows", col("meanfollows").cast(DecimalType(10, 2))) \
    .withColumn("percentofappusers", col("percentofappusers").cast(DecimalType(10, 2))) \
    .withColumn("percentofiosusers", col("percentofiosusers").cast(DecimalType(10, 2))) \
    .withColumn("meanseniority", col("meanseniority").cast(DecimalType(10, 2)))

# Normalize country name and gender value 
sellersDF = sellersDF.withColumn("country", initcap(col("country"))) \
                     .withColumn("sex"    , initcap(col("sex")))

# Add a column to categorize the number of sellers
sellersDF = sellersDF.withColumn(
    "seller_size_category",
    when(col("nbsellers") < 500, "Small")
    .when((col("nbsellers") >= 500) & (col("nbsellers") < 2000), "Medium")
    .otherwise("Large")
)

# Calculate the mean product listed per seller as an indicator of seller activity
sellersDF = sellersDF.withColumn(
    "mean_product_listed_per_seller",
    round(col("totalproductslisted") / col("nbsellers"), 2)
)

# Indentify markets with high seller pass rate
sellersDF = sellersDF.withColumn(
    "high_seller_pass_rate",
    when(col("meansellerpassrate") > 0.75, "High").otherwise("Normal")
)

mean_pass_rate = sellersDF.select(round(avg("meansellerpassrate"),2).alias("avg_pass_rate")) \
                          .collect()[0]["avg_pass_rate"]

sellersDF = sellersDF.withColumn(
    "meansellerpassrate",
    when(col("meansellerpassrate").isNull(), mean_pass_rate).otherwise(col("meansellerpassrate"))
)

(sellersDF.write.format("delta").mode("overwrite").save(SL_SELLERS))

In [0]:
# ----- Countries (Silver) -----

# Reading from countries table
countriesDF = spark.read.format("delta").load(BR_COUNTRIES)
countriesDF = countriesDF \
    .withColumn("sellers", col("sellers").cast(IntegerType())) \
    .withColumn("topsellers", col("topsellers").cast(IntegerType())) \
    .withColumn("topsellerratio", col("topsellerratio").cast(DecimalType(10, 2))) \
    .withColumn("femalesellersratio", col("femalesellersratio").cast(DecimalType(10, 2))) \
    .withColumn("topfemalesellersratio", col("topfemalesellersratio").cast(DecimalType(10, 2))) \
    .withColumn("femalesellers", col("femalesellers").cast(IntegerType())) \
    .withColumn("malesellers", col("malesellers").cast(IntegerType())) \
    .withColumn("topfemalesellers", col("topfemalesellers").cast(IntegerType())) \
    .withColumn("topmalesellers", col("topmalesellers").cast(IntegerType())) \
    .withColumn("countrysoldratio", col("countrysoldratio").cast(DecimalType(10, 2))) \
    .withColumn("bestsoldratio", col("bestsoldratio").cast(DecimalType(10, 2))) \
    .withColumn("toptotalproductssold", col("toptotalproductssold").cast(IntegerType())) \
    .withColumn("totalproductssold", col("totalproductssold").cast(IntegerType())) \
    .withColumn("toptotalproductslisted", col("toptotalproductslisted").cast(IntegerType())) \
    .withColumn("totalproductslisted", col("totalproductslisted").cast(IntegerType())) \
    .withColumn("topmeanproductssold", col("topmeanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("topmeanproductslisted", col("topmeanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meanproductssold", col("meanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("meanproductslisted", col("meanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meanofflinedays", col("meanofflinedays").cast(DecimalType(10, 2))) \
    .withColumn("topmeanofflinedays", col("topmeanofflinedays").cast(DecimalType(10, 2))) \
    .withColumn("meanfollowers", col("meanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("meanfollowing", col("meanfollowing").cast(DecimalType(10, 2))) \
    .withColumn("topmeanfollowers", col("topmeanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("topmeanfollowing", col("topmeanfollowing").cast(DecimalType(10, 2)))

# Transformation
countriesDF = countriesDF.withColumn("country", initcap(col("country")))

# Calculating the ratio of top sellers to total sellers
countriesDF = countriesDF.withColumn("top_seller_ratio", 
                                     round(col("topsellers") / col("sellers"), 2))

# countriesDF countries with high ratio of female sellers 
countriesDF = countriesDF.withColumn("high_female_seller_ratio",
                                     when(col("femalesellersratio") > 0.5, True).otherwise(False))

# Adding a performance indicator based on the sold/listed ratio
countriesDF = countriesDF.withColumn("performane_indicator",
                                     round(col("toptotalproductssold") / (col("toptotalproductslisted") + 1), 2))

#Flag countries with exceptionally high performance 
countriesDF = countriesDF.withColumn("high_performance",
                                     when(col("performane_indicator") > 0.8, "True").otherwise("False"))

countriesDF = countriesDF.withColumn(
    "activity_level",
    when(col("meanofflinedays") < 30, "Higly Active")
    .when((col("meanofflinedays") >= 30) & (col("meanofflinedays") < 60), "Moderately Active")
    .otherwise("Low Activity")
)

(countriesDF.write.format("delta").mode("overwrite").save(SL_COUNTRIES))

In [0]:
# ============================================================
# 4) GOLD — your original join to One Big Table
# ============================================================

spark = SparkSession.builder.appName("GoldLayerCreation").getOrCreate()

# reading the silver delta tables 
silver_sellers   = spark.read.format("delta").load(SL_SELLERS)
silver_buyers    = spark.read.format("delta").load(SL_BUYERS)
silver_users     = spark.read.format("delta").load(SL_USERS)
silver_countries = spark.read.format("delta").load(SL_COUNTRIES)

# Perform the join operations
comprehensive_user_table = silver_users \
    .join(silver_countries, ["country"], "outer") \
    .join(silver_buyers,    ["country"], "outer") \
    .join(silver_sellers,   ["country"], "outer")

# Select and alias columns from each dataframe to ensure uniqueness
comprehensive_user_table = comprehensive_user_table.select(
    silver_users["country"].alias("Country"),
    # From silver_users
    silver_users["productsSold"].alias("Users_productsSold"),
    silver_users["productsWished"].alias("Users_productsWished"),
    silver_users["account_age_years"].alias("Users_account_age_years"),
    silver_users["account_age_group"].alias("Users_account_age_group"),
    silver_users["hasanyapp"].alias("Users_hasanyapp"),
    silver_users["socialnbfollowers"].alias("Users_socialnbfollowers"),
    silver_users["flag_long_title"].alias("Users_flag_long_title"),
    # From silver_countries
    silver_countries["sellers"].alias("Countries_Sellers"),
    silver_countries["topsellers"].alias("Countries_TopSellers"),
    silver_countries["femalesellers"].alias("Countries_FemaleSellers"),
    silver_countries["malesellers"].alias("Countries_MaleSellers"),
    silver_countries["topfemalesellers"].alias("Countries_TopFemaleSellers"),
    silver_countries["topmalesellers"].alias("Countries_TopMaleSellers"),
    # From silver_buyers
    silver_buyers["buyers"].alias("Buyers_Total"),
    silver_buyers["topbuyers"].alias("Buyers_Top"),
    silver_buyers["femalebuyers"].alias("Buyers_Female"),
    silver_buyers["malebuyers"].alias("Buyers_Male"),
    silver_buyers["topfemalebuyers"].alias("Buyers_TopFemale"),
    silver_buyers["topmalebuyers"].alias("Buyers_TopMale"),
    # From silver_sellers
    silver_sellers["nbsellers"].alias("Sellers_Total"),
    silver_sellers["sex"].alias("Sellers_Sex"),
    silver_sellers["meanproductssold"].alias("Sellers_MeanProductsSold"),
    silver_sellers["meanproductslisted"].alias("Sellers_MeanProductsListed"),
)

comprehensive_user_table.show()
(comprehensive_user_table.write.format("delta").mode("overwrite").save(GOLD_OBT))

print(" Pipeline finished: Bronze → Silver → Gold")

+-------+------------------+--------------------+-----------------------+-----------------------+---------------+-----------------------+---------------------+-----------------+--------------------+-----------------------+---------------------+--------------------------+------------------------+------------+----------+-------------+-----------+----------------+--------------+-------------+-----------+------------------------+--------------------------+
|Country|Users_productsSold|Users_productsWished|Users_account_age_years|Users_account_age_group|Users_hasanyapp|Users_socialnbfollowers|Users_flag_long_title|Countries_Sellers|Countries_TopSellers|Countries_FemaleSellers|Countries_MaleSellers|Countries_TopFemaleSellers|Countries_TopMaleSellers|Buyers_Total|Buyers_Top|Buyers_Female|Buyers_Male|Buyers_TopFemale|Buyers_TopMale|Sellers_Total|Sellers_Sex|Sellers_MeanProductsSold|Sellers_MeanProductsListed|
+-------+------------------+--------------------+-----------------------+-------------

In [0]:
ts = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
entities = ["users_data", "buyers_data", "sellers_data", "countries_data"]

for e in entities:
    src = f"/mnt/lz2/to_process_data/{e}"
    dst = f"/mnt/lz2/processed_data/{e}/load_ts={ts}/"
    try:
        # any parquet files to move?
        files = [f for f in dbutils.fs.ls(src) if f.path.lower().endswith(".parquet")]
        if not files:
            print(f" No new files for {e}")
            continue

        dbutils.fs.mkdirs(dst)
        # move everything inside src into dst (fast & simple)
        dbutils.fs.mv(src, dst, True)
        # recreate empty source folder for next run
        dbutils.fs.mkdirs(src)
        print(f" Moved {e}: {len(files)} files -> {dst}")
    except Exception as ex:
        print(f" Skipped {e}: {ex}")

 Moved users_data: 1 files -> /mnt/lz2/processed_data/users_data/load_ts=20250811_215706/
 Moved buyers_data: 1 files -> /mnt/lz2/processed_data/buyers_data/load_ts=20250811_215706/
 Moved sellers_data: 1 files -> /mnt/lz2/processed_data/sellers_data/load_ts=20250811_215706/
 Moved countries_data: 1 files -> /mnt/lz2/processed_data/countries_data/load_ts=20250811_215706/
