In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Obtain the Spark session for this pipeline (getOrCreate returns an existing one if present).
spark = (
    SparkSession.builder
    .appName('EcomDataPipeline')
    .getOrCreate()
)

In [0]:
# Bronze-layer users Delta table.
userDF = spark.read.load('/mnt/delta/tables/bronze/users/', format='delta')

In [0]:
# Normalise country codes to upper case.
userDF = userDF.withColumn('countryCode', upper('countryCode'))

In [0]:
# Map language codes to readable names: 'en' -> 'English', 'fr' -> 'French';
# any other value, including null, becomes 'Other'.
# NOTE(review): the output column is spelled 'language_fulL' (stray capital L);
# kept unchanged because readers of the silver table may depend on that name.
language_code = col('language')
userDF = userDF.withColumn(
    'language_fulL',
    when(language_code == 'en', 'English')
    .when(language_code == 'fr', 'French')
    .otherwise('Other'),
)

In [0]:
# Collapse gender onto Female / Male / Other by its leading letter (case-sensitive
# 'F' / 'M'); anything else, including null, becomes 'Other'.
raw_gender = col('gender')
userDF = userDF.withColumn(
    'gender',
    when(raw_gender.startswith('F'), 'Female')
    .when(raw_gender.startswith('M'), 'Male')
    .otherwise('Other'),
)

In [0]:
# Normalise the civility titles 'Mme', 'Mrs' and 'Ms' to a single 'Ms'.
# The pattern needs regex alternation (|): a comma-separated group such as
# '(Mme,Ms,Mrs)' matches only that literal text and so never fires.
# Word boundaries stop partial hits inside longer words (e.g. 'Mmes').
userDF = userDF.withColumn(
    'civilitytitle_clean',
    regexp_replace('civilityTitle', r'\b(?:Mme|Mrs|Ms)\b', 'Ms'),
)

In [0]:
# Days since last login expressed in (365-day) years.
# NOTE(review): this runs before the cell that casts dayssincelastlogin to int
# and replaces nulls with 0, so null days stay null here — confirm intended.
userDF = userDF.withColumn(
    'year_since_last_login',
    col('dayssincelastlogin') / 365,
)

In [0]:
# Seniority (presumably in days — confirm) rounded to whole years.
# `round` here is pyspark.sql.functions.round, brought in by the star import.
days_per_year = 365
userDF = userDF.withColumn(
    'account_age_years',
    round(col('seniority') / days_per_year),
)

In [0]:
# Bucket account age: < 1 year -> 'New', < 3 years -> 'Intermediate',
# everything else (including null ages) -> 'experienced'.
# The first branch already catches ages < 1, so the second only needs the upper bound.
# NOTE(review): 'experienced' is lower-case unlike the other labels; left as-is
# to avoid changing stored values.
age_years = col('account_age_years')
userDF = userDF.withColumn(
    'account_age_group',
    when(age_years < 1, 'New')
    .when(age_years < 3, 'Intermediate')
    .otherwise('experienced'),
)

In [0]:
# Stamp each row with the year in which this pipeline ran.
userDF = userDF.withColumn('current_year', year(current_date()))

# Descriptor "<gender>_<countryCode>_<first 3 chars of cleaned title>_<language>".
# concat returns null if any part is null.
underscore = lit('_')
userDF = userDF.withColumn(
    'user_description',
    concat(
        col('gender'), underscore,
        col('countryCode'), underscore,
        expr("substring(civilitytitle_clean,1,3)"), underscore,
        col('language'),
    ),
)
                         

In [0]:
# True when the uncleaned civility title is longer than 10 characters (null stays null).
title_length = length(col('civilitytitle'))
userDF = userDF.withColumn('flag_long_title', title_length > 10)

In [0]:
# Cast user flag / count / rate columns to their silver types.
# Driven by an ordered mapping instead of a backslash-continued chain: the old
# chain ended in a dangling `\` continued onto a whitespace-only line, which
# becomes a SyntaxError if trailing whitespace is stripped from the cell.
user_column_types = {
    'hasanyapp': 'boolean',
    'hasandroidapp': 'boolean',
    'hasiosapp': 'boolean',
    'hasprofilepicture': 'boolean',
    'socialnbfollowers': IntegerType(),
    'socialnbfollows': IntegerType(),
    'productspassrate': DecimalType(10, 2),
    'seniorityasmonths': DecimalType(10, 2),
    'seniorityasyears': DecimalType(10, 2),
}
for column_name, target_type in user_column_types.items():
    userDF = userDF.withColumn(column_name, col(column_name).cast(target_type))

In [0]:
# Cast days-since-last-login to int; rows whose raw value is null get 0.
raw_days = col('dayssincelastlogin')
userDF = userDF.withColumn(
    'dayssincelastlogin',
    when(raw_days.isNotNull(), raw_days.cast(IntegerType())).otherwise(0),
)

In [0]:
# Persist cleaned users to the silver layer, replacing the previous contents.
userDF.write.save('dbfs:/mnt/delta/tables/silver/users/', format='delta', mode='overwrite')

In [0]:
# Bronze-layer buyers Delta table.
buyerDF = spark.read.load('dbfs:/mnt/delta/tables/bronze/buyers/', format='delta')

In [0]:
integer_column =[
    'buyers','topbuyers','femalebuyers','malebuyers','topfemalebuyers','topmalebuyers','totalproductsbought','totalproductswished','totalproductsliked','toptotalproductsbought','toptotalproductswished','toptotalproductsliked',
]
for col_name in integer_column:
    buyersDF=buyerDF.withColumn(col_name,col(col_name).cast(IntegerType()))

In [0]:
decimal_columns=[
    'topbuyerratio','femalebuyersratio','topfemalebuyersratio','boughtperwishlistratio','boughtperlikeratio','topboughtperwishlistratio','topboughtperlikeratio','meanproductsbought','meanproductswished','meanproductsliked','topmeanproductsbought','topmeanproductswished','topmeanproductsliked','meanofflinedays','topmeanofflinedays','meanfollowers','meanfollowing','topmeanfollowers','topmeanfollowing'
]
for col_name in decimal_columns:
    buyerDF=buyerDF.withColumn(col_name,col(col_name).cast(DecimalType(10,2)))

In [0]:
buyerDF=buyerDF.withColumn('country',initcap(col('country')))
buyerDF=buyerDF.withColumn('female_to_male_ratio',round(col('femalebuyers')/(col('malebuyers')+1),2))
buyersDF = buyersDF.withColumn("wishlist_to_purchase_ratio", 
                               round(col("totalproductswished") / (col("totalproductsbought") + 1), 2))


high_engagement_threshold = 0.5
buyersDF = buyersDF.withColumn("high_engagement",
                               when(col("boughtperwishlistratio") > high_engagement_threshold, True)
                               .otherwise(False))
                               
buyersDF = buyersDF.withColumn("growing_female_market",
                               when(col("femalebuyersratio") > col("topfemalebuyersratio"), True)
                               .otherwise(False))

In [0]:
# Persist buyers to the silver layer, replacing the previous contents.
buyerDF.write.save('dbfs:/mnt/delta/tables/silver/buyers/', format='delta', mode='overwrite')

In [0]:
# Bronze-layer sellers Delta table.
sellerDF = spark.read.load("/mnt/delta/tables/bronze/sellers", format="delta")

In [0]:
# Target Spark types for the sellers columns, applied in declaration order.
seller_column_types = {
    "nbsellers": IntegerType(),
    "meanproductssold": DecimalType(10, 2),
    "meanproductslisted": DecimalType(10, 2),
    "meansellerpassrate": DecimalType(10, 2),
    "totalproductssold": IntegerType(),
    "totalproductslisted": IntegerType(),
    "meanproductsbought": DecimalType(10, 2),
    "meanproductswished": DecimalType(10, 2),
    "meanproductsliked": DecimalType(10, 2),
    "totalbought": IntegerType(),
    "totalwished": IntegerType(),
    "totalproductsliked": IntegerType(),
    "meanfollowers": DecimalType(10, 2),
    "meanfollows": DecimalType(10, 2),
    "percentofappusers": DecimalType(10, 2),
    "percentofiosusers": DecimalType(10, 2),
    "meanseniority": DecimalType(10, 2),
}
for seller_column, seller_type in seller_column_types.items():
    sellerDF = sellerDF.withColumn(seller_column, col(seller_column).cast(seller_type))

In [0]:
# Persist sellers to the silver layer, replacing the previous contents.
sellerDF.write.save('dbfs:/mnt/delta/tables/silver/sellers/', format='delta', mode='overwrite')

In [0]:
# Bronze-layer countries Delta table.
countriesDF = spark.read.load('dbfs:/mnt/delta/tables/bronze/countries', format='delta')

In [0]:
# Target Spark types for the countries columns, applied in declaration order.
country_column_types = {
    "sellers": IntegerType(),
    "topsellers": IntegerType(),
    "topsellerratio": DecimalType(10, 2),
    "femalesellersratio": DecimalType(10, 2),
    "topfemalesellersratio": DecimalType(10, 2),
    "femalesellers": IntegerType(),
    "malesellers": IntegerType(),
    "topfemalesellers": IntegerType(),
    "topmalesellers": IntegerType(),
    "countrysoldratio": DecimalType(10, 2),
    "bestsoldratio": DecimalType(10, 2),
    "toptotalproductssold": IntegerType(),
    "totalproductssold": IntegerType(),
    "toptotalproductslisted": IntegerType(),
    "totalproductslisted": IntegerType(),
    "topmeanproductssold": DecimalType(10, 2),
    "topmeanproductslisted": DecimalType(10, 2),
    "meanproductssold": DecimalType(10, 2),
    "meanproductslisted": DecimalType(10, 2),
    "meanofflinedays": DecimalType(10, 2),
    "topmeanofflinedays": DecimalType(10, 2),
    "meanfollowers": DecimalType(10, 2),
    "meanfollowing": DecimalType(10, 2),
    "topmeanfollowers": DecimalType(10, 2),
    "topmeanfollowing": DecimalType(10, 2),
}
for country_column, country_type in country_column_types.items():
    countriesDF = countriesDF.withColumn(country_column, col(country_column).cast(country_type))

countriesDF = countriesDF.withColumn("country", initcap(col("country")))

# Ratio of top sellers to total sellers. The denominator is guarded: with ANSI
# SQL mode on (spark.sql.ansi.enabled), x / 0 raises DIVIDE_BY_ZERO instead of
# returning null. Rows with sellers == 0 get null explicitly, which is what
# non-ANSI mode already produced.
countriesDF = countriesDF.withColumn("top_seller_ratio",
                                     when(col("sellers") != 0,
                                          round(col("topsellers") / col("sellers"), 2)))

# Flag countries with a high ratio of female sellers (null ratio -> False).
female_seller_ratio_threshold = 0.5
countriesDF = countriesDF.withColumn("high_female_seller_ratio",
                                     when(col("femalesellersratio") > female_seller_ratio_threshold, True)
                                     .otherwise(False))

# Performance indicator: top products sold / (top products listed + 1).
# The +1 avoids a zero denominator when nothing was listed.
countriesDF = countriesDF.withColumn("performance_indicator",
                                     round(col("toptotalproductssold") / (col("toptotalproductslisted") + 1), 2))

# Flag countries with exceptionally high performance.
performance_threshold = 0.8
countriesDF = countriesDF.withColumn("high_performance",
                                     when(col("performance_indicator") > performance_threshold, True)
                                     .otherwise(False))

# Bucket by mean offline days: < 30 -> highly active, < 60 -> moderately active,
# otherwise (including null) -> low activity. The first branch already handles
# values below 30, so the second only needs the upper bound.
countriesDF = countriesDF.withColumn("activity_level",
                                     when(col("meanofflinedays") < 30, "Highly Active")
                                     .when(col("meanofflinedays") < 60, "Moderately Active")
                                     .otherwise("Low Activity"))


# Persist countries to the silver layer, replacing the previous contents.
countriesDF.write.save("/mnt/delta/tables/silver/countries", format="delta", mode="overwrite")