# Chapter 6: Maintaining your Delta Lake
> Exercises based on the nyc_taxi dataset

In [1]:
spark.sql("""
     CREATE TABLE IF NOT EXISTS default.nyc_taxi (
       VendorID BIGINT
     ) USING DELTA
     TBLPROPERTIES('delta.logRetentionDuration'='interval 7 days');
   """)


DataFrame[]

In [2]:
spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default| nyc_taxi|      false|
+---------+---------+-----------+



In [3]:
# should be zero on your first pass
# this means that while the table exists, there are no data files associated with it
len(spark.table("default.nyc_taxi").inputFiles())

0

## Start Populating the Table
> The next three commands demonstrate schema enforcement (validation) and schema evolution with Delta Lake


In [8]:
# Populate the Table reading the Parquet nyc_taxi Data
# note: this will fail and that is okay
(spark.read
      .format("parquet")
      .load("/opt/spark/data/dldg/datasets/nyc_taxi/*.parquet")
      .write
      .format("delta")
      .saveAsTable("default.nyc_taxi"))


AnalysisException: Table default.nyc_taxi already exists

In [10]:
# one step closer: mode("append") gets past the "already exists" error,
# but this operation still fails because the incoming data has more columns than the table (schema enforcement)
(spark.read
      .format("parquet")
      .load("/opt/spark/data/dldg/datasets/nyc_taxi/*.parquet")
      .write
      .format("delta")
      .mode("append")
      .saveAsTable("default.nyc_taxi"))

AnalysisException: A schema mismatch detected when writing to the Delta table (Table ID: 44fe9670-3600-42ae-98ee-ea9c3d1717e3).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- VendorID: long (nullable = true)


Data schema:
root
-- VendorID: long (nullable = true)
-- tpep_pickup_datetime: timestamp (nullable = true)
-- tpep_dropoff_datetime: timestamp (nullable = true)
-- passenger_count: double (nullable = true)
-- trip_distance: double (nullable = true)
-- RatecodeID: double (nullable = true)
-- store_and_fwd_flag: string (nullable = true)
-- PULocationID: long (nullable = true)
-- DOLocationID: long (nullable = true)
-- payment_type: long (nullable = true)
-- fare_amount: double (nullable = true)
-- extra: double (nullable = true)
-- mta_tax: double (nullable = true)
-- tip_amount: double (nullable = true)
-- tolls_amount: double (nullable = true)
-- improvement_surcharge: double (nullable = true)
-- total_amount: double (nullable = true)
-- congestion_surcharge: double (nullable = true)
-- airport_fee: double (nullable = true)


## Schema Evolution: Handle Automatically
If you trust the upstream data source (provider), you can add `.option("mergeSchema", "true")` so that Delta Lake adds the new columns to the table schema. Otherwise, it is safer to explicitly select only the columns you expect to see. In this example use case, the only known column is `VendorID`.
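
To make the difference between schema enforcement and schema evolution concrete, here is a toy model in plain Python of the check that made the append in `In [10]` fail and the one in `In [11]` succeed. It is a sketch under simplifying assumptions (schemas as plain dicts, any type change rejected), not Delta Lake's implementation; `check_append` and `select_expected` are hypothetical helpers.

```python
# Toy model in plain Python (no Spark needed). Schemas are dicts of
# column name -> type name. This is NOT Delta Lake's implementation; it only
# mirrors the behavior seen in the cells above.

def check_append(table_schema, data_schema, merge_schema=False):
    """Return the table schema that results from appending data_schema.

    Raises ValueError on a mismatch, the way the append in In [10] failed.
    Simplifications: any type change is rejected, and columns missing from
    the incoming data are allowed (they would be filled with nulls).
    """
    for name, dtype in data_schema.items():
        if name in table_schema and table_schema[name] != dtype:
            raise ValueError(f"type mismatch for {name}: {table_schema[name]} vs {dtype}")
    new_columns = [name for name in data_schema if name not in table_schema]
    if new_columns and not merge_schema:
        # schema enforcement: unexpected columns are rejected
        raise ValueError(f"schema mismatch, unexpected columns: {new_columns}")
    # schema evolution (mergeSchema=true): new columns are appended to the table schema
    merged = dict(table_schema)
    for name in new_columns:
        merged[name] = data_schema[name]
    return merged


def select_expected(data_schema, expected_columns):
    """The opt-in alternative: keep only the expected columns (KeyError if one is missing)."""
    return {name: data_schema[name] for name in expected_columns}


table = {"VendorID": "long"}
incoming = {"VendorID": "long", "trip_distance": "double"}

print(check_append(table, incoming, merge_schema=True))
print(check_append(table, select_expected(incoming, ["VendorID"])))
```

In PySpark, the opt-in route corresponds to selecting only the expected columns, for example `.select("VendorID")`, before `.write`.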

In [11]:
# Evolve the schema (showcases how to auto-merge changes to the schema)
# note: if you can trust the upstream, then this option is perfectly fine;
# however, if you don't trust the upstream, it is better to opt in to new
# columns explicitly by selecting only the columns you expect.

(spark.read
      .format("parquet")
      .load("/opt/spark/data/dldg/datasets/nyc_taxi/*.parquet")
      .write
      .format("delta")
      .mode("append")
      .option("mergeSchema", "true")
      .saveAsTable("default.nyc_taxi")
    )



In [15]:
# view the table structure using DESCRIBE
spark.sql("describe extended default.nyc_taxi").show(30, truncate=False)

+----------------------------+----------------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                                     |comment|
+----------------------------+----------------------------------------------------------------------------------------------+-------+
|VendorID                    |bigint                                                                                        |       |
|tpep_pickup_datetime        |timestamp                                                                                     |       |
|tpep_dropoff_datetime       |timestamp                                                                                     |       |
|passenger_count             |double                                                                                        |       |
|trip_distance               |double                          

In [59]:
spark.sql("select * from default.nyc_taxi limit 10").show(truncate=True)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0|    0.5|       0.

# Adding and Modifying Table Properties

In [16]:
spark.sql("""
ALTER TABLE default.nyc_taxi 
SET TBLPROPERTIES (
  'delta.logRetentionDuration'='interval 14 days',
  'delta.deletedFileRetentionDuration'='interval 28 days'
)""")


DataFrame[]
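
Each of these properties is a retention window. `delta.logRetentionDuration` (default `interval 30 days`) controls how long the transaction log history is kept, and `delta.deletedFileRetentionDuration` (default `interval 1 week`) controls how long data files that have been removed from the table are kept before `VACUUM` may delete them. The sketch below, with hypothetical helpers `parse_interval_days` and `retention_cutoffs`, computes both cutoffs for a given moment; it only handles the `interval <n> days` form used in this notebook.

```python
import re
from datetime import datetime, timedelta

def parse_interval_days(value):
    """Parse 'interval <n> days' (the only form this sketch supports) into a timedelta."""
    match = re.fullmatch(r"interval\s+(\d+)\s+days?", value.strip(), flags=re.IGNORECASE)
    if match is None:
        raise ValueError(f"unsupported interval string: {value!r}")
    return timedelta(days=int(match.group(1)))

def retention_cutoffs(properties, now):
    """Oldest timestamps each retention property still protects, relative to `now`."""
    log_retention = parse_interval_days(properties["delta.logRetentionDuration"])
    file_retention = parse_interval_days(properties["delta.deletedFileRetentionDuration"])
    return {
        # transaction log entries older than this may be cleaned up
        "log_cutoff": now - log_retention,
        # data files removed from the table before this may be deleted by VACUUM
        "vacuum_cutoff": now - file_retention,
    }

props = {
    "delta.logRetentionDuration": "interval 14 days",
    "delta.deletedFileRetentionDuration": "interval 28 days",
}
# `now` is the SET TBLPROPERTIES timestamp from the table history, rounded to the second
cutoffs = retention_cutoffs(props, datetime(2023, 4, 24, 23, 39, 10))
print(cutoffs["log_cutoff"])     # 2023-04-10 23:39:10
print(cutoffs["vacuum_cutoff"])  # 2023-03-27 23:39:10
```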

In [18]:
# view the Table History
# this will show the SET TBLPROPERTIES transaction in the Delta Lake transaction log
from delta.tables import DeltaTable

dt = DeltaTable.forName(spark, 'default.nyc_taxi')

(
    dt
    .history(10)
    .select("version", "timestamp", "operation")
    .show(truncate=False)
)


+-------+-----------------------+-----------------+
|version|timestamp              |operation        |
+-------+-----------------------+-----------------+
|2      |2023-04-24 23:39:10.124|SET TBLPROPERTIES|
|1      |2023-04-24 23:37:13.968|WRITE            |
|0      |2023-04-24 23:27:22.46 |CREATE TABLE     |
+-------+-----------------------+-----------------+



In [20]:
# view your Delta Lake tblproperties using Spark SQL
# you'll see the tblproperties we added as well as the delta.minReaderVersion, delta.minWriterVersion
spark.sql("show tblproperties default.nyc_taxi").show(truncate=False)

+----------------------------------+----------------+
|key                               |value           |
+----------------------------------+----------------+
|delta.deletedFileRetentionDuration|interval 28 days|
|delta.logRetentionDuration        |interval 14 days|
|delta.minReaderVersion            |1               |
|delta.minWriterVersion            |2               |
+----------------------------------+----------------+



In [21]:
# viewing the properties using the DeltaTable API (`detail()`)
(
    DeltaTable
    .forName(spark, 'default.nyc_taxi')
    .detail()
    .select("properties")
    .show(truncate=False)
)

+--------------------------------------------------------------------------------------------------------+
|properties                                                                                              |
+--------------------------------------------------------------------------------------------------------+
|{delta.logRetentionDuration -> interval 14 days, delta.deletedFileRetentionDuration -> interval 28 days}|
+--------------------------------------------------------------------------------------------------------+



# Removing Table Properties

In [23]:
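# using UNSET to remove a property.
# if the key does not exist, the operation is a no-op,
# so there is no need to use an IF EXISTS conditional.
# note: 'delta.loRgetentionDuratio' below is not an existing property on this table,
# so this particular statement removes nothing; pass the exact key
# (for example 'delta.logRetentionDuration') to actually remove a property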
spark.sql("""
    ALTER TABLE default.nyc_taxi
    UNSET TBLPROPERTIES('delta.loRgetentionDuratio')
  """)


DataFrame[]

# Delta Table Optimizations (Cleaning, Tuning)
> The following section showcases how to create and fix a poorly optimized Delta Lake table
* Long-running cells start with a warning comment.

In [24]:
# create a new table called `nonoptimal_nyc_taxi` in the `default` database.
from delta.tables import DeltaTable
(DeltaTable.createIfNotExists(spark)
    .tableName("default.nonoptimal_nyc_taxi")
    .property("description", "table to be optimized")
    .addColumn("VendorID", "BIGINT")
    .addColumn("tpep_pickup_datetime", "TIMESTAMP")
    .addColumn("tpep_dropoff_datetime", "TIMESTAMP")
    .addColumn("passenger_count", "DOUBLE")
    .addColumn("trip_distance", "DOUBLE")
    .addColumn("RatecodeID", "DOUBLE")
    .addColumn("store_and_fwd_flag", "STRING")
    .addColumn("PULocationID", "BIGINT")
    .addColumn("DOLocationID", "BIGINT")
    .addColumn("payment_type", "BIGINT")
    .addColumn("fare_amount", "DOUBLE")
    .addColumn("extra", "DOUBLE")
    .addColumn("mta_tax", "DOUBLE")
    .addColumn("tip_amount", "DOUBLE")
    .addColumn("tolls_amount", "DOUBLE")
    .addColumn("improvement_surcharge", "DOUBLE")
    .addColumn("total_amount", "DOUBLE")
    .addColumn("congestion_surcharge", "DOUBLE")
    .addColumn("airport_fee", "DOUBLE")
    .execute()
  )



<delta.tables.DeltaTable at 0xffff5fc1f940>

In [25]:
spark.sql("show tables").show()

+---------+-------------------+-----------+
|namespace|          tableName|isTemporary|
+---------+-------------------+-----------+
|  default|nonoptimal_nyc_taxi|      false|
|  default|           nyc_taxi|      false|
+---------+-------------------+-----------+



In [26]:
# Create a function that appends a single Row to a table as its own transaction
# note: this is for demonstration only and is intentionally not optimal:
# every call commits a new table version and writes at least one tiny data file
# note 2: if you need something similar, write a collection of `rows` per transaction instead
def append_row_to_table(row, schema, table):
    (spark.createDataFrame([row], schema)
      .write
      .format("delta")
      .mode("append")
      .saveAsTable(table))
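
The second note suggests writing a collection of rows per transaction. A minimal sketch of that idea, using a hypothetical `batched_rows` helper that groups rows into lists of at most `batch_size`, so that each list can be appended in a single write (the Spark call is left as a comment):

```python
from itertools import islice

def batched_rows(rows, batch_size):
    """Yield lists of at most batch_size items from rows (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch

# In the notebook, each batch would become one append (one Delta transaction), e.g.:
# for batch in batched_rows(source_df.limit(limit).collect(), 50):
#     (spark.createDataFrame(batch, source_schema)
#       .write.format("delta").mode("append").saveAsTable(destination_table))

print([len(batch) for batch in batched_rows(range(100), 30)])  # [30, 30, 30, 10]
```

With 100 rows and `batch_size=50`, this model produces two commits instead of 100. When the data is already in a DataFrame, writing the DataFrame directly, as in `In [11]`, is simpler still.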


In [27]:
####### Warning: will run for a while
source_df = spark.table("default.nyc_taxi")
source_schema = source_df.schema
destination_table = "default.nonoptimal_nyc_taxi"
# change limit to 1000 or 10000 if you want to generate a larger example
limit = 100

# warning: a list comprehension is used here to run synchronous inserts and to prove a point;
# this code exists only to create a poorly optimized table, so please don't use it for production use cases
([
    append_row_to_table(row, source_schema, destination_table) 
    for row in source_df.limit(limit).collect()
])


[None, None, None, ..., None]  (100 entries: each call to append_row_to_table returns None)

## Using Optimize
> Using Bin-Packing Optimize (default) will allow us to coalesce many small files (which we just created) into fewer large files.
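
The core idea of bin-packing can be sketched in plain Python: sort the files by size and fill bins sequentially until adding the next file would exceed the maximum bin size, so that each bin becomes one rewritten output file. This is a simplified illustration with hypothetical helpers `bin_pack` and `bins_to_rewrite`, not Delta Lake's exact implementation; in open-source Delta Lake the maximum bin size is controlled by `spark.databricks.delta.optimize.maxFileSize` (1 GB by default).

```python
def bin_pack(file_sizes, max_bin_size):
    """Group file sizes (in bytes) into bins whose total does not exceed max_bin_size.

    Files are sorted by size and packed sequentially (next-fit). A file larger
    than max_bin_size ends up alone in its own bin.
    """
    bins, current, current_size = [], [], 0
    for size in sorted(file_sizes):
        if current and current_size + size > max_bin_size:
            bins.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        bins.append(current)
    return bins

def bins_to_rewrite(bins):
    """Only bins with two or more files have anything to merge."""
    return [b for b in bins if len(b) > 1]

print(bin_pack([600, 500, 400, 300], 1000))                   # [[300, 400], [500], [600]]
print(bins_to_rewrite(bin_pack([600, 500, 400, 300], 1000)))  # [[300, 400]]
```

Many tiny files, like the ones created above, collapse into a single bin and therefore a single output file.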

Spark Config:
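1. `spark.databricks.delta.optimize.repartition.enabled=true` is useful when we have many small files, as in the case of the nonoptimal_nyc_taxi Delta Lake table: OPTIMIZE then merges the files in each bin with `repartition(1)` instead of `coalesce(1)`, which adds a shuffle but lets the reading of many small files be spread across more tasks.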

Databricks-Only: Delta Lake Table Properties:
1. `delta.targetFileSize=20mb`
2. `delta.tuneFileSizesForRewrites=true`

When running OPTIMIZE outside of Databricks, as we are inside this Jupyter notebook, we can lean on alternative Spark configurations to control how the small files are read and how the compacted output is written.

In [49]:
# modify the table properties for OPTIMIZE

# note: these table properties are Databricks-only at the time of writing, so
# `spark.databricks.delta.allowArbitraryProperties.enabled` is set to prevent an exception from being thrown when setting them
spark.conf.set('spark.databricks.delta.allowArbitraryProperties.enabled','true')
spark.conf.set('spark.databricks.delta.optimize.repartition.enabled', 'true')
spark.conf.set('spark.sql.files.maxRecordsPerFile', '1000000')
spark.sql("""
    ALTER TABLE default.nonoptimal_nyc_taxi
    SET TBLPROPERTIES (
      'delta.targetFileSize'='20mb',
      'delta.tuneFileSizesForRewrites'='true'
    )
  """)

You are setting a property: delta.targetFileSize that is not recognized by this version of Delta
You are setting a property: delta.tuneFileSizesForRewrites that is not recognized by this version of Delta


DataFrame[]
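
`spark.sql.files.maxRecordsPerFile` is documented by Spark as the maximum number of records written to a single file, with zero or negative meaning no limit, and the cap applies within each write task. A rough sketch of the resulting file count, assuming the setting applies to the files that OPTIMIZE rewrites; `files_per_task` is a hypothetical helper:

```python
import math

def files_per_task(records_in_task, max_records_per_file):
    """Number of data files one write task produces when output is capped at
    max_records_per_file records per file (zero or negative means no limit)."""
    if records_in_task <= 0:
        return 0  # in this model, a task with no rows writes no file
    if max_records_per_file <= 0:
        return 1
    return math.ceil(records_in_task / max_records_per_file)

print(files_per_task(100, 1_000_000))        # 1
print(files_per_task(2_500_000, 1_000_000))  # 3
```

The cap is only an upper bound: it can split a large output into more files, but it never merges files, so for the 100-row table above it has no effect.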

In [37]:
# execute OPTIMIZE (bin-packing compaction) using the Delta Lake Python API

df = (
    DeltaTable.forName(spark, "default.nonoptimal_nyc_taxi")
    .optimize()
    .executeCompaction()
)
df.show(truncate=True)

+--------------------+--------------------+
|                path|             metrics|
+--------------------+--------------------+
|file:/opt/spark/w...|{0, 0, {null, nul...|
+--------------------+--------------------+



In [48]:
# view the full metadata from the OPTIMIZE operation
from pyspark.sql.functions import col
(
    DeltaTable
    .forName(spark, "default.nonoptimal_nyc_taxi")
    .history(2)
    .where(col("operation") == "OPTIMIZE")
    .select("version", "timestamp", "operation", "operationMetrics.numRemovedFiles", "operationMetrics.numAddedFiles")
    .show(truncate=False)
)


+-------+-----------------------+---------+---------------+-------------+
|version|timestamp              |operation|numRemovedFiles|numAddedFiles|
+-------+-----------------------+---------+---------------+-------------+
|103    |2023-04-25 00:08:11.166|OPTIMIZE |200            |1            |
+-------+-----------------------+---------+---------------+-------------+



## Using Z-Order Optimize
> Z-Order optimize is a co-location technique that minimizes the total number of files that must be loaded to answer common questions (queries) against your Delta Lake tables. For example, let's say 80% of the queries against the `nyc_taxi` dataset always filter first on `tpep_pickup_datetime` and then on a specific `RatecodeID`. You could speed up those queries by co-locating rows with similar `tpep_pickup_datetime` and `RatecodeID` values in the same files, so that the search space is reduced.
> Combined with data skipping, this reduces the number of files that need to be opened: Delta Lake automatically collects statistics (such as per-file minimum and maximum values) for the first 32 columns of a table, and a query can skip any file whose value ranges cannot match its filter.
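
The co-location in Z-order comes from a space-filling curve. One common way to build it, sketched below, is to map each column's values to small integer ranks and interleave their bits (a Morton code); sorting rows by that code keeps rows that are close on both columns close together, so files cut from the sorted output have narrow value ranges on both columns. This is a conceptual sketch with hypothetical helpers (`interleave_bits`, `rank`, `zorder_sort`) and made-up data; Delta Lake's implementation differs in detail, for example by bucketing column values into ranges before interleaving.

```python
def interleave_bits(x, y, bits=16):
    """Morton (Z-order) code of two non-negative ints below 2**bits:
    bit i of x goes to position 2*i + 1 and bit i of y to position 2*i."""
    z = 0
    for i in range(bits):
        z |= ((y >> i) & 1) << (2 * i)
        z |= ((x >> i) & 1) << (2 * i + 1)
    return z

def rank(values):
    """Map each value to the rank of its distinct value in sorted order."""
    order = {v: i for i, v in enumerate(sorted(set(values)))}
    return [order[v] for v in values]

def zorder_sort(rows, col_a, col_b):
    """Return rows (dicts) sorted by the Z-order code of their (col_a, col_b) ranks."""
    codes = [interleave_bits(a, b) for a, b in zip(rank([r[col_a] for r in rows]),
                                                  rank([r[col_b] for r in rows]))]
    order = sorted(range(len(rows)), key=lambda i: (codes[i], i))
    return [rows[i] for i in order]

# a 2x2 grid is visited in a "Z" shape
print(sorted([(1, 1), (0, 1), (1, 0), (0, 0)], key=lambda p: interleave_bits(*p)))
# [(0, 0), (0, 1), (1, 0), (1, 1)]

# toy rows; keys stand in for tpep_pickup_datetime and RatecodeID, values are made up
rows = [{"pickup": 3, "ratecode": 1}, {"pickup": 0, "ratecode": 0}, {"pickup": 1, "ratecode": 1}]
print([r["pickup"] for r in zorder_sort(rows, "pickup", "ratecode")])  # [0, 1, 3]
```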

## Delta Lake Table Properties
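`delta.dataSkippingNumIndexedCols=6` limits statistics collection to the first 6 columns of the table, which is useful when queries only filter on those columns. Both Z-order columns used below fall within them: `tpep_pickup_datetime` is column 2 and `RatecodeID` is column 6 of `nonoptimal_nyc_taxi`.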
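
Data skipping itself can be sketched in a few lines: only the first `delta.dataSkippingNumIndexedCols` columns get per-file min/max statistics (a value of `-1` collects them for all columns), and a file can be skipped when its [min, max] range for the filtered column cannot overlap the filter. The helpers `indexed_columns` and `files_to_read` and the per-file statistics in the usage are hypothetical; the column order is copied from the `nonoptimal_nyc_taxi` definition above.

```python
# Column order of default.nonoptimal_nyc_taxi, copied from its definition above
TABLE_COLUMNS = [
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count",
    "trip_distance", "RatecodeID", "store_and_fwd_flag", "PULocationID", "DOLocationID",
    "payment_type", "fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
    "improvement_surcharge", "total_amount", "congestion_surcharge", "airport_fee",
]

def indexed_columns(columns, num_indexed_cols=32):
    """Columns that receive min/max statistics; a negative value means all columns."""
    if num_indexed_cols < 0:
        return list(columns)
    return list(columns[:num_indexed_cols])

def files_to_read(file_stats, column, low, high, indexed):
    """Return the files that may contain rows with low <= column <= high.

    file_stats maps a file name to {column: (min, max)}. Without statistics
    for `column`, no file can be skipped.
    """
    if column not in indexed:
        return sorted(file_stats)
    return sorted(
        name for name, stats in file_stats.items()
        if stats[column][1] >= low and stats[column][0] <= high
    )

indexed = indexed_columns(TABLE_COLUMNS, 6)
print(indexed[-1])  # RatecodeID

# hypothetical per-file statistics for two data files
stats = {
    "part-a.parquet": {"RatecodeID": (1.0, 1.0)},
    "part-b.parquet": {"RatecodeID": (2.0, 5.0)},
}
print(files_to_read(stats, "RatecodeID", 2.0, 2.0, indexed))    # ['part-b.parquet']
print(files_to_read(stats, "fare_amount", 0.0, 10.0, indexed))  # both files: no stats for fare_amount
```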


In [54]:
# reduce the number of columns with collected statistics from the default 32 down to 6
# since Z-order OPTIMIZE rewrites the data files, the new files it writes
# will carry statistics for only the first 6 columns
spark.sql("""
    ALTER TABLE default.nonoptimal_nyc_taxi
    SET TBLPROPERTIES (
      'delta.dataSkippingNumIndexedCols'='6'
    )
  """)

DataFrame[]

In [83]:
# see https://docs.databricks.com/delta/data-skipping.html for more details
dt = DeltaTable.forName(spark, "default.nonoptimal_nyc_taxi")
(
    dt
    .optimize()
    .executeZOrderBy("tpep_pickup_datetime", "RatecodeID")
)


DataFrame[path: string, metrics: struct<numFilesAdded:bigint,numFilesRemoved:bigint,filesAdded:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,filesRemoved:struct<min:bigint,max:bigint,avg:double,totalFiles:bigint,totalSize:bigint>,partitionsOptimized:bigint,zOrderStats:struct<strategyName:string,inputCubeFiles:struct<num:bigint,size:bigint>,inputOtherFiles:struct<num:bigint,size:bigint>,inputNumCubes:bigint,mergedFiles:struct<num:bigint,size:bigint>,numOutputCubes:bigint,mergedNumCubes:bigint>,numBatches:bigint,totalConsideredFiles:bigint,totalFilesSkipped:bigint,preserveInsertionOrder:boolean,numFilesSkippedToReduceWriteAmplification:bigint,numBytesSkippedToReduceWriteAmplification:bigint,startTimeMs:bigint,endTimeMs:bigint,totalClusterParallelism:bigint,totalScheduledTasks:bigint,autoCompactParallelismStats:struct<maxClusterActiveParallelism:bigint,minClusterActiveParallelism:bigint,maxSessionActiveParallelism:bigint,minSessionActiveParallelism:bigint>>]

In [84]:
# view the results of the optimization
# Z-order optimizing a table that is already a single file isn't going to help much,
# but you get the idea!
(
    dt.history(10)
    .where(col("operation") == "OPTIMIZE")
    .select("version", "timestamp", "operation", "operationMetrics.numRemovedFiles", "operationMetrics.numAddedFiles")
    .show(truncate=False)
)

+-------+-----------------------+---------+---------------+-------------+
|version|timestamp              |operation|numRemovedFiles|numAddedFiles|
+-------+-----------------------+---------+---------------+-------------+
|107    |2023-04-25 01:01:56.656|OPTIMIZE |1              |1            |
|106    |2023-04-25 00:55:36.921|OPTIMIZE |1              |1            |
|103    |2023-04-25 00:08:11.166|OPTIMIZE |200            |1            |
+-------+-----------------------+---------+---------------+-------------+

