In [0]:
# Load the raw customer CSV files from the bronze container.
# The first row is treated as the header and column types are inferred.
# The TLS-secured `abfss://` scheme is used so this read matches the other
# bronze reads in this notebook.
df_c = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze@storagekevinav.dfs.core.windows.net/sales_view/customer/")
)
display(df_c)

In [0]:
import re
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType


# NOTE(review): the first cell already uses `spark`, so the runtime appears to
# provide a session before this line runs; getOrCreate() is then expected to
# return that session rather than build a new one — confirm on this cluster.
spark = SparkSession.builder.appName("CamelToSnake_Example").getOrCreate()


def camel_to_snake(name: str) -> str:
    """Convert a CamelCase / mixedCase identifier to lower-case snake_case.

    An underscore is inserted before every capitalised word and at every
    lower-case-or-digit -> upper-case transition, then the result is
    lower-cased. Spaces are left untouched, so "Email Id" becomes "email _id".

    Args:
        name: The identifier to convert.

    Returns:
        The converted, lower-cased identifier.
    """
    capitalised_word = re.compile(r'(.)([A-Z][a-z]+)')
    case_transition = re.compile(r'([a-z0-9])([A-Z])')

    words_separated = capitalised_word.sub(r'\1_\2', name)
    fully_separated = case_transition.sub(r'\1_\2', words_separated)
    return fully_separated.lower()

def convert_columns_to_snakecase(df: DataFrame) -> DataFrame:
    """Return `df` with every column name converted by `camel_to_snake`.

    The frame passed in is the one that gets renamed; no notebook-level
    variable is consulted.

    Args:
        df: The DataFrame whose columns should be renamed.

    Returns:
        A new DataFrame with the same data and snake_case column names.
    """
    new_cols = [camel_to_snake(c) for c in df.columns]
    return df.toDF(*new_cols)

# Expose the name-normalising helper to Spark as a null-safe UDF, both as a
# Python column function and under the SQL name "camel_to_snake_udf".
camel_to_snake_udf = udf(lambda x: camel_to_snake(x) if x is not None else None, StringType())
spark.udf.register("camel_to_snake_udf", camel_to_snake_udf)

# Renaming is a header operation, so it is done with toDF() on the column
# names; the UDF above operates on cell values and must not be used for this.
# The result goes into a separate frame because the following cells still
# address df_c by its original header names ("Name", "Email Id", ...).
df_c_snake = df_c.toDF(*[camel_to_snake(c) for c in df_c.columns])

print(" Columns after rename:", df_c_snake.columns)
display(df_c_snake)


In [0]:
from pyspark.sql.functions import col,split
# Split "Name" on single spaces once and reuse the token array.
# first_name is token 0 and last_name is token 1; tokens after the second
# are not carried into either column.
name_tokens = split(col("Name"), " ")
df_split2 = (
    df_c
    .withColumn("first_name", name_tokens.getItem(0))
    .withColumn("last_name", name_tokens.getItem(1))
)
display(df_split2)




In [0]:
from pyspark.sql.functions import col, split
# Take the part of "Email Id" after the "@", then keep the text before its
# first dot (e.g. "user@example.com" -> "example").
# The split pattern is a raw string: "\." in a normal string literal is an
# invalid escape sequence and triggers a warning on current Python versions.
email_host = split(col("Email Id"), "@").getItem(1)
df_with_domain = df_split2.withColumn(
    "domain",
    split(email_host, r"\.").getItem(0),
)
display(df_with_domain)

In [0]:
from pyspark.sql.functions import col, when, lower, trim

# Compare on a trimmed, lower-cased copy of "gender" so that case and
# surrounding whitespace do not matter; any value other than female/male is
# passed through exactly as it was.
gender_normalised = lower(trim(col("gender")))

df4 = df_with_domain.withColumn(
    "gender",
    when(gender_normalised == "female", "F")
    .when(gender_normalised == "male", "M")
    .otherwise(col("gender")),
)

display(df4)


In [0]:
from pyspark.sql.functions import col, to_timestamp, to_date, split
 
from pyspark.sql.functions import date_format

# Parse "Joining Date" (dd-MM-yyyy HH:mm) once and derive both output columns
# from the parsed timestamp, so they always agree with each other.
joining_ts = to_timestamp(col("Joining Date"), "dd-MM-yyyy HH:mm")

df_dateAndTime = (
    df4
    # Calendar date of the timestamp (displayed as yyyy-MM-dd).
    .withColumn("date", to_date(joining_ts))
    # Time of day rendered as HH:mm:ss. Formatting the parsed timestamp,
    # rather than slicing the raw string, yields the seconds component and
    # gives NULL for values that do not match the expected input format.
    .withColumn("time", date_format(joining_ts, "HH:mm:ss"))
)

display(df_dateAndTime)
 

In [0]:
# Derive "expenditure_status" from the "spent" column: "Minimum" when spent is
# below 500, otherwise "Maximum".
# The column is added to df_dateAndTime (not df4) so the "date" and "time"
# columns produced in the previous step are carried through to the silver table.
df_expenditureStatus = df_dateAndTime.withColumn(
    "expenditure_status",
    when(col("spent") < 500, "Minimum").otherwise("Maximum")
)
display(df_expenditureStatus)
 

In [0]:
def clean_column_names(df):
    """Return `df` with column names sanitised.

    For every column name: each of the characters space , ; { } ( ) newline
    tab = is replaced by an underscore, runs of underscores are collapsed to
    one, and the name is lower-cased. The frame passed in is the one that is
    renamed; no notebook-level variable is consulted.

    Args:
        df: DataFrame whose columns should be renamed.

    Returns:
        A new DataFrame with the same data and the sanitised column names.
    """
    new_cols = []
    for c in df.columns:
        # Replace the listed separator/bracket characters with underscores.
        clean = re.sub(r"[ ,;{}()\n\t=]", "_", c)
        # Collapse repeated underscores into a single one.
        clean = re.sub("_+", "_", clean)
        new_cols.append(clean.lower())
    return df.toDF(*new_cols)
 
# Sanitise the header names of the enriched customer frame (separator
# characters become underscores, names are lower-cased) and preview the result.
df_clean = clean_column_names(df_expenditureStatus)
display(df_clean)

In [0]:
# Keep a single row per customer_id before the frame is used as a merge source.
# NOTE(review): nothing here controls which of several duplicate rows is kept —
# confirm that any surviving row is acceptable.
df_unique_c = df_clean.dropDuplicates(["customer_id"])

In [0]:

table_name = "silver_data.sales_view.customer"

from delta.tables import DeltaTable

# Upsert the de-duplicated customer rows into the silver table:
#  - table exists  -> MERGE on customer_id (update matches, insert the rest)
#  - table missing -> create it from the current frame
if spark.catalog.tableExists(table_name):
    delta_table = DeltaTable.forName(spark, table_name)

    (delta_table.alias("target")
     .merge(
         df_unique_c.alias("source"),
         "target.customer_id = source.customer_id"
     )
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute()
    )

else:
    (df_unique_c.write.format("delta")
      .mode("overwrite")
      .saveAsTable(table_name))

# Reported after either branch, so a merge run is confirmed as well as the
# initial create.
print(" Customer data successfully upserted into Silver layer Catalog.")
 


In [0]:
# Read the silver customer table back through the catalog to verify the upsert.
# The table is addressed by name because no storage-path variable is defined
# in this notebook.
spark.table("silver_data.sales_view.customer").display()

Product Transformation

In [0]:
# Load the raw product CSV files from the bronze container; the first row is
# the header and column types are inferred.
df = spark.read.csv(
    "abfss://bronze@storagekevinav.dfs.core.windows.net/sales_view/product/",
    header=True,
    inferSchema=True,
)
display(df)
             

In [0]:
import re
 
def rename_columns_to_snake_case(df):
    """Return `df` with every column name converted to lower-case snake_case.

    An underscore is inserted before each capitalised word and at each
    lower-case-or-digit -> upper-case transition; the name is then lower-cased
    and spaces are replaced by underscores. A header such as "Product Name"
    therefore becomes "product__name" (two underscores).

    Args:
        df: DataFrame whose columns should be renamed.

    Returns:
        A new DataFrame with the same data and the converted column names.
    """
    def camel_to_snake(colname):
        with_word_breaks = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', colname)
        with_case_breaks = re.sub('([a-z0-9])([A-Z])', r'\1_\2', with_word_breaks)
        return with_case_breaks.lower().replace(" ", "_")

    return df.toDF(*map(camel_to_snake, df.columns))
 
# Apply the snake_case renaming to the product frame loaded above and preview it.
df_snake_p = rename_columns_to_snake_case(df)
df_snake_p.display()

In [0]:
from pyspark.sql.functions import when, col
 
# category_id -> sub_category label; any id not listed is labelled "unknown".
sub_category_labels = [
    (1, "phone"),
    (2, "laptop"),
    (3, "playstation"),
    (4, "e-device"),
]

# Build the CASE WHEN chain from the table above, in the listed order.
(first_id, first_label), *remaining_labels = sub_category_labels
sub_category_expr = when(col("category_id") == first_id, first_label)
for category_id, label in remaining_labels:
    sub_category_expr = sub_category_expr.when(col("category_id") == category_id, label)

df_create = df_snake_p.withColumn("sub_category", sub_category_expr.otherwise("unknown"))
display(df_create)

In [0]:
# Keep a single row per product_id before the frame is used as a merge source.
# NOTE(review): the name `df_dup` is assigned again later for the store data;
# running cells out of order can therefore merge the wrong frame.
df_dup = df_create.dropDuplicates(["product_id"])

In [0]:


 
#spark.sql("DESCRIBE TABLE assignment_adf.sales_view.product").show(truncate=False)

#spark.sql("drop table if exists assignment_adf.sales_view.product")

In [0]:

from delta.tables import DeltaTable

table_name = "silver_data.sales_view.product"

# Upsert the de-duplicated product rows into the silver table:
#  - table exists  -> MERGE on product_id (update matches, insert the rest)
#  - table missing -> create it from the current frame
if spark.catalog.tableExists(table_name):
    delta_table = DeltaTable.forName(spark, table_name)

    (delta_table.alias("target")
     .merge(
         df_dup.alias("source"),
         "target.product_id = source.product_id"
     )
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute()
    )

else:
    (df_dup.write.format("delta")
      .mode("overwrite")
      .saveAsTable(table_name))

# Reported after either branch, so a merge run is confirmed as well as the
# initial create.
print(" Product data successfully upserted into Silver layer Catalog.")

In [0]:
# Load the raw store CSV files from the bronze container; the first row is the
# header and column types are inferred. The TLS-secured `abfss://` scheme is
# used so this read matches the other bronze reads in this notebook.
# NOTE(review): the folder is spelled "stote" — left unchanged because it may
# be the real directory name in storage; confirm whether "store" was intended.
df = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("abfss://bronze@storagekevinav.dfs.core.windows.net/sales_view/stote/")
)
display(df)

In [0]:
def rename_columns_to_snake_case(df):
    """Return `df` with its column names rewritten as lower-case snake_case.

    Each name gets an underscore before every capitalised word and at every
    lower-case-or-digit -> upper-case boundary, is lower-cased, and has its
    spaces turned into underscores.

    Args:
        df: DataFrame whose columns should be renamed.

    Returns:
        A new DataFrame with the same data and the converted column names.
    """
    def camel_to_snake(colname):
        # Apply the two boundary patterns in order, each inserting "_".
        for boundary in ('(.)([A-Z][a-z]+)', '([a-z0-9])([A-Z])'):
            colname = re.sub(boundary, r'\1_\2', colname)
        return colname.lower().replace(" ", "_")

    snake_names = []
    for original in df.columns:
        snake_names.append(camel_to_snake(original))
    return df.toDF(*snake_names)
 
# Apply the snake_case renaming to the store frame loaded above and preview it.
df_store = rename_columns_to_snake_case(df)
display(df_store)
 

In [0]:
from pyspark.sql.functions import regexp_extract, col

# Capture the text between "@" and the first "." that follows it in
# email_address; rows where the pattern does not match get an empty string
# from regexp_extract.
email_domain_pattern = "@(.*?)\\."

df_2 = df_store.withColumn(
    "store_category",
    regexp_extract(col("email_address"), email_domain_pattern, 1),
)
display(df_2)

In [0]:
from pyspark.sql.functions import regexp_extract, col, date_format, current_date
# Today's date formatted as a yyyy-MM-dd string; both audit columns receive
# the same value in this step.
today_as_text = date_format(current_date(), "yyyy-MM-dd")

df_store_final = (
    df_2
    .withColumn("created_at", today_as_text)
    .withColumn("updated_at", today_as_text)
)

display(df_store_final)

In [0]:
# Keep a single row per store_id before the frame is used as a merge source.
# NOTE(review): this reassigns `df_dup`, which earlier held the product rows.
df_dup = df_store_final.dropDuplicates(["store_id"])

In [0]:
from delta.tables import DeltaTable


table_name  = "silver_data.sales_view.store"

# Upsert the de-duplicated store rows into the silver table:
#  - table exists  -> MERGE on store_id
#  - table missing -> create it from the current frame
if spark.catalog.tableExists(table_name):

    delta_table = DeltaTable.forName(spark, table_name)

    # On a match, refresh every column except created_at. The source frame
    # stamps created_at with today's date on every run, so copying it over
    # would erase the date on which the row was first inserted.
    columns_to_refresh = {
        c: f"source.`{c}`" for c in df_dup.columns if c != "created_at"
    }

    (
        delta_table.alias("target")
        .merge(
            df_dup.alias("source"),
            "target.store_id = source.store_id"
        )
        .whenMatchedUpdate(set=columns_to_refresh)
        # New stores take all source columns, including created_at.
        .whenNotMatchedInsertAll()
        .execute()
    )

else:

    df_dup.write.format("delta").mode("overwrite").saveAsTable(table_name)
 

Sales Table

In [0]:
# Load the raw sales CSV files from the bronze container and preview them.
# The read and the display are separate statements so that `dfsales` holds the
# DataFrame and not the return value of the display call.
dfsales = spark.read.csv(
    "abfss://bronze@storagekevinav.dfs.core.windows.net/sales_view/sales",
    header=True,
    inferSchema=True,
)
display(dfsales)

In [0]:
import re
from pyspark.sql import SparkSession

# Obtain the Spark session under the "CleanColumns" application name.
spark = (
    SparkSession.builder
    .appName("CleanColumns")
    .getOrCreate()
)

# Read the raw sales CSV files: first row is the header, types are inferred.
dfsales = spark.read.csv(
    "abfss://bronze@storagekevinav.dfs.core.windows.net/sales_view/sales",
    header=True,
    inferSchema=True,
)


def rename_columns_to_snake_case(dfsales):
    """Return `dfsales` with its column names converted to lower-case snake_case.

    An underscore is inserted before each capitalised word and at each
    lower-case-or-digit -> upper-case boundary; names are then lower-cased and
    spaces are replaced with underscores.

    Args:
        dfsales: DataFrame whose columns should be renamed.

    Returns:
        A new DataFrame with the same data and the converted column names.
    """
    word_break = re.compile('(.)([A-Z][a-z]+)')
    case_break = re.compile('([a-z0-9])([A-Z])')

    def camel_to_snake(colname):
        split_words = word_break.sub(r'\1_\2', colname)
        return case_break.sub(r'\1_\2', split_words).lower().replace(" ", "_")

    renamed = [camel_to_snake(c) for c in dfsales.columns]
    return dfsales.toDF(*renamed)

def clean_column_names(dfsales):
    """Return `dfsales` with sanitised column names.

    In every name, each of the characters space , ; { } ( ) newline tab = is
    replaced by an underscore, runs of underscores are collapsed to one, and
    the name is lower-cased.

    Args:
        dfsales: DataFrame whose columns should be renamed.

    Returns:
        A new DataFrame with the same data and the sanitised column names.
    """
    def _sanitise(name):
        underscored = re.sub(r"[ ,;{}()\n\t=]", "_", name)
        return re.sub("_+", "_", underscored).lower()

    return dfsales.toDF(*[_sanitise(c) for c in dfsales.columns])

# Step 3: Apply transformations — snake_case renaming first, then the
# character clean-up, which also collapses any repeated underscores that the
# renaming step produced.
df_sales = clean_column_names(rename_columns_to_snake_case(dfsales))

# Step 4: Display the frame with its normalised column names.
display(df_sales)


In [0]:
# Convert every date-like column to a DATE value (rendered as yyyy-MM-dd).

# Detect the date columns dynamically: any column whose name contains "date".
date_columns = [c for c in df_sales.columns if "date" in c]
print("Date columns:", date_columns)

from pyspark.sql.functions import col, to_timestamp, to_date

# Start from df_sales and fold each conversion into the running result, so
# that every detected column is converted (not only the last one) and
# df_datechange is defined even when no column matched.
df_datechange = df_sales
for dc in date_columns:
    # Parse the "dd-MM-yyyy HH:mm" text and keep only the calendar date;
    # replacing the column in place keeps its name and position.
    df_datechange = df_datechange.withColumn(
        dc, to_date(to_timestamp(col(dc), "dd-MM-yyyy HH:mm"))
    )

In [0]:
# Keep a single row per (order_id, product_id, customer_id) combination —
# the same three columns the sales merge matches on.
df_redup = df_datechange.dropDuplicates(["order_id", "product_id", "customer_id"])

In [0]:
from delta.tables import DeltaTable

table_name = "silver_data.sales_view.sales"

# Columns that together identify a sales row; the merge condition requires
# all of them to be equal between target and source.
merge_keys = ("order_id", "product_id", "customer_id")
match_condition = " AND ".join(f"target.{key} = source.{key}" for key in merge_keys)

if not spark.catalog.tableExists(table_name):
    # First run: create the table from the current frame.
    df_redup.write.format("delta").mode("overwrite").saveAsTable(table_name)
else:
    # Later runs: update matching rows and insert the rest.
    delta_table = DeltaTable.forName(spark, table_name)
    merge_builder = delta_table.alias("target").merge(
        df_redup.alias("source"),
        match_condition,
    )
    merge_builder.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

print("Sales data successfully upserted into Silver layer Catalog.")

In [0]:
from delta.tables import DeltaTable

# NOTE(review): `table_name` is not referenced by the statement below, which
# registers an unqualified table called `sales` in the current catalog/schema.
# Confirm whether the fully qualified name was meant to be used.
table_name = "assignment_adf.sales_view.sales"

# Register a Delta table over the gold-transformation container if it is not
# registered yet. The location uses the `abfss://` scheme.
spark.sql("CREATE TABLE IF NOT EXISTS sales USING delta LOCATION 'abfss://gold-transformation@storagekevinav.dfs.core.windows.net/sales'")

In [0]:
# Copy the silver customer table to the silver-transformation container as
# Delta files.
df = spark.table("silver_data.sales_view.customer")
df.write.mode("overwrite").format("delta").save("abfss://silver-transformation@storagekevinav.dfs.core.windows.net/customer")

# Re-point the catalog entry at the exported location. The SQL is issued
# through spark.sql() because this is a Python cell; bare SQL statements here
# are a syntax error.
spark.sql("DROP TABLE silver_data.sales_view.customer")
spark.sql("""
    CREATE TABLE silver_data.sales_view.customer
    USING DELTA
    LOCATION 'abfss://silver-transformation@storagekevinav.dfs.core.windows.net/customer'
""")

 


In [0]:

# Copy the silver product table to the silver-transformation container as
# Delta files, replacing whatever is stored at that path.
product_export_path = "abfss://silver-transformation@storagekevinav.dfs.core.windows.net/product"

df = spark.table("silver_data.sales_view.product")
(
    df.write
    .format("delta")
    .mode("overwrite")
    .save(product_export_path)
)


In [0]:
%sql DROP TABLE silver_data.sales_view.product;
-- Recreate the table under the same name as a Delta table over the files
-- stored at the silver-transformation location given below.
CREATE TABLE silver_data.sales_view.product
USING DELTA
LOCATION 'abfss://silver-transformation@storagekevinav.dfs.core.windows.net/product';



In [0]:
# Copy the silver store table to the silver-transformation container as Delta
# files, replacing whatever is stored at that path.
store_export_path = "abfss://silver-transformation@storagekevinav.dfs.core.windows.net/store"

df = spark.table("silver_data.sales_view.store")
store_writer = df.write.mode("overwrite").format("delta")
store_writer.save(store_export_path)

In [0]:
%sql DROP TABLE silver_data.sales_view.store;
-- Recreate the table under the same name as a Delta table over the files
-- stored at the silver-transformation location given below.
CREATE TABLE silver_data.sales_view.store
USING DELTA
LOCATION 'abfss://silver-transformation@storagekevinav.dfs.core.windows.net/store';

In [0]:
# Copy the silver sales table to the silver-transformation container as Delta
# files, replacing whatever is stored at that path.
sales_export_path = "abfss://silver-transformation@storagekevinav.dfs.core.windows.net/sales"

df = spark.table("silver_data.sales_view.sales")
df.write.format("delta").mode("overwrite").save(sales_export_path)