Point to the right catalog

In [0]:
%sql
-- Make clauseeu the current catalog so the two-part names (schema.table) used below resolve inside it.
USE CATALOG clauseeu

Load a demo data set from Databricks

In [0]:
# List the yellow-taxi trip files available under the Databricks sample datasets.
display(dbutils.fs.ls("dbfs:/databricks-datasets/nyctaxi/tripdata/yellow"))
 

In [0]:
Define schema and show a dataframe

In [0]:
from pyspark.sql.functions import col, lit, expr, when
from pyspark.sql.types import *
from datetime import datetime
import time
 
# Columns of the trip data as (name, Spark type) pairs; every column is declared nullable.
nyc_columns = [
    ('Vendor', StringType()),
    ('Pickup_DateTime', TimestampType()),
    ('Dropoff_DateTime', TimestampType()),
    ('Passenger_Count', IntegerType()),
    ('Trip_Distance', DoubleType()),
    ('Pickup_Longitude', DoubleType()),
    ('Pickup_Latitude', DoubleType()),
    ('Rate_Code', StringType()),
    ('Store_And_Forward', StringType()),
    ('Dropoff_Longitude', DoubleType()),
    ('Dropoff_Latitude', DoubleType()),
    ('Payment_Type', StringType()),
    ('Fare_Amount', DoubleType()),
    ('Surcharge', DoubleType()),
    ('MTA_Tax', DoubleType()),
    ('Tip_Amount', DoubleType()),
    ('Tolls_Amount', DoubleType()),
    ('Total_Amount', DoubleType()),
]
nyc_schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in nyc_columns]
)

# Read every gzipped CSV in the yellow-taxi folder using the explicit schema
# (header=True tells the reader the first line of each file holds column names).
rawDF = (
    spark.read
    .format('csv')
    .option('header', True)
    .schema(nyc_schema)
    .load("dbfs:/databricks-datasets/nyctaxi/tripdata/yellow/*.gz")
)

# Peek at the first five rows.
rawDF.take(5)

In [0]:
# Print the column names, types and nullability of the DataFrame read above.
rawDF.printSchema()

Now switch to SQL and start working with Delta Lake

In [0]:
%sql
-- Make sure the taxidata schema exists, then drop any taxi_2019_12 table left over from an earlier run.
CREATE DATABASE IF NOT EXISTS taxidata;
DROP TABLE IF EXISTS taxidata.taxi_2019_12;

In [0]:
%python
rawDF.write.mode("overwrite").saveAsTable("taxidata.taxi_2019_12")

In [0]:
%sql
-- Show the columns plus the extended table metadata.
DESCRIBE EXTENDED taxidata.taxi_2019_12

In [0]:
%sql
-- Preview ten rows of the taxi table.
-- NOTE(review): the cells above only create taxidata.taxi_2019_12; taxidata.taxi is presumably a pre-existing table — confirm.
SELECT *
FROM taxidata.taxi
LIMIT 10

Vendor,Pickup_DateTime,Dropoff_DateTime,Passenger_Count,Trip_Distance,Pickup_Longitude,Pickup_Latitude,Rate_Code,Store_And_Forward,Dropoff_Longitude,Dropoff_Latitude,Payment_Type,Fare_Amount,Surcharge,MTA_Tax,Tip_Amount,Tolls_Amount,Total_Amount
VTS,2009-01-04T02:52:00Z,2009-01-04T03:02:00Z,1,2.63,-73.991957,40.721567,,,-73.993803,40.695922,CASH,8.9,0.5,,0.0,0.0,9.4
VTS,2009-01-04T03:31:00Z,2009-01-04T03:38:00Z,3,4.55,-73.982102,40.73629,,,-73.95585,40.76803,Credit,12.1,0.5,,2.0,0.0,14.6
VTS,2009-01-03T15:43:00Z,2009-01-03T15:57:00Z,5,10.35,-74.002587,40.739748,,,-73.869983,40.770225,Credit,23.7,0.0,,4.74,0.0,28.44
DDS,2009-01-01T20:52:58Z,2009-01-01T21:14:00Z,1,5.0,-73.974267,40.790955,,,-73.996558,40.731849,CREDIT,14.9,0.5,,3.05,0.0,18.45
DDS,2009-01-24T16:18:23Z,2009-01-24T16:24:56Z,1,0.4,-74.00158,40.719382,,,-74.008378,40.72035,CASH,3.7,0.0,,0.0,0.0,3.7
DDS,2009-01-16T22:35:59Z,2009-01-16T22:43:35Z,2,1.2,-73.989806,40.735006,,,-73.985021,40.724494,CASH,6.1,0.5,,0.0,0.0,6.6
DDS,2009-01-21T08:55:57Z,2009-01-21T09:05:42Z,1,0.4,-73.98405,40.743544,,,-73.98026,40.748926,CREDIT,5.7,0.0,,1.0,0.0,6.7
VTS,2009-01-04T04:31:00Z,2009-01-04T04:36:00Z,1,1.72,-73.992635,40.748362,,,-73.995585,40.728307,CASH,6.1,0.5,,0.0,0.0,6.6
CMT,2009-01-05T16:29:02Z,2009-01-05T16:40:21Z,1,1.6,-73.96969,40.749244,,,-73.990413,40.751082,Credit,8.7,0.0,,1.3,0.0,10.0
CMT,2009-01-05T18:53:13Z,2009-01-05T18:57:45Z,1,0.7,-73.955173,40.783044,,,-73.958598,40.774822,Cash,5.9,0.0,,0.0,0.0,5.9


In [0]:
%sql
-- Show up to ten trips whose vendor code is 'V1'.
SELECT *
FROM taxidata.taxi
WHERE Vendor = 'V1'
LIMIT 10


In [0]:
from pyspark.sql import Row
from datetime import datetime

# Table the sample rows are appended to. Its own schema is reused for the new
# DataFrame so the rows carry the table's exact column types instead of the
# types Spark would infer from the Python values (e.g. bigint for Passenger_Count).
TARGET_TABLE = "clauseeu.taxidata.taxi"
target_schema = spark.table(TARGET_TABLE).schema

# Values that differ between the ten sample trips:
# (vendor, pickup hour, dropoff minute, passengers, distance, payment type, fare, tip, total)
# Every trip is on 2025-06-12, starts on the hour and ends within the same hour.
trips = [
    ("Vendor1", 8, 15, 1, 2.5, "Card", 10.0, 2.0, 13.0),
    ("Vendor2", 9, 20, 2, 3.0, "Cash", 12.0, 3.0, 16.0),
    ("Vendor1", 10, 10, 1, 1.5, "Card", 8.0, 1.5, 10.5),
    ("Vendor2", 11, 25, 3, 4.0, "Cash", 15.0, 4.0, 20.0),
    ("Vendor1", 12, 30, 1, 5.0, "Card", 18.0, 3.5, 22.5),
    ("Vendor2", 13, 15, 2, 2.0, "Cash", 9.0, 2.0, 12.0),
    ("Vendor1", 14, 10, 1, 1.0, "Card", 7.0, 1.0, 9.0),
    ("Vendor2", 15, 20, 3, 3.5, "Cash", 14.0, 3.0, 18.0),
    ("Vendor1", 16, 25, 2, 4.5, "Card", 16.0, 3.0, 20.0),
    ("Vendor2", 17, 30, 1, 5.5, "Cash", 20.0, 4.0, 25.0),
]

# Create a list of rows to insert. Field order follows the table's column order
# because insertInto matches columns by position, not by name.
data = [
    Row(
        Vendor=vendor,
        Pickup_DateTime=datetime(2025, 6, 12, pickup_hour, 0),
        Dropoff_DateTime=datetime(2025, 6, 12, pickup_hour, dropoff_minute),
        Passenger_Count=passengers,
        Trip_Distance=distance,
        Pickup_Longitude=-73.985,
        Pickup_Latitude=40.758,
        Rate_Code="1",  # the table column is a string, so pass a string rather than the int 1
        Store_And_Forward="N",
        Dropoff_Longitude=-73.985,
        Dropoff_Latitude=40.758,
        Payment_Type=payment_type,
        Fare_Amount=fare,
        Surcharge=0.5,
        MTA_Tax=0.5,
        Tip_Amount=tip,
        Tolls_Amount=0.0,
        Total_Amount=total,
    )
    for (vendor, pickup_hour, dropoff_minute, passengers, distance, payment_type, fare, tip, total) in trips
]

# Create DataFrame with the table's schema (no type inference)
df = spark.createDataFrame(data, schema=target_schema)

# Append the rows to the table (overwrite=False keeps the existing data)
df.write.insertInto(TARGET_TABLE, overwrite=False)

Insert a single row with SQL

In [0]:
%sql
-- Append one hand-written trip for vendor 'V1'; columns left as null have no value for this trip.
INSERT INTO clauseeu.taxidata.taxi (
  Vendor,
  Pickup_DateTime,
  Dropoff_DateTime,
  Passenger_Count,
  Trip_Distance,
  Pickup_Longitude,
  Pickup_Latitude,
  Rate_Code,
  Store_And_Forward,
  Dropoff_Longitude,
  Dropoff_Latitude,
  Payment_Type,
  Fare_Amount,
  Surcharge,
  MTA_Tax,
  Tip_Amount,
  Tolls_Amount,
  Total_Amount
)
VALUES (
  'V1',
  '2009-01-04T02:52:00.000+00:00',
  '2009-01-04T03:38:00.000+00:00',
  4,
  2.63,
  -73.989806,
  40.735006,
  null,
  null,
  -73.993803,
  40.695922,
  'CASH',
  8.9,
  0.5,
  null,
  0,
  0,
  9.4
)

num_affected_rows,num_inserted_rows
1,1


Update some rows

In [0]:
%sql
-- Replace the vendor code 'Vendor1' with the short form 'V1' on every matching row.
UPDATE clauseeu.taxidata.taxi
SET Vendor = 'V1'
WHERE Vendor = 'Vendor1'
    

num_affected_rows
5


In [0]:
%sql
-- Show up to ten trips whose vendor code is 'V1'.
SELECT *
FROM taxidata.taxi
WHERE Vendor = 'V1'
LIMIT 10


com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

Fetch table properties

In [0]:
%sql
-- Show the columns plus the extended table metadata (including table properties).
DESCRIBE EXTENDED clauseeu.taxidata.taxi

col_name,data_type,comment
Vendor,string,
Pickup_DateTime,timestamp,
Dropoff_DateTime,timestamp,
Passenger_Count,int,
Trip_Distance,double,
Pickup_Longitude,double,
Pickup_Latitude,double,
Rate_Code,string,
Store_And_Forward,string,
Dropoff_Longitude,double,


Change a property

In [0]:
%sql
-- Limit Delta data-skipping statistics to the vendor and the two timestamp columns.
ALTER TABLE clauseeu.taxidata.taxi
SET TBLPROPERTIES (
  'delta.dataSkippingStatsColumns' = 'Vendor, Pickup_DateTime, Dropoff_DateTime'
)

In [0]:
%sql
-- Recompute the Delta statistics stored for the table's data files
-- (needed for existing files after delta.dataSkippingStatsColumns has been changed).
ANALYZE TABLE clauseeu.taxidata.taxi COMPUTE DELTA STATISTICS

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:132)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:132)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:129)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:129)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:715)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:435)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:435)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

In [0]:
%sql
-- Compact the table's files and co-locate rows with the same Vendor.
OPTIMIZE clauseeu.taxidata.taxi_2019_12
ZORDER BY (Vendor)

path,metrics
abfss://clausebrickslakehouse@clauserawstore.dfs.core.windows.net/483ee370-f193-428d-9373-c2e81825102f/tables/9c47ba29-a0ee-4515-a7e2-6df617c958a1,"List(831, 137, List(5987, 205427613, 7.612448611311673E7, 831, 63259447960), List(5183, 796805213, 4.152565097810219E8, 137, 56890141840), 0, List(minCubeSize(107374182400), List(0, 0), List(137, 56890141840), 0, List(137, 56890141840), 1, null), null, 0, 1, 137, 0, false, 0, 0, 1749801827954, 1749806947872, 4, 1, null, List(0, 0), 18, 18, 12232862, 0, null)"


How to optimize and index on a partitioned table

In [0]:
%sql
-- Compact and Z-order (by Vendor) only the data picked up within the last day.
-- NOTE(review): OPTIMIZE ... WHERE accepts only predicates on partition columns, and ZORDER BY
-- cannot use a partition column — confirm how clauseeu.taxidata.taxi is partitioned before
-- relying on this cell.
OPTIMIZE clauseeu.taxidata.taxi
WHERE Pickup_DateTime >= current_timestamp() - INTERVAL 1 day
ZORDER BY (Vendor)

Vacuum

In [0]:
%sql
-- Three independent VACUUM statements; each one is terminated with a semicolon so the
-- cell parses as separate statements instead of one malformed command.

-- 1. Dry run: list the files that would be deleted, without deleting anything.
VACUUM clauseeu.taxidata.taxi_2019_12 DRY RUN;

-- 2. Vacuum files not required by versions older than the default retention period.
VACUUM clauseeu.taxidata.taxi;

-- 3. Vacuum files not required by versions more than 100 hours old.
-- NOTE(review): 100 hours is below the default 7-day (168 hour) retention threshold; Delta
-- rejects a shorter retention unless the retention-duration safety check is disabled — confirm
-- this is intended.
VACUUM clauseeu.taxidata.taxi RETAIN 100 HOURS;




path


Partition a table

In [0]:
# Load the current contents of the taxi table.
df = spark.table("clauseeu.taxidata.taxi")

# Save a copy partitioned by Vendor, replacing taxi_partitioned if it already exists.
(
    df.write
    .mode("overwrite")
    .partitionBy("Vendor")
    .saveAsTable("clauseeu.taxidata.taxi_partitioned")
)

Liquid clustering

In [0]:
from delta.tables import DeltaTable

# Enable liquid clustering on the existing table.
# The clustering key of an existing table is set with ALTER TABLE ... CLUSTER BY;
# the DeltaTable Python API does not document an alter().clusterBy() builder.
spark.sql("ALTER TABLE clauseeu.taxidata.taxi_2019_12 CLUSTER BY (Vendor)")

# Trigger clustering: OPTIMIZE rewrites data according to the clustering key.
spark.sql("OPTIMIZE clauseeu.taxidata.taxi_2019_12 ")

In [0]:
%sql
-- Preview ten rows of the taxi table.
-- NOTE(review): the visible cells of this notebook never create taxidata.taxi (only taxi_2019_12 and taxi_partitioned) — confirm it exists beforehand.
SELECT *
FROM taxidata.taxi
LIMIT 10