In [1]:
#pip install delta-spark==3.3.2
#pip install pyspark==3.5.5

In [3]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
        .builder
        .appName("deltaLakeApp")
        .master("local[4]")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.1.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.databricks.delta.retentionDurationCheck.enabled", False)
        .getOrCreate()
)

25/08/12 18:42:36 WARN Utils: Your hostname, jacques-work resolves to a loopback address: 127.0.1.1; using 192.168.1.23 instead (on interface wlp0s20f3)
25/08/12 18:42:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/jacques/Documents/courses/DatabricksCourse/venv/lib/python3.13/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jacques/.ivy2/cache
The jars for the packages stored in: /home/jacques/.ivy2/jars
io.delta#delta-spark_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-16b86063-7d73-4163-8e0f-3a6a5756186e;1.0
	confs: [default]
	found io.delta#delta-spark_2.12;3.1.0 in central
	found io.delta#delta-storage;3.1.0 in central
	found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 133ms :: artifacts dl 3ms
	:: modules in use:
	io.delta#delta-spark_2.12;3.1.0 from central in [default]
	io.delta#delta-storage;3.1.0 from central in [default]
	org.antlr#antlr4-runtime;4.9.3 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   3   |   

## Setup

In [4]:
checkpoint_path = "tmp/checkpoint"

orders_table = "orders"
orders_qa_table = "orders_qa"
orders_analyse = "orders_analyse"
streaming_example = "stream_table"
orders_qa_checkpoint = f"{checkpoint_path}/{orders_qa_table}"
orders_analyse_checkpoint = f"{checkpoint_path}/{orders_analyse}"
streaming_example_checkpoint = f"{checkpoint_path}/{streaming_example}"

In [5]:
import shutil
import os

spark.sql(f"DROP TABLE IF EXISTS {orders_table}")
spark.sql(f"DROP TABLE IF EXISTS {orders_qa_table}")

if os.path.exists(checkpoint_path):
  shutil.rmtree(checkpoint_path)

if os.path.exists("spark-warehouse"):
  shutil.rmtree("spark-warehouse")

In [4]:
spark.sql(f"""
          CREATE OR REPLACE TABLE {orders_table} (
            order_id STRING COMMENT 'Unique identifier for the order',
            client_id STRING COMMENT 'Identifier of the client who placed the order',
            order_value DECIMAL(18, 2) COMMENT 'Total value of the order in the local currency',
            order_date TIMESTAMP COMMENT 'Timestamp when the order was placed',
            country_code STRING COMMENT 'Country in ISO 3166 Alpha-2 code'
          )
          USING DELTA
          TBLPROPERTIES (delta.enableChangeDataFeed = true)
          COMMENT 'orders table'
""")

25/08/12 18:33:05 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

DataFrame[]

In [5]:
spark.sql(f"DESCRIBE EXTENDED {orders_table}").show(truncate=False)

+----------------------------+-----------------------------------------------------------------------------------+----------------------------------------------+
|col_name                    |data_type                                                                          |comment                                       |
+----------------------------+-----------------------------------------------------------------------------------+----------------------------------------------+
|order_id                    |string                                                                             |Unique identifier for the order               |
|client_id                   |string                                                                             |Identifier of the client who placed the order |
|order_value                 |decimal(18,2)                                                                      |Total value of the order in the local currency|
|order_date                 

In [6]:
from decimal import Decimal
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, TimestampType

order_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("client_id", StringType(), True),
    StructField("order_value", DecimalType(18, 2), True),
    StructField("order_date", TimestampType(), True),
    StructField("country_code", StringType(), True)
])

# Sample data
data = [
    ("ORD001", None, Decimal("125.50"), datetime(2025, 8, 1, 10, 30), "BR"),
    ("ORD002", "CL002", Decimal("299.99"), datetime(2025, 8, 1, 11, 15), "BR"),
    ("ORD003", "CL003", Decimal("75.00"), datetime(2025, 8, 2, 9, 45), "BR"),
    ("ORD004", "CL004", Decimal("560.00"), datetime(2025, 8, 2, 15, 10), "BR"),
    ("ORD005", "CL001", Decimal("130.25"), datetime(2025, 8, 3, 14, 0), "BR"),
    ("ORD006", "CL005", Decimal("199.99"), datetime(2025, 8, 3, 16, 30), "US"),
    ("ORD007", "CL006", Decimal("89.95"), datetime(2025, 8, 4, 12, 45), "US"),
    ("ORD008", "CL002", Decimal("149.00"), datetime(2025, 8, 4, 17, 25), "US"),
    ("ORD009", "CL007", Decimal("210.75"), datetime(2025, 8, 4, 18, 50), "US"),
    ("ORD010", "CL008", Decimal("55.55"), datetime(2025, 8, 4, 19, 5), "US"),
]

# Create DataFrame
orders_df = spark.createDataFrame(data, schema=order_schema)

# Write to Delta table
orders_df.coalesce(1).write.format("delta").mode("append").saveAsTable(orders_table)

                                                                                

In [7]:
df = spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 1) \
  .table(orders_table)

df.show()

+--------+---------+-----------+-------------------+------------+------------+---------------+--------------------+
|order_id|client_id|order_value|         order_date|country_code|_change_type|_commit_version|   _commit_timestamp|
+--------+---------+-----------+-------------------+------------+------------+---------------+--------------------+
|  ORD001|     NULL|     125.50|2025-08-01 10:30:00|          BR|      insert|              1|2025-08-12 18:33:...|
|  ORD002|    CL002|     299.99|2025-08-01 11:15:00|          BR|      insert|              1|2025-08-12 18:33:...|
|  ORD003|    CL003|      75.00|2025-08-02 09:45:00|          BR|      insert|              1|2025-08-12 18:33:...|
|  ORD004|    CL004|     560.00|2025-08-02 15:10:00|          BR|      insert|              1|2025-08-12 18:33:...|
|  ORD005|    CL001|     130.25|2025-08-03 14:00:00|          BR|      insert|              1|2025-08-12 18:33:...|
|  ORD006|    CL005|     199.99|2025-08-03 16:30:00|          US|      i

## Changing Data

In [8]:
spark.sql(f"DELETE FROM {orders_table} WHERE order_id='ORD006'")

DataFrame[num_affected_rows: bigint]

In [9]:
from delta.tables import DeltaTable

upsert_data = [
    ("ORD003", "CL003", Decimal("80.00"), datetime(2025, 8, 5, 9, 0), "BR"),   # Update
    ("ORD006", "CL005", Decimal("215.00"), datetime(2025, 8, 5, 10, 0), "BR"), # Update
    ("ORD011", "CL009", Decimal("99.99"), datetime(2025, 8, 5, 11, 30), "US"), # New
    ("ORD012", "CL010", Decimal("150.00"), datetime(2025, 8, 5, 12, 45), "US"),# New
    ("ORD013", "CL011", Decimal("300.00"), datetime(2025, 8, 5, 13, 15), "BR") # New
]

delta_table = DeltaTable.forName(spark, orders_table)
upsert_df = spark.createDataFrame(upsert_data, order_schema)

delta_table.alias("target") \
    .merge(upsert_df.alias("source"), "target.order_id = source.order_id") \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

In [10]:
df = spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 2) \
  .option("endingVersion", 4) \
  .table(orders_table)

df.show()

+--------+---------+-----------+-------------------+------------+----------------+---------------+--------------------+
|order_id|client_id|order_value|         order_date|country_code|    _change_type|_commit_version|   _commit_timestamp|
+--------+---------+-----------+-------------------+------------+----------------+---------------+--------------------+
|  ORD006|    CL005|     199.99|2025-08-03 16:30:00|          US|          delete|              2|2025-08-12 18:33:...|
|  ORD003|    CL003|      75.00|2025-08-02 09:45:00|          BR| update_preimage|              3|2025-08-12 18:33:...|
|  ORD003|    CL003|      80.00|2025-08-05 09:00:00|          BR|update_postimage|              3|2025-08-12 18:33:...|
|  ORD006|    CL005|     215.00|2025-08-05 10:00:00|          BR|          insert|              3|2025-08-12 18:33:...|
|  ORD011|    CL009|      99.99|2025-08-05 11:30:00|          US|          insert|              3|2025-08-12 18:33:...|
|  ORD012|    CL010|     150.00|2025-08-

## Streaming

In [27]:
streaming_process = (spark
  .readStream
  .option("readChangeFeed", "true")
  .table(orders_table)
  .writeStream
  .format('delta')
  .option("checkpointLocation", streaming_example_checkpoint)
  .trigger(availableNow=True)
  .outputMode("append")
)

streaming_process.toTable(streaming_example).awaitTermination()

25/08/12 18:35:07 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [13]:
spark.table(streaming_example).show()

+--------+---------+-----------+-------------------+------------+------------+---------------+--------------------+
|order_id|client_id|order_value|         order_date|country_code|_change_type|_commit_version|   _commit_timestamp|
+--------+---------+-----------+-------------------+------------+------------+---------------+--------------------+
|  ORD001|     NULL|     125.50|2025-08-01 10:30:00|          BR|      insert|              3|2025-08-12 18:33:...|
|  ORD002|    CL002|     299.99|2025-08-01 11:15:00|          BR|      insert|              3|2025-08-12 18:33:...|
|  ORD003|    CL003|      80.00|2025-08-05 09:00:00|          BR|      insert|              3|2025-08-12 18:33:...|
|  ORD004|    CL004|     560.00|2025-08-02 15:10:00|          BR|      insert|              3|2025-08-12 18:33:...|
|  ORD005|    CL001|     130.25|2025-08-03 14:00:00|          BR|      insert|              3|2025-08-12 18:33:...|
|  ORD006|    CL005|     215.00|2025-08-05 10:00:00|          BR|      i

In [None]:
spark.sql(f"UPDATE {orders_table} SET order_value=100.00 WHERE order_id='ORD007'")
streaming_process.toTable(streaming_example).awaitTermination()

<pyspark.sql.streaming.query.StreamingQuery at 0x783c846e9400>

In [17]:
spark.table(streaming_example).show()

+--------+---------+-----------+-------------------+------------+----------------+---------------+--------------------+
|order_id|client_id|order_value|         order_date|country_code|    _change_type|_commit_version|   _commit_timestamp|
+--------+---------+-----------+-------------------+------------+----------------+---------------+--------------------+
|  ORD001|     NULL|     125.50|2025-08-01 10:30:00|          BR|          insert|              3|2025-08-12 18:33:...|
|  ORD002|    CL002|     299.99|2025-08-01 11:15:00|          BR|          insert|              3|2025-08-12 18:33:...|
|  ORD003|    CL003|      80.00|2025-08-05 09:00:00|          BR|          insert|              3|2025-08-12 18:33:...|
|  ORD004|    CL004|     560.00|2025-08-02 15:10:00|          BR|          insert|              3|2025-08-12 18:33:...|
|  ORD005|    CL001|     130.25|2025-08-03 14:00:00|          BR|          insert|              3|2025-08-12 18:33:...|
|  ORD006|    CL005|     215.00|2025-08-

## Data Quality

In [18]:
df = spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 3) \
  .table(orders_table)

df.show()

+--------+---------+-----------+-------------------+------------+----------------+---------------+--------------------+
|order_id|client_id|order_value|         order_date|country_code|    _change_type|_commit_version|   _commit_timestamp|
+--------+---------+-----------+-------------------+------------+----------------+---------------+--------------------+
|  ORD006|    CL005|     199.99|2025-08-03 16:30:00|          US|          delete|              2|2025-08-12 18:33:...|
|  ORD003|    CL003|      75.00|2025-08-02 09:45:00|          BR| update_preimage|              3|2025-08-12 18:33:...|
|  ORD003|    CL003|      80.00|2025-08-05 09:00:00|          BR|update_postimage|              3|2025-08-12 18:33:...|
|  ORD006|    CL005|     215.00|2025-08-05 10:00:00|          BR|          insert|              3|2025-08-12 18:33:...|
|  ORD011|    CL009|      99.99|2025-08-05 11:30:00|          US|          insert|              3|2025-08-12 18:33:...|
|  ORD012|    CL010|     150.00|2025-08-

In [19]:
from pyspark.sql.functions import when, isnull, col, lit, sum, date_format


def change_data_feed_column_signal(df, operation, column_name, column_analised):
    return (df
    .withColumn(column_name, 
                when(col("_change_type") == 'delete', operation(column_analised)*lit(-1))
                .when(col("_change_type") == 'update_preimage', operation(column_analised)*lit(-1))
                .when(col("_change_type") == 'update_postimage', operation(column_analised))
                .when(col("_change_type") == 'insert', operation(column_analised))
            ))

def is_null_signal(column):
  return isnull(column).cast('int')

def lit_1(column):
  return lit(1)

In [20]:
columns = df.drop("_change_type", "_commit_version", "_commit_timestamp").columns

df_changed = df
select_expr = list()

for column in columns:
  df_changed = change_data_feed_column_signal(df_changed, is_null_signal, f"{column}_signal", column)
  select_expr.append(f"sum({column}_signal) as {column}_null_count")

df_changed = change_data_feed_column_signal(df_changed, lit_1, "row_value_count", "*")
select_expr.append(f"sum(row_value_count) number_of_rows")

df_changed = change_data_feed_column_signal(df_changed, col, "order_value_sum", "order_value")
select_expr.append(f"sum(order_value_sum) order_total_value")

df_selected = df_changed.selectExpr(*select_expr)

df_with_table_name = df_selected.withColumn("table_source_name", lit(orders_table))

df_with_table_name.show()

+-------------------+--------------------+----------------------+---------------------+-----------------------+--------------+-----------------+-----------------+
|order_id_null_count|client_id_null_count|order_value_null_count|order_date_null_count|country_code_null_count|number_of_rows|order_total_value|table_source_name|
+-------------------+--------------------+----------------------+---------------------+-----------------------+--------------+-----------------+-----------------+
|                  0|                   1|                     0|                    0|                      0|            13|          2465.98|           orders|
+-------------------+--------------------+----------------------+---------------------+-----------------------+--------------+-----------------+-----------------+



## Streaming Data Quality

In [21]:
def _write_table_update(columns):
  column_set = dict()
  for column in columns:
    column_set[column] = f"source.{column} + target.{column}"
  return column_set

def process_table(batch_df, batch_id):
  df_changed = batch_df
  columns = batch_df.drop("_change_type", "_commit_version", "_commit_timestamp").columns
  select_expr = list()

  for column in columns:
    df_changed = change_data_feed_column_signal(df_changed, is_null_signal, f"{column}_signal", column)
    select_expr.append(f"sum({column}_signal) as {column}_null_count")

  df_changed = change_data_feed_column_signal(df_changed, lit_1, "row_value_count", "*")
  select_expr.append(f"sum(row_value_count) number_of_rows")

  df_changed = change_data_feed_column_signal(df_changed, col, "order_value_sum", "order_value")
  select_expr.append(f"sum(order_value_sum) order_total_value")

  df_selected = df_changed.selectExpr(*select_expr)

  df_with_table_name = df_selected.withColumn("table_source_name", lit(orders_table))

  if spark.catalog.tableExists(orders_qa_table):
    set_columns = _write_table_update(df_with_table_name.drop("table_source_name").columns)

    delta_table = DeltaTable.forName(spark, orders_qa_table)

    delta_table.alias("target").merge(
      df_with_table_name.alias("source"),
          "target.table_source_name = source.table_source_name"
      ).whenMatchedUpdate(
          set = set_columns
      ).whenNotMatchedInsertAll().execute()
  else:
    df_with_table_name.write.format("delta").mode("append").saveAsTable(orders_qa_table)


streaming = (spark
             .readStream
             .option("readChangeFeed", "true")
             .table(orders_table)
             .writeStream
             .format('delta')
             .option("checkpointLocation", orders_qa_checkpoint)
             .foreachBatch(process_table)
             .trigger(availableNow=True)             
)

streaming.start().awaitTermination()

25/08/12 18:34:30 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [22]:
spark.table(orders_qa_table).show()

+-------------------+--------------------+----------------------+---------------------+-----------------------+--------------+-----------------+-----------------+
|order_id_null_count|client_id_null_count|order_value_null_count|order_date_null_count|country_code_null_count|number_of_rows|order_total_value|table_source_name|
+-------------------+--------------------+----------------------+---------------------+-----------------------+--------------+-----------------+-----------------+
|                  0|                   1|                     0|                    0|                      0|            13|          2476.03|           orders|
+-------------------+--------------------+----------------------+---------------------+-----------------------+--------------+-----------------+-----------------+



## Fixing table null value

In [23]:
spark.table(orders_table).show()

+--------+---------+-----------+-------------------+------------+
|order_id|client_id|order_value|         order_date|country_code|
+--------+---------+-----------+-------------------+------------+
|  ORD001|     NULL|     125.50|2025-08-01 10:30:00|          BR|
|  ORD002|    CL002|     299.99|2025-08-01 11:15:00|          BR|
|  ORD003|    CL003|      80.00|2025-08-05 09:00:00|          BR|
|  ORD004|    CL004|     560.00|2025-08-02 15:10:00|          BR|
|  ORD005|    CL001|     130.25|2025-08-03 14:00:00|          BR|
|  ORD006|    CL005|     215.00|2025-08-05 10:00:00|          BR|
|  ORD007|    CL006|     100.00|2025-08-04 12:45:00|          US|
|  ORD008|    CL002|     149.00|2025-08-04 17:25:00|          US|
|  ORD009|    CL007|     210.75|2025-08-04 18:50:00|          US|
|  ORD010|    CL008|      55.55|2025-08-04 19:05:00|          US|
|  ORD011|    CL009|      99.99|2025-08-05 11:30:00|          US|
|  ORD012|    CL010|     150.00|2025-08-05 12:45:00|          US|
|  ORD013|

In [24]:
spark.sql(f"UPDATE {orders_table} SET client_id='CL001' WHERE order_id='ORD001'")

DataFrame[num_affected_rows: bigint]

In [25]:
df = spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 4) \
  .option("endingVersion", 4) \
  .table(orders_table)

df.show()

+--------+---------+-----------+-------------------+------------+----------------+---------------+--------------------+
|order_id|client_id|order_value|         order_date|country_code|    _change_type|_commit_version|   _commit_timestamp|
+--------+---------+-----------+-------------------+------------+----------------+---------------+--------------------+
|  ORD007|    CL006|      89.95|2025-08-04 12:45:00|          US| update_preimage|              4|2025-08-12 18:33:...|
|  ORD007|    CL006|     100.00|2025-08-04 12:45:00|          US|update_postimage|              4|2025-08-12 18:33:...|
+--------+---------+-----------+-------------------+------------+----------------+---------------+--------------------+



In [26]:
streaming.start().awaitTermination()
spark.table(orders_qa_table).show()

25/08/12 18:34:40 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


+-------------------+--------------------+----------------------+---------------------+-----------------------+--------------+-----------------+-----------------+
|order_id_null_count|client_id_null_count|order_value_null_count|order_date_null_count|country_code_null_count|number_of_rows|order_total_value|table_source_name|
+-------------------+--------------------+----------------------+---------------------+-----------------------+--------------+-----------------+-----------------+
|                  0|                   0|                     0|                    0|                      0|            13|          2476.03|           orders|
+-------------------+--------------------+----------------------+---------------------+-----------------------+--------------+-----------------+-----------------+



## Data Analysis using CDF

In [28]:
df = spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 3) \
  .table(orders_table)

In [29]:
df_signal_rule_orders = change_data_feed_column_signal(df, col, "order_value", "order_value")
df_signal_rule_count = change_data_feed_column_signal(df_signal_rule_orders, lit_1, "any", "count_order")

(df_signal_rule_count
 .withColumn("year_month", date_format(col("order_date"), "yyyy_MM"))
 .groupBy("year_month", "country_code", "client_id")
 .agg(sum("any").alias("total_orders"), sum("order_value").alias("total_value"))
 .fillna("IS_NULL", subset="client_id").show())

+----------+------------+---------+------------+-----------+
|year_month|country_code|client_id|total_orders|total_value|
+----------+------------+---------+------------+-----------+
|   2025_08|          US|    CL005|           0|       0.00|
|   2025_08|          BR|    CL003|           1|      80.00|
|   2025_08|          US|    CL009|           1|      99.99|
|   2025_08|          US|    CL010|           1|     150.00|
|   2025_08|          BR|    CL011|           1|     300.00|
|   2025_08|          BR|    CL005|           1|     215.00|
|   2025_08|          US|    CL007|           1|     210.75|
|   2025_08|          BR|    CL002|           1|     299.99|
|   2025_08|          US|    CL002|           1|     149.00|
|   2025_08|          US|    CL006|           1|      89.95|
|   2025_08|          BR|  IS_NULL|           1|     125.50|
|   2025_08|          US|    CL008|           1|      55.55|
|   2025_08|          BR|    CL004|           1|     560.00|
|   2025_08|          BR

In [30]:
def process_table_analytics(batch_df, batch_id):
  df_signal_rule_orders = change_data_feed_column_signal(batch_df, col, "order_value", "order_value")
  df_signal_rule_count = change_data_feed_column_signal(df_signal_rule_orders, lit_1, "any", "count_order")

  df_grouped = (df_signal_rule_count
  .withColumn("year_month", date_format(col("order_date"), "yyyy_MM"))
  .groupBy("year_month", "country_code", "client_id")
  .agg(sum("any").alias("total_orders"), sum("order_value").alias("total_value"))
  .fillna("IS_NULL", subset="client_id"))

  if spark.catalog.tableExists(orders_analyse):
    set_columns = _write_table_update(df_grouped.drop("year_month", "country_code", "client_id").columns)

    delta_table = DeltaTable.forName(spark, orders_analyse)

    delta_table.alias("target").merge(
      df_grouped.alias("source"),
          """
          target.year_month = source.year_month AND
          target.country_code = source.country_code AND
          target.client_id = source.client_id
          """
      ).whenMatchedUpdate(
          set = set_columns
      ).whenNotMatchedInsertAll().execute()
  else:
    df_grouped.write.format("delta").mode("append").saveAsTable(orders_analyse)


streaming = (spark
             .readStream
             .option("readChangeFeed", "true")
             .table(orders_table)
             .writeStream
             .format('delta')
             .option("checkpointLocation", orders_analyse_checkpoint)
             .foreachBatch(process_table_analytics)
             .trigger(availableNow=True)             
)

streaming.start().awaitTermination()

25/08/12 18:35:40 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [31]:
spark.table(orders_analyse).show()

+----------+------------+---------+------------+-----------+
|year_month|country_code|client_id|total_orders|total_value|
+----------+------------+---------+------------+-----------+
|   2025_08|          BR|    CL002|           1|     299.99|
|   2025_08|          US|    CL002|           1|     149.00|
|   2025_08|          US|    CL009|           1|      99.99|
|   2025_08|          US|    CL007|           1|     210.75|
|   2025_08|          BR|    CL001|           2|     255.75|
|   2025_08|          US|    CL006|           1|     100.00|
|   2025_08|          BR|    CL011|           1|     300.00|
|   2025_08|          BR|    CL003|           1|      80.00|
|   2025_08|          BR|    CL004|           1|     560.00|
|   2025_08|          BR|    CL005|           1|     215.00|
|   2025_08|          US|    CL008|           1|      55.55|
|   2025_08|          US|    CL010|           1|     150.00|
+----------+------------+---------+------------+-----------+



In [33]:
# Sample data
data = [
    ("ORD015", None, Decimal("125.50"), datetime(2025, 8, 3, 10, 30), "BR"),
    ("ORD016", "CL002", Decimal("100.99"), datetime(2025, 8, 2, 11, 15), "BR"),
    ("ORD017", "CL003", Decimal("20.00"), datetime(2025, 8, 5, 9, 45), "BR"),
    ("ORD018", "CL004", Decimal("80.00"), datetime(2025, 8, 9, 15, 10), "BR"),
]

# Create DataFrame
orders_df = spark.createDataFrame(data, schema=order_schema)

# Write to Delta table
orders_df.coalesce(1).write.format("delta").mode("append").saveAsTable(orders_table)

In [34]:
streaming.start().awaitTermination()
spark.table(orders_analyse).show()

25/08/12 18:36:07 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


+----------+------------+---------+------------+-----------+
|year_month|country_code|client_id|total_orders|total_value|
+----------+------------+---------+------------+-----------+
|   2025_08|          BR|    CL002|           3|     501.97|
|   2025_08|          BR|    CL003|           3|     120.00|
|   2025_08|          BR|    CL004|           3|     720.00|
|   2025_08|          BR|  IS_NULL|           2|     251.00|
|   2025_08|          US|    CL002|           1|     149.00|
|   2025_08|          US|    CL009|           1|      99.99|
|   2025_08|          US|    CL007|           1|     210.75|
|   2025_08|          BR|    CL001|           2|     255.75|
|   2025_08|          US|    CL006|           1|     100.00|
|   2025_08|          BR|    CL011|           1|     300.00|
|   2025_08|          BR|    CL005|           1|     215.00|
|   2025_08|          US|    CL008|           1|      55.55|
|   2025_08|          US|    CL010|           1|     150.00|
+----------+------------

In [35]:
spark.sql(f"UPDATE {orders_table} SET client_id='CL001' WHERE order_id='ORD015'")
streaming.start().awaitTermination()
spark.table(orders_analyse).show()

25/08/12 18:36:13 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


+----------+------------+---------+------------+-----------+
|year_month|country_code|client_id|total_orders|total_value|
+----------+------------+---------+------------+-----------+
|   2025_08|          BR|    CL001|           4|     506.75|
|   2025_08|          BR|    CL002|           3|     501.97|
|   2025_08|          BR|    CL003|           3|     120.00|
|   2025_08|          BR|    CL004|           3|     720.00|
|   2025_08|          BR|  IS_NULL|           0|       0.00|
|   2025_08|          US|    CL002|           1|     149.00|
|   2025_08|          US|    CL009|           1|      99.99|
|   2025_08|          US|    CL007|           1|     210.75|
|   2025_08|          US|    CL006|           1|     100.00|
|   2025_08|          BR|    CL011|           1|     300.00|
|   2025_08|          BR|    CL005|           1|     215.00|
|   2025_08|          US|    CL008|           1|      55.55|
|   2025_08|          US|    CL010|           1|     150.00|
+----------+------------

## Vacuum

In [37]:
df = spark.read \
  .option("readChangeFeed", "true") \
  .option("startingVersion", 0) \
  .option("endingVersion", 3) \
  .table(orders_table)

df.show()

+--------+---------+-----------+-------------------+------------+----------------+---------------+--------------------+
|order_id|client_id|order_value|         order_date|country_code|    _change_type|_commit_version|   _commit_timestamp|
+--------+---------+-----------+-------------------+------------+----------------+---------------+--------------------+
|  ORD006|    CL005|     199.99|2025-08-03 16:30:00|          US|          delete|              2|2025-08-12 18:33:...|
|  ORD003|    CL003|      75.00|2025-08-02 09:45:00|          BR| update_preimage|              3|2025-08-12 18:33:...|
|  ORD003|    CL003|      80.00|2025-08-05 09:00:00|          BR|update_postimage|              3|2025-08-12 18:33:...|
|  ORD006|    CL005|     215.00|2025-08-05 10:00:00|          BR|          insert|              3|2025-08-12 18:33:...|
|  ORD011|    CL009|      99.99|2025-08-05 11:30:00|          US|          insert|              3|2025-08-12 18:33:...|
|  ORD012|    CL010|     150.00|2025-08-

In [38]:
spark.sql(f"VACUUM {orders_table} RETAIN 0 HOURS")

                                                                                

Deleted 12 files and directories in a total of 2 directories.


DataFrame[path: string]

In [39]:
df.show()

25/08/12 18:40:42 ERROR Executor: Exception in task 0.0 in stage 423.0 (TID 14494)
org.apache.spark.SparkFileNotFoundException: File file:/home/jacques/Documents/courses/DatabricksCourse/tips/spark-warehouse/orders/_change_data/cdc-00000-b22b1601-873c-443a-bbcf-854ffe476400.c000.snappy.parquet does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:781)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:222)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	

Py4JJavaError: An error occurred while calling o1183.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 423.0 failed 1 times, most recent failure: Lost task 0.0 in stage 423.0 (TID 14494) (192.168.1.23 executor driver): org.apache.spark.SparkFileNotFoundException: File file:/home/jacques/Documents/courses/DatabricksCourse/tips/spark-warehouse/orders/_change_data/cdc-00000-b22b1601-873c-443a-bbcf-854ffe476400.c000.snappy.parquet does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:781)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:222)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at sun.reflect.GeneratedMethodAccessor244.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkFileNotFoundException: File file:/home/jacques/Documents/courses/DatabricksCourse/tips/spark-warehouse/orders/_change_data/cdc-00000-b22b1601-873c-443a-bbcf-854ffe476400.c000.snappy.parquet does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.readCurrentFileNotFoundError(QueryExecutionErrors.scala:781)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:222)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:282)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:131)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
