### Popular OpenStorgae Format
AWS - Apache Iceberg
Azure - Delta Lake 
GCP - Apache Hudi

### Data Reading csv file

In [0]:
%python
df = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.load('/FileStore/OpenTableFormat/rawdata/sales_data_first.csv')

display(df)

Branch_ID,Dealer_ID,Model_ID,Revenue,Units_Sold,Date_ID,Month,Year,BranchName,DealerName,Product_Name,Date
BR0006,DLR0168,Ren-M128,12971088,3,DT01236,5,2020,AC Cars Motors,Saab Motors,Renault,5/20/2020 0:00
BR0011,DLR0069,Vol-M256,14181510,3,DT01225,5,2020,Acura Motors,Geo Motors,Volkswagen,5/9/2020 0:00
BR0021,DLR0070,Vol-M257,7738896,1,DT01226,5,2020,Aixam-Mega (including Arola) Motors,Gilbern Motors,Volkswagen,5/10/2020 0:00
BR0031,DLR0071,Vol-M258,10067596,2,DT01227,5,2020,Alfa Romeo Motors,Ginetta Motors,Volkswagen,5/11/2020 0:00
BR0041,DLR0072,Vol-M259,13055810,2,DT01228,5,2020,Alpine Motors,Glas Motors,Volkswagen,5/12/2020 0:00
BR0051,DLR0073,Vol-M260,2224368,1,DT01229,5,2020,Alvis Motors,GMC Motors,Volkswagen,5/13/2020 0:00
BR0061,DLR0074,Nis-M261,11270580,3,DT01230,5,2020,"AMC, Eagle Motors",GTA Spano Motors,Nissan,5/14/2020 0:00
BR0066,DLR0169,Ren-M129,693559,1,DT01237,5,2020,Acura Motors,SAIC Motor Motors,Renault,5/21/2020 0:00
BR0071,DLR0075,Nis-M262,18784710,3,DT01231,5,2020,Anadol Motors,Gumpert Motors,Nissan,5/15/2020 0:00
BR0081,DLR0076,Nis-M263,2354637,3,DT01232,5,2020,Ariel Motors,Healey Motors,Nissan,5/16/2020 0:00


In [0]:
%python
'''
convert the above df into delta format
* we can provide path till folder level, we cant give the filename when writing into delta format '''

df.write.format("delta")\
    .option("path","/FileStore/OpenTableFormat/sinkdata/sales_data")\
        .save()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1853423176225449>:5[0m
[1;32m      1[0m [38;5;124;03m'''[39;00m
[1;32m      2[0m [38;5;124;03mconvert the above df into delta format[39;00m
[1;32m      3[0m [38;5;124;03m* we can provide path till folder level, we cant give the filename when writing into delta format '''[39;00m
[0;32m----> 5[0m df[38;5;241m.[39mwrite[38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m)\
[1;32m      6[0m     [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mpath[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124m/FileStore/OpenTableFormat/sinkdata/sales_data[39m[38;5;124m"[39m)\
[1;32m      7[0m         [38;5;241m.[39msave()

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:48[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, *

In [0]:
%sql

-- Even without creating a delta table , we can still query the data in table format 

select * from delta.`/FileStore/OpenTableFormat/sinkdata/sales_data`



### Behind the scene of Opentable format 

* It read the csv data and converted into delta format, but indeed we dont have the delta format file. instead it will convert the csv file into parquet file
* In delta format it contains 2 files
    1. Parquet file
    2. Transaction files aka (_delata_log) basically a folder concept
* The _delata_log folder contains the json files, schema checkpoint
    * Basically its keep on tracking the changes you are performing on the file ( basically your transactions)
    * every time you make the changes, it will keep on adding the new json files. And each time it will not store the state of the table
    * if you want the latest state of the table, then it will read all the previous json files.

* Note:
  If there are 100 transaction,whether we need to read all the 100 json files?
  No, Every 10th json files its created in delta_log it will maintain the history of the past 9 previous json file in the 10th json file. So you need the current state then it will read the 10th + 1 file (i.e 11th file)

In [0]:
%python
''' to read the content of json file'''

df = spark.read.json("/FileStore/OpenTableFormat/sinkdata/sales_data/_delta_log/00000000000000000000.json")

display(df)



### Create DELTA Table

In [0]:
%sql

create table my_delta_table
(
  ID int,
  Name string,
  salary double
)
using delta
location '/FileStore/OpenTableFormat/sinkdata/first_delta_table'




In [0]:
%sql
-- ''' create Schema'''

create schema bronze

In [0]:
%sql

-- Enforcing the schema while creating the table, knows as Schema Enforcement

create table bronze.my_delta_table
(
  ID int,
  Name string,
  salary double
)
using delta
location '/FileStore/OpenTableFormat/sinkdata/my_delta_table'

### Insert values into table

In [0]:
%sql
INSERT INTO bronze.my_delta_table
VALUES 
(1, 'AAA', 1000),
(2, 'BBB', 2000)

num_affected_rows,num_inserted_rows
2,2


In [0]:
%sql
select * from bronze.my_delta_table

ID,Name,salary
1,AAA,1000.0
2,BBB,2000.0
1,AAA,1000.0
2,BBB,2000.0
1,AAA,1000.0
2,BBB,2000.0
1,AAA,1000.0
2,BBB,2000.0
1,AAA,1000.0
2,BBB,2000.0


In [0]:
%sql

-- Will run the sql cmd 10time just to verify whether it will create the checkpoint in the delta_log folder or not 

INSERT INTO bronze.my_delta_table
VALUES 
(1, 'AAA', 1000),
(2, 'BBB', 2000)



In [0]:
%sql

-- create 2nd table

create table bronze.my_delta_table2
(
  ID int,
  Name string,
  salary double
)
using delta
location '/FileStore/OpenTableFormat/sinkdata/my_delta_table2'

In [0]:
%sql
describe table extended bronze.my_delta_table2

col_name,data_type,comment
ID,int,
Name,string,
salary,double,
,,
# Detailed Table Information,,
Catalog,spark_catalog,
Database,bronze,
Table,my_delta_table2,
Created Time,Fri May 09 07:24:17 UTC 2025,
Last Access,UNKNOWN,


### DELETION VECTOR DISABLE

In [0]:
%sql
-- We will trun off the deletion vector

ALTER TABLE bronze.my_delta_table2
SET TBLPROPERTIES ('delta.enableDeletionVectors' = false);



In [0]:
%sql
INSERT INTO bronze.my_delta_table2
VALUES 
(1, 'AAA', 1000),
(2, 'BBB', 2000)



In [0]:
%sql
INSERT INTO bronze.my_delta_table2
VALUES 
(3, 'ccc', 1000),
(4, 'ddd', 2000)



In [0]:
%sql
select * from bronze.my_delta_table2



### Update values in table

In [0]:
%sql
update bronze.my_delta_table2
set name = 'ZZZ' where id = 2



In [0]:
%python
''' to read the content of json file'''

df = spark.read.json("/FileStore/OpenTableFormat/sinkdata/my_delta_table2/_delta_log/00000000000000000005.json")

display(df)



### DML in open table

whenever you perform the DML operation on the table, it will create a new parquet file.
1. The new parquet file basically contain the changes which you are performed , before this it will copy the content from the previous parquet file and it will remove the previous file ( it is also called as TOMBSTONING) means soft delete 
2. It will keep the current state of the table/file in parquet

In [0]:
%sql
select * from bronze.my_delta_table2



### Delete the data from table


In [0]:
%sql
delete from bronze.my_delta_table2
where id = 2



In [0]:
%python
''' to read the content of json file'''

df = spark.read.json("/FileStore/OpenTableFormat/sinkdata/my_delta_table2/_delta_log/00000000000000000010.json")

display(df)



### Data Versioning & Time Travel concept

As we know, if we acceidently delete the dataframe and we want to resuse the data which was recetly deleted then we can use the feature of data versioning / time travel functionalites

In [0]:
%sql
describe history bronze.my_delta_table2



In [0]:
%sql
-- After applying the versioning to table

select * from bronze.my_delta_table2 version as of 9



In [0]:
%sql
-- Latest State of the table

select * from bronze.my_delta_table2




### VACCUM Command

1. Hard Deletion of the data from delta
  Its the right of the person/organaization , can ask for remove the
2. It will remove the files which are older than 7 days
3. If you are performing any vaccum run, then always make sure to use the 1st DRY RUN cmd as precautionary action


In [0]:
%sql
vacuum bronze.my_delta_table2



In [0]:
%sql
-- If we want to know what are the deleted partition when we execute vaccum command then use the below command
  -- 1.If you created the table recently then you ill not see the list of partition data bcoz data is not older than 7 days

vacuum bronze.my_delta_table2 DRY RUN



In [0]:
%sql
-- Set the retention period

SET spark.databricks.delta.retentionDurationCheck.enabled = false



In [0]:
%sql
-- Very Risky command to execute on table, always perform dry run before executing the VACCUM cmd. Else we cant traverse back/version back any data once its deleted, it will be permanently deleted.

VACUUM bronze.my_delta_table2 RETAIN 0 HOURS DRY RUN;



In [0]:
%sql
-- Deleted all the parquet file which mentioned in the above cmd, and we can't traverse back or version back the data

VACUUM bronze.my_delta_table2 RETAIN 0 HOURS 



### SCHEMA ENFORCEMENT & SCHEMA EVOLUTION 

In [0]:
%python
df = spark.read.format("csv")\
.option("header",True)\
.option("inferSchema",True)\
.load('/FileStore/OpenTableFormat/rawdata/sales_data_third.csv')

display(df)



In [0]:
%python
'''
convert the above df into delta format
    1. we can provide path till folder level, we cant give the filename when writing into delta format 
    2. whenever there is schema evolution while writing the data in delta format use the "mergeSchema = true" in option
'''

df.write.format("delta")\
    .mode('append')\
    .option("path","/FileStore/OpenTableFormat/sinkdata/sales_data")\
    .option('mergeSchema', True)\
        .save()



In [0]:
%sql
-- During the read of 2 sheet we didn't have the schema changes so we have enforced the schema, but in the third sheet we see the schema evolution.
-- Note : How to identify the SCHEMA EVOLUTION ?, Just read the file, you can see the last new column name "ReturnFlag" it mention the schema evolution which returns 1. if there is no schema evolution then it will return null values.

select * from delta.`/FileStore/OpenTableFormat/sinkdata/sales_data`



### METADATA LEVEL CHANGES

In [0]:
%sql
-- Changes applied on the metadata level not on the data level, so we can see new json files created in _delta_log file, but nothing parquet file is created

-- Adding new column "Flag"

alter table bronze.my_delta_table2
add column flag INT

In [0]:
%sql
SELECT * FROM  bronze.my_delta_table2

ID,Name,salary,flag
3,ccc,1000.0,
4,ddd,2000.0,
3,ccc,1000.0,
4,ddd,2000.0,
1,AAA,1000.0,
1,ccc,1000.0,
1,AAA,1000.0,


In [0]:
%python
''' to read the content of json file'''

df = spark.read.json("/FileStore/OpenTableFormat/sinkdata/my_delta_table2/_delta_log/00000000000000000011.json")

display(df)

commitInfo,metaData
"List(0509-071452-exzuppom, Databricks-Runtime/12.2.x-scala2.12, true, WriteSerializable, List(2116935914246921), ADD COLUMNS, List([{""column"":{""name"":""flag"",""type"":""integer"",""nullable"":true,""metadata"":{}}}]), 10, 1746775741351, 19975d76-94c4-4a4a-80ef-e27a1f6d4fd1, 7635163290231677, contacttopaone@gmail.com)",
,"List(List(false), 1746694528184, List(parquet), 09738dcf-e401-454b-b629-75a70adbe84b, List(), {""type"":""struct"",""fields"":[{""name"":""ID"",""type"":""integer"",""nullable"":true,""metadata"":{}},{""name"":""Name"",""type"":""string"",""nullable"":true,""metadata"":{}},{""name"":""salary"",""type"":""double"",""nullable"":true,""metadata"":{}},{""name"":""flag"",""type"":""integer"",""nullable"":true,""metadata"":{}}]})"


In [0]:
%sql
-- Reordering the column names

alter table bronze.my_delta_table2
alter column id after Name

In [0]:
%sql
select * from bronze.my_delta_table2

Name,ID,salary,flag
ccc,3,1000.0,
ddd,4,2000.0,
ccc,3,1000.0,
ddd,4,2000.0,
AAA,1,1000.0,
ccc,1,1000.0,
AAA,1,1000.0,


In [0]:
%sql
--  Dropping the column from the table

alter table bronze.my_delta_table2
drop column flag

In [0]:
%sql
-- when we try to drop the column from the table, we need to make some changes in the column mapping still the changes will be on metadata level

-- minReaderVersion = 2
-- minWriterVersion = 5
-- MappingMode = name

alter table BRONZE.my_delta_table2
set tblproperties (
  'delta.minReaderVersion' = '2',
  'delta.minWriterVersion' = '5',
  'delta.columnMapping.mode' = 'name'
)

In [0]:
%sql
alter table bronze.my_delta_table2
drop column flag

In [0]:
%sql
-- Renaming the column name

alter table bronze.my_delta_table2
rename column id to cust_id

In [0]:
%python
''' to read the content of json file'''

df = spark.read.json("/FileStore/OpenTableFormat/sinkdata/my_delta_table2/_delta_log/00000000000000000015.json")
display(df)

commitInfo,metaData
"List(0509-071452-exzuppom, Databricks-Runtime/12.2.x-scala2.12, true, WriteSerializable, List(2116935914246921), RENAME COLUMN, List(cust_id, ID), 14, 1746776861743, 34c74a68-2c42-4fad-8b27-24e41be11f78, 7635163290231677, contacttopaone@gmail.com)",
,"List(List(4, name, false), 1746694528184, List(parquet), 09738dcf-e401-454b-b629-75a70adbe84b, List(), {""type"":""struct"",""fields"":[{""name"":""Name"",""type"":""string"",""nullable"":true,""metadata"":{""delta.columnMapping.id"":1,""delta.columnMapping.physicalName"":""Name""}},{""name"":""cust_id"",""type"":""integer"",""nullable"":true,""metadata"":{""delta.columnMapping.id"":2,""delta.columnMapping.physicalName"":""ID""}},{""name"":""salary"",""type"":""double"",""nullable"":true,""metadata"":{""delta.columnMapping.id"":3,""delta.columnMapping.physicalName"":""salary""}}]})"


### Optimization Techniques in Delta 

In [0]:
%sql
optimize bronze.my_delta_table2 zorder by (cust_id)

path,metrics
dbfs:/FileStore/OpenTableFormat/sinkdata/my_delta_table2,"List(1, 5, List(1314, 1314, 1314.0, 1, 1314), List(1059, 1068, 1062.6, 5, 5313), 0, List(minCubeSize(107374182400), List(0, 0), List(5, 5313), 0, List(5, 5313), 1, null), 1, 5, 0, false, 0, 0, 1746777691729, 1746777703256, 8, 1, null, List(0, 0), 3, 3, 375)"


In [0]:

%python
''' to read the content of json file'''

df = spark.read.json("/FileStore/OpenTableFormat/sinkdata/my_delta_table2/_delta_log/00000000000000000016.json")
display(df)

add,commitInfo,remove
,"List(0509-071452-exzuppom, Databricks-Runtime/12.2.x-scala2.12, false, SnapshotIsolation, List(2116935914246921), OPTIMIZE, List(1314, 1314, 1314, 1, 0, 5313, 5, 1314, 1314, 1314), List(false, 0, [], [""cust_id""]), 15, 1746777697560, 9b653dd6-d2e2-4496-9144-f963670eac1a, 7635163290231677, contacttopaone@gmail.com)",
,,"List(false, 1746777695292, true, part-00000-1964a13a-2463-4d15-9450-1066353c0405-c000.snappy.parquet, 1059, List(1746694835000000, 1746694835000000, 1746694835000000, 268435456))"
,,"List(false, 1746777695292, true, part-00000-8a491ec0-ce24-43f1-a7a1-6149082cc3c0-c000.snappy.parquet, 1068, List(1746702171000000, 1746702171000000, 1746702171000000, 268435456))"
,,"List(false, 1746777695292, true, part-00000-cb4d2108-66b9-4d2e-9b2e-9e69e0ff3fee-c000.snappy.parquet, 1068, List(1746695263000000, 1746695263000000, 1746695263000000, 268435456))"
,,"List(false, 1746777695292, true, part-00001-b89063b3-0ed9-4365-96e8-1b617d3bedde-c000.snappy.parquet, 1059, List(1746694851000000, 1746694851000000, 1746694851000000, 268435456))"
,,"List(false, 1746777695292, true, part-00002-593e260c-9942-499f-8e6e-0cc8769a3e64-c000.snappy.parquet, 1059, List(1746702155000000, 1746702155000000, 1746702155000000, 268435456))"
"List(false, 1746777698000, 1q/part-00000-45d2e0dd-a6dd-4364-a329-5766bde72afc-c000.snappy.parquet, 1314, {""numRecords"":7,""minValues"":{""Name"":""AAA"",""ID"":1,""salary"":1000.0},""maxValues"":{""Name"":""ddd"",""ID"":4,""salary"":2000.0},""nullCount"":{""Name"":0,""ID"":0,""salary"":0}}, List(1746694835000000, 1746702171000000, 1746694835000000, 268435456, 451bae7b-7d53-4f15-9123-1300eb9fa2e3, [""cust_id""], hilbert))",,


### DELETION VECTOR ENABLE


In [0]:
%sql
create table bronze.new_delta
(
  id int,
  name string
)
using delta
location '/FileStore/OpenTableFormat/sinkdata/new_delta'

In [0]:
%sql
insert into bronze.new_delta
values
(4,'aa'),
(5,'bb'),
(6,'cc') 


num_affected_rows,num_inserted_rows
3,3


In [0]:
%sql
update bronze.new_delta 
set name = 'ZZ' where id =6

num_affected_rows
1


In [0]:
%python

'''By default the deletion vector has not been enabled , so we can manually enable it. This happened only in the community edition while in enterprise it will be automaticallly enabled'''

df = spark.read.json('/FileStore/OpenTableFormat/sinkdata/new_delta/_delta_log/00000000000000000003.json')
display(df)

add,commitInfo,remove
,"List(0509-071452-exzuppom, Databricks-Runtime/12.2.x-scala2.12, false, WriteSerializable, List(2116935914246921), UPDATE, List(2718, 806, 0, 1, 2, 0, 0, 806, 1, 1, 916, 1781), List([""(id#9802 = 6)""]), 2, 1746779973511, 48403121-3143-4b3a-8b7e-03b5f66150f7, 7635163290231677, contacttopaone@gmail.com)",
,,"List(true, 1746779973497, true, part-00000-8f479628-f93a-40b4-80a0-c2049ce5403a-c000.snappy.parquet, 806, List(1746779866000000, 1746779866000000, 1746779866000000, 268435456))"
"List(true, 1746779974000, part-00000-3212ebc3-b1ed-42a1-8b3e-44ecc51f897e-c000.snappy.parquet, 806, {""numRecords"":3,""minValues"":{""id"":4,""name"":""ZZ""},""maxValues"":{""id"":6,""name"":""bb""},""nullCount"":{""id"":0,""name"":0}}, List(1746779866000000, 1746779866000000, 1746779866000000, 268435456))",,


In [0]:
%sql
create table bronze.new_delta2
(
  id int,
  name string
)
using delta
location '/FileStore/OpenTableFormat/sinkdata/new_delta2'

In [0]:
%sql
-- We will trun on the deletion vector

ALTER TABLE bronze.new_delta2
SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);

In [0]:
%sql
insert into bronze.new_delta2
values
(7,'aa'),
(8,'bb'),
(9,'cc') 

num_affected_rows,num_inserted_rows
3,3


In [0]:
%sql
update bronze.new_delta2
set name = 'ZZ' where id =6

num_affected_rows
1


### Streaming Technique in open format

In [0]:
%python
df = spark.readStream.table('bronze.new_delta2')

In [0]:
%python

df.writeStream.format("delta")\
    .option("checkpointLocation","/FileStore/OpenTableFormat/sinkdata/stream_table/checkpoint")\
    .trigger(processingTime='10 seconds')\
    .option("path","/FileStore/OpenTableFormat/sinkdata/stream_table/")\
    .toTable("bronze.streamTable")

Out[10]: <pyspark.sql.streaming.query.StreamingQuery at 0x7f5fe53b0550>

In [0]:
%sql
select * from bronze.streamTable

id,name
1,aa
2,bb
3,cc
4,aa
5,bb
6,ZZ
7,aa
8,bb
9,cc
