# Advanced Features of Delta Lake tables

In this notebook you will:
* test Liquid Clustering (LC)
* test Deletion Vectors when merging an increment into the table
* merge the increment into the table

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from delta.tables import DeltaTable
import os
import time

In [None]:
# Create (or reuse) the Spark session for this notebook, configured with the
# Delta Lake package, its SQL session extension and its catalog implementation.
delta_settings = {
    'spark.jars.packages': 'io.delta:delta-spark_2.12:3.2.1',
    'spark.sql.extensions': 'io.delta.sql.DeltaSparkSessionExtension',
    'spark.sql.catalog.spark_catalog': 'org.apache.spark.sql.delta.catalog.DeltaCatalog',
}

session_builder = SparkSession.builder.appName('advanced-features')
for setting_name, setting_value in delta_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()

In [None]:
# Resolve the project root and every data/output location used in this notebook.
base_path = os.getcwd()

# The project root is three directory levels above the working directory.
# os.path.dirname is used instead of splitting on '/' so the lookup does not
# depend on the platform's path separator and never collapses to an empty
# (relative) root.
project_path = os.path.dirname(os.path.dirname(os.path.dirname(base_path)))

# Locations under <project>/data
questions = os.path.join(project_path, 'data', 'questions')
users_base_path = os.path.join(project_path, 'data', 'users_base')
users_increment_path = os.path.join(project_path, 'data', 'users_increment')

# Locations under <project>/output
accounts_output_path = os.path.join(project_path, 'output', 'accounts')
accounts_output_path_v2 = os.path.join(project_path, 'output', 'accounts_v2')
testing_output = os.path.join(project_path, 'output', 'tests')

In [None]:
# Start from a clean catalog: drop both tables if a previous run left them behind.
drop_accounts_sql = 'drop table if exists accounts'
drop_accounts_v2_sql = 'drop table if exists accounts_v2'

spark.sql(drop_accounts_sql)
spark.sql(drop_accounts_v2_sql)

In [None]:
# Rebuild the `accounts` Delta table from the base users parquet data.
# The table files are written to accounts_output_path and any previous
# content is overwritten.

users_base_df = spark.read.parquet(users_base_path)

accounts_writer = (
    users_base_df.write
    .format('delta')
    .mode('overwrite')
    .option('path', accounts_output_path)
)
accounts_writer.saveAsTable('accounts')

## Turn on liquid clustering

* Create a table `accounts_v2` at the location `accounts_output_path_v2`
* Turn on liquid clustering for the table
* Fill the table with the data from the `accounts` table

In [None]:
# Create accounts_v2 as a new Delta table clustered by user_id and populate it
# from accounts in one statement (per the original note, liquid clustering
# cannot be turned on for an already existing table).

create_clustered_table_sql = f"""
    CREATE TABLE accounts_v2 USING DELTA 
    CLUSTER BY (user_id)
    LOCATION '{accounts_output_path_v2}'
    AS SELECT * from accounts
"""

spark.sql(create_clustered_table_sql)

In [None]:
# Display the table properties of the accounts table
# (cell values are truncated to 100 characters).

accounts_properties = spark.sql('show tblproperties accounts')
accounts_properties.show(truncate=100)

In [None]:
# Display the table properties of the accounts_v2 table
# (cell values are truncated to 100 characters).
# Pay attention to minReaderVersion / minWriterVersion.

accounts_v2_properties = spark.sql('show tblproperties accounts_v2')
accounts_v2_properties.show(truncate=100)

In [None]:
# The same information is available through SQL `desc detail`;
# only the properties and protocol version columns are shown here.

detail_columns = ['properties', 'minReaderVersion', 'minWriterVersion']
accounts_detail = spark.sql('desc detail accounts')
accounts_detail.select(*detail_columns).show(n=100, truncate=100)

In [None]:
# Show the properties and protocol version columns of `desc detail` for accounts_v2.
accounts_v2_detail = spark.sql('desc detail accounts_v2')
accounts_v2_detail.select(
    'properties',
    'minReaderVersion',
    'minWriterVersion',
).show(n=100, truncate=100)

In [None]:
# List the tables currently registered in the catalog.
registered_tables = spark.sql('show tables')
registered_tables.show()

In [None]:
# Benchmark a query on the accounts table:
# filter on user_id, deduplicate, and write to the 'noop' format so the query
# is executed without storing any output.
# The same measurement is repeated for accounts_v2 in the next cell.
#
# time.perf_counter() is used for the timing because it is a monotonic,
# high-resolution clock intended for measuring intervals; time.time() follows
# the system clock and can jump if that clock is adjusted.

start_time = time.perf_counter()
(
    spark.table('accounts')
    .filter(col('user_id') > 10000)
    .distinct()
    .write
    .mode('overwrite')
    .format('noop')
    .save()
)
end_time = time.perf_counter()

# Elapsed time in seconds.
execution_time = end_time - start_time
print(f'Execution takes: {execution_time}')

In [None]:
# Benchmark the same filtered, deduplicated 'noop' write on accounts_v2.
#
# time.perf_counter() is used for the timing because it is a monotonic,
# high-resolution clock intended for measuring intervals; time.time() follows
# the system clock and can jump if that clock is adjusted.

start_time = time.perf_counter()
(
    spark.table('accounts_v2')
    .filter(col('user_id') > 10000)
    .distinct()
    .write
    .mode('overwrite')
    .format('noop')
    .save()
)
end_time = time.perf_counter()

# Elapsed time in seconds.
execution_time = end_time - start_time
print(f'Execution takes: {execution_time}')

In [None]:
# Run the OPTIMIZE command on both tables: accounts_v2 first, then accounts.

optimize_accounts_v2_sql = 'optimize accounts_v2'
optimize_accounts_sql = 'optimize accounts'

spark.sql(optimize_accounts_v2_sql)
spark.sql(optimize_accounts_sql)

In [None]:
# Vacuum both tables with a retention of 0 hours.
# The retention-duration check is switched off first so that such a short
# retention period is accepted.

retention_check_key = 'spark.databricks.delta.retentionDurationCheck.enabled'
spark.conf.set(retention_check_key, False)

vacuum_accounts_sql = 'vacuum accounts RETAIN 0 HOURS '
vacuum_accounts_v2_sql = 'vacuum accounts_v2 RETAIN 0 HOURS '

spark.sql(vacuum_accounts_sql)

spark.sql(vacuum_accounts_v2_sql)

In [None]:
# Measure the execution time of the accounts query again, now that the
# optimize and vacuum commands have been run.
#
# time.perf_counter() is used for the timing because it is a monotonic,
# high-resolution clock intended for measuring intervals; time.time() follows
# the system clock and can jump if that clock is adjusted.

start_time = time.perf_counter()
(
    spark.table('accounts')
    .filter(col('user_id') > 10000)
    .distinct()
    .write
    .mode('overwrite')
    .format('noop')
    .save()
)
end_time = time.perf_counter()

# Elapsed time in seconds.
execution_time = end_time - start_time
print(f'Execution takes: {execution_time}')

In [None]:
# Measure the execution time of the accounts_v2 query again, now that the
# optimize and vacuum commands have been run.
#
# time.perf_counter() is used for the timing because it is a monotonic,
# high-resolution clock intended for measuring intervals; time.time() follows
# the system clock and can jump if that clock is adjusted.

start_time = time.perf_counter()
(
    spark.table('accounts_v2')
    .filter(col('user_id') > 10000)
    .distinct()
    .write
    .mode('overwrite')
    .format('noop')
    .save()
)
end_time = time.perf_counter()

# Elapsed time in seconds.
execution_time = end_time - start_time
print(f'Execution takes: {execution_time}')

## Deletion Vectors

* turn on the feature on accounts_v2
* check the table properties
* merge the increment into the table
* check the folder with the data and look at the bin file
* run optimize & vacuum and check the folder again
* note: the noop write doesn't work on a table with deletion vectors

In [None]:
# Enable deletion vectors on accounts_v2 through its table properties.
enable_deletion_vectors_sql = (
    'ALTER TABLE accounts_v2 SET TBLPROPERTIES (delta.enableDeletionVectors = true)'
)
spark.sql(enable_deletion_vectors_sql)

In [None]:
# Display the accounts_v2 table properties again after the ALTER TABLE.
accounts_v2_properties = spark.sql('show tblproperties accounts_v2')
accounts_v2_properties.show(truncate=100)

In [None]:
# Load the users increment (parquet files) that will be merged into accounts_v2.
increment = spark.read.format('parquet').load(users_increment_path)

In [None]:
# Upsert the increment into accounts_v2, matching rows on user_id:
# matched rows are updated with all increment columns, and increment rows
# without a match are inserted.
target_accounts = DeltaTable.forName(spark, 'accounts_v2').alias('accounts')
source_increment = increment.alias('increment')

upsert = target_accounts.merge(
    source_increment,
    'accounts.user_id == increment.user_id',
)
upsert = upsert.whenMatchedUpdateAll().whenNotMatchedInsertAll()
upsert.execute()

In [None]:
# After the merge: optimize accounts_v2, then vacuum it with a retention of 0 hours.
optimize_accounts_v2_sql = 'optimize accounts_v2'
vacuum_accounts_v2_sql = 'vacuum accounts_v2 RETAIN 0 HOURS '

spark.sql(optimize_accounts_v2_sql)

spark.sql(vacuum_accounts_v2_sql)

In [None]:
# Same filtered, deduplicated 'noop' write as in the earlier timing cells, this
# time against accounts_v2 after deletion vectors were enabled and the
# increment was merged.
# Running the write fails with:
#   requirement failed: Cannot work with a non-pinned table snapshot of the TahoeFileIndex
# The final .save() call is therefore left commented out on purpose, so this
# cell only builds the writer and does not execute the query.
(
    spark.table('accounts_v2')
    .filter(col('user_id') > 10000)
    .distinct()
    .write
    .mode('overwrite')
    .format('noop')
    # .save()
)

In [None]:
# Stop the Spark session at the end of the notebook.
spark.stop()