In [9]:
!pip install -q pyspark==3.5.1 delta-spark==3.1.0

In [10]:
# Delta supported spark session
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = SparkSession.builder \
    .appName("DeltaLakeColab") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark

In [11]:
# Loading Data
csv_data ="""
transaction_id,customer_name,region,product,category,quantity,unit_price,date
1,Rajesh,North,Laptop,Electronics,1,55000,2024-01-12
2,Sneha,West,Refrigerator,Electronics,1,32000,2024-02-05
3,Anil,South,Shampoo,Personal Care,5,150,2024-01-17
4,Divya,North,Mobile,Electronics,2,20000,2024-03-22
5,Vikram,East,Washing Machine,Electronics,1,28000,2024-02-28
6,Preeti,West,Sneakers,Fashion,2,4000,2024-01-31
7,Aman,South,TV,Electronics,1,45000,2024-02-15
8,Isha,North,Notebook,Stationery,10,60,2024-01-10
9,Kunal,East,Pencil,Stationery,20,10,2024-03-05
10,Tanvi,West,Face Cream,Personal Care,3,200,2024-03-19
"""
with open("sales_transactions.csv", "w") as f:
  f.write(csv_data)

### Task 1: Ingest & Save

In [12]:
# Loading csv in PySpark DataFrame

df_spark = spark.read.csv('sales_transactions.csv', header=True, inferSchema=True)
df_spark.show()

+--------------+-------------+------+---------------+-------------+--------+----------+----------+
|transaction_id|customer_name|region|        product|     category|quantity|unit_price|      date|
+--------------+-------------+------+---------------+-------------+--------+----------+----------+
|             1|       Rajesh| North|         Laptop|  Electronics|       1|     55000|2024-01-12|
|             2|        Sneha|  West|   Refrigerator|  Electronics|       1|     32000|2024-02-05|
|             3|         Anil| South|        Shampoo|Personal Care|       5|       150|2024-01-17|
|             4|        Divya| North|         Mobile|  Electronics|       2|     20000|2024-03-22|
|             5|       Vikram|  East|Washing Machine|  Electronics|       1|     28000|2024-02-28|
|             6|       Preeti|  West|       Sneakers|      Fashion|       2|      4000|2024-01-31|
|             7|         Aman| South|             TV|  Electronics|       1|     45000|2024-02-15|
|         

In [13]:
# Save it in Parquet & Delta formats
df_spark.write.format("delta").mode("overwrite").save("sales_transactions")
df_delta = spark.read.format("delta").load("sales_transactions")
df_delta.show()

+--------------+-------------+------+---------------+-------------+--------+----------+----------+
|transaction_id|customer_name|region|        product|     category|quantity|unit_price|      date|
+--------------+-------------+------+---------------+-------------+--------+----------+----------+
|             1|       Rajesh| North|         Laptop|  Electronics|       1|     55000|2024-01-12|
|             2|        Sneha|  West|   Refrigerator|  Electronics|       1|     32000|2024-02-05|
|             3|         Anil| South|        Shampoo|Personal Care|       5|       150|2024-01-17|
|             4|        Divya| North|         Mobile|  Electronics|       2|     20000|2024-03-22|
|             5|       Vikram|  East|Washing Machine|  Electronics|       1|     28000|2024-02-28|
|             6|       Preeti|  West|       Sneakers|      Fashion|       2|      4000|2024-01-31|
|             7|         Aman| South|             TV|  Electronics|       1|     45000|2024-02-15|
|         

In [14]:
# Saving as parquet
df_spark.write.mode("overwrite").parquet("sales_parquet")


### Task 2: Data Transformations

In [16]:
# Add a new column total_amount = quantity * unit_price
from pyspark.sql.functions import col
from delta.tables import DeltaTable

df_delta = spark.read.format("delta").load("sales_transactions")

df_total = df_delta.withColumn("total_amount", col("quantity") * col("unit_price"))
df_total.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("sales_transactions")

DeltaTable.forPath(spark, "sales_transactions").toDF().show()

+--------------+-------------+------+---------------+-------------+--------+----------+----------+------------+
|transaction_id|customer_name|region|        product|     category|quantity|unit_price|      date|total_amount|
+--------------+-------------+------+---------------+-------------+--------+----------+----------+------------+
|             1|       Rajesh| North|         Laptop|  Electronics|       1|     55000|2024-01-12|       55000|
|             2|        Sneha|  West|   Refrigerator|  Electronics|       1|     32000|2024-02-05|       32000|
|             3|         Anil| South|        Shampoo|Personal Care|       5|       150|2024-01-17|         750|
|             4|        Divya| North|         Mobile|  Electronics|       2|     20000|2024-03-22|       40000|
|             5|       Vikram|  East|Washing Machine|  Electronics|       1|     28000|2024-02-28|       28000|
|             6|       Preeti|  West|       Sneakers|      Fashion|       2|      4000|2024-01-31|      

In [17]:
# Add another column month extracted from the date
from pyspark.sql.functions import month

df_month = df_delta.withColumn("month", month(col("date")))
df_month.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("sales_transactions")

DeltaTable.forPath(spark, "sales_transactions").toDF().show()

+--------------+-------------+------+---------------+-------------+--------+----------+----------+-----+
|transaction_id|customer_name|region|        product|     category|quantity|unit_price|      date|month|
+--------------+-------------+------+---------------+-------------+--------+----------+----------+-----+
|             1|       Rajesh| North|         Laptop|  Electronics|       1|     55000|2024-01-12|    1|
|             2|        Sneha|  West|   Refrigerator|  Electronics|       1|     32000|2024-02-05|    2|
|             3|         Anil| South|        Shampoo|Personal Care|       5|       150|2024-01-17|    1|
|             4|        Divya| North|         Mobile|  Electronics|       2|     20000|2024-03-22|    3|
|             5|       Vikram|  East|Washing Machine|  Electronics|       1|     28000|2024-02-28|    2|
|             6|       Preeti|  West|       Sneakers|      Fashion|       2|      4000|2024-01-31|    1|
|             7|         Aman| South|             TV|  

In [19]:
# Format date as dd-MMM-yyyy and display
from pyspark.sql.functions import date_format

df_date = df_delta.withColumn("formatted_date", date_format("date", "dd-MMM-yyyy"))
df_date.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("sales_transactions")

# Selecting specific columns from sales_transactions
DeltaTable.forPath(spark, "sales_transactions").toDF().select("transaction_id", "date", "formatted_date").show()

+--------------+----------+--------------+
|transaction_id|      date|formatted_date|
+--------------+----------+--------------+
|             1|2024-01-12|   12-Jan-2024|
|             2|2024-02-05|   05-Feb-2024|
|             3|2024-01-17|   17-Jan-2024|
|             4|2024-03-22|   22-Mar-2024|
|             5|2024-02-28|   28-Feb-2024|
|             6|2024-01-31|   31-Jan-2024|
|             7|2024-02-15|   15-Feb-2024|
|             8|2024-01-10|   10-Jan-2024|
|             9|2024-03-05|   05-Mar-2024|
|            10|2024-03-19|   19-Mar-2024|
+--------------+----------+--------------+



In [29]:
# Create a column is_high_value (TRUE if total_amount > 30,000 , else FALSE)
from pyspark.sql.functions import when

# We have to read/load delta table again for new schema
df_updated = spark.read.format("delta").load("sales_transactions")

df_total = df_updated.withColumn("total_amount", col("quantity") * col("unit_price"))

df_high = df_total.withColumn("is_high_value", when(col("total_amount") > 30000, True).otherwise(False))
df_high.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("sales_transactions")

DeltaTable.forPath(spark, "sales_transactions").toDF().show()

+--------------+-------------+------+---------------+-------------+--------+----------+----------+--------------+------------+-------------+
|transaction_id|customer_name|region|        product|     category|quantity|unit_price|      date|formatted_date|total_amount|is_high_value|
+--------------+-------------+------+---------------+-------------+--------+----------+----------+--------------+------------+-------------+
|             1|       Rajesh| North|         Laptop|  Electronics|       1|     55000|2024-01-12|   12-Jan-2024|       55000|         true|
|             2|        Sneha|  West|   Refrigerator|  Electronics|       1|     32000|2024-02-05|   05-Feb-2024|       32000|         true|
|             3|         Anil| South|        Shampoo|Personal Care|       5|       150|2024-01-17|   17-Jan-2024|         750|        false|
|             4|        Divya| North|         Mobile|  Electronics|       2|     20000|2024-03-22|   22-Mar-2024|       40000|         true|
|            

###Task 3: Aggregations & Insights

In [30]:
# Creating a view
df =spark.read.format("delta").load("sales_transactions")
df.createOrReplaceTempView("sales_transactions")

In [31]:
# Count transactions per region
spark.sql("select region, count(*) as TransactionCount from sales_transactions group by region").show()

+------+----------------+
|region|TransactionCount|
+------+----------------+
| South|               2|
|  East|               2|
|  West|               3|
| North|               3|
+------+----------------+



In [32]:
# Get top 3 categories by total sales amount
spark.sql("SELECT category, SUM(total_amount) AS total_sales FROM sales_transactions GROUP BY category ORDER BY total_sales DESC LIMIT 3").show()


+-------------+-----------+
|     category|total_sales|
+-------------+-----------+
|  Electronics|     200000|
|      Fashion|       8000|
|Personal Care|       1350|
+-------------+-----------+



In [34]:
# Find month-wise revenue trend
spark.sql("""
    SELECT MONTH(date) AS month, SUM(total_amount) AS revenue
    FROM sales_transactions
    GROUP BY month
    ORDER BY month
""").show()


+-----+-------+
|month|revenue|
+-----+-------+
|    1|  64350|
|    2| 105000|
|    3|  40800|
+-----+-------+



In [33]:
# Show customer(s) who made the highest purchase in one transaction
spark.sql("""
    SELECT customer_name, total_amount
    FROM sales_transactions
    WHERE total_amount = (SELECT MAX(total_amount) FROM sales_transactions)
""").show()


+-------------+------------+
|customer_name|total_amount|
+-------------+------------+
|       Rajesh|       55000|
+-------------+------------+



In [35]:
# Calculate total sales done in Q1 (Jan–Mar)
spark.sql("select sum(total_amount)as sale_q1 from sales_transactions where month(date) between 1 and 3").show()

+-------+
|sale_q1|
+-------+
| 210150|
+-------+



### Task 4: Update and Delete Scenarios


In [38]:
# Update price of all Stationery items to increase by 10%

delta_table = DeltaTable.forPath(spark, "sales_transactions")
delta_table.update(
    condition="category = 'stationery'",
    set= {"unit_price": "unit_price * 1.1"})

delta_table.toDF().show()

+--------------+-------------+------+---------------+-------------+--------+----------+----------+--------------+------------+-------------+
|transaction_id|customer_name|region|        product|     category|quantity|unit_price|      date|formatted_date|total_amount|is_high_value|
+--------------+-------------+------+---------------+-------------+--------+----------+----------+--------------+------------+-------------+
|             1|       Rajesh| North|         Laptop|  Electronics|       1|     55000|2024-01-12|   12-Jan-2024|       55000|         true|
|             2|        Sneha|  West|   Refrigerator|  Electronics|       1|     32000|2024-02-05|   05-Feb-2024|       32000|         true|
|             3|         Anil| South|        Shampoo|Personal Care|       5|       150|2024-01-17|   17-Jan-2024|         750|        false|
|             4|        Divya| North|         Mobile|  Electronics|       2|     20000|2024-03-22|   22-Mar-2024|       40000|         true|
|            

In [39]:
# Delete all records with quantity < 3

delta_table.delete(condition="quantity < 3")
delta_table.toDF().show()

+--------------+-------------+------+----------+-------------+--------+----------+----------+--------------+------------+-------------+
|transaction_id|customer_name|region|   product|     category|quantity|unit_price|      date|formatted_date|total_amount|is_high_value|
+--------------+-------------+------+----------+-------------+--------+----------+----------+--------------+------------+-------------+
|             3|         Anil| South|   Shampoo|Personal Care|       5|       150|2024-01-17|   17-Jan-2024|         750|        false|
|             8|         Isha| North|  Notebook|   Stationery|      10|        60|2024-01-10|   10-Jan-2024|         600|        false|
|             9|        Kunal|  East|    Pencil|   Stationery|      20|        10|2024-03-05|   05-Mar-2024|         200|        false|
|            10|        Tanvi|  West|Face Cream|Personal Care|       3|       200|2024-03-19|   19-Mar-2024|         600|        false|
+--------------+-------------+------+----------+

In [50]:
# Add a new row into the Delta Table with today's transaction data
from datetime import date
from pyspark.sql import Row

delta_table = DeltaTable.forPath(spark, "sales_transactions")

new = [Row(
    transaction_id=11,
    customer_name="Ananya",
    region="North",
    product="Headphones",
    category="Electronics",
    quantity=2,
    unit_price=3000,
    date=date.today(),
    formatted_date=date.today().strftime("%d-%b-%Y"),
    total_amount=2 * 3000,
    is_high_value=True
)]

df_insert = spark.createDataFrame(new, ["transaction_id", "customer_name", "region", "product", "category", "quantity", "unit_price", "formatted_date","date", "total_amount", "is_high_value" ])

delta_table.alias("target").merge(
    df_insert.alias("source"),
    "target.transaction_id = source.transaction_id"
).whenNotMatchedInsert(values={
    "transaction_id": "source.transaction_id",
    "customer_name": "source.customer_name",
    "region": "source.region",
    "product": "source.product",
    "category": "source.category",
    "quantity": "source.quantity",
    "unit_price": "source.unit_price",
    "date": "source.date",
    "formatted_date": "source.formatted_date",
    "total_amount": "source.total_amount",
    "is_high_value": "source.is_high_value"}).execute()

delta_table.toDF().show()

+--------------+-------------+------+----------+-------------+--------+----------+----------+--------------+------------+-------------+
|transaction_id|customer_name|region|   product|     category|quantity|unit_price|      date|formatted_date|total_amount|is_high_value|
+--------------+-------------+------+----------+-------------+--------+----------+----------+--------------+------------+-------------+
|             3|         Anil| South|   Shampoo|Personal Care|       5|       150|2024-01-17|   17-Jan-2024|         750|        false|
|             8|         Isha| North|  Notebook|   Stationery|      10|        60|2024-01-10|   10-Jan-2024|         600|        false|
|             9|        Kunal|  East|    Pencil|   Stationery|      20|        10|2024-03-05|   05-Mar-2024|         200|        false|
|            10|        Tanvi|  West|Face Cream|Personal Care|       3|       200|2024-03-19|   19-Mar-2024|         600|        false|
|            11|       Ananya| North|Headphones|

### Task 5: Partitioning & Optimization

In [51]:
# Re-write the Delta table partitioned by region

df = spark.read.format("delta").load("sales_transactions")

partitioned_path = "sales_transactions_by_region"

df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("region") \
    .save(partitioned_path)


In [52]:
# Create a second Delta table partitioned by month

from pyspark.sql.functions import month

df = spark.read.format("delta").load("sales_transactions")

df_with_month = df.withColumn("month", month(df["date"]))

month_partitioned_path = "sales_transactions_by_month"

df_with_month.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("month") \
    .save(month_partitioned_path)
