In [6]:
from pyspark.sql.types import *
from delta.tables import *

In [7]:
spark.sql.dropTable("countries")
delta_table = (
 DeltaTable.create(spark)
 .tableName("countries")
 .addColumn("id", dataType=LongType(), nullable=False)
 .addColumn("country", dataType=StringType(), nullable=False)
 .addColumn("capital", dataType=StringType(), nullable=False)
 .execute()
)

AnalysisException: [TABLE_OR_VIEW_ALREADY_EXISTS] Cannot create table or view `default`.`countries` because it already exists.
Choose a different name, drop or replace the existing object, or add the IF NOT EXISTS clause to tolerate pre-existing objects.

In [3]:
data = [
 (1, "United Kingdom", "London"),
 (2, "Canada", "Toronto")
 ]
schema = ["id", "country", "capital"]
df = spark.createDataFrame(data, schema=schema)
(
df
.write
.format("delta")
.insertInto("default.countries")
)

                                                                                

In [4]:
data = [(3, 'United States', 'Washington, D.C.') ]
# Define the schema for the Delta table
schema = ["id", "country", "capital"]
df = spark.createDataFrame(data, schema=schema)
(df
.write
.format("delta")
.mode("append")
.saveAsTable("default.countries")
)

                                                                                

In [3]:
delta_table = DeltaTable.forName(spark, "default.countries")
delta_table_df = delta_table.toDF()
delta_table_df.show()

24/11/17 12:41:49 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
24/11/17 12:41:49 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
24/11/17 12:41:52 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
24/11/17 12:41:52 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore UNKNOWN@172.20.0.2
24/11/17 12:41:52 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
24/11/17 12:41:52 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


AnalysisException: [DELTA_MISSING_DELTA_TABLE] `default`.`countries` is not a Delta table.

In [5]:
delta_table_df.filter(delta_table_df.capital == 'London').show()

+---+--------------+-------+
| id|       country|capital|
+---+--------------+-------+
|  1|United Kingdom| London|
|  1|United Kingdom| London|
+---+--------------+-------+



In [6]:
delta_table_df.select("id", "capital").show()

+---+----------------+
| id|         capital|
+---+----------------+
|  3|Washington, D.C.|
|  3|Washington, D.C.|
|  1|          London|
|  1|          London|
|  2|         Toronto|
|  2|         Toronto|
+---+----------------+



In [9]:
delta_table.update(
 condition = "id = 1",
 set = { "country": "'U.K.'" } )
delta_table_df = delta_table.toDF()
delta_table_df.show()

                                                                                

+---+-------------+----------------+
| id|      country|         capital|
+---+-------------+----------------+
|  3|United States|Washington, D.C.|
|  3|United States|Washington, D.C.|
|  2|       Canada|         Toronto|
|  2|       Canada|         Toronto|
|  1|         U.K.|          London|
|  1|         U.K.|          London|
+---+-------------+----------------+



In [10]:
fullHistoryDF = delta_table.history()

In [12]:
fullHistoryDF["version","timestamp","operation","readVersion","isBlindAppend"].show()

+-------+--------------------+------------+-----------+-------------+
|version|           timestamp|   operation|readVersion|isBlindAppend|
+-------+--------------------+------------+-----------+-------------+
|      7|2024-11-17 09:49:...|      UPDATE|          6|        false|
|      6|2024-11-17 09:49:...|      UPDATE|          5|        false|
|      5|2024-11-17 09:49:...|      UPDATE|          4|        false|
|      4|2024-11-17 09:48:...|       WRITE|          3|         true|
|      3|2024-11-17 09:48:...|       WRITE|          2|         true|
|      2|2024-11-17 09:45:...|       WRITE|          1|         true|
|      1|2024-11-17 09:45:...|       WRITE|          0|         true|
|      0|2024-11-17 09:44:...|CREATE TABLE|       NULL|         true|
+-------+--------------------+------------+-----------+-------------+



In [13]:
df = (spark.read.format("delta")
 .option("versionAsOf", 3)
 .load("./spark-warehouse/countries"))
df.show()

+---+--------------+----------------+
| id|       country|         capital|
+---+--------------+----------------+
|  3| United States|Washington, D.C.|
|  1|United Kingdom|          London|
|  1|United Kingdom|          London|
|  2|        Canada|         Toronto|
|  2|        Canada|         Toronto|
+---+--------------+----------------+



In [15]:
df = (spark.read.format("delta")
.option("timestampAsOf", "2024-11-17 09:49")
 .load("./spark-warehouse/countries"))
df.show()

+---+--------------+----------------+
| id|       country|         capital|
+---+--------------+----------------+
|  3| United States|Washington, D.C.|
|  3| United States|Washington, D.C.|
|  1|United Kingdom|          London|
|  1|United Kingdom|          London|
|  2|        Canada|         Toronto|
|  2|        Canada|         Toronto|
+---+--------------+----------------+



In [17]:
from pyspark.sql.functions import col

delta_table.delete(col("id") == 2) 
delta_table_df = delta_table.toDF()
delta_table_df.show()

+---+-------------+----------------+
| id|      country|         capital|
+---+-------------+----------------+
|  3|United States|Washington, D.C.|
|  3|United States|Washington, D.C.|
|  1|         U.K.|          London|
|  1|         U.K.|          London|
+---+-------------+----------------+



In [19]:
fullHistoryDF = delta_table.history()
fullHistoryDF["version","timestamp","operation","readVersion","isBlindAppend"].show()

+-------+--------------------+------------+-----------+-------------+
|version|           timestamp|   operation|readVersion|isBlindAppend|
+-------+--------------------+------------+-----------+-------------+
|      8|2024-11-17 09:51:...|      DELETE|          7|        false|
|      7|2024-11-17 09:49:...|      UPDATE|          6|        false|
|      6|2024-11-17 09:49:...|      UPDATE|          5|        false|
|      5|2024-11-17 09:49:...|      UPDATE|          4|        false|
|      4|2024-11-17 09:48:...|       WRITE|          3|         true|
|      3|2024-11-17 09:48:...|       WRITE|          2|         true|
|      2|2024-11-17 09:45:...|       WRITE|          1|         true|
|      1|2024-11-17 09:45:...|       WRITE|          0|         true|
|      0|2024-11-17 09:44:...|CREATE TABLE|       NULL|         true|
+-------+--------------------+------------+-----------+-------------+



In [20]:
delta_table.restoreToVersion(3)
delta_table_df = delta_table.toDF()
delta_table_df.show()

24/11/17 09:52:33 WARN DAGScheduler: Broadcasting large task binary with size 1072.7 KiB


+---+--------------+----------------+
| id|       country|         capital|
+---+--------------+----------------+
|  3| United States|Washington, D.C.|
|  1|United Kingdom|          London|
|  1|United Kingdom|          London|
|  2|        Canada|         Toronto|
|  2|        Canada|         Toronto|
+---+--------------+----------------+



In [21]:
(
spark
.createDataFrame(
 [
 (1, 'India', 'New Delhi'),
 (4, 'Australia', 'Canberra')
 ],
 schema=["id", "country", "capital"]
 )
.write
.format("delta")
.mode("overwrite") # specify the output mode
.saveAsTable("default.countries")
)

delta_table_df = delta_table.toDF()
delta_table_df.show()

24/11/17 09:53:06 ERROR HiveAlterHandler: Failed to alter table default.countries
24/11/17 09:53:06 WARN HiveExternalCatalog: Could not alter schema of table `default`.`countries` in a Hive compatible way. Updating Hive metastore in Spark SQL specific format.
java.lang.reflect.InvocationTargetException
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at org.apache.spark.sql.hive.client.Shim_v2_1.alterTable(HiveShim.scala:1611)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$alterTableDataSchema$1(HiveClientImpl.scala:633)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:303)
	at org.apache

+---+---------+---------+
| id|  country|  capital|
+---+---------+---------+
|  4|Australia| Canberra|
|  1|    India|New Delhi|
+---+---------+---------+



In [22]:
delta_table.restoreToVersion(3)
data = [
 (4, 'India', 'New D'),
 ]
schema = ["id", "country", "capital"]
df = spark.createDataFrame(data, schema=schema)
(
df
.write
.format("delta")
.insertInto("default.countries")
)
delta_table_df = delta_table.toDF()
delta_table_df.show()

24/11/17 09:53:25 WARN DAGScheduler: Broadcasting large task binary with size 1073.2 KiB
                                                                                

+---+--------------+----------------+
| id|       country|         capital|
+---+--------------+----------------+
|  3| United States|Washington, D.C.|
|  1|United Kingdom|          London|
|  1|United Kingdom|          London|
|  2|        Canada|         Toronto|
|  2|        Canada|         Toronto|
|  4|         India|           New D|
+---+--------------+----------------+



In [30]:
idf = (
 spark
 .createDataFrame([
 (4, 'India', 'New Delhi'),
 (5, 'Australia', 'Canberra')],
 schema=["id", "country", "capital"]
 )
 )
delta_table.alias("target").merge(
 source = idf.alias("source"),
 condition = "source.id = target.id"
 ).whenMatchedUpdate(set =
 {
 "country": "source.country",
 "capital": "source.capital"
 }
 ).whenNotMatchedInsert(values =
 {
 "id": "source.id",
 "country": "source.country",
 "capital": "source.capital"
 }
 ).execute()

delta_table_df = delta_table.toDF()
delta_table_df.show()

24/11/17 12:40:07 ERROR MergeIntoCommand: Fatal error in MERGE with materialized source in attempt 1.
org.apache.spark.sql.delta.DeltaAnalysisException: [DELTA_SCHEMA_CHANGE_SINCE_ANALYSIS] The schema of your Delta table has changed in an incompatible way since your DataFrame
or DeltaTable object was created. Please redefine your DataFrame or DeltaTable object.
Changes:
Latest schema is missing field(s): id, country, capital
	at org.apache.spark.sql.delta.DeltaErrorsBase.schemaChangedSinceAnalysis(DeltaErrors.scala:553)
	at org.apache.spark.sql.delta.DeltaErrorsBase.schemaChangedSinceAnalysis$(DeltaErrors.scala:540)
	at org.apache.spark.sql.delta.DeltaErrors$.schemaChangedSinceAnalysis(DeltaErrors.scala:3203)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2(MergeIntoCommand.scala:89)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2$adapted(MergeIntoCommand.scala:83)
	at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaL

AnalysisException: [DELTA_SCHEMA_CHANGE_SINCE_ANALYSIS] The schema of your Delta table has changed in an incompatible way since your DataFrame
or DeltaTable object was created. Please redefine your DataFrame or DeltaTable object.
Changes:
Latest schema is missing field(s): id, country, capital

In [27]:
delta_table.detail()["format","createdAt","lastModified","numFiles","sizeInBytes","minReaderVersion","minWriterVersion"].show()

+------+--------------------+--------------------+--------+-----------+----------------+----------------+
|format|           createdAt|        lastModified|numFiles|sizeInBytes|minReaderVersion|minWriterVersion|
+------+--------------------+--------------------+--------+-----------+----------------+----------------+
| delta|2024-11-17 09:44:...|2024-11-17 09:53:...|       6|       6208|               1|               2|
+------+--------------------+--------------------+--------+-----------+----------------+----------------+



In [29]:
fullHistoryDF = delta_table.history()
fullHistoryDF["version","timestamp","operation","readVersion","isBlindAppend"].show()

Py4JJavaError: An error occurred while calling o108.history.
: java.io.FileNotFoundException: No such file or directory: file:/opt/spark/work-dir/workspace/spark-warehouse/countries/_delta_log
	at io.delta.storage.HadoopFileSystemLogStore.listFrom(HadoopFileSystemLogStore.java:56)
	at org.apache.spark.sql.delta.storage.LogStoreAdaptor.listFrom(LogStore.scala:452)
	at org.apache.spark.sql.delta.storage.DelegatingLogStore.listFrom(DelegatingLogStore.scala:127)
	at org.apache.spark.sql.delta.DeltaHistoryManager$.getEarliestDeltaFile(DeltaHistoryManager.scala:286)
	at org.apache.spark.sql.delta.DeltaHistoryManager.$anonfun$getHistory$2(DeltaHistoryManager.scala:66)
	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.delta.DeltaHistoryManager.getHistory(DeltaHistoryManager.scala:66)
	at io.delta.tables.execution.DeltaTableOperations.$anonfun$executeHistory$1(DeltaTableOperations.scala:56)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.delta.DeltaTableUtils$.withActiveSession(DeltaTable.scala:491)
	at io.delta.tables.execution.DeltaTableOperations.executeHistory(DeltaTableOperations.scala:54)
	at io.delta.tables.execution.DeltaTableOperations.executeHistory$(DeltaTableOperations.scala:51)
	at io.delta.tables.DeltaTable.executeHistory(DeltaTable.scala:44)
	at io.delta.tables.DeltaTable.history(DeltaTable.scala:136)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Unknown Source)
