## Write to and read from a Delta Lake table

### Write a Spark DataFrame to a Delta Lake table

In [18]:
%run ../../common_notebooks/setup_spark_connection.ipynb

In [19]:
delta_path = "/opt/spark/delta-tables/"

In [20]:
data = spark.range(0, 5)

(data
  .write
  .format("delta")
  .mode("overwrite")
  .save(delta_path + "table1")
)

### Read the above Delta Lake table to a Spark DataFrame and display the DataFrame

In [21]:
df = (spark
        .read
        .format("delta")
        .load(delta_path + "table1")
        .orderBy("id")
      )

df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



## Overwrite a Delta Lake table

### Overwrite the Delta Lake table written in the above step

In [22]:
data = spark.range(5, 10)

view_data = data.createOrReplaceTempView("data_view")
table2 = delta_path + "table1"
spark.sql(f"""
  MERGE INTO delta.`{table2}` AS target
  USING data_view AS source
  ON target.id = source.id
  WHEN NOT MATCHED THEN
    INSERT *
""")

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

### Read the above overwritten Delta Lake table to a Spark DataFrame and display the DataFrame

In [23]:
df = (spark
        .read
        .format("delta")
        .load(delta_path + "table1")
        .orderBy("id")
      )

df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



## Delta Lake and [ACID](https://en.wikipedia.org/wiki/ACID)

### Update Delta Lake Table

In [24]:
from delta.tables import *
from pyspark.sql.functions import *


delta_table = DeltaTable.forPath(spark, delta_path + "table1")

spark.sql(f"""
  UPDATE delta.`{delta_path}table1`
  SET id = id + 100
  WHERE id % 2 == 0
""")


DataFrame[num_affected_rows: bigint]

In [26]:
spark.sql("""
vacuum delta.`/opt/spark/delta-tables/table1`
""")

DataFrame[path: string]

In [28]:
spark.sql("""
        optimize delta.`/opt/spark/delta-tables/table1`
        """)

DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,clusteringStats:struct<inputZCubeFiles:struct<numFiles:bigint,size:bigint>,inputOtherFiles:struct<numFiles:bigint,size:bigint>,inputNumZCubes:bigint,mergedFiles:struct<numFiles:bigint,size:bigint>,numOutputZCubes:bigint>,numBins:bigint,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,

In [None]:
spark.sql("""
          alter table delta.`/opt/spark/delta-tables/table1`
          set tblproperties ('enableChangeDataCapture' = 'true'
          """)

In [11]:
delta_table.toDF().orderBy("id").show()
table_view = delta_table.createOrReplaceTempView("delta_table_view")
spark.sql("""
  select * from table_view
  order by id
""")
table_view.show()

InvalidPlanInput: No handler found for extension

JVM stacktrace:
org.apache.spark.sql.connect.common.InvalidPlanInput
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelationPlugin$3(SparkConnectPlanner.scala:253)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelationPlugin(SparkConnectPlanner.scala:253)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:235)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$3(SessionHolder.scala:477)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.connect.service.SessionHolder.usePlanCache(SessionHolder.scala:476)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:147)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:133)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformSort(SparkConnectPlanner.scala:2249)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:163)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$3(SessionHolder.scala:477)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.connect.service.SessionHolder.usePlanCache(SessionHolder.scala:476)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:147)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:133)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformShowString(SparkConnectPlanner.scala:306)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:150)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$3(SessionHolder.scala:477)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.sql.connect.service.SessionHolder.usePlanCache(SessionHolder.scala:476)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:147)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:74)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handlePlan(ExecuteThreadRunner.scala:314)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:225)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:112)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:186)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:102)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:111)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:347)

In [None]:
spark.sql(f"""
  UPDATE delta.`{delta_path}table2`
  SET id = id + 100
  WHERE id % 2 == 0
""")

###  `delete`

In [None]:
# Delete every even value
(delta_table
  .delete(
    condition = expr("id % 2 == 0")
  )
)

(delta_table
  .toDF()
  .orderBy("id")
  .show()
)

### `merge` Delta Lake Table

In [None]:
# Upsert (merge) new data
new_data = spark.range(0, 20)

(delta_table.alias("old_data")
  .merge(
      new_data.alias("new_data"),
      "old_data.id = new_data.id"
      )
  .whenMatchedUpdate(set = { "id": col("new_data.id") })
  .whenNotMatchedInsert(values = { "id": col("new_data.id") })
  .execute()
)

(delta_table
  .toDF()
  .orderBy("id")
  .show()
)

## Time travel feature of Delta Lake

### Display the entire history of the above Delta Lake table

In [None]:
# get the full history of the table
delta_table_history = (DeltaTable
                        .forPath(spark, "/tmp/delta-table")
                        .history()
                      )

(delta_table_history
   .select("version", "timestamp", "operation", "operationParameters", "operationMetrics", "engineInfo")
   .show()
)

### Latest version of the Delta Lake table

In [None]:
# get the full history of the table
delta_table_history = (DeltaTable
                        .forPath(spark, "/tmp/delta-table")
                        .history()
                      )

(delta_table_history
   .select("version", "timestamp", "operation", "operationParameters", "operationMetrics", "engineInfo")
   .show()
)

### Latest version of the Delta Lake table

In [None]:
df = (spark
        .read
        .format("delta")
        .load("/tmp/delta-table")
        .orderBy("id")
      )

df.show()

### Time travel to the version `0` of the Delta Lake table using Delta Lake's history feature

In [None]:
df = (spark
        .read
        .format("delta")
        .option("versionAsOf", 0) # we pass an option `versionAsOf` with the required version number we are interested in
        .load("/tmp/delta-table")
        .orderBy("id")
      )

df.show()

### Time travel to the version `3` of the Delta Lake table using Delta Lake's  history feature

In [None]:
df = (spark
        .read
        .format("delta")
        .option("versionAsOf", 3) # we pass an option `versionAsOf` with the required version number we are interested in
        .load("/tmp/delta-table")
        .orderBy("id")
      )

df.show()