# 🧠 Databricks Delta Tables 101 (Managed Tables Only)
End-to-end learning notebook for Delta Tables: ACID operations, Time Travel, CDC, and Streaming with checkpoints.

In [None]:
%sql
-- Ensure the demo catalog and its delta101 schema exist, then make them the
-- session defaults so the following cells can use unqualified table names.
CREATE CATALOG IF NOT EXISTS demo;
CREATE SCHEMA IF NOT EXISTS demo.delta101;

USE CATALOG demo;
USE SCHEMA delta101;

In [None]:
%sql
-- Create (or reset) the sales_delta Delta table used by the rest of the notebook.
-- No LOCATION clause is given, so the table's storage is managed by the catalog.
CREATE OR REPLACE TABLE sales_delta (
  id BIGINT,
  ts TIMESTAMP,
  product STRING,
  qty INT,
  price DECIMAL(10,2)
) USING DELTA;

-- Seed three rows. The explicit column list binds values by name rather than
-- by position, so the insert stays correct if the table's columns are later
-- reordered or extended.
INSERT INTO sales_delta (id, ts, product, qty, price) VALUES
  (1, current_timestamp(), 'Pencil', 5, 1.20),
  (2, current_timestamp(), 'Pen', 2, 2.50),
  (3, current_timestamp(), 'Notebook', 1, 4.90);

-- Show the seeded rows in a deterministic order.
SELECT id, ts, product, qty, price
FROM sales_delta
ORDER BY id;

In [None]:
%sql
-- Row-level DML on a Delta table: an UPDATE, a DELETE, then an upsert via MERGE.
UPDATE sales_delta
SET qty = 10
WHERE id = 1;

DELETE FROM sales_delta
WHERE product = 'Pen';

-- Upsert a single source row keyed on id. When this runs right after the seed
-- cell, the 'Pen' row (id 2) has just been deleted, so the NOT MATCHED branch
-- re-inserts it with qty 3.
MERGE INTO sales_delta AS tgt
USING (
  SELECT
    2 AS id,
    current_timestamp() AS ts,
    'Pen' AS product,
    3 AS qty,
    2.50 AS price
) AS src
ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

SELECT *
FROM sales_delta
ORDER BY id;

In [None]:
%sql
-- List the table's commit history (one row per table version).
DESCRIBE HISTORY sales_delta;

-- Time travel: read the table as it was at version 0.
SELECT *
FROM sales_delta VERSION AS OF 0;

In [None]:
%sql
-- Schema evolution: add a new column to the existing table.
ALTER TABLE sales_delta ADD COLUMN channel STRING;

-- Enforce qty > 0 on all future writes. CHECK constraints must be declared
-- with ADD CONSTRAINT; Delta rejects attempts to write delta.constraints.*
-- keys directly through SET TBLPROPERTIES.
ALTER TABLE sales_delta ADD CONSTRAINT qty_positive CHECK (qty > 0);

-- Inspect table-level metadata after the changes.
DESCRIBE DETAIL sales_delta;

In [None]:
from pyspark.sql import functions as F

# Read the table's current rows, set channel to 'web' on every row, and append
# the result back to the same table. mergeSchema allows the append to add any
# columns the DataFrame has that the table does not.
target_table = 'demo.delta101.sales_delta'
df = spark.table(target_table).withColumn('channel', F.lit('web'))

writer = df.write.format('delta').mode('append').option('mergeSchema', 'true')
writer.saveAsTable(target_table)

In [None]:
%sql
-- Compact the table's data files.
OPTIMIZE sales_delta;

-- Compact again, this time co-locating rows by product and ts.
OPTIMIZE sales_delta
ZORDER BY (product, ts);

-- Remove unreferenced data files older than 168 hours (7 days).
VACUUM sales_delta RETAIN 168 HOURS;

In [None]:
# Change Data Feed (CDF) demo.
# This is a Python cell rather than %sql because the starting version passed to
# table_changes has to be looked up at run time: change data is only recorded
# from the commit that enables the feed, so reading from a fixed version 0
# (or up to a fixed end version beyond the latest commit) raises an error.
spark.sql(
    "ALTER TABLE sales_delta "
    "SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')"
)

# DESCRIBE HISTORY returns the newest commit first, so the first row is the
# latest table version, at which the feed is now enabled.
cdf_start_version = (
    spark.sql('DESCRIBE HISTORY sales_delta LIMIT 1').first()['version']
)

# Make a change after enabling the feed so there is something to read back.
spark.sql("UPDATE sales_delta SET qty = qty + 1 WHERE product = 'Pencil'")

# Omitting the end version reads every change up to the latest commit.
changes = spark.sql(
    f"SELECT * FROM table_changes('sales_delta', {cdf_start_version})"
)
changes.show(truncate=False)

In [None]:
from pyspark.sql.functions import col, to_timestamp
import json, os, shutil

# Landing directory on the driver's local disk, recreated empty on every run.
landing_local = '/tmp/delta101_landing_json'
shutil.rmtree(landing_local, ignore_errors=True)
os.makedirs(landing_local, exist_ok=True)

batch1 = [
  {'id': 4, 'ts': '2025-10-27T00:00:00', 'product': 'Eraser', 'qty': 2, 'price': 0.90},
  {'id': 5, 'ts': '2025-10-27T00:05:00', 'product': 'Pencil', 'qty': 1, 'price': 1.20},
]

# Write the batch as newline-delimited JSON (one record per line).
with open(os.path.join(landing_local, 'batch1.json'), 'w') as f:
    for r in batch1:
        f.write(json.dumps(r) + '\n')

# Streaming file sources require an explicit schema (schema inference is off
# by default for streams). ts is read as a string and parsed below.
source_schema = 'id BIGINT, ts STRING, product STRING, qty INT, price DECIMAL(10,2)'

# The files above were written with Python's open(), i.e. to the driver's
# local filesystem. On Databricks a bare '/tmp/...' path passed to Spark is
# resolved against DBFS instead, so the 'file:' scheme is spelled out.
# NOTE(review): driver-local files are only visible to executors on
# single-node clusters; on multi-node clusters use shared storage
# (e.g. a Unity Catalog volume) for the landing directory instead.
landing_uri = f'file:{landing_local}'

source = (spark.readStream
  .format('json')
  .schema(source_schema)
  .load(landing_uri))
clean = source.withColumn('ts', to_timestamp(col('ts'))).select('id','ts','product','qty','price')
checkpoint_local = '/tmp/delta101_checkpoints/sales_stream_chk'

# availableNow processes everything currently in the landing directory and
# then stops; awaiting termination guarantees the target table is populated
# before any later cell queries it. The checkpoint records which files were
# already ingested.
query = (clean.writeStream
  .format('delta')
  .outputMode('append')
  .option('checkpointLocation', checkpoint_local)
  .trigger(availableNow=True)
  .toTable('demo.delta101.sales_delta_stream'))
query.awaitTermination()

In [None]:
%sql
-- Look at the rows the stream wrote to its target table.
SELECT *
FROM demo.delta101.sales_delta_stream
ORDER BY id;

-- Show the commits recorded on the stream's target table.
DESCRIBE HISTORY demo.delta101.sales_delta_stream;

In [None]:
# Bronze -> silver -> gold pipeline.
# spark.sql() parses exactly one statement per call, so every statement below
# is submitted on its own (multi-statement strings raise a parse error).

# Bronze: snapshot of whatever the stream has written so far.
spark.sql('''
CREATE OR REPLACE TABLE bronze_sales AS
SELECT * FROM demo.delta101.sales_delta_stream
''')

# Silver: typed target table, created once and then upserted into.
spark.sql('''
CREATE TABLE IF NOT EXISTS silver_sales (
  id BIGINT,
  ts TIMESTAMP,
  product STRING,
  qty INT,
  price DECIMAL(10,2)
) USING DELTA
''')

# Upsert bronze into silver keyed on id. DISTINCT removes fully identical
# source rows; two bronze rows with the same id but different values would
# still make the MERGE fail with a multiple-source-match error.
spark.sql('''
MERGE INTO silver_sales t
USING (SELECT DISTINCT * FROM bronze_sales) s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
''')

# Gold: quantity and revenue per product in 10-minute tumbling windows.
spark.sql('''
CREATE OR REPLACE TABLE gold_sales_10m AS
SELECT window(ts, '10 minutes').start AS window_start,
       product,
       SUM(qty) AS qty_10m,
       SUM(qty*price) AS revenue_10m
FROM silver_sales
GROUP BY window(ts, '10 minutes'), product
''')

# Compact the gold table and co-locate rows by its query keys.
spark.sql('''
OPTIMIZE gold_sales_10m ZORDER BY (window_start, product)
''')

In [None]:
%sql
-- Table-level metadata for sales_delta.
DESCRIBE DETAIL sales_delta;

-- The ten most recent commits on the table.
DESCRIBE HISTORY sales_delta LIMIT 10;

-- Turn on optimized writes and auto compaction for this table.
ALTER TABLE sales_delta
SET TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);