In [0]:
# Inline sample of ten sales transactions (header row + data rows) used as the
# raw input for the rest of the notebook.
csv_data = """transaction_id,customer_name,region,product,category,quantity,unit_price,date
1,Rajesh,North,Laptop,Electronics,1,55000,2024-01-12
2,Sneha,West,Refrigerator,Electronics,1,32000,2024-02-05
3,Anil,South,Shampoo,Personal Care,5,150,2024-01-17
4,Divya,North,Mobile,Electronics,2,20000,2024-03-22
5,Vikram,East,Washing Machine,Electronics,1,28000,2024-02-28
6,Preeti,West,Sneakers,Fashion,2,4000,2024-01-31
7,Aman,South,TV,Electronics,1,45000,2024-02-15
8,Isha,North,Notebook,Stationery,10,60,2024-01-10
9,Kunal,East,Pencil,Stationery,20,10,2024-03-05
10,Tanvi,West,Face Cream,Personal Care,3,200,2024-03-19
"""

# Write the sample to DBFS, replacing any file left over from a previous run.
raw_csv_path = "dbfs:/tmp/sales_transactions.csv"
dbutils.fs.put(raw_csv_path, csv_data, overwrite=True)


Wrote 603 bytes.


True

In [0]:
# Load the raw CSV with an explicit schema instead of `inferSchema`.
# Inference needs an extra pass over the file and the types it picks
# (especially for `date`) depend on the Spark runtime; a later cell builds a
# new row against `df.schema` using a `datetime.date` value, so the column
# types are pinned here to keep the notebook reproducible.
sales_schema = (
    "transaction_id INT, customer_name STRING, region STRING, product STRING, "
    "category STRING, quantity INT, unit_price INT, `date` DATE"
)

df = (
    spark.read
    .option("header", True)
    .schema(sales_schema)
    .csv("dbfs:/tmp/sales_transactions.csv")
)

# Quick sanity check of both the rows and the resulting column types.
df.show()
df.printSchema()


+--------------+-------------+------+---------------+-------------+--------+----------+----------+
|transaction_id|customer_name|region|        product|     category|quantity|unit_price|      date|
+--------------+-------------+------+---------------+-------------+--------+----------+----------+
|             1|       Rajesh| North|         Laptop|  Electronics|       1|     55000|2024-01-12|
|             2|        Sneha|  West|   Refrigerator|  Electronics|       1|     32000|2024-02-05|
|             3|         Anil| South|        Shampoo|Personal Care|       5|       150|2024-01-17|
|             4|        Divya| North|         Mobile|  Electronics|       2|     20000|2024-03-22|
|             5|       Vikram|  East|Washing Machine|  Electronics|       1|     28000|2024-02-28|
|             6|       Preeti|  West|       Sneakers|      Fashion|       2|      4000|2024-01-31|
|             7|         Aman| South|             TV|  Electronics|       1|     45000|2024-02-15|
|         

In [0]:

# Persist the raw frame twice: once as plain Parquet and once as a Delta table.
# Both writes replace whatever is already stored at the target path.
df.write.parquet("/tmp/sales_transactions_parquet", mode="overwrite")

df.write.save("/tmp/sales_transactions_delta", format="delta", mode="overwrite")


In [0]:

# Rebuild the `sales_transactions` Delta table from the Parquet copy written
# earlier: drop any previous definition, then create it with a CTAS statement.
drop_statement = "DROP TABLE IF EXISTS sales_transactions"
create_statement = """CREATE TABLE sales_transactions
USING DELTA
AS
SELECT * FROM parquet.`/tmp/sales_transactions_parquet`
"""

spark.sql(drop_statement)
spark.sql(create_statement)



DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
from pyspark.sql.functions import expr, month, date_format, when,col
# Derived columns:
#   total_amount   - quantity * unit_price
#   month          - month number extracted from `date`
#   formatted_date - `date` rendered as dd-MMM-yyyy
#   is_high_value  - True when total_amount exceeds 30000, otherwise False
line_total = col("quantity") * col("unit_price")
high_value_flag = when(col("total_amount") > 30000, True).otherwise(False)

df_transformed = (
    df
    .withColumn("total_amount", line_total)
    .withColumn("month", month(col("date")))
    .withColumn("formatted_date", date_format(col("date"), "dd-MMM-yyyy"))
    .withColumn("is_high_value", high_value_flag)
)

df_transformed.show(truncate=False)


+--------------+-------------+------+---------------+-------------+--------+----------+----------+------------+-----+--------------+-------------+
|transaction_id|customer_name|region|product        |category     |quantity|unit_price|date      |total_amount|month|formatted_date|is_high_value|
+--------------+-------------+------+---------------+-------------+--------+----------+----------+------------+-----+--------------+-------------+
|1             |Rajesh       |North |Laptop         |Electronics  |1       |55000     |2024-01-12|55000       |1    |12-Jan-2024   |true         |
|2             |Sneha        |West  |Refrigerator   |Electronics  |1       |32000     |2024-02-05|32000       |2    |05-Feb-2024   |true         |
|3             |Anil         |South |Shampoo        |Personal Care|5       |150       |2024-01-17|750         |1    |17-Jan-2024   |false        |
|4             |Divya        |North |Mobile         |Electronics  |2       |20000     |2024-03-22|40000       |3    |2

In [0]:
# Number of transactions recorded in each region.
region_counts = df_transformed.groupBy("region").count()
region_counts.show()



+------+-----+
|region|count|
+------+-----+
| South|    2|
|  East|    2|
|  West|    3|
| North|    3|
+------+-----+



In [0]:
 # Top three categories ranked by summed total_amount, highest first.
 top_categories = (
     df_transformed
     .groupBy("category")
     .agg(expr("sum(total_amount)").alias("total_sales_amount"))
     .orderBy(col("total_sales_amount").desc())
     .limit(3)
 )
 top_categories.show()




+-------------+------------------+
|     category|total_sales_amount|
+-------------+------------------+
|  Electronics|            200000|
|      Fashion|              8000|
|Personal Care|              1350|
+-------------+------------------+



In [0]:
# Revenue per month number (summed total_amount), listed in month order.
revenue_by_month = (
    df_transformed
    .groupBy("month")
    .agg(expr("sum(total_amount)").alias("monthly_revenue"))
    .orderBy("month")
)
revenue_by_month.show()




+-----+---------------+
|month|monthly_revenue|
+-----+---------------+
|    1|          64350|
|    2|         105000|
|    3|          40800|
+-----+---------------+



In [0]:
# Find the largest total_amount, then show every transaction that reaches it.
max_amount_row = df_transformed.agg({"total_amount": "max"}).first()
max_amount = max_amount_row[0]

is_largest_sale = col("total_amount") == max_amount
df_transformed.filter(is_largest_sale).show()



+--------------+-------------+------+-------+-----------+--------+----------+----------+------------+-----+--------------+-------------+
|transaction_id|customer_name|region|product|   category|quantity|unit_price|      date|total_amount|month|formatted_date|is_high_value|
+--------------+-------------+------+-------+-----------+--------+----------+----------+------------+-----+--------------+-------------+
|             1|       Rajesh| North| Laptop|Electronics|       1|     55000|2024-01-12|       55000|    1|   12-Jan-2024|         true|
+--------------+-------------+------+-------+-----------+--------+----------+----------+------------+-----+--------------+-------------+



In [0]:
# Total sales for months 1 through 3 (inclusive).
# Note: the filter looks at the month number only; the year is not checked.
in_first_quarter = (col("month") >= 1) & (col("month") <= 3)

total_q1_sales = (
    df_transformed
    .where(in_first_quarter)
    .agg(expr("sum(total_amount)").alias("Q1_total_sales"))
)

total_q1_sales.show()


+--------------+
|Q1_total_sales|
+--------------+
|        210150|
+--------------+



In [0]:
from delta.tables import DeltaTable
# Open the Delta table by path and raise unit_price by 10% for every
# Stationery row. The update is applied in place on the stored table, so
# running this cell again applies the increase again.
delta_table = DeltaTable.forPath(spark, "/tmp/sales_transactions_delta")

stationery_rows = col("category") == "Stationery"
price_increase = {"unit_price": expr("unit_price * 1.10")}

delta_table.update(condition=stationery_rows, set=price_increase)


DataFrame[num_affected_rows: bigint]

In [0]:
# Remove every transaction whose quantity is below 3 from the Delta table.
low_quantity_rows = col("quantity") < 3
delta_table.delete(condition=low_quantity_rows)


DataFrame[num_affected_rows: bigint]

In [0]:
from pyspark.sql.functions import to_date
from datetime import date
from delta.tables import DeltaTable
# Insert one new transaction (id 11) into the Delta table and display the
# table contents ordered by transaction_id.
sales_delta_path = "/tmp/sales_transactions_delta"
sales_delta_table = DeltaTable.forPath(spark, sales_delta_path)

# Build the row against the schema of the table being written to rather than
# `df.schema`: `df` is rebound further down the notebook to a frame with an
# extra `month` column, which would make a re-run of this cell fail because
# the 8-value tuple no longer matches the schema.
# Note: the date is "today", so the stored value depends on when this runs.
new_transaction = [(11, "Rhea", "South", "Smartwatch", "Electronics", 1, 22000, date.today())]
new_df = spark.createDataFrame(new_transaction, schema=sales_delta_table.toDF().schema)
new_df = new_df.withColumn("date", to_date("date"))

# Insert-only MERGE keyed on transaction_id: the row is added when it is
# missing and left untouched when it already exists, so re-running the cell
# does not create duplicate copies of transaction 11 (a plain append would).
(
    sales_delta_table.alias("target")
    .merge(new_df.alias("source"), "target.transaction_id = source.transaction_id")
    .whenNotMatchedInsertAll()
    .execute()
)

updated_df = spark.read.format("delta").load(sales_delta_path)
updated_df.orderBy("transaction_id").show(truncate=False)


+--------------+-------------+------+----------+-------------+--------+----------+----------+
|transaction_id|customer_name|region|product   |category     |quantity|unit_price|date      |
+--------------+-------------+------+----------+-------------+--------+----------+----------+
|3             |Anil         |South |Shampoo   |Personal Care|5       |150       |2024-01-17|
|8             |Isha         |North |Notebook  |Stationery   |10      |66        |2024-01-10|
|9             |Kunal        |East  |Pencil    |Stationery   |20      |11        |2024-03-05|
|10            |Tanvi        |West  |Face Cream|Personal Care|3       |200       |2024-03-19|
|11            |Rhea         |South |Smartwatch|Electronics  |1       |22000     |2025-08-08|
+--------------+-------------+------+----------+-------------+--------+----------+----------+



In [0]:
from pyspark.sql.functions import month, col
# Storage locations: the source Delta table and the two partitioned copies.
delta_path = "/tmp/sales_transactions_delta"
delta_partitioned_region = "/tmp/delta_sales_partitioned_region"
delta_partitioned_month = "/tmp/delta_sales_partitioned_month"

# NOTE: this rebinds `df` (until now the CSV-loaded frame) to the current
# Delta table contents plus a derived `month` column; the month-partitioned
# write in the next cell reads this version of `df`.
current_sales = spark.read.format("delta").load(delta_path)
df = current_sales.withColumn("month", month(col("date")))

# Write a copy partitioned by region, replacing any previous copy.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("region")
    .save(delta_partitioned_region)
)

# Read the partitioned copy back to confirm what was stored.
region_partitioned_sales = spark.read.format("delta").load(delta_partitioned_region)
display(region_partitioned_sales)

transaction_id,customer_name,region,product,category,quantity,unit_price,date,month
10,Tanvi,West,Face Cream,Personal Care,3,200,2024-03-19,3
11,Rhea,South,Smartwatch,Electronics,1,22000,2025-08-08,8
3,Anil,South,Shampoo,Personal Care,5,150,2024-01-17,1
8,Isha,North,Notebook,Stationery,10,66,2024-01-10,1
9,Kunal,East,Pencil,Stationery,20,11,2024-03-05,3


In [0]:
# Write a second copy of `df` partitioned by the derived `month` column,
# replacing any previous copy, then read it back to confirm what was stored.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("month")
    .save(delta_partitioned_month)
)

month_partitioned_sales = spark.read.format("delta").load(delta_partitioned_month)
display(month_partitioned_sales)

transaction_id,customer_name,region,product,category,quantity,unit_price,date,month
10,Tanvi,West,Face Cream,Personal Care,3,200,2024-03-19,3
9,Kunal,East,Pencil,Stationery,20,11,2024-03-05,3
8,Isha,North,Notebook,Stationery,10,66,2024-01-10,1
3,Anil,South,Shampoo,Personal Care,5,150,2024-01-17,1
11,Rhea,South,Smartwatch,Electronics,1,22000,2025-08-08,8


In [0]:
# Run OPTIMIZE with Z-ordering on `category` against the region-partitioned
# Delta table; the returned DataFrame carries the optimization metrics.
optimize_statement = f"OPTIMIZE delta.`{delta_partitioned_region}` ZORDER BY (category)"
spark.sql(optimize_statement)

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,