
## Liquid Clustering

- [Liquid Clustering](https://delta.io/blog/liquid-clustering/)
  - [Delta: Optimize](https://delta.io/blog/delta-lake-optimize/)
  - [Delta: Transaction Log](https://www.databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html)
- [Databricks Datasets](https://docs.databricks.com/aws/en/discover/databricks-datasets): `/databricks-datasets`


## Data

We are going to use the `/databricks-datasets/asa/airlines/` data.

> 22 CSVs with data from `1987` until `2008`

In [0]:
# Optional: uncomment to list the files in the source directory.
# display(dbutils.fs.ls('/databricks-datasets/asa/airlines/'))

In [0]:
# Read every CSV under the airlines directory into a single DataFrame:
# the first row of each file supplies the column names and the column
# types are inferred by Spark.
airlines = spark.read.options(header="true", inferSchema="true").csv(
    "/databricks-datasets/asa/airlines/"
)
airlines.count()  # 123,534,969

123534969

## Writing

In [0]:
%sql
-- Start clean: make sure the schema exists and remove any previous copy of the table.
CREATE SCHEMA IF NOT EXISTS anthony_caliani;
DROP TABLE IF EXISTS anthony_caliani.airlines_delta;

#### No Clustering

In [0]:
import re
from pyspark.sql import functions as f


def camel_to_snake(name):
    """Convert a CamelCase identifier to snake_case.

    Two passes insert underscores, then the result is lower-cased:

    1. between any character and a following capitalised word (one
       upper-case letter followed by one or more lower-case letters);
    2. between a lower-case letter or digit and the upper-case letter
       that follows it.

    Runs of capitals stay together, e.g. ``CRSDepTime`` -> ``crs_dep_time``.
    A word boundary that is not marked by a capital is not detected, e.g.
    ``DayofMonth`` -> ``dayof_month``.
    """
    word_boundary = re.compile(r'(.)([A-Z][a-z]+)')
    case_boundary = re.compile(r'([a-z0-9])([A-Z])')

    with_word_breaks = word_boundary.sub(r'\1_\2', name)
    with_all_breaks = case_boundary.sub(r'\1_\2', with_word_breaks)
    return with_all_breaks.lower()


# Projection for the output table: the generated `flight_id` first, followed
# by every source column renamed to snake_case.
attributes = [
    "flight_id",
    *(f.col(name).alias(camel_to_snake(name)) for name in airlines.columns),
]

# Add a UUID per row, apply the projection and overwrite the Delta table.
(
    airlines.withColumn("flight_id", f.expr("uuid()"))
    .select(*attributes)
    .write.mode("overwrite")
    .format("delta")
    .saveAsTable("anthony_caliani.airlines_delta")
)

In [0]:
%sql
-- Preview a few rows of the freshly written table.
SELECT * FROM anthony_caliani.airlines_delta LIMIT 5;

flight_id,year,month,dayof_month,day_of_week,dep_time,crs_dep_time,arr_time,crs_arr_time,unique_carrier,flight_num,tail_num,actual_elapsed_time,crs_elapsed_time,air_time,arr_delay,dep_delay,origin,dest,distance,taxi_in,taxi_out,cancelled,cancellation_code,diverted,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay
60319e09-057f-4e50-aeb7-986284a81ec4,1987,10,14,3,741,730,912,849,PS,1451,,91,79,,23,11,SAN,SFO,447,,,0,,0,,,,,
c461246d-6721-4e69-9957-93461125368f,1987,10,15,4,729,730,903,849,PS,1451,,94,79,,14,-1,SAN,SFO,447,,,0,,0,,,,,
97ecd034-480d-4124-9f4e-08b956a14747,1987,10,17,6,741,730,918,849,PS,1451,,97,79,,29,11,SAN,SFO,447,,,0,,0,,,,,
e0b856e4-4253-4b57-8606-6d96c57462ea,1987,10,18,7,729,730,847,849,PS,1451,,78,79,,-2,-1,SAN,SFO,447,,,0,,0,,,,,
2f7c9384-5da2-4aa7-ad95-99ac44b66017,1987,10,19,1,749,730,922,849,PS,1451,,93,79,,33,19,SAN,SFO,447,,,0,,0,,,,,


In [0]:
%sql
-- Table layout recorded right after the initial write (no clustering):
--   number of files ....: 99
--   total size (bytes) .: 5,944,713,305 (5.54 GB)
--   average file size ..: 57 MB
DESCRIBE DETAIL anthony_caliani.airlines_delta;

format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics,clusterByAuto
delta,3f8c5b0f-330f-468c-bddc-ae62d8e4c215,workspace.anthony_caliani.airlines_delta,,,2025-11-02T12:33:24.335Z,2025-11-02T12:35:00.000Z,List(),List(),99,5944713305,Map(delta.enableDeletionVectors -> true),3,7,"List(appendOnly, deletionVectors, invariants)","Map(numRowsDeletedByDeletionVectors -> 0, numDeletionVectors -> 0)",False


#### Log History

In [0]:
%sql
-- List the commits recorded for the table so far.
DESCRIBE HISTORY anthony_caliani.airlines_delta;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2025-11-02T12:35:00.000Z,1306554100705382,avcaliani.it@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(2249394640270120),1102-122431-qjbsf9n3-v2n,,WriteSerializable,False,"Map(numFiles -> 99, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 123534969, numOutputBytes -> 5944713305)",,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13


---

#### Cluster by Year

In [0]:
%sql
-- Declare `year` as the clustering column, then run OPTIMIZE on the table.
ALTER TABLE anthony_caliani.airlines_delta CLUSTER BY (year);
OPTIMIZE anthony_caliani.airlines_delta;

path,metrics
,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, null, null, 0, 0, 99, 0, false, 0, 0, 1762086908096, 1762086921492, 8, 0, null, List(0, 0), null, 30, 30, 0, 0, List(5944713305, true, false, false, null, null, null, null, 0, 0, 0, 0, 99, 5944713305, 5944713305, null, log, 16777216, 67108864, 4, 31, 0, null, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, List(147, 672, 0, 612, 0, 10044), 2, 1, 5, sizeAware))"
,"List(2, 4, List(64115273, 65218489, 6.4666881E7, 2, 129333762), List(764673, 63557311, 3.234648525E7, 4, 129385941), 0, null, null, 0, 1, 99, 95, true, 0, 0, 1762086921534, 1762086927694, 8, 2, null, List(0, 0), null, 30, 30, 3790, 0, List(5944713305, false, false, false, 0.49996361961528457, List(0.49996361961528457), 0.0217562884172382, post-optimize-compaction, 0, 0, 0, 0, 4, 129385941, 129385941, null, null, 33554432, 67108864, 0, 0, 0, List(2), 0, 0, 2, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 129385941, 0, 129385941, 0, 0, 0, 0, 0, 129385941, 129385941, List(0, 0, 712, 0, 0, 0), 15, 1, 1, null))"


In [0]:
%sql
-- Table layout recorded after clustering by `year`:
--   number of files ....: 97
--   total size (bytes) .: 5,944,661,126 (5.54 GB)
--   average file size ..: 58 MB
DESCRIBE DETAIL anthony_caliani.airlines_delta

format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics,clusterByAuto
delta,3f8c5b0f-330f-468c-bddc-ae62d8e4c215,workspace.anthony_caliani.airlines_delta,,,2025-11-02T12:33:24.335Z,2025-11-02T12:35:28.000Z,List(),List(year),97,5944661126,"Map(delta.enableDeletionVectors -> true, delta.enableRowTracking -> true, delta.checkpointPolicy -> v2, delta.rowTracking.materializedRowCommitVersionColumnName -> _row-commit-version-col-2152fa00-0967-4a0a-ad14-a8bc5fa1decd, delta.rowTracking.materializedRowIdColumnName -> _row-id-col-507786d0-6444-4d4e-ba0b-f1a930910be7)",3,7,"List(appendOnly, clustering, deletionVectors, domainMetadata, invariants, rowTracking, v2Checkpoint)","Map(numRowsDeletedByDeletionVectors -> 0, numDeletionVectors -> 0)",False


In [0]:
%sql
-- List the commits again to see what the CLUSTER BY and OPTIMIZE steps added.
DESCRIBE HISTORY anthony_caliani.airlines_delta;

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
5,2025-11-02T12:35:28.000Z,1306554100705382,avcaliani.it@gmail.com,OPTIMIZE,"Map(predicate -> [], auto -> false, clusterBy -> [], zOrderBy -> [], batchId -> 0)",,List(2249394640270120),1102-122431-qjbsf9n3-v2n,4.0,SnapshotIsolation,False,"Map(numRemovedFiles -> 4, numRemovedBytes -> 129385941, p25FileSize -> 64115273, numDeletionVectorsRemoved -> 0, minFileSize -> 64115273, numAddedFiles -> 2, maxFileSize -> 65218489, p75FileSize -> 65218489, p50FileSize -> 65218489, numAddedBytes -> 129333762)",,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13
4,2025-11-02T12:35:20.000Z,1306554100705382,avcaliani.it@gmail.com,OPTIMIZE,"Map(predicate -> [], auto -> false, clusterBy -> [""year""], isFull -> false, zOrderBy -> [], batchId -> -1)",,List(2249394640270120),1102-122431-qjbsf9n3-v2n,3.0,SnapshotIsolation,True,Map(),,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13
3,2025-11-02T12:35:08.000Z,1306554100705382,avcaliani.it@gmail.com,CLUSTER BY,"Map(oldClusteringColumns -> , newClusteringColumns -> year)",,List(2249394640270120),1102-122431-qjbsf9n3-v2n,2.0,WriteSerializable,True,Map(),,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13
2,2025-11-02T12:35:07.000Z,1306554100705382,avcaliani.it@gmail.com,ROW TRACKING BACKFILL,Map(batchId -> 0),,List(2249394640270120),1102-122431-qjbsf9n3-v2n,1.0,SnapshotIsolation,False,Map(),,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13
1,2025-11-02T12:35:06.000Z,1306554100705382,avcaliani.it@gmail.com,UPGRADE PROTOCOL,"Map(newProtocol -> {""minReaderVersion"":3,""minWriterVersion"":7,""readerFeatures"":[""deletionVectors""],""writerFeatures"":[""deletionVectors"",""domainMetadata"",""rowTracking"",""invariants"",""appendOnly""]})",,List(2249394640270120),1102-122431-qjbsf9n3-v2n,0.0,WriteSerializable,True,Map(),,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13
0,2025-11-02T12:35:00.000Z,1306554100705382,avcaliani.it@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(2249394640270120),1102-122431-qjbsf9n3-v2n,,WriteSerializable,False,"Map(numFiles -> 99, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 123534969, numOutputBytes -> 5944713305)",,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13


---

#### Cluster by Flight ID, Year

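**[How do I trigger clustering?](https://delta.io/blog/liquid-clustering/)**  
Changing the clustering columns with `ALTER TABLE ... CLUSTER BY` does not rewrite any data by itself (the `CLUSTER BY` commit in the history above has empty operation metrics). To apply the clustering, you can **manually trigger** a liquid clustering operation using the `OPTIMIZE` command.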

In [0]:
%sql
-- Switch the clustering columns to (`flight_id`, `year`), then run OPTIMIZE on the table.
ALTER TABLE anthony_caliani.airlines_delta CLUSTER BY (flight_id, year);
OPTIMIZE anthony_caliani.airlines_delta;

path,metrics
,"List(99, 95, List(39023717, 98966700, 6.652841003030303E7, 99, 6586312593), List(576628, 76824811, 6.121397225263158E7, 95, 5815327364), 0, null, null, 0, 1, 97, 0, false, 0, 0, 1762086932517, 1762087023174, 8, 1, null, List(0, 0), null, 30, 30, 425687, 0, List(5944661126, true, false, false, 0.9890886860723271, List(0.9133504991728002, 0.8709009111958324), 0.9807414275315872, null, 0, 95, 5815327364, 5815327364, 0, 0, 0, null, log, 16777216, 67108864, 4, 0, 0, List(0, 0), 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 5815327364, 5815327364, List(124, 2893, 806, 515, 2072, 3586), 2, 1, 5, sizeAware))"
,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, null, null, 0, 0, 101, 0, false, 0, 0, 1762087023273, 1762087026634, 8, 0, null, List(0, 0), null, 30, 30, 0, 0, List(6715646355, false, false, false, 0.9890886860723271, List(0.9133504991728002, 0.8709009111958324), 0.9807414275315872, null, 0, 0, 0, 0, 0, 0, 0, null, log, 16777216, 67108864, 4, 0, 0, null, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, List(94, 106, 434, 0, 0, 0), 2, 2, 5, sizeAware))"
,"List(0, 0, List(null, null, 0.0, 0, 0), List(null, null, 0.0, 0, 0), 0, null, null, 0, 0, 101, 0, true, 0, 0, 1762087026674, 1762087028594, 8, 0, null, List(0, 0), null, 30, 30, 0, 0, List(6715646355, false, false, false, 0.9890886860723271, List(0.9133504991728002, 0.8709009111958324), 0.9807414275315872, post-optimize-compaction, 0, 0, 0, 0, 0, 0, 0, null, null, 33554432, 67108864, 0, 0, 0, null, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, List(0, 0, 416, 0, 0, 0), 15, 1, 1, null))"


In [0]:
%sql
-- Table layout recorded after clustering by (`flight_id`, `year`):
--   number of files ....: 101
--   total size (bytes) .: 6,715,646,355 (6.25 GB)
--   average file size ..: 63 MB
DESCRIBE DETAIL anthony_caliani.airlines_delta;

format,id,name,description,location,createdAt,lastModified,partitionColumns,clusteringColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics,clusterByAuto
delta,3f8c5b0f-330f-468c-bddc-ae62d8e4c215,workspace.anthony_caliani.airlines_delta,,,2025-11-02T12:33:24.335Z,2025-11-02T12:37:03.000Z,List(),"List(flight_id, year)",101,6715646355,"Map(delta.enableDeletionVectors -> true, delta.enableRowTracking -> true, delta.checkpointPolicy -> v2, delta.rowTracking.materializedRowCommitVersionColumnName -> _row-commit-version-col-2152fa00-0967-4a0a-ad14-a8bc5fa1decd, delta.rowTracking.materializedRowIdColumnName -> _row-id-col-507786d0-6444-4d4e-ba0b-f1a930910be7)",3,7,"List(appendOnly, clustering, deletionVectors, domainMetadata, invariants, rowTracking, v2Checkpoint)","Map(numRowsDeletedByDeletionVectors -> 0, numDeletionVectors -> 0)",False
