* Formation Continue EMIASD, Univ. Paris Dauphine, Promo 6
* Author: Mohamed-Amine Baazizi
* Affiliation: LIP6 - Faculté des Sciences - Sorbonne Université
* Email: mohamed-amine.baazizi@lip6.fr
* Reusing without consent of the author is strictly forbidden
* June 2025

<p align="center">
  <a href="https://colab.research.google.com/github/auduvignac/Data_Lakehouse/blob/main/notebooks/example/delta_lake_main_correction-1.ipynb" target="_blank">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open in Google Colab"/>
  </a>
</p>

# Delta Lake


## Outline

This lab is dedicated to practicing Delta Lake. It begins with a set of demos that illustrate how to use Delta on small examples.
A use case based on realistic data is then presented, followed by an analysis of the query plans generated for Delta operations.


For the official documentation, visit https://docs.delta.io/latest/index.html









## Prerequisite

In [1]:
!wget -q https://raw.githubusercontent.com/auduvignac/Data_Lakehouse/refs/heads/main/setup_env.py -O setup_env.py
%run setup_env.py

⚡ Running on Colab: strict dependency check…
✅ delta-spark 3.2.1 — OK
✅ parquet-tools 0.2.16 — OK
✅ pyngrok 7.3.0 — OK
✅ pyspark 3.5.3 — OK
✅ All dependencies satisfy the constraints.


### System setup

In [None]:
%%capture
%pip install pyspark==3.5.3
%pip install -q delta-spark==3.2.1
%pip install pyngrok

In [None]:
%pip list|grep spark

In [None]:
import pyspark

print(f"PySpark version: {pyspark.__version__}")

In [2]:
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

local = "local[*]"
appName = "Formation Continue EMIASD - Delta Lake "
localConfig = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "8G").\
  set("spark.driver.memory","8G").\
  set("spark.sql.catalogImplementation","in-memory").\
  set("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension").\
  set("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog").\
  set("spark.jars.packages","io.delta:delta-spark_2.12:3.2.1").\
  set("spark.databricks.delta.schema.autoMerge.enabled","true")

spark = SparkSession.builder.config(conf = localConfig).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [None]:
spark

### Data import

In [3]:
%%capture
! wget https://nuage.lip6.fr/s/BbQ9rzGHKJexKYp/download/sales.tar -O /tmp/sales.tar
!mkdir -p /tmp/delta
! tar xvf /tmp/sales.tar -C /tmp/delta

In [4]:
!ls /tmp/delta/sales

march23_sales.csv  salesOriginal.csv


## Demo 1: first steps

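### load the data into delta

Note: the outputs shown in this notebook were captured after some cells had been run more than once. Because the tables are created with `CREATE TABLE IF NOT EXISTS`, a re-run leaves an existing table untouched, so several `SELECT` outputs (for instance the one right below, and those of `persons`, `salaries` and `sales`) show the state left by later cells rather than the freshly created data. On a fresh run, the first query below returns the ids 0 to 4.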

In [5]:
query = """
CREATE TABLE IF NOT EXISTS delta.`/tmp/delta-table` USING DELTA AS
SELECT col1 AS id
FROM
VALUES 0,
       1,
       2,
       3,
       4;
"""
spark.sql(query)

DataFrame[]

In [6]:
query = """
SELECT *
FROM delta.`/tmp/delta-table`;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  1|
|  3|
|  5|
|  7|
|  9|
| 11|
| 13|
| 15|
| 17|
| 19|
+---+



### update the data
#### overwrite

In [7]:
query = """
INSERT OVERWRITE delta.`/tmp/delta-table`
SELECT col1 AS id
FROM
VALUES 5,
       6,
       7,
       8,
       9;
"""
spark.sql(query)

DataFrame[]

In [8]:
query = """
SELECT *
FROM delta.`/tmp/delta-table`;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  7|
|  8|
|  9|
|  5|
|  6|
+---+



#### conditional update

In [9]:
query = """
UPDATE delta.`/tmp/delta-table`
SET id = id + 100
WHERE id % 2 == 0;
"""
spark.sql(query)

DataFrame[num_affected_rows: bigint]

In [10]:
query = """
SELECT *
FROM delta.`/tmp/delta-table`;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  7|
|108|
|  9|
|  5|
|106|
+---+



#### conditional delete

In [11]:
query = """
DELETE
FROM delta.`/tmp/delta-table`
WHERE id % 2 == 0;
"""
spark.sql(query)

DataFrame[num_affected_rows: bigint]

In [12]:
query = """
SELECT *
FROM delta.`/tmp/delta-table`;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  7|
|  9|
|  5|
+---+



#### merge (upsert)

In [13]:
query = """
CREATE OR REPLACE TEMP VIEW newData AS
SELECT col1 AS id
FROM
VALUES 1,
       3,
       5,
       7,
       9,
       11,
       13,
       15,
       17,
       19;
"""
spark.sql(query)

DataFrame[]

In [14]:
query = """
SELECT *
FROM `newData`;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  1|
|  3|
|  5|
|  7|
|  9|
| 11|
| 13|
| 15|
| 17|
| 19|
+---+



In [15]:
query = """
MERGE INTO delta.`/tmp/delta-table` AS oldData
USING newData
ON oldData.id = newData.id
WHEN MATCHED
  THEN UPDATE SET id = newData.id
WHEN NOT MATCHED
  THEN INSERT (id) VALUES (newData.id);
"""
spark.sql(query)

DataFrame[num_affected_rows: bigint, num_updated_rows: bigint, num_deleted_rows: bigint, num_inserted_rows: bigint]

In [16]:
query = """
SELECT *
FROM delta.`/tmp/delta-table`;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  1|
|  3|
|  5|
|  7|
|  9|
| 11|
| 13|
| 15|
| 17|
| 19|
+---+
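To make the semantics of this `MERGE` concrete, here is a minimal plain-Python model (a sketch, not Delta's implementation) that treats a table as a dict keyed by the merge key: source rows with a matching key are updated (`WHEN MATCHED`), the others are inserted (`WHEN NOT MATCHED`). The helper name `upsert` is hypothetical.

```python
def upsert(target, source):
    """Model MERGE ... WHEN MATCHED THEN UPDATE / WHEN NOT MATCHED THEN INSERT.

    target and source map a merge key to a row (here the row is just the id).
    Returns the merged table and per-clause counters.
    """
    merged = dict(target)  # work on a copy, like Delta writing a new table version
    updated = inserted = 0
    for key, row in source.items():
        if key in merged:   # WHEN MATCHED THEN UPDATE SET id = newData.id
            merged[key] = row
            updated += 1
        else:               # WHEN NOT MATCHED THEN INSERT (id) VALUES (newData.id)
            merged[key] = row
            inserted += 1
    return merged, {"num_updated_rows": updated, "num_inserted_rows": inserted}


old_data = {i: i for i in (7, 9, 5)}         # table state after the DELETE
new_data = {i: i for i in range(1, 20, 2)}   # the newData view: 1, 3, ..., 19
merged, metrics = upsert(old_data, new_data)
print(sorted(merged))  # [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]
print(metrics)         # {'num_updated_rows': 3, 'num_inserted_rows': 7}
```

The three matched ids (5, 7, 9) are rewritten with identical values, which is why the merged table looks exactly like `newData`.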



### view history

In [17]:
query = """
SELECT *
FROM delta.`/tmp/delta-table` VERSION AS OF 0;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  0|
|  1|
+---+



In [18]:
query = """
SELECT *
FROM delta.`/tmp/delta-table` VERSION AS OF 1;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  7|
|  8|
|  9|
|  5|
|  6|
+---+
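`VERSION AS OF` works because every write appends a commit to the table's `_delta_log`, and a snapshot is rebuilt by replaying the commits up to the requested version: `add` actions bring data files in, `remove` actions take them out. The sketch below models only these two actions (real commits also carry `metaData`, `protocol` and `commitInfo` actions and may be summarized in checkpoints); the helper `snapshot_files` and the file names are hypothetical.

```python
def snapshot_files(commits, version):
    """Replay commits 0..version and return the set of live data files."""
    live = set()
    for commit in commits[: version + 1]:
        for action in commit:
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live


# Hypothetical log for /tmp/delta-table (file names invented for illustration)
commits = [
    [{"add": {"path": "v0-part-0.parquet"}}],     # version 0: CREATE TABLE ... AS SELECT 0..4
    [{"remove": {"path": "v0-part-0.parquet"}},
     {"add": {"path": "v1-part-0.parquet"}}],     # version 1: INSERT OVERWRITE with 5..9
]
print(snapshot_files(commits, 0))  # {'v0-part-0.parquet'}
print(snapshot_files(commits, 1))  # {'v1-part-0.parquet'}
```

Since an overwrite only marks old files as removed in the log, version 0 stays readable until those files are physically deleted (for example by `VACUUM`).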



## Creating synthetic data


### Persons

In [19]:
query = """
CREATE TABLE IF NOT EXISTS delta.`/tmp/persons` USING DELTA AS
SELECT col1 AS serial,
       col2 AS name,
       col3 AS age,
       col4 AS address
FROM
VALUES ("12345", "Alice", 25, "123 Main St"),
       ("67890", "Bob", 30, "456 Oak Ave"),
       ("24680", "Charlie", 35, "789 Elm St");
"""
spark.sql(query)

DataFrame[]

In [20]:
query = """
SELECT *
FROM delta.`/tmp/persons`;
"""
spark.sql(query).show()

+------+-------+---+------------+
|serial|   name|age|     address|
+------+-------+---+------------+
| 97362|  Lorry| 40|290 Wise Ave|
| 12345|  Alice| 25| 123 Main St|
| 78120|    Dan| 42|432 Holly Rd|
| 67890|    Bob| 30| 456 Oak Ave|
| 24680|Charlie| 35|  789 Elm St|
+------+-------+---+------------+



In [21]:
! ls -alR /tmp/persons

/tmp/persons:
total 44
drwxr-xr-x 3 root root 4096 Aug 22 16:51 .
drwxrwxrwt 1 root root 4096 Aug 22 19:00 ..
drwxr-xr-x 2 root root 4096 Aug 22 16:51 _delta_log
-rw-r--r-- 1 root root 1227 Aug 22 16:51 part-00000-151ee026-9112-4340-bf5b-89a36d54a688-c000.snappy.parquet
-rw-r--r-- 1 root root   20 Aug 22 16:51 .part-00000-151ee026-9112-4340-bf5b-89a36d54a688-c000.snappy.parquet.crc
-rw-r--r-- 1 root root 1234 Aug 22 16:33 part-00000-af27b3e0-8354-4976-ae06-beb29b16ed2d-c000.snappy.parquet
-rw-r--r-- 1 root root   20 Aug 22 16:33 .part-00000-af27b3e0-8354-4976-ae06-beb29b16ed2d-c000.snappy.parquet.crc
-rw-r--r-- 1 root root 1241 Aug 22 16:51 part-00001-691f8a13-8d31-431f-8432-fddd6de70d4f-c000.snappy.parquet
-rw-r--r-- 1 root root   20 Aug 22 16:51 .part-00001-691f8a13-8d31-431f-8432-fddd6de70d4f-c000.snappy.parquet.crc
-rw-r--r-- 1 root root 1213 Aug 22 16:33 part-00001-6de1b723-c519-40bd-90e2-305dd5c1cc0c-c000.snappy.parquet
-rw-r--r-- 1 root root   20 Aug 22 16:33 .part-00001-6de1b72

In [22]:
query = """
CREATE OR REPLACE TEMP VIEW newPersons AS
SELECT col1 AS serial,
       col2 AS name,
       col3 AS age,
       col4 AS address
FROM
VALUES ("78120", "Dan", 42, "432 Holly Rd"),
       ("97362", "Lorry", 40, "290 Wise Ave"),
       ("12345", "Alice", 25, "123 Main St")
"""
spark.sql(query)

DataFrame[]

In [23]:
query = """
SELECT *
from newPersons
"""
spark.sql(query).show()

+------+-----+---+------------+
|serial| name|age|     address|
+------+-----+---+------------+
| 78120|  Dan| 42|432 Holly Rd|
| 97362|Lorry| 40|290 Wise Ave|
| 12345|Alice| 25| 123 Main St|
+------+-----+---+------------+



### Salaries

In [24]:
query = """
CREATE TABLE IF NOT EXISTS delta.`/tmp/salaries` USING DELTA AS
SELECT col1 AS serial,
       col2 AS salary
FROM
VALUES ("12345", 45000),
       ("67890", 52000),
       ("24680", 36000),
       ("78120", 60000),
       ("97362",38000)
"""
spark.sql(query)

DataFrame[]

In [25]:
query = """
SELECT *
from delta.`/tmp/salaries`
"""
spark.sql(query).show()

+------+------+
|serial|salary|
+------+------+
| 24680| 46000|
| 78120| 60000|
| 97362| 39000|
| 12345| 47000|
| 67890| 52000|
+------+------+



In [26]:
query = """
CREATE OR REPLACE TEMP VIEW newSalaries AS
SELECT col1 AS serial,
       col2 AS salary
FROM
VALUES ("12345", 47000),
       ("67890", 50000),
       ("24680", 46000),
       ("78120", 61000),
       ("97362",39000)
"""
spark.sql(query)

DataFrame[]

In [27]:
query = """
SELECT *
from newSalaries
"""
spark.sql(query).show()

+------+------+
|serial|salary|
+------+------+
| 12345| 47000|
| 67890| 50000|
| 24680| 46000|
| 78120| 61000|
| 97362| 39000|
+------+------+



### Sales

In [28]:
query = """
CREATE TABLE IF NOT EXISTS delta.`/tmp/sales` USING DELTA AS
SELECT col1 AS product_id,
       col2 AS quantity,
       col3 AS totalprice
FROM
VALUES ("CHA_2",2,60),
       ("BED_4",1,300),
       ("SHO_15",2,60)
"""
spark.sql(query)

DataFrame[]

In [29]:
query = """
SELECT *
FROM delta.`/tmp/sales`
"""
spark.sql(query).show()

+----------+--------+----------+---------+-----+
|product_id|quantity|totalprice| category|color|
+----------+--------+----------+---------+-----+
|     BED_4|       1|       300|Furniture|brown|
|     BED_6|       1|       200|     NULL| NULL|
|     CHA_2|       3|        90|Furniture| blue|
|    SHO_15|       5|       150|    Cloth|black|
+----------+--------+----------+---------+-----+



In [30]:
!ls -hR /tmp/sales

/tmp/sales:
_delta_log
part-00000-0216a1a1-3238-49ac-b50f-40f4509ec290-c000.snappy.parquet
part-00000-2388d698-53c1-4eb4-8cad-a56188fc1953-c000.snappy.parquet
part-00000-2e5b5f71-a3d9-4b14-8680-23933d206feb-c000.snappy.parquet
part-00001-bef91a23-dfec-4be9-983f-81272af79b04-c000.snappy.parquet

/tmp/sales/_delta_log:
00000000000000000000.json  00000000000000000001.json  00000000000000000002.json
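Each file in `_delta_log` is one commit (one table version), stored as newline-delimited JSON with one action per line (`add`, `remove`, `metaData`, `protocol`, `commitInfo`, ...). The sketch below, built around a hypothetical helper `summarize_delta_log`, counts the actions of each commit so the log can be inspected from Python; it assumes the standard zero-padded commit file names and ignores checkpoint files.

```python
import json
from collections import Counter
from pathlib import Path


def summarize_delta_log(table_path):
    """Return {version: {action_type: count}} for the JSON commits of a Delta table."""
    summary = {}
    for commit_file in sorted(Path(table_path, "_delta_log").glob("*.json")):
        version = int(commit_file.stem)  # "00000000000000000002.json" -> 2
        actions = Counter()
        for line in commit_file.read_text().splitlines():
            if line.strip():
                # Each non-empty line is a JSON object wrapping a single action.
                actions.update(json.loads(line).keys())
        summary[version] = dict(actions)
    return summary


# Usage in this notebook (counts depend on which commits were run):
# for version, actions in summarize_delta_log("/tmp/sales").items():
#     print(version, actions)
```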


In [56]:
query = """
CREATE TABLE IF NOT EXISTS delta.`/tmp/salesStatus` USING DELTA AS
SELECT product_id, quantity, totalprice, CAST('available' AS STRING) AS status
FROM delta.`/tmp/sales`
"""
spark.sql(query)

DataFrame[]

In [52]:
query = """
SELECT * FROM delta.`/tmp/salesStatus`
"""
spark.sql(query).show()

+----------+--------+----------+
|product_id|quantity|totalprice|
+----------+--------+----------+
|     BED_4|       1|       300|
|    SHO_15|       2|        60|
|     CHA_2|       2|        60|
+----------+--------+----------+



In [57]:
spark.sql("DESCRIBE TABLE EXTENDED delta.`/tmp/salesStatus`").show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|          product_id|              string|   NULL|
|            quantity|                 int|   NULL|
|          totalprice|                 int|   NULL|
|              status|              string|   NULL|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|                Name|delta.`file:/tmp/...|       |
|                Type|             MANAGED|       |
|            Location|    /tmp/salesStatus|       |
|            Provider|               delta|       |
|    Table Properties|[delta.minReaderV...|       |
+--------------------+--------------------+-------+



In [34]:
!ls -hR /tmp/salesStatus

/tmp/salesStatus:
_delta_log
part-00000-039c92f3-fe48-4649-8f56-ab99b5413286-c000.snappy.parquet
part-00001-1d799168-f36a-40be-b4d7-8e3704765dff-c000.snappy.parquet

/tmp/salesStatus/_delta_log:
00000000000000000000.json


In [35]:
query = """
CREATE OR REPLACE TEMP VIEW newSales AS
SELECT col1 as product_id, col2 as quantity, col3 as totalprice
FROM VALUES ("SHO_15",3,90),("CHA_2",1,30),("BED_6",1,200)

"""
spark.sql(query)

DataFrame[]

In [36]:
query = """
SELECT *
FROM newSales
"""
spark.sql(query).show()

+----------+--------+----------+
|product_id|quantity|totalprice|
+----------+--------+----------+
|    SHO_15|       3|        90|
|     CHA_2|       1|        30|
|     BED_6|       1|       200|
+----------+--------+----------+



### Products

In [37]:
query = """
CREATE OR REPLACE TEMP VIEW products AS
SELECT col1 as product_id, col2 as category, col3 as color
FROM VALUES ("CHA_2","Furniture","blue"),("BED_4","Furniture","brown"),("SHO_15","Cloth","black")

"""
spark.sql(query)

DataFrame[]

In [38]:
query = """
SELECT *
FROM products
"""
spark.sql(query).show()

+----------+---------+-----+
|product_id| category|color|
+----------+---------+-----+
|     CHA_2|Furniture| blue|
|     BED_4|Furniture|brown|
|    SHO_15|    Cloth|black|
+----------+---------+-----+



In [39]:
spark.sql("SHOW TABLES").show()

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|         |    newdata|      false|
|         | newpersons|      false|
|         |newsalaries|      false|
|         |   newsales|      false|
|         |   products|      false|
+---------+-----------+-----------+



## Demo 2: delta operations

### Q1. Adding new tuples
Consider the Delta table `persons` with the columns serial, name, age, and address. You have a new dataset `newPersons` with the same columns, containing records that are not yet in the table. Write a merge statement that inserts the new records into the Delta table.


In [40]:
query = """
MERGE INTO delta.`/tmp/persons` AS oldData
USING newPersons
ON oldData.serial = newPersons.serial
WHEN NOT MATCHED
  THEN INSERT *;
"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                0|               0|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [41]:
query = """
SELECT *
FROM delta.`/tmp/persons`
"""
spark.sql(query).show()

+------+-------+---+------------+
|serial|   name|age|     address|
+------+-------+---+------------+
| 97362|  Lorry| 40|290 Wise Ave|
| 12345|  Alice| 25| 123 Main St|
| 78120|    Dan| 42|432 Holly Rd|
| 67890|    Bob| 30| 456 Oak Ave|
| 24680|Charlie| 35|  789 Elm St|
+------+-------+---+------------+
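The insert-only merge of Q1 can be modelled in a few lines of plain Python (a sketch with a hypothetical helper `insert_missing`, and with the `age` and `address` columns left out for brevity). Running it twice also explains the `0` inserted rows reported above: once Dan and Lorry are in the table, a second run finds every serial already matched.

```python
def insert_missing(target, source, key):
    """Model MERGE ... ON t.<key> = s.<key> WHEN NOT MATCHED THEN INSERT *.

    Returns the new table and the number of inserted rows.
    """
    existing_keys = {row[key] for row in target}
    new_rows = [row for row in source if row[key] not in existing_keys]
    return target + new_rows, len(new_rows)


persons = [
    {"serial": "12345", "name": "Alice"},
    {"serial": "67890", "name": "Bob"},
    {"serial": "24680", "name": "Charlie"},
]
new_persons = [
    {"serial": "78120", "name": "Dan"},
    {"serial": "97362", "name": "Lorry"},
    {"serial": "12345", "name": "Alice"},
]

persons, first_run = insert_missing(persons, new_persons, "serial")
persons, second_run = insert_missing(persons, new_persons, "serial")
print(first_run, second_run, len(persons))  # 2 0 5
```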



### Q2: updating existing tuples
Assume you have a Delta table `salaries` with columns serial and salary. You want to update the salary of the employees who earn less than 50,000. You have a new dataset, `newSalaries`, with the same columns but with updated salary information. Write a merge statement that updates the `salaries` table with this new information.


In [42]:
query = """
MERGE INTO delta.`/tmp/salaries` AS oldData
USING newSalaries
ON oldData.serial = newSalaries.serial
WHEN MATCHED AND oldData.salary<50000
  THEN UPDATE SET oldData.salary=newSalaries.salary;
"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                3|               3|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [43]:
query = """
SELECT *
FROM delta.`/tmp/salaries`
"""
spark.sql(query).show()

+------+------+
|serial|salary|
+------+------+
| 24680| 46000|
| 78120| 60000|
| 97362| 39000|
| 12345| 47000|
| 67890| 52000|
+------+------+
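The extra predicate in `WHEN MATCHED AND oldData.salary < 50000` restricts which matched rows are updated. This plain-Python model (a sketch with a hypothetical helper `conditional_update`, not Delta's implementation) replays Q2 on the initial `salaries` data: only the three employees under 50,000 receive their new salary, while `67890` keeps 52,000 even though `newSalaries` proposes 50,000.

```python
def conditional_update(target, source, threshold=50000):
    """Model: WHEN MATCHED AND old.salary < threshold THEN UPDATE SET salary = new.salary."""
    result = dict(target)
    updated = 0
    for serial, new_salary in source.items():
        # The row must be matched AND satisfy the extra predicate on the old value.
        if serial in result and result[serial] < threshold:
            result[serial] = new_salary
            updated += 1
    return result, updated


salaries = {"12345": 45000, "67890": 52000, "24680": 36000, "78120": 60000, "97362": 38000}
new_salaries = {"12345": 47000, "67890": 50000, "24680": 46000, "78120": 61000, "97362": 39000}
salaries, updated = conditional_update(salaries, new_salaries)
print(updated)  # 3
```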



### Q3: adding new tuples and updating existing ones
You have a Delta table `sales` with columns `product_id`, `quantity`, and `totalprice`. Write a merge statement that inserts the new products from the view `newSales` into `sales` and, for existing products, adds the new `quantity` and `totalprice` values to the stored ones.


In [44]:
query = """
MERGE INTO delta.`/tmp/sales` AS oldData
USING newSales
ON oldData.product_id = newSales.product_id
WHEN MATCHED
  THEN UPDATE SET oldData.quantity = oldData.quantity + newSales.quantity,
                  oldData.totalprice = oldData.totalprice + newSales.totalprice
WHEN NOT MATCHED
  THEN INSERT *
"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                3|               3|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [45]:
query = """
SELECT *
FROM delta.`/tmp/sales`
"""
spark.sql(query).show()

+----------+--------+----------+---------+-----+
|product_id|quantity|totalprice| category|color|
+----------+--------+----------+---------+-----+
|     BED_4|       1|       300|Furniture|brown|
|     BED_6|       2|       400|     NULL| NULL|
|     CHA_2|       4|       120|Furniture| blue|
|    SHO_15|       8|       240|    Cloth|black|
+----------+--------+----------+---------+-----+
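Unlike Q1, this merge is not idempotent: every run adds the `newSales` quantities again. The plain-Python model below (a sketch with a hypothetical helper `merge_sales`, columns reduced to `(quantity, totalprice)` pairs) applies Q3 once and then twice to the initial `sales` data. After two runs SHO_15 has a quantity of 2 + 3 + 3 = 8, which suggests that the table shown above reflects two executions of the merge.

```python
def merge_sales(target, source):
    """Model: WHEN MATCHED add quantity/totalprice; WHEN NOT MATCHED insert the row."""
    result = dict(target)
    for product_id, (qty, price) in source.items():
        if product_id in result:                      # WHEN MATCHED THEN UPDATE SET ...
            old_qty, old_price = result[product_id]
            result[product_id] = (old_qty + qty, old_price + price)
        else:                                         # WHEN NOT MATCHED THEN INSERT *
            result[product_id] = (qty, price)
    return result


sales = {"CHA_2": (2, 60), "BED_4": (1, 300), "SHO_15": (2, 60)}
new_sales = {"SHO_15": (3, 90), "CHA_2": (1, 30), "BED_6": (1, 200)}

once = merge_sales(sales, new_sales)
twice = merge_sales(once, new_sales)
print(once)   # {'CHA_2': (3, 90), 'BED_4': (1, 300), 'SHO_15': (5, 150), 'BED_6': (1, 200)}
print(twice)  # {'CHA_2': (4, 120), 'BED_4': (1, 300), 'SHO_15': (8, 240), 'BED_6': (2, 400)}
```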



### Q4: Merge tables with different schemas
Consider the Delta table `sales`. Write a merge statement that augments `sales` with the category and the color of each product, using an auxiliary table `products` whose schema is `product_id`, `category` and `color`, and whose `product_id` can be used to match the tuples of `sales`.

In [46]:
query = """
MERGE INTO delta.`/tmp/sales` oldData
USING products
ON oldData.product_id = products.product_id
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *
"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                3|               3|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [47]:
query = """
SELECT *
FROM delta.`/tmp/sales`
"""
spark.sql(query).show()

+----------+--------+----------+---------+-----+
|product_id|quantity|totalprice| category|color|
+----------+--------+----------+---------+-----+
|     BED_4|       1|       300|Furniture|brown|
|     BED_6|       2|       400|     NULL| NULL|
|     CHA_2|       4|       120|Furniture| blue|
|    SHO_15|       8|       240|    Cloth|black|
+----------+--------+----------+---------+-----+
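Q4 relies on schema evolution: `UPDATE SET *` adds the source columns `category` and `color` to `sales`, which is accepted here because the session sets `spark.databricks.delta.schema.autoMerge.enabled` to `true`. The plain-Python model below (a sketch with a hypothetical helper, not Delta code) mimics that behaviour on lists of dicts: new source columns are appended to the schema, matched rows receive the source values, and target rows without a match, such as `BED_6`, get `None` (NULL) in the new columns.

```python
def merge_with_schema_evolution(target_rows, source_rows, key):
    """Model UPDATE SET * / INSERT * when automatic schema evolution is enabled."""
    columns = list(target_rows[0]) if target_rows else []
    for row in source_rows:
        columns += [c for c in row if c not in columns]  # evolve the schema
    by_key = {row[key]: row for row in source_rows}
    result = []
    for row in target_rows:
        merged = {c: row.get(c) for c in columns}        # new columns start as None
        if row[key] in by_key:
            merged.update(by_key.pop(row[key]))          # WHEN MATCHED THEN UPDATE SET *
        result.append(merged)
    for row in by_key.values():                          # WHEN NOT MATCHED THEN INSERT *
        result.append({c: row.get(c) for c in columns})
    return columns, result


sales_rows = [
    {"product_id": "BED_4", "quantity": 1, "totalprice": 300},
    {"product_id": "BED_6", "quantity": 2, "totalprice": 400},
    {"product_id": "CHA_2", "quantity": 4, "totalprice": 120},
    {"product_id": "SHO_15", "quantity": 8, "totalprice": 240},
]
products = [
    {"product_id": "CHA_2", "category": "Furniture", "color": "blue"},
    {"product_id": "BED_4", "category": "Furniture", "color": "brown"},
    {"product_id": "SHO_15", "category": "Cloth", "color": "black"},
]
columns, rows = merge_with_schema_evolution(sales_rows, products, "product_id")
print(columns)  # ['product_id', 'quantity', 'totalprice', 'category', 'color']
```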



### Q5: updating existing tuples when not matched by source
Consider the delta table `salesStatus` which extends the table `sales` with the column `status` meant to track the availability of products.
Write a merge statement that:
- updates the quantity of products in `salesStatus` by considering sales reported in `newSales` like in Q3 above and
- marks the status of the products which are not reported in `newSales` as 'unavailable'

In [58]:
query = """
MERGE INTO delta.`/tmp/salesStatus` AS oldData
USING newSales
ON oldData.product_id = newSales.product_id
WHEN MATCHED
  THEN UPDATE SET oldData.quantity = oldData.quantity + newSales.quantity,
                  oldData.totalprice = oldData.totalprice + newSales.totalprice
WHEN NOT MATCHED BY SOURCE
  THEN UPDATE SET oldData.status = 'unavailable'
"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                4|               4|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [59]:
query = """
SELECT * FROM delta.`/tmp/salesStatus`
"""
spark.sql(query).show()

+----------+--------+----------+-----------+
|product_id|quantity|totalprice|     status|
+----------+--------+----------+-----------+
|     BED_4|       1|       300|unavailable|
|     BED_6|       3|       600|  available|
|     CHA_2|       5|       150|  available|
|    SHO_15|      11|       330|  available|
+----------+--------+----------+-----------+
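`WHEN NOT MATCHED BY SOURCE` targets rows that exist in the target but have no counterpart in the source. The plain-Python model below (a sketch with a hypothetical helper and small illustrative data, not the notebook's table) applies both clauses of Q5. Since the statement has no `WHEN NOT MATCHED` clause, a product present only in `newSales` (here `BED_6`) is not inserted.

```python
def merge_with_status(target, source):
    """Model: WHEN MATCHED add quantities; WHEN NOT MATCHED BY SOURCE set status."""
    result = {}
    for product_id, row in target.items():
        row = dict(row)  # copy: the input table is left untouched
        if product_id in source:                 # WHEN MATCHED
            qty, price = source[product_id]
            row["quantity"] += qty
            row["totalprice"] += price
        else:                                    # WHEN NOT MATCHED BY SOURCE
            row["status"] = "unavailable"
        result[product_id] = row
    # No WHEN NOT MATCHED clause: source rows missing from the target are ignored.
    return result


sales_status = {
    "BED_4": {"quantity": 1, "totalprice": 300, "status": "available"},
    "CHA_2": {"quantity": 3, "totalprice": 90, "status": "available"},
}
new_sales = {"CHA_2": (1, 30), "BED_6": (1, 200)}
result = merge_with_status(sales_status, new_sales)
print(result["BED_4"]["status"])  # unavailable
```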



## Demo 3: Attaching constraints

### Not-null constraint

In [60]:
query = """
CREATE TABLE default.persons (
    serial INT NOT NULL,
    name STRING,
    birthDate TIMESTAMP,
    address STRING
  ) USING DELTA;
"""
spark.sql(query)

DataFrame[]

In [61]:
query = """insert into default.persons values (12345, "Alice","2000-02-01" ,"123 Main St") """
spark.sql(query)

DataFrame[]

In [62]:
query = """select * from default.persons """
spark.sql(query).show()

+------+-----+-------------------+-----------+
|serial| name|          birthDate|    address|
+------+-----+-------------------+-----------+
| 12345|Alice|2000-02-01 00:00:00|123 Main St|
+------+-----+-------------------+-----------+



Can we run the following statement?

In [None]:
query = """insert into default.persons values (null, "Bob","1996-03-14" ,"456 Oak Ave") """
spark.sql(query).show()

This statement fails with the following error:
```
DeltaInvariantViolationException: [DELTA_NOT_NULL_CONSTRAINT_VIOLATED]
NOT NULL constraint violated for column: serial
```

Why?

The `serial` column of `default.persons` is declared `NOT NULL`, but we are trying to insert `null` into it:
```sql
insert into default.persons values (null, "Bob","1996-03-14" ,"456 Oak Ave")
```
so Delta Lake rejects the insert.

### Predicate constraint

In [64]:
spark.sql(""" ALTER TABLE default.persons ADD CONSTRAINT birthdate CHECK (birthDate > '2000-01-01'); """)

DataFrame[]

In [65]:
spark.sql("""SHOW TBLPROPERTIES default.persons""").show(truncate=False)

+---------------------------+------------------------+
|key                        |value                   |
+---------------------------+------------------------+
|delta.constraints.birthdate|birthDate > '2000-01-01'|
|delta.minReaderVersion     |1                       |
|delta.minWriterVersion     |3                       |
+---------------------------+------------------------+



In [66]:
spark.sql("""insert into default.persons values (47962, "Bob","2003-03-14" ,"456 Oak Ave") """)

DataFrame[]

Can we run the following statement?

In [67]:
spark.sql("""insert into default.persons values (47962, "Bob","1999-03-14" ,"456 Oak Ave") """)

Py4JJavaError: An error occurred while calling o42.sql.
: org.apache.spark.sql.delta.schema.DeltaInvariantViolationException: [DELTA_VIOLATE_CONSTRAINT_WITH_VALUES] CHECK constraint birthdate (birthDate > '2000-01-01') violated by row with values:
 - birthDate : 921369600000000


The statement fails with a `DeltaInvariantViolationException`:
```
[DELTA_VIOLATE_CONSTRAINT_WITH_VALUES] CHECK constraint birthdate (birthDate > '2000-01-01') violated by row with values:
 - birthDate : 921369600000000
```
Bob's birth date (1999-03-14) is earlier than 2000-01-01, so the row violates the `birthdate` CHECK constraint added above, and Delta rejects the insert.
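The error message prints the offending `birthDate` as `921369600000000` because Spark represents TIMESTAMP values internally as microseconds since the Unix epoch. The small helper below (a hypothetical name) converts such a value back to a readable UTC datetime; it assumes the session time zone is UTC, as on a default Colab runtime, when relating the result to the inserted literal.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micros_to_utc(micros):
    """Turn a Spark TIMESTAMP shown as microseconds since the Unix epoch into a UTC datetime."""
    # timedelta arithmetic keeps the conversion exact (no float rounding).
    return EPOCH + timedelta(microseconds=micros)


print(micros_to_utc(921369600000000))  # 1999-03-14 00:00:00+00:00
```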

Let's run the following code to list the constraints defined on `default.persons`.

In [69]:
# Show table details including constraints
details = spark.sql("DESCRIBE DETAIL default.persons")
details.select("properties").show(truncate=False)

+---------------------------------------------------------+
|properties                                               |
+---------------------------------------------------------+
|{delta.constraints.birthdate -> birthDate > '2000-01-01'}|
+---------------------------------------------------------+



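The `properties` column lists the table properties; each CHECK constraint is stored there as a `delta.constraints.<name>` entry. `NOT NULL` constraints do not appear here: they are part of the column definitions in the table schema.

If we insert a row that violates one of these constraints (e.g., `birthDate = '1999-03-14'` while `birthDate > '2000-01-01'` is required), Delta raises a `DeltaInvariantViolationException`.

When unsure which rules a table enforces, check `DESCRIBE DETAIL` (or `SHOW TBLPROPERTIES`).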
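Because CHECK constraints share the properties map with other settings (such as `delta.minReaderVersion`, visible in the `SHOW TBLPROPERTIES` output above), it can help to filter them by their `delta.constraints.` prefix. A minimal sketch, with a hypothetical helper `check_constraints`; the commented line shows one way to obtain the map in the notebook, assuming an active `spark` session.

```python
def check_constraints(properties):
    """Extract {constraint_name: expression} from a Delta table's properties map."""
    prefix = "delta.constraints."
    return {
        name[len(prefix):]: expression
        for name, expression in properties.items()
        if name.startswith(prefix)
    }


# In the notebook: props = spark.sql("DESCRIBE DETAIL default.persons").first()["properties"]
props = {"delta.constraints.birthdate": "birthDate > '2000-01-01'"}
print(check_constraints(props))  # {'birthdate': "birthDate > '2000-01-01'"}
```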

### Generated columns
The following Delta table contains three columns, `year`, `month` and `day`, that are generated from the date elements of the `saledate` column; the table is partitioned by `year` and `month`.

In [70]:
from delta.tables import DeltaTable

DeltaTable.createOrReplace(spark) \
  .tableName("default.sales") \
  .addColumn("saleid", "STRING") \
  .addColumn("saledate", "TIMESTAMP") \
  .addColumn("quantity", "INT") \
  .addColumn("year", "INT", generatedAlwaysAs="YEAR(saledate)") \
  .addColumn("month", "INT", generatedAlwaysAs="MONTH(saledate)") \
  .addColumn("day", "INT", generatedAlwaysAs="DAYOFMONTH(saledate)") \
  .partitionedBy("year", "month") \
  .execute()

<delta.tables.DeltaTable at 0x7a4b8a36eab0>

In [71]:
spark.sql("""
insert into default.sales
values ('S000000124','2023-02-26 00:00:00',2.0,2023,02,26)
""")

DataFrame[]

In [72]:
spark.sql(""" select * from default.sales """).show()

+----------+-------------------+--------+----+-----+---+
|    saleid|           saledate|quantity|year|month|day|
+----------+-------------------+--------+----+-----+---+
|S000000124|2023-02-26 00:00:00|       2|2023|    2| 26|
+----------+-------------------+--------+----+-----+---+



Can we run the following command?

In [73]:
spark.sql(""" insert into default.sales values ('S000000124','2024-02-26 00:00:00',2.0,2023,02,26)  """)

Py4JJavaError: An error occurred while calling o42.sql.
: org.apache.spark.sql.delta.schema.DeltaInvariantViolationException: [DELTA_VIOLATE_CONSTRAINT_WITH_VALUES] CHECK constraint Generated Column (year <=> YEAR(saledate)) violated by row with values:
 - saledate : 1708905600000000
 - year : 2023


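The statement fails with a `DeltaInvariantViolationException`:
```
[DELTA_VIOLATE_CONSTRAINT_WITH_VALUES] CHECK constraint Generated Column (year <=> YEAR(saledate)) violated by row with values:
 - saledate : 1708905600000000
 - year : 2023
```
Delta enforces each generated column as a constraint: the value supplied for `year` (2023) must equal `YEAR(saledate)`, which is 2024 for `2024-02-26`, so the row is rejected.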
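Delta's documentation states that generated columns are computed automatically when a write does not supply them, and that supplied values must match the generation expression. The plain-Python model below (a sketch; `GENERATED_COLUMNS` and `apply_generated_columns` are hypothetical names) reproduces both behaviours for `default.sales`: omitted columns are filled in, and a mismatching value raises an error, as the insert above did.

```python
from datetime import datetime

# Python stand-ins for the generation expressions declared on default.sales
GENERATED_COLUMNS = {
    "year": lambda row: row["saledate"].year,     # YEAR(saledate)
    "month": lambda row: row["saledate"].month,   # MONTH(saledate)
    "day": lambda row: row["saledate"].day,       # DAYOFMONTH(saledate)
}


def apply_generated_columns(row):
    """Compute omitted generated columns and reject rows whose supplied values disagree."""
    row = dict(row)
    for column, expression in GENERATED_COLUMNS.items():
        expected = expression(row)
        if column not in row:            # value omitted: compute it
            row[column] = expected
        elif row[column] != expected:    # value supplied: must match (year <=> YEAR(saledate))
            raise ValueError(f"generated column {column}: expected {expected}, got {row[column]}")
    return row


ok = apply_generated_columns(
    {"saleid": "S000000124", "saledate": datetime(2023, 2, 26), "quantity": 2}
)
print(ok["year"], ok["month"], ok["day"])  # 2023 2 26
```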

## Exercise to solve

### Data import

In [74]:
query = """
CREATE TABLE IF NOT EXISTS salesOriginal
USING csv
OPTIONS (
  header "true",
  path "/tmp/delta/sales/salesOriginal.csv",
  inferSchema "true"
)
"""
spark.sql(query)

DataFrame[]

In [75]:
query = """
DESCRIBE salesOriginal
"""
spark.sql(query).show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|       saleid|   string|   NULL|
|     saledate|     date|   NULL|
|     quantity|   double|   NULL|
|    unitprice|   double|   NULL|
|       shopid|   string|   NULL|
|         city|   string|   NULL|
|        state|   string|   NULL|
|      country|   string|   NULL|
|     shopsize|   string|   NULL|
|    productid|   string|   NULL|
|     category|   string|   NULL|
|  subcategory|   string|   NULL|
|         size|   string|   NULL|
|purchaseprice|   double|   NULL|
|        color|   string|   NULL|
|        brand|   string|   NULL|
+-------------+---------+-------+



In [76]:
query = """
CREATE TABLE IF NOT EXISTS march23_sales
USING csv
OPTIONS (
  header "true",
  path "/tmp/delta/sales/march23_sales.csv",
  inferSchema "true"
)
"""
spark.sql(query)

DataFrame[]

In [77]:
query = """
DESCRIBE march23_sales
"""
spark.sql(query).show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|       saleid|   string|   NULL|
|     saledate|     date|   NULL|
|     quantity|   double|   NULL|
|    unitprice|   double|   NULL|
|       shopid|   string|   NULL|
|         city|   string|   NULL|
|        state|   string|   NULL|
|      country|   string|   NULL|
|     shopsize|   string|   NULL|
|    productid|   string|   NULL|
|     category|   string|   NULL|
|  subcategory|   string|   NULL|
|         size|   string|   NULL|
|purchaseprice|   double|   NULL|
|        color|   string|   NULL|
|        brand|   string|   NULL|
+-------------+---------+-------+



In [78]:
query = """
SELECT count(*) FROM  march23_sales
"""
spark.sql(query).show(5)

+--------+
|count(1)|
+--------+
|      84|
+--------+



### Creation of the delta tables

In [79]:
query = """
CREATE TABLE delta.`/tmp/deltaSales` USING DELTA AS SELECT * FROM salesOriginal;
"""
spark.sql(query)

DataFrame[]

In [80]:
query = """
SELECT * FROM  delta.`/tmp/deltaSales`
"""
spark.sql(query).show(5)

+----------+----------+--------+---------+------+-------------+----------+-------+--------+---------+---------+-----------+------+-------------+-----+-----+
|    saleid|  saledate|quantity|unitprice|shopid|         city|     state|country|shopsize|productid| category|subcategory|  size|purchaseprice|color|brand|
+----------+----------+--------+---------+------+-------------+----------+-------+--------+---------+---------+-----------+------+-------------+-----+-----+
|S000000124|2023-02-26|     2.0|     60.0|shop_4|San Francisco|California|    USA|   small|    CHA_2|Furniture|      Chair|  NULL|         48.0| blue|Basic|
|S000000125|2023-02-25|     1.0|    150.0|shop_5|      Houston|     Texas|    USA|   small|    BED_3|Furniture|        Bed|Single|        127.0|  red| Mega|
|S000000126|2023-02-24|     1.0|    300.0|shop_6|  San Antonio|     Texas|    USA|   small|    BED_4|Furniture|        Bed|Double|        252.0|brown|Basic|
|S000000127|2023-02-23|     1.0|    395.0|shop_7|      Chi

In [81]:
query = """
SELECT count(*) FROM  delta.`/tmp/deltaSales`
"""
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|    4916|
+--------+



### Adding new records
Write a `MERGE` statement to insert the March 2023 records into `deltaSales`.

In [82]:
query = """
MERGE INTO delta.`/tmp/deltaSales` AS oldData
USING march23_sales
ON oldData.saleid = march23_sales.saleid
WHEN NOT MATCHED
  THEN INSERT *;

"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|               84|               0|               0|               84|
+-----------------+----------------+----------------+-----------------+
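The `MERGE` above has only a `WHEN NOT MATCHED` clause, so it behaves like an insert of the source rows whose `saleid` is absent from the target (an anti-join on the key). The plain-Python sketch below models that behaviour on hypothetical rows. It is not how Delta executes the merge (Delta rewrites files and commits a new table version); it only illustrates which rows end up inserted, and why re-running the statement is harmless.

```python
def merge_insert_not_matched(target, source, key):
    """Model MERGE INTO target USING source ON target.key = source.key
    WHEN NOT MATCHED THEN INSERT * (no WHEN MATCHED clause)."""
    existing_keys = {row[key] for row in target}
    # Only source rows whose key has no match in the target are inserted;
    # matched rows stay unchanged because there is no WHEN MATCHED clause.
    inserted = [row for row in source if row[key] not in existing_keys]
    return target + inserted, len(inserted)

# Hypothetical rows, not taken from the lab data.
target = [{"saleid": "S1", "unitprice": 60.0}, {"saleid": "S2", "unitprice": 150.0}]
source = [{"saleid": "S2", "unitprice": 999.0}, {"saleid": "S3", "unitprice": 20.0}]

merged, num_inserted = merge_insert_not_matched(target, source, "saleid")
print(num_inserted)  # 1: S2 already exists, only S3 is inserted

# Running the same merge again inserts nothing: the load is idempotent.
_, num_inserted_again = merge_insert_not_matched(merged, source, "saleid")
print(num_inserted_again)  # 0
```

In the lab, all 84 March rows had new `saleid` values, which is why `num_inserted_rows` equals `num_affected_rows` (84).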



### Updating records
Write `UPDATE` statements that increase the `unitprice` of products sold in 2023, depending on their category: Furniture +5%, all other categories +10%.

In [83]:
query = """
select category, count(*) from delta.`/tmp/deltaSales`
WHERE YEAR(saledate) >=2023
group by category
"""
spark.sql(query).show()

+---------+--------+
| category|count(1)|
+---------+--------+
|Furniture|     882|
|    Cloth|    1680|
+---------+--------+



In [84]:
query = """
update delta.`/tmp/deltaSales`
set unitprice=unitprice*1.05
WHERE category='Furniture' and YEAR(saledate) >=2023
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|              882|
+-----------------+



In [85]:
query = """
update delta.`/tmp/deltaSales`
set unitprice=unitprice*1.1
WHERE category!='Furniture' and YEAR(saledate) >=2023
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|             1680|
+-----------------+
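The two `UPDATE` statements above produce two separate commits. The same rule could also be written as a single statement with a conditional expression, e.g. `UPDATE ... SET unitprice = CASE WHEN category = 'Furniture' THEN unitprice * 1.05 ELSE unitprice * 1.1 END WHERE YEAR(saledate) >= 2023`. One subtle difference: with `CASE ... ELSE`, a row whose `category` is NULL falls into the ELSE branch, whereas `category != 'Furniture'` evaluates to NULL for it and leaves it untouched. The plain-Python sketch below models the single-rule version on hypothetical rows.

```python
from datetime import date

# Price rule of the exercise for sales made in 2023 or later:
# Furniture +5%, any other category +10% (like CASE ... ELSE in SQL,
# a missing category would also fall into the "other" branch here).
RATES = {"Furniture": 1.05}
DEFAULT_RATE = 1.10

def new_unitprice(row):
    """Return the updated unit price of one sale."""
    if row["saledate"].year < 2023:
        return row["unitprice"]  # older sales are left untouched
    return row["unitprice"] * RATES.get(row["category"], DEFAULT_RATE)

# Hypothetical rows, for illustration only.
rows = [
    {"category": "Furniture", "saledate": date(2023, 2, 26), "unitprice": 100.0},
    {"category": "Cloth", "saledate": date(2023, 3, 1), "unitprice": 20.0},
    {"category": "Cloth", "saledate": date(2022, 12, 31), "unitprice": 20.0},
]
print([round(new_unitprice(r), 2) for r in rows])  # [105.0, 22.0, 20.0]
```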



### Removing old records
Remove all sales older than 1 January 2023. How many records remain?

In [86]:
query = """
select count(*) from delta.`/tmp/deltaSales`
WHERE saledate <'2023-01-01'
"""
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|    2438|
+--------+



In [87]:
query = """
delete from delta.`/tmp/deltaSales`
WHERE saledate <'2023-01-01'
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|             2438|
+-----------------+
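The answer can be cross-checked against the counts printed earlier: the table started with 4916 rows, the merge added 84 and the delete removed 2438. Since updates do not change the number of rows, what remains should equal the number of 2023 sales reported per category before the updates (882 Furniture + 1680 Cloth).

```python
# Counts copied from the outputs of the cells above.
initial_rows = 4916  # SELECT count(*) right after the CREATE TABLE AS SELECT
merged_rows = 84     # num_inserted_rows of the MERGE
deleted_rows = 2438  # num_affected_rows of the DELETE
rows_2023 = {"Furniture": 882, "Cloth": 1680}  # per-category counts for 2023 and later

remaining = initial_rows + merged_rows - deleted_rows
print(remaining)  # 2562

# Consistency check: what is left is exactly the set of sales from 2023 onwards.
assert remaining == sum(rows_2023.values())
```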



### History viewing


In [88]:
queryv = """
DESCRIBE HISTORY delta.`/tmp/deltaSales`
"""
dfv = spark.sql(queryv)
dfv.show()

+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      4|2025-08-22 19:25:...|  NULL|    NULL|              DELETE|{predicate -> ["(...|NULL|    NULL|     NULL|          3|  Serializable|        false|{numRemovedFiles ...|        NULL|Apache-Spark/3.5....|
|      3|2025-08-22 19:25:...|  NULL|    NULL|              UPDATE|{predicate -> ["(...|NULL|    NULL|     NULL|          2|  Serializable|        false|{numRemoved

In [89]:
dfv = dfv.select("operation",  "operationMetrics")
# dfv.select("operation", "operationParameters")
dfv.show(truncate=False)

+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|operation             |operationMetrics                                                                                                                                                                                                                                                                                                                 

### Restoring to a previous version

In [90]:
query = """
RESTORE TABLE delta.`/tmp/deltaSales` TO VERSION AS OF 2
"""
spark.sql(query).show()

+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|                   43948|                         2|                2|                 2|             28370|              43948|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
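Reading the history backwards (each commit's `readVersion` is the version it was based on), the versions of `deltaSales` are most likely: 0 = CREATE TABLE AS SELECT, 1 = MERGE, 2 = Furniture UPDATE, 3 = non-Furniture UPDATE, 4 = DELETE. Versions 0 and 1 are cut off in the output above, so this numbering is inferred from the order of the cells. Under that assumption, restoring to version 2 brings back the pre-2023 rows (hence the count of 2438 below) but also undoes the 10% increase on non-Furniture products, while keeping the 5% Furniture increase. The sketch lists which commits a restore makes invisible.

```python
# Version -> operation, inferred from the order of the cells above
# (versions 0 and 1 are not visible in the truncated history output).
history = {
    0: "CREATE TABLE AS SELECT",
    1: "MERGE (March 2023 rows)",
    2: "UPDATE Furniture +5%",
    3: "UPDATE non-Furniture +10%",
    4: "DELETE sales before 2023",
}

def undone_by_restore(history, target_version):
    """Operations whose effects are no longer visible after
    RESTORE TABLE ... TO VERSION AS OF target_version."""
    return [op for version, op in sorted(history.items()) if version > target_version]

print(undone_by_restore(history, 2))
```

The restore does not erase these commits: it is itself recorded as a new version (version 5 in the history below), which is why the table can still be restored to version 4 afterwards.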



In [91]:
query = """
select count(*) from delta.`/tmp/deltaSales`
WHERE saledate <'2023-01-01'
"""
spark.sql(query).show()

+--------+
|count(1)|
+--------+
|    2438|
+--------+



In [92]:
query = """
DESCRIBE HISTORY delta.`/tmp/deltaSales`
"""
spark.sql(query).show()

+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      5|2025-08-22 19:26:...|  NULL|    NULL|             RESTORE|{version -> 2, ti...|NULL|    NULL|     NULL|          4|  Serializable|        false|{numRestoredFiles...|        NULL|Apache-Spark/3.5....|
|      4|2025-08-22 19:25:...|  NULL|    NULL|              DELETE|{predicate -> ["(...|NULL|    NULL|     NULL|          3|  Serializable|        false|{numRemoved

In [93]:
query = """
RESTORE TABLE delta.`/tmp/deltaSales` TO VERSION AS OF 4
"""
spark.sql(query).show()

+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|                   28370|                         2|                2|                 2|             43948|              28370|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+



## Demo 4: change data feed

### Table creation with CDF activated

In [94]:
query = """
CREATE TABLE IF NOT EXISTS salesOriginal
USING csv
OPTIONS (
  header "true",
  path "/tmp/delta/sales/salesOriginal.csv",
  inferSchema "true"
)
"""
spark.sql(query)

DataFrame[]

In [95]:
query = """
CREATE TABLE delta.`/tmp/deltaSalesCDF` USING DELTA TBLPROPERTIES (delta.enableChangeDataFeed = true)
AS SELECT * FROM salesOriginal
"""
spark.sql(query)

DataFrame[]

### CDF for Updates

In [96]:
query = """
UPDATE delta.`/tmp/deltaSalesCDF`
SET unitprice = unitprice * 1.05
WHERE saledate >= '2023-02-01' and category='Cloth'
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|              765|
+-----------------+



In [97]:
query = """
SELECT * FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
"""
spark.sql(query).show()

+----------+----------+--------+------------------+-------+-----------+----------+-------+--------+---------+--------+-----------+----+-------------+-----+--------+----------------+---------------+--------------------+
|    saleid|  saledate|quantity|         unitprice| shopid|       city|     state|country|shopsize|productid|category|subcategory|size|purchaseprice|color|   brand|    _change_type|_commit_version|   _commit_timestamp|
+----------+----------+--------+------------------+-------+-----------+----------+-------+--------+---------+--------+-----------+----+-------------+-----+--------+----------------+---------------+--------------------+
|S000000129|2023-02-21|     1.0|              20.0| shop_9|     Munich|   Bavaria|Germany|     big|    TSH_7|   Cloth|     Tshirt|  XS|         17.0| blue|NewBrand| update_preimage|              1|2025-08-22 19:27:...|
|S000000129|2023-02-21|     1.0|              21.0| shop_9|     Munich|   Bavaria|Germany|     big|    TSH_7|   Cloth|     T

In [98]:
query = """
SELECT _change_type, _commit_version, _commit_timestamp, count(*)
FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
GROUP BY _change_type, _commit_version, _commit_timestamp
"""
spark.sql(query).show()

+----------------+---------------+--------------------+--------+
|    _change_type|_commit_version|   _commit_timestamp|count(1)|
+----------------+---------------+--------------------+--------+
| update_preimage|              1|2025-08-22 19:27:...|     765|
|update_postimage|              1|2025-08-22 19:27:...|     765|
|          insert|              0|2025-08-22 19:26:...|    4916|
+----------------+---------------+--------------------+--------+
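With the change data feed enabled, every updated row appears twice in `table_changes_by_path`: once as `update_preimage` (values before the commit) and once as `update_postimage` (values after it), while the rows written by the initial CTAS are reported as `insert` (4916 at version 0). This is why version 1 contributes 765 + 765 rows for 765 updated rows. The plain-Python sketch below mimics that shape on hypothetical rows; only `_change_type` and `_commit_version` come from the output above, the rest is illustrative.

```python
from collections import Counter

def update_changes(rows, predicate, update, version):
    """Emit CDF-like change rows for an UPDATE: one pre-image and one
    post-image per updated row, tagged with the commit version."""
    changes = []
    for row in rows:
        if predicate(row):
            changes.append({**row, "_change_type": "update_preimage", "_commit_version": version})
            changes.append({**update(row), "_change_type": "update_postimage", "_commit_version": version})
    return changes

# Hypothetical rows, for illustration only.
rows = [
    {"saleid": "S1", "category": "Cloth", "unitprice": 20.0},
    {"saleid": "S2", "category": "Furniture", "unitprice": 150.0},
    {"saleid": "S3", "category": "Cloth", "unitprice": 45.0},
]
changes = update_changes(
    rows,
    predicate=lambda r: r["category"] == "Cloth",
    update=lambda r: {**r, "unitprice": r["unitprice"] * 1.05},
    version=1,
)
print(Counter(c["_change_type"] for c in changes))
```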



In [99]:
query = """
SELECT saleid, _change_type, unitprice
FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
WHERE saledate >= '2023-02-01' and category='Cloth' and _commit_version = 1
CLUSTER BY saleid
"""
spark.sql(query).show()

+----------+----------------+------------------+
|    saleid|    _change_type|         unitprice|
+----------+----------------+------------------+
|S000000007| update_preimage|              20.0|
|S000000007|update_postimage|              21.0|
|S000000008| update_preimage|              20.0|
|S000000008|update_postimage|              21.0|
|S000000009| update_preimage|              22.0|
|S000000009|update_postimage|              23.1|
|S000000010| update_preimage|              24.0|
|S000000010|update_postimage|25.200000000000003|
|S000000011| update_preimage|              24.0|
|S000000011|update_postimage|25.200000000000003|
|S000000012| update_preimage|              45.0|
|S000000012|update_postimage|             47.25|
|S000000013| update_preimage|              48.0|
|S000000013|update_postimage|50.400000000000006|
|S000000014| update_preimage|              49.0|
|S000000014|update_postimage|             51.45|
|S000000015| update_preimage|              60.0|
|S000000015|update_p
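Some post-images show values such as `25.200000000000003` instead of `25.2`. This is not specific to Delta: `unitprice` is a `double`, and 1.05 has no exact binary representation, so `24.0 * 1.05` is rounded to the nearest double. The sketch below reproduces the effect in plain Python (same IEEE 754 double arithmetic) and shows two usual workarounds: rounding the result, or computing with decimals (in Spark SQL, for instance, `round(unitprice * 1.05, 2)` or a cast to a `DECIMAL` type).

```python
from decimal import Decimal

price, rate = 24.0, 1.05
product = price * rate
print(product)            # binary floating point: not exactly 25.2
print(round(product, 2))  # 25.2 after rounding to cents

# Decimal arithmetic on decimal strings is exact.
print(Decimal("24.0") * Decimal("1.05"))  # 25.200
```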

### CDF for Deletes

In [100]:
query = """
DELETE FROM delta.`/tmp/deltaSalesCDF`
WHERE city = 'Chicago' and category='Cloth'
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|              244|
+-----------------+



In [101]:
query = """
SELECT _change_type, _commit_version, _commit_timestamp, count(*)
FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
GROUP BY _change_type, _commit_version, _commit_timestamp
"""
spark.sql(query).show()

+----------------+---------------+--------------------+--------+
|    _change_type|_commit_version|   _commit_timestamp|count(1)|
+----------------+---------------+--------------------+--------+
| update_preimage|              1|2025-08-22 19:27:...|     765|
|update_postimage|              1|2025-08-22 19:27:...|     765|
|          delete|              2|2025-08-22 19:27:...|     244|
|          insert|              0|2025-08-22 19:26:...|    4916|
+----------------+---------------+--------------------+--------+



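Retrieve the deleted records. Note that the query below returns the distinct `(city, category)` pairs over *all* change types (inserts, update pre/post-images and deletes), which is why it lists many more pairs than the deleted `Chicago`/`Cloth` one; adding `WHERE _change_type = 'delete'` keeps only the deleted records.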

In [102]:
query = """
SELECT distinct city, category
FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
"""
spark.sql(query).show()

+-------------+---------+
|         city| category|
+-------------+---------+
|  Los Angeles|    Cloth|
|      Pescara|    Cloth|
|       Munich|    Cloth|
|        Milan|    Cloth|
|   Regensburg|    Cloth|
|San Francisco|    Cloth|
|     New York|    Cloth|
|  San Antonio|    Cloth|
|      Chicago|    Cloth|
|    San Diego|    Cloth|
|      Bergamo|    Cloth|
|        Fulda|    Cloth|
|       Chieti|    Cloth|
|      Houston|    Cloth|
|     San Jose|    Cloth|
|    Offenbach|    Cloth|
|        Fulda|Furniture|
|       Munich|Furniture|
|  San Antonio|Furniture|
|     New York|Furniture|
+-------------+---------+
only showing top 20 rows
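The difference between a `DISTINCT` over the whole feed and a filter on `_change_type` can be illustrated on a tiny hypothetical feed that uses the same bookkeeping columns as above. Deleted rows keep the values they had before deletion, so filtering on `_change_type = 'delete'` (or reading the changes starting at the delete commit, version 2 here) returns exactly the removed records.

```python
# A tiny, hypothetical change feed with the same bookkeeping columns as above.
feed = [
    {"saleid": "S1", "city": "Chicago", "category": "Cloth", "_change_type": "insert", "_commit_version": 0},
    {"saleid": "S2", "city": "Munich", "category": "Cloth", "_change_type": "insert", "_commit_version": 0},
    {"saleid": "S3", "city": "Chicago", "category": "Furniture", "_change_type": "insert", "_commit_version": 0},
    {"saleid": "S1", "city": "Chicago", "category": "Cloth", "_change_type": "delete", "_commit_version": 2},
]

# DISTINCT over the whole feed mixes every change type...
all_pairs = {(r["city"], r["category"]) for r in feed}
# ...whereas filtering on _change_type keeps only the deleted records.
deleted = [r for r in feed if r["_change_type"] == "delete"]
deleted_pairs = {(r["city"], r["category"]) for r in deleted}

print(sorted(all_pairs))
print(sorted(deleted_pairs))  # [('Chicago', 'Cloth')]
```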



## Query plan analysis

The goal is to observe the impact of partitioning on query plans. We start by creating a tunnel, using the ngrok.com service, to access the Spark web UI.
Make sure you have an ngrok.com account (you can sign in with your Google account, for example).

In [103]:
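# Disable Adaptive Query Execution so that the plans shown in the Spark UI
# are not re-optimized at runtime and remain comparable between queries.
spark.conf.set("spark.sql.adaptive.enabled", False)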

In [None]:
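# Optional: `ngrok config add-authtoken` expects your token as argument:
# !ngrok config add-authtoken <YOUR_AUTHTOKEN>
# The next cell sets the token through pyngrok instead, so this cell can be skipped.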

In [105]:
import getpass

from pyngrok import conf, ngrok

print("Enter your authtoken, which can be copied "
"from https://dashboard.ngrok.com/get-started/your-authtoken")
conf.get_default().auth_token = getpass.getpass()

ui_port = 4040
public_url = ngrok.connect(ui_port).public_url
print(f" * ngrok tunnel \"{public_url}\" -> \"http://127.0.0.1:{ui_port}\"")

Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken
··········
 * ngrok tunnel "https://fe788677b00f.ngrok-free.app" -> "http://127.0.0.1:4040"


### Creation of the partitioned delta tables

In [106]:
query = """
CREATE TABLE IF NOT EXISTS salesOriginal
USING csv
OPTIONS (
  header "true",
  path "/tmp/delta/sales/salesOriginal.csv",
  inferSchema "true"
)
"""
spark.sql(query)

DataFrame[]

#### Partition by one column

In [107]:
query = """
CREATE TABLE delta.`/tmp/deltaSalesPerCity` USING DELTA PARTITIONED BY (city)
AS SELECT * FROM salesOriginal
"""
spark.sql(query)

DataFrame[]

In [108]:
query = """
DESCRIBE delta.`/tmp/deltaSalesPerCity`
"""
spark.sql(query).show(truncate=False)

+-----------------------+---------+-------+
|col_name               |data_type|comment|
+-----------------------+---------+-------+
|saleid                 |string   |NULL   |
|saledate               |date     |NULL   |
|quantity               |double   |NULL   |
|unitprice              |double   |NULL   |
|shopid                 |string   |NULL   |
|city                   |string   |NULL   |
|state                  |string   |NULL   |
|country                |string   |NULL   |
|shopsize               |string   |NULL   |
|productid              |string   |NULL   |
|category               |string   |NULL   |
|subcategory            |string   |NULL   |
|size                   |string   |NULL   |
|purchaseprice          |double   |NULL   |
|color                  |string   |NULL   |
|brand                  |string   |NULL   |
|# Partition Information|         |       |
|# col_name             |data_type|comment|
|city                   |string   |NULL   |
+-----------------------+-------

In [109]:
! ls /tmp/deltaSalesPerCity

'city=Bergamo'	'city=Los Angeles'  'city=Pescara'	  'city=San Jose'
'city=Chicago'	'city=Milan'	    'city=Regensburg'	   _delta_log
'city=Chieti'	'city=Munich'	    'city=San Antonio'
'city=Fulda'	'city=New York'     'city=San Diego'
'city=Houston'	'city=Offenbach'    'city=San Francisco'


In [110]:
! ls /tmp/deltaSalesPerCity/'city=Bergamo'

part-00000-b4a21374-5973-4e27-888c-5088aa23cacf.c000.snappy.parquet


In [111]:
%%capture
%pip install parquet-tools

In [113]:
!parquet-tools inspect --detail /tmp/deltaSalesPerCity/'city=Chicago'/*parquet

FileMetaData
    version = 1
    schema = list
        SchemaElement
            name = spark_schema
            num_children = 15
        SchemaElement
            type = 6
            repetition_type = 1
            name = saleid
            logicalType = LogicalType
                STRING = StringType
        SchemaElement
            type = 1
            repetition_type = 1
            name = saledate
            converted_type = 6
            logicalType = LogicalType
                DATE = DateType
        SchemaElement

In [114]:
!parquet-tools inspect --detail /tmp/deltaSalesPerCity/'city=Bergamo'/*parquet

FileMetaData
    version = 1
    schema = list
        SchemaElement
            name = spark_schema
            num_children = 15
        SchemaElement
            type = 6
            repetition_type = 1
            name = saleid
            logicalType = LogicalType
                STRING = StringType
        SchemaElement
            type = 1
            repetition_type = 1
            name = saledate
            converted_type = 6
            logicalType = LogicalType
                DATE = DateType
        SchemaElement

#### Partition by two columns

In [115]:
query = """
CREATE TABLE delta.`/tmp/deltaSalesPerCityCategory` USING DELTA PARTITIONED BY (city,category)
AS SELECT * FROM salesOriginal
"""
spark.sql(query)

DataFrame[]

In [116]:
query = """
DESCRIBE delta.`/tmp/deltaSalesPerCityCategory`
"""
spark.sql(query).show(truncate=False)

+-----------------------+---------+-------+
|col_name               |data_type|comment|
+-----------------------+---------+-------+
|saleid                 |string   |NULL   |
|saledate               |date     |NULL   |
|quantity               |double   |NULL   |
|unitprice              |double   |NULL   |
|shopid                 |string   |NULL   |
|city                   |string   |NULL   |
|state                  |string   |NULL   |
|country                |string   |NULL   |
|shopsize               |string   |NULL   |
|productid              |string   |NULL   |
|category               |string   |NULL   |
|subcategory            |string   |NULL   |
|size                   |string   |NULL   |
|purchaseprice          |double   |NULL   |
|color                  |string   |NULL   |
|brand                  |string   |NULL   |
|# Partition Information|         |       |
|# col_name             |data_type|comment|
|city                   |string   |NULL   |
|category               |string 

In [117]:
! ls /tmp/deltaSalesPerCityCategory

'city=Bergamo'	'city=Los Angeles'  'city=Pescara'	  'city=San Jose'
'city=Chicago'	'city=Milan'	    'city=Regensburg'	   _delta_log
'city=Chieti'	'city=Munich'	    'city=San Antonio'
'city=Fulda'	'city=New York'     'city=San Diego'
'city=Houston'	'city=Offenbach'    'city=San Francisco'


In [118]:
! ls /tmp/deltaSalesPerCityCategory/'city=Bergamo'

'category=Cloth'  'category=Furniture'
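The directory layout follows the Hive-style convention: the value of each partition column is encoded in the path (`city=Bergamo/category=Cloth/...`) and is not stored inside the Parquet files, which is consistent with `num_children = 15` in the Parquet schema inspected above (16 columns minus `city`). The sketch below parses partition values back from such a relative path. It is only an illustration: Delta itself records each file's `partitionValues` in the JSON commits of `_delta_log` and does not rely on directory names; the percent-decoding of special characters is an assumption of the sketch.

```python
from urllib.parse import unquote

def partition_values(relative_path):
    """Extract {column: value} from a Hive-style relative path such as
    'city=Bergamo/category=Cloth/part-00000.snappy.parquet'."""
    values = {}
    for segment in relative_path.split("/")[:-1]:  # the last segment is the file name
        column, sep, value = segment.partition("=")
        if sep:  # only 'column=value' directories carry partition values
            values[column] = unquote(value)  # assumes special characters are %XX-escaped
    return values

print(partition_values("city=Los Angeles/category=Furniture/part-00000.snappy.parquet"))
```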


In [119]:
! ls /tmp/deltaSalesPerCityCategory/_delta_log

00000000000000000000.json


### Comparing the query plans

#### Selection query on the partitioning column

In [120]:
spark.sparkContext.setJobDescription('P1: selection salesOriginal on city = SF or C')

query = """
SELECT sum(quantity) as sumQty, max(unitprice) as maxPrice
FROM salesOriginal
WHERE city in('San Francisco', 'Chicago')
"""
spark.sql(query).collect()

[Row(sumQty=1214.0, maxPrice=440.0)]

In [121]:
spark.sparkContext.setJobDescription('P2: selection deltaSalesPerCity on city = SF or C')

query = """
SELECT sum(quantity) as sumQty, max(unitprice) as maxPrice
FROM delta.`/tmp/deltaSalesPerCity`
WHERE city in('San Francisco', 'Chicago')
"""
spark.sql(query).collect()

[Row(sumQty=1214.0, maxPrice=440.0)]

Report and compare the number of files and the size of data read by the two plans above:
- P1
- P2
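For P2 the filter is on the partition column, so the set of files to read can be determined from the partition values alone, before any Parquet file is opened (partition pruning), whereas P1 has to scan the whole CSV file behind `salesOriginal`. The sketch below models that pruning step. It uses the 16 city names listed by `ls` above and assumes one file per city directory, as observed for `city=Bergamo`; the file names are hypothetical.

```python
# City partitions listed by `ls /tmp/deltaSalesPerCity` above.
CITIES = [
    "Bergamo", "Chicago", "Chieti", "Fulda", "Houston", "Los Angeles", "Milan", "Munich",
    "New York", "Offenbach", "Pescara", "Regensburg", "San Antonio", "San Diego",
    "San Francisco", "San Jose",
]
# Assumption: one Parquet file per partition directory (hypothetical file names).
files = [{"path": f"city={c}/part-00000.snappy.parquet", "partitionValues": {"city": c}} for c in CITIES]

def prune(files, column, allowed):
    """Keep only the files whose partition value satisfies `column IN allowed`."""
    return [f for f in files if f["partitionValues"][column] in allowed]

selected = prune(files, "city", {"San Francisco", "Chicago"})
print(f"{len(selected)} of {len(files)} files to read")  # 2 of 16 files to read
```

The same reasoning applies to P4, where the filter is on the second partition column `category`: only the `category=Cloth` sub-directories need to be read.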

In [122]:
spark.sparkContext.setJobDescription('P3: selection salesOriginal on category = C')

query = """
SELECT sum(quantity) as sumQty, max(unitprice) as maxPrice
FROM salesOriginal
WHERE category = 'Cloth'
"""
spark.sql(query).collect()

[Row(sumQty=5208.0, maxPrice=60.0)]

In [123]:
spark.sparkContext.setJobDescription('P4 selection deltaSalesPerCityCategory on category = C')

query = """
SELECT sum(quantity) as sumQty, max(unitprice) as maxPrice
FROM delta.`/tmp/deltaSalesPerCityCategory`
WHERE category = 'Cloth'
"""
spark.sql(query).collect()

[Row(sumQty=5208.0, maxPrice=60.0)]

Report and compare the number of files and the size of data read by the two plans above:
- P3
- P4

#### Selection query on a column not used for partitioning

In [124]:
spark.sparkContext.setJobDescription('P5 selection salesOriginal on country = G or I')

query = """
SELECT sum(quantity) as sumQty, max(unitprice) as maxPrice
FROM salesOriginal
WHERE country in ('Germany', 'Italy')
"""
spark.sql(query).collect()

[Row(sumQty=3232.0, maxPrice=440.0)]

In [125]:
spark.sparkContext.setJobDescription('P6 selection deltaSalesPerCity on country = G or I')

query = """
SELECT sum(quantity) as sumQty, max(unitprice) as maxPrice
FROM delta.`/tmp/deltaSalesPerCity`
WHERE country in ('Germany', 'Italy')
"""
spark.sql(query).collect()

[Row(sumQty=3232.0, maxPrice=440.0)]

Report and compare the number of files and the size of data read by the two plans above:
- P5
- P6
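In P6 the filter is on `country`, which is not a partition column, so partition pruning does not apply. Delta may still skip files using the per-file column statistics (minimum and maximum values) it stores in the transaction log; since each city file here holds a single country, such statistics could exclude the US files. Whether this actually happens in your run is what the scan metrics in the Spark UI should reveal. The sketch below models min/max-based file skipping on hypothetical statistics; it is a simplified model, not Delta's implementation.

```python
# Hypothetical per-file statistics: each city file holds a single country,
# so the min and max of `country` are equal.
file_stats = [
    {"path": "city=Munich/part-0.parquet", "min": {"country": "Germany"}, "max": {"country": "Germany"}},
    {"path": "city=Milan/part-0.parquet", "min": {"country": "Italy"}, "max": {"country": "Italy"}},
    {"path": "city=Chicago/part-0.parquet", "min": {"country": "USA"}, "max": {"country": "USA"}},
]

def may_contain(stats, column, values):
    """True if some wanted value lies in [min, max]; only then must the file be read."""
    lo, hi = stats["min"][column], stats["max"][column]
    return any(lo <= v <= hi for v in values)

to_read = [s["path"] for s in file_stats if may_contain(s, "country", ["Germany", "Italy"])]
print(to_read)
```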

#### Aggregation query on the partitioning column

In [126]:
spark.sparkContext.setJobDescription('P7 aggregation salesOriginal on city')

query = """
SELECT city, sum(quantity) as sumQty, max(unitprice) as maxPrice
FROM salesOriginal
group by city
"""
spark.sql(query).collect()[0]

Row(city='Fulda', sumQty=405.0, maxPrice=440.0)

In [127]:
spark.sparkContext.setJobDescription('P8 aggregation deltaSalesPerCity on city')

query = """
SELECT city, sum(quantity) as sumQty, max(unitprice) as maxPrice
FROM delta.`/tmp/deltaSalesPerCity`
group by city
"""
spark.sql(query).collect()[0]

Row(city='Fulda', sumQty=405.0, maxPrice=440.0)

Report and compare the number of files and the size of data read by the two plans above:
- P7
- P8
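Neither P7 nor P8 has a filter, so both must read all the data: partitioning by `city` does not remove the scan. Spark typically plans such a `GROUP BY` in two phases (a partial aggregation inside each task, an exchange on the grouping key, then a final aggregation), which should appear as two `HashAggregate` nodes around an `Exchange` in the plan. The plain-Python sketch below models that two-phase computation of `sum(quantity)` and `max(unitprice)` on hypothetical rows split into two tasks.

```python
def partial_agg(rows):
    """Phase 1, inside one task: (sum of quantity, max of unitprice) per city seen by the task."""
    acc = {}
    for r in rows:
        s, m = acc.get(r["city"], (0.0, float("-inf")))
        acc[r["city"]] = (s + r["quantity"], max(m, r["unitprice"]))
    return acc

def final_agg(partials):
    """Phase 2, after the shuffle: merge the partial results of every task per city."""
    merged = {}
    for acc in partials:
        for city, (s, m) in acc.items():
            ms, mm = merged.get(city, (0.0, float("-inf")))
            merged[city] = (ms + s, max(mm, m))
    return merged

# Hypothetical input split into two tasks (e.g. two files or file splits).
task1 = [{"city": "Fulda", "quantity": 2.0, "unitprice": 60.0},
         {"city": "Milan", "quantity": 1.0, "unitprice": 20.0}]
task2 = [{"city": "Fulda", "quantity": 3.0, "unitprice": 440.0}]
print(final_agg([partial_agg(task1), partial_agg(task2)]))
```

With the partitioned table each file holds a single city, but Spark does not treat the file layout as a hash partitioning of the data, so an exchange is still expected in P8.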

#### Aggregation query on a column not used for partitioning

In [128]:
spark.sparkContext.setJobDescription('P9 aggregation salesOriginal on country')

query = """
SELECT country, sum(quantity) as sumQty, max(unitprice) as maxPrice
FROM salesOriginal
group by country
"""
spark.sql(query).collect()[0]

Row(country='Germany', sumQty=1616.0, maxPrice=440.0)

In [129]:
spark.sparkContext.setJobDescription('P10 aggregation deltaSalesPerCity on country')

query = """
SELECT country, sum(quantity) as sumQty, max(unitprice) as maxPrice
FROM delta.`/tmp/deltaSalesPerCity`
group by country
"""
spark.sql(query).collect()[0]

Row(country='Germany', sumQty=1616.0, maxPrice=440.0)

Report and compare the number of files and the size of data read by the two plans above:
- P9
- P10