In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [0]:
spark = SparkSession.builder.appName("ECommerceDataAnalysis").getOrCreate()

In [0]:

#/mnt/delta/tables/bronze/users
usersDF = spark.read.format("delta").load("/mnt/delta/tables/bronze/users")

In [0]:
usersDF.show(2)

+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+----------------+-----------+
|      identifierHash|type|   country|language|socialNbFollowers|socialNbFollows|socialProductsLiked|productsListed|productsSold|productsPassRate|productsWished|productsBought|gender|civilityGenderId|civilityTitle|hasAnyApp|hasAndroidApp|hasIosApp|hasProfilePicture|daysSinceLastLogin|seniority|seniorityAsMonths|seniorityAsYears|countryCode|
+--------------------+----+----------+--------+-----------------+---------------+-------------------+--------------+------------+----------------+--------------+--------------+------+----------------+-------------+---------+-------------+---------+-----------------+------------------+---------+-----------------+-

In [0]:
usersDF.select('countrycode').show(5)

+-----------+
|countrycode|
+-----------+
|         us|
|         de|
|         se|
|         tr|
|         fr|
+-----------+
only showing top 5 rows



In [0]:

# Normalize country codes to uppercase
usersDF = usersDF.withColumn("countrycode", upper(col("countrycode")))

In [0]:
usersDF.select('countrycode').show(5)

+-----------+
|countrycode|
+-----------+
|         US|
|         DE|
|         SE|
|         TR|
|         FR|
+-----------+
only showing top 5 rows



In [0]:
usersDF.select('language').show(5)

+--------+
|language|
+--------+
|      en|
|      de|
|      en|
|      en|
|      en|
+--------+
only showing top 5 rows



In [0]:

# Handling multiple languages elegantly with `expr` and `case when`
usersDF = usersDF.withColumn("language_full", 
                             expr("CASE WHEN language = 'en' THEN 'English' " +
                                  "WHEN language = 'fr' THEN 'French' " +
                                  "ELSE 'Other' END"))


In [0]:
#Added new column as language_full
usersDF.select('language_full').show(5)

+-------------+
|language_full|
+-------------+
|      English|
|        Other|
|      English|
|      English|
|      English|
+-------------+
only showing top 5 rows



In [0]:
usersDF.select('gender').show(5)

+------+
|gender|
+------+
|     F|
|     F|
|     M|
|     F|
|     M|
+------+
only showing top 5 rows



In [0]:
# Correcting potential data entry errors in `gender` column
usersDF = usersDF.withColumn("gender", 
                             when(col("gender").startswith("M"), "Male")
                             .when(col("gender").startswith("F"), "Female")
                             .otherwise("Other"))

In [0]:
usersDF.select('gender').show(5)

+------+
|gender|
+------+
|Female|
|Female|
|  Male|
|Female|
|  Male|
+------+
only showing top 5 rows



In [0]:
usersDF.select('dayssincelastlogin').show(10)

+------------------+
|dayssincelastlogin|
+------------------+
|               709|
|               709|
|               689|
|               709|
|               709|
|               709|
|               591|
|               709|
|               701|
|               703|
+------------------+
only showing top 10 rows



In [0]:
# Derive new column `years_since_last_login` from `dayssincelastlogin`
usersDF = usersDF.withColumn("years_since_last_login", col("dayssincelastlogin") / 365)

In [0]:
usersDF.select('years_since_last_login').show(10)

+----------------------+
|years_since_last_login|
+----------------------+
|    1.9424657534246574|
|    1.9424657534246574|
|    1.8876712328767122|
|    1.9424657534246574|
|    1.9424657534246574|
|    1.9424657534246574|
|    1.6191780821917807|
|    1.9424657534246574|
|    1.9205479452054794|
|     1.926027397260274|
+----------------------+
only showing top 10 rows



In [0]:
usersDF.select('seniority').show(5)

+---------+
|seniority|
+---------+
|     3205|
|     3205|
|     3205|
|     3205|
|     3205|
+---------+
only showing top 5 rows



In [0]:
# Calculate age of account in years and categorize into `account_age_group`
usersDF = usersDF.withColumn("account_age_years", round(col("seniority") / 365, 2))
usersDF = usersDF.withColumn("account_age_group",
                             when(col("account_age_years") < 1, "New")
                             .when((col("account_age_years") >= 1) & (col("account_age_years") < 3), "Intermediate")
                             .otherwise("Experienced"))
     

In [0]:
usersDF.select('account_age_years','account_age_group').show(5)

+-----------------+-----------------+
|account_age_years|account_age_group|
+-----------------+-----------------+
|             8.78|      Experienced|
|             8.78|      Experienced|
|             8.78|      Experienced|
|             8.78|      Experienced|
|             8.78|      Experienced|
+-----------------+-----------------+
only showing top 5 rows



In [0]:
# Add a column with the current year for comparison
usersDF = usersDF.withColumn("current_year", year(current_date()))

In [0]:
usersDF.select('current_year').show(5)

+------------+
|current_year|
+------------+
|        2024|
|        2024|
|        2024|
|        2024|
|        2024|
+------------+
only showing top 5 rows



In [0]:
usersDF.select('civilityTitle').show()

+-------------+
|civilityTitle|
+-------------+
|          mrs|
|          mrs|
|           mr|
|          mrs|
|           mr|
|          mrs|
|          mrs|
|          mrs|
|          mrs|
|           mr|
|          mrs|
|          mrs|
|          mrs|
|          mrs|
|          mrs|
|          mrs|
|          mrs|
|          mrs|
|          mrs|
|           mr|
+-------------+
only showing top 20 rows



In [0]:
# Creatively combining strings to form a unique user descriptor
usersDF = usersDF.withColumn("user_descriptor", 
                             concat(col("gender"), lit("_"), 
                                    col("countrycode"), lit("_"), 
                                    expr("substring(civilityTitle, 1, 3)"), lit("_"), 
                                    col("language_full")))

In [0]:
usersDF.select('user_descriptor').show(5)

+--------------------+
|     user_descriptor|
+--------------------+
|Female_US_mrs_Eng...|
| Female_DE_mrs_Other|
|  Male_SE_mr_English|
|Female_TR_mrs_Eng...|
|  Male_FR_mr_English|
+--------------------+
only showing top 5 rows



In [0]:
usersDF = usersDF.withColumn("flag_long_title", length(col("civilitytitle")) > 10)

In [0]:

usersDF = usersDF.withColumn("hasanyapp", col("hasanyapp").cast("boolean"))
usersDF = usersDF.withColumn("hasandroidapp", col("hasandroidapp").cast("boolean"))
usersDF = usersDF.withColumn("hasiosapp", col("hasiosapp").cast("boolean"))
usersDF = usersDF.withColumn("hasprofilepicture", col("hasprofilepicture").cast("boolean"))


usersDF = usersDF.withColumn("socialnbfollowers", col("socialnbfollowers").cast(IntegerType()))
usersDF = usersDF.withColumn("socialnbfollows", col("socialnbfollows").cast(IntegerType()))

usersDF = usersDF.withColumn("productspassrate", col("productspassrate").cast(DecimalType(10, 2)))
usersDF = usersDF.withColumn("seniorityasmonths", col("seniorityasmonths").cast(DecimalType(10, 2)))
usersDF = usersDF.withColumn("seniorityasyears", col("seniorityasyears").cast(DecimalType(10, 2)))

In [0]:

usersDF = usersDF.withColumn("dayssincelastlogin",
                             when(col("dayssincelastlogin").isNotNull(),
                                  col("dayssincelastlogin").cast(IntegerType()))
                             .otherwise(0))

In [0]:
usersDF.select('dayssincelastlogin').show(10)

+------------------+
|dayssincelastlogin|
+------------------+
|               709|
|               709|
|               689|
|               709|
|               709|
|               709|
|               591|
|               709|
|               701|
|               703|
+------------------+
only showing top 10 rows



In [0]:
usersDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/users")

In [0]:
%fs ls '/mnt/delta/tables/silver/users'

path,name,size,modificationTime
dbfs:/mnt/delta/tables/silver/users/_delta_log/,_delta_log/,0,0
dbfs:/mnt/delta/tables/silver/users/part-00000-130f951c-d9c5-4174-807b-255700c5b97d-c000.snappy.parquet,part-00000-130f951c-d9c5-4174-807b-255700c5b97d-c000.snappy.parquet,341894,1715154977000


In [0]:
%fs ls '/mnt/delta/tables/silver'

path,name,size,modificationTime
dbfs:/mnt/delta/tables/silver/users/,users/,0,0


In [0]:
#/mnt/delta/tables/bronze/users
buyersDF = spark.read.format("delta").load("/mnt/delta/tables/bronze/buyers")

In [0]:
buyersDF.printSchema()

root
 |-- country: string (nullable = true)
 |-- buyers: integer (nullable = true)
 |-- topbuyers: integer (nullable = true)
 |-- topbuyerratio: double (nullable = true)
 |-- femalebuyers: integer (nullable = true)
 |-- malebuyers: integer (nullable = true)
 |-- topfemalebuyers: integer (nullable = true)
 |-- topmalebuyers: integer (nullable = true)
 |-- femalebuyersratio: double (nullable = true)
 |-- topfemalebuyersratio: double (nullable = true)
 |-- boughtperwishlistratio: double (nullable = true)
 |-- boughtperlikeratio: double (nullable = true)
 |-- topboughtperwishlistratio: double (nullable = true)
 |-- topboughtperlikeratio: double (nullable = true)
 |-- totalproductsbought: integer (nullable = true)
 |-- totalproductswished: integer (nullable = true)
 |-- totalproductsliked: integer (nullable = true)
 |-- toptotalproductsbought: integer (nullable = true)
 |-- toptotalproductswished: integer (nullable = true)
 |-- toptotalproductsliked: integer (nullable = true)
 |-- meanprodu

In [0]:
# Casting Integer columns
integer_columns = [
    'buyers', 'topbuyers', 'femalebuyers', 'malebuyers',
    'topfemalebuyers', 'topmalebuyers', 'totalproductsbought',
    'totalproductswished', 'totalproductsliked', 'toptotalproductsbought',
    'toptotalproductswished', 'toptotalproductsliked'
]

for column_name in integer_columns:
    buyersDF = buyersDF.withColumn(column_name, col(column_name).cast(IntegerType()))

In [0]:
buyersDF.printSchema()

root
 |-- country: string (nullable = true)
 |-- buyers: integer (nullable = true)
 |-- topbuyers: integer (nullable = true)
 |-- topbuyerratio: double (nullable = true)
 |-- femalebuyers: integer (nullable = true)
 |-- malebuyers: integer (nullable = true)
 |-- topfemalebuyers: integer (nullable = true)
 |-- topmalebuyers: integer (nullable = true)
 |-- femalebuyersratio: double (nullable = true)
 |-- topfemalebuyersratio: double (nullable = true)
 |-- boughtperwishlistratio: double (nullable = true)
 |-- boughtperlikeratio: double (nullable = true)
 |-- topboughtperwishlistratio: double (nullable = true)
 |-- topboughtperlikeratio: double (nullable = true)
 |-- totalproductsbought: integer (nullable = true)
 |-- totalproductswished: integer (nullable = true)
 |-- totalproductsliked: integer (nullable = true)
 |-- toptotalproductsbought: integer (nullable = true)
 |-- toptotalproductswished: integer (nullable = true)
 |-- toptotalproductsliked: integer (nullable = true)
 |-- meanprodu

In [0]:
# Casting Decimal columns
decimal_columns = [
    'topbuyerratio', 'femalebuyersratio', 'topfemalebuyersratio',
    'boughtperwishlistratio', 'boughtperlikeratio', 'topboughtperwishlistratio',
    'topboughtperlikeratio', 'meanproductsbought', 'meanproductswished',
    'meanproductsliked', 'topmeanproductsbought', 'topmeanproductswished',
    'topmeanproductsliked', 'meanofflinedays', 'topmeanofflinedays',
    'meanfollowers', 'meanfollowing', 'topmeanfollowers', 'topmeanfollowing'
]

for column_name in decimal_columns:
    buyersDF = buyersDF.withColumn(column_name, col(column_name).cast(DecimalType(10, 2)))


In [0]:
buyersDF.printSchema()

root
 |-- country: string (nullable = true)
 |-- buyers: integer (nullable = true)
 |-- topbuyers: integer (nullable = true)
 |-- topbuyerratio: decimal(10,2) (nullable = true)
 |-- femalebuyers: integer (nullable = true)
 |-- malebuyers: integer (nullable = true)
 |-- topfemalebuyers: integer (nullable = true)
 |-- topmalebuyers: integer (nullable = true)
 |-- femalebuyersratio: decimal(10,2) (nullable = true)
 |-- topfemalebuyersratio: decimal(10,2) (nullable = true)
 |-- boughtperwishlistratio: decimal(10,2) (nullable = true)
 |-- boughtperlikeratio: decimal(10,2) (nullable = true)
 |-- topboughtperwishlistratio: decimal(10,2) (nullable = true)
 |-- topboughtperlikeratio: decimal(10,2) (nullable = true)
 |-- totalproductsbought: integer (nullable = true)
 |-- totalproductswished: integer (nullable = true)
 |-- totalproductsliked: integer (nullable = true)
 |-- toptotalproductsbought: integer (nullable = true)
 |-- toptotalproductswished: integer (nullable = true)
 |-- toptotalproduc

In [0]:
buyersDF.select('country').show(5)


+-----------+
|    country|
+-----------+
|     France|
|Royaume-Uni|
| Etats-Unis|
|  Allemagne|
|     Italie|
+-----------+
only showing top 5 rows



In [0]:

# Normalize country names
buyersDF = buyersDF.withColumn("country", initcap(col("country")))

for col_name in integer_columns:
    buyersDF = buyersDF.fillna({col_name: 0})

In [0]:
buyersDF.select('country').show(5)

+-----------+
|    country|
+-----------+
|     France|
|Royaume-uni|
| Etats-unis|
|  Allemagne|
|     Italie|
+-----------+
only showing top 5 rows



In [0]:
# Calculate the ratio of female to male buyers
buyersDF = buyersDF.withColumn("female_to_male_ratio", 
                               round(col("femalebuyers") / (col("malebuyers") + 1), 2))

In [0]:
# Determine the market potential by comparing wishlist and purchases
buyersDF = buyersDF.withColumn("wishlist_to_purchase_ratio", 
                               round(col("totalproductswished") / (col("totalproductsbought") + 1), 2))

In [0]:
# Tag countries with a high engagement ratio
high_engagement_threshold = 0.5
buyersDF = buyersDF.withColumn("high_engagement",
                               when(col("boughtperwishlistratio") > high_engagement_threshold, True)
                               .otherwise(False))
                            
# Flag markets with increasing female buyer participation
buyersDF = buyersDF.withColumn("growing_female_market",
                               when(col("femalebuyersratio") > col("topfemalebuyersratio"), True)
                               .otherwise(False))

     

In [0]:
buyersDF.select('female_to_male_ratio','wishlist_to_purchase_ratio','high_engagement','growing_female_market').show(10)

+--------------------+--------------------------+---------------+---------------------+
|female_to_male_ratio|wishlist_to_purchase_ratio|high_engagement|growing_female_market|
+--------------------+--------------------------+---------------+---------------------+
|                2.12|                      3.83|           true|                 true|
|                 2.4|                      6.68|           true|                false|
|                3.29|                      4.58|           true|                 true|
|                2.41|                      7.19|           true|                false|
|                 2.4|                      7.95|           true|                 true|
|                2.82|                      7.69|           true|                false|
|                4.37|                      5.07|           true|                 true|
|                 2.9|                       8.8|           true|                 true|
|                4.42|          

In [0]:
buyersDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/buyers")

In [0]:

sellersDF = spark.read.format("delta").load("/mnt/delta/tables/bronze/sellers")

In [0]:
sellersDF.show(5)

+---------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+-----------------+-----------------+------------------+
|  country|   sex|nbsellers|meanproductssold|meanproductslisted|meansellerpassrate|totalproductssold|totalproductslisted|meanproductsbought|meanproductswished|meanproductsliked|totalbought|totalwished|totalproductsliked|meanfollowers|meanfollows|percentofappusers|percentofiosusers|     meanseniority|
+---------+------+---------+----------------+------------------+------------------+-----------------+-------------------+------------------+------------------+-----------------+-----------+-----------+------------------+-------------+-----------+-----------------+-----------------+------------------+
|Allemagne|Female|      116|            4.03|              2.72|             27.33|           

In [0]:
sellersDF.printSchema()

root
 |-- country: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- nbsellers: integer (nullable = true)
 |-- meanproductssold: double (nullable = true)
 |-- meanproductslisted: double (nullable = true)
 |-- meansellerpassrate: double (nullable = true)
 |-- totalproductssold: integer (nullable = true)
 |-- totalproductslisted: integer (nullable = true)
 |-- meanproductsbought: double (nullable = true)
 |-- meanproductswished: double (nullable = true)
 |-- meanproductsliked: double (nullable = true)
 |-- totalbought: integer (nullable = true)
 |-- totalwished: integer (nullable = true)
 |-- totalproductsliked: integer (nullable = true)
 |-- meanfollowers: double (nullable = true)
 |-- meanfollows: double (nullable = true)
 |-- percentofappusers: double (nullable = true)
 |-- percentofiosusers: double (nullable = true)
 |-- meanseniority: double (nullable = true)



In [0]:
sellersDF = sellersDF \
    .withColumn("nbsellers", col("nbsellers").cast(IntegerType())) \
    .withColumn("meanproductssold", col("meanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("meanproductslisted", col("meanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meansellerpassrate", col("meansellerpassrate").cast(DecimalType(10, 2))) \
    .withColumn("totalproductssold", col("totalproductssold").cast(IntegerType())) \
    .withColumn("totalproductslisted", col("totalproductslisted").cast(IntegerType())) \
    .withColumn("meanproductsbought", col("meanproductsbought").cast(DecimalType(10, 2))) \
    .withColumn("meanproductswished", col("meanproductswished").cast(DecimalType(10, 2))) \
    .withColumn("meanproductsliked", col("meanproductsliked").cast(DecimalType(10, 2))) \
    .withColumn("totalbought", col("totalbought").cast(IntegerType())) \
    .withColumn("totalwished", col("totalwished").cast(IntegerType())) \
    .withColumn("totalproductsliked", col("totalproductsliked").cast(IntegerType())) \
    .withColumn("meanfollowers", col("meanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("meanfollows", col("meanfollows").cast(DecimalType(10, 2))) \
    .withColumn("percentofappusers", col("percentofappusers").cast(DecimalType(10, 2))) \
    .withColumn("percentofiosusers", col("percentofiosusers").cast(DecimalType(10, 2))) \
    .withColumn("meanseniority", col("meanseniority").cast(DecimalType(10, 2)))

In [0]:
sellersDF.printSchema()

root
 |-- country: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- nbsellers: integer (nullable = true)
 |-- meanproductssold: decimal(10,2) (nullable = true)
 |-- meanproductslisted: decimal(10,2) (nullable = true)
 |-- meansellerpassrate: decimal(10,2) (nullable = true)
 |-- totalproductssold: integer (nullable = true)
 |-- totalproductslisted: integer (nullable = true)
 |-- meanproductsbought: decimal(10,2) (nullable = true)
 |-- meanproductswished: decimal(10,2) (nullable = true)
 |-- meanproductsliked: decimal(10,2) (nullable = true)
 |-- totalbought: integer (nullable = true)
 |-- totalwished: integer (nullable = true)
 |-- totalproductsliked: integer (nullable = true)
 |-- meanfollowers: decimal(10,2) (nullable = true)
 |-- meanfollows: decimal(10,2) (nullable = true)
 |-- percentofappusers: decimal(10,2) (nullable = true)
 |-- percentofiosusers: decimal(10,2) (nullable = true)
 |-- meanseniority: decimal(10,2) (nullable = true)



In [0]:
sellersDF.select('country','sex').show(5)

+---------+------+
|  country|   sex|
+---------+------+
|Allemagne|Female|
|Allemagne|  Male|
|  Arménie|Female|
|Australie|Female|
|Australie|  Male|
+---------+------+
only showing top 5 rows



In [0]:
# Normalize country names and gender values
sellersDF = sellersDF.withColumn("country", initcap(col("country"))) \
                                                .withColumn("sex", upper(col("sex")))


In [0]:
sellersDF.select('country','sex').show(5)

+---------+------+
|  country|   sex|
+---------+------+
|Allemagne|FEMALE|
|Allemagne|  MALE|
|  Arménie|FEMALE|
|Australie|FEMALE|
|Australie|  MALE|
+---------+------+
only showing top 5 rows



In [0]:
#Add a column to categorize the number of sellers
sellersDF = sellersDF.withColumn("seller_size_category", 
                               when(col("nbsellers") < 500, "Small") \
                               .when((col("nbsellers") >= 500) & (col("nbsellers") < 2000), "Medium") \
                               .otherwise("Large"))


In [0]:
sellersDF.select('seller_size_category').show(5)

+--------------------+
|seller_size_category|
+--------------------+
|               Small|
|               Small|
|               Small|
|               Small|
|               Small|
+--------------------+
only showing top 5 rows



In [0]:
# Calculate the mean products listed per seller as an indicator of seller activity
sellersDF = sellersDF.withColumn("mean_products_listed_per_seller", 
                               round(col("totalproductslisted") / col("nbsellers"), 2))


In [0]:
# Identify markets with high seller pass rate
sellersDF = sellersDF.withColumn("high_seller_pass_rate", 
                               when(col("meansellerpassrate") > 0.75, "High") \
                               .otherwise("Normal"))

mean_pass_rate = sellersDF.select(round(avg("meansellerpassrate"), 2).alias("avg_pass_rate")).collect()[0]["avg_pass_rate"]

sellersDF = sellersDF.withColumn("meansellerpassrate",
                                 when(col("meansellerpassrate").isNull(), mean_pass_rate)
                                 .otherwise(col("meansellerpassrate")))

In [0]:
sellersDF.select('meansellerpassrate').show(5)

+------------------+
|meansellerpassrate|
+------------------+
|             27.33|
|             19.15|
|              0.00|
|             10.44|
|             33.33|
+------------------+
only showing top 5 rows



In [0]:
sellersDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/sellers")

In [0]:

countriesDF = spark.read.format("delta").load("/mnt/delta/tables/bronze/countries")

In [0]:
countriesDF.show(5)

+---------+-------+----------+--------------+------------------+---------------------+-------------+-----------+----------------+--------------+----------------+-------------+--------------------+-----------------+----------------------+-------------------+-------------------+---------------------+-----------------+------------------+---------------+------------------+-------------+-------------+----------------+----------------+
|  country|sellers|topsellers|topsellerratio|femalesellersratio|topfemalesellersratio|femalesellers|malesellers|topfemalesellers|topmalesellers|countrysoldratio|bestsoldratio|toptotalproductssold|totalproductssold|toptotalproductslisted|totalproductslisted|topmeanproductssold|topmeanproductslisted| meanproductssold|meanproductslisted|meanofflinedays|topmeanofflinedays|meanfollowers|meanfollowing|topmeanfollowers|topmeanfollowing|
+---------+-------+----------+--------------+------------------+---------------------+-------------+-----------+----------------+---

In [0]:
countriesDF.printSchema()

root
 |-- country: string (nullable = true)
 |-- sellers: integer (nullable = true)
 |-- topsellers: integer (nullable = true)
 |-- topsellerratio: double (nullable = true)
 |-- femalesellersratio: double (nullable = true)
 |-- topfemalesellersratio: double (nullable = true)
 |-- femalesellers: integer (nullable = true)
 |-- malesellers: integer (nullable = true)
 |-- topfemalesellers: integer (nullable = true)
 |-- topmalesellers: integer (nullable = true)
 |-- countrysoldratio: double (nullable = true)
 |-- bestsoldratio: double (nullable = true)
 |-- toptotalproductssold: integer (nullable = true)
 |-- totalproductssold: integer (nullable = true)
 |-- toptotalproductslisted: integer (nullable = true)
 |-- totalproductslisted: integer (nullable = true)
 |-- topmeanproductssold: double (nullable = true)
 |-- topmeanproductslisted: double (nullable = true)
 |-- meanproductssold: double (nullable = true)
 |-- meanproductslisted: double (nullable = true)
 |-- meanofflinedays: double (nul

In [0]:
countriesDF = countriesDF \
    .withColumn("sellers", col("sellers").cast(IntegerType())) \
    .withColumn("topsellers", col("topsellers").cast(IntegerType())) \
    .withColumn("topsellerratio", col("topsellerratio").cast(DecimalType(10, 2))) \
    .withColumn("femalesellersratio", col("femalesellersratio").cast(DecimalType(10, 2))) \
    .withColumn("topfemalesellersratio", col("topfemalesellersratio").cast(DecimalType(10, 2))) \
    .withColumn("femalesellers", col("femalesellers").cast(IntegerType())) \
    .withColumn("malesellers", col("malesellers").cast(IntegerType())) \
    .withColumn("topfemalesellers", col("topfemalesellers").cast(IntegerType())) \
    .withColumn("topmalesellers", col("topmalesellers").cast(IntegerType())) \
    .withColumn("countrysoldratio", col("countrysoldratio").cast(DecimalType(10, 2))) \
    .withColumn("bestsoldratio", col("bestsoldratio").cast(DecimalType(10, 2))) \
    .withColumn("toptotalproductssold", col("toptotalproductssold").cast(IntegerType())) \
    .withColumn("totalproductssold", col("totalproductssold").cast(IntegerType())) \
    .withColumn("toptotalproductslisted", col("toptotalproductslisted").cast(IntegerType())) \
    .withColumn("totalproductslisted", col("totalproductslisted").cast(IntegerType())) \
    .withColumn("topmeanproductssold", col("topmeanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("topmeanproductslisted", col("topmeanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meanproductssold", col("meanproductssold").cast(DecimalType(10, 2))) \
    .withColumn("meanproductslisted", col("meanproductslisted").cast(DecimalType(10, 2))) \
    .withColumn("meanofflinedays", col("meanofflinedays").cast(DecimalType(10, 2))) \
    .withColumn("topmeanofflinedays", col("topmeanofflinedays").cast(DecimalType(10, 2))) \
    .withColumn("meanfollowers", col("meanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("meanfollowing", col("meanfollowing").cast(DecimalType(10, 2))) \
    .withColumn("topmeanfollowers", col("topmeanfollowers").cast(DecimalType(10, 2))) \
    .withColumn("topmeanfollowing", col("topmeanfollowing").cast(DecimalType(10, 2)))

In [0]:

countriesDF = countriesDF.withColumn("country", initcap(col("country")))


In [0]:

# Calculating the ratio of top sellers to total sellers
countriesDF = countriesDF.withColumn("top_seller_ratio", 
                                        round(col("topsellers") / col("sellers"), 2))

In [0]:
# countriesDF countries with a high ratio of female sellers
countriesDF = countriesDF.withColumn("high_female_seller_ratio", 
                                        when(col("femalesellersratio") > 0.5, True).otherwise(False))

In [0]:
countriesDF.select('high_female_seller_ratio').show(5)

+------------------------+
|high_female_seller_ratio|
+------------------------+
|                    true|
|                   false|
|                    true|
|                    true|
|                    true|
+------------------------+
only showing top 5 rows



In [0]:

# Adding a performance indicator based on the sold/listed ratio
countriesDF = countriesDF.withColumn("performance_indicator", 
                                        round(col("toptotalproductssold") / (col("toptotalproductslisted") + 1), 2))


In [0]:
# Flag countries with exceptionally high performance
performance_threshold = 0.8
countriesDF = countriesDF.withColumn("high_performance", 
                                        when(col("performance_indicator") > performance_threshold, True).otherwise(False))


In [0]:
countriesDF = countriesDF.withColumn("activity_level",
                                       when(col("meanofflinedays") < 30, "Highly Active")
                                       .when((col("meanofflinedays") >= 30) & (col("meanofflinedays") < 60), "Moderately Active")
                                       .otherwise("Low Activity"))

In [0]:
countriesDF.select('activity_level').show(5)

+--------------+
|activity_level|
+--------------+
| Highly Active|
| Highly Active|
|  Low Activity|
|  Low Activity|
| Highly Active|
+--------------+
only showing top 5 rows



In [0]:
countriesDF.write.format("delta").mode("overwrite").save("/mnt/delta/tables/silver/countries")

In [0]:
%fs ls '/mnt/delta/tables/'

path,name,size,modificationTime
dbfs:/mnt/delta/tables/bronze/,bronze/,0,0
dbfs:/mnt/delta/tables/silver/,silver/,0,0


In [0]:
%fs ls '/mnt/delta/tables/silver/'

path,name,size,modificationTime
dbfs:/mnt/delta/tables/silver/buyers/,buyers/,0,0
dbfs:/mnt/delta/tables/silver/countries/,countries/,0,0
dbfs:/mnt/delta/tables/silver/sellers/,sellers/,0,0
dbfs:/mnt/delta/tables/silver/users/,users/,0,0
