### Incremental Logic and Transformation

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable

In [0]:
# Reuse the active Spark session if there is one; otherwise create it for this pipeline.
spark = (
    SparkSession.builder
    .appName("Ecomdatapipeline")
    .getOrCreate()
)

In [0]:
# Delta locations for the users dataset.
# Bronze is read as the source; silver is the target of the incremental load.
users_bronze = "/mnt/delta/tables/ecom2/bronze/users"
users_silver = "/mnt/delta/tables/ecom2/silver/users"

In [0]:
# Source DataFrame: the bronze users Delta table, transformed in the cells below.
user_df = (
    spark.read
    .format("delta")
    .load(users_bronze)
)

In [0]:
# Transformation: normalise the categorical user attributes.

# Country codes are upper-cased so the same country is always spelled one way.
user_df = user_df.withColumn("countryCode", upper(col("countryCode")))

# Map the language code to a display name. This is built with when/otherwise
# instead of a concatenated SQL string: the earlier string fragments were
# joined without separating spaces ("...'English'WHEN language ..."), which
# only worked as long as the SQL parser tolerated the missing whitespace.
# Any language other than 'en' / 'fr' (including null) maps to 'Others'.
user_df = user_df.withColumn(
    "language_full",
    when(col("language") == "en", "English")
    .when(col("language") == "fr", "French")
    .otherwise("Others"),
)

# Collapse gender to Male / Female / Other based on its first letter.
# The prefix match is case-sensitive; null or unrecognised values become "Other".
user_df = user_df.withColumn(
    "gender",
    when(col("gender").startswith("M"), "Male")
    .when(col("gender").startswith("F"), "Female")
    .otherwise("Other"),
)

# Clean the civilityTitle values with regexp_replace and derive a new
# column holding the time since the last login in years.
user_df = (
    user_df
    # Every occurrence of "miss" or "mrs" (case-sensitive) is rewritten to "Ms".
    .withColumn(
        "civilityTitle_clean",
        regexp_replace(col("civilityTitle"), "(miss|mrs)", "Ms"),
    )
    # Days since the last login divided by a 365-day year.
    .withColumn("yearssincelastlogin", col("daysSinceLastLogin") / 365)
)


# Account age in years (seniority / 365, rounded to two decimals), its
# 'account_age_group' bucket, and the current year for comparison.
user_df = (
    user_df
    .withColumn("account_age_years", round(col("seniority") / 365, 2))
    .withColumn(
        "account_age_group",
        # Branches are evaluated in order, so the second branch is only
        # reached when the age is not below 1: New < 1 <= Intermediate < 3.
        # NOTE(review): a null account_age_years matches neither branch and
        # therefore ends up as "Experienced".
        when(col("account_age_years") < 1, "New")
        .when(col("account_age_years") < 3, "Intermediate")
        .otherwise("Experienced"),
    )
    .withColumn("current_year", year(current_date()))
)


# Combine strings into a user descriptor of the form
# <gender>_<countryCode>_<first 3 chars of civilityTitle_clean>_<language_full>,
# and flag rows whose raw civilityTitle is longer than 10 characters.
user_df = (
    user_df
    .withColumn(
        "user_descriptor",
        # concat returns null as soon as any of its parts is null.
        concat(
            col("gender"), lit("_"),
            col("countryCode"), lit("_"),
            substring(col("civilityTitle_clean"), 1, 3), lit("_"),
            col("language_full"),
        ),
    )
    .withColumn("flag_long_title", length(col("civilityTitle")) > 10)
)


# Casting data types.
# Each column is cast in place; keeping the mapping in one table avoids
# repeating the same withColumn/cast statement for every column.
column_types = {
    "hasAnyApp": "boolean",
    "hasAndroidApp": "boolean",
    "hasIosApp": "boolean",
    "hasProfilePicture": "boolean",
    "socialNbFollowers": IntegerType(),
    "socialNbFollows": IntegerType(),
    "productsPassRate": DecimalType(10, 2),
    "seniorityAsMonths": DecimalType(10, 2),
    "seniorityAsYears": DecimalType(10, 2),
}
for column_name, data_type in column_types.items():
    user_df = user_df.withColumn(column_name, col(column_name).cast(data_type))

# daysSinceLastLogin becomes an integer with 0 as the default. coalesce is
# applied AFTER the cast so that a value the cast cannot convert (which the
# cast turns into null) also falls back to 0, not only values that were
# already null before the cast.
user_df = user_df.withColumn(
    "daysSinceLastLogin",
    coalesce(col("daysSinceLastLogin").cast(IntegerType()), lit(0)),
)

# Keep a single row per identifierHash. Which of several duplicate rows
# survives is not specified by drop_duplicates.
user_df = user_df.drop_duplicates(["identifierHash"])

In [0]:
# Only for the first run: uncomment the line below to create the silver users table.
# user_df.write.format("delta").mode("overwrite").save("/mnt/delta/tables/ecom2/silver/users")

In [0]:
# Defining instances and dataframes for incremental loading of the target table.

# Bootstrap: when no Delta table exists yet at the silver path (first run on a
# fresh environment), create it from the transformed source. Without this the
# read and DeltaTable.forPath below fail, and a full notebook run only works
# after a manual one-off write.
if not DeltaTable.isDeltaTable(spark, users_silver):
    user_df.write.format("delta").mode("overwrite").save(users_silver)

user_silver_df = spark.read.format("delta").load(users_silver)  # current contents of the target delta table
target_table = DeltaTable.forPath(spark, users_silver)  # target delta table instance for merge operations

In [0]:
# Incremental load: upsert the transformed source rows into the silver table,
# matching on identifierHash.
#   * matched rows whose daysSinceLastLogin changed -> refresh the login columns
#   * source rows with no match in the target       -> insert with all columns
(
    target_table.alias("target")
    .merge(
        user_df.alias("source"),
        "target.identifierHash = source.identifierHash",
    )
    .whenMatchedUpdate(
        # Null-safe inequality: a plain `<>` evaluates to NULL (treated as
        # "no change") when either side is NULL, so a NULL target value would
        # never be refreshed.
        condition="NOT (target.daysSinceLastLogin <=> source.daysSinceLastLogin)",
        set={
            "daysSinceLastLogin": "source.daysSinceLastLogin",
            # Derived from daysSinceLastLogin, so it must be updated together
            # with it or the target keeps a stale value. Assumes the silver
            # table has this column, i.e. it was created from this notebook's
            # user_df - TODO confirm for tables created by older versions.
            "yearssincelastlogin": "source.yearssincelastlogin",
        },
    )
    .whenNotMatchedInsertAll()
    .execute()
)