In [1]:
# run first for imports
from pyspark.sql.types import DateType
from pyspark.sql.functions import col, desc
from pyspark.sql.utils import AnalysisException
from delta.tables import DeltaTable

# Chapter 5: Maintaining your Delta Lake (Extras)
> The following exercises use the open-source [nyc_taxi dataset](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page), specifically the Yellow Taxi Trip Records in Parquet format.

In [None]:
spark.sql("""
     CREATE TABLE IF NOT EXISTS default.nyc_taxi (
       VendorID BIGINT
     ) USING DELTA
     TBLPROPERTIES('delta.logRetentionDuration'='interval 7 days');
   """)


In [None]:
spark.sql("show tables").show()

In [None]:
# spark.sql("drop table default.nyc_taxi")

In [None]:
# Should be zero on your first pass: the table exists in the catalog,
# but no data files are associated with it yet.
nyc_taxi_files = spark.table("default.nyc_taxi").inputFiles()
len(nyc_taxi_files)

## Start Populating the Table
> The next three commands are used to show Schema Evolution and Validation with Delta Lake


In [None]:
# Populate the table by reading the parquet nyc_taxi data.
# Expected to fail: no save mode is set and default.nyc_taxi already exists.
# The error is caught and printed, so the demonstration is visible while
# "Restart & Run All" can still continue past this cell.
try:
    (spark.read
          .format("parquet")
          .load("/opt/spark/data/datasets/nyc_taxi/yellow_tripdata_2023-01.parquet")
          .write
          .format("delta")
          .saveAsTable("default.nyc_taxi"))
except AnalysisException as err:
    print(f"Expected failure: {err}")


In [None]:
# One step closer: mode("append") avoids the table-already-exists error, but
# this write is still expected to fail. The parquet data carries many more
# columns than the table schema (only VendorID), and Delta Lake's schema
# validation rejects the mismatch. The error is caught and printed so that
# "Restart & Run All" can continue past this demonstration.
try:
    (spark.read
          .format("parquet")
          .load("/opt/spark/data/datasets/nyc_taxi/yellow_tripdata_2023-01.parquet")
          .write
          .format("delta")
          .mode("append")
          .saveAsTable("default.nyc_taxi"))
except AnalysisException as err:
    print(f"Expected failure: {err}")

## Schema Evolution: Handle Automatically
If you trust the upstream data source (provider), then you can add `option("mergeSchema", "true")`. Otherwise, it is better to explicitly select the subset of columns you expect to see. In this example use case, the only known column is `VendorID`.

In [None]:
# Evolve the schema automatically: with mergeSchema enabled, this append adds
# any columns that exist in the parquet data but not yet in the table.
# This is perfectly fine when the upstream can be trusted; otherwise prefer
# opting in to the changing columns explicitly.
jan_2023_trips = (
    spark.read
    .format("parquet")
    .load("/opt/spark/data/datasets/nyc_taxi/yellow_tripdata_2023-01.parquet")
)
(
    jan_2023_trips.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("default.nyc_taxi")
)


# Alternatives to Auto Schema Evolution
In the previous case, we used `.option("mergeSchema", "true")` to modify the behavior of the Delta Lake writer. While this option simplifies how we evolve our Delta Lake table schemas, it comes at the price of not being fully aware of the changes to our table schema. When unknown columns are introduced by an upstream source, you'll want to know which columns you intend to bring forward and which columns can be safely ignored.

## Intentionally Adding Columns with Alter Table

In [None]:
# Manually opt in to the new columns with ALTER TABLE ... ADD COLUMNS. This is
# an intentional alternative to `.option("mergeSchema", "true")`.
# ADD COLUMNS fails for a column that already exists (for example after the
# mergeSchema append, or when this cell is re-run). To avoid that, only the
# columns still missing are added. Names are compared case-insensitively,
# matching Spark's default (spark.sql.caseSensitive=false).
nyc_taxi_new_columns = [
    ("tpep_pickup_datetime", "TIMESTAMP"),
    ("tpep_dropoff_datetime", "TIMESTAMP"),
    ("passenger_count", "DOUBLE"),
    ("trip_distance", "DOUBLE"),
    ("RatecodeID", "DOUBLE"),
    ("store_and_fwd_flag", "STRING"),
    ("PULocationID", "BIGINT"),
    ("DOLocationID", "BIGINT"),
    ("payment_type", "BIGINT"),
    ("fare_amount", "DOUBLE"),
    ("extra", "DOUBLE"),
    ("mta_tax", "DOUBLE"),
    ("tip_amount", "DOUBLE"),
    ("tolls_amount", "DOUBLE"),
    ("improvement_surcharge", "DOUBLE"),
    ("total_amount", "DOUBLE"),
    ("congestion_surcharge", "DOUBLE"),
    ("airport_fee", "DOUBLE"),
]
existing_columns = {name.lower() for name in spark.table("default.nyc_taxi").columns}
missing_columns = [
    (name, data_type)
    for name, data_type in nyc_taxi_new_columns
    if name.lower() not in existing_columns
]
if missing_columns:
    column_definitions = ",\n  ".join(f"{name} {data_type}" for name, data_type in missing_columns)
    spark.sql(f"""
ALTER TABLE default.nyc_taxi
ADD COLUMNS (
  {column_definitions}
)
""")
else:
    print("default.nyc_taxi already has every expected column; nothing to add.")

In [None]:
# view the table structure using DESCRIBE
spark.sql("describe extended default.nyc_taxi").show(30, truncate=False)

In [None]:
spark.sql("select * from default.nyc_taxi limit 10").show(truncate=True)

# Adding and Modifying Table Properties

In [None]:
spark.sql("""
ALTER TABLE default.nyc_taxi 
SET TBLPROPERTIES (
  'delta.logRetentionDuration'='interval 14 days',
  'delta.deletedFileRetentionDuration'='interval 28 days'
)""")


In [None]:
# View the 10 most recent commits of the table history. The SET TBLPROPERTIES
# change shows up here as its own transaction in the Delta Lake log.
dt = DeltaTable.forName(spark, 'default.nyc_taxi')
recent_history = dt.history(10)
recent_history.select("version", "timestamp", "operation").show(truncate=False)


In [None]:
# List the table properties with Spark SQL. Besides the properties we set,
# you'll also see delta.minReaderVersion and delta.minWriterVersion.
tblproperties_df = spark.sql("show tblproperties default.nyc_taxi")
tblproperties_df.show(truncate=False)

In [None]:
# the same properties via the DeltaTable API: the `properties` column of detail()
nyc_taxi_detail = DeltaTable.forName(spark, 'default.nyc_taxi').detail()
nyc_taxi_detail.select("properties").show(truncate=False)

# Removing Table Properties

In [None]:
# Remove a table property with UNSET TBLPROPERTIES.
# Without IF EXISTS, Delta Lake raises an error when the key is not set
# (e.g. when this cell is re-run after the property was already removed).
# IF EXISTS turns that case into a no-op.
spark.sql("""
    ALTER TABLE default.nyc_taxi
    UNSET TBLPROPERTIES IF EXISTS ('delta.logRetentionDuration')
  """)


# Delta Table Optimizations (Cleaning, Tuning)
> The following section showcases how to create and fix a poorly optimized Delta Lake table
* Long-running cells are marked with a warning beforehand.

In [None]:
# Create `default.nonoptimal_nyc_taxi` with the same columns as default.nyc_taxi.
# Columns are declared as (name, type) pairs and added in this exact order.
nonoptimal_columns = [
    ("VendorID", "BIGINT"),
    ("tpep_pickup_datetime", "TIMESTAMP"),
    ("tpep_dropoff_datetime", "TIMESTAMP"),
    ("passenger_count", "DOUBLE"),
    ("trip_distance", "DOUBLE"),
    ("RatecodeID", "DOUBLE"),
    ("store_and_fwd_flag", "STRING"),
    ("PULocationID", "BIGINT"),
    ("DOLocationID", "BIGINT"),
    ("payment_type", "BIGINT"),
    ("fare_amount", "DOUBLE"),
    ("extra", "DOUBLE"),
    ("mta_tax", "DOUBLE"),
    ("tip_amount", "DOUBLE"),
    ("tolls_amount", "DOUBLE"),
    ("improvement_surcharge", "DOUBLE"),
    ("total_amount", "DOUBLE"),
    ("congestion_surcharge", "DOUBLE"),
    ("airport_fee", "DOUBLE"),
]
nonoptimal_builder = (
    DeltaTable.createIfNotExists(spark)
    .tableName("default.nonoptimal_nyc_taxi")
    .property("description", "table to be optimized")
)
for column_name, column_type in nonoptimal_columns:
    nonoptimal_builder = nonoptimal_builder.addColumn(column_name, column_type)
nonoptimal_builder.execute()


In [None]:
spark.sql("show tables").show()

In [26]:
def append_row_to_table(row, schema, table):
    """Append one row to a Delta table as its own separate transaction.

    This is for example only and is deliberately not optimal: every call builds
    a one-row DataFrame and commits it on its own. If you need something similar,
    write a whole collection of rows per transaction instead.

    Args:
        row: the single row to append.
        schema: schema used to build the one-row DataFrame.
        table: name of the destination table.
    """
    single_row_df = spark.createDataFrame([row], schema)
    writer = single_row_df.write.format("delta").mode("append")
    writer.saveAsTable(table)


In [None]:
####### Warning: will run for a while
source_df = spark.table("default.nyc_taxi")
source_schema = source_df.schema
destination_table = "default.nonoptimal_nyc_taxi"
# change limit to 1000 or 10000 if you want to generate a more verbose example
limit = 100

# Warning: every row is appended synchronously as its own Delta commit, on
# purpose, to create a poorly optimized table. Don't use this for production.
# A plain for-loop is used because append_row_to_table is called only for its
# side effect. A list comprehension would build (and display) a useless list
# of None values.
for row in source_df.limit(limit).collect():
    append_row_to_table(row, source_schema, destination_table)


## Using Optimize
> Using Bin-Packing Optimize (default) will allow us to coalesce many small files (which we just created) into fewer large files.

Spark Config:
1. `spark.databricks.delta.optimize.repartition.enabled=true` is useful when we have many small files like we do in the case of the nonoptimal_nyc_taxi Delta Lake Table.

Databricks-Only: Delta Lake Table Properties:
1. `delta.targetFileSize=20mb`
2. `delta.tuneFileSizesForRewrites=true`

When running OPTIMIZE outside of Databricks, as we are in this Jupyter notebook, we can lean on alternative Spark configuration to control how many files are read and how the table is optimized.

In [None]:
# Modify the session configuration and table properties for OPTIMIZE.
# The two table properties were Databricks-only at the time of writing, so
# `spark.databricks.delta.allowArbitraryProperties.enabled` is switched on
# to keep the ALTER TABLE below from throwing an exception.
optimize_confs = {
    'spark.databricks.delta.allowArbitraryProperties.enabled': 'true',
    'spark.databricks.delta.optimize.repartition.enabled': 'true',
    'spark.sql.files.maxRecordsPerFile': '1000000',
}
for conf_key, conf_value in optimize_confs.items():
    spark.conf.set(conf_key, conf_value)
spark.sql("""
    ALTER TABLE default.nonoptimal_nyc_taxi
    SET TBLPROPERTIES (
      'delta.targetFileSize'='20mb',
      'delta.tuneFileSizesForRewrites'='true'
    )
  """)

In [None]:
# Bin-packing OPTIMIZE through the Delta Python client. The returned
# DataFrame describes the compaction.
nonoptimal_dt = DeltaTable.forName(spark, "default.nonoptimal_nyc_taxi")
df = nonoptimal_dt.optimize().executeCompaction()
df.show(truncate=True)

In [None]:
# view the OPTIMIZE commit metadata: how many files were removed vs. added
optimize_history = (
    DeltaTable
    .forName(spark, "default.nonoptimal_nyc_taxi")
    .history(2)
    .where(col("operation") == "OPTIMIZE")
)
optimize_history.select(
    "version",
    "timestamp",
    "operation",
    "operationMetrics.numRemovedFiles",
    "operationMetrics.numAddedFiles",
).show(truncate=False)


# Using Z-Order Optimize
> Z-Order optimize is a co-location technique that minimizes the total number of files loaded to answer common questions (queries) from your Delta Lake tables. For example, let’s say 80% of the queries to the `nyc_taxi` dataset always search first for `tpep_pickup_datetime` followed by a specific `RatecodeID`. You could optimize for faster query results by co-locating `tpep_pickup_datetime` and `RatecodeID` so that the search space is reduced.
> Through data skipping, this reduces the number of files that need to be opened, since Delta Lake automatically collects statistics for the first 32 columns of a Delta Lake table.

## Delta Lake Table Properties
`delta.dataSkippingNumIndexedCols=6` could be used in the case where we only care about the first 6 columns of our Delta Lake table.


In [None]:
# Reduce the statistics collected for the table from the default 32 columns
# down to 6. The Z-order optimize that follows rewrites the files, so the
# setting takes effect along with that operation.
indexed_cols_sql = """
    ALTER TABLE default.nonoptimal_nyc_taxi
    SET TBLPROPERTIES (
      'delta.dataSkippingNumIndexedCols'='6'
    )
  """
spark.sql(indexed_cols_sql)

In [None]:
# Z-order OPTIMIZE: co-locate rows by tpep_pickup_datetime and RatecodeID.
# see https://docs.databricks.com/delta/data-skipping.html for more details
dt = DeltaTable.forName(spark, "default.nonoptimal_nyc_taxi")
zorder_metrics = (
    dt
    .optimize()
    .executeZOrderBy("tpep_pickup_datetime", "RatecodeID")
)
# Show the returned metrics DataFrame, the same way the bin-packing compaction
# result is shown. Leaving it as the last expression only displays the schema.
zorder_metrics.show(truncate=True)


In [None]:
# View the results of the optimization.
# When we use z-order optimize on a single file it isn't going to help much,
# but you get the idea!
optimize_commits = dt.history(10).where(col("operation") == "OPTIMIZE")
(
    optimize_commits
    .select("version", "timestamp", "operation",
            "operationMetrics.numRemovedFiles", "operationMetrics.numAddedFiles")
    .show(truncate=False)
)

# Partition Tuning
> Note: for tables under 1TB, it is advised not to use partitioning at all and to lean on OPTIMIZE and Z-ORDER OPTIMIZE instead.
> Edge cases: GDPR and data governance requirements such as N-day TTLs (30-, 10-, and 7-day TTL policies are fairly standard).

You'll learn to achieve the following next:
1. Create a Table Partitioned by a given Column
2. Add or Remove Partitions for a given table
3. Modify an existing, non-partitioned table, to introduce partitioning (* requires some communication to your downstream data consumers)

In [9]:
# if you need to go back and drop this table. 
# spark.sql("drop table default.nyc_taxi_by_day")

In [None]:
# Table creation with partitions. tpep_dropoff_date is a generated column
# (CAST of tpep_dropoff_datetime to DATE), and the table is partitioned by it.
# Each entry is (name, type, extra addColumn keyword arguments), in column order.
by_day_columns = [
    ("VendorID", "BIGINT", {}),
    ("tpep_pickup_datetime", "TIMESTAMP", {}),
    ("tpep_dropoff_datetime", "TIMESTAMP", {"comment": "trip drop off and partition source column"}),
    ("tpep_dropoff_date", DateType(), {"generatedAlwaysAs": "CAST(tpep_dropoff_datetime AS DATE)"}),
    ("passenger_count", "DOUBLE", {}),
    ("trip_distance", "DOUBLE", {}),
    ("RatecodeID", "DOUBLE", {}),
    ("store_and_fwd_flag", "STRING", {}),
    ("PULocationID", "BIGINT", {}),
    ("DOLocationID", "BIGINT", {}),
    ("payment_type", "BIGINT", {}),
    ("fare_amount", "DOUBLE", {}),
    ("extra", "DOUBLE", {}),
    ("mta_tax", "DOUBLE", {}),
    ("tip_amount", "DOUBLE", {}),
    ("tolls_amount", "DOUBLE", {}),
    ("improvement_surcharge", "DOUBLE", {}),
    ("total_amount", "DOUBLE", {}),
    ("congestion_surcharge", "DOUBLE", {}),
    ("airport_fee", "DOUBLE", {}),
]
# table properties, applied in this order
by_day_properties = {
    "description": "partitioned by taxi trip end time",
    "delta.logRetentionDuration": "interval 30 days",
    "delta.deletedFileRetentionDuration": "interval 1 day",
    "delta.dataSkippingNumIndexedCols": "10",
    "delta.checkpoint.writeStatsAsStruct": "true",
    "delta.checkpoint.writeStatsAsJson": "false",
}

by_day_builder = DeltaTable.createIfNotExists(spark).tableName("default.nyc_taxi_by_day")
for column_name, column_type, column_options in by_day_columns:
    by_day_builder = by_day_builder.addColumn(column_name, column_type, **column_options)
by_day_builder = by_day_builder.partitionedBy("tpep_dropoff_date")
for property_key, property_value in by_day_properties.items():
    by_day_builder = by_day_builder.property(property_key, property_value)
by_day_builder.execute()

In [None]:
# Copy default.nyc_taxi into the partitioned default.nyc_taxi_by_day table. The
# source has no tpep_dropoff_date column; the generated column supplies it.
# Note: this appends, so running the cell several times creates duplicates.
by_day_writer = (
    spark.table("default.nyc_taxi")
    .write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "false")
)
by_day_writer.saveAsTable("default.nyc_taxi_by_day")

In [None]:
spark.sql("describe extended default.nyc_taxi_by_day").show(30, truncate=False)

In [11]:
# Removing bad data.
# Listing the table directory (ls -la ch05/spark-warehouse/nyc_taxi_by_day/)
# reveals some interesting table partitions:
#   tpep_dropoff_date=2009-01-01
#   tpep_dropoff_date=2022-10-24
#   tpep_dropoff_date=2022-10-25
#   tpep_dropoff_date=2022-12-31
# Put your detective's hat on and investigate what is wrong with the data in
# each of these partitions, or simply delete every drop-off date before 2023.
by_day_dt = DeltaTable.forName(spark, 'default.nyc_taxi_by_day')
by_day_dt.delete(col("tpep_dropoff_date") < "2023-01-01")

# Ensure the erroneous data is out of the current table Snapshot
While we have deleted the bad partitions, we haven't wiped them out fully. To do so, we will need to vacuum the table, which we'll get to later.

In [12]:
# Rows with a drop-off date before 2023 in the current snapshot.
# NOTE: despite its name, `delta_table` is a DataFrame, not a DeltaTable.
pre_2023_filter = col("tpep_dropoff_date") < "2023-01-01"
delta_table = spark.table("default.nyc_taxi_by_day").where(pre_2023_filter)

In [None]:
# After the delete, no pre-2023 rows should remain in the current snapshot.
# The underlying files are still there until the table is vacuumed.
pre_2023_row_count = delta_table.count()
assert pre_2023_row_count == 0, (
    f"expected 0 rows before 2023-01-01 after the delete, found {pre_2023_row_count}"
)
pre_2023_row_count

In [None]:
# the table itself has around 3 million trips
by_day_row_count = spark.table("default.nyc_taxi_by_day").count()
by_day_row_count

In [None]:
# Inspect the partitioned table after the delete.
dt = DeltaTable.forName(spark, 'default.nyc_taxi_by_day')

# For reference, dt.detail().printSchema() prints:
# root
#  |-- format: string (nullable = true)
#  |-- id: string (nullable = true)
#  |-- name: string (nullable = true)
#  |-- description: string (nullable = true)
#  |-- location: string (nullable = true)
#  |-- createdAt: timestamp (nullable = true)
#  |-- lastModified: timestamp (nullable = true)
#  |-- partitionColumns: array (nullable = true)
#  |    |-- element: string (containsNull = true)
#  |-- numFiles: long (nullable = true)
#  |-- sizeInBytes: long (nullable = true)
#  |-- properties: map (nullable = true)
#  |    |-- key: string
#  |    |-- value: string (valueContainsNull = true)
#  |-- minReaderVersion: integer (nullable = true)
#  |-- minWriterVersion: integer (nullable = true)
#  |-- tableFeatures: array (nullable = true)
#  |    |-- element: string (containsNull = true)

# view the table details: partition columns, file count, last modification time
dt.detail().select("partitionColumns", "numFiles", "lastModified").show(truncate=False)

# describe the table's columns, partitioning and metadata (not its commit history)
spark.sql("describe extended default.nyc_taxi_by_day").show(40, truncate=True)

# gather the distinct partition days, newest first
# (`desc` is already imported in the first cell)
(spark
   .table('default.nyc_taxi_by_day')
   .select("tpep_dropoff_date")
   .distinct()
   .sort(desc("tpep_dropoff_date"))
   .show(33))

# Recovering Data in Partitions Using "Replays"
> Let's face it. Even with the best intentions in place, we are all human and make mistakes. In your career as a data engineer, one thing you'll be required to learn is the art of data recovery. When we recover data, the process is commonly called 'replaying', since the action we are taking is to roll back the clock, or rewind, to an earlier point in time. This enables us to remove problematic changes to a table and replace the erroneous data with whatever the "fixed" data is.






In [None]:
# Replay a single partition using write mode 'overwrite' and a replaceWhere
# condition: only the rows of `table_to_fix` matching the predicate are
# replaced with the matching rows read from `recovery_table`.
# Fill in the placeholders below; until then the cell is skipped instead of
# failing on spark.table("").
recovery_table = ""           # table holding the corrected ("fixed") rows
partition_col = ""            # e.g. "tpep_dropoff_date"
table_to_fix = ""             # e.g. "default.nyc_taxi_by_day"
partition_value = "2023-01-01"

if not (recovery_table and partition_col and table_to_fix):
    print("Set recovery_table, partition_col and table_to_fix to run the replay.")
else:
    replace_predicate = f"{partition_col} == '{partition_value}'"
    (
      spark
        .table(recovery_table)
        # Write only rows inside the partition being replaced. By default Delta
        # rejects rows that do not match the replaceWhere predicate.
        .where(replace_predicate)
        .write
        .format("delta")
        .mode("overwrite")
        .option("replaceWhere", replace_predicate)
        .saveAsTable(table_to_fix)
    )
