In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
dbutils.fs.ls('/FileStore/tables/')

[FileInfo(path='dbfs:/FileStore/tables/Sales_data_2k.csv', name='Sales_data_2k.csv', size=1907147, modificationTime=1750315510000),
 FileInfo(path='dbfs:/FileStore/tables/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1750192059000),
 FileInfo(path='dbfs:/FileStore/tables/_committed_2318855837230609441', name='_committed_2318855837230609441', size=303, modificationTime=1750192059000),
 FileInfo(path='dbfs:/FileStore/tables/_started_2318855837230609441', name='_started_2318855837230609441', size=0, modificationTime=1750192058000),
 FileInfo(path='dbfs:/FileStore/tables/amazon/', name='amazon/', size=0, modificationTime=1749823129000),
 FileInfo(path='dbfs:/FileStore/tables/amazon-1.csv', name='amazon-1.csv', size=4744481, modificationTime=1750331780000),
 FileInfo(path='dbfs:/FileStore/tables/amazon.csv/', name='amazon.csv/', size=0, modificationTime=1750192577000),
 FileInfo(path='dbfs:/FileStore/tables/part-00000-tid-2318855837230609441-53de64ee-4612-4269-b608-f6b77d5772aa-957-1

In [0]:
table_name = "silver.sales_2k"
table_exists = spark.catalog.tableExists(table_name)
print(table_exists)

True


In [0]:
Sales_data=spark.table('silver.sales_2k')

In [0]:
Sales_data.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- OrderDate: date (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- SubCategory: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- TotalAmount: double (nullable = true)
 |-- Region: string (nullable = true)



In [0]:
Sales_data.createOrReplaceTempView("sales")

### Starting to handle data with sql

In [0]:
%sql
select * from sales limit 102;

OrderID,OrderDate,CustomerID,CustomerName,ProductID,ProductName,Category,SubCategory,Quantity,UnitPrice,TotalAmount,Region
1,2024-10-01,C2311,Amy Sheppard,P9918,Headphones,Accessories,Input Devices,3,905.22,2715.66,West
2,2024-11-16,C8843,Diane Gilbert,P1764,Webcam,Electronics,Peripherals,2,241.44,482.88,East
3,2024-12-16,C6090,Jessica Perez MD,P8004,Mouse,Accessories,Storage Devices,10,385.64,3856.4,East
4,2025-04-23,C3487,Caroline Martin,P5384,Desk,Furniture,Tables,6,250.78,1504.68,East
5,2025-01-19,C8763,Cathy Pearson,P9904,Laptop,Electronics,Peripherals,2,863.48,1726.96,North
6,2025-01-25,C1026,Albert Blackwell,P3545,Table Lamp,Furniture,Tables,9,134.44,1209.96,North
7,2025-04-13,C6194,Karen Lewis,P4360,Headphones,Accessories,Storage Devices,1,214.98,214.98,East
8,2024-09-02,C3914,Michael Malone,P3858,USB Drive,Accessories,Audio,8,336.36,2690.88,West
9,2025-04-21,C6751,Daniel Rowe,P1320,Monitor,Electronics,Computers,5,20.41,102.05,East
10,2024-12-04,C9247,Sandra Tate,P8950,Mouse,Accessories,Storage Devices,2,211.8,423.6,West


## Inorder to enter new entries we create a dataframe

In [0]:
from pyspark.sql import Row
from pyspark.sql.functions import to_date
from pyspark.sql.types import DateType


upsert_data = [
    Row(
        OrderID=101,
        OrderDate="2025-06-18",
        CustomerID="C001",
        CustomerName="Alice Williams",
        ProductID="P101",
        ProductName="Mechanical Keyboard",
        Category="Electronics",
        SubCategory="Keyboards",
        Quantity=2,
        UnitPrice=200.0,
        TotalAmount=400.0,
        Region="South"
    ),
    Row(
        OrderID=9999,
        OrderDate="2025-06-19",
        CustomerID="C999",
        CustomerName="John Blaze",
        ProductID="P999",
        ProductName="Wireless Router",
        Category="Electronics",
        SubCategory="Networking",
        Quantity=1,
        UnitPrice=1500.0,
        TotalAmount=1500.0,
        Region="North"
    ),
    Row(
        OrderID=102,
        OrderDate="2025-06-20",
        CustomerID="C002",
        CustomerName="Bob Smith",
        ProductID="P102",
        ProductName="Gaming Mouse",
        Category="Electronics",
        SubCategory="Mice",
        Quantity=3,
        UnitPrice=50.0,
        TotalAmount=150.0,
        Region="West"
    ),
    Row(
        OrderID=103,
        OrderDate="2025-06-21",
        CustomerID="C003",
        CustomerName="Carol Johnson",
        ProductID="P103",
        ProductName="Laptop Stand",
        Category="Electronics",
        SubCategory="Accessories",
        Quantity=1,
        UnitPrice=75.0,
        TotalAmount=75.0,
        Region="East"
    )
]

df_upsert = spark.createDataFrame(upsert_data)

df_upsert = df_upsert.withColumn("OrderDate", to_date(df_upsert["OrderDate"], "yyyy-MM-dd"))

df_upsert.createOrReplaceTempView("updates")

display(df_upsert)

OrderID,OrderDate,CustomerID,CustomerName,ProductID,ProductName,Category,SubCategory,Quantity,UnitPrice,TotalAmount,Region
101,2025-06-18,C001,Alice Williams,P101,Mechanical Keyboard,Electronics,Keyboards,2,200.0,400.0,South
9999,2025-06-19,C999,John Blaze,P999,Wireless Router,Electronics,Networking,1,1500.0,1500.0,North
102,2025-06-20,C002,Bob Smith,P102,Gaming Mouse,Electronics,Mice,3,50.0,150.0,West
103,2025-06-21,C003,Carol Johnson,P103,Laptop Stand,Electronics,Accessories,1,75.0,75.0,East


In [0]:
%sql
select * from updates;

OrderID,OrderDate,CustomerID,CustomerName,ProductID,ProductName,Category,SubCategory,Quantity,UnitPrice,TotalAmount,Region
101,2025-06-18,C001,Alice Williams,P101,Mechanical Keyboard,Electronics,Keyboards,2,200.0,400.0,South
9999,2025-06-19,C999,John Blaze,P999,Wireless Router,Electronics,Networking,1,1500.0,1500.0,North
102,2025-06-20,C002,Bob Smith,P102,Gaming Mouse,Electronics,Mice,3,50.0,150.0,West
103,2025-06-21,C003,Carol Johnson,P103,Laptop Stand,Electronics,Accessories,1,75.0,75.0,East


In [0]:
%sql
merge into sales as base
using updates as upd
on base.OrderID = upd.OrderID

when matched then 
update set *
when not matched then 
insert *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
4,4,0,0


In [0]:
%sql
select * from sales
where OrderID=102
order by OrderID desc;

OrderID,OrderDate,CustomerID,CustomerName,ProductID,ProductName,Category,SubCategory,Quantity,UnitPrice,TotalAmount,Region
102,2025-06-20,C002,Bob Smith,P102,Gaming Mouse,Electronics,Mice,3,50.0,150.0,West


## This is the new value inserted here using upsert transformation 

In [0]:
%sql
select * from sales
where OrderID='9999'
order by OrderID desc

OrderID,OrderDate,CustomerID,CustomerName,ProductID,ProductName,Category,SubCategory,Quantity,UnitPrice,TotalAmount,Region
9999,2025-06-19,C999,John Blaze,P999,Wireless Router,Electronics,Networking,1,1500.0,1500.0,North


In [0]:
%sql
select * from sales
where OrderID='101'
order by OrderID asc

OrderID,OrderDate,CustomerID,CustomerName,ProductID,ProductName,Category,SubCategory,Quantity,UnitPrice,TotalAmount,Region
101,2025-06-18,C001,Alice Williams,P101,Mechanical Keyboard,Electronics,Keyboards,2,200.0,400.0,South


In [0]:
spark.table("sales").write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("silver.sales_2k")

## Time Travel

In [0]:
%sql 
describe history silver.sales_2k

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
2,2025-06-25T14:09:09.000Z,3610430143526681,shaikmohammedadnandcme048@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,,0625-140143-zo0unff9-v2n,1.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 5, numRemovedBytes -> 700622, numOutputRows -> 20000, numOutputBytes -> 691477)",,Databricks-Runtime/16.4.x-aarch64-photon-scala2.12
1,2025-06-25T14:06:13.000Z,3610430143526681,shaikmohammedadnandcme048@gmail.com,MERGE,"Map(predicate -> [""(cast(OrderID#10519 as bigint) = OrderID#10762L)""], clusterBy -> [], matchedPredicates -> [{""actionType"":""update""}], statsOnLoad -> false, notMatchedBySourcePredicates -> [], notMatchedPredicates -> [{""actionType"":""insert""}])",,,0625-140143-zo0unff9-v2n,0.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 0, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 4, numTargetBytesAdded -> 11887, numTargetBytesRemoved -> 0, numTargetDeletionVectorsAdded -> 1, numTargetRowsMatchedUpdated -> 4, executionTimeMs -> 6066, materializeSourceTimeMs -> 241, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, numTargetDeletionVectorsUpdated -> 0, scanTimeMs -> 2176, numTargetRowsUpdated -> 4, numOutputRows -> 4, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 4, numTargetFilesRemoved -> 0, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 3487)",,Databricks-Runtime/16.4.x-aarch64-photon-scala2.12
0,2025-06-25T07:36:49.000Z,3610430143526681,shaikmohammedadnandcme048@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,,0625-071953-ophdfxw5-v2n,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numOutputRows -> 20000, numOutputBytes -> 688735)",,Databricks-Runtime/16.4.x-aarch64-photon-scala2.12


In [0]:
df_old = spark.read.format("delta") \
    .option("versionAsOf", 2) \
    .table("silver.sales_2k")



In [0]:
df_old.createOrReplaceTempView("old")

In [0]:
%sql
select * from old 
where OrderID='102'
order by OrderID asc;

OrderID,OrderDate,CustomerID,CustomerName,ProductID,ProductName,Category,SubCategory,Quantity,UnitPrice,TotalAmount,Region
102,2025-06-20,C002,Bob Smith,P102,Gaming Mouse,Electronics,Mice,3,50.0,150.0,West


In [0]:
df_new=spark.read.format('delta')\
    .option('versionAsOf',0)\
        .table('silver.sales_2k')

In [0]:
df_new.createOrReplaceTempView("new")

In [0]:
%sql
select * from new
where OrderID='102'
order by OrderID asc


OrderID,OrderDate,CustomerID,CustomerName,ProductID,ProductName,Category,SubCategory,Quantity,UnitPrice,TotalAmount,Region
102,2024-11-16,C5957,Larry Garcia,P6022,Mouse,Accessories,Input Devices,6,633.12,3798.72,North


In [0]:
def format_currency(amount):
    return f"₹{amount:,.4f}"


format_currency_udf = udf(format_currency, StringType())


g=spark.sql("""
SELECT 
  date_format(OrderDate, 'yyyy-MM') AS Month,
  FLOOR(SUM(TotalAmount),3) AS MonthlyRevenue
FROM silver.sales_2k
GROUP BY Month
ORDER BY Month
""")

formatted_df = g.withColumn("FormattedRevenue", format_currency_udf(col("MonthlyRevenue")))
formatted_df.select("Month", "FormattedRevenue").show(truncate=False)

+-------+----------------+
|Month  |FormattedRevenue|
+-------+----------------+
|2024-06|₹2,975,035.3890 |
|2024-07|₹4,719,052.8290 |
|2024-08|₹4,622,362.5700 |
|2024-09|₹4,682,603.2400 |
|2024-10|₹4,801,087.5300 |
|2024-11|₹4,621,143.3990 |
|2024-12|₹4,885,781.9600 |
|2025-01|₹4,593,673.1900 |
|2025-02|₹4,193,491.5700 |
|2025-03|₹4,739,078.9890 |
|2025-04|₹4,684,102.0190 |
|2025-05|₹4,815,200.0290 |
|2025-06|₹1,662,193.8690 |
+-------+----------------+



In [0]:
formatted_df.columns

['Month', 'MonthlyRevenue', 'FormattedRevenue']

In [0]:
formatted_df=formatted_df.select('Month','FormattedRevenue')

In [0]:
formatted_df=formatted_df.withColumnRenamed('FormattedRevenue','MonthlyRevenue')

In [0]:
formatted_df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("gold.MonthlyRevenue24_25")