* Master DAC - BDLE
* Author: Mohamed-Amine Baazizi
* Affiliation: LIP6 - Faculté des Sciences - Sorbonne Université
* Email: mohamed-amine.baazizi@lip6.fr
* October 2024


# Delta Lake


## Outline

This lab is dedicated to practicing Delta Lake. It begins with a set of demos that illustrate the usage of Delta on small examples.
A use case based on realistic data is then presented, followed by an analysis of the query plans generated for Delta operations.


For the official documentation visit https://docs.delta.io/latest/index.html


- For Demo1, Demo2 and Demo3, the answers are provided: run and observe.
- In the Use case section, answers are not provided and are left as an exercise.
- In the CDF section, a scenario is provided. The task is to suggest similar updates and observe their impact on the log trail.





## Prerequisite

### System setup

In [None]:
appName = "Delta Lake "

In [None]:
%%capture
!pip install -q pyspark
!pip install -q delta-spark
!pip install pyngrok

In [None]:
!pip list|grep spark

delta-spark                        3.2.1
pyspark                            3.5.3


In [None]:
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

local = "local[*]"

localConfig = SparkConf().setAppName(appName).setMaster(local).\
  set("spark.executor.memory", "8G").\
  set("spark.driver.memory","8G").\
  set("spark.sql.catalogImplementation","in-memory").\
  set("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension").\
  set("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog").\
  set("spark.jars.packages","io.delta:delta-spark_2.12:3.2.1").\
  set("spark.databricks.delta.schema.autoMerge.enabled","true")


spark = SparkSession.builder.config(conf = localConfig).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [None]:
spark

### Data import

In [None]:
%%capture
! wget https://nuage.lip6.fr/s/BbQ9rzGHKJexKYp/download/sales.tar -O /tmp/sales.tar
!mkdir -p /tmp/delta
! tar xvf /tmp/sales.tar -C /tmp/delta


In [None]:
!ls /tmp/delta/sales

march23_sales.csv  salesOriginal.csv


## Demo1: first steps

### load the data into delta

In [None]:
query = """
CREATE TABLE delta.`/tmp/delta-table` USING DELTA AS SELECT col1 as id FROM VALUES 0,1,2,3,4;
"""
spark.sql(query)


DataFrame[]

In [None]:
query = """
SELECT * FROM delta.`/tmp/delta-table`;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  0|
|  1|
+---+



### update the data
#### overwrite

In [None]:
query = """
INSERT OVERWRITE delta.`/tmp/delta-table` SELECT col1 as id FROM VALUES 5,6,7,8,9;
"""
spark.sql(query).show()


++
||
++
++



In [None]:
query = """
SELECT * FROM delta.`/tmp/delta-table`;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  7|
|  8|
|  9|
|  5|
|  6|
+---+



#### conditional overwrite

In [None]:
query = """
UPDATE delta.`/tmp/delta-table` SET id = id + 100 WHERE id % 2 == 0;
"""
spark.sql(query).show()


+-----------------+
|num_affected_rows|
+-----------------+
|                2|
+-----------------+



In [None]:
query = """
SELECT * FROM delta.`/tmp/delta-table`;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  7|
|108|
|  9|
|  5|
|106|
+---+



In [None]:
query = """
DELETE FROM delta.`/tmp/delta-table` WHERE id % 2 == 0;
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|                2|
+-----------------+



In [None]:
query = """
SELECT * FROM delta.`/tmp/delta-table`;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  7|
|  9|
|  5|
+---+



In [None]:
query = """
CREATE TEMP VIEW newData AS SELECT col1 AS id FROM VALUES 1,3,5,7,9,11,13,15,17,19;
"""
spark.sql(query).show()

++
||
++
++



In [None]:
query = """
SELECT * FROM `newData`;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  1|
|  3|
|  5|
|  7|
|  9|
| 11|
| 13|
| 15|
| 17|
| 19|
+---+



In [None]:
query = """
MERGE INTO delta.`/tmp/delta-table` AS oldData
USING newData
ON oldData.id = newData.id
WHEN MATCHED
  THEN UPDATE SET id = newData.id
WHEN NOT MATCHED
  THEN INSERT (id) VALUES (newData.id);
"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|               10|               3|               0|                7|
+-----------------+----------------+----------------+-----------------+



In [None]:
query = """
SELECT * FROM delta.`/tmp/delta-table`;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  1|
|  3|
|  5|
|  7|
|  9|
| 11|
| 13|
| 15|
| 17|
| 19|
+---+
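
The counters returned by the `MERGE` above can be checked with a plain-Python model of the two clauses. This is only a sketch of the matching logic on sets of keys, not of how Delta executes the statement; `merge_upsert` is a hypothetical helper written for this illustration.

```python
def merge_upsert(target_keys, source_keys):
    """Model WHEN MATCHED THEN UPDATE / WHEN NOT MATCHED THEN INSERT on key sets."""
    updated = [k for k in source_keys if k in target_keys]       # satisfy the ON condition
    inserted = [k for k in source_keys if k not in target_keys]  # present only in the source
    result = set(target_keys) | set(source_keys)
    return result, len(updated), len(inserted)

target_ids = {5, 7, 9}                              # table content after the DELETE
source_ids = [1, 3, 5, 7, 9, 11, 13, 15, 17, 19]    # rows of the newData view
result, n_updated, n_inserted = merge_upsert(target_ids, source_ids)
print(sorted(result), n_updated, n_inserted)
# → [1, 3, 5, 7, 9, 11, 13, 15, 17, 19] 3 7
```

The three matched keys (5, 7, 9) are counted as updates even though their value does not change, which matches `num_updated_rows = 3` in the output above.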



### viewing history

In [None]:
query = """
DESCRIBE HISTORY delta.`/tmp/delta-table`
"""
spark.sql(query).show()

+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      4|2024-10-16 16:41:...|  NULL|    NULL|               MERGE|{predicate -> ["(...|NULL|    NULL|     NULL|          3|  Serializable|        false|{numTargetRowsCop...|        NULL|Apache-Spark/3.5....|
|      3|2024-10-16 16:38:...|  NULL|    NULL|              DELETE|{predicate -> ["(...|NULL|    NULL|     NULL|          2|  Serializable|        false|{numRemoved

In [None]:
query = """
SELECT * FROM delta.`/tmp/delta-table` VERSION AS OF 0;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  2|
|  3|
|  4|
|  0|
|  1|
+---+



In [None]:
query = """
SELECT * FROM delta.`/tmp/delta-table` VERSION AS OF 1;
"""
spark.sql(query).show()

+---+
| id|
+---+
|  7|
|  8|
|  9|
|  5|
|  6|
+---+
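
One way to picture `VERSION AS OF` is a list of snapshots indexed by version number. The toy class below is a simplification introduced for illustration: Delta records file-level actions in a transaction log rather than storing full copies, and `VersionedTable` is not part of any Delta API.

```python
class VersionedTable:
    """Toy model: each commit stores a full snapshot, indexed by its version number."""

    def __init__(self):
        self.snapshots = []

    def commit(self, rows):
        self.snapshots.append(list(rows))
        return len(self.snapshots) - 1          # version number of the new commit

    def read(self, version=None):
        # Without a version, read the latest snapshot, like a plain SELECT
        index = len(self.snapshots) - 1 if version is None else version
        return self.snapshots[index]

table = VersionedTable()
table.commit([0, 1, 2, 3, 4])        # version 0: CREATE TABLE ... AS SELECT
table.commit([5, 6, 7, 8, 9])        # version 1: INSERT OVERWRITE
table.commit([5, 106, 7, 108, 9])    # version 2: UPDATE

print(table.read(0))   # → [0, 1, 2, 3, 4]
print(table.read(1))   # → [5, 6, 7, 8, 9]
print(table.read())    # → [5, 106, 7, 108, 9]
```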



## Creating synthetic data


### Persons

In [None]:
query = """
CREATE TABLE delta.`/tmp/persons` USING DELTA AS
SELECT col1 as serial, col2 as name, col3 as age, col4 as address
FROM VALUES ("12345", "Alice", 25, "123 Main St"),
            ("67890", "Bob", 30, "456 Oak Ave"),
            ("24680", "Charlie", 35, "789 Elm St");
"""
spark.sql(query)


DataFrame[]

In [None]:
query = """
SELECT * FROM delta.`/tmp/persons`;
"""
spark.sql(query).show()

+------+-------+---+-----------+
|serial|   name|age|    address|
+------+-------+---+-----------+
| 12345|  Alice| 25|123 Main St|
| 67890|    Bob| 30|456 Oak Ave|
| 24680|Charlie| 35| 789 Elm St|
+------+-------+---+-----------+



In [None]:
query = """
CREATE TEMP VIEW newPersons AS
SELECT col1 as serial, col2 as name, col3 as age, col4 as address
FROM VALUES ("78120", "Dan", 42, "432 Holly Rd"), ("97362", "Lorry", 40, "290 Wise Ave"), ("12345", "Alice", 25, "123 Main St")
"""
spark.sql(query)

DataFrame[]

In [None]:
query = """
SELECT * from newPersons
"""
spark.sql(query).show()

+------+-----+---+------------+
|serial| name|age|     address|
+------+-----+---+------------+
| 78120|  Dan| 42|432 Holly Rd|
| 97362|Lorry| 40|290 Wise Ave|
| 12345|Alice| 25| 123 Main St|
+------+-----+---+------------+



### Salaries

In [None]:
query = """
CREATE TABLE delta.`/tmp/salaries` USING DELTA AS
SELECT col1 as serial, col2 as salary
FROM VALUES ("12345", 45000),
        ("67890", 52000),
        ("24680", 36000),
        ("78120", 60000),
        ("97362",38000)
"""
spark.sql(query)

DataFrame[]

In [None]:
query = """
SELECT * from delta.`/tmp/salaries`
"""
spark.sql(query).show()

+------+------+
|serial|salary|
+------+------+
| 24680| 36000|
| 78120| 60000|
| 97362| 38000|
| 12345| 45000|
| 67890| 52000|
+------+------+



In [None]:
query = """
CREATE TEMP VIEW newSalaries AS
SELECT col1 as serial, col2 as salary
FROM VALUES ("12345", 47000),
        ("67890", 50000),
        ("24680", 46000),
        ("78120", 61000),
        ("97362",39000)
"""
spark.sql(query)

DataFrame[]

In [None]:
query = """
SELECT * from newSalaries
"""
spark.sql(query).show()

+------+------+
|serial|salary|
+------+------+
| 12345| 47000|
| 67890| 50000|
| 24680| 46000|
| 78120| 61000|
| 97362| 39000|
+------+------+



### Sales

In [None]:
query = """
CREATE TABLE delta.`/tmp/sales` USING DELTA AS
SELECT col1 as product_id, col2 as quantity, col3 as totalprice
FROM VALUES ("CHA_2",2,60),("BED_4",1,300),("SHO_15",2,60)
"""
spark.sql(query)

DataFrame[]

In [None]:
query = """
SELECT * FROM delta.`/tmp/sales`
"""
spark.sql(query).show()

+----------+--------+----------+
|product_id|quantity|totalprice|
+----------+--------+----------+
|     BED_4|       1|       300|
|    SHO_15|       2|        60|
|     CHA_2|       2|        60|
+----------+--------+----------+



In [None]:
query = """
CREATE TABLE delta.`/tmp/salesStatus` USING DELTA AS
SELECT product_id, quantity, totalprice, 'available' as status
FROM delta.`/tmp/sales`
"""
spark.sql(query)

DataFrame[]

In [None]:
query = """
SELECT * FROM delta.`/tmp/salesStatus`
"""
spark.sql(query).show()

+----------+--------+----------+---------+
|product_id|quantity|totalprice|   status|
+----------+--------+----------+---------+
|     BED_4|       1|       300|available|
|    SHO_15|       2|        60|available|
|     CHA_2|       2|        60|available|
+----------+--------+----------+---------+



In [None]:
query = """
CREATE TEMP VIEW newSales AS
SELECT col1 as product_id, col2 as quantity, col3 as totalprice
FROM VALUES ("SHO_15",3,90),("CHA_2",1,30),("BED_6",1,200)

"""
spark.sql(query)


DataFrame[]

In [None]:
query = """
SELECT * FROM newSales
"""
spark.sql(query).show()

+----------+--------+----------+
|product_id|quantity|totalprice|
+----------+--------+----------+
|    SHO_15|       3|        90|
|     CHA_2|       1|        30|
|     BED_6|       1|       200|
+----------+--------+----------+



### Products

In [None]:
query = """
CREATE TEMP VIEW products AS
SELECT col1 as product_id, col2 as category, col3 as color
FROM VALUES ("CHA_2","Furniture","blue"),("BED_4","Furniture","brown"),("SHO_15","Cloth","black")

"""
spark.sql(query)

DataFrame[]

In [None]:
query = """
SELECT * FROM products
"""
spark.sql(query).show()

+----------+---------+-----+
|product_id| category|color|
+----------+---------+-----+
|     CHA_2|Furniture| blue|
|     BED_4|Furniture|brown|
|    SHO_15|    Cloth|black|
+----------+---------+-----+



### Query the catalog

In [None]:
query = """
SHOW Tables
"""
spark.sql(query).show()

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|         |    newdata|      false|
|         | newpersons|      false|
|         |newsalaries|      false|
|         |   newsales|      false|
|         |   products|      false|
+---------+-----------+-----------+



In [None]:
query = """
Describe products
"""
spark.sql(query).show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|product_id|   string|   NULL|
|  category|   string|   NULL|
|     color|   string|   NULL|
+----------+---------+-------+



## Demo2: delta operations

### Q1. Adding new tuples
Consider the Delta table `persons` with the following columns: serial, name, age, and address. You have a new dataset `newPersons` with the same columns, but with additional records. Write a merge statement to update the Delta table with the new records.


In [None]:
query = """
Describe delta.`/tmp/persons`
"""
spark.sql(query).show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|  serial|   string|   NULL|
|    name|   string|   NULL|
|     age|      int|   NULL|
| address|   string|   NULL|
+--------+---------+-------+



In [None]:
query = """
SELECT * FROM
delta.`/tmp/persons`
"""
spark.sql(query).show()

+------+-------+---+-----------+
|serial|   name|age|    address|
+------+-------+---+-----------+
| 12345|  Alice| 25|123 Main St|
| 67890|    Bob| 30|456 Oak Ave|
| 24680|Charlie| 35| 789 Elm St|
+------+-------+---+-----------+



In [None]:
query = """
MERGE INTO delta.`/tmp/persons` AS oldData
USING newPersons
ON oldData.serial = newPersons.serial
WHEN NOT MATCHED
  THEN INSERT *;
"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                2|               0|               0|                2|
+-----------------+----------------+----------------+-----------------+



In [None]:
query = """
SELECT * FROM
delta.`/tmp/persons`
"""
spark.sql(query).show()

+------+-------+---+------------+
|serial|   name|age|     address|
+------+-------+---+------------+
| 97362|  Lorry| 40|290 Wise Ave|
| 12345|  Alice| 25| 123 Main St|
| 78120|    Dan| 42|432 Holly Rd|
| 67890|    Bob| 30| 456 Oak Ave|
| 24680|Charlie| 35|  789 Elm St|
+------+-------+---+------------+



### Q2: updating existing tuples
Assume you have a Delta table `salaries` with columns serial and salary. You want to update the salary of the employees who earn less than 50,000. You have a new dataset, `newSalaries` with the same columns but with updated salary information. Write a merge statement to update the `salaries` table with the new salary information.


In [None]:
query = """
Describe delta.`/tmp/salaries`
"""
spark.sql(query).show()

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|  serial|   string|   NULL|
|  salary|      int|   NULL|
+--------+---------+-------+



In [None]:
query = """
SELECT * FROM
delta.`/tmp/salaries`
"""
spark.sql(query).show()

+------+------+
|serial|salary|
+------+------+
| 24680| 36000|
| 78120| 60000|
| 97362| 38000|
| 12345| 45000|
| 67890| 52000|
+------+------+



In [None]:
query = """
MERGE INTO delta.`/tmp/salaries` AS oldData
USING newSalaries
ON oldData.serial = newSalaries.serial
WHEN MATCHED AND oldData.salary<50000
  THEN UPDATE SET oldData.salary=newSalaries.salary;
"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                3|               3|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [None]:
query = """
SELECT * FROM
delta.`/tmp/salaries`
"""
spark.sql(query).show()

+------+------+
|serial|salary|
+------+------+
| 24680| 46000|
| 78120| 60000|
| 97362| 39000|
| 12345| 47000|
| 67890| 52000|
+------+------+
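
The extra condition in `WHEN MATCHED AND oldData.salary<50000` is evaluated on the target row before the update is applied. A minimal plain-Python sketch of that behaviour, using dictionaries keyed by `serial` (the helper name is hypothetical):

```python
def merge_conditional_update(target, source, condition):
    """Update a matched key only when the current target value satisfies the condition."""
    result = dict(target)
    updated = 0
    for key, new_value in source.items():
        if key in result and condition(result[key]):
            result[key] = new_value
            updated += 1
    return result, updated

salaries = {"12345": 45000, "67890": 52000, "24680": 36000, "78120": 60000, "97362": 38000}
new_salaries = {"12345": 47000, "67890": 50000, "24680": 46000, "78120": 61000, "97362": 39000}

merged, n_updated = merge_conditional_update(salaries, new_salaries, lambda s: s < 50000)
print(n_updated, merged["67890"], merged["78120"])
# → 3 52000 60000
```

Serials 67890 and 78120 match on the key but keep their old salary, because the old value is not below 50,000.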



### Q3: adding new tuples and updating existing ones
You have a Delta table `sales` with columns `product_id`, `quantity`, and `totalprice`. Write a merge statement that inserts the new products from the view `newSales` into `sales` and, for products that already exist, adds the new `quantity` and `totalprice` to the stored values.


In [None]:
query = """
Describe delta.`/tmp/sales`
"""
spark.sql(query).show()

+----------+---------+-------+
|  col_name|data_type|comment|
+----------+---------+-------+
|product_id|   string|   NULL|
|  quantity|      int|   NULL|
|totalprice|      int|   NULL|
+----------+---------+-------+



In [None]:
query = """
SELECT * FROM
delta.`/tmp/sales`
"""
spark.sql(query).show()

+----------+--------+----------+
|product_id|quantity|totalprice|
+----------+--------+----------+
|     BED_4|       1|       300|
|    SHO_15|       2|        60|
|     CHA_2|       2|        60|
+----------+--------+----------+



In [None]:
query = """
MERGE INTO delta.`/tmp/sales` AS oldData
USING newSales
ON oldData.product_id = newSales.product_id
WHEN MATCHED
  THEN UPDATE SET oldData.quantity = oldData.quantity + newSales.quantity,
                  oldData.totalprice = oldData.totalprice + newSales.totalprice
WHEN NOT MATCHED
  THEN INSERT *
"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                3|               2|               0|                1|
+-----------------+----------------+----------------+-----------------+



In [None]:
query = """
SELECT * FROM
delta.`/tmp/sales`
"""
spark.sql(query).show()

+----------+--------+----------+
|product_id|quantity|totalprice|
+----------+--------+----------+
|     BED_4|       1|       300|
|     BED_6|       1|       200|
|     CHA_2|       3|        90|
|    SHO_15|       5|       150|
+----------+--------+----------+



### Q4: Merging tables with different schemas
Consider the Delta table `sales`. Write a merge statement to augment `sales` with the category and the color of the products by using an auxiliary table `products` whose schema is `product_id`, `category` and `color`, and such that `product_id` can be used for matching the tuples of `sales`.

In [None]:
query = """
MERGE INTO delta.`/tmp/sales` oldData
USING products
ON oldData.product_id = products.product_id
WHEN MATCHED
  THEN UPDATE SET *
WHEN NOT MATCHED
  THEN INSERT *
"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                3|               3|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [None]:
query = """
SELECT * FROM delta.`/tmp/sales`
"""
spark.sql(query).show()

+----------+--------+----------+---------+-----+
|product_id|quantity|totalprice| category|color|
+----------+--------+----------+---------+-----+
|     BED_4|       1|       300|Furniture|brown|
|     BED_6|       1|       200|     NULL| NULL|
|     CHA_2|       3|        90|Furniture| blue|
|    SHO_15|       5|       150|    Cloth|black|
+----------+--------+----------+---------+-----+
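
The merge above widens the schema of `sales`, which relies on the `spark.databricks.delta.schema.autoMerge.enabled` setting from the system setup. The sketch below models the effect on rows represented as dictionaries; it is an illustration of the resulting data, under the assumption that unmatched target rows receive NULL in the new columns, and the function name is hypothetical.

```python
def merge_with_schema_evolution(target_rows, source_rows, key):
    """Model UPDATE SET * / INSERT * when the source has columns the target lacks."""
    source_by_key = {row[key]: row for row in source_rows}
    # Evolved schema: target columns first, then the new source columns
    columns = list(target_rows[0])
    for row in source_rows:
        for col in row:
            if col not in columns:
                columns.append(col)
    merged = []
    for row in target_rows:
        combined = {**row, **source_by_key.get(row[key], {})}   # matched rows take source values
        merged.append({col: combined.get(col) for col in columns})  # missing column -> None (NULL)
    target_keys = {row[key] for row in target_rows}
    for row in source_rows:                                      # WHEN NOT MATCHED THEN INSERT *
        if row[key] not in target_keys:
            merged.append({col: row.get(col) for col in columns})
    return merged

sales = [
    {"product_id": "BED_4", "quantity": 1, "totalprice": 300},
    {"product_id": "BED_6", "quantity": 1, "totalprice": 200},
    {"product_id": "CHA_2", "quantity": 3, "totalprice": 90},
    {"product_id": "SHO_15", "quantity": 5, "totalprice": 150},
]
products = [
    {"product_id": "CHA_2", "category": "Furniture", "color": "blue"},
    {"product_id": "BED_4", "category": "Furniture", "color": "brown"},
    {"product_id": "SHO_15", "category": "Cloth", "color": "black"},
]
merged = merge_with_schema_evolution(sales, products, "product_id")
for row in merged:
    print(row)
```

`BED_6` has no entry in `products`, so its `category` and `color` stay empty, as in the query result above.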



### Q5: updating existing tuples when not matched by source
Consider the Delta table `salesStatus`, which extends the table `sales` with the column `status`, meant to track the availability of products.
Write a merge statement that:
- updates the quantity and total price of products in `salesStatus` by considering the sales reported in `newSales`, as in Q3 above, and
- marks the status of the products that are not reported in `newSales` as 'unavailable'.

In [None]:
query = """
MERGE INTO delta.`/tmp/salesStatus` AS oldData
USING newSales
ON oldData.product_id = newSales.product_id
WHEN MATCHED
  THEN UPDATE SET oldData.quantity = oldData.quantity + newSales.quantity,
                  oldData.totalprice = oldData.totalprice + newSales.totalprice
WHEN NOT MATCHED BY SOURCE
  THEN UPDATE SET oldData.status = 'unavailable'
"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|                3|               3|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [None]:
query = """
SELECT * FROM delta.`/tmp/salesStatus`
"""
spark.sql(query).show()

+----------+--------+----------+-----------+
|product_id|quantity|totalprice|     status|
+----------+--------+----------+-----------+
|     BED_4|       1|       300|unavailable|
|     CHA_2|       3|        90|  available|
|    SHO_15|       5|       150|  available|
+----------+--------+----------+-----------+
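
`WHEN NOT MATCHED BY SOURCE` targets rows that exist in the table but have no counterpart in the source. The plain-Python sketch below models the two clauses of the statement above; the helper name is hypothetical and the dictionaries stand in for the table and the view.

```python
def merge_sales_status(target, source):
    """Model WHEN MATCHED (sum) and WHEN NOT MATCHED BY SOURCE (flag) on dicts keyed by product_id."""
    result = {}
    for product_id, row in target.items():
        if product_id in source:                     # WHEN MATCHED
            extra = source[product_id]
            result[product_id] = {
                **row,
                "quantity": row["quantity"] + extra["quantity"],
                "totalprice": row["totalprice"] + extra["totalprice"],
            }
        else:                                        # WHEN NOT MATCHED BY SOURCE
            result[product_id] = {**row, "status": "unavailable"}
    # Source-only keys are ignored: the statement has no WHEN NOT MATCHED clause
    return result

sales_status = {
    "BED_4": {"quantity": 1, "totalprice": 300, "status": "available"},
    "SHO_15": {"quantity": 2, "totalprice": 60, "status": "available"},
    "CHA_2": {"quantity": 2, "totalprice": 60, "status": "available"},
}
new_sales = {
    "SHO_15": {"quantity": 3, "totalprice": 90},
    "CHA_2": {"quantity": 1, "totalprice": 30},
    "BED_6": {"quantity": 1, "totalprice": 200},
}
status_after = merge_sales_status(sales_status, new_sales)
print(status_after["BED_4"]["status"], status_after["SHO_15"]["quantity"], "BED_6" in status_after)
# → unavailable 5 False
```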



## Demo 3: Attaching constraints

### Not-null constraint

In [None]:
query = """
CREATE TABLE default.persons (
    serial INT NOT NULL,
    name STRING,
    birthDate TIMESTAMP,
    address STRING
  ) USING DELTA;
"""
spark.sql(query)

DataFrame[]

In [None]:
query = """insert into default.persons values (12345, "Alice","2000-02-01" ,"123 Main St") """
spark.sql(query)

DataFrame[]

In [None]:
query = """select * from default.persons """
spark.sql(query).show()

+------+-----+-------------------+-----------+
|serial| name|          birthDate|    address|
+------+-----+-------------------+-----------+
| 12345|Alice|2000-02-01 00:00:00|123 Main St|
+------+-----+-------------------+-----------+



Can we run the following statement?

In [None]:
#query = """insert into default.persons values (null, "Bob","1996-03-14" ,"456 Oak Ave") """
#spark.sql(query).show()

### Predicate constraint

In [None]:
spark.sql(""" ALTER TABLE default.persons ADD CONSTRAINT birthdate CHECK (birthDate > '2000-01-01'); """)

DataFrame[]

In [None]:
spark.sql("""SHOW TBLPROPERTIES default.persons""").show(truncate=False)

+---------------------------+------------------------+
|key                        |value                   |
+---------------------------+------------------------+
|delta.constraints.birthdate|birthDate > '2000-01-01'|
|delta.minReaderVersion     |1                       |
|delta.minWriterVersion     |3                       |
+---------------------------+------------------------+



In [None]:
spark.sql("""insert into default.persons values (47962, "Bob","2003-03-14" ,"456 Oak Ave") """)

DataFrame[]

Can we run the following statement?

In [None]:
#spark.sql("""insert into default.persons values (47962, "Bob","1999-03-14" ,"456 Oak Ave") """)
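
To reason about which inserts Delta accepts, the two constraints on `default.persons` can be written as an ordinary validation function. This is a sketch of the rules only: `check_person` is a hypothetical helper, and Delta reports violations by failing the write rather than returning a list.

```python
from datetime import datetime

def check_person(serial, birth_date):
    """Return the constraints a candidate row would violate (empty list = accepted)."""
    violations = []
    if serial is None:
        violations.append("serial NOT NULL")
    if not datetime.fromisoformat(birth_date) > datetime(2000, 1, 1):
        violations.append("birthdate CHECK")
    return violations

print(check_person(47962, "2003-03-14"))   # → []
print(check_person(47962, "1999-03-14"))   # → ['birthdate CHECK']
print(check_person(None, "2003-03-14"))    # → ['serial NOT NULL']
```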

### Generated columns
The following Delta table contains three columns, `year`, `month` and `day`, that must correspond to the date elements of the `saledate` column.

In [None]:
from delta.tables import DeltaTable
DeltaTable.createOrReplace(spark) \
  .tableName("default.sales") \
  .addColumn("saleid", "STRING") \
  .addColumn("saledate", "TIMESTAMP") \
  .addColumn("quantity", "INT") \
  .addColumn("year", "INT", generatedAlwaysAs="YEAR(saledate)") \
  .addColumn("month", "INT", generatedAlwaysAs="MONTH(saledate)") \
  .addColumn("day", "INT", generatedAlwaysAs="DAYOFMONTH(saledate)") \
  .partitionedBy("year", "month") \
  .execute()

<delta.tables.DeltaTable at 0x7e3602fbc910>

In [None]:
spark.sql(""" insert into default.sales
            values ('S000000124','2023-02-26 00:00:00',2.0,2023,02,26)  """)

DataFrame[]

In [None]:
spark.sql(""" select * from default.sales """).show()

+----------+-------------------+--------+----+-----+---+
|    saleid|           saledate|quantity|year|month|day|
+----------+-------------------+--------+----+-----+---+
|S000000124|2023-02-26 00:00:00|       2|2023|    2| 26|
+----------+-------------------+--------+----+-----+---+



Can we run the following command?

In [None]:
#spark.sql(""" insert into default.sales values ('S000000124','2024-02-26 00:00:00',2.0,2023,02,26)  """)
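
When values are supplied explicitly for generated columns, they have to agree with the generation expression. The sketch below recomputes `year`, `month` and `day` from `saledate` in plain Python and compares them with the supplied values; the function names are hypothetical and only model the rule.

```python
from datetime import datetime

def generated_values(saledate):
    """Compute the values of YEAR, MONTH and DAYOFMONTH for a timestamp string."""
    ts = datetime.fromisoformat(saledate)
    return {"year": ts.year, "month": ts.month, "day": ts.day}

def accepts(saledate, year, month, day):
    """True when the supplied values equal the generated ones."""
    return generated_values(saledate) == {"year": year, "month": month, "day": day}

print(accepts("2023-02-26 00:00:00", 2023, 2, 26))   # → True
print(accepts("2024-02-26 00:00:00", 2023, 2, 26))   # → False
```

In the second call the supplied year (2023) disagrees with the year of `saledate` (2024), which is the situation tested by the commented-out insert.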

## Use case (answers not provided)

### Data import

In [None]:
query = """
CREATE TABLE IF NOT EXISTS salesOriginal
USING csv
OPTIONS (
  header "true",
  path "/tmp/delta/sales/salesOriginal.csv",
  inferSchema "true"
)
"""
spark.sql(query)

DataFrame[]

In [None]:
query = """
DESCRIBE salesOriginal
"""
spark.sql(query).show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|       saleid|   string|   NULL|
|     saledate|     date|   NULL|
|     quantity|   double|   NULL|
|    unitprice|   double|   NULL|
|       shopid|   string|   NULL|
|         city|   string|   NULL|
|        state|   string|   NULL|
|      country|   string|   NULL|
|     shopsize|   string|   NULL|
|    productid|   string|   NULL|
|     category|   string|   NULL|
|  subcategory|   string|   NULL|
|         size|   string|   NULL|
|purchaseprice|   double|   NULL|
|        color|   string|   NULL|
|        brand|   string|   NULL|
+-------------+---------+-------+



In [None]:
query = """
CREATE TABLE IF NOT EXISTS march23_sales
USING csv
OPTIONS (
  header "true",
  path "/tmp/delta/sales/march23_sales.csv",
  inferSchema "true"
)
"""
spark.sql(query)

DataFrame[]

In [None]:
query = """
DESCRIBE march23_sales
"""
spark.sql(query).show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|       saleid|   string|   NULL|
|     saledate|     date|   NULL|
|     quantity|   double|   NULL|
|    unitprice|   double|   NULL|
|       shopid|   string|   NULL|
|         city|   string|   NULL|
|        state|   string|   NULL|
|      country|   string|   NULL|
|     shopsize|   string|   NULL|
|    productid|   string|   NULL|
|     category|   string|   NULL|
|  subcategory|   string|   NULL|
|         size|   string|   NULL|
|purchaseprice|   double|   NULL|
|        color|   string|   NULL|
|        brand|   string|   NULL|
+-------------+---------+-------+



### Creation of the delta tables

In [None]:
query = """
CREATE TABLE delta.`/tmp/deltaSales` USING DELTA AS SELECT * FROM salesOriginal;
"""
spark.sql(query)

DataFrame[]

In [None]:
query = """
SELECT * FROM  delta.`/tmp/deltaSales`
"""
spark.sql(query).show(5)

+----------+----------+--------+---------+------+-------------+----------+-------+--------+---------+---------+-----------+------+-------------+-----+-----+
|    saleid|  saledate|quantity|unitprice|shopid|         city|     state|country|shopsize|productid| category|subcategory|  size|purchaseprice|color|brand|
+----------+----------+--------+---------+------+-------------+----------+-------+--------+---------+---------+-----------+------+-------------+-----+-----+
|S000000124|2023-02-26|     2.0|     60.0|shop_4|San Francisco|California|    USA|   small|    CHA_2|Furniture|      Chair|  NULL|         48.0| blue|Basic|
|S000000125|2023-02-25|     1.0|    150.0|shop_5|      Houston|     Texas|    USA|   small|    BED_3|Furniture|        Bed|Single|        127.0|  red| Mega|
|S000000126|2023-02-24|     1.0|    300.0|shop_6|  San Antonio|     Texas|    USA|   small|    BED_4|Furniture|        Bed|Double|        252.0|brown|Basic|
|S000000127|2023-02-23|     1.0|    395.0|shop_7|      Chi

### Adding new records
Write a merge statement to insert the March 2023 records into `deltaSales`.

In [None]:
query = """
MERGE INTO delta.`/tmp/deltaSales` as oldData
USING march23_sales
ON oldData.saleid = march23_sales.saleid
WHEN NOT MATCHED
  THEN INSERT *
"""
spark.sql(query).show()

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|               84|               0|               0|               84|
+-----------------+----------------+----------------+-----------------+



### Updating records
Write update statements that increase the `unitprice` of products sold in 2023, based on their category, as follows: Furniture -> 5%, others -> 10%.

In [None]:
query = """
UPDATE delta.`/tmp/deltaSales`
SET unitprice = unitprice * 1.05
WHERE saledate >= '2023-01-01' AND category = 'Furniture'
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|              882|
+-----------------+



In [None]:
query = """
UPDATE delta.`/tmp/deltaSales`
SET unitprice = unitprice * 1.1
WHERE saledate >= '2023-01-01' and category != 'Furniture'
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|             1680|
+-----------------+
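
The two `UPDATE` statements have disjoint predicates, so no row is increased twice. Under SQL's three-valued logic, a row whose `category` is NULL satisfies neither `category = 'Furniture'` nor `category != 'Furniture'`. The sketch below models the per-row rule in plain Python; the function is hypothetical, and the rounding is there only for readable output (the SQL statements do not round).

```python
from datetime import date

def updated_price(saledate, category, unitprice):
    """Price after both UPDATE statements, for a single row."""
    if saledate < date(2023, 1, 1) or category is None:
        return unitprice                      # matched by neither predicate
    rate = 0.05 if category == "Furniture" else 0.10
    return round(unitprice * (1 + rate), 2)   # rounding is for display only

print(updated_price(date(2023, 2, 26), "Furniture", 60.0))   # → 63.0
print(updated_price(date(2023, 2, 20), "Cloth", 100.0))      # → 110.0
print(updated_price(date(2022, 12, 31), "Furniture", 60.0))  # → 60.0
print(updated_price(date(2023, 2, 20), None, 100.0))         # → 100.0
```

The first call uses the values of sale `S000000124` shown earlier; the other rows are made up for illustration.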



### Removing old records
Remove all sales older than 01-Jan-2023. How many records remain?

In [None]:
query = """
DELETE FROM delta.`/tmp/deltaSales`
WHERE saledate < '2023-01-01'
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|             2438|
+-----------------+
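
The number of remaining records can be cross-checked from the counters reported so far. The arithmetic below assumes that `deltaSales` started with the 4916 rows counted in the Change data feed section for a table built from the same `salesOriginal` source; a `count(*)` on `deltaSales` remains the authoritative check.

```python
initial_rows = 4916     # count(*) of a table created from salesOriginal (assumed equal here)
inserted_rows = 84      # num_inserted_rows of the March 2023 merge
deleted_rows = 2438     # num_affected_rows of the DELETE

remaining_rows = initial_rows + inserted_rows - deleted_rows
updated_rows = 882 + 1680   # rows touched by the two UPDATE statements (sales from 2023 on)

print(remaining_rows, updated_rows)
# → 2562 2562
```

Both numbers agree: the rows kept by the `DELETE` are the rows dated 2023 or later, which are the ones the two updates touched.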



### History viewing


In [None]:
query = """
DESCRIBE HISTORY delta.`/tmp/deltaSales`
"""
spark.sql(query).show()

+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      7|2024-10-16 18:01:...|  NULL|    NULL|              DELETE|{predicate -> ["(...|NULL|    NULL|     NULL|          6|  Serializable|        false|{numRemovedFiles ...|        NULL|Apache-Spark/3.5....|
|      6|2024-10-16 18:00:...|  NULL|    NULL|              UPDATE|{predicate -> ["(...|NULL|    NULL|     NULL|          5|  Serializable|        false|{numRemoved

### Restoring to a previous version

In [None]:
query = """
RESTORE TABLE delta.`/tmp/deltaSales` TO VERSION AS OF 2
"""
spark.sql(query).show()


+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|table_size_after_restore|num_of_files_after_restore|num_removed_files|num_restored_files|removed_files_size|restored_files_size|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
|                   44148|                         2|                2|                 2|             28405|              44148|
+------------------------+--------------------------+-----------------+------------------+------------------+-------------------+
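
`RESTORE` does not erase the versions that follow the restored one: it commits a new version whose content equals the older snapshot. A minimal model, with placeholder strings standing in for table contents (the list and the helper are hypothetical):

```python
snapshots = [f"state_{v}" for v in range(8)]   # versions 0..7, contents are placeholders

def restore(snapshots, version):
    """Append a new version that reuses the content of an earlier one."""
    snapshots.append(snapshots[version])
    return len(snapshots) - 1                  # version number created by the restore

new_version = restore(snapshots, 2)
print(new_version, snapshots[new_version], snapshots[7])
# → 8 state_2 state_7
```

Version 7 is still readable after the restore, which is why the history displayed next lists the `RESTORE` as version 8 on top of the earlier `DELETE`.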



In [None]:
query = """
DESCRIBE HISTORY delta.`/tmp/deltaSales`
"""
spark.sql(query).show()

+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+--------------+-------------+--------------------+------------+--------------------+
|      8|2024-10-16 18:03:...|  NULL|    NULL|             RESTORE|{version -> 2, ti...|NULL|    NULL|     NULL|          7|  Serializable|        false|{numRestoredFiles...|        NULL|Apache-Spark/3.5....|
|      7|2024-10-16 18:01:...|  NULL|    NULL|              DELETE|{predicate -> ["(...|NULL|    NULL|     NULL|          6|  Serializable|        false|{numRemoved

### Vacuuming old records
Permanently remove the deleted records using `vacuum`. Check the history again and make sure that the removal has been performed.

In [None]:
query = """
VACUUM delta.`/tmp/deltaSales`
"""
spark.sql(query).show()

+--------------------+
|                path|
+--------------------+
|file:/tmp/deltaSales|
+--------------------+
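
`VACUUM` only deletes data files that are no longer referenced by the current table version and that are older than the retention threshold, which defaults to 7 days. The sketch below models that selection rule; the file names and ages are invented for illustration, and the helper is not a Delta API.

```python
DEFAULT_RETENTION_HOURS = 168   # 7 days, the documented default threshold

def vacuum_candidates(files, retention_hours=DEFAULT_RETENTION_HOURS):
    """files: (name, referenced_by_current_version, age_in_hours) tuples."""
    return [name for name, referenced, age in files
            if not referenced and age > retention_hours]

files = [
    ("part-0001.parquet", True, 0.2),     # live file of the current version
    ("part-0002.parquet", False, 0.1),    # dropped a few minutes ago
    ("part-0003.parquet", False, 200.0),  # dropped more than 7 days ago
]
print(vacuum_candidates(files))                       # → ['part-0003.parquet']
print(vacuum_candidates(files, retention_hours=0))    # → ['part-0002.parquet', 'part-0003.parquet']
```

In a lab session where every change is a few minutes old, a plain `VACUUM` is therefore likely to delete nothing. A shorter retention can be requested with `VACUUM ... RETAIN 0 HOURS`, which requires setting `spark.databricks.delta.retentionDurationCheck.enabled` to `false` and makes time travel to the vacuumed versions impossible.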



In [None]:
query = """
DESCRIBE HISTORY delta.`/tmp/deltaSales`
"""
spark.sql(query).show()

+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|version|           timestamp|userId|userName|           operation| operationParameters| job|notebook|clusterId|readVersion|   isolationLevel|isBlindAppend|    operationMetrics|userMetadata|          engineInfo|
+-------+--------------------+------+--------+--------------------+--------------------+----+--------+---------+-----------+-----------------+-------------+--------------------+------------+--------------------+
|     10|2024-10-16 18:07:...|  NULL|    NULL|          VACUUM END|{status -> COMPLE...|NULL|    NULL|     NULL|          9|SnapshotIsolation|         true|{numDeletedFiles ...|        NULL|Apache-Spark/3.5....|
|      9|2024-10-16 18:06:...|  NULL|    NULL|        VACUUM START|{retentionCheckEn...|NULL|    NULL|     NULL|          8|SnapshotIsolation|         t

## Change data feed

### Table creation with CDF activated

In [None]:
query = """
CREATE TABLE IF NOT EXISTS salesOriginal
USING csv
OPTIONS (
  header "true",
  path "/tmp/delta/sales/salesOriginal.csv",
  inferSchema "true"
)
"""
spark.sql(query)

DataFrame[]

In [None]:
query = """
CREATE TABLE delta.`/tmp/deltaSalesCDF` USING DELTA TBLPROPERTIES (delta.enableChangeDataFeed = true)
AS SELECT * FROM salesOriginal
"""
spark.sql(query)

DataFrame[]
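
For a table that was created without the property, the change data feed can also be switched on afterwards; in that case only the changes committed after activation are recorded. The following is a minimal sketch that reuses the path from the cell above and assumes the `spark` session from the setup. The variable names are illustrative.

```python
path = "/tmp/deltaSalesCDF"

# Enable the change data feed on a Delta table that already exists
enable_cdf = f"""
ALTER TABLE delta.`{path}`
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
"""

# Inspect the table metadata; the `properties` column lists the table properties
describe_detail = f"""
DESCRIBE DETAIL delta.`{path}`
"""

print(enable_cdf)
print(describe_detail)
# In the notebook session:
# spark.sql(enable_cdf)
# spark.sql(describe_detail).select("properties").show(truncate=False)
```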

### CDF for Updates

In [None]:
query = """
select count(*) as nb_rows from delta.`/tmp/deltaSalesCDF`
"""
spark.sql(query).show()

+-------+
|nb_rows|
+-------+
|   4916|
+-------+



In [None]:
query = """
describe delta.`/tmp/deltaSalesCDF`
"""
spark.sql(query).show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|       saleid|   string|   NULL|
|     saledate|     date|   NULL|
|     quantity|   double|   NULL|
|    unitprice|   double|   NULL|
|       shopid|   string|   NULL|
|         city|   string|   NULL|
|        state|   string|   NULL|
|      country|   string|   NULL|
|     shopsize|   string|   NULL|
|    productid|   string|   NULL|
|     category|   string|   NULL|
|  subcategory|   string|   NULL|
|         size|   string|   NULL|
|purchaseprice|   double|   NULL|
|        color|   string|   NULL|
|        brand|   string|   NULL|
+-------------+---------+-------+



In [None]:
query = """
UPDATE delta.`/tmp/deltaSalesCDF`
SET unitprice = unitprice * 1.05
WHERE saledate >= '2023-02-01' and category='Cloth'
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|              765|
+-----------------+



In [None]:
query = """
SELECT * FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
"""
spark.sql(query)


DataFrame[saleid: string, saledate: date, quantity: double, unitprice: double, shopid: string, city: string, state: string, country: string, shopsize: string, productid: string, category: string, subcategory: string, size: string, purchaseprice: double, color: string, brand: string, _change_type: string, _commit_version: bigint, _commit_timestamp: timestamp]

In [None]:
query = """
SELECT _change_type, _commit_version, _commit_timestamp, count(*)
FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
GROUP BY _change_type, _commit_version, _commit_timestamp
"""
spark.sql(query).show()

+----------------+---------------+--------------------+--------+
|    _change_type|_commit_version|   _commit_timestamp|count(1)|
+----------------+---------------+--------------------+--------+
| update_preimage|              1|2024-10-16 18:10:...|     765|
|update_postimage|              1|2024-10-16 18:10:...|     765|
|          insert|              0|2024-10-16 18:09:...|    4916|
+----------------+---------------+--------------------+--------+



In [None]:
query = """
SELECT saleid, _change_type, unitprice
FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
WHERE saledate >= '2023-02-01' and category='Cloth' and _commit_version = 1
ORDER BY saleid, _change_type DESC
"""
spark.sql(query).show()

+----------+----------------+------------------+
|    saleid|    _change_type|         unitprice|
+----------+----------------+------------------+
|S000000007| update_preimage|              20.0|
|S000000007|update_postimage|              21.0|
|S000000008| update_preimage|              20.0|
|S000000008|update_postimage|              21.0|
|S000000009| update_preimage|              22.0|
|S000000009|update_postimage|              23.1|
|S000000010| update_preimage|              24.0|
|S000000010|update_postimage|25.200000000000003|
|S000000011| update_preimage|              24.0|
|S000000011|update_postimage|25.200000000000003|
|S000000012| update_preimage|              45.0|
|S000000012|update_postimage|             47.25|
|S000000013| update_preimage|              48.0|
|S000000013|update_postimage|50.400000000000006|
|S000000014| update_preimage|              49.0|
|S000000014|update_postimage|             51.45|
|S000000015| update_preimage|              60.0|
|S000000015|update_p
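
Each updated row appears twice in the feed: once with its value before the update (`update_preimage`) and once with its value after (`update_postimage`). Pairing the two images by key makes the effect of the update explicit. The sketch below does this in plain Python on a few rows copied from the output above. The function name `pair_images` is hypothetical.

```python
from collections import defaultdict

# (saleid, _change_type, unitprice) rows copied from the output above
rows = [
    ("S000000007", "update_preimage", 20.0),
    ("S000000007", "update_postimage", 21.0),
    ("S000000009", "update_preimage", 22.0),
    ("S000000009", "update_postimage", 23.1),
    ("S000000012", "update_preimage", 45.0),
    ("S000000012", "update_postimage", 47.25),
]

def pair_images(rows):
    """Return {saleid: (value_before, value_after)} for rows having both images."""
    images = defaultdict(dict)
    for saleid, change_type, value in rows:
        images[saleid][change_type] = value
    return {
        saleid: (img["update_preimage"], img["update_postimage"])
        for saleid, img in images.items()
        if "update_preimage" in img and "update_postimage" in img
    }

for saleid, (before, after) in sorted(pair_images(rows).items()):
    # The ratio should be close to the 1.05 factor used in the UPDATE
    print(saleid, before, after, round(after / before, 4))
```

The same pairing can be expressed in SQL as a self-join of the change feed on `saleid` and `_commit_version`.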

### CDF for Deletes

In [None]:
query = """
DELETE FROM delta.`/tmp/deltaSalesCDF`
WHERE city = 'Chicago' and category='Cloth'
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|              244|
+-----------------+



In [None]:
query = """
SELECT _change_type, _commit_version, _commit_timestamp, count(*)
FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
GROUP BY _change_type, _commit_version, _commit_timestamp
"""
spark.sql(query).show()

+----------------+---------------+--------------------+--------+
|    _change_type|_commit_version|   _commit_timestamp|count(1)|
+----------------+---------------+--------------------+--------+
| update_preimage|              1|2024-10-16 18:10:...|     765|
|update_postimage|              1|2024-10-16 18:10:...|     765|
|          delete|              2|2024-10-16 18:19:...|     244|
|          insert|              0|2024-10-16 18:09:...|    4916|
+----------------+---------------+--------------------+--------+



Retrieve the deleted records: reading the change feed from version 2 onward returns only the rows removed by the delete.

In [None]:
query = """
SELECT distinct city, category
FROM table_changes_by_path('/tmp/deltaSalesCDF', 2)
"""
spark.sql(query).show()

+-------+--------+
|   city|category|
+-------+--------+
|Chicago|   Cloth|
+-------+--------+
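
The change feed can also be read through the DataFrame reader instead of the `table_changes_by_path` SQL function, using the `readChangeFeed`, `startingVersion` and `endingVersion` options of the Delta source. The following is a minimal sketch; the wrapper `read_changes` is a hypothetical helper and `spark` is assumed to be the session created in the setup.

```python
def read_changes(spark, path, starting_version, ending_version=None):
    """Return the change feed of a path-based Delta table as a DataFrame."""
    reader = (
        spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", starting_version)
    )
    if ending_version is not None:
        # Without an ending version the feed runs up to the latest commit
        reader = reader.option("endingVersion", ending_version)
    return reader.load(path)

# In the notebook session:
# changes = read_changes(spark, "/tmp/deltaSalesCDF", 2, 2)
# changes.select("city", "category").distinct().show()
```

Bounding the range with an ending version keeps the result stable when later commits are added to the table.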



### Suggest an update operation then audit the changes




In [None]:
query = """
UPDATE delta.`/tmp/deltaSalesCDF`
SET purchaseprice = purchaseprice * 2
WHERE saledate >= '2022-09-01' and category = 'Furniture'
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|             1692|
+-----------------+



In [None]:
query = """
SELECT _change_type, _commit_version, _commit_timestamp, count(*)
FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
GROUP BY _change_type, _commit_version, _commit_timestamp
"""
spark.sql(query).show()

+----------------+---------------+--------------------+--------+
|    _change_type|_commit_version|   _commit_timestamp|count(1)|
+----------------+---------------+--------------------+--------+
| update_preimage|              1|2024-10-16 18:10:...|     765|
|update_postimage|              1|2024-10-16 18:10:...|     765|
| update_preimage|              3|2024-10-16 18:29:...|    1692|
|update_postimage|              3|2024-10-16 18:29:...|    1692|
|          delete|              2|2024-10-16 18:19:...|     244|
|          insert|              0|2024-10-16 18:09:...|    4916|
+----------------+---------------+--------------------+--------+



In [None]:
query = """
SELECT saleid, _change_type, purchaseprice
FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
WHERE saledate >= '2022-09-01' and category='Furniture' and _commit_version = 3
ORDER BY saleid, _change_type DESC
"""
spark.sql(query).show()

+----------+----------------+-------------+
|    saleid|    _change_type|purchaseprice|
+----------+----------------+-------------+
|S000000002| update_preimage|         48.0|
|S000000002|update_postimage|         96.0|
|S000000003| update_preimage|        127.0|
|S000000003|update_postimage|        254.0|
|S000000004| update_preimage|        252.0|
|S000000004|update_postimage|        504.0|
|S000000005| update_preimage|        333.0|
|S000000005|update_postimage|        666.0|
|S000000006| update_preimage|        375.0|
|S000000006|update_postimage|        750.0|
|S000000021| update_preimage|        190.0|
|S000000021|update_postimage|        380.0|
|S000000022| update_preimage|        280.0|
|S000000022|update_postimage|        560.0|
|S000000023| update_preimage|         48.0|
|S000000023|update_postimage|         96.0|
|S000000024| update_preimage|        127.0|
|S000000024|update_postimage|        254.0|
|S000000025| update_preimage|        252.0|
|S000000025|update_postimage|   

### Suggest a delete operation then audit the changes


In [None]:
query = """
DELETE FROM delta.`/tmp/deltaSalesCDF`
WHERE country = 'USA'
"""
spark.sql(query).show()

+-----------------+
|num_affected_rows|
+-----------------+
|             2672|
+-----------------+



In [None]:
query = """
SELECT _change_type, _commit_version, _commit_timestamp, count(*)
FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
GROUP BY _change_type, _commit_version, _commit_timestamp
"""
spark.sql(query).show()

+----------------+---------------+--------------------+--------+
|    _change_type|_commit_version|   _commit_timestamp|count(1)|
+----------------+---------------+--------------------+--------+
|          delete|              4|2024-10-16 18:33:...|    2672|
| update_preimage|              3|2024-10-16 18:29:...|    1692|
|update_postimage|              3|2024-10-16 18:29:...|    1692|
| update_preimage|              1|2024-10-16 18:10:...|     765|
|update_postimage|              1|2024-10-16 18:10:...|     765|
|          delete|              2|2024-10-16 18:19:...|     244|
|          insert|              0|2024-10-16 18:09:...|    4916|
+----------------+---------------+--------------------+--------+
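
The summary above is enough to reconstruct the number of rows of the table at any version: an insert adds a row, a delete removes one, and an update has no net effect because its preimage and postimage cancel out. The sketch below replays the counts copied from the output above in plain Python, assuming that no other commit touched the table. The names `EFFECT` and `row_count_at` are illustrative.

```python
# (_change_type, _commit_version, count) copied from the summary above
changes = [
    ("insert", 0, 4916),
    ("update_preimage", 1, 765),
    ("update_postimage", 1, 765),
    ("delete", 2, 244),
    ("update_preimage", 3, 1692),
    ("update_postimage", 3, 1692),
    ("delete", 4, 2672),
]

# Net effect of one change row on the number of rows of the table
EFFECT = {
    "insert": 1,
    "delete": -1,
    "update_preimage": -1,
    "update_postimage": 1,
}

def row_count_at(changes, version):
    """Replay the change counts up to `version` (inclusive)."""
    return sum(EFFECT[kind] * n for kind, v, n in changes if v <= version)

for version in range(5):
    print(version, row_count_at(changes, version))
```

The values can be compared with `select count(*)` on the corresponding versions of the table.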



In [None]:
query = """
SELECT saleid, _change_type, country
FROM table_changes_by_path('/tmp/deltaSalesCDF', 0)
WHERE country = 'USA' and _commit_version = 4
ORDER BY saleid
"""
spark.sql(query).show()

+----------+------------+-------+
|    saleid|_change_type|country|
+----------+------------+-------+
|S000000002|      delete|    USA|
|S000000003|      delete|    USA|
|S000000004|      delete|    USA|
|S000000005|      delete|    USA|
|S000000006|      delete|    USA|
|S000000016|      delete|    USA|
|S000000017|      delete|    USA|
|S000000018|      delete|    USA|
|S000000019|      delete|    USA|
|S000000020|      delete|    USA|
|S000000021|      delete|    USA|
|S000000022|      delete|    USA|
|S000000023|      delete|    USA|
|S000000032|      delete|    USA|
|S000000033|      delete|    USA|
|S000000034|      delete|    USA|
|S000000035|      delete|    USA|
|S000000036|      delete|    USA|
|S000000037|      delete|    USA|
|S000000038|      delete|    USA|
+----------+------------+-------+
only showing top 20 rows



In [None]:
query = """
SELECT distinct country
FROM table_changes_by_path('/tmp/deltaSalesCDF', 4)
"""
spark.sql(query).show()

+-------+
|country|
+-------+
|    USA|
+-------+



## Where to go from here

Study how JSON data with varying types can be loaded by reading the official release notes at https://github.com/delta-io/delta/releases/tag/v4.0.0rc1

Practice: load the `vk` dataset from the [JSON notebook](https://colab.research.google.com/drive/1Cs6nxmkxr2VCU1PslBYPouoIVLk_5WbT?usp=drive_link)