In [0]:
%sql
-- Create the sales_view database if it does not exist yet, then make it the
-- current database for the session.
-- NOTE(review): the cells visible in this notebook save Delta files by path and
-- do not register tables in this database — confirm whether it is still needed.
CREATE DATABASE IF NOT EXISTS sales_view;
USE sales_view;


In [0]:
    
# Authenticate to the ADLS Gen2 storage account with its access key.
# The key is fetched from a Databricks secret scope instead of being pasted
# into the notebook, so it does not end up in source control or exports.
# TODO: replace the scope / key placeholders with the names configured in
# your workspace.
spark.conf.set(
  "fs.azure.account.key.assignmentstoreacc.dfs.core.windows.net",
  dbutils.secrets.get(scope="<secret-scope>", key="<storage-account-key-name>")
)


# CUSTOMER → SILVER

In [0]:
# Location of the raw customer CSV files in the bronze container.
bronze_customer_path = "abfss://bronze@assignmentstoreacc.dfs.core.windows.net/sales_view/customer"

# Read the CSV with its first row treated as the header, then preview it.
df = spark.read.csv(bronze_customer_path, header=True)
display(df)



In [0]:
import re

def to_snake(col_name):
    """Convert a CamelCase / mixedCase column name to lower case with underscores.

    An underscore is inserted before each capitalised word and between a
    lower-case letter or digit and a following capital; the result is then
    lower-cased. Whitespace in the name is left in place, so "Email Id"
    becomes "email _id".
    """
    word_boundary = re.compile('(.)([A-Z][a-z]+)')
    case_boundary = re.compile('([a-z0-9])([A-Z])')
    underscored = word_boundary.sub(r'\1_\2', col_name)
    underscored = case_boundary.sub(r'\1_\2', underscored)
    return underscored.lower()

# Rename every column through to_snake, then preview the renamed frame.
snake_case_names = [to_snake(name) for name in df.columns]
df = df.toDF(*snake_case_names)
display(df)


In [0]:
from pyspark.sql.functions import split, col

# Split the full name on single spaces: element 0 becomes first_name and
# element 1 becomes last_name.
# NOTE(review): for names with more than two words, element 1 is the second
# word, not the final one — confirm that is acceptable for this data.
name_parts = split(col("name"), " ")
df = (
    df.withColumn("first_name", name_parts[0])
      .withColumn("last_name", name_parts[1])
)
display(df)

In [0]:
# Take the part of the e-mail address after "@", then keep only the text
# before its first "." as the domain.
email_host = split(col("email _id"), "@")[1]
df = df.withColumn("domain", split(email_host, "\\.")[0])
display(df)

In [0]:
from pyspark.sql.functions import col, lower, trim, when

# Map gender to a one-letter code. The comparison ignores case and surrounding
# whitespace, and any value that is not male/female is kept as-is instead of
# being replaced by NULL. Keeping unmatched values also makes the cell safe to
# re-run once the column already holds "M" / "F".
gender_normalized = lower(trim(col("gender")))
df = df.withColumn(
    "gender",
    when(gender_normalized == "male", "M")
    .when(gender_normalized == "female", "F")
    .otherwise(col("gender"))
)
display(df)

In [0]:
# Split the "joining _date" string on the space into a date part and a time part.
# joining_time is derived first: once "joining _date" has been overwritten with
# the date part alone, splitting it again has no second element and
# joining_time would be NULL for every row.
joining_parts = split(col("joining _date"), " ")
df = df.withColumn("joining_time", joining_parts[1]) \
       .withColumn("joining _date", joining_parts[0])


In [0]:
from pyspark.sql.functions import to_date

# Parse the "joining _date" strings with the year-month-day pattern and store
# the parsed value back in the same column.
joining_date_parsed = to_date(col("joining _date"), "yyyy-MM-dd")
df = df.withColumn("joining _date", joining_date_parsed)


In [0]:
# Inspect the raw "spent" values (up to 10 distinct rows, untruncated) and the
# current schema before cleaning the column.
spent_values = df.select("spent").distinct()
spent_values.show(10, truncate=False)
df.printSchema()


### IMPORTANT RULE
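If Spark keeps raising an error that does not match the line you are running, the failure is most likely coming from an earlier lazy transformation in the DataFrame's lineage. Re-read the source into a fresh DataFrame and re-apply every transformation you still need — columns added before the re-read are not carried over.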

In [0]:
# Start over from the bronze customer files: df_raw is a fresh, untransformed read.
bronze_customer_path = "abfss://bronze@assignmentstoreacc.dfs.core.windows.net/sales_view/customer"

df_raw = spark.read.csv(bronze_customer_path, header=True)
df_raw.printSchema()
display(df_raw)


In [0]:
import re

def to_snake(col_name):
    """Convert a column header to snake_case.

    Args:
        col_name: original column name, e.g. "CustomerId" or "Email Id".

    Returns:
        The name trimmed, with whitespace replaced by underscores, an
        underscore inserted at each lower-to-upper case boundary, runs of
        underscores collapsed to one, and everything lower-cased.
    """
    # Replace whitespace up front so a header like "Email Id" does not keep
    # the space ("email _id") after the case-boundary substitutions.
    col_name = re.sub(r'\s+', '_', col_name.strip())
    col_name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', col_name)
    col_name = re.sub('([a-z0-9])([A-Z])', r'\1_\2', col_name)
    # The first substitution adds an underscore after an existing one
    # ("Email_Id" -> "Email__Id"); collapse such runs back to a single "_".
    return re.sub('_+', '_', col_name).lower()

# Rebuild df from the fresh read with snake_case column names.
# NOTE(review): this replaces the df built in the earlier customer cells, so the
# first_name / last_name / domain / gender / joining columns added there are not
# part of df from here on — re-apply them if they are needed in silver.
snake_case_names = [to_snake(name) for name in df_raw.columns]
df = df_raw.toDF(*snake_case_names)


In [0]:
from pyspark.sql.functions import col, regexp_extract

# Pull the first run of digits (with an optional decimal part) out of "spent"
# and cast it to double as spent_clean.
spent_number = regexp_extract(col("spent"), r"([0-9]+(\.[0-9]+)?)", 1)
df = df.withColumn("spent_clean", spent_number.cast("double"))


In [0]:
from pyspark.sql.functions import when

# Classify spend: below 200 is MINIMUM, 200 and above is MAXIMUM.
# Both branches are spelled out so that a NULL spent_clean (for example when no
# number could be parsed from "spent") stays NULL instead of falling through
# to MAXIMUM.
df = df.withColumn(
    "expenditure_status",
    when(col("spent_clean") < 200, "MINIMUM")
    .when(col("spent_clean") >= 200, "MAXIMUM")
)

display(df.select("spent", "spent_clean", "expenditure_status"))


In [0]:
import re


def normalize_columns(cols):
    """Return the column names trimmed, lower-cased and joined with single underscores.

    Args:
        cols: iterable of column-name strings.

    Returns:
        list of str: one cleaned name per input name, in the same order.

    Every run of whitespace and/or underscores inside a name collapses to one
    underscore, so "email _id" and "a _ b" become "email_id" and "a_b". A single
    ``replace("__", "_")`` pass would leave a double underscore behind for runs
    of three or more.
    """
    return [re.sub(r"[\s_]+", "_", c.strip()).lower() for c in cols]

# Rename all columns to their normalized form.
normalized_names = normalize_columns(df.columns)
df = df.toDF(*normalized_names)


In [0]:
# Show the current column names as a sanity check after renaming.
df.columns


In [0]:
# Destination of the cleaned customer data in the silver container.
silver_customer_path = "abfss://silver@assignmentstoreacc.dfs.core.windows.net/sales_view/customer"
# Save as Delta, replacing whatever is already stored at that path.
df.write.save(silver_customer_path, format="delta", mode="overwrite")


In [0]:
# Read the silver customer Delta data back and display it to verify the write.
silver_customer_df = spark.read.load(
    "abfss://silver@assignmentstoreacc.dfs.core.windows.net/sales_view/customer",
    format="delta",
)
silver_customer_df.display()


# PRODUCT → SILVER

In [0]:
# Location of the raw product CSV files in the bronze container.
bronze_product_path = "abfss://bronze@assignmentstoreacc.dfs.core.windows.net/sales_view/product"

# Read the CSV with its first row treated as the header, then preview it.
df_raw = spark.read.csv(bronze_product_path, header=True)
display(df_raw)


In [0]:
import re

def to_snake(col_name):
    """Convert a column header to snake_case.

    Args:
        col_name: original column name, e.g. "ProductName" or "Product Name".

    Returns:
        The name trimmed, with whitespace replaced by underscores, an
        underscore inserted at each lower-to-upper case boundary, runs of
        underscores collapsed to one, and everything lower-cased.
    """
    # Trim and replace whitespace before the case-boundary substitutions so a
    # leading space cannot turn into a leading underscore.
    col_name = re.sub(r'\s+', '_', col_name.strip())
    col_name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', col_name)
    col_name = re.sub('([a-z0-9])([A-Z])', r'\1_\2', col_name)
    # The first substitution adds an underscore after an existing separator
    # ("Product_Name" -> "Product__Name"); collapse runs to a single "_".
    return re.sub('_+', '_', col_name).lower()

# Rename every product column through to_snake, then preview the result.
snake_case_names = [to_snake(name) for name in df_raw.columns]
df = df_raw.toDF(*snake_case_names)

display(df)


In [0]:
from pyspark.sql.functions import when, col

# Label each product by its category_id (1 to 4). No fallback branch is given,
# so any other id yields NULL in sub_category.
category_id = col("category_id")
sub_category = (
    when(category_id == 1, "phone")
    .when(category_id == 2, "laptop")
    .when(category_id == 3, "playstation")
    .when(category_id == 4, "e-device")
)
df = df.withColumn("sub_category", sub_category)

display(df)

In [0]:
# Destination of the cleaned product data in the silver container.
silver_product_path = "abfss://silver@assignmentstoreacc.dfs.core.windows.net/sales_view/product"

# Save as Delta, replacing whatever is already stored at that path.
df.write.save(silver_product_path, format="delta", mode="overwrite")


In [0]:
# Read the silver product Delta data back and display it to verify the write.
display(spark.read.load(silver_product_path, format="delta"))


# STORE → SILVER


In [0]:
# Location of the raw store CSV files in the bronze container.
bronze_store_path = "abfss://bronze@assignmentstoreacc.dfs.core.windows.net/sales_view/store"

# Read the CSV with its first row treated as the header, then preview it.
df_raw = spark.read.csv(bronze_store_path, header=True)
display(df_raw)


In [0]:
import re

def to_snake(col_name):
    """Convert a column header to snake_case.

    Args:
        col_name: original column name, e.g. "StoreName" or "Store Name".

    Returns:
        The name trimmed, with whitespace replaced by underscores, an
        underscore inserted at each lower-to-upper case boundary, runs of
        underscores collapsed to one, and everything lower-cased.
    """
    # Trim and replace whitespace before the case-boundary substitutions so a
    # leading space cannot turn into a leading underscore.
    col_name = re.sub(r'\s+', '_', col_name.strip())
    col_name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', col_name)
    col_name = re.sub('([a-z0-9])([A-Z])', r'\1_\2', col_name)
    # The first substitution adds an underscore after an existing separator
    # ("Store_Name" -> "Store__Name"); collapse runs to a single "_".
    return re.sub('_+', '_', col_name).lower()

# Rename every store column through to_snake, then preview the result.
snake_case_names = [to_snake(name) for name in df_raw.columns]
df = df_raw.toDF(*snake_case_names)

display(df)

In [0]:
from pyspark.sql.functions import split, col

# Derive store_category from the e-mail address: take the part after "@" and
# keep only the text before its first ".".
email_host = split(col("email_address"), "@")[1]
df = df.withColumn("store_category", split(email_host, "\\.")[0])

display(df)

In [0]:
from pyspark.sql.functions import to_date

# Parse the audit columns into dates. The pattern is passed explicitly, matching
# the "dd-MM-yyyy" pattern used for these same columns further down in this
# notebook; without it to_date relies on Spark's default date parsing.
df = df.withColumn("created_at", to_date(col("created_at"), "dd-MM-yyyy")) \
       .withColumn("updated_at", to_date(col("updated_at"), "dd-MM-yyyy"))


In [0]:
# Start over from the bronze store files: df_raw is a fresh, untransformed read.
bronze_store_path = "abfss://bronze@assignmentstoreacc.dfs.core.windows.net/sales_view/store"

df_raw = spark.read.csv(bronze_store_path, header=True)
df_raw.printSchema()
display(df_raw)


In [0]:
import re

def to_snake(col_name):
    """Convert a column header to snake_case.

    Args:
        col_name: original column name, e.g. "StoreName" or "Store Name".

    Returns:
        The name trimmed, with whitespace replaced by underscores, an
        underscore inserted at each lower-to-upper case boundary, runs of
        underscores collapsed to one, and everything lower-cased.
    """
    # Trim and replace whitespace before the case-boundary substitutions so a
    # leading space cannot turn into a leading underscore.
    col_name = re.sub(r'\s+', '_', col_name.strip())
    col_name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', col_name)
    col_name = re.sub('([a-z0-9])([A-Z])', r'\1_\2', col_name)
    # The first substitution adds an underscore after an existing separator
    # ("Store_Name" -> "Store__Name"); collapse runs to a single "_".
    return re.sub('_+', '_', col_name).lower()

# Rebuild df from the fresh read with snake_case column names.
# NOTE(review): this replaces the df built in the earlier store cells, so the
# store_category column added there is not part of df from here on — re-apply
# it if it is needed in silver.
snake_case_names = [to_snake(name) for name in df_raw.columns]
df = df_raw.toDF(*snake_case_names)


In [0]:
from pyspark.sql.functions import to_date, col

# Parse both audit columns from day-month-year strings ("dd-MM-yyyy") into
# dates, overwriting each column in place.
for date_column in ("created_at", "updated_at"):
    df = df.withColumn(date_column, to_date(col(date_column), "dd-MM-yyyy"))


In [0]:
# Preview the two parsed date columns to check that the pattern matched.
display(df.select("created_at", "updated_at"))


In [0]:
# Destination of the cleaned store data in the silver container.
silver_store_path = "abfss://silver@assignmentstoreacc.dfs.core.windows.net/sales_view/store"

# Save as Delta, replacing whatever is already stored at that path.
df.write.save(silver_store_path, format="delta", mode="overwrite")


In [0]:
# Read the silver store Delta data back and display it to verify the write.
display(spark.read.load(silver_store_path, format="delta"))


# SALES → SILVER

In [0]:
# Location of the raw sales CSV files in the bronze container.
bronze_sales_path = "abfss://bronze@assignmentstoreacc.dfs.core.windows.net/sales_view/sales"

# Read the CSV with its first row treated as the header, then preview it.
df_raw = spark.read.csv(bronze_sales_path, header=True)
display(df_raw)


In [0]:
import re

def to_snake(col_name):
    """Convert a column header to snake_case.

    Args:
        col_name: original column name, e.g. "OrderDate" or "Order Date".

    Returns:
        The name trimmed, with whitespace replaced by underscores, an
        underscore inserted at each lower-to-upper case boundary, runs of
        underscores collapsed to one, and everything lower-cased.
    """
    # Trim and replace whitespace before the case-boundary substitutions so a
    # leading space cannot turn into a leading underscore.
    col_name = re.sub(r'\s+', '_', col_name.strip())
    col_name = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', col_name)
    col_name = re.sub('([a-z0-9])([A-Z])', r'\1_\2', col_name)
    # The first substitution adds an underscore after an existing separator
    # ("Order_Date" -> "Order__Date"); collapse runs to a single "_".
    return re.sub('_+', '_', col_name).lower()

# Rename every sales column through to_snake, then preview the result.
snake_case_names = [to_snake(name) for name in df_raw.columns]
df = df_raw.toDF(*snake_case_names)
display(df)

In [0]:
# Destination of the renamed sales data in the silver container.
silver_sales_path = "abfss://silver@assignmentstoreacc.dfs.core.windows.net/sales_view/customer_sales"

# Save as Delta, replacing whatever is already stored at that path.
df.write.save(silver_sales_path, format="delta", mode="overwrite")


In [0]:
# Read the silver sales Delta data back and display it to verify the write.
display(spark.read.load(silver_sales_path, format="delta"))
