In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
# Reuse the active Spark session if one exists; otherwise build one named 'ecomdata'.
spark = (
    SparkSession.builder
    .appName('ecomdata')
    .getOrCreate()
)

In [0]:
# Bronze-layer users table, stored in Delta format.
usersDF = (
    spark.read
    .format('delta')
    .load('/mnt/delta/bronze/tables/users')
)

In [0]:
# Quick look at the first five raw rows.
usersDF.show(n=5)

+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+
|      identifierHash|type|   country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasAnyApp|hasAndroidApp|hasIosApp|hasProfilePicture|daysSinceLastLogin|seniority|seniorityAsMonths|seniorityAsYears|countryCode|
+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+-

In [0]:
# Upper-case the country code so e.g. 'fr' and 'FR' compare equal downstream.
usersDF = usersDF.withColumn('countryCode', upper('countryCode'))
usersDF.show(n=5)

+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+
|      identifierHash|type|   country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasAnyApp|hasAndroidApp|hasIosApp|hasProfilePicture|daysSinceLastLogin|seniority|seniorityAsMonths|seniorityAsYears|countryCode|
+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+-

In [0]:
# Inspect which language codes occur in the data.
usersDF.select(col('language')).distinct().show()


+--------+
|language|
+--------+
|      en|
|      de|
|      es|
|      it|
|      fr|
+--------+



In [0]:
# Spell out the two-letter `language` code as an English language name.
# Built as a single CASE WHEN chain with no ELSE branch, so any code that is
# not in this table maps to null.
_language_names = {
    'en': 'English',
    'fr': 'French',
    'es': 'Spanish',
    'de': 'German',
    'it': 'Italian',
}
_language_full = None
for _code, _full_name in _language_names.items():
    _matches = col('language') == _code
    if _language_full is None:
        _language_full = when(_matches, _full_name)
    else:
        _language_full = _language_full.when(_matches, _full_name)
usersDF = usersDF.withColumn('language_full', _language_full)

In [0]:
# Check the code -> name mapping: one row per distinct pair.
usersDF.select(['language_full', 'language']).distinct().show()

+-------------+--------+
|language_full|language|
+-------------+--------+
|      Italian|      it|
|      Spanish|      es|
|       French|      fr|
|      English|      en|
|       German|      de|
+-------------+--------+



In [0]:
# Raw gender values before normalization.
usersDF.select(col('gender')).distinct().show()

+------+
|gender|
+------+
|     F|
|     M|
+------+



In [0]:
# Normalize `gender` into MALE / FEMALE / OTHER.
# The raw value is trimmed and upper-cased first so that entry variants such as
# 'm', ' F' or 'female' are still recognised. Null or unrecognised values -> 'OTHER'.
# (Column API instead of a '+'-concatenated SQL string, which is easy to break
# by dropping the spaces between CASE fragments.)
_gender_clean = upper(trim(col('gender')))
usersDF = usersDF.withColumn(
    'gender',
    when(_gender_clean.startswith('M'), 'MALE')
    .when(_gender_clean.startswith('F'), 'FEMALE')
    .otherwise('OTHER'),
)

In [0]:
# Gender values after normalization.
usersDF.select(col('gender')).distinct().show()

+------+
|gender|
+------+
|  MALE|
|FEMALE|
+------+



In [0]:
# Collapse the titles 'mrs' and 'miss' into a single 'ms' value.
# NOTE(review): the pattern is unanchored and case-sensitive, so it also rewrites
# these substrings inside longer values and leaves 'Mrs' / 'MISS' untouched.
usersDF = usersDF.withColumn(
    'civilitytitle',
    regexp_replace(col('civilitytitle'), r'(mrs|miss)', 'ms'),
)

In [0]:
# Titles remaining after the clean-up.
usersDF.select(col('civilitytitle')).distinct().show()

+-------------+
|civilitytitle|
+-------------+
|           ms|
|           mr|
+-------------+



In [0]:
# Express days since last login as fractional years (365-day year).
# NOTE(review): a null `dayssincelastlogin` stays null here, while a later cell
# null-fills that column, so the two columns can disagree — confirm intended.
_days_since_login = col("dayssincelastlogin")
usersDF = usersDF.withColumn("years_since_last_login", _days_since_login / 365)

In [0]:
# Spot-check the day -> year conversion.
usersDF.select(col('years_since_last_login'), col('dayssincelastlogin')).distinct().show()

+----------------------+------------------+
|years_since_last_login|dayssincelastlogin|
+----------------------+------------------+
|    0.6493150684931507|               237|
|   0.27945205479452057|               102|
|                   0.6|               219|
|    1.5835616438356164|               578|
|    0.5068493150684932|               185|
|    0.8438356164383561|               308|
|    1.4109589041095891|               515|
|    1.6767123287671233|               612|
|     1.906849315068493|               696|
|    1.1232876712328768|               410|
|     0.989041095890411|               361|
|     0.810958904109589|               296|
|    1.4986301369863013|               547|
|   0.13150684931506848|                48|
|    0.8328767123287671|               304|
|    1.4273972602739726|               521|
|    1.3753424657534246|               502|
|     0.915068493150685|               334|
|    1.1205479452054794|               409|
|    0.5232876712328767|        

In [0]:
# Account age in years (365-day year), rounded to 2 decimals.
# `round` here is pyspark.sql.functions.round (brought in by the star import).
_account_age = round(col("seniority") / 365, 2)
usersDF = usersDF.withColumn("account_age_years", _account_age)
usersDF.select("account_age_years", "seniority").distinct().show()

+-----------------+---------+
|account_age_years|seniority|
+-----------------+---------+
|             8.77|     3201|
|             8.78|     3205|
|             8.78|     3204|
|             8.77|     3200|
|             8.77|     3202|
|             8.78|     3203|
+-----------------+---------+



In [0]:
# Bucket users by account age in years:
#   < 1      -> 'Newbie'
#   [1, 3)   -> 'Intermediate'
#   >= 3     -> 'Experienced'
#   null     -> null (an unknown age must not fall through to 'Experienced')
_age = col('account_age_years')
usersDF = usersDF.withColumn(
    'account_age_group',
    when(_age.isNull(), lit(None).cast('string'))
    .when(_age < 1, 'Newbie')
    .when(_age < 3, 'Intermediate')
    .otherwise('Experienced'),
)
    

In [0]:
# Sample of the age buckets next to the numeric age.
usersDF.select(['account_age_group', 'account_age_years']).show()

+-----------------+-----------------+
|account_age_group|account_age_years|
+-----------------+-----------------+
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
|      Experienced|             8.78|
+-----------------+-----------------+
only showing top 20 rows



In [0]:
# Stamp every row with the calendar year at the time this cell executes.
# NOTE(review): the value depends on when the job runs, not on the data.
_today = current_date()
usersDF = usersDF.withColumn("current_year", year(_today))
     

In [0]:
# Build a readable descriptor: <gender>_<countrycode>_<title prefix>_<language name>.
# concat() yields null if ANY input is null (e.g. a language code missing from the
# language_full mapping), so each part defaults to 'UNKNOWN' to keep the field populated.
# Note: built from low-cardinality attributes, so it is NOT unique per user.
_descriptor_parts = [
    col("gender"),
    col("countrycode"),
    substring(col("civilitytitle"), 1, 3),
    col("language_full"),
]
usersDF = usersDF.withColumn(
    "user_descriptor",
    concat_ws("_", *[coalesce(part, lit("UNKNOWN")) for part in _descriptor_parts]),
)

In [0]:
# True when the civility title is longer than 10 characters (null title -> null).
_title_len = length(col("civilitytitle"))
usersDF = usersDF.withColumn("flag_long_title", _title_len > 10)


In [0]:
# Enforce column types on the users table before writing to silver.

# App / profile-picture flags -> boolean.
for _c in ["hasanyapp", "hasandroidapp", "hasiosapp", "hasprofilepicture"]:
    usersDF = usersDF.withColumn(_c, col(_c).cast("boolean"))

# Social counts -> int.
for _c in ["socialnbfollowers", "socialnbfollows"]:
    usersDF = usersDF.withColumn(_c, col(_c).cast(IntegerType()))

# Rates and seniority measures -> decimal(10, 2).
for _c in ["productspassrate", "seniorityasmonths", "seniorityasyears"]:
    usersDF = usersDF.withColumn(_c, col(_c).cast(DecimalType(10, 2)))

In [0]:
# Cast `dayssincelastlogin` to int and default missing values to 0.
# coalesce() runs AFTER the cast, so values that fail to parse (cast -> null) are
# defaulted too, not only values that were already null.
# NOTE(review): 0 reads as "logged in today"; confirm that is the intended meaning
# for users with no login data.
usersDF = usersDF.withColumn(
    "dayssincelastlogin",
    coalesce(col("dayssincelastlogin").cast(IntegerType()), lit(0)),
)

In [0]:
# Persist the cleaned users table to the silver layer, replacing any previous version.
(usersDF.write
    .mode("overwrite")
    .format("delta")
    .save("/mnt/delta/silver/tables/users"))


In [0]:
# Load the bronze buyers table (Delta format).
BUYERS_BRONZE_PATH = "/mnt/delta/bronze/tables/buyers"
buyersDF = spark.read.format("delta").load(BUYERS_BRONZE_PATH)

In [0]:
# Enforce numeric types on the buyers table: counts -> int, ratios/means -> decimal(10, 2).
# `integer_columns` is kept as a global because a later cell reuses it for null-filling.
integer_columns = [
    'buyers', 'topbuyers', 'femalebuyers', 'malebuyers',
    'topfemalebuyers', 'topmalebuyers', 'totalproductsbought',
    'totalproductswished', 'totalproductsliked', 'toptotalproductsbought',
    'toptotalproductswished', 'toptotalproductsliked'
]

decimal_columns = [
    'topbuyerratio', 'femalebuyersratio', 'topfemalebuyersratio',
    'boughtperwishlistratio', 'boughtperlikeratio', 'topboughtperwishlistratio',
    'topboughtperlikeratio', 'meanproductsbought', 'meanproductswished',
    'meanproductsliked', 'topmeanproductsbought', 'topmeanproductswished',
    'topmeanproductsliked', 'meanofflinedays', 'topmeanofflinedays',
    'meanfollowers', 'meanfollowing', 'topmeanfollowers', 'topmeanfollowing'
]

# Integers first, then decimals: same cast order as column groups above.
_buyer_target_types = {name: IntegerType() for name in integer_columns}
_buyer_target_types.update({name: DecimalType(10, 2) for name in decimal_columns})

for column_name, _target_type in _buyer_target_types.items():
    buyersDF = buyersDF.withColumn(column_name, col(column_name).cast(_target_type))

In [0]:
# Clean the buyers table and derive per-market indicators.

# Title-case country names (e.g. 'france' -> 'France').
buyersDF = buyersDF.withColumn("country", initcap(col("country")))

# Treat missing counts as zero, in one pass over all integer count columns.
buyersDF = buyersDF.fillna(0, subset=integer_columns)

# Female-to-male buyer ratio. With no male buyers the ratio is undefined, so the
# result is null there. (Adding 1 to the denominator would silently bias every
# small-count market.)
buyersDF = buyersDF.withColumn(
    "female_to_male_ratio",
    when(col("malebuyers") != 0,
         round(col("femalebuyers") / col("malebuyers"), 2)),
)

# Market potential: products wished per product bought; null when nothing was bought.
buyersDF = buyersDF.withColumn(
    "wishlist_to_purchase_ratio",
    when(col("totalproductsbought") != 0,
         round(col("totalproductswished") / col("totalproductsbought"), 2)),
)

# Tag markets whose bought-per-wishlist ratio exceeds the threshold (null -> False).
high_engagement_threshold = 0.5
buyersDF = buyersDF.withColumn(
    "high_engagement",
    when(col("boughtperwishlistratio") > high_engagement_threshold, True)
    .otherwise(False),
)

# Flag markets where the overall female buyer ratio exceeds the female ratio among
# top buyers (null -> False). Despite the column name, this is a comparison at a
# single point in time, not a trend.
buyersDF = buyersDF.withColumn(
    "growing_female_market",
    when(col("femalebuyersratio") > col("topfemalebuyersratio"), True)
    .otherwise(False),
)


In [0]:
# Persist the buyers table to the silver layer, replacing any previous version.
(buyersDF.write
    .mode("overwrite")
    .format("delta")
    .save("/mnt/delta/silver/tables/buyers"))


In [0]:
# Bronze-layer sellers table, stored in Delta format.
sellersDF = spark.read.load("/mnt/delta/bronze/tables/sellers", format="delta")

In [0]:
# Enforce numeric types on the sellers table: counts -> int, means/percentages -> decimal(10, 2).
_seller_column_types = {
    "nbsellers": IntegerType(),
    "meanproductssold": DecimalType(10, 2),
    "meanproductslisted": DecimalType(10, 2),
    "meansellerpassrate": DecimalType(10, 2),
    "totalproductssold": IntegerType(),
    "totalproductslisted": IntegerType(),
    "meanproductsbought": DecimalType(10, 2),
    "meanproductswished": DecimalType(10, 2),
    "meanproductsliked": DecimalType(10, 2),
    "totalbought": IntegerType(),
    "totalwished": IntegerType(),
    "totalproductsliked": IntegerType(),
    "meanfollowers": DecimalType(10, 2),
    "meanfollows": DecimalType(10, 2),
    "percentofappusers": DecimalType(10, 2),
    "percentofiosusers": DecimalType(10, 2),
    "meanseniority": DecimalType(10, 2),
}
for _name, _dtype in _seller_column_types.items():
    sellersDF = sellersDF.withColumn(_name, col(_name).cast(_dtype))
     

In [0]:
# Clean the sellers table, derive per-market indicators and write it to silver.

# Title-case country names and upper-case the `sex` values.
sellersDF = (
    sellersDF
    .withColumn("country", initcap(col("country")))
    .withColumn("sex", upper(col("sex")))
)

# Impute missing pass rates with the column mean FIRST, so every label derived
# below sees the same (imputed) value that is persisted.
# DataFrame.fillna only accepts int/float/str/bool values, and this mean comes back
# as a Decimal (the column is decimal(10, 2)), so coalesce() is used instead.
mean_pass_rate = (
    sellersDF
    .select(round(avg("meansellerpassrate"), 2).alias("avg_pass_rate"))
    .collect()[0]["avg_pass_rate"]
)
sellersDF = sellersDF.withColumn(
    "meansellerpassrate",
    coalesce(col("meansellerpassrate"), lit(mean_pass_rate)),
)

# Bucket markets by seller count: < 500 Small, [500, 2000) Medium, >= 2000 Large.
# A null count stays null instead of falling through to 'Large'.
_nb_sellers = col("nbsellers")
sellersDF = sellersDF.withColumn(
    "seller_size_category",
    when(_nb_sellers.isNull(), lit(None).cast("string"))
    .when(_nb_sellers < 500, "Small")
    .when(_nb_sellers < 2000, "Medium")
    .otherwise("Large"),
)

# Products listed per seller as an activity indicator; null when there are no sellers.
sellersDF = sellersDF.withColumn(
    "mean_products_listed_per_seller",
    when(_nb_sellers != 0, round(col("totalproductslisted") / _nb_sellers, 2)),
)

# Label markets whose (imputed) mean seller pass rate exceeds 0.75.
# NOTE(review): assumes meansellerpassrate is on a 0-1 scale — TODO confirm.
sellersDF = sellersDF.withColumn(
    "high_seller_pass_rate",
    when(col("meansellerpassrate") > 0.75, "High").otherwise("Normal"),
)

sellersDF.write.format("delta").mode("overwrite").save("/mnt/delta/silver/tables/sellers")


In [0]:
# Load the bronze countries table, clean it, derive indicators and write it to silver.
countriesDF = spark.read.format("delta").load("/mnt/delta/bronze/tables/countries")

# Enforce numeric types: counts -> int, ratios/means -> decimal(10, 2).
countriesDF = countriesDF \
    .withColumn("sellers", col("sellers").cast(IntegerType())) \
    .withColumn("topsellers", col("topsellers").cast(IntegerType())) \
    .withColumn("topsellerratio", col("topsellerratio").cast(DecimalType(10, 2))) \
    .withColumn("femalesellersratio", col("femalesellersratio").cast(DecimalType(10, 2))) \
    .withColumn("topfemalesellersratio", col("topfemalesellersratio").cast(DecimalType(10, 2))) \
    .withColumn("femalesellers", col("femalesellers").cast(IntegerType())) \
    .withColumn("malesellers", col("malesellers").cast(IntegerType())) \
    .withColumn("topfemalesellers", col("topfemalesellers").cast(IntegerType())) \
    .withColumn("topmalesellers", col("topmalesellers").cast(IntegerType())) \
    .withColumn("countrysoldratio", col("countrysoldratio").cast(DecimalType(10, 2))) \
    .withColumn("bestsoldratio", col("bestsoldratio").cast(DecimalType(10, 2))) \
    .withColumn("toptotalproductssold", col("toptotalproductssold").cast(IntegerType())) \
    .withColumn("totalproductssold", col("totalproductssold").cast(IntegerType())) \
    .withColumn("toptotalproductslisted", col("toptotalproductslisted").cast(IntegerType())) \
    .withColumn("totalproductslisted", col("totalproductslisted").cast(IntegerType())) \
    .withColumn("topmeanproductssold", col("topmeanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("topmeanproductslisted", col("topmeanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meanproductssold", col("meanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("meanproductslisted", col("meanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meanofflinedays", col("meanofflinedays").cast(DecimalType(10, 2))) \
    .withColumn("topmeanofflinedays", col("topmeanofflinedays").cast(DecimalType(10, 2))) \
    .withColumn("meanfollowers", col("meanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("meanfollowing", col("meanfollowing").cast(DecimalType(10, 2))) \
    .withColumn("topmeanfollowers", col("topmeanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("topmeanfollowing", col("topmeanfollowing").cast(DecimalType(10, 2)))

# Title-case country names (e.g. 'france' -> 'France').
countriesDF = countriesDF.withColumn("country", initcap(col("country")))

# Share of sellers who are top sellers; null when a country has no sellers.
countriesDF = countriesDF.withColumn(
    "top_seller_ratio",
    when(col("sellers") != 0, round(col("topsellers") / col("sellers"), 2)),
)

# Flag countries whose female-seller ratio exceeds 0.5 (null -> False).
# NOTE(review): presumably a 0-1 share of sellers — TODO confirm.
countriesDF = countriesDF.withColumn(
    "high_female_seller_ratio",
    when(col("femalesellersratio") > 0.5, True).otherwise(False),
)

# Top sellers' sold/listed ratio; null when nothing was listed, matching how
# top_seller_ratio handles a zero denominator (no +1 smoothing bias).
countriesDF = countriesDF.withColumn(
    "performance_indicator",
    when(col("toptotalproductslisted") != 0,
         round(col("toptotalproductssold") / col("toptotalproductslisted"), 2)),
)

# Flag countries with exceptionally high performance (null indicator -> False).
performance_threshold = 0.8
countriesDF = countriesDF.withColumn(
    "high_performance",
    when(col("performance_indicator") > performance_threshold, True).otherwise(False),
)

# Activity level from mean offline days: < 30 Highly, [30, 60) Moderately, >= 60 Low.
# A null value stays null instead of being labelled 'Low Activity'.
_offline_days = col("meanofflinedays")
countriesDF = countriesDF.withColumn(
    "activity_level",
    when(_offline_days.isNull(), lit(None).cast("string"))
    .when(_offline_days < 30, "Highly Active")
    .when(_offline_days < 60, "Moderately Active")
    .otherwise("Low Activity"),
)

countriesDF.write.format("delta").mode("overwrite").save("/mnt/delta/silver/tables/countries")