**Task 1: Ingest and Save**

Load the CSV into a PySpark DataFrame.

Save it in Parquet and Delta formats (in different folders).

Create and register Delta Table called sales_transaction

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when, month, date_format, round as spark_round, max as spark_max, sum as spark_sum, to_date
from delta.tables import DeltaTable
from datetime import datetime

spark = SparkSession.builder.appName("SalesTransactionsETL").getOrCreate()

In [0]:
# Path of your uploaded CSV in DBFS
csv_path = "dbfs:/FileStore/tables/sales_transaction.csv"

# Read into DataFrame
df = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", True) \
    .load(csv_path)

display(df)


transaction_id,customer_name,region,product,category,quantity,unit_price,date
1,Rajesh,North,Laptop,Electronics,1,55000,2024-01-12
2,Sneha,West,Refrigerator,Electronics,1,32000,2024-02-05
3,Anil,South,Shampoo,Personal Care,5,150,2024-01-17
4,Divya,North,Mobile,Electronics,2,20000,2024-03-22
5,Vikram,East,Washing Machine,Electronics,1,28000,2024-02-28
6,Preeti,West,Sneakers,Fashion,2,4000,2024-01-31
7,Aman,South,TV,Electronics,1,45000,2024-02-15
8,Isha,North,Notebook,Stationery,10,60,2024-01-10
9,Kunal,East,Pencil,Stationery,20,10,2024-03-05
10,Tanvi,West,Face Cream,Personal Care,3,200,2024-03-19


In [0]:
#Save it in Parquet and Delta formats (in different folders
# Output paths
parquet_path = "dbfs:/mnt/data/sales_transactions_parquet"
delta_path = "dbfs:/mnt/data/sales_transactions_delta"

# Save as Parquet
df.write.mode("overwrite").parquet(parquet_path)

# Save as Delta
df.write.format("delta").mode("overwrite").save(delta_path)


In [0]:
# Switch to local Hive metastore (not Unity Catalog)
spark.sql("USE hive_metastore.default")

# Create table
spark.sql("DROP TABLE IF EXISTS sales_transactions")
spark.sql(f"""
    CREATE TABLE sales_transactions
    USING DELTA
    LOCATION '{delta_path}'
""")



DataFrame[]

**Task 2: Data Transformation**

Add a new column total_amount = quantity × unit_price .

Add another column month extracted from the date.

Format date as dd-MMM-yyyy and display.

Create a column is_high_value (TRUE if total_amount > 30,000 , else FALSE).

In [0]:
df = spark.read.format("delta").load(delta_path)

df = df.withColumn("total_amount", col("quantity") * col("unit_price")) \
       .withColumn("month", month(col("date"))) \
       .withColumn("date_formatted", date_format(col("date"), "dd-MMM-yyyy")) \
       .withColumn("is_high_value", when(col("total_amount") > 30000, True).otherwise(False))

display(df)

transaction_id,customer_name,region,product,category,quantity,unit_price,date,total_amount,month,date_formatted,is_high_value
1,Rajesh,North,Laptop,Electronics,1,55000,2024-01-12,55000,1,12-Jan-2024,True
2,Sneha,West,Refrigerator,Electronics,1,32000,2024-02-05,32000,2,05-Feb-2024,True
3,Anil,South,Shampoo,Personal Care,5,150,2024-01-17,750,1,17-Jan-2024,False
4,Divya,North,Mobile,Electronics,2,20000,2024-03-22,40000,3,22-Mar-2024,True
5,Vikram,East,Washing Machine,Electronics,1,28000,2024-02-28,28000,2,28-Feb-2024,False
6,Preeti,West,Sneakers,Fashion,2,4000,2024-01-31,8000,1,31-Jan-2024,False
7,Aman,South,TV,Electronics,1,45000,2024-02-15,45000,2,15-Feb-2024,True
8,Isha,North,Notebook,Stationery,10,60,2024-01-10,600,1,10-Jan-2024,False
9,Kunal,East,Pencil,Stationery,20,10,2024-03-05,200,3,05-Mar-2024,False
10,Tanvi,West,Face Cream,Personal Care,3,200,2024-03-19,600,3,19-Mar-2024,False


**Task 3: Aggregations & Insights**

Count transactions per region.

Get top 3 categories by total sales amount.

Find month-wise revenue trend.

Show customer(s) who made the highest purchase in one transaction.

Calculate total sales done in Q1 (Jan–Mar)

In [0]:
# 3.1 Count transactions per region
display(df.groupBy("region").count())

region,count
West,3
East,2
North,3
South,2


In [0]:
# 3.2 Top 3 categories by total sales
display(df.groupBy("category")
          .agg(spark_sum("total_amount").alias("total_sales"))
          .orderBy(col("total_sales").desc())
          .limit(3))

category,total_sales
Electronics,200000
Fashion,8000
Personal Care,1350


In [0]:
# 3.3 Month-wise revenue trend
display(df.groupBy("month")
          .agg(spark_sum("total_amount").alias("monthly_revenue"))
          .orderBy("month"))

month,monthly_revenue
1,64350
2,105000
3,40800


In [0]:
# 3.4 Customer(s) with highest single transaction
max_amt = df.agg(spark_max("total_amount").alias("max_purchase")).collect()[0]["max_purchase"]
display(df.filter(col("total_amount") == max_amt))


transaction_id,customer_name,region,product,category,quantity,unit_price,date,total_amount,month,date_formatted,is_high_value
1,Rajesh,North,Laptop,Electronics,1,55000,2024-01-12,55000,1,12-Jan-2024,True


In [0]:
# 3.5 Total sales in Q1
display(df.filter(col("month").between(1, 3))
          .agg(spark_sum("total_amount").alias("Q1_sales")))

Q1_sales
210150


**Task 4: Update & Delete Scenarios**

Update price of all Stationery items to increase by 10%.

Delete all records with quantity < 3 .

Add a new row into the Delta Table with today's transaction

In [0]:
deltaTable = DeltaTable.forPath(spark, delta_path)

# 4.1 Update price of Stationery items (+10%)
deltaTable.update(
    condition=col("category") == "Stationery",
    set={"unit_price": col("unit_price") * 1.10}
)
display(deltaTable.toDF())


transaction_id,customer_name,region,product,category,quantity,unit_price,date
1,Rajesh,North,Laptop,Electronics,1,55000,2024-01-12
2,Sneha,West,Refrigerator,Electronics,1,32000,2024-02-05
3,Anil,South,Shampoo,Personal Care,5,150,2024-01-17
4,Divya,North,Mobile,Electronics,2,20000,2024-03-22
5,Vikram,East,Washing Machine,Electronics,1,28000,2024-02-28
6,Preeti,West,Sneakers,Fashion,2,4000,2024-01-31
7,Aman,South,TV,Electronics,1,45000,2024-02-15
10,Tanvi,West,Face Cream,Personal Care,3,200,2024-03-19
8,Isha,North,Notebook,Stationery,10,72,2024-01-10
9,Kunal,East,Pencil,Stationery,20,12,2024-03-05


In [0]:
# 4.2 Delete records with quantity < 3
deltaTable.delete(condition=col("quantity") < 3)
display(deltaTable.toDF())

transaction_id,customer_name,region,product,category,quantity,unit_price,date
8,Isha,North,Notebook,Stationery,10,72,2024-01-10
9,Kunal,East,Pencil,Stationery,20,12,2024-03-05
3,Anil,South,Shampoo,Personal Care,5,150,2024-01-17
10,Tanvi,West,Face Cream,Personal Care,3,200,2024-03-19


In [0]:
# Load existing Delta table
delta_table = DeltaTable.forPath(spark, delta_path)

# New transaction row
new_data = [
    (11, "Satej", "South", "Smartwatch", "Electronics", 1, 15000, datetime.today().strftime("%Y-%m-%d"))
]

# Create DataFrame with matching columns
update_df = spark.createDataFrame(new_data, [
    "transaction_id", "customer_name", "region", "product", "category", "quantity", "unit_price", "date"
])

# Perform merge (update if transaction_id exists, else insert)
delta_table.alias("target").merge(
    update_df.alias("source"),
    "target.transaction_id = source.transaction_id"
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()

# Display final table
delta_table.toDF().display()

transaction_id,customer_name,region,product,category,quantity,unit_price,date
8,Isha,North,Notebook,Stationery,10,72,2024-01-10
9,Kunal,East,Pencil,Stationery,20,12,2024-03-05
3,Anil,South,Shampoo,Personal Care,5,150,2024-01-17
10,Tanvi,West,Face Cream,Personal Care,3,200,2024-03-19
11,Satej,South,Smartwatch,Electronics,1,15000,2025-08-08


** Task 5: Partitioning & Optimization (Bonus)**

Re-write the Delta table partitioned by region .

Create a second Delta table partitioned by month .

Optimize using Z-Ordering (if Databricks)

In [0]:
from pyspark.sql.functions import month

# Paths for partitioned tables
delta_partitioned_region = "/tmp/delta_sales_partitioned_region"
delta_partitioned_month = "/tmp/delta_sales_partitioned_month"

# Load Delta table and add month column
df = spark.read.format("delta").load(delta_path) \
        .withColumn("month", month(col("date")))

# 1. Partitioned by region
df.write.format("delta").mode("overwrite").partitionBy("region").save(delta_partitioned_region)
print("Partitioned by region table created.")
display(spark.read.format("delta").load(delta_partitioned_region))

# 2. Partitioned by month
df.write.format("delta").mode("overwrite").partitionBy("month").save(delta_partitioned_month)
print("Partitioned by month table created.")
display(spark.read.format("delta").load(delta_partitioned_month))

# 3. Z-Ordering (Databricks Premium only) – This will output optimization stats
print("Running Z-Ordering on region-partitioned table...")
spark.sql(f"OPTIMIZE delta.`{delta_partitioned_region}` ZORDER BY (category)")

Partitioned by region table created.


transaction_id,customer_name,region,product,category,quantity,unit_price,date,month
3,Anil,South,Shampoo,Personal Care,5,150,2024-01-17,1
11,Satej,South,Smartwatch,Electronics,1,15000,2025-08-08,8
10,Tanvi,West,Face Cream,Personal Care,3,200,2024-03-19,3
8,Isha,North,Notebook,Stationery,10,72,2024-01-10,1
9,Kunal,East,Pencil,Stationery,20,12,2024-03-05,3


Partitioned by month table created.


transaction_id,customer_name,region,product,category,quantity,unit_price,date,month
9,Kunal,East,Pencil,Stationery,20,12,2024-03-05,3
10,Tanvi,West,Face Cream,Personal Care,3,200,2024-03-19,3
8,Isha,North,Notebook,Stationery,10,72,2024-01-10,1
3,Anil,South,Shampoo,Personal Care,5,150,2024-01-17,1
11,Satej,South,Smartwatch,Electronics,1,15000,2025-08-08,8


Running Z-Ordering on region-partitioned table...


DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,