In [6]:
# Create a Delta Lake-enabled SparkSession and report the Spark and PySpark versions in use
import logging
import os

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col



logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Connection / package settings. The master URL can be overridden with the SPARK_MASTER_URL
# environment variable; the default is the standalone master this notebook was written against.
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://spark:7077")
# Delta Lake 3.1.x is published for Scala 2.12 (the `_2.12` suffix) and targets Spark 3.5.x.
DELTA_PACKAGE = "io.delta:delta-spark_2.12:3.1.0"
EXPECTED_SPARK_MINOR = "3.5"

logger.info("Creating SparkSession...")

# Build a SparkSession against the cluster master. `spark.jars.packages` makes Spark fetch the
# delta-spark package, and the two configs after it register Delta's SQL extension and catalog
# so this session can read and write Delta tables.
# If this crashes, try restarting the kernel in Jupyter: Kernel => Restart Kernel.
spark = (
    SparkSession.builder.master(SPARK_MASTER_URL)
    .appName('DeltaLakeIntegration')
    .config("spark.jars.packages", DELTA_PACKAGE)
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# Deliberately imported only after the session exists.
# NOTE(review): presumably the `delta` Python module comes from the jar fetched through
# spark.jars.packages and is not importable before getOrCreate(); if delta-spark is
# pip-installed in this environment, this import can move to the imports at the top.
from delta.tables import DeltaTable

logger.info("Setting up Spark configurations...")
# Keep shuffle parallelism small: this notebook only works with a handful of rows.
spark.conf.set("spark.sql.shuffle.partitions", 4)
print("Spark session created successfully")
spark_version = spark.version
print("Spark Version:", spark_version)
pyspark_version = pyspark.__version__
print("PySpark Version:", pyspark_version)

# Actually check compatibility instead of only printing versions: the Delta package above
# is meant for Spark 3.5.x, and a mismatch usually surfaces later as obscure JVM errors.
if not spark_version.startswith(EXPECTED_SPARK_MINOR + "."):
    logger.warning(
        "%s targets Spark %s.x but this session reports Spark %s; Delta operations may fail.",
        DELTA_PACKAGE,
        EXPECTED_SPARK_MINOR,
        spark_version,
    )

INFO:__main__:Creating SparkSession...
INFO:__main__:Setting up Spark configurations...


Spark session created successfully
Spark Version: 3.5.1


In [2]:
# Define delta output path.
# Overridable through the DELTA_OUTPUT_PATH environment variable; the default is the original location.
# NOTE(review): a plain local path like the default must be creatable by the Spark driver (this
# notebook process), otherwise the first write fails with DELTA_CANNOT_CREATE_LOG_PATH. With a
# standalone cluster master the path presumably also has to be reachable at the same location from
# the worker containers (shared volume / object store) — confirm against the deployment.
delta_output_path = os.environ.get("DELTA_OUTPUT_PATH", "/data/delta_table_of_dog_owners")

# --- Dogs at the Park Scenario ---

print("Welcome to the Doggy Delta Adventure!")

# --- Create Test Data and Initial Write to Delta Table ---

# It's a bright day at the park. Let's meet our first set of dog owners!
owner_data = [Row(owner_id=1, owner_name="Alice", dog_name="Buddy"),
              Row(owner_id=2, owner_name="Bob", dog_name="Max"),
              Row(owner_id=3, owner_name="Charlie", dog_name="Bella"),
              Row(owner_id=4, owner_name="David", dog_name="Lucy"),
              Row(owner_id=5, owner_name="Emma", dog_name="Bailey"),
              Row(owner_id=6, owner_name="Frank", dog_name="Rosie")]
owner_df = spark.createDataFrame(owner_data)

# Write initial test data to the Delta table. "overwrite" replaces the table's current contents,
# so re-running this cell always starts from the same six owners.
owner_df.write.format("delta").mode("overwrite").save(delta_output_path)
print("Initial doggy data successfully added!")

# --- Additional Dogs Joining the Park ---

print("\nNew dogs are arriving at the park!")
new_owner_data = [Row(owner_id=7, owner_name="Grace", dog_name="Cooper"),
                  Row(owner_id=8, owner_name="Hannah", dog_name="Daisy")]
new_owner_df = spark.createDataFrame(new_owner_data)

# Append new test data to the Delta table
new_owner_df.write.format("delta").mode("append").save(delta_output_path)
print("New doggy friends added to the park!")

# --- Let's Take a Look at Our Park's Population ---

print("\nLet's see who's at the park today:")
deltaTable = DeltaTable.forPath(spark, delta_output_path)
current_df = deltaTable.toDF()
current_df.show()

# --- Oh No, a Mischievous Dog Causes Trouble! ---

print("\nUh-oh! Trouble's brewing at the park.")
print("One mischievous dog has started a commotion!")

# Record the table version *before* the mischief so the rollback target is explicit,
# instead of picking a row out of the full history by position afterwards.
# history(1) returns only the most recent commit.
previous_version = deltaTable.history(1).select("version").first()[0]

# Simulate a misbehaving dog by changing its owner
deltaTable.update("dog_name = 'Buddy'", {"owner_name": "'Mischief'"})
print("Buddy's owner has been mysteriously changed to 'Mischief'!")
# Re-read the table so the display reflects the update, not the DataFrame captured earlier.
current_df = deltaTable.toDF()
current_df.show()

# --- Time to Restore Order ---

print("\nTime to restore peace and order at the park.")

# Roll back to the version recorded just before the update to correct the owner of Buddy
deltaTable.restoreToVersion(previous_version)
print("Order restored! Buddy is back with his rightful owner.")

# --- Let's Check Our Park Population Again ---

print("\nLet's check the park's population after restoring order:")
current_df = deltaTable.toDF()
current_df.show()

# Sanity check: the restore really undid the mischief.
buddy_row = current_df.filter(col("dog_name") == "Buddy").select("owner_name").first()
assert buddy_row is not None and buddy_row["owner_name"] == "Alice", (
    f"Restore did not undo the mischief; Buddy's owner row is {buddy_row!r}"
)


Welcome to the Doggy Delta Adventure!


                                                                                

Py4JJavaError: An error occurred while calling o56.save.
: org.apache.spark.sql.delta.DeltaIOException: [DELTA_CANNOT_CREATE_LOG_PATH] Cannot create file:/data/delta_table_of_dog_owners/_delta_log
	at org.apache.spark.sql.delta.DeltaErrorsBase.cannotCreateLogPathException(DeltaErrors.scala:1534)
	at org.apache.spark.sql.delta.DeltaErrorsBase.cannotCreateLogPathException$(DeltaErrors.scala:1533)
	at org.apache.spark.sql.delta.DeltaErrors$.cannotCreateLogPathException(DeltaErrors.scala:3203)
	at org.apache.spark.sql.delta.DeltaLog.createDirIfNotExists$1(DeltaLog.scala:443)
	at org.apache.spark.sql.delta.DeltaLog.ensureLogDirectoryExist(DeltaLog.scala:447)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.prepareCommit(OptimisticTransaction.scala:1436)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.prepareCommit$(OptimisticTransaction.scala:1356)
	at org.apache.spark.sql.delta.OptimisticTransaction.prepareCommit(OptimisticTransaction.scala:142)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.liftedTree1$1(OptimisticTransaction.scala:1064)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commitImpl$1(OptimisticTransaction.scala:1056)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
	at org.apache.spark.sql.delta.OptimisticTransaction.recordFrameProfile(OptimisticTransaction.scala:142)
	at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
	at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
	at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
	at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:142)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
	at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
	at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:142)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.commitImpl(OptimisticTransaction.scala:1053)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.commitImpl$(OptimisticTransaction.scala:1048)
	at org.apache.spark.sql.delta.OptimisticTransaction.commitImpl(OptimisticTransaction.scala:142)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.commitIfNeeded(OptimisticTransaction.scala:1010)
	at org.apache.spark.sql.delta.OptimisticTransactionImpl.commitIfNeeded$(OptimisticTransaction.scala:1006)
	at org.apache.spark.sql.delta.OptimisticTransaction.commitIfNeeded(OptimisticTransaction.scala:142)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1(WriteIntoDelta.scala:112)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.$anonfun$run$1$adapted(WriteIntoDelta.scala:100)
	at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:223)
	at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:100)
	at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:201)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:304)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
# Stop the SparkSession.
# After this runs, Spark is no longer usable in this kernel: re-run the notebook from the
# top (starting with the session-creation cell) before using Spark again.
def _stop_spark_session(session):
    """Stop ``session``, logging a message before and after the shutdown."""
    logger.info("Stopping SparkSession...")
    session.stop()
    logger.info("SparkSession stopped.")


_stop_spark_session(spark)