In [0]:
#1.Creating Delta Table and inserting the data into it.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DeltaTable").getOrCreate()

data = [(1, "satya",24),(2,"rohitha",20),(3, "rana",30)]
columns = ["id","name","age"]

df = spark.createDataFrame(data,columns)

#converting dataframe into deltatable
df.write.format("delta").mode("overwrite").save("/delta/people")

#check the delta table content
delta_df = spark.read.format("delta").load("/delta/people")
delta_df.show()



+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|rohitha| 20|
|  1|  satya| 24|
|  3|   rana| 30|
+---+-------+---+



In [0]:
#2.upsert data using mergeinto- adding columns and updating existing columns

from delta.tables import DeltaTable

#create exsisting deltatable

delta_table = DeltaTable.forPath(spark, "/delta/people")

#add new data
new_data = [(1,"satya",25),(4,"mouni",30)]
columns = ["id","name","age"]
new_df = spark.createDataFrame(new_data,columns)

#create the temp view for new data
new_df.createOrReplaceTempView("new_data")

#
delta_table.alias("target").merge(
      source = spark.table("new_data").alias("source"),
      condition = "target.id = source.id"
).whenMatchedUpdate(set ={"age":"source.age"})\
.whenNotMatchedInsert(values={"name":"source.name", "age":"source.age"}).execute()

#verify
delta_table.toDF().show()


+----+-------+---+
|  id|   name|age|
+----+-------+---+
|   2|rohitha| 20|
|   1|  satya| 25|
|   3|   rana| 30|
|NULL|  shyam| 40|
|NULL|  mouni| 30|
|NULL|    ram| 35|
|NULL|    ram| 35|
|NULL|    ram| 35|
+----+-------+---+



In [0]:
#3.implementing time travel in delta tables

#display the current data and version of delta table
current_df = spark.read.format("delta").load("/delta/people")
current_df.show()

#query a previous version of the delta table
version_0_df = spark.read.format("delta").option("versionAsOf", 1).load("/delta/people")
version_0_df.show()


+----+-------+---+
|  id|   name|age|
+----+-------+---+
|   2|rohitha| 20|
|   1|  satya| 25|
|   3|   rana| 30|
|NULL|  shyam| 40|
|NULL|  mouni| 30|
|NULL|    ram| 35|
|NULL|    ram| 35|
|NULL|    ram| 35|
+----+-------+---+

+----+-------+---+
|  id|   name|age|
+----+-------+---+
|   2|rohitha| 20|
|   1|  satya| 25|
|   3|   rana| 30|
|NULL|    ram| 35|
+----+-------+---+



In [0]:
#here we are again adding new data (mergeinto) and checking the version using timetravel

from delta.tables import DeltaTable

#create exsisting deltatable
delta_table = DeltaTable.forPath(spark,"/delta/people")

#add new data 
new_data1 = [(1,"satya",26),(6,"mouni",31)]
columns = ["id","name","age"]
new_df1 = spark.createDataFrame(new_data1,columns)

#create view for new data
new_df1.createOrReplaceTempView("new_data1")

#mergeinto
delta_table.alias("target").merge(
    source=spark.table("new_data1").alias("source"),
    condition="target.name = source.name"
).whenMatchedUpdate(set={"age": "source.age"}) \
 .whenNotMatchedInsert(values={"name": "source.name", "age": "source.age"}) \
 .execute()

#check
delta_table.toDF().show()

+----+-------+---+
|  id|   name|age|
+----+-------+---+
|   2|rohitha| 20|
|   1|  satya| 26|
|   3|   rana| 30|
|NULL|  mouni| 31|
|NULL|  shyam| 40|
|NULL|    ram| 35|
|NULL|    ram| 35|
|NULL|    ram| 35|
+----+-------+---+



In [0]:
#check with timetravel versions

#current data version
current_df = spark.read.format("delta").load("/delta/people")
current_df.show()

#check the time travel query - for 0
version_0_df =spark.read.format("delta").option("VersionAsOf",0).load("/delta/people")
version_0_df.show()

#check for version 1
version_1_df =spark.read.format("delta").option("VersionAsOf",1).load("/delta/people")
version_1_df.show()

+----+-------+---+
|  id|   name|age|
+----+-------+---+
|   2|rohitha| 20|
|   1|  satya| 26|
|   3|   rana| 30|
|NULL|  mouni| 31|
|NULL|  shyam| 40|
|NULL|    ram| 35|
|NULL|    ram| 35|
|NULL|    ram| 35|
+----+-------+---+

+---+-------+---+
| id|   name|age|
+---+-------+---+
|  2|rohitha| 20|
|  1|  satya| 24|
|  3|   rana| 30|
+---+-------+---+

+----+-------+---+
|  id|   name|age|
+----+-------+---+
|   2|rohitha| 20|
|   1|  satya| 25|
|   3|   rana| 30|
|NULL|    ram| 35|
+----+-------+---+



In [0]:
##here we are again adding new data (mergeinto) and checking the version using timetravel

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark,"/delta/people")

new_data2 = [(3,"rana",32)]
column = ["id","name","age"]
new_data2_df = spark.createDataFrame(new_data2,column)

new_data2_df.createOrReplaceTempView("new_data2")

delta_table.alias("target").merge(
    source=spark.table("new_data2").alias("source"),
    condition="target.name = source.name"
).whenMatchedUpdate(set={"age": "source.age"}) \
 .whenNotMatchedInsert(values={"name": "source.name", "age": "source.age"}) \
 .execute()

delta_table.toDF().show()

+----+-------+---+
|  id|   name|age|
+----+-------+---+
|   2|rohitha| 20|
|   1|  satya| 26|
|   3|   rana| 32|
|NULL|  mouni| 31|
|NULL|  shyam| 40|
|NULL|    ram| 35|
|NULL|    ram| 35|
|NULL|    ram| 35|
+----+-------+---+



In [0]:
# creating version of the data using time travel

#current data
current_df = spark.read.format("delta").load("/delta/people")
current_df.show()

version_0_df = spark.read.format("delta").option("VersionAsOf",0).load("/delta/people")
version_0_df.show()

version_1_df = spark.read.format("delta").option("VersionAsOf",1).load("/delta/people")
version_1_df.show()

version_2_df = spark.read.format("delta").option("VersionAsOf",2).load("/delta/people")
version_2_df.show()

In [0]:
#Using Delta table to Handle schema evolution

#creating original dataframe with the two columns
initialdata = [(1,"john")]
initialdata_df = spark.createDataFrame(initialdata,["id","name"])
initialdata_df.write.format("delta").mode("overwrite").save("/delta/schemaEvolution")

#create new the dataframe  with new added column
new_data = [(1,"john",32)]
columns = ["id","name","age"]
new_data_df = spark.createDataFrame(new_data,columns)

#append new data with schema evolution
new_data_df.write.format("delta").mode("append").option("mergeSchema", "true").save("/delta/schemaEvolution")

#verify the schema evolution
schema_df = spark.read.format("delta").load("/delta/schemaEvolution")
schema_df.printSchema()
schema_df.show()


root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)

+---+----+----+
| id|name| age|
+---+----+----+
|  1|john|  32|
|  1|john|NULL|
+---+----+----+



In [0]:
#deleting records from delta table

from delta.tables import DeltaTable

delta_table= DeltaTable.forPath(spark,"/delta/people")

#deleting the data where the age is less than 30
delta_table.delete("age <30")

delta_table.toDF().show()

+----+-----+---+
|  id| name|age|
+----+-----+---+
|   3| rana| 32|
|NULL|mouni| 31|
|NULL|shyam| 40|
|NULL|  ram| 35|
|NULL|  ram| 35|
|NULL|  ram| 35|
+----+-----+---+



In [0]:
#optimise Delta table (compact small file)
spark.sql("OPTIMIZE '/delta/people'")

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [0]:
#filter table with the specific columns

filter_df = spark.read.format("delta").load("/delta/people").select("name").filter("age >=35")

filter_df.show()

+-----+
| name|
+-----+
|  ram|
|  ram|
|  ram|
|shyam|
+-----+



In [0]:
#Z- ordered column 
# Z-Ordering by a column (e.g., "age")

spark.sql("OPTIMIZE  '/delta/people' ZORDER BY (age)")
delta_table.toDF().show()

+----+-----+---+
|  id| name|age|
+----+-----+---+
|NULL|  ram| 35|
|NULL|  ram| 35|
|NULL|  ram| 35|
|NULL|shyam| 40|
|NULL|mouni| 31|
|   3| rana| 32|
+----+-----+---+

