# Your Hands-On Guide to Delta Tables (in Python)

Delta is a versioned parquet file with a transaction log, we get compression and all benefits of parquet  
Transaction log is single-source of truth, see who does what, plays nicely with spark and python  


Python APIs for Delta-Lake
- `pyspark`
- `delta-rs` pip install delta-lake
- pyspark declarative `pip install delta-spark`


In [2]:
from delta import *
from pyspark.sql import SparkSession

builder = SparkSession.builder.appName('delta-tutorial').config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension").config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [114]:
# load vanilla spark table
sdf = spark.read.load(
  '/storage/data/airline_2m.csv' ,
  format='com.databricks.spark.csv',
  header='true',
  inferSchema='true'
).select(['FlightDate', 'Reporting_Airline', 'Flight_Number_Reporting_Airline','Origin', 'Dest', 'DepTime', 'DepDelay', 'ArrTime', 'ArrDelay' ])

# save as a delta table
sdf.write.format('delta').mode('overwrite').save('/storage/data/airline_2m.delta')

                                                                                

23/06/10 12:36:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,012,583,616 bytes) of heap memory
Scaling row group sizes to 94.30% for 8 writers
23/06/10 12:36:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,012,583,616 bytes) of heap memory
Scaling row group sizes to 83.83% for 9 writers
23/06/10 12:36:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,012,583,616 bytes) of heap memory
Scaling row group sizes to 75.44% for 10 writers
23/06/10 12:36:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,012,583,616 bytes) of heap memory
Scaling row group sizes to 68.58% for 11 writers
23/06/10 12:36:43 WARN MemoryManager: Total allocation exceeds 95.00% (1,012,583,616 bytes) of heap memory
Scaling row group sizes to 62.87% for 12 writers


[Stage 178:>                                                      (0 + 12) / 12]

23/06/10 12:36:44 WARN MemoryManager: Total allocation exceeds 95.00% (1,012,583,616 bytes) of heap memory
Scaling row group sizes to 68.58% for 11 writers
23/06/10 12:36:45 WARN MemoryManager: Total allocation exceeds 95.00% (1,012,583,616 bytes) of heap memory
Scaling row group sizes to 75.44% for 10 writers
23/06/10 12:36:45 WARN MemoryManager: Total allocation exceeds 95.00% (1,012,583,616 bytes) of heap memory
Scaling row group sizes to 83.83% for 9 writers
23/06/10 12:36:45 WARN MemoryManager: Total allocation exceeds 95.00% (1,012,583,616 bytes) of heap memory
Scaling row group sizes to 94.30% for 8 writers


                                                                                

In [119]:
delta_table = DeltaTable.forPath(spark, '/storage/data/airline_2m.delta/')

In [21]:
# inspecting what we just wrote
import os

os.listdir('/storage/data/airline_2m.delta/')
# bunch of individual files and a folder called "_delta_log"

['part-00008-37835621-905f-481b-bee1-f2fbde02595c-c000.snappy.parquet',
 '.part-00002-f79474ae-503d-4914-8aff-54af17da6de2-c000.snappy.parquet.crc',
 '.part-00006-9cdf358f-5ff7-4c45-9314-09aeeb770ab4-c000.snappy.parquet.crc',
 '.part-00001-da20e657-cbd5-4ca0-b7bf-f862a6083d2b-c000.snappy.parquet.crc',
 'part-00007-3e4232fc-fbd4-4cce-84fb-7a862aff9928-c000.snappy.parquet',
 '.part-00010-bff9f938-783e-490b-b462-636415a9f94d-c000.snappy.parquet.crc',
 '.part-00009-6c30192d-45cc-4819-b3ce-3e6c4de1c218-c000.snappy.parquet.crc',
 'part-00004-016a4d0d-6026-4cff-9c53-8a4af7c32241-c000.snappy.parquet',
 'part-00005-3885ce51-666c-4fb2-a0ad-a12c90ec095a-c000.snappy.parquet',
 '_delta_log',
 'part-00003-9aec4e25-09dd-4f1d-8bff-e6c7c368ae7d-c000.snappy.parquet',
 '.part-00008-37835621-905f-481b-bee1-f2fbde02595c-c000.snappy.parquet.crc',
 'part-00006-9cdf358f-5ff7-4c45-9314-09aeeb770ab4-c000.snappy.parquet',
 'part-00009-6c30192d-45cc-4819-b3ce-3e6c4de1c218-c000.snappy.parquet',
 '.part-00003-9aec4

In [22]:
# _delta_log
os.listdir('/storage/data/airline_2m.delta/_delta_log/')
# a json and a .crc file, crc files are checksums added to prevent corruption if parquet is corrupted in-flight

['00000000000000000000.json', '.00000000000000000000.json.crc']

logs hold:
copy-paste:
Whenever a user performs an operation to modify a table (such as an INSERT, UPDATE or DELETE), Delta Lake breaks that operation down into a series of discrete steps composed of one or more of the actions below.

Add file - adds a data file.
Remove file - removes a data file.
Update metadata - Updates the table’s metadata (e.g., changing the table’s name, schema or partitioning).
Set transaction - Records that a structured streaming job has committed a micro-batch with the given ID.
Change protocol - enables new features by switching the Delta Lake transaction log to the newest software protocol.
Commit info - Contains information around the commit, which operation was made, from where and at what time.


WHen table is created, table's transaciton log automatically created in `_delta_log`. Each change is recorded as an atomic commit in the transaction log

Time-travel `DESCRIBE HISTORY` in SQL, can select fomr using `TIMESTAMP` OR the `VERSION` `TABLE@v2` 


Delta transaciton enables ACID, full audit and scalable metadata.

Hive metastore stores table definition, where table is stored


# CTAS Statements
`CREATE_TABLE _ AS SELECT` use output of select to crae

Does not support manual schema declaration, automatically infers schema from query results, does not require an `INSERT` statement.

```CRETE TABLE new_table
COMMENT "some comment"
PARTITIONED BY (id1, id2) --best practice to non-partition
LOCATION '/some/path'
FROM ...
```

# Constraints
- NOT NULL and CHECK supported
`CHECK` looks like `WHERE` clauses, `NOT NULL` is obvious

# Copying
**DEEP** clone  
`CREATE TABLE table_clone DEEP CLONE source_table` can occur incrementall (done multiple times)

**SHALLOW** clone  
only copies transaction log, no actual data copied  


# Things to observe 
- Updates
- Delete
- Optimize: compacts multiple small files
- ZORDER BY: added to optimize, this compacts files ordered by the column, like indexing
    - Zordering for high-cardinality columns (>= 2 columns): can I go ahead and skip (i.e. reduce number of files need to scan), diminishing returns if z-order by all columns
    - Partitioning is for low-cardinality columns, table >= 1TB of data, partitions >= 1GB
- Cleaning up: delta lake `VACUUM table_name [retention period]`, feault retention period is 7 days. Once vacuum is run, time-travel is lost
- Try deleting then using time-travel to go back using `RESTORE TABLE` then calling `DESCRIBE HISTORY`  

can call `delta_table.history().show()`  

In [23]:
import json

with open('/storage/data/airline_2m.delta/_delta_log/00000000000000000000.json', 'r') as json_file:
    for line in json_file:
      json_object = json.loads(line) 
      print(json.dumps(json_object, indent=2))
  

{
  "commitInfo": {
    "timestamp": 1686413205823,
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Overwrite",
      "partitionBy": "[]"
    },
    "isolationLevel": "Serializable",
    "isBlindAppend": false,
    "operationMetrics": {
      "numFiles": "12",
      "numOutputRows": "2000000",
      "numOutputBytes": "20619620"
    },
    "engineInfo": "Apache-Spark/3.3.2 Delta-Lake/2.3.0",
    "txnId": "4f5656f9-d9d2-404e-bf11-722a1349387b"
  }
}
{
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 2
  }
}
{
  "metaData": {
    "id": "609a6c9c-5c5d-4844-9276-49201985b8f1",
    "format": {
      "provider": "parquet",
      "options": {}
    },
    "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"FlightDate\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"Reporting_Airline\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"Origin\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\"

In [126]:
sdf.filter('Origin="JFK" and FlightDate = "2017-07-26"').show() # filter for flights leaving JFK on the 26th July, 2017

+-------------------+-----------------+-------------------------------+------+----+-------+--------+-------+--------+
|         FlightDate|Reporting_Airline|Flight_Number_Reporting_Airline|Origin|Dest|DepTime|DepDelay|ArrTime|ArrDelay|
+-------------------+-----------------+-------------------------------+------+----+-------+--------+-------+--------+
|2017-07-26 00:00:00|               AA|                              9|   JFK| SFO|    657|    -3.0|    947|   -33.0|
|2017-07-26 00:00:00|               DL|                           2051|   JFK| SJU|   2036|     7.0|     17|   -26.0|
|2017-07-26 00:00:00|               B6|                           1089|   JFK| MCO|   1340|    21.0|   1638|    27.0|
+-------------------+-----------------+-------------------------------+------+----+-------+--------+-------+--------+



                                                                                

In [136]:
# create new table for upsert
sdf.dtypes

updates = [
    (datetime.strptime('2017-07-26', '%Y-%m-%d'), 'AA',9, 'JFK', 'SFO', 756, -4.0, 1117, 0.0), # update existing entry
    (datetime.strptime('2017-07-26', '%Y-%m-%d'), 'DL',1368, 'JFK', 'MIA', 1107, 1.0, 1421, 0.0), # new entry
]
(updates_table := spark.createDataFrame(updates, schema=sdf.schema)).show()

+-------------------+-----------------+-------------------------------+------+----+-------+--------+-------+--------+
|         FlightDate|Reporting_Airline|Flight_Number_Reporting_Airline|Origin|Dest|DepTime|DepDelay|ArrTime|ArrDelay|
+-------------------+-----------------+-------------------------------+------+----+-------+--------+-------+--------+
|2017-07-26 00:00:00|               AA|                              9|   JFK| SFO|    756|    -4.0|   1117|     0.0|
|2017-07-26 00:00:00|               DL|                           1368|   JFK| MIA|   1107|     1.0|   1421|     0.0|
+-------------------+-----------------+-------------------------------+------+----+-------+--------+-------+--------+



In [144]:
# perform upsert
(delta_table.alias('current_data')
  .merge(
      source=updates_table.alias('new_data'), 
      condition=F.expr('current_data.FlightDate = new_data.FlightDate and new_data.Origin = current_data.Origin and current_data.Dest = new_data.Dest and  current_data.Reporting_Airline = new_data.Reporting_Airline and current_data.Flight_Number_Reporting_Airline = new_data.Flight_Number_Reporting_Airline'))
  .whenMatchedUpdate(set = {
    'DepTime': F.col('new_data.DepTime'),
    'DepDelay': F.col('new_data.DepDelay'),
    'ArrTime': F.col('new_data.ArrTime'),
    'ArrDelay': F.col('new_data.ArrDelay'),
  })
  # .whenNotMatchedInsert(values = {
  #   'FlightDate': 'new_data.FlightDate',
  #   'Reporting_Airline': 'new_data.Reporting_Airline',
  #   'Flight_Number_Reporting_Airline': 'new_data.Flight_Number_Reporting_Airline',
  #   'DepTime': 'new_data.DepTime',
  #   'DepDelay': 'new_data.DepDelay',
  #   'ArrTime': 'new_data.ArrTime',
  #   'ArrDelay': 'new_data.ArrDelay',
  # })
  .whenNotMatchedInsertAll()
  .execute()

  # can have any number of whenMatched (at most one update and one delete action)
  # update in merge only updates sepcified columns
  # multiple whenMatched executes in order specified
  # to update all columns of target dleta table use whenMatched(...).updateAll()


  # whenNotMatched when source doesn't match target
  # can have ONLY the insert action, any unspecified columns assume null
  # each whenNotMatched can have optional conditoin, 
  # see https://docs.delta.io/latest/delta-update.html#language-python
)

In [145]:
delta_table.toDF().filter('Origin="JFK" and FlightDate = "2017-07-26"').show()

+-------------------+-----------------+-------------------------------+------+----+-------+--------+-------+--------+
|         FlightDate|Reporting_Airline|Flight_Number_Reporting_Airline|Origin|Dest|DepTime|DepDelay|ArrTime|ArrDelay|
+-------------------+-----------------+-------------------------------+------+----+-------+--------+-------+--------+
|2017-07-26 00:00:00|               DL|                           2051|   JFK| SJU|   2036|     7.0|     17|   -26.0|
|2017-07-26 00:00:00|               B6|                           1089|   JFK| MCO|   1340|    21.0|   1638|    27.0|
|2017-07-26 00:00:00|               DL|                           1368|   JFK| MIA|   1107|     1.0|   1421|     0.0|
|2017-07-26 00:00:00|               AA|                              9|   JFK| SFO|    756|    -4.0|   1117|     0.0|
+-------------------+-----------------+-------------------------------+------+----+-------+--------+-------+--------+



In [5]:
import pandas as pd

df = pd.DataFrame({
  'a': ['A', 'B', 'C', 'D'],
  'value': [1, 2, 3, 4]
})

sdf = spark.createDataFrame(df)

In [8]:
from pyspark.sql import functions as F

sdf.filter(F.col('a').isin(['A', 'B'])).show()

+---+-----+
|  a|value|
+---+-----+
|  A|    1|
|  B|    2|
+---+-----+



# Content Starts Here


This post covers the Delta Lake, which is an open-source format extending parquet files for ACID transactions. More specifically, this covers how to work with Delta tables using the `pyspark` and native `Delta` APIs. 

Delta tables can be thought of as having the benefits of a non-flat file format (compression via more efficient encoding), with a single source of truth called the transaction log.

## Creating a Delta Table

In order to create a delta table, I'm loading an existing CSV using `pyspark`, and saving it using the `format` option in `pyspark`'s `write`:

(Completely irrelevant, however the dataset being used here is [IBM's Airline Reporting Carrier On-Timer Performance Dataset](https://developer.ibm.com/exchanges/data/all/airline/))

In [17]:
# load original dataset
sdf = spark.read.load(
  '/storage/data/airline_2m.csv' ,
  format='com.databricks.spark.csv',
  header='true',
  inferSchema='true'
).select(['FlightDate', 'Reporting_Airline', 'Flight_Number_Reporting_Airline','Origin', 'Dest', 'DepTime', 'DepDelay', 'ArrTime', 'ArrDelay' ]).filter('Origin="JFK" and FlightDate>="2017-12-01" and FlightDate <= "2017-12-31"')

# write as a delta table
sdf.write.format('delta').mode('overwrite').save('/storage/data/airline_2m.delta')

                                                                                

23/06/22 22:09:51 WARN MemoryManager: Total allocation exceeds 95.00% (986,185,716 bytes) of heap memory
Scaling row group sizes to 91.85% for 8 writers
23/06/22 22:09:51 WARN MemoryManager: Total allocation exceeds 95.00% (986,185,716 bytes) of heap memory
Scaling row group sizes to 81.64% for 9 writers
23/06/22 22:09:51 WARN MemoryManager: Total allocation exceeds 95.00% (986,185,716 bytes) of heap memory
Scaling row group sizes to 73.48% for 10 writers
23/06/22 22:09:51 WARN MemoryManager: Total allocation exceeds 95.00% (986,185,716 bytes) of heap memory
Scaling row group sizes to 66.80% for 11 writers


[Stage 57:>                                                       (0 + 12) / 12]

23/06/22 22:09:51 WARN MemoryManager: Total allocation exceeds 95.00% (986,185,716 bytes) of heap memory
Scaling row group sizes to 61.23% for 12 writers
23/06/22 22:09:52 WARN MemoryManager: Total allocation exceeds 95.00% (986,185,716 bytes) of heap memory
Scaling row group sizes to 66.80% for 11 writers
23/06/22 22:09:52 WARN MemoryManager: Total allocation exceeds 95.00% (986,185,716 bytes) of heap memory
Scaling row group sizes to 73.48% for 10 writers
23/06/22 22:09:52 WARN MemoryManager: Total allocation exceeds 95.00% (986,185,716 bytes) of heap memory
Scaling row group sizes to 81.64% for 9 writers
23/06/22 22:09:52 WARN MemoryManager: Total allocation exceeds 95.00% (986,185,716 bytes) of heap memory
Scaling row group sizes to 91.85% for 8 writers


                                                                                

Now, if we inspect the path which was written to, there are two things to note:
1. There are multiple `.parquet` files
2. There's a directory called the `_delta_log`.

In [18]:
%ls /storage/data/airline_2m.delta

[0m[01;34m_delta_log[0m/
part-00000-9db93c29-a618-4f69-aa5f-776e1ca1a221-c000.snappy.parquet
part-00001-43218537-207b-4569-8d98-7cb1d2959d3d-c000.snappy.parquet
part-00002-b41a2670-c5bc-4515-93c6-c9fe87c3d132-c000.snappy.parquet
part-00003-0393fe9a-e8cc-4c69-83a4-e11828b75886-c000.snappy.parquet
part-00004-edbda9cf-91b8-4752-bec3-f30e93651fe8-c000.snappy.parquet
part-00005-9c275ad8-871a-4948-9630-40aef37c3d50-c000.snappy.parquet
part-00006-d273f657-9c1f-4dd1-8bbf-fb66eba644f3-c000.snappy.parquet
part-00007-91fbd325-4e1c-437f-a7c4-e0fd8e91b26d-c000.snappy.parquet
part-00008-d6982b42-0653-4ef5-8b00-72f3bb62b7ce-c000.snappy.parquet
part-00009-4539d215-00c7-4f2b-9fc7-cf8c7544070c-c000.snappy.parquet
part-00010-4238061e-7d3c-43d5-9d29-9b4291b38d55-c000.snappy.parquet
part-00011-00828917-003b-4eff-a175-81b3e86890cb-c000.snappy.parquet


This folder called the `_delta_log` is the single source of truth for the delta table, and contains all history for a given table; currently there is a single `.json` file, since only one operation was done to this table.

In [19]:
%ls /storage/data/airline_2m.delta/_delta_log

00000000000000000000.json


In [22]:
(jdf := spark.read.json("/storage/data/airline_2m.delta/_delta_log/00000000000000000000.json")).show()

+--------------------+--------------------+--------------------+--------+
|                 add|          commitInfo|            metaData|protocol|
+--------------------+--------------------+--------------------+--------+
|                null|{Apache-Spark/3.3...|                null|    null|
|                null|                null|                null|  {1, 2}|
|                null|                null|{1687486190991, {...|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                null|                null|    null|
|{true, 1687486192...|                

The above is a bit hard to see, to let's filter for the one entry where `commitInfo` is not null. In the commit info, there are several important parameters, namely the ovewrite mode, the operation (in this case `WRITE`) and the timestamp.

In [31]:
from pyspark.sql import functions as F

jdf.filter(F.col('commitInfo').isNotNull()).select('commitInfo').show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|commitInfo                                                                                                                                              |
+--------------------------------------------------------------------------------------------------------------------------------------------------------+
|{Apache-Spark/3.3.2 Delta-Lake/2.3.0, false, Serializable, WRITE, {12, 33238, 75}, {Overwrite, []}, 1687486192334, 0e2eefc4-557d-45b2-ac9f-e1f56b484fbc}|
+--------------------------------------------------------------------------------------------------------------------------------------------------------+



The metadata stores information on the columns, type of columns, constraints on the columns and the type of file (parquet).

In [32]:
jdf.filter(F.col('metaData').isNotNull()).select('metaData').show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|metaData                                                                                                                                                                                                                                                                        

If I were to modify the table in any way, such as by adding a new row