**Intialize the SparkSession**

In [0]:
from pyspark.sql import SparkSession
spark=SparkSession.builder\
      .appName("assignment-1")\
      .getOrCreate()
spark

**PySpark + Delta**

In [0]:
from pyspark.sql.functions import col, sum
# 1. Ingest all 3 CSVs as Delta Tables.
# Read the data
orders = spark.read.option("header", True).csv("file:/Workspace/Shared/orders.csv", inferSchema=True)
customers = spark.read.option("header", True).csv("file:/Workspace/Shared/customers.csv", inferSchema=True)
products = spark.read.option("header", True).csv("file:/Workspace/Shared/products.csv", inferSchema=True)
orders.printSchema()
products.printSchema()
customers.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- Status : string (nullable = true)

root
 |-- ProductID: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Stock: integer (nullable = true)
 |-- ReorderLevel : double (nullable = true)

root
 |-- CustomerID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- SignupDate : timestamp (nullable = true)



In [0]:
def clean_column_names(df):
    for old_name in df.columns:
        df = df.withColumnRenamed(old_name, old_name.strip())
    return df
# Clean column names as they have some spaces
orders = clean_column_names(orders)
customers = clean_column_names(customers)
products = clean_column_names(products)


In [0]:
#Save it as delta table
orders.write.format("delta").mode("overwrite").save("file:/Workspace/Shared/orders")
customers.write.format("delta").mode("overwrite").save("file:/Workspace/Shared/customers")
products.write.format("delta").mode("overwrite").save("file:/Workspace/Shared/products")
#read the delta table
orders = spark.read.format("delta").load("file:/Workspace/Shared/orders")
customers = spark.read.format("delta").load("file:/Workspace/Shared/customers")
products = spark.read.format("delta").load("file:/Workspace/Shared/products")
orders.printSchema()
customers.printSchema()
products.printSchema()
# 2. Write SQL to get the total revenue per Product.
print("Revenue per Product:")
spark.sql("""
SELECT 
    ProductID,
    SUM(Quantity * Price) AS TotalRevenue
FROM delta.`file:/Workspace/Shared/orders`
GROUP BY ProductID
""").show()
#3.Join Orders + Customers to find revenue by Region.
orders.join(
    customers.withColumn("CustomerID", col("CustomerID").cast("string")),
    on="CustomerID"
).groupBy("City") \
 .agg(sum(col("Quantity") * col("Price")).alias("CityRevenue")) \
 .show()
#4.Update the Status of Pending orders to 'Cancelled'.
from delta.tables import DeltaTable
delta_orders = DeltaTable.forPath(spark, "file:/Workspace/Shared/orders")
delta_orders.update(
    condition="Status = 'Pending'",
    set={"Status": "'Cancelled'"}
)
# 5.Merge a new return record into Orders.
# Convert date string to datetime.date
import datetime
new_return=[(3006, "C002", "P1001", 1, 75000, datetime.datetime.strptime("2024-05-06", "%Y-%m-%d").date(), "Returned")]
new= spark.createDataFrame(new_return, schema=orders.schema)
delta_orders.alias("t").merge(
    new.alias("s"),
    "t.OrderID = s.OrderID"
).whenNotMatchedInsertAll().execute()

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- Status: string (nullable = true)

root
 |-- CustomerID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Age: integer (nullable = true)

root
 |-- ProductID: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Stock: integer (nullable = true)
 |-- ReorderLevel: double (nullable = true)

Revenue per Product:
+---------+------------+
|ProductID|TotalRevenue|
+---------+------------+
|    P1001|      150000|
|    P1002|      150000|
|    P1003|       30000|
|    P1004|       30000|
+---------+------------+

+----+-----------+
|City|CityRevenue|
+----+-----------+
+----+-----------+



**DLT Pipeline**

In [0]:
#6.Create raw → cleaned → aggregated tables.
# Cleaned table rows
cleaned = orders.dropna()
cleaned.write.format("delta").mode("overwrite").save("/Workspace/Shared/clean_orders")
#Aggregated: Revenue per Category
cleaned.join(products, "ProductID") \
       .groupBy("Category") \
       .agg(sum(col("Quantity") * col("Price")).alias("CategoryRevenue")) \
       .write.format("delta").mode("overwrite").save("/Workspace/Shared/category_revenue")

**Time Travel**

In [0]:
#7.View data before the Status update.
delta_orders = DeltaTable.forPath(spark, "file:/Workspace/Shared/orders")
delta_orders.history()
#8.Restore to Older Version
delta_orders = DeltaTable.forPath(spark, "file:/Workspace/Shared/orders")
delta_orders.restoreToVersion(0)
delta_orders.history()

DataFrame[version: bigint, timestamp: timestamp, userId: string, userName: string, operation: string, operationParameters: map<string,string>, job: struct<jobId:string,jobName:string,jobRunId:string,runId:string,jobOwnerId:string,triggerType:string>, notebook: struct<notebookId:string>, clusterId: string, readVersion: bigint, isolationLevel: string, isBlindAppend: boolean, operationMetrics: map<string,string>, userMetadata: string, engineInfo: string]

**VACUUM+RETENTION**

In [0]:
#9.VACUUM after changing default retention.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", False)
delta_orders.vacuum(1)
delta_orders.history()

DataFrame[version: bigint, timestamp: timestamp, userId: string, userName: string, operation: string, operationParameters: map<string,string>, job: struct<jobId:string,jobName:string,jobRunId:string,runId:string,jobOwnerId:string,triggerType:string>, notebook: struct<notebookId:string>, clusterId: string, readVersion: bigint, isolationLevel: string, isBlindAppend: boolean, operationMetrics: map<string,string>, userMetadata: string, engineInfo: string]

**Expectations**

In [0]:
# 10. Quantity > 0 , Price > 0 ,orderDate is not null
delta_orders.toDF().filter((col("Quantity") > 0) & (col("Price") > 0)).show()

+-------+----------+---------+--------+-----+----------+----------+
|OrderID|CustomerID|ProductID|Quantity|Price| OrderDate|    Status|
+-------+----------+---------+--------+-----+----------+----------+
|   3001|      C001|    P1001|       1|75000|2024-05-01|Delivered |
|   3002|      C002|    P1002|       2|50000|2024-05-02| Returned |
|   3003|      C003|    P1003|       1|30000|2024-05-03|Delivered |
|   3004|      C001|    P1002|       1|50000|2024-05-04|Delivered |
|   3005|      C004|    P1004|       3|10000|2024-05-05|   Pending|
+-------+----------+---------+--------+-----+----------+----------+



**Bonus**

In [0]:
#11.Add 'OrderType' Column
from pyspark.sql.functions import when
orders.withColumn("OrderType", when(col("Status") == "Returned", "Return").otherwise("Sale")).show()

+-------+----------+---------+--------+-----+----------+----------+---------+
|OrderID|CustomerID|ProductID|Quantity|Price| OrderDate|    Status|OrderType|
+-------+----------+---------+--------+-----+----------+----------+---------+
|   3001|      C001|    P1001|       1|75000|2024-05-01|Delivered |     Sale|
|   3002|      C002|    P1002|       2|50000|2024-05-02| Returned |     Sale|
|   3003|      C003|    P1003|       1|30000|2024-05-03|Delivered |     Sale|
|   3004|      C001|    P1002|       1|50000|2024-05-04|Delivered |     Sale|
|   3005|      C004|    P1004|       3|10000|2024-05-05|   Pending|     Sale|
+-------+----------+---------+--------+-----+----------+----------+---------+

