# Delta Lake Training - Environment Setup

This notebook sets up the training environment including:
- Catalog: `databricks_training`
- Schema: `delta_demo`
- Volumes: `raw_data`, `bronze`, `silver`, `gold`
- Generates sample retail datasets with realistic data quality issues


In [0]:
# Step 1: Create Catalog
spark.sql("CREATE CATALOG IF NOT EXISTS avish_practice")
spark.sql("USE CATALOG avish_practice")
print("✓ Catalog 'avish_practice' created/verified")


In [0]:
# Step 2: Create Schema
spark.sql("CREATE SCHEMA IF NOT EXISTS delta_demo")
spark.sql("USE SCHEMA delta_demo")
print("✓ Schema 'delta_demo' created/verified")


In [0]:
# Step 3: Create Volumes
volumes = ['raw_data', 'bronze', 'silver', 'gold']

for volume in volumes:
    spark.sql(f"CREATE VOLUME IF NOT EXISTS delta_demo.{volume}")
    print(f"✓ Volume '{volume}' created/verified")

print("\nAll volumes created successfully!")


In [0]:
# Verify volumes
spark.sql("SHOW VOLUMES IN delta_demo").show(truncate=False)


## Generate Sample Retail Datasets

We'll create:
- **Customers**: Customer master data
- **Products**: Product catalog
- **Geographies**: Geographic reference data
- **Orders**: Transaction fact table (500k+ rows) with partitions

Data quality issues introduced:
- Date inconsistencies (different formats, invalid dates)
- Missing/null values
- Incorrect data types
- Bad records (negative values, out of range)
- Multiple file formats (CSV and Parquet)
- Partitioned by date


In [0]:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import random
import os

# Set random seed for reproducibility
np.random.seed(42)
random.seed(42)

# Base path for volumes
base_path = "/Volumes/avish_practice//delta_demo"

print("Starting data generation...")


In [0]:
# Generate Geographies Data
geographies = {
    'geography_id': range(1, 51),
    'country': ['USA'] * 30 + ['Canada'] * 10 + ['Mexico'] * 10,
    'state': ['CA', 'NY', 'TX', 'FL', 'IL'] * 10,
    'city': [f'City_{i}' for i in range(1, 51)],
    'postal_code': [f'{random.randint(10000, 99999)}' for _ in range(50)]
}

df_geographies = pd.DataFrame(geographies)
# Introduce some nulls
df_geographies.loc[df_geographies.sample(frac=0.1).index, 'postal_code'] = None

# Save as CSV
geo_path = f"{base_path}/raw_data/geographies.csv"
df_geographies.to_csv(geo_path, index=False)
print(f"✓ Created geographies.csv with {len(df_geographies)} records")


In [0]:
# Generate Products Data
product_categories = ['Electronics', 'Clothing', 'Food', 'Books', 'Toys', 'Home', 'Sports', 'Beauty']
products = {
    'product_id': range(1, 1001),
    'product_name': [f'Product_{i}' for i in range(1, 1001)],
    'category': [random.choice(product_categories) for _ in range(1000)],
    'price': np.round(np.random.uniform(10, 500, 1000), 2),
    'cost': np.round(np.random.uniform(5, 400, 1000), 2),
    'supplier_id': [random.randint(1, 50) for _ in range(1000)]
}

df_products = pd.DataFrame(products)
# Introduce bad data: negative prices, nulls
bad_indices = df_products.sample(frac=0.05).index
df_products.loc[bad_indices, 'price'] = -abs(df_products.loc[bad_indices, 'price'])
df_products.loc[df_products.sample(frac=0.08).index, 'category'] = None
df_products.loc[df_products.sample(frac=0.03).index, 'supplier_id'] = None

# Save as Parquet (partitioned)
products_path = f"{base_path}/raw_data/products"
df_products.to_parquet(products_path, index=False, partition_cols=['category'])
print(f"✓ Created products parquet with {len(df_products)} records (partitioned by category)")


In [0]:
# Generate Customers Data
first_names = ['John', 'Jane', 'Mike', 'Sarah', 'David', 'Emily', 'Chris', 'Lisa', 'Tom', 'Amy']
last_names = ['Smith', 'Johnson', 'Williams', 'Brown', 'Jones', 'Garcia', 'Miller', 'Davis', 'Rodriguez', 'Martinez']

customers = {
    'customer_id': range(1, 10001),
    'first_name': [random.choice(first_names) for _ in range(10000)],
    'last_name': [random.choice(last_names) for _ in range(10000)],
    'email': [f'customer_{i}@example.com' for i in range(1, 10001)],
    'phone': [f'{random.randint(100, 999)}-{random.randint(100, 999)}-{random.randint(1000, 9999)}' for _ in range(10000)],
    'geography_id': [random.randint(1, 50) for _ in range(10000)],
    'registration_date': [(datetime(2020, 1, 1) + timedelta(days=random.randint(0, 1460))).strftime('%Y-%m-%d') for _ in range(10000)]
}

df_customers = pd.DataFrame(customers)
# Introduce date inconsistencies and nulls
date_inconsistencies = df_customers.sample(frac=0.1).index
df_customers.loc[date_inconsistencies[:len(date_inconsistencies)//3], 'registration_date'] = \
    [(datetime(2020, 1, 1) + timedelta(days=random.randint(0, 1460))).strftime('%m/%d/%Y') for _ in range(len(date_inconsistencies)//3)]
df_customers.loc[date_inconsistencies[len(date_inconsistencies)//3:2*len(date_inconsistencies)//3], 'registration_date'] = \
    [(datetime(2020, 1, 1) + timedelta(days=random.randint(0, 1460))).strftime('%d-%m-%Y') for _ in range(len(date_inconsistencies)//3)]
df_customers.loc[date_inconsistencies[2*len(date_inconsistencies)//3:], 'registration_date'] = 'invalid_date'

# Add nulls
df_customers.loc[df_customers.sample(frac=0.05).index, 'phone'] = None
df_customers.loc[df_customers.sample(frac=0.03).index, 'geography_id'] = None

# Save as CSV (multiple files to simulate partitions)
customers_path = f"{base_path}/raw_data/customers"
os.makedirs(customers_path, exist_ok=True)
chunk_size = 2500
for i in range(0, len(df_customers), chunk_size):
    chunk = df_customers.iloc[i:i+chunk_size]
    chunk.to_csv(f"{customers_path}/customers_part_{i//chunk_size + 1}.csv", index=False)
print(f"✓ Created customers CSV files with {len(df_customers)} records (4 partitions)")


In [0]:
# Generate Orders Data (500k+ rows) - Fact Table
num_orders = 500000
start_date = datetime(2023, 1, 1)
end_date = datetime(2024, 12, 31)

orders = {
    'order_id': range(1, num_orders + 1),
    'customer_id': [random.randint(1, 10000) for _ in range(num_orders)],
    'product_id': [random.randint(1, 1000) for _ in range(num_orders)],
    'order_date': [(start_date + timedelta(days=random.randint(0, (end_date - start_date).days))).strftime('%Y-%m-%d') for _ in range(num_orders)],
    'quantity': [random.randint(1, 10) for _ in range(num_orders)],
    'unit_price': np.round(np.random.uniform(10, 500, num_orders), 2),
    'discount': np.round(np.random.uniform(0, 0.3, num_orders), 2),
    'shipping_cost': np.round(np.random.uniform(5, 50, num_orders), 2)
}

df_orders = pd.DataFrame(orders)

# Introduce data quality issues
# Date inconsistencies
date_issues = df_orders.sample(frac=0.15).index
df_orders.loc[date_issues[:len(date_issues)//4], 'order_date'] = \
    [(start_date + timedelta(days=random.randint(0, (end_date - start_date).days))).strftime('%m/%d/%Y') for _ in range(len(date_issues)//4)]
df_orders.loc[date_issues[len(date_issues)//4:len(date_issues)//2], 'order_date'] = \
    [(start_date + timedelta(days=random.randint(0, (end_date - start_date).days))).strftime('%d-%m-%Y') for _ in range(len(date_issues)//4)]
df_orders.loc[date_issues[len(date_issues)//2:3*len(date_issues)//4], 'order_date'] = \
    [(start_date + timedelta(days=random.randint(0, (end_date - start_date).days))).strftime('%Y%m%d') for _ in range(len(date_issues)//4)]
df_orders.loc[date_issues[3*len(date_issues)//4:], 'order_date'] = 'invalid'  # Invalid dates

# Bad data: negative quantities, excessive discounts, nulls
bad_indices = df_orders.sample(frac=0.02).index
df_orders.loc[bad_indices, 'quantity'] = -abs(df_orders.loc[bad_indices, 'quantity'])

bad_discounts = df_orders.sample(frac=0.01).index
df_orders.loc[bad_discounts, 'discount'] = np.random.uniform(1.0, 2.0, len(bad_discounts))  # Discount > 100%

# Nulls
df_orders.loc[df_orders.sample(frac=0.04).index, 'discount'] = None
df_orders.loc[df_orders.sample(frac=0.02).index, 'shipping_cost'] = None
df_orders.loc[df_orders.sample(frac=0.01).index, 'customer_id'] = None

# Save as Parquet partitioned by order_date (year-month)
df_orders['order_date_parsed'] = pd.to_datetime(df_orders['order_date'], errors='coerce')
df_orders['year_month'] = df_orders['order_date_parsed'].dt.to_period('M').astype(str)
df_orders = df_orders.drop('order_date_parsed', axis=1)

orders_path = f"{base_path}/raw_data/orders"
os.makedirs(orders_path, exist_ok=True)

# Save in chunks by year_month partition
for partition in df_orders['year_month'].dropna().unique():
    partition_data = df_orders[df_orders['year_month'] == partition].drop('year_month', axis=1)
    partition_data.to_parquet(f"{orders_path}/year_month={partition}", index=False, partition_cols=None)

print(f"✓ Created orders parquet with {len(df_orders)} records (partitioned by year_month)")
print(f"  Partitions: {df_orders['year_month'].nunique()} unique year-month combinations")


In [0]:
# Verify all files created
print("\n=== Data Generation Summary ===")
print(f"Geographies: {base_path}/raw_data/geographies.csv")
print(f"Products: {base_path}/raw_data/products/ (partitioned by category)")
print(f"Customers: {base_path}/raw_data/customers/ (4 CSV files)")
print(f"Orders: {base_path}/raw_data/orders/ (partitioned by year_month)")
print("\n✓ Setup complete! Ready for data transformation notebooks.")
