In [0]:
%sql

-- Select tabular.dataexpert as the current namespace so the unqualified
-- table names used by later SQL cells resolve there.
USE tabular.dataexpert

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id, col, expr
from faker import Faker
import pandas as pd

# Shared Faker instance used by the create_dim_* helpers below.
# NOTE(review): no seed is set, so the generated names/dates differ on every run.
fake = Faker()
# Select tabular.dataexpert as the current namespace for the unqualified
# table names used by the spark.sql / saveAsTable calls in this notebook.
spark.sql("USE tabular.dataexpert")

# Helper to create synthetic dimension data
def create_dim_product(n=10):
    """Build a pandas DataFrame of ``n`` synthetic product-dimension rows.

    Args:
        n: Number of rows to generate; ``product_id`` runs from 1 to ``n``.

    Returns:
        pandas.DataFrame with columns ``product_id`` (int),
        ``product_name`` (a Faker word) and ``price`` (float, two decimals).
    """
    rows = [
        {
            "product_id": idx,
            "product_name": fake.word(),
            # random_number() returns an int, so rounding it directly was a
            # no-op and prices never had cents. Draw up to five digits and
            # scale by 100 to get a price in [0.00, 999.99].
            "price": round(fake.random_number(digits=5) / 100, 2),
        }
        for idx in range(1, n + 1)
    ]
    return pd.DataFrame(rows)

def create_dim_category(n=5):
    """Build a pandas DataFrame of ``n`` synthetic category-dimension rows.

    Each row has a 1-based ``category_id`` and a ``category_name`` taken
    from ``fake.word()``.
    """
    records = []
    for category_id in range(1, n + 1):
        records.append({"category_id": category_id, "category_name": fake.word()})
    return pd.DataFrame(records)

def create_dim_date(n=20):
    """Build a pandas DataFrame of ``n`` synthetic date-dimension rows.

    Each row has a 1-based ``date_id`` and a ``date`` drawn per row from
    ``fake.date_this_decade()``, independently of the id.
    """
    records = []
    for date_id in range(1, n + 1):
        records.append({"date_id": date_id, "date": fake.date_this_decade()})
    return pd.DataFrame(records)

# DDL for the three dimension tables. Each statement uses IF NOT EXISTS, and
# the statements are executed in the order product, category, date, fact.
product_ddl = """
CREATE TABLE IF NOT EXISTS dim_product_sahil (
    product_id INT,
    product_name STRING,
    price DOUBLE
) USING DELTA
"""
spark.sql(product_ddl)

category_ddl = """
CREATE TABLE IF NOT EXISTS dim_category_sahil (
    category_id INT,
    category_name STRING
) USING DELTA
"""
spark.sql(category_ddl)

date_ddl = """
CREATE TABLE IF NOT EXISTS dim_date_sahil (
    date_id INT,
    date DATE
) USING DELTA
"""
spark.sql(date_ddl)

# DDL for the fact table; sale_id is declared as an identity column, so it is
# not part of the rows written by later cells.
fact_ddl = """
CREATE TABLE IF NOT EXISTS fact_sales_sahil (
    sale_id BIGINT GENERATED ALWAYS AS IDENTITY,
    product_id INT,
    category_id INT,
    date_id INT,
    quantity INT,
    sales_amount DOUBLE,
    ts TIMESTAMP
) USING DELTA
"""
spark.sql(fact_ddl)


In [0]:
# Quick preview of one helper's output (a pandas DataFrame); the previews for
# the other two helpers are currently disabled.
#print(create_dim_product())
print(create_dim_category())
#print(create_dim_date())

In [0]:
# Turn each synthetic pandas frame into a Spark DataFrame whose column types
# match the Delta table it will be written to.
df_product = spark.createDataFrame(create_dim_product(10)).selectExpr(
    "CAST(product_id AS INT) AS product_id",
    "product_name",
    "CAST(price AS DOUBLE) AS price",
)
df_category = spark.createDataFrame(create_dim_category(5)).selectExpr(
    "CAST(category_id AS INT) AS category_id",
    "category_name",
)
df_date = spark.createDataFrame(create_dim_date(20)).selectExpr(
    "CAST(date_id AS INT) AS date_id",
    "CAST(date AS DATE) AS date",
)

# Replace the contents of each dimension table with the freshly generated rows.
dimension_frames = (
    (df_product, "dim_product_sahil"),
    (df_category, "dim_category_sahil"),
    (df_date, "dim_date_sahil"),
)
for frame, table_name in dimension_frames:
    frame.write.format("delta").mode("overwrite").saveAsTable(table_name)


# Read each table back and print its rows to confirm the load.
for _, table_name in dimension_frames:
    spark.sql(f"SELECT * FROM {table_name}").show()


In [0]:
%sql
-- Show every row of the category dimension.
SELECT * FROM dim_category_sahil

In [0]:
%sql
-- Show every row of the date dimension.
SELECT * FROM dim_date_sahil

In [0]:
import random
from datetime import datetime, timedelta
import pandas as pd


# Reference "now"; sale timestamps are spread over the 0-20 days before it.
base_date = datetime.today()

# Generate 50 synthetic sales rows. The id ranges match the dimension sizes
# loaded earlier (10 products, 5 categories, 20 dates), so every foreign key
# refers to an existing dimension row.
# (An unused `dates` lookup list was removed here: nothing read it, and it
# wrongly implied that date_id maps to "base_date minus N days".)
fact_data = []
for _ in range(50):
    product_id = random.randint(1, 10)
    category_id = random.randint(1, 5)
    date_id = random.randint(1, 20)
    quantity = random.randint(1, 20)
    # Unit price is drawn uniformly from [10, 100] and multiplied by quantity.
    sales_amount = round(quantity * random.uniform(10, 100), 2)
    # Formatted as a string here; cast to TIMESTAMP in the selectExpr below.
    # NOTE(review): ts is drawn independently of date_id — confirm that is intended.
    ts = (base_date - timedelta(days=random.randint(0, 20))).strftime('%Y-%m-%d %H:%M:%S')

    fact_data.append({
        "product_id": product_id,
        "category_id": category_id,
        "date_id": date_id,
        "quantity": quantity,
        "sales_amount": sales_amount,
        "ts": ts
    })

# Cast every column to the type declared in the fact table DDL.
df_fact = spark.createDataFrame(pd.DataFrame(fact_data)).selectExpr(
    "CAST(product_id AS INT) AS product_id",
    "CAST(category_id AS INT) AS category_id",
    "CAST(date_id AS INT) AS date_id",
    "CAST(quantity AS INT) AS quantity",
    "CAST(sales_amount AS DOUBLE) AS sales_amount",
    "CAST(ts AS TIMESTAMP) AS ts"
)
spark.sql("USE tabular.dataexpert")
# Overwrite replaces any rows already in the table. sale_id is not supplied:
# the table declares it as an identity column.
# NOTE(review): presumably Delta generates sale_id on write — confirm.
df_fact.write.format("delta").mode("overwrite").saveAsTable("fact_sales_sahil")

# Verify
spark.sql("SELECT * FROM fact_sales_sahil").show()


In [0]:
%sql
-- Show table-level metadata for the product dimension.
DESCRIBE DETAIL dim_product_sahil;

In [0]:
%sql
-- Show the commit history of the product dimension.
DESCRIBE HISTORY dim_product_sahil;

In [0]:
%sql
-- Show table-level metadata for the category dimension.
DESCRIBE DETAIL dim_category_sahil;

In [0]:
%sql
-- Show the commit history of the category dimension.
DESCRIBE HISTORY dim_category_sahil;
    


In [0]:
%sql
-- Show table-level metadata for the date dimension.
DESCRIBE DETAIL dim_date_sahil;

In [0]:
%sql
-- Show the commit history of the date dimension.
DESCRIBE HISTORY dim_date_sahil;

In [0]:
%sql
-- Show table-level metadata for the sales fact table.
DESCRIBE DETAIL fact_sales_sahil;

In [0]:
%sql

-- Show the commit history of the sales fact table.
DESCRIBE HISTORY fact_sales_sahil

In [0]:
%sql

-- List the files under a hard-coded storage path.
-- NOTE(review): presumably the storage location of one of the tables above
-- (e.g. copied from a DESCRIBE DETAIL result) — confirm. The path is specific
-- to this workspace and will change if the table is dropped and recreated.
LIST 's3://techcreator/dataexpert-databricks/external_storage/schemas/e54ae632-5fbd-4a2a-8b56-e9c9f3f8a77b/tables/8685d5af-2448-4c11-a124-f9694660189a'

In [0]:
%sql
-- Apply Liquid Clustering
-- Each dimension is clustered on its id column; the fact table is clustered
-- on its three foreign keys plus the event timestamp.
-- NOTE(review): this only sets the clustering keys; presumably existing data
-- is reorganised by the later OPTIMIZE cell — confirm.

ALTER TABLE dim_product_sahil CLUSTER BY (product_id);
ALTER TABLE dim_category_sahil CLUSTER BY (category_id);
ALTER TABLE dim_date_sahil CLUSTER BY (date_id);
ALTER TABLE fact_sales_sahil CLUSTER BY (product_id, category_id, date_id, ts);


In [0]:
%sql
-- Re-check the product dimension's metadata after setting the clustering keys.
DESCRIBE DETAIL dim_product_sahil;

In [0]:
%sql

-- Clean-up / reset: remove all four demo tables.
-- IF EXISTS keeps this cell re-runnable: a plain DROP TABLE fails when a
-- table is already gone (e.g. on a second run or after a partial clean-up).
-- NOTE: every cell below this one needs the tables to exist. Re-run the
-- CREATE TABLE and data-load cells above before continuing.
DROP TABLE IF EXISTS dim_category_sahil;
DROP TABLE IF EXISTS dim_date_sahil;
DROP TABLE IF EXISTS dim_product_sahil;
DROP TABLE IF EXISTS fact_sales_sahil;

In [0]:
%sql
-- The %sql magic was missing, so this cell was parsed as Python and raised a
-- SyntaxError instead of running the statement.
-- NOTE(review): this follows the DROP cell above, so it reports "table not
-- found" unless the setup cells have been re-run — confirm the intended order.
DESCRIBE DETAIL dim_product_sahil;

In [0]:
%sql
-- Table Optimization
-- Runs OPTIMIZE on each of the four demo tables.
-- NOTE(review): the tables must exist; re-run the setup cells first if the
-- DROP cell above has been executed.

OPTIMIZE dim_product_sahil;
OPTIMIZE dim_category_sahil;
OPTIMIZE dim_date_sahil;
OPTIMIZE fact_sales_sahil;


In [0]:
%sql

-- Run a sequence of inserts, updates and a delete against the fact table.
-- The cells that follow inspect the resulting history and query old versions,
-- so the order of the statements below matters.
-- sale_id is omitted from every column list because the table declares it as
-- an identity column.
USE tabular.dataexpert;
-- Insert batch 1: Initial synthetic fact data
INSERT INTO fact_sales_sahil (product_id, category_id, date_id, quantity, sales_amount, ts) VALUES
  (1, 1, 1, 10, 100.0, '2025-06-01 10:00:00'),
  (2, 2, 2, 5, 60.0, '2025-06-01 11:00:00'),
  (3, 3, 3, 8, 88.0, '2025-06-01 12:00:00');

-- Insert batch 2: More synthetic fact data
INSERT INTO fact_sales_sahil (product_id, category_id, date_id, quantity, sales_amount, ts) VALUES
  (4, 2, 4, 12, 144.0, '2025-06-02 09:00:00'),
  (5, 1, 5, 7, 77.0, '2025-06-02 10:30:00');

-- Update: Change quantity and sales_amount for every row with this
-- product/category/date key (this can include randomly generated rows from
-- the Python load, not only the row inserted in batch 1)
UPDATE fact_sales_sahil
SET quantity = 15, sales_amount = 150.0
WHERE product_id = 1 AND category_id = 1 AND date_id = 1;

-- Insert batch 3: Even more synthetic fact data
INSERT INTO fact_sales_sahil (product_id, category_id, date_id, quantity, sales_amount, ts) VALUES
  (2, 2, 6, 9, 99.0, '2025-06-03 08:00:00'),
  (3, 3, 7, 6, 66.0, '2025-06-03 09:30:00');

-- Update: Simulate a correction (adds 10 to every row of product 2)
UPDATE fact_sales_sahil
SET sales_amount = sales_amount + 10
WHERE product_id = 2;

-- Optional: Delete a row to create another snapshot
DELETE FROM fact_sales_sahil
WHERE product_id = 5 AND category_id = 1 AND date_id = 5;

-- Check current state
SELECT * FROM fact_sales_sahil;

In [0]:
%sql

-- List the commits produced by the DML cell above.
DESCRIBE HISTORY fact_sales_sahil;

In [0]:
%sql

-- Time travel: read the table as it was at version 0.
-- NOTE(review): presumably version 0 is the CREATE TABLE commit, i.e. an
-- empty table — confirm against DESCRIBE HISTORY.
SELECT * FROM fact_sales_sahil VERSION AS OF 0;



In [0]:
%sql
-- Set the history retention period to 30 days
-- NOTE(review): this property governs how long transaction-log history is
-- kept; reading old versions also needs their data files to still exist
-- (see delta.deletedFileRetentionDuration / VACUUM) — confirm if long-range
-- time travel is required.
ALTER TABLE fact_sales_sahil SET TBLPROPERTIES ('delta.logRetentionDuration' = '30 days');

-- Perform some operations on the Delta table
-- (insert a product-6 row, modify it, then remove it again, so the current
-- data is unchanged but the history gains new entries)
INSERT INTO fact_sales_sahil (product_id, category_id, date_id, quantity, sales_amount, ts) VALUES
  (6, 3, 8, 10, 100.0, '2025-06-04 10:00:00');

UPDATE fact_sales_sahil
SET quantity = 20
WHERE product_id = 6;

DELETE FROM fact_sales_sahil
WHERE product_id = 6;

-- Check the history of the Delta table
DESCRIBE HISTORY fact_sales_sahil;

-- Time travel: read the table as it was at version 1
SELECT * FROM fact_sales_sahil VERSION AS OF 1;


Optimization in Delta Lake compacts many small files into fewer large files, reducing file fragmentation. This minimizes the overhead of opening and scanning multiple files during queries. As a result, query engines can read data more efficiently, leading to faster query performance and lower resource consumption. Regular optimization ensures that data is organized for optimal access patterns, especially after frequent inserts, updates, or deletes.

In [0]:
%sql
-- Count the rows currently in the fact table.
SELECT COUNT(*) FROM fact_sales_sahil

In [0]:
%sql

-- Demonstrate recovery from an accidental full delete using RESTORE.

-- WHERE 1=1 matches every row, so this empties the table
DELETE FROM fact_sales_sahil WHERE 1=1;

-- Check table is empty
SELECT COUNT(*) FROM fact_sales_sahil;

-- Restore
-- NOTE(review): version 1 is presumably the initial bulk load from the Python
-- cell, not the state just before the DELETE — confirm the intended version
-- against DESCRIBE HISTORY.
RESTORE TABLE fact_sales_sahil TO VERSION AS OF 1;

-- Check data is restored
SELECT COUNT(*) FROM fact_sales_sahil;

