### **SILVER LAYER**

In [0]:
from pyspark.sql.functions import *

# Start the silver layer from the raw bronze table; the following cells derive from this frame.
bronze_source_table = "retail_databricks_ws.bronze.raw_bronze_tbl"
silver_df = spark.read.table(bronze_source_table)
display(silver_df)

### **STRING AND DATE STANDARDIZATION**

In [0]:
# (column, case function) pairs: each column is trimmed, then re-cased.
# initcap -> names, cities, status/category labels, address lines.
# upper   -> COUNTRY, STATE, TERRITORY (COUNTRY must be upper case to match the
#            upper-case keys of the country ISO lookup map built later).
_case_rules = [
    ("CUSTOMERNAME", initcap),
    ("CONTACTLASTNAME", initcap),
    ("CONTACTFIRSTNAME", initcap),
    ("CITY", initcap),
    ("COUNTRY", upper),
    ("STATE", upper),
    ("TERRITORY", upper),
    ("STATUS", initcap),
    ("PRODUCTLINE", initcap),
    ("DEALSIZE", initcap),
    ("ADDRESSLINE1", initcap),
    ("ADDRESSLINE2", initcap),
]
# Columns that are only trimmed: their contents must not be re-cased.
_trim_only = ["PHONE", "POSTALCODE", "ORDERDATE"]

silver_tranformed_df = silver_df
for column_name, case_fn in _case_rules:
    silver_tranformed_df = silver_tranformed_df.withColumn(
        column_name, case_fn(trim(col(column_name)))
    )
for column_name in _trim_only:
    silver_tranformed_df = silver_tranformed_df.withColumn(column_name, trim(col(column_name)))
display(silver_tranformed_df)

In [0]:
# ORDERDATE is text in the "M/d/yyyy H:mm" pattern: parse it as a timestamp,
# then keep only the calendar date part.
# NOTE(review): non-matching strings presumably become null (non-ANSI mode) — confirm.
order_ts_expr = to_timestamp(col("ORDERDATE"), "M/d/yyyy H:mm")
silver_tranformed_df = silver_tranformed_df.withColumn("ORDERDATE", to_date(order_ts_expr))
display(silver_tranformed_df)

### **VALIDATION OF PHONE NUMBERS BASED ON COUNTRY**

In [0]:
import pyspark.sql.functions as F

# Remove every character that is not a digit or '+' (spaces, dashes, dots, brackets, ...)
# so the phone string is compact before it is parsed by phonenumbers.
_phone_noise_pattern = "[^0-9+]"
silver_tranformed_df = silver_tranformed_df.withColumn(
    "PHONE", F.regexp_replace(F.col("PHONE"), _phone_noise_pattern, "")
)
display(silver_tranformed_df)

In [0]:
from pyspark.sql import functions as F

# Upper-case COUNTRY value -> two-letter region code (also used as the phonenumbers region).
country_iso_map = {
    "USA": "US",
    "FRANCE": "FR",
    "NORWAY": "NO",
    "SWEDEN": "SE",
    "SPAIN": "ES",
    "AUSTRALIA": "AU",
    "CANADA": "CA",
    "UK": "GB",
    "SINGAPORE": "SG",
    "JAPAN": "JP",
    "ITALY": "IT",
    "FINLAND": "FI",
    "AUSTRIA": "AT",
    "DENMARK": "DK",
    "GERMANY": "DE",
    "BELGIUM": "BE",
    "PHILIPPINES": "PH",
    "IRELAND": "IE",
    "SWITZERLAND": "CH"
}

# F.create_map takes alternating key/value columns: lit(k1), lit(v1), lit(k2), lit(v2), ...
pairs = list(map(list, country_iso_map.items()))
flat_list = [token for pair in pairs for token in pair]
literal_list = list(map(F.lit, flat_list))
mapping_expr = F.create_map(*literal_list)
display(mapping_expr)



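### UDF for Normalizing Phone Numbers

Note: on Databricks, a `%pip install` partway through a notebook can reset the Python state, so variables defined in earlier cells are lost. Install `phonenumbers` at the top of the notebook (or as a cluster library).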

In [0]:
pip install phonenumbers

In [0]:
from pyspark.sql.types import StringType
import phonenumbers

def normalize_country_with_phone(phone, country):
    """Normalize a phone number to E.164 format, using ``country`` as the default region.

    Args:
        phone: Phone number string (in this notebook, already reduced to digits and '+').
        country: Region code passed to ``phonenumbers.parse`` as the default region;
            it is used when ``phone`` has no leading '+'.

    Returns:
        The E.164-formatted string when the number parses and ``is_valid_number``
        accepts it. Returns None if either input is None, parsing fails, or the
        number is not valid.
    """
    if phone is None or country is None:
        return None
    try:
        parsed = phonenumbers.parse(phone, country)
    except phonenumbers.NumberParseException:
        # Unparseable input (empty, wrong length, unknown region, ...) is treated as an
        # invalid number instead of failing the Spark task. Other exceptions are real
        # bugs and are allowed to propagate.
        return None
    if not phonenumbers.is_valid_number(parsed):
        return None
    return phonenumbers.format_number(parsed, phonenumbers.PhoneNumberFormat.E164)


# Spark UDF wrapper; a None result becomes a null in the StringType column.
normalize_country_with_phone_udf = udf(normalize_country_with_phone, StringType())

In [0]:
# Look up each row's upper-case COUNTRY in the literal map to obtain COUNTRY_ISO.
# Countries absent from country_iso_map yield a null COUNTRY_ISO (under default settings).
# The phone UDF then returns null for those rows.
# `map_col[key_col]` is used because Column.getItem(<Column>) is deprecated since Spark 3.0.
silver_tranformed_df = silver_tranformed_df.withColumn("COUNTRY_ISO", mapping_expr[col("COUNTRY")])
display(silver_tranformed_df)

In [0]:
# Replace PHONE with its E.164 form, using COUNTRY_ISO as the parsing region.
# The UDF yields null for invalid numbers or null inputs; the result is a new frame.
phone_e164_expr = normalize_country_with_phone_udf(F.col("PHONE"), F.col("COUNTRY_ISO"))
silver_cleaned_df = silver_tranformed_df.withColumn("PHONE", phone_e164_expr)
display(silver_cleaned_df)

### **NULL CHECKS AND VALIDATION**

In [0]:
# Per-column null counts in one aggregation: cast isNull() to 1/0 and sum it.
# F.sum is used explicitly. A bare `sum` only resolves to Spark's aggregate because the
# wildcard `from pyspark.sql.functions import *` shadows Python's builtin sum, which
# would raise on a Column.
Nulls_check = silver_cleaned_df.select(
    [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in silver_cleaned_df.columns]
)
display(Nulls_check)

In [0]:
# Replace remaining nulls in these four columns with explicit placeholder strings.
# Only true nulls are filled; empty strings are left untouched.
null_placeholders = {
    "PHONE": "INVALID",  # null here when phone normalization failed or inputs were null
    "ADDRESSLINE2": "NA",
    "STATE": "UNSPECIFIED",
    "POSTALCODE": "UNSPECIFIED",
}
silver_cleaned_df = silver_cleaned_df.fillna(null_placeholders)

display(silver_cleaned_df)


In [0]:
# Stamp rows with the time this silver transformation ran, and drop ORDERLINENUMBER,
# which is not needed in the silver table.
# Note: current_timestamp() returns the same value for every row of a query.
silver_cleaned_df = (
    silver_cleaned_df
    .withColumn("SILVER_PROCESSED_TIMESTAMP", F.current_timestamp())
    .drop("ORDERLINENUMBER")
)
display(silver_cleaned_df)

In [0]:
# Normalize all column names to lower case; toDF renames positionally, so column order is kept.
lowercase_names = list(map(str.lower, silver_cleaned_df.columns))
silver_cleaned_df = silver_cleaned_df.toDF(*lowercase_names)
display(silver_cleaned_df)

### **Deduplication Using a Window Function**

In [0]:
from pyspark.sql.window import Window

# Keep one row per (ordernumber, productcode).
# silver_processed_timestamp cannot rank duplicates: it comes from current_timestamp(),
# which is identical for every row in a query, so ordering by it leaves the surviving
# row arbitrary. Prefer the bronze ingest time (latest ingested copy wins) when present.
# NOTE(review): duplicates that share the same ingest timestamp are still resolved arbitrarily.
dedup_order_col = (
    "bronze_ingest_timestamp"
    if "bronze_ingest_timestamp" in silver_cleaned_df.columns
    else "silver_processed_timestamp"
)
window = Window.partitionBy("ordernumber", "productcode").orderBy(desc(dedup_order_col))

silver_final_df = (
    silver_cleaned_df
    .withColumn("rn", row_number().over(window))
    .filter(col("rn") == 1)
    .drop("rn")
)
display(silver_final_df)

In [0]:
# Remove the bronze ingest timestamp before writing silver (drop is a no-op if it is absent).
columns_to_drop = ["bronze_ingest_timestamp"]
silver_final_df = silver_final_df.drop(*columns_to_drop)
display(silver_final_df)

In [0]:
# Full refresh of the silver Delta table.
# overwriteSchema (not mergeSchema) makes the table schema match silver_final_df exactly.
# With mergeSchema in overwrite mode, columns removed upstream would linger as all-null
# columns, and a changed column type could not be merged.
(
    silver_final_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("retail_databricks_ws.silver.silver_cleaned_tbl")
)

In [0]:
%sql
-- Spot-check the contents of the silver table written by the previous cell.
SELECT * FROM retail_databricks_ws.silver.silver_cleaned_tbl