## 1. Environment Check

Verify that Spark and Delta Lake are available.

In [None]:
# Check Spark version
print(f"Spark Version: {spark.version}")
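# The Scala version is read from the driver JVM; spark.executor.extraJavaOptions holds JVM flags, not a version.
# Assumes a cluster that allows JVM access through sparkContext.
print(f"Scala Version: {spark.sparkContext._jvm.scala.util.Properties.versionNumberString()}")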

# Check Delta Lake
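try:
    import delta
    # Not every build of the delta package exposes __version__, so fall back gracefully
    print(f"Delta Lake Version: {getattr(delta, '__version__', 'unknown')}")
    print("✓ Delta Lake is available")
except ImportError:
    print("✗ Delta Lake not found - install delta-spark or use Databricks Runtime 11.0+")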

# List available databases
spark.sql("SHOW DATABASES").show()

print("\n✓ Environment check complete")
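
The version check above only prints values. If a hard guard is preferred, something like the following could be used. This is a minimal sketch: `parse_version` and `meets_minimum` are hypothetical helper names, and the Spark 3.0 minimum is an illustrative assumption, not a stated requirement of this notebook.

```python
import re


def parse_version(version: str) -> tuple:
    """Extract the leading numeric components, e.g. '3.3.0-SNAPSHOT' -> (3, 3, 0)."""
    match = re.match(r"\d+(?:\.\d+)*", version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def meets_minimum(version: str, minimum: tuple) -> bool:
    """Compare the parsed version against a minimum such as (3, 0)."""
    return parse_version(version) >= minimum


# Illustrative values only; in the notebook the input would be spark.version
print(meets_minimum("3.3.0", (3, 0)))
print(meets_minimum("2.4.8", (3, 0)))
```

In the notebook this would be called as `meets_minimum(spark.version, (3, 0))`.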

## 2. Upload Sample Data to DBFS

**Option A: Manual Upload (Recommended for Community Edition)**
1. Go to the "Data" tab in Databricks
2. Click "Create Table"
3. Upload files:
   - `domains/sales/sample_data/orders_2025-01-01.csv`
   - `domains/research_ops/sample_data/experiments_2025-01-01.csv`
4. Note the DBFS paths (e.g., `/FileStore/tables/orders_2025_01_01.csv`)

**Option B: Programmatic Upload (if files are in Workspace)**
Run the cell below if you've uploaded files to your workspace.

In [None]:
# Option B: Copy from workspace to DBFS (adjust paths as needed)
# Uncomment if using this approach

# import os
# 
# # Create raw data directories
# dbutils.fs.mkdirs("dbfs:/raw/sales/")
# dbutils.fs.mkdirs("dbfs:/raw/research_ops/")
# 
# # Copy sales data
# dbutils.fs.cp(
#     "file:/Workspace/Users/<your_email>/orders_2025-01-01.csv",
#     "dbfs:/raw/sales/orders_2025-01-01.csv"
# )
# 
# # Copy research ops data
# dbutils.fs.cp(
#     "file:/Workspace/Users/<your_email>/experiments_2025-01-01.csv",
#     "dbfs:/raw/research_ops/experiments_2025-01-01.csv"
# )
# 
# print("✓ Sample data uploaded to DBFS")

# For manual upload, just set the paths here:
sales_raw_path = "dbfs:/FileStore/tables/orders_2025_01_01.csv"  # Adjust based on actual upload
experiments_raw_path = "dbfs:/FileStore/tables/experiments_2025_01_01.csv"  # Adjust based on actual upload

print(f"Sales data path: {sales_raw_path}")
print(f"Experiments data path: {experiments_raw_path}")
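
The upload steps show paths in the form `/FileStore/tables/...`, while the code uses the `dbfs:/FileStore/tables/...` form. A small normalizer can avoid mixing the two. This is a sketch only: `to_dbfs_uri` is a hypothetical helper, not part of `dbutils`.

```python
def to_dbfs_uri(path: str) -> str:
    """Return a DBFS path in dbfs:/ URI form."""
    path = path.strip()
    if path.startswith("dbfs:/"):
        # Already in URI form
        return path
    if path.startswith("/dbfs/"):
        # Local mount form, e.g. /dbfs/FileStore/...
        return "dbfs:/" + path[len("/dbfs/"):]
    if path.startswith("/"):
        # Bare absolute form as shown in the upload UI
        return "dbfs:" + path
    raise ValueError(f"Expected an absolute DBFS path, got: {path!r}")


print(to_dbfs_uri("/FileStore/tables/orders_2025_01_01.csv"))
```

For example, `sales_raw_path = to_dbfs_uri("/FileStore/tables/orders_2025_01_01.csv")` yields the same value as the hard-coded path above.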

## 3. Sales Domain - Bronze Layer

Read raw CSV data and write to Bronze layer with metadata.

**Bronze Layer Purpose:**
- Ingest raw data as-is
- Add metadata (_ingestion_timestamp, _source_file, _ingestion_date)
- Store in Delta format for ACID guarantees
- Partition by ingestion date

In [None]:
from pyspark.sql.functions import current_timestamp, lit, to_date

# Configuration
bronze_path = "dbfs:/bronze/sales/orders/"

# Read raw CSV
print("Reading raw sales data...")
sales_df = spark.read.csv(
    sales_raw_path,
    header=True,
    inferSchema=True
)

print(f"Read {sales_df.count()} rows")
print("\nSchema:")
sales_df.printSchema()

# Add metadata columns
sales_bronze_df = sales_df \
    .withColumn("_ingestion_timestamp", current_timestamp()) \
    .withColumn("_source_file", lit(sales_raw_path)) \
    .withColumn("_ingestion_date", to_date(current_timestamp()))

# Write to Delta (Bronze layer)
print("\nWriting to Bronze layer...")
sales_bronze_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("_ingestion_date") \
    .save(bronze_path)

print(f"✓ Bronze: Wrote {sales_bronze_df.count()} rows to {bronze_path}")

# Verify
print("\nSample data:")
spark.read.format("delta").load(bronze_path).show(5)
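
To make the Bronze metadata and partitioning concrete, here is a plain-Python sketch of the three metadata values and the directory that partitioning by `_ingestion_date` would typically produce. `bronze_metadata` and `partition_dir` are hypothetical helpers. The `column=value` directory layout is Delta's default Hive-style partitioning and is assumed here.

```python
from datetime import date, datetime, timezone


def bronze_metadata(source_path: str, now: datetime) -> dict:
    """Build the three metadata values added to every Bronze row."""
    return {
        "_ingestion_timestamp": now,
        "_source_file": source_path,
        "_ingestion_date": now.date(),
    }


def partition_dir(table_path: str, ingestion_date: date) -> str:
    """Directory for one partition, assuming Hive-style column=value naming."""
    return f"{table_path.rstrip('/')}/_ingestion_date={ingestion_date.isoformat()}"


# Fixed timestamp for illustration; the notebook uses current_timestamp()
now = datetime(2025, 1, 1, 12, 30, tzinfo=timezone.utc)
meta = bronze_metadata("dbfs:/FileStore/tables/orders_2025_01_01.csv", now)
print(partition_dir("dbfs:/bronze/sales/orders/", meta["_ingestion_date"]))
```

Because `mode("overwrite")` replaces the table on each run, only the most recent ingestion date's partition is present after a run.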

## 4. Sales Domain - Silver Layer

Clean and validate data from Bronze layer.

**Silver Layer Purpose:**
- Deduplicate records
- Validate data quality (quantity > 0, unit_price >= 0)
- Type conversions and standardization
- Remove invalid records

In [None]:
from pyspark.sql.functions import col

# Configuration
silver_path = "dbfs:/silver/sales/orders/"

# Read from Bronze
print("Reading from Bronze layer...")
bronze_df = spark.read.format("delta").load(bronze_path)
print(f"Bronze records: {bronze_df.count()}")

# Deduplicate by order_id (keep latest by ingestion timestamp).
# Rows from a single load share one timestamp, so ties are broken arbitrarily.
print("\nDeduplicating...")
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

window_spec = Window.partitionBy("order_id").orderBy(col("_ingestion_timestamp").desc())
silver_df = bronze_df \
    .withColumn("row_num", row_number().over(window_spec)) \
    .filter(col("row_num") == 1) \
    .drop("row_num")

# Data quality checks
print("Applying quality checks...")
silver_df = silver_df.filter(
    (col("quantity").isNotNull()) &
    (col("quantity") > 0) &
    (col("unit_price").isNotNull()) &
    (col("unit_price") >= 0)
)

# Type conversions
silver_df = silver_df \
    .withColumn("quantity", col("quantity").cast("int")) \
    .withColumn("unit_price", col("unit_price").cast("decimal(10,2)"))

# Write to Delta (Silver layer)
print("\nWriting to Silver layer...")
silver_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save(silver_path)

print(f"✓ Silver: Wrote {silver_df.count()} validated rows to {silver_path}")

# Verify
print("\nSample data:")
spark.read.format("delta").load(silver_path).show(5)
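
The Silver rules can also be expressed in plain Python, which is handy for unit-testing the logic without a cluster. This is a sketch under assumptions: `dedupe_latest` and `is_valid` are hypothetical helpers, and the sample rows are made up for illustration.

```python
from datetime import datetime


def dedupe_latest(rows, key="order_id", ts="_ingestion_timestamp"):
    """Keep one row per key, preferring the greatest timestamp."""
    latest = {}
    for row in rows:
        current = latest.get(row[key])
        if current is None or row[ts] > current[ts]:
            latest[row[key]] = row
    return list(latest.values())


def is_valid(row) -> bool:
    """Mirror the quality checks: quantity > 0 and unit_price >= 0, both non-null."""
    quantity, unit_price = row.get("quantity"), row.get("unit_price")
    return (
        quantity is not None and quantity > 0
        and unit_price is not None and unit_price >= 0
    )


# Made-up sample rows
rows = [
    {"order_id": "A1", "_ingestion_timestamp": datetime(2025, 1, 1, 10), "quantity": 2, "unit_price": 9.99},
    {"order_id": "A1", "_ingestion_timestamp": datetime(2025, 1, 1, 11), "quantity": 3, "unit_price": 9.99},
    {"order_id": "A2", "_ingestion_timestamp": datetime(2025, 1, 1, 10), "quantity": 0, "unit_price": 5.0},
    {"order_id": "A3", "_ingestion_timestamp": datetime(2025, 1, 1, 10), "quantity": 1, "unit_price": None},
    {"order_id": "A4", "_ingestion_timestamp": datetime(2025, 1, 1, 10), "quantity": 1, "unit_price": 0.0},
]

silver = [row for row in dedupe_latest(rows) if is_valid(row)]
print([row["order_id"] for row in silver])
```

Here the later `A1` row wins, `A2` and `A3` are rejected by the quality checks, and `A4` is kept because a zero price is allowed.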

## 5. Sales Domain - Gold Layer

Create aggregated data product: daily revenue.

**Gold Layer Purpose:**
- Business-level aggregations
- Ready for analytics and reporting
- Conforms to data contracts
- Optimized for consumption

In [None]:
from pyspark.sql.functions import col, sum as _sum

# Configuration
gold_path = "dbfs:/gold/sales/daily_revenue/"

# Read from Silver
print("Reading from Silver layer...")
silver_df = spark.read.format("delta").load(silver_path)

# Calculate revenue for each order
print("\nCalculating revenue...")
gold_df = silver_df.withColumn(
    "revenue",
    col("quantity") * col("unit_price")
)

# Aggregate by order_date
print("Aggregating by date...")
gold_df = gold_df.groupBy("order_date").agg(
    _sum("revenue").alias("daily_revenue")
).orderBy("order_date")

# Write to Delta (Gold layer)
print("\nWriting to Gold layer...")
gold_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save(gold_path)

print(f"✓ Gold: Wrote {gold_df.count()} aggregated rows to {gold_path}")

# Display results
print("\nDaily Revenue Report:")
gold_df.show()
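
As a cross-check for the daily revenue aggregation, the same calculation can be sketched in plain Python. `daily_revenue` is a hypothetical helper and the orders are made up. `Decimal` is used to mirror the `decimal(10,2)` cast applied in Silver.

```python
from collections import defaultdict
from decimal import Decimal


def daily_revenue(orders):
    """Sum quantity * unit_price per order_date, sorted by date."""
    totals = defaultdict(Decimal)  # Decimal() starts each date at 0
    for order in orders:
        totals[order["order_date"]] += order["quantity"] * order["unit_price"]
    return dict(sorted(totals.items()))


# Made-up sample orders
orders = [
    {"order_date": "2025-01-02", "quantity": 3, "unit_price": Decimal("1.50")},
    {"order_date": "2025-01-01", "quantity": 2, "unit_price": Decimal("9.99")},
    {"order_date": "2025-01-01", "quantity": 1, "unit_price": Decimal("5.00")},
]

report = daily_revenue(orders)
for order_date, revenue in report.items():
    print(order_date, revenue)
```

Comparing a hand-computed total like this against `gold_df` for one or two dates is a quick way to confirm the pipeline's arithmetic.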

## 6. Research Ops Domain - Bronze Layer

Ingest experiment data with metadata.

In [None]:
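from pyspark.sql.functions import current_timestamp, lit, to_date

# Configuration
# Note: this reuses the name bronze_path from the Sales cells, so run cells in order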
bronze_path = "dbfs:/bronze/research_ops/experiments/"

# Read raw CSV
print("Reading raw experiments data...")
exp_df = spark.read.csv(
    experiments_raw_path,
    header=True,
    inferSchema=True
)

print(f"Read {exp_df.count()} rows")
print("\nSchema:")
exp_df.printSchema()

# Add metadata
exp_bronze_df = exp_df \
    .withColumn("_ingestion_timestamp", current_timestamp()) \
    .withColumn("_source_file", lit(experiments_raw_path)) \
    .withColumn("_ingestion_date", to_date(current_timestamp()))

# Write to Bronze
print("\nWriting to Bronze layer...")
exp_bronze_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("_ingestion_date") \
    .save(bronze_path)

print(f"✓ Bronze: Wrote {exp_bronze_df.count()} rows to {bronze_path}")

# Verify
print("\nSample data:")
spark.read.format("delta").load(bronze_path).show(5)

## 7. Research Ops Domain - Silver Layer

Clean and validate experiment data.

In [None]:
from pyspark.sql.functions import col, row_number, trim, upper
from pyspark.sql.window import Window

# Configuration
silver_path = "dbfs:/silver/research_ops/experiments/"

# Read from Bronze
print("Reading from Bronze layer...")
bronze_df = spark.read.format("delta").load(bronze_path)
print(f"Bronze records: {bronze_df.count()}")

# Standardize IDs (uppercase, trim) before deduplicating, so that
# variants such as " exp-1" and "EXP-1" collapse into a single key
print("\nStandardizing data...")
standardized_df = bronze_df \
    .withColumn("experiment_id", upper(trim(col("experiment_id")))) \
    .withColumn("researcher_id", upper(trim(col("researcher_id"))))

# Deduplicate by experiment_id (keep latest by ingestion timestamp)
print("Deduplicating...")
window_spec = Window.partitionBy("experiment_id").orderBy(col("_ingestion_timestamp").desc())
silver_df = standardized_df \
    .withColumn("row_num", row_number().over(window_spec)) \
    .filter(col("row_num") == 1) \
    .drop("row_num")

# Data quality checks
print("Applying quality checks...")
silver_df = silver_df.filter(
    (col("trial_count").isNotNull()) &
    (col("trial_count") > 0) &
    (col("duration_minutes").isNotNull()) &
    (col("duration_minutes") >= 0)
)

# Type conversions
silver_df = silver_df \
    .withColumn("trial_count", col("trial_count").cast("int")) \
    .withColumn("duration_minutes", col("duration_minutes").cast("decimal(10,2)"))

# Write to Silver
print("\nWriting to Silver layer...")
silver_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save(silver_path)

print(f"✓ Silver: Wrote {silver_df.count()} validated rows to {silver_path}")

# Verify
print("\nSample data:")
spark.read.format("delta").load(silver_path).show(5)
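
One detail worth checking in this step is the order of ID standardization and deduplication. If IDs differ only in case or surrounding spaces, deduplicating on the raw values leaves duplicates that only appear after standardization. The sketch below illustrates this with made-up IDs. `standardize_id` is a hypothetical helper, and Python's `strip` removes all whitespace, whereas Spark's `trim` targets spaces, so it is an approximation.

```python
def standardize_id(value: str) -> str:
    """Approximate upper(trim(col)) from the Silver step."""
    return value.strip().upper()


# Made-up IDs that differ only in case and padding
raw_ids = ["exp-1", " EXP-1", "Exp-2 "]

distinct_raw = set(raw_ids)
distinct_standardized = {standardize_id(value) for value in raw_ids}

print(len(distinct_raw), "distinct raw IDs")
print(len(distinct_standardized), "distinct standardized IDs")
```

Deduplicating on the raw values would keep all three rows, while deduplicating on the standardized values keeps two.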

## 8. Research Ops Domain - Gold Layer

Create aggregated experiment metrics by date.

In [None]:
from pyspark.sql.functions import avg, count, sum as _sum

# Configuration
gold_path = "dbfs:/gold/research_ops/daily_metrics/"

# Read from Silver
print("Reading from Silver layer...")
silver_df = spark.read.format("delta").load(silver_path)

# Aggregate by experiment_date
print("\nCalculating metrics...")
gold_df = silver_df.groupBy("experiment_date").agg(
    _sum("trial_count").alias("total_trials"),
    count("experiment_id").alias("experiment_count"),
    avg("duration_minutes").alias("avg_duration_minutes")
).orderBy("experiment_date")

# Write to Gold
print("\nWriting to Gold layer...")
gold_df.write \
    .format("delta") \
    .mode("overwrite") \
    .save(gold_path)

print(f"✓ Gold: Wrote {gold_df.count()} aggregated rows to {gold_path}")

# Display results
print("\nDaily Experiment Metrics:")
gold_df.show()

## 9. Verification & Queries

Run SQL queries to verify the pipeline results.

In [None]:
# Register Delta tables as temp views
print("Registering tables...\n")

spark.read.format("delta").load("dbfs:/gold/sales/daily_revenue/") \
    .createOrReplaceTempView("sales_daily_revenue")

spark.read.format("delta").load("dbfs:/gold/research_ops/daily_metrics/") \
    .createOrReplaceTempView("research_ops_daily_metrics")

print("✓ Tables registered\n")

# Query 1: Sales revenue summary
print("=" * 60)
print("Query 1: Sales Revenue Summary")
print("=" * 60)
spark.sql("""
SELECT 
    order_date,
    ROUND(daily_revenue, 2) as daily_revenue
FROM sales_daily_revenue
ORDER BY order_date
""").show()

# Query 2: Total revenue
print("\n" + "=" * 60)
print("Query 2: Total Revenue")
print("=" * 60)
spark.sql("""
SELECT 
    COUNT(*) as total_days,
    ROUND(SUM(daily_revenue), 2) as total_revenue,
    ROUND(AVG(daily_revenue), 2) as avg_daily_revenue
FROM sales_daily_revenue
""").show()

# Query 3: Research ops metrics
print("\n" + "=" * 60)
print("Query 3: Research Ops Metrics")
print("=" * 60)
spark.sql("""
SELECT 
    experiment_date,
    experiment_count,
    total_trials,
    ROUND(avg_duration_minutes, 2) as avg_duration_minutes
FROM research_ops_daily_metrics
ORDER BY experiment_date
""").show()

# Query 4: Data quality check
print("\n" + "=" * 60)
print("Query 4: Data Quality - Row Counts by Layer")
print("=" * 60)

sales_bronze_count = spark.read.format("delta").load("dbfs:/bronze/sales/orders/").count()
sales_silver_count = spark.read.format("delta").load("dbfs:/silver/sales/orders/").count()
sales_gold_count = spark.read.format("delta").load("dbfs:/gold/sales/daily_revenue/").count()

exp_bronze_count = spark.read.format("delta").load("dbfs:/bronze/research_ops/experiments/").count()
exp_silver_count = spark.read.format("delta").load("dbfs:/silver/research_ops/experiments/").count()
exp_gold_count = spark.read.format("delta").load("dbfs:/gold/research_ops/daily_metrics/").count()

print("Sales Domain:")
print(f"  Bronze: {sales_bronze_count} rows")
print(f"  Silver: {sales_silver_count} rows (deduped/validated)")
print(f"  Gold:   {sales_gold_count} rows (aggregated)")
print("\nResearch Ops Domain:")
print(f"  Bronze: {exp_bronze_count} rows")
print(f"  Silver: {exp_silver_count} rows (deduped/validated)")
print(f"  Gold:   {exp_gold_count} rows (aggregated)")

print("\n✓ All queries complete")
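
The row counts printed by Query 4 can also be checked automatically. Silver only removes rows from Bronze, and Gold groups Silver rows by date, so each layer should have no more rows than the one before it. A minimal sketch, with `check_layer_counts` as a hypothetical helper:

```python
def check_layer_counts(bronze: int, silver: int, gold: int) -> list:
    """Return a list of problems; an empty list means the counts look consistent."""
    problems = []
    if silver > bronze:
        problems.append(f"Silver ({silver}) has more rows than Bronze ({bronze})")
    if gold > silver:
        problems.append(f"Gold ({gold}) has more rows than Silver ({silver})")
    if bronze > 0 and silver == 0:
        problems.append("Every Bronze row was rejected by the Silver checks")
    return problems


# Illustrative counts only
print(check_layer_counts(100, 95, 3))
print(check_layer_counts(10, 12, 3))
```

In the notebook this could be called as `check_layer_counts(sales_bronze_count, sales_silver_count, sales_gold_count)`.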

## 10. View Delta Table History

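Delta Lake records every write as a new table version. The history below serves as an audit log, and the version numbers it lists are the ones that time travel queries refer to.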

In [None]:
from delta.tables import DeltaTable

# View history of sales gold table
print("Sales Daily Revenue - Table History:")
print("=" * 80)
dt = DeltaTable.forPath(spark, "dbfs:/gold/sales/daily_revenue/")
dt.history().select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)

# View history of research ops gold table
print("\nResearch Ops Daily Metrics - Table History:")
print("=" * 80)
dt2 = DeltaTable.forPath(spark, "dbfs:/gold/research_ops/daily_metrics/")
dt2.history().select("version", "timestamp", "operation", "operationMetrics").show(truncate=False)
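
The history output lists version numbers, and those can be used to read an earlier state of a table. The sketch below only builds the SQL text. `time_travel_query` is a hypothetical helper, and the `VERSION AS OF` syntax is assumed to be supported by the Delta Lake version on the cluster.

```python
def time_travel_query(table_path: str, version: int) -> str:
    """Build a Delta time travel query for a path-based table."""
    if not isinstance(version, int) or version < 0:
        raise ValueError(f"version must be a non-negative integer, got {version!r}")
    if "`" in table_path:
        raise ValueError("table_path must not contain backticks")
    return f"SELECT * FROM delta.`{table_path}` VERSION AS OF {version}"


print(time_travel_query("dbfs:/gold/sales/daily_revenue/", 0))
```

In the notebook the result would be passed to `spark.sql(...)`. The DataFrame reader offers the same capability through `spark.read.format("delta").option("versionAsOf", 0).load(path)`. The version must be one that appears in the table's history.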

## 11. Cleanup (Optional)

Remove test data and Delta tables. **Run only when done testing!**

In [None]:
# WARNING: This will delete all test data!
# Uncomment to run cleanup

# print("Cleaning up test data...\n")
# 
# # Remove Delta tables
# paths_to_remove = [
#     "dbfs:/bronze/sales/orders/",
#     "dbfs:/silver/sales/orders/",
#     "dbfs:/gold/sales/daily_revenue/",
#     "dbfs:/bronze/research_ops/experiments/",
#     "dbfs:/silver/research_ops/experiments/",
#     "dbfs:/gold/research_ops/daily_metrics/"
# ]
# 
# for path in paths_to_remove:
#     try:
#         dbutils.fs.rm(path, recurse=True)
#         print(f"✓ Removed: {path}")
#     except Exception as e:
#         print(f"✗ Failed to remove {path}: {e}")
# 
# print("\n✓ Cleanup complete")

print("Cleanup commands are commented out.")
print("Uncomment the code above to remove test data.")
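
Since recursive deletes are hard to undo, a guard on the paths may be worth adding before uncommenting the cleanup. This is a sketch only: `is_safe_to_remove` and `ALLOWED_PREFIXES` are hypothetical names, and the prefixes are taken from the paths this notebook writes to.

```python
# Layer roots used by this notebook; anything outside them is refused
ALLOWED_PREFIXES = ("dbfs:/bronze/", "dbfs:/silver/", "dbfs:/gold/")


def is_safe_to_remove(path: str) -> bool:
    """Allow only paths strictly below one of the layer roots."""
    if ".." in path.split("/"):
        # Refuse parent-directory segments
        return False
    return any(
        path.startswith(prefix) and len(path) > len(prefix)
        for prefix in ALLOWED_PREFIXES
    )


print(is_safe_to_remove("dbfs:/bronze/sales/orders/"))
print(is_safe_to_remove("dbfs:/"))
```

The cleanup loop could then skip any path for which `is_safe_to_remove(path)` is false.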

## Summary

This notebook demonstrated:

✅ **Environment validation** - Checked Spark and Delta Lake

✅ **Data ingestion** - Uploaded sample CSVs to DBFS

✅ **Bronze layer** - Raw data with metadata

✅ **Silver layer** - Deduplication and validation

✅ **Gold layer** - Business aggregations

✅ **Two domains** - Sales and Research Ops

✅ **Verification** - SQL queries and row counts

✅ **Table history** - Delta Lake version log for auditing and time travel

### Next Steps:

1. **Modify transformations** - Adjust Silver/Gold logic for your use case
2. **Add more domains** - Create new domain folders and pipelines
3. **Test with larger data** - Upload bigger CSV files
4. **Create dashboards** - Use SQL queries in Databricks SQL
5. **Schedule jobs** - Convert to Databricks Jobs for automation
6. **Connect to ADLS** - For production, use Azure Data Lake Storage

### Resources:

- [Databricks Documentation](https://docs.databricks.com/)
- [Delta Lake Guide](https://docs.delta.io/)
- [PySpark API](https://spark.apache.org/docs/latest/api/python/)
- [Data Mesh Principles](https://www.datamesh-architecture.com/)