In [0]:
# Base locations of the sales-view data in the Bronze (input) and Silver (output)
# containers. Both live in the same storage account and differ only in the container name.
_ADLS_PATH_TEMPLATE = "abfss://{container}@adfassignment23.dfs.core.windows.net/sales-view"
bronze_base_path = _ADLS_PATH_TEMPLATE.format(container="bronze")
silver_base_path = _ADLS_PATH_TEMPLATE.format(container="silver")


In [0]:
%run /Workspace/Users/sce21cs010@sairamtap.edu.in/adf_assignment/src/bronze_to_silver/utils

In [0]:
from pyspark.sql.functions import col, split, when, to_date
import re

# Step 1: Load the customer CSV files from the Bronze layer (first row is the header)
customer_csv_path = f"{bronze_base_path}/customer/"
df_customer = (
    spark.read
    .format("csv")
    .option("header", True)
    .load(customer_csv_path)
)
display(df_customer)

# Step 2: Clean and rename the columns via clean_and_snake_case_columns
# (defined outside this cell), then show the resulting schema and data
df_customer = clean_and_snake_case_columns(df_customer)
df_customer.printSchema()
display(df_customer)

# Step 3: first_name / last_name are the first and second space-separated tokens of 'name'
name_tokens = split(col("name"), " ")
df_customer = (
    df_customer
    .withColumn("first_name", name_tokens.getItem(0))
    .withColumn("last_name", name_tokens.getItem(1))
)

# Step 4: 'domain' is the part of 'email_id' after the '@', cut at its first '.'
email_host = split(col("email_id"), "@").getItem(1)
df_customer = df_customer.withColumn("domain", split(email_host, r"\.").getItem(0))

# Step 5: Recode gender: "male" -> "M", "female" -> "F", anything else -> "O"
gender_code = (
    when(col("gender") == "male", "M")
    .when(col("gender") == "female", "F")
    .otherwise("O")
)
df_customer = df_customer.withColumn("gender", gender_code)

# Step 6: Split 'joining_date' on the space: the first token becomes 'date'
# (parsed as MM-dd-yyyy), the second token is kept as the 'time' string
joining_tokens = split(col("joining_date"), " ")
df_customer = (
    df_customer
    .withColumn("date", to_date(joining_tokens.getItem(0), "MM-dd-yyyy"))
    .withColumn("time", joining_tokens.getItem(1))
)

# Step 7: Create expenditure_status based on 'spent'
#   spent <  200 -> "MINIMUM"
#   spent >= 200 -> "MAXIMUM"
# Both branches are spelled out on purpose: a missing or non-numeric 'spent' casts to
# null, and a null comparison matches neither branch, so the status stays null instead
# of being reported as "MAXIMUM" by a catch-all otherwise().
minimum_spend_limit = 200
spent_amount = col("spent").cast("double")
df_customer = df_customer.withColumn(
    "expenditure_status",
    when(spent_amount < minimum_spend_limit, "MINIMUM")
    .when(spent_amount >= minimum_spend_limit, "MAXIMUM"),
)

display(df_customer)

# Step 8: Persist the customer data to the Silver layer in Delta format,
# replacing both the existing data and its schema
customer_silver_path = f"{silver_base_path}/customer"
(
    df_customer.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .save(customer_silver_path)
)


In [0]:
from pyspark.sql.functions import col, when

# Load the product CSV (first row is the header) from the Bronze layer
product_csv_path = f"{bronze_base_path}/product"
df_product = spark.read.option("header", True).csv(product_csv_path)
display(df_product)

# Clean and rename the columns via clean_and_snake_case_columns (defined outside this cell)
df_product = clean_and_snake_case_columns(df_product)

# Derive sub_category from category_id; there is no otherwise() branch,
# so ids other than 1-4 leave sub_category null
category_id = col("category_id")
sub_category = (
    when(category_id == 1, "phone")
    .when(category_id == 2, "laptop")
    .when(category_id == 3, "playstation")
    .when(category_id == 4, "e-device")
)
df_product = df_product.withColumn("sub_category", sub_category)

display(df_product)

# Persist to the Silver layer in Delta format, replacing existing data and schema
(
    df_product.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .save(f"{silver_base_path}/product")
)


In [0]:
from pyspark.sql.functions import col, split, regexp_extract, to_date, when

# Step 1: Load the store CSV (first row is the header) from the Bronze layer
store_csv_path = f"{bronze_base_path}/store"
df_store = spark.read.option("header", True).csv(store_csv_path)
display(df_store)

# Step 2: Clean and rename the columns via clean_and_snake_case_columns (defined outside this cell)
df_store = clean_and_snake_case_columns(df_store)

# Step 3: store_category is the part of 'email_address' after the '@', cut at its first '.'
email_host = split(col("email_address"), "@").getItem(1)
df_store = df_store.withColumn("store_category", split(email_host, r"\.").getItem(0))

# Step 4: Only strings shaped exactly like dd-MM-yyyy (two, two and four digits) are converted
date_pattern = r"^\d{2}-\d{2}-\d{4}$"

# Steps 5-6: Convert `created_at` and `updated_at` to dates when they match the
# pattern; any other value becomes null
for date_column in ("created_at", "updated_at"):
    raw_value = col(date_column)
    df_store = df_store.withColumn(
        date_column,
        when(raw_value.rlike(date_pattern), to_date(raw_value, "dd-MM-yyyy")).otherwise(None),
    )

display(df_store)

# Step 7: Persist to the Silver layer in Delta format, replacing existing data and schema
(
    df_store.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(f"{silver_base_path}/store")
)


In [0]:
from pyspark.sql.functions import col, to_timestamp

# Step 1: Read CSV from Bronze Layer (first row is the header)
df_sales = spark.read.option("header", True).csv(f"{bronze_base_path}/sales")
display(df_sales)

# Step 2: Clean and convert column names via clean_and_snake_case_columns
# (defined outside this cell)
df_sales = clean_and_snake_case_columns(df_sales)
display(df_sales)

# Step 3: Parse the ISO-8601 timestamp columns.
# The trailing "Z" is the UTC designator, so it is parsed with the zone-offset letter X
# rather than quoted as a literal 'Z'. A quoted literal discards the zone and reads the
# value in the session time zone, which shifts every instant on a non-UTC session.
# On a UTC session the result is unchanged.
iso_utc_format = "yyyy-MM-dd'T'HH:mm:ss.SSSX"
for timestamp_column in ("order_date", "ship_date"):
    df_sales = df_sales.withColumn(
        timestamp_column,
        to_timestamp(col(timestamp_column), iso_utc_format),
    )

# Step 4: Write to Silver Layer in Delta format, replacing existing data and schema
df_sales.write.format("delta") \
    .mode("overwrite").option("overwriteSchema", "true") \
    .save(f"{silver_base_path}/customer_sales")


In [0]:
%sh
# Create a 4096-bit RSA key pair only when none exists yet, so re-running this cell
# neither triggers ssh-keygen's overwrite prompt nor replaces a key already in use.
# NOTE(review): -N "" stores the private key without a passphrase - confirm that is acceptable.
if [ ! -f ~/.ssh/id_rsa ]; then
  ssh-keygen -t rsa -b 4096 -C "shreyaagarwal.g@diggibyte.com" -f ~/.ssh/id_rsa -N ""
fi
# Print the public key (never the private one)
cat ~/.ssh/id_rsa.pub


In [0]:
%pip
!pip install databricks-cli



In [0]:
%restart_python


In [0]:
# Configure the databricks CLI for token-based authentication.
# NOTE(review): `configure --token` presumably prompts for the workspace host and a
# token - confirm that works from a notebook cell, and never paste a token into
# the notebook source or leave it in cell output.
!databricks configure --token


In [0]:
# Create the secret scope named "git-ssh" with the databricks CLI.
# NOTE(review): presumably intended to hold the SSH key generated in this notebook, and
# re-running likely fails once the scope exists - confirm both.
!databricks secrets create-scope --scope git-ssh
