## Create your first catalog and schema

In [0]:
%sql
-- Create the catalog only when it does not exist yet, using the abfss:// path
-- below as its managed storage location.
CREATE CATALOG IF NOT EXISTS my_first_catalog
MANAGED LOCATION 'abfss://delta@templearningdelta.dfs.core.windows.net/'

In [0]:
%sql
SHOW CATALOGS;

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS my_first_catalog.my_first_schema;

In [0]:
%sql
USE CATALOG my_first_catalog;
SHOW SCHEMAS;

In [0]:
from delta.tables import *
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, array, ArrayType, DateType, TimestampType, FloatType, DoubleType, DecimalType
from pyspark.sql.functions import *

## Creating a Pyspark dataframe

In [0]:
# Ten sample employees; hire_date is kept as an ISO string here and parsed below.
employee_data = [
    dict(emp_id=1, emp_name="John Sean", dept_id=101, salary=60000.00, hire_date="2025-01-15"),
    dict(emp_id=2, emp_name="Alice Cooper", dept_id=102, salary=75000.00, hire_date="2025-03-20"),
    dict(emp_id=3, emp_name="Bob K.", dept_id=101, salary=50000.00, hire_date="2025-07-10"),
    dict(emp_id=4, emp_name="David Harbour", dept_id=103, salary=82000.00, hire_date="2026-02-05"),
    dict(emp_id=5, emp_name="Emma Norton", dept_id=102, salary=67000.00, hire_date="2026-01-25"),
    dict(emp_id=6, emp_name="Frank Jeff", dept_id=103, salary=70000.00, hire_date="2025-04-15"),
    dict(emp_id=7, emp_name="Grace Morgan", dept_id=104, salary=55000.00, hire_date="2026-01-09"),
    dict(emp_id=8, emp_name="Henry J. Knott", dept_id=104, salary=60000.00, hire_date="2025-06-03"),
    dict(emp_id=9, emp_name="Ivy Lee", dept_id=101, salary=72000.00, hire_date="2025-09-18"),
    dict(emp_id=10, emp_name="Jack White", dept_id=103, salary=65000.00, hire_date="2026-02-08"),
]

# Every field is nullable; hire_date is declared as a string because the raw rows carry text.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("emp_id", IntegerType()),
        ("emp_name", StringType()),
        ("dept_id", IntegerType()),
        ("salary", DoubleType()),
        ("hire_date", StringType()),
    )
])

# Build the DataFrame and convert hire_date from string to a date column in one chain.
df_employees = (
    spark.createDataFrame(employee_data, schema=schema)
    .withColumn("hire_date", to_date("hire_date"))
)

df_employees.printSchema()
df_employees.show()


## Create a Delta Table and apply partitioning to optimize query performance

In [0]:
%sql
DROP TABLE IF EXISTS my_first_catalog.my_first_schema.employees;

In [0]:
%sql
-- First version of the employees Delta table, partitioned by dept_id.
-- NOTE(review): the PRIMARY KEY on emp_id is presumably informational rather than
-- enforced (as the notebook later states for foreign keys) — confirm against the docs.
CREATE TABLE my_first_catalog.my_first_schema.employees (
    emp_id INT PRIMARY KEY,
    emp_name STRING,
    dept_id INT,
    salary DOUBLE,
    hire_date DATE
) USING DELTA
PARTITIONED BY (dept_id);

In [0]:
df_employees.write.format("delta").mode("append").saveAsTable("my_first_catalog.my_first_schema.employees")

In [0]:
%sql
SELECT * FROM my_first_catalog.my_first_schema.employees LIMIT 5;

In [0]:
%sql
DESCRIBE DETAIL my_first_catalog.my_first_schema.employees;

In [0]:
%sql
DESCRIBE HISTORY my_first_catalog.my_first_schema.employees;


In [0]:
%sql
SHOW TBLPROPERTIES my_first_catalog.my_first_schema.employees;

### Drop previous table and recreate it with a generated column 

In [0]:
%sql
-- IF EXISTS keeps this cell re-runnable when the table has already been dropped,
-- matching the other DROP TABLE cells in this notebook.
DROP TABLE IF EXISTS my_first_catalog.my_first_schema.employees;

In [0]:
%sql
-- Recreate employees with an extra hire_year column that is always computed
-- from hire_date via YEAR(); the table stays partitioned by dept_id.
CREATE TABLE my_first_catalog.my_first_schema.employees (
    emp_id INT,
    emp_name STRING,
    dept_id INT,
    salary DOUBLE,
    hire_date DATE,
    hire_year INT GENERATED ALWAYS AS (YEAR(hire_date))
) USING DELTA
PARTITIONED BY (dept_id);

In [0]:
df_employees.write.format("delta").mode("append").saveAsTable("my_first_catalog.my_first_schema.employees")

In [0]:
%sql
SELECT * FROM my_first_catalog.my_first_schema.employees LIMIT 5;

### Performing an insert into the employees Delta table. We will additionally record user metadata for this transaction


In [0]:
# Set the free-text note that Delta records as user metadata in the commit info.
# NOTE(review): this session-level setting is never reset in this notebook, so it
# presumably also tags the later UPDATE/DELETE/MERGE commits — confirm, and unset
# it after the INSERT if that is not intended.
spark.conf.set(
    "spark.databricks.delta.commitInfo.userMetadata",
    "This row was missed during ingestion from the source system due to a network glitch"
)

In [0]:
%sql
-- The table also has a generated hire_year column, so name the five supplied
-- columns explicitly instead of relying on positional matching against all
-- six table columns (same form as the later INSERT cells in this notebook).
INSERT INTO my_first_catalog.my_first_schema.employees (emp_id, emp_name, dept_id, salary, hire_date)
VALUES (NULL, 'Chris', 104, 55000.00, DATE '2024-01-10');

In [0]:
%sql
-- Show the row that was just inserted. It was written with a NULL emp_id,
-- so filtering on emp_id=5 would not return it.
SELECT * FROM my_first_catalog.my_first_schema.employees WHERE emp_id IS NULL;

In [0]:
%sql
DESCRIBE HISTORY my_first_catalog.my_first_schema.employees;

### Performing an update in employees Delta table 

In [0]:
%sql
SELECT * FROM my_first_catalog.my_first_schema.employees WHERE emp_id=5; 

In [0]:
%sql
UPDATE my_first_catalog.my_first_schema.employees SET salary=90000 WHERE emp_id=5;

In [0]:
%sql
DESCRIBE HISTORY my_first_catalog.my_first_schema.employees;

### Turn auto-optimize off in a Delta table 

In [0]:
%sql

ALTER TABLE my_first_catalog.my_first_schema.employees SET TBLPROPERTIES (
  'delta.autoOptimize.autoCompact' = 'false',
  'delta.autoOptimize.optimizeWrite' = 'false'
);


### Performing a delete in employees Delta table 

In [0]:
%sql
-- Current state of employee 5 before the delete. A hard-coded high version
-- number only exists in a long-lived session and makes the query fail on a
-- freshly recreated table, so read the latest version instead.
SELECT * FROM my_first_catalog.my_first_schema.employees WHERE emp_id=5;

In [0]:
%sql
DELETE FROM my_first_catalog.my_first_schema.employees WHERE emp_id=5;
SELECT * FROM my_first_catalog.my_first_schema.employees WHERE emp_id=5;

In [0]:
%sql
DESCRIBE HISTORY my_first_catalog.my_first_schema.employees;

### Performing time travel on a Delta table

In [0]:
%sql
-- Version 0 is the empty CREATE TABLE commit; version 1 is the initial append of
-- df_employees, i.e. the first version that actually contains employee 5.
SELECT * FROM my_first_catalog.my_first_schema.employees VERSION AS OF 1 WHERE emp_id=5;

In [0]:
%sql
-- Roll the table back to the initial data load. Version 0 is the empty
-- CREATE TABLE commit, so restoring to it would leave no rows for the
-- sections that follow; version 1 is the first append of df_employees.
RESTORE TABLE my_first_catalog.my_first_schema.employees TO VERSION AS OF 1;
-- Employee 5 should be visible again after the restore.
SELECT * FROM my_first_catalog.my_first_schema.employees WHERE emp_id=5;

In [0]:
%sql
DESCRIBE HISTORY my_first_catalog.my_first_schema.employees;

## Performing a CDC merge operation

In [0]:
# First change batch: one employee id already loaded (2) and two new ids (11, 12).
cdc_data1 = [
    dict(emp_id=2, emp_name="Alice Cooper", dept_id=102, salary=75000.00, hire_date="2025-03-20"),
    dict(emp_id=11, emp_name="Jack Mathew", dept_id=104, salary=50000.00, hire_date="2026-02-10"),
    dict(emp_id=12, emp_name="Kristen Johnson", dept_id=103, salary=45000.00, hire_date="2025-02-10"),
]

# Reuse the employee schema, then parse hire_date from string to date.
df_cdc1 = (
    spark.createDataFrame(cdc_data1, schema)
    .withColumn("hire_date", to_date("hire_date"))
)

df_cdc1.show()
df_cdc1.printSchema()

# Register the batch under a view name so the SQL MERGE cell can read it.
df_cdc1.createOrReplaceTempView("cdc_employees_1")


In [0]:
%sql
-- Upsert the first change batch: rows whose emp_id already exists are updated,
-- all others are inserted.
MERGE INTO my_first_catalog.my_first_schema.employees AS tgt
USING cdc_employees_1 AS src
ON src.emp_id = tgt.emp_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

In [0]:
%sql
DESCRIBE HISTORY my_first_catalog.my_first_schema.employees;

## Performing a CDC merge operation with schema evolution

In [0]:
# Second change batch: three employee ids (13-15) not present in the earlier data.
cdc_data2 = [
    {"emp_id": 13, "emp_name": "Jane Marry", "dept_id": 102, "salary": 85000.00, "hire_date": "2025-02-22"},
    {"emp_id": 14, "emp_name": "Andrew Anderson", "dept_id": 103, "salary": 55000.00, "hire_date": "2026-02-11"},
    {"emp_id": 15, "emp_name": "Carl Degz", "dept_id": 102, "salary": 75000.00, "hire_date": "2026-02-11"}
]

# Use a dedicated name instead of reassigning the module-level `schema`:
# overwriting it would silently change what the earlier DataFrame cells build
# if they are re-run later in the same session.
# The only difference from the employee schema is that emp_id is non-nullable.
# NOTE(review): no new column is introduced here, so the following
# MERGE WITH SCHEMA EVOLUTION presumably has nothing to evolve — confirm intent.
cdc2_schema = StructType([
    StructField("emp_id", IntegerType(), False),
    StructField("emp_name", StringType(), True),
    StructField("dept_id", IntegerType(), True),
    StructField("salary", DoubleType(), True),
    StructField("hire_date", StringType(), True)
])

df_cdc2 = spark.createDataFrame(cdc_data2, cdc2_schema)
# Parse the ISO date strings into a real date column.
df_cdc2 = df_cdc2.withColumn("hire_date", to_date("hire_date"))
df_cdc2.show()
df_cdc2.printSchema()
# Register the batch under a view name so the SQL MERGE cell can read it.
df_cdc2.createOrReplaceTempView("cdc_employees_2")

In [0]:
%sql
-- Upsert the second change batch with schema evolution enabled: matching
-- emp_id rows are updated, the rest are inserted.
MERGE WITH SCHEMA EVOLUTION INTO my_first_catalog.my_first_schema.employees AS tgt
USING cdc_employees_2 AS src
ON src.emp_id = tgt.emp_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

In [0]:
%sql
SELECT count(*) FROM my_first_catalog.my_first_schema.employees;

## Using the replaceWhere feature in Delta Lake

In [0]:
%sql
SELECT * FROM my_first_catalog.my_first_schema.employees WHERE hire_date BETWEEN '2026-01-01' AND '2026-01-31';

In [0]:
# Rows hired in January 2026, read from the current table, with salary raised by 1000.
replace_data = spark.table(
    "my_first_catalog.my_first_schema.employees"
).where(
    "hire_date BETWEEN '2026-01-01' AND '2026-01-31'"
).withColumn(
    "salary", col("salary") + 1000
)

In [0]:
# Overwrite only the rows that match the replaceWhere predicate with the
# contents of replace_data.
# NOTE(review): replace_data is derived from this same table with salary + 1000,
# so re-running these cells presumably raises the salaries again — confirm.
(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "hire_date BETWEEN '2026-01-01' AND '2026-01-31'")
  .saveAsTable("my_first_catalog.my_first_schema.employees")
)

In [0]:
%sql
SELECT * FROM my_first_catalog.my_first_schema.employees WHERE hire_date BETWEEN '2026-01-01' AND '2026-01-31';

## Adding new column in Delta Table

In [0]:
%sql
ALTER TABLE my_first_catalog.my_first_schema.employees ADD COLUMNS (dummy_col STRING AFTER emp_id);

In [0]:
%sql
DESCRIBE HISTORY my_first_catalog.my_first_schema.employees;

## Renaming a column in Delta Table

In [0]:
%sql
-- NOTE(review): this rename runs before 'delta.columnMapping.mode' is set to 'name'
-- in a later cell, and the identical statement is repeated after that cell.
-- Presumably this first attempt is meant to show the failure without column
-- mapping — confirm, since an erroring cell stops a full "Run All".
ALTER TABLE my_first_catalog.my_first_schema.employees RENAME COLUMN dummy_col TO new_dummy_col;

In [0]:
%sql
ALTER TABLE my_first_catalog.my_first_schema.employees SET TBLPROPERTIES ('delta.columnMapping.mode' ='name')

In [0]:
%sql
ALTER TABLE my_first_catalog.my_first_schema.employees RENAME COLUMN dummy_col TO new_dummy_col;

In [0]:
%sql
DESCRIBE TABLE my_first_catalog.my_first_schema.employees;

In [0]:
%sql
SHOW TBLPROPERTIES my_first_catalog.my_first_schema.employees;

## Drop a column in a Delta table


In [0]:
%sql
ALTER TABLE my_first_catalog.my_first_schema.employees DROP COLUMN new_dummy_col;

In [0]:
%sql
DESCRIBE TABLE my_first_catalog.my_first_schema.employees;

In [0]:
%sql
DESCRIBE HISTORY my_first_catalog.my_first_schema.employees;

## Setting up constraints on Delta Table - Not Null Constraint

In [0]:
%sql
ALTER TABLE my_first_catalog.my_first_schema.employees
ALTER COLUMN emp_id SET NOT NULL;

In [0]:
%sql
-- Expected to be rejected: emp_id was just altered to NOT NULL.
-- The explicit column list skips the generated hire_year column, so the
-- statement fails on the NOT NULL constraint rather than on a mismatch
-- between the five values and the six table columns.
INSERT INTO my_first_catalog.my_first_schema.employees (emp_id, emp_name, dept_id, salary, hire_date)
VALUES (NULL, 'Chris', 104, 55000.00, DATE '2024-01-10');
    

## Setting up constraints on Delta Table - Check Constraint

In [0]:
%sql
-- Add a CHECK constraint named valid_date that limits hire_date to the range
-- [2025-01-01, 2027-01-01), then inspect the table details and properties.
ALTER TABLE my_first_catalog.my_first_schema.employees ADD CONSTRAINT valid_date CHECK (hire_date >= '2025-01-01' and hire_date < '2027-01-01');
DESCRIBE DETAIL my_first_catalog.my_first_schema.employees;
SHOW TBLPROPERTIES my_first_catalog.my_first_schema.employees;

In [0]:
%sql
-- hire_date 2027-01-10 lies outside the range allowed by the valid_date CHECK
-- constraint, so this insert is expected to be rejected.
INSERT INTO my_first_catalog.my_first_schema.employees (emp_id,emp_name,dept_id,salary,hire_date) VALUES (15,'Chris',104,55000.00,'2027-01-10');

## Setting up constraints on Delta Table - Foreign Key Constraint

In [0]:
%sql
DROP TABLE IF EXISTS my_first_catalog.my_first_schema.department

In [0]:
%sql
CREATE TABLE IF NOT EXISTS my_first_catalog.my_first_schema.department (dept_id INT PRIMARY KEY, dept_name VARCHAR(255), dept_head VARCHAR(255));


In [0]:
%sql
INSERT INTO my_first_catalog.my_first_schema.department VALUES (101,'Sales','John Doe'),(102,'Marketing','Jane Smith'),(103,'Engineering','Alice Johnson'),(104,'HR','Bob Brown');
    


In [0]:
%sql
SELECT * FROM my_first_catalog.my_first_schema.department;

In [0]:
%sql
DESCRIBE EXTENDED my_first_catalog.my_first_schema.department;

In [0]:
%sql
-- Name the constraint employees_dept_fk so that the DROP CONSTRAINT cell later in
-- this notebook (which targets that name) actually removes it; naming it after
-- the column would leave the foreign key in place.
ALTER TABLE my_first_catalog.my_first_schema.employees ADD CONSTRAINT employees_dept_fk FOREIGN KEY (dept_id) REFERENCES my_first_catalog.my_first_schema.department (dept_id);

In [0]:
%sql
EXPLAIN SELECT 
    e.emp_id,
    e.emp_name,
    e.salary,
    e.hire_date,
    d.dept_name
FROM my_first_catalog.my_first_schema.employees e
JOIN my_first_catalog.my_first_schema.department d
  ON e.dept_id = d.dept_id;

### Unlike traditional SQL systems, Delta table foreign key constraints are informational and not enforced, so inserts succeed even when the referenced department record is missing.

In [0]:
%sql
INSERT INTO my_first_catalog.my_first_schema.employees(emp_id, emp_name, dept_id, salary, hire_date) VALUES (18,'May Lauren',106,60000,'2025-01-01');

In [0]:
%sql
-- Remove the employees_dept_fk constraint from employees; IF EXISTS makes this
-- a no-op when no constraint with that name is present.
ALTER TABLE my_first_catalog.my_first_schema.employees
DROP CONSTRAINT IF EXISTS employees_dept_fk;

## Create a Delta Table with Liquid Clustering

In [0]:
%sql
DROP TABLE IF EXISTS my_first_catalog.my_first_schema.employees_clusters;

In [0]:
# Save the employee DataFrame as a new Delta table clustered on hire_date.
(
    df_employees.write
    .format("delta")
    .clusterBy("hire_date")
    .saveAsTable("my_first_catalog.my_first_schema.employees_clusters")
)

In [0]:
%sql
DESCRIBE DETAIL my_first_catalog.my_first_schema.employees_clusters;

In [0]:
%sql
SHOW TBLPROPERTIES my_first_catalog.my_first_schema.employees_clusters;

In [0]:
%sql
SELECT * FROM my_first_catalog.my_first_schema.employees_clusters WHERE hire_date BETWEEN '2025-01-01' AND '2025-06-30';

### Optimizing the Delta Table using Z-Ordering
After OPTIMIZE, Delta rewrites and compacts files within partitions, while Z-Ordering reorganizes data to improve data skipping based on emp_id, resulting in fewer and better-organized files.

In [0]:
%sql
DESCRIBE DETAIL my_first_catalog.my_first_schema.employees

In [0]:
%sql
-- Run OPTIMIZE on the employees table with Z-Ordering on emp_id; the
-- DESCRIBE DETAIL cells before and after this one allow the effect to be compared.
OPTIMIZE my_first_catalog.my_first_schema.employees
ZORDER BY (emp_id)

In [0]:
%sql
DESCRIBE DETAIL my_first_catalog.my_first_schema.employees