In [1]:
!pip install delta-spark==3.0.0

Collecting delta-spark==3.0.0
  Downloading delta_spark-3.0.0-py3-none-any.whl.metadata (2.0 kB)
Collecting py4j==0.10.9.7 (from pyspark<3.6.0,>=3.5.0->delta-spark==3.0.0)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading delta_spark-3.0.0-py3-none-any.whl (21 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hInstalling collected packages: py4j, delta-spark
Successfully installed delta-spark-3.0.0 py4j-0.10.9.7


In [18]:
from pyspark.sql import SparkSession
from delta import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

In [19]:
builder = SparkSession.builder.appName("Delta Lake Farming Campaign Tracking") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [20]:
schema = StructType([
    StructField("campaign_id", IntegerType(), nullable=False),
    StructField("culture", StringType(), nullable=False),
    StructField("area", FloatType(), nullable=False),
    StructField("yield", FloatType(), nullable=False),
    StructField("year", IntegerType(), nullable=False)
])

In [21]:
data = [
    (1, "Wheat", 150.5, 3.2, 2023),
    (2, "Corn", 200.0, 5.1, 2023),
    (3, "Barley", 120.3, 2.8, 2023),
    (1, "Wheat", 160.0, 3.5, 2022),
    (2, "Corn", 195.0, 5.0, 2022),
    (3, "Barley", 115.0, 2.7, 2022),
    (1, "Wheat", 140.0, 3.0, 2021),
    (2, "Corn", 180.0, 4.8, 2021),
    (3, "Barley", 110.0, 2.6, 2021)
]
df = spark.createDataFrame(data, schema=schema)
df.show()


+-----------+-------+-----+-----+----+
|campaign_id|culture| area|yield|year|
+-----------+-------+-----+-----+----+
|          1|  Wheat|150.5|  3.2|2023|
|          2|   Corn|200.0|  5.1|2023|
|          3| Barley|120.3|  2.8|2023|
|          1|  Wheat|160.0|  3.5|2022|
|          2|   Corn|195.0|  5.0|2022|
|          3| Barley|115.0|  2.7|2022|
|          1|  Wheat|140.0|  3.0|2021|
|          2|   Corn|180.0|  4.8|2021|
|          3| Barley|110.0|  2.6|2021|
+-----------+-------+-----+-----+----+



In [27]:
DeltaTable.createIfNotExists(spark) \
    .tableName("farming_campaign") \
    .property("delta.enableChangeDataFeed", "true") \
    .addColumns(schema) \
    .execute()

df.write.format("delta").mode("overwrite").saveAsTable("farming_campaign")

In [29]:
new_data = [
    (2, "Corn", 210.0, 5.3, 2023), 
    (4, "Soybean", 180.0, 3.6, 2023), 
    (1, "Wheat", 155.0, 3.4, 2024), 
    (3, "Barley", 125.0, 3.0, 2024) 
]
new_df = spark.createDataFrame(new_data, schema=schema)
new_df.show()

+-----------+-------+-----+-----+----+
|campaign_id|culture| area|yield|year|
+-----------+-------+-----+-----+----+
|          2|   Corn|210.0|  5.3|2023|
|          4|Soybean|180.0|  3.6|2023|
|          1|  Wheat|155.0|  3.4|2024|
|          3| Barley|125.0|  3.0|2024|
+-----------+-------+-----+-----+----+



In [30]:
from delta.tables import DeltaTable

delta_table = DeltaTable.forName(spark, "farming_campaign")

In [31]:
delta_table.alias("old_data") \
    .merge(
        new_df.alias("new_data"),
        "old_data.campaign_id = new_data.campaign_id AND old_data.year = new_data.year"
    ) \
    .whenMatchedUpdate(set={"area": "new_data.area", "yield": "new_data.yield", "culture": "new_data.culture", "year": "new_data.year"}) \
    .whenNotMatchedInsert(values={"campaign_id": "new_data.campaign_id", "culture": "new_data.culture", "area": "new_data.area", "yield": "new_data.yield", "year": "new_data.year"}) \
    .execute()

delta_table.toDF().show()

+-----------+-------+-----+-----+----+
|campaign_id|culture| area|yield|year|
+-----------+-------+-----+-----+----+
|          1|  Wheat|155.0|  3.4|2024|
|          2|   Corn|210.0|  5.3|2023|
|          3| Barley|125.0|  3.0|2024|
|          4|Soybean|180.0|  3.6|2023|
|          2|   Corn|180.0|  4.8|2021|
|          3| Barley|110.0|  2.6|2021|
|          3| Barley|115.0|  2.7|2022|
|          3| Barley|120.3|  2.8|2023|
|          1|  Wheat|160.0|  3.5|2022|
|          1|  Wheat|150.5|  3.2|2023|
|          1|  Wheat|140.0|  3.0|2021|
|          2|   Corn|195.0|  5.0|2022|
+-----------+-------+-----+-----+----+



In [32]:
cdf = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", "0") \
    .table("farming_campaign")
cdf.show()

+-----------+-------+-----+-----+----+----------------+---------------+--------------------+
|campaign_id|culture| area|yield|year|    _change_type|_commit_version|   _commit_timestamp|
+-----------+-------+-----+-----+----+----------------+---------------+--------------------+
|          1|  Wheat|155.0|  3.4|2024|          insert|              8|2024-10-27 11:37:...|
|          2|   Corn|200.0|  5.1|2023| update_preimage|              8|2024-10-27 11:37:...|
|          2|   Corn|210.0|  5.3|2023|update_postimage|              8|2024-10-27 11:37:...|
|          3| Barley|125.0|  3.0|2024|          insert|              8|2024-10-27 11:37:...|
|          4|Soybean|180.0|  3.6|2023|          insert|              8|2024-10-27 11:37:...|
|          1|  Wheat|155.0|  3.4|2024|          insert|             10|2024-10-27 11:45:...|
|          2|   Corn|200.0|  5.1|2023| update_preimage|             10|2024-10-27 11:45:...|
|          2|   Corn|210.0|  5.3|2023|update_postimage|             10

In [33]:
delta_table.history().show()

+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|     10|2024-10-27 11:45:...|  NULL|    NULL|               MERGE|{predicate -> ["(...|NULL|    NULL|     NULL|          9|  Serializable|        false|{numTargetRowsCop...|        NULL|Apache-Spark/3.5....|
|      9|2024-10-27 11:40:...|  NULL|    NULL|CREATE OR REPLACE...|{isManaged -> tru...|NULL|    NULL|     NULL|          8|  Serializable|        false|{numFiles -

In [45]:
previous_version_df = spark.read.format("delta").option("versionAsOf", 1).table("farming_campaign")
previous_version_df.show()

+-----------+-------+-----+-----+----+
|campaign_id|culture| area|yield|year|
+-----------+-------+-----+-----+----+
|          3| Barley|120.3|  2.8|2023|
|          1|  Wheat|150.5|  3.2|2023|
|          2|   Corn|200.0|  5.1|2023|
+-----------+-------+-----+-----+----+



In [46]:
delta_table.toDF().show()

+-----------+-------+-----+-----+----+
|campaign_id|culture| area|yield|year|
+-----------+-------+-----+-----+----+
|          1|  Wheat|155.0|  3.4|2024|
|          2|   Corn|210.0|  5.3|2023|
|          3| Barley|125.0|  3.0|2024|
|          4|Soybean|180.0|  3.6|2023|
|          2|   Corn|180.0|  4.8|2021|
|          3| Barley|110.0|  2.6|2021|
|          3| Barley|115.0|  2.7|2022|
|          3| Barley|120.3|  2.8|2023|
|          1|  Wheat|160.0|  3.5|2022|
|          1|  Wheat|150.5|  3.2|2023|
|          1|  Wheat|140.0|  3.0|2021|
|          2|   Corn|195.0|  5.0|2022|
+-----------+-------+-----+-----+----+



In [48]:
delta_table.restoreToVersion(1)
delta_table.toDF().show()

+-----------+-------+-----+-----+----+
|campaign_id|culture| area|yield|year|
+-----------+-------+-----+-----+----+
|          3| Barley|120.3|  2.8|2023|
|          1|  Wheat|150.5|  3.2|2023|
|          2|   Corn|200.0|  5.1|2023|
+-----------+-------+-----+-----+----+



In [None]:
spark.stop()