In [None]:
!pip uninstall -y thinc opencv-python opencv-contrib-python opencv-python-headless

In [1]:
# Install Java 11 (works well with Spark 3.5.x)
!apt-get -y install openjdk-11-jdk > /dev/null

# Install PySpark (bundles Spark 3.5.6)
!pip -q install "pyspark[connect]==3.5.6" "delta-spark==3.2.0"

# No "SPARK_HOME" needed on Colab.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["PATH"] += ":/usr/lib/jvm/java-11-openjdk-amd64/bin"

# Make sure old envs don't override this session
os.environ.pop("SPARK_HOME", None)
os.environ.pop("PYSPARK_SUBMIT_ARGS", None)

In [2]:
def _create_delta_spark():
  """Build (or reuse) a SparkSession configured for Delta Lake.

  Registers the Delta SQL extension and the Delta catalog on the builder, then
  hands the builder to ``configure_spark_with_delta_pip`` before calling
  ``getOrCreate()``.

  Returns:
    The SparkSession returned by ``getOrCreate()``.
  """
  from pyspark.sql import SparkSession
  from delta import configure_spark_with_delta_pip

  builder = (
    SparkSession.builder.appName("DeltaLakeApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  )
  # "spark.jars.packages" is intentionally not set here: configure_spark_with_delta_pip
  # sets it to the Delta artifact matching the pip-installed delta-spark version, so a
  # hand-written coordinate (previously delta-core_2.12:2.0.0) is overwritten and only
  # misleads readers about which Delta version is in use.
  return configure_spark_with_delta_pip(builder).getOrCreate()

spark = _create_delta_spark()

Read some sample data that is already available in Google Colab under the `sample_data` folder.

In [4]:
# Load the sample CSV: the first row is the header, and Spark infers each column's type.
df = (
  spark.read
  .option("header", True)
  .option("inferSchema", True)
  .csv("/content/sample_data/california_housing_train.csv")
)
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



Create a database under which to create our Delta table.

In [7]:
# Create the database only if it does not exist yet, so this cell can be re-run
# without failing because the database already exists, then make it the current one.
spark.sql("CREATE DATABASE IF NOT EXISTS my_demo")
spark.sql("USE my_demo")

DataFrame[]

Now, we can write the data in the delta format.

In [8]:
# Write the CSV data in Delta format, overwriting whatever is already at this path.
(
  df.write
  .format("delta")
  .mode("overwrite")
  .save("my_demo/df_delta")
)

And read it back in the Delta format.

In [9]:
# Load the Delta table back from the path it was written to.
df_delta = spark.read.load("my_demo/df_delta", format="delta")

In [10]:
# Peek at the first row to confirm the table loaded.
df_delta.show(n=1)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 1 row



Now, let's read the delta table and add a new column to it as below.


In [11]:
# Add a "median_house_value_new" column to df_delta, computed as "median_house_value" * 1.1.
median_house_value_scaled = df_delta["median_house_value"] * 1.1
df_delta = df_delta.withColumn("median_house_value_new", median_house_value_scaled)
df_delta.show(n=1)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|median_house_value_new|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+----------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|               73590.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+----------------------+
only showing top 1 row



Because the schema of the new DataFrame differs from the schema of the files already saved, we need to pass the `mergeSchema` option when appending.

In [13]:
# Append the DataFrame that carries the extra column; mergeSchema is passed because
# its schema differs from the one already saved at this path.
(
  df_delta.write
  .format("delta")
  .mode("append")
  .option("mergeSchema", "true")
  .save("my_demo/df_delta")
)

Let's read the available versions of our Delta table as below. I have 2 versions: version 0 is the table creation version and version 1 is the write with the changed schema.

In [14]:
# Load the Delta table from its path
from delta import DeltaTable
deltaTable = DeltaTable.forPath(spark, "my_demo/df_delta")

# View history. Keep the history DataFrame itself in `history`: show() only prints
# and returns None, so assigning its result would leave `history` as None.
history = deltaTable.history()
history.show(truncate=False)

+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------+------------+-----------------------------------+
|version|timestamp              |userId|userName|operation|operationParameters                   |job |notebook|clusterId|readVersion|isolationLevel|isBlindAppend|operationMetrics                                                 |userMetadata|engineInfo                         |
+-------+-----------------------+------+--------+---------+--------------------------------------+----+--------+---------+-----------+--------------+-------------+-----------------------------------------------------------------+------------+-----------------------------------+
|1      |2025-09-27 07:18:33.077|NULL  |NULL    |WRITE    |{mode -> Append, partitionBy -> []}   |NULL|NULL    |NULL     |0          |Serializable  |false        |

Let's read version 0 of the Delta table. Observe in the output that the newly added column is not present.

In [15]:
# Time travel: load the table as of version 0.
version0DF = (
  spark.read.format("delta")
  .option("versionAsOf", 0)
  .load("my_demo/df_delta")
)

version0DF.show(n=10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|  -114.58|   33.63|    

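Whereas if you read version 1 of the Delta table, the newly added column is available. Rows that were written in version 0 show `NULL` for it, because the second write appended new rows rather than updating the existing ones.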

In [17]:
# Time travel: load the table as of version 1.
version1DF = (
  spark.read.format("delta")
  .option("versionAsOf", 1)
  .load("my_demo/df_delta")
)

version1DF.show(n=10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+----------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|median_house_value_new|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+----------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|                  NULL|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|                  NULL|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|                  NULL|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|         