# Delta Deep Dive

## Preparation

In [0]:
# Get user_name
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime, date
import random
import re

def get_user_name(user_id=None):
    """Derive an identifier-safe user name from an e-mail style login.

    Args:
        user_id: Login to convert. When omitted, the login of the current
            user is read from the Spark session via ``current_user()``.

    Returns:
        The part of the login before the first ``@`` with every character
        other than ASCII letters, digits and ``_`` replaced by ``_``, or
        ``None`` when the login has no non-empty part before an ``@``.
    """
    if user_id is None:
        user_id = spark.sql('select current_user() as user').collect()[0]['user']
    match = re.match(r'^([^@]+)@', user_id)
    if not match:
        return None
    # The result is embedded unquoted in schema/table names, so characters such as
    # '-', '+' or "'" must be replaced as well, not only '.'.
    return re.sub(r'[^0-9A-Za-z_]', '_', match.group(1))
    

# Resolve the user name once; later cells embed it in schema and table names.
user_name = get_user_name()
if user_name is None:
    # Fail here instead of silently building names such as "dev_None_raw" further down.
    raise ValueError("Could not derive a user name from current_user(); expected an e-mail style login.")

print(f"Your Username: {user_name}")

In [0]:
# Set variables
# The schema name is kept in `schema_name` (not `schema`) because a later cell binds
# `schema` to the StructType describing the DataFrame columns.
schema_name = f"training.dev_{user_name}_raw"  # TODO: consider exposing this as a widget
sample_table = f"{schema_name}.banking"

print(f"Table: {sample_table}")

## Create sample data

In [0]:
# Create dataframe with sample data
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DateType, DoubleType

# Column layout of the flattened banking data as (column name, Spark type) pairs,
# in the order in which the sample tuples list their values.
_banking_columns = [
    ("msg_id", StringType()),
    ("msg_created", TimestampType()),
    ("notification_id", StringType()),
    ("notification_created", TimestampType()),
    ("account_iban", StringType()),
    ("account_owner", StringType()),
    ("account_currency", StringType()),
    ("bank_bic", StringType()),
    ("bank_name", StringType()),
    ("entry_amount", DoubleType()),
    ("entry_currency", StringType()),
    ("credit_debit_indicator", StringType()),
    ("booking_date", DateType()),
    ("value_date", DateType()),
    ("tx_amount", DoubleType()),
    ("tx_currency", StringType()),
    ("creditor_name", StringType()),
    ("debtor_name", StringType()),
    ("creditor_iban", StringType()),
    ("debtor_iban", StringType()),
    ("remittance_info", StringType()),
    ("end_to_end_id", StringType()),
]

# Every column is declared nullable.
schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in _banking_columns
])

# Create sample data
# Each tuple is one flattened transaction; the values are positional and follow the
# field order of `schema` (message, notification, account, bank, entry, dates,
# transaction, creditor/debtor names and IBANs, remittance info, end-to-end id).
# Rows marked "DBIT" carry negative amounts, rows marked "CRDT" positive ones.
sample_data = [
    # Transaction 1 - Credit
    ("MSG001", datetime(2024, 9, 22, 10, 15, 0), "NOTIF001", datetime(2024, 9, 22, 10, 16, 0),
     "CH93 0000 0000 0000 0001", "Altyca AG", "CHF", "UBSWCHZH80A", "UBS Switzerland AG",
     1500.0, "CHF", "CRDT", date(2024, 9, 22), date(2024, 9, 22),
     1500.0, "CHF", "Altyca AG", "Client Company Ltd", "CH93 0000 0000 0000 0001", "DE89 3704 0044 0532 0130 00",
     "Invoice payment INV-2024-001", "E2E-001-2024-09-22"),
    
    # Transaction 2 - Debit
    ("MSG002", datetime(2024, 9, 21, 14, 30, 0), "NOTIF002", datetime(2024, 9, 21, 14, 31, 0),
     "CH93 0000 0000 0000 0001", "Altyca AG", "CHF", "UBSWCHZH80A", "UBS Switzerland AG",
     -750.0, "CHF", "DBIT", date(2024, 9, 21), date(2024, 9, 21),
     -750.0, "CHF", "Office Supplies Inc", "Altyca AG", "CH44 0900 0000 8754 2832", "CH93 0000 0000 0000 0001",
     "Office equipment purchase", "E2E-002-2024-09-21"),
    
    # Transaction 3 - Credit (this is the row the update cells further down modify)
    ("MSG003", datetime(2024, 9, 20, 9, 45, 0), "NOTIF003", datetime(2024, 9, 20, 9, 46, 0),
     "CH93 0000 0000 0000 0001", "Altyca AG", "CHF", "UBSWCHZH80A", "UBS Switzerland AG",
     2250.0, "CHF", "CRDT", date(2024, 9, 20), date(2024, 9, 20),
     2250.0, "CHF", "Altyca AG", "Tech Solutions GmbH", "CH93 0000 0000 0000 0001", "DE44 5001 0517 5407 3249 31",
     "Consulting services Q3 2024", "E2E-003-2024-09-20"),
    
    # Transaction 4 - Debit (Different account)
    ("MSG004", datetime(2024, 9, 19, 16, 20, 0), "NOTIF004", datetime(2024, 9, 19, 16, 21, 0),
     "CH44 0900 0000 8754 2832", "Stefan Koch", "CHF", "ZKBKCHZZ80A", "Zürcher Kantonalbank",
     -320.50, "CHF", "DBIT", date(2024, 9, 19), date(2024, 9, 19),
     -320.50, "CHF", "Migros", "Stefan Koch", "CH18 0900 0000 3001 2345 6", "CH44 0900 0000 8754 2832",
     "Grocery shopping", "E2E-004-2024-09-19"),
    
    # Transaction 5 - Credit
    ("MSG005", datetime(2024, 9, 18, 11, 10, 0), "NOTIF005", datetime(2024, 9, 18, 11, 11, 0),
     "CH44 0900 0000 8754 2832", "Stefan Koch", "CHF", "ZKBKCHZZ80A", "Zürcher Kantonalbank",
     5000.0, "CHF", "CRDT", date(2024, 9, 18), date(2024, 9, 18),
     5000.0, "CHF", "Stefan Koch", "Altyca AG", "CH44 0900 0000 8754 2832", "CH93 0000 0000 0000 0001",
     "Salary September 2024", "E2E-005-2024-09-18"),
    
    # Transaction 6 - International transfer (the only row booked in EUR)
    ("MSG006", datetime(2024, 9, 17, 13, 45, 0), "NOTIF006", datetime(2024, 9, 17, 13, 46, 0),
     "CH93 0000 0000 0000 0001", "Altyca AG", "EUR", "UBSWCHZH80A", "UBS Switzerland AG",
     1200.0, "EUR", "CRDT", date(2024, 9, 17), date(2024, 9, 17),
     1200.0, "EUR", "Altyca AG", "European Client SA", "CH93 0000 0000 0000 0001", "FR14 2004 1010 0505 0001 3M02 606",
     "Project delivery milestone 2", "E2E-006-2024-09-17"),
    
    # Transaction 7 - Debit with fees
    ("MSG007", datetime(2024, 9, 16, 15, 30, 0), "NOTIF007", datetime(2024, 9, 16, 15, 31, 0),
     "CH93 0000 0000 0000 0001", "Altyca AG", "CHF", "UBSWCHZH80A", "UBS Switzerland AG",
     -25.0, "CHF", "DBIT", date(2024, 9, 16), date(2024, 9, 16),
     -25.0, "CHF", "UBS Switzerland AG", "Altyca AG", "CH80 0024 0240 5917 4470 1", "CH93 0000 0000 0000 0001",
     "International transfer fee", "E2E-007-2024-09-16"),
    
    # Transaction 8 - Another credit
    ("MSG008", datetime(2024, 9, 15, 8, 20, 0), "NOTIF008", datetime(2024, 9, 15, 8, 21, 0),
     "CH44 0900 0000 8754 2832", "Stefan Koch", "CHF", "ZKBKCHZZ80A", "Zürcher Kantonalbank",
     450.0, "CHF", "CRDT", date(2024, 9, 15), date(2024, 9, 15),
     450.0, "CHF", "Stefan Koch", "Theater Escholzmatt", "CH44 0900 0000 8754 2832", "CH21 0900 0000 1234 5678 9",
     "Freelance web development", "E2E-008-2024-09-15"),
]

# Create DataFrame from the tuples using the explicit schema, then render it
sample_df = spark.createDataFrame(sample_data, schema)
display(sample_df)

In [0]:
# Persist the sample DataFrame as a table, starting from a clean slate
drop_result = spark.sql(f"DROP TABLE IF EXISTS {sample_table}")
drop_result.display()

table_writer = sample_df.write.mode("overwrite")
table_writer.saveAsTable(sample_table)

print(f"Table {sample_table} created successfully!")

In [0]:
# Read the table back and render all rows
read_query = f"SELECT * FROM {sample_table}"
spark.sql(read_query).display()

## Get infos about the table

In [0]:
# Show the table's DESCRIBE DETAIL output, printing the statement first
describe_stmt = "DESCRIBE DETAIL " + sample_table

print("SQL Statement to get details about a table: \n" + describe_stmt)
detail_df = spark.sql(describe_stmt)
detail_df.display()

## Query the table with SQL (using widgets)

In [0]:
# Text widget that feeds the SQL cells below; it defaults to the current user's sample
# table instead of a hard-coded table belonging to another user.
dbutils.widgets.text("table_name", sample_table)

In [0]:
%sql
-- Read the table named by the "table_name" widget. IDENTIFIER(:param) replaces the
-- deprecated ${param} text substitution.
SELECT * FROM IDENTIFIER(:table_name)

In [0]:
%sql
-- Look up a single transaction in the table named by the "table_name" widget.
-- IDENTIFIER(:param) replaces the deprecated ${param} text substitution.
SELECT 
    account_iban,
    account_owner,
    tx_amount,
    remittance_info,
    booking_date,
    end_to_end_id
FROM IDENTIFIER(:table_name)
WHERE 
    end_to_end_id = 'E2E-003-2024-09-20'
    AND account_iban = 'CH93 0000 0000 0000 0001';

## Update the table

In [0]:
# Show the table content before updating it; `sample_table` holds only the table
# name, so it has to be loaded through spark.table() first.
display(spark.table(sample_table))

In [0]:
# In-place SQL UPDATE of the single transaction identified by end-to-end id and IBAN
update_statement = f"""
UPDATE {sample_table}
SET
  tx_amount = 2000.0,
  entry_amount = 2000.0,
  remittance_info = 'Updated: Consulting services Q3 2024 – Final payment'
WHERE
  end_to_end_id = 'E2E-003-2024-09-20'
  AND account_iban = 'CH93 0000 0000 0000 0001'
"""
spark.sql(update_statement)

In [0]:
from pyspark.sql.functions import when, col, lit


# Step 1: load the current content of the sample table
df = spark.table(sample_table)

# Step 2: the row(s) to change and the new value for each affected column
row_filter = (
    (col("end_to_end_id") == lit("E2E-003-2024-09-20")) &
    (col("account_iban") == lit("CH93 0000 0000 0000 0001"))
)
replacement_values = {
    "tx_amount": lit(2000.0),
    "entry_amount": lit(2000.0),
    "remittance_info": lit("Updated: Consulting services Q3 2024 – Final payment"),
}

# Matching rows get the new value, all other rows keep what they had
updated_df = df
for column_name, new_value in replacement_values.items():
    updated_df = updated_df.withColumn(
        column_name,
        when(row_filter, new_value).otherwise(col(column_name)),
    )

# Step 3: replace the table with the updated data (rewrites the whole table)
overwrite_writer = updated_df.write.mode("overwrite").option("overwriteSchema", "true")
overwrite_writer.saveAsTable(sample_table)

print("✅ Table overwritten with updated data.")


In [0]:
# Re-read the updated transaction to verify the change
lookup_query = f"""
SELECT 
    account_iban,
    account_owner,
    tx_amount,
    remittance_info,
    booking_date,
    end_to_end_id
FROM {sample_table}
WHERE 
    end_to_end_id = 'E2E-003-2024-09-20'
    AND account_iban = 'CH93 0000 0000 0000 0001'
"""

df = spark.sql(lookup_query)
df.display()

## Use Delta instead of parquet

### Convert the parquet table to delta

In [0]:
# Create a Delta copy of the sample table; CREATE OR REPLACE keeps the cell re-runnable
old_sample_table = f"{sample_table}"
new_sample_table = f"{sample_table}_delta"

# No LOCATION clause: `db_location` is not defined anywhere in this notebook (the old
# code raised a NameError), so the copy is created without an explicit storage path.
sql_statement = f"""
                    CREATE OR REPLACE TABLE {new_sample_table}
                    USING DELTA
                    AS SELECT * FROM {old_sample_table}
                """
print(sql_statement)
spark.sql(sql_statement)

In [0]:
# Look up the sample transaction in the Delta copy. Uses `new_sample_table` because the
# previously hard-coded stefan_koch_bronze table is never created by this notebook.
spark.sql(f"""
SELECT 
    account_iban,
    account_owner,
    tx_amount,
    remittance_info,
    booking_date,
    end_to_end_id
FROM {new_sample_table}
WHERE 
    end_to_end_id = 'E2E-003-2024-09-20'
    AND account_iban = 'CH93 0000 0000 0000 0001'
""").display()

In [0]:
# Update a specific transaction amount and remittance info in the Delta copy
# (`new_sample_table`; the hard-coded stefan_koch_bronze table does not exist here).
spark.sql(f"""
UPDATE {new_sample_table}
SET 
    tx_amount = 2000.0,
    entry_amount = 2000.0,
    remittance_info = 'Updated: Consulting services Q3 2024 - Final payment'
WHERE 
    end_to_end_id = 'E2E-003-2024-09-20'
    AND account_iban = 'CH93 0000 0000 0000 0001'
""").display()

In [0]:
# Re-read the transaction from the Delta copy (`new_sample_table`) to verify the update
spark.sql(f"""
SELECT 
    account_iban,
    account_owner,
    tx_amount,
    remittance_info,
    booking_date,
    end_to_end_id
FROM {new_sample_table}
WHERE 
    end_to_end_id = 'E2E-003-2024-09-20'
    AND account_iban = 'CH93 0000 0000 0000 0001'
""").display()

## Remove the Parquet tables

In [0]:
# List the tables of the schema that holds the sample tables. A bare SHOW TABLES lists
# the session's current schema, which this notebook never sets.
tables_schema = old_sample_table.rpartition(".")[0]
spark.sql(f"SHOW TABLES IN {tables_schema}").show()

In [0]:
# Drop the original (pre-conversion) sample table. The previous statement targeted
# stefan_koch_bronze.sample_data_2, a table this notebook never creates.
# NOTE(review): assumes the section's intent is to remove `old_sample_table` - confirm.
spark.sql(f"DROP TABLE IF EXISTS {old_sample_table}").display()

In [0]:
# List the tables of the sample schema again to confirm the drop. A bare SHOW TABLES
# lists the session's current schema, which this notebook never sets.
tables_schema = old_sample_table.rpartition(".")[0]
spark.sql(f"SHOW TABLES IN {tables_schema}").show()

## Create Views

In [0]:
# Create a view on top of the Delta table, next to it in the same schema.
# OR REPLACE keeps the cell re-runnable; a plain CREATE VIEW fails on the second run.
spark.sql(f"CREATE OR REPLACE VIEW {new_sample_table}_vw AS SELECT * FROM {new_sample_table}").display()

In [0]:
# Query the view that sits next to the Delta table (named "<delta table>_vw")
spark.sql(f"SELECT * FROM {new_sample_table}_vw").display()

## Create Delta tables from a DataFrame
You don't have to convert an existing Spark table to Delta; you can write a DataFrame directly as a Delta table.

In [0]:
# Render the in-memory sample DataFrame
sample_df.display()

In [0]:
# Target table for the direct DataFrame -> Delta write. It gets its own variable so the
# notebook-wide `sample_table` is not overwritten, and it lives in the same schema as
# the other sample tables.
dataframe_schema = old_sample_table.rpartition(".")[0]
dataframe_table = f"{dataframe_schema}.my_dataframe_delta"

# Write the full sample DataFrame (`sample_df`), not `df`, which at this point holds only
# the single-row result of the verification query. No explicit path is set because
# `db_location` is not defined anywhere in this notebook.
(sample_df.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(dataframe_table))

print(f"✅ Delta table '{dataframe_table}' created")

## Delta Tables

In [0]:
# Show current table info for the Delta copy (`new_sample_table`); the previously
# hard-coded stefan_koch_bronze table is never created by this notebook.
print("=== Current Delta Table Info ===")
spark.sql(f"DESCRIBE EXTENDED {new_sample_table}").show(truncate=False)


In [0]:

# Show Delta Lake properties of the Delta copy (`new_sample_table`)
spark.sql(f"SHOW TBLPROPERTIES {new_sample_table}").show(truncate=False)

### Some deletes on the table

In [0]:
# Row count of the Delta copy before deleting anything
spark.sql(f"SELECT COUNT(*) FROM {new_sample_table}").display()

In [0]:
# Content of the Delta copy before the delete
spark.sql(f"SELECT * FROM {new_sample_table}").display()

In [0]:
# Delete every row whose account currency is CHF from the Delta copy
spark.sql(f"""
DELETE FROM {new_sample_table}
WHERE account_currency = 'CHF'
""").display()

In [0]:
# Content of the Delta copy after the delete
spark.sql(f"SELECT * FROM {new_sample_table}").display()

## Show history

In [0]:
# Commit history of the Delta copy (one entry per write operation)
spark.sql(f"DESCRIBE HISTORY {new_sample_table}").display()

## Time travel by version

In [0]:
# The actual table, as it is right now
spark.sql(f"SELECT * FROM {new_sample_table}").display()

In [0]:
# Query version 0 of the Delta copy (the data as originally written)
spark.sql(f"SELECT * FROM {new_sample_table} VERSION AS OF 0").display()

In [0]:
# Query version 1 of the Delta copy.
# NOTE(review): on a first run this should be the state after the UPDATE - confirm
# against the DESCRIBE HISTORY output.
spark.sql(f"SELECT * FROM {new_sample_table} VERSION AS OF 1").display()

In [0]:
# Query version 2 of the Delta copy.
# NOTE(review): on a first run this should be the state after the DELETE - confirm
# against the DESCRIBE HISTORY output.
spark.sql(f"SELECT * FROM {new_sample_table} VERSION AS OF 2").display()

## Time travel by timestamp

In [0]:
# Query the Delta copy as it was 10 minutes ago.
# NOTE(review): when the notebook is run top to bottom the table is probably younger
# than 10 minutes, in which case no version exists at that timestamp - confirm and
# shorten the interval if needed.
spark.sql(f"""
SELECT *
FROM {new_sample_table}
TIMESTAMP AS OF current_timestamp() - INTERVAL 10 MINUTES
""").display()

## Delta History - Audit Trail

In [0]:
# Audit trail: rows present in version 1 that are not present in version 0
spark.sql(f"""
SELECT * FROM {new_sample_table} VERSION AS OF 1
EXCEPT
SELECT * FROM {new_sample_table} VERSION AS OF 0
""").display()

## Restore table to an earlier version
We want to go back to version 1.

In [0]:
# Restore the Delta copy to version 1
spark.sql(f"RESTORE TABLE {new_sample_table} TO VERSION AS OF 1").display()

In [0]:
# Content of the Delta copy after the restore
spark.sql(f"SELECT * FROM {new_sample_table}").display()

In [0]:
# Commit history of the Delta copy after the restore
spark.sql(f"DESCRIBE HISTORY {new_sample_table}").display()

## Schema Evolution

In [0]:
print("Adding new column 'transaction_category'...")
# A DEFAULT cannot be declared when adding a column to an existing Delta table, so the
# column is added plain and the existing rows are back-filled with 'General' afterwards.
spark.sql(f"""
ALTER TABLE {new_sample_table}
ADD COLUMN transaction_category STRING
""")
spark.sql(f"""
UPDATE {new_sample_table}
SET transaction_category = 'General'
WHERE transaction_category IS NULL
""")

In [0]:
# Show the new column first, followed by all columns of the Delta copy
spark.sql(f"SELECT transaction_category, * FROM {new_sample_table}").display()

In [0]:
# Update some categories: derive the category from keywords in the remittance text.
# Targets the Delta copy (`new_sample_table`); the previously hard-coded
# stefan_koch_bronze table is never created by this notebook.
spark.sql(f"""
UPDATE {new_sample_table}
SET transaction_category = CASE 
    WHEN remittance_info LIKE '%salary%' OR remittance_info LIKE '%Salary%' THEN 'Payroll'
    WHEN remittance_info LIKE '%training%' OR remittance_info LIKE '%Training%' THEN 'Education'
    WHEN remittance_info LIKE '%office%' OR remittance_info LIKE '%Office%' THEN 'Office Expenses'
    ELSE 'General'
END
""")

In [0]:
# Show the assigned categories next to all columns of the Delta copy
spark.sql(f"SELECT transaction_category, * FROM {new_sample_table}").display()

## Optimizations

### Have a look at the folder where the Delta table is stored.


### let's have a look at some statistics

In [0]:
# Table statistics of the Delta copy before optimization
spark.sql(f"DESCRIBE DETAIL {new_sample_table}").display()

### let's optimize and do file compaction

In [0]:
# OPTIMIZE - Compaction of the Delta copy (`new_sample_table`)
print("1. OPTIMIZE (File Compaction):")
spark.sql(f"OPTIMIZE {new_sample_table}").show()

### Z-ORDER optimization

In [0]:
# Z-ORDER optimization of the Delta copy (`new_sample_table`)
print("\n2. Z-ORDER optimization (by account_owner and booking_date):")
spark.sql(f"""
OPTIMIZE {new_sample_table}
ZORDER BY (account_owner, booking_date)
""").show()

### Show file statistics

In [0]:
# Table statistics of the Delta copy after optimization
spark.sql(f"DESCRIBE DETAIL {new_sample_table}").display()

## Vacuum - Cleanup old files

In [0]:
# File count and size of the Delta copy (`new_sample_table`) before VACUUM
print("Files before VACUUM:")
detail_before = spark.sql(f"DESCRIBE DETAIL {new_sample_table}")
detail_before.select("numFiles", "sizeInBytes").show()

In [0]:
# VACUUM the Delta copy (careful with retention!)
print("Running VACUUM (removing files older than 168 hours)...")
spark.sql(f"VACUUM {new_sample_table} RETAIN 168 HOURS").show()


## Get rid of all versions

In [0]:
# Switch off the retention-duration check so that VACUUM accepts 0 hours
# (normally not allowed)
retention_check_key = "spark.databricks.delta.retentionDurationCheck.enabled"
spark.conf.set(retention_check_key, "false")


In [0]:
print("=== Running VACUUM (removing ALL old versions) ===")
print("⚠️  This will remove ALL historical data files!")

# VACUUM with 0 hours retention - removes everything except current version.
# Targets the Delta copy (`new_sample_table`).
try:
    vacuum_result = spark.sql(f"VACUUM {new_sample_table} RETAIN 0 HOURS")
    vacuum_result.show()
finally:
    # Switch the retention check back on so the relaxed setting does not stay active
    # for the rest of the session.
    spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")

## Check the VACUUM operation

Have a look at the storage account where the Delta table is stored.

In [0]:
# Commit history of the Delta copy after VACUUM
spark.sql(f"DESCRIBE HISTORY {new_sample_table}").display()

In [0]:
# Table statistics of the Delta copy after VACUUM
spark.sql(f"DESCRIBE DETAIL {new_sample_table}").display()

In [0]:
# Current content of the Delta copy after VACUUM
spark.sql(f"SELECT * FROM {new_sample_table}").display()

## Query the data with the SQL Pool (built-in)

```sql
SELECT
    TOP 100 *
FROM
    OPENROWSET(
        BULK 'https://adlssynapsealtyca.dfs.core.windows.net/bronze/stefan_koch/sample_data_delta/',
        FORMAT = 'DELTA'
    ) AS [result]
```