# Databricks Data Engineering: Best Practices & Advanced Patterns

**Production-Ready Patterns for Data Engineering on Databricks**

This notebook covers advanced topics and best practices for building robust, scalable data pipelines.

## Topics Covered:

- 🔄 **Incremental Processing**: Handle updates efficiently
- 🔍 **Data Quality**: Validation frameworks and error handling
- ⚡ **Performance Optimization**: Partitioning, caching, and Z-ordering
- 🔒 **Idempotency**: Make pipelines safe to re-run
- 📊 **Monitoring & Logging**: Track pipeline health
- 🎯 **Advanced Delta Lake**: Merge operations, SCD patterns
- 🧪 **Testing**: Data quality tests and pipeline validation

---


## 1. Incremental Data Processing

Instead of reprocessing all data, process only new or changed records.


In [None]:
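# Import libraries
# Explicit imports avoid shadowing Python built-ins such as max() and sum(),
# which `from pyspark.sql.functions import *` would replace
from pyspark.sql.functions import col, lit, current_timestamp
from delta.tables import DeltaTable

print("✅ Libraries imported")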


### Pattern 1: Watermark-Based Incremental Loading


In [None]:
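# Track the last processed timestamp
from pyspark.sql.utils import AnalysisException

DEFAULT_WATERMARK = "1900-01-01"

def get_last_watermark(table_name):
    """Return the max updated_at of the target table, or a floor value if there is none"""
    try:
        max_timestamp = spark.sql(
            f"SELECT MAX(updated_at) AS max_ts FROM {table_name}"
        ).collect()[0][0]
    except AnalysisException:
        # Table or column not found (e.g. first run): start from the floor value
        return DEFAULT_WATERMARK
    # MAX() is NULL on an empty table; always return a string for SQL interpolation
    return str(max_timestamp) if max_timestamp is not None else DEFAULT_WATERMARK

# Example: Only process new orders
# Note: the watermark comes from silver's updated_at but is compared with
# bronze's _ingestion_timestamp, so both columns must follow the same clock
# last_processed = get_last_watermark("silver.orders")
# new_orders = spark.sql(f"""
#     SELECT * FROM bronze.orders
#     WHERE _ingestion_timestamp > '{last_processed}'
# """)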

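The watermark logic can be illustrated without a cluster. The following is a minimal plain-Python sketch, not Spark code: `select_new_rows` and the sample rows are hypothetical, and the list stands in for `bronze.orders`. It shows the two properties the pattern relies on: only rows strictly newer than the watermark are selected, and a second run with the advanced watermark selects nothing.

```python
from datetime import datetime

FLOOR = datetime(1900, 1, 1)  # floor watermark used when nothing has been loaded yet

def select_new_rows(rows, watermark, ts_key="_ingestion_timestamp"):
    """Return rows strictly newer than the watermark, plus the advanced watermark."""
    new_rows = [r for r in rows if r[ts_key] > watermark]
    # Advance the watermark only if something new arrived
    new_watermark = max((r[ts_key] for r in new_rows), default=watermark)
    return new_rows, new_watermark

# Hypothetical stand-in for bronze.orders
bronze_orders = [
    {"order_id": 1, "_ingestion_timestamp": datetime(2024, 1, 1, 8, 0)},
    {"order_id": 2, "_ingestion_timestamp": datetime(2024, 1, 2, 8, 0)},
    {"order_id": 3, "_ingestion_timestamp": datetime(2024, 1, 3, 8, 0)},
]

first_batch, wm = select_new_rows(bronze_orders, FLOOR)
second_batch, wm_after = select_new_rows(bronze_orders, wm)  # re-run, nothing new
print(len(first_batch), len(second_batch))
```

Because the comparison is strict (`>`), a row that arrives late with a timestamp equal to the watermark would be skipped. Pipelines that can see such ties may prefer to re-read a small overlap window and deduplicate with a merge.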

### Pattern 2: Merge (Upsert) Operations

Handle inserts and updates in a single operation:


In [None]:
# Example: Upsert customer data
# Assume we have updated customer data in a DataFrame called `updated_customers`

# from delta.tables import DeltaTable
# 
# target_table = DeltaTable.forName(spark, "silver.customers")
# 
# target_table.alias("target").merge(
#     updated_customers.alias("source"),
#     "target.customer_id = source.customer_id"
# ).whenMatchedUpdate(
#     set = {
#         "first_name": "source.first_name",
#         "last_name": "source.last_name",
#         "email": "source.email",
#         "updated_at": "current_timestamp()"
#     }
# ).whenNotMatchedInsert(
#     values = {
#         "customer_id": "source.customer_id",
#         "first_name": "source.first_name",
#         "last_name": "source.last_name",
#         "email": "source.email",
#         "updated_at": "current_timestamp()"
#     }
# ).execute()

print("✅ Merge pattern example")

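To see why a keyed merge is safe to re-run, here is a small plain-Python model of upsert semantics. It is a sketch only: `upsert` is a hypothetical helper, the dict stands in for `silver.customers`, and it assumes the source holds at most one row per key.

```python
def upsert(target, source_rows, key="customer_id"):
    """Apply source rows to target by key: update on match, insert otherwise."""
    result = dict(target)  # copy so the input is not mutated
    for row in source_rows:
        # Merge the incoming fields over the existing row (or an empty one)
        result[row[key]] = {**result.get(row[key], {}), **row}
    return result

# Hypothetical stand-ins for the target table and the incoming batch
silver_customers = {
    1: {"customer_id": 1, "first_name": "Ada", "email": "ada@example.com"},
}
updated_customers = [
    {"customer_id": 1, "first_name": "Ada", "email": "ada@new.example.com"},  # update
    {"customer_id": 2, "first_name": "Lin", "email": "lin@example.com"},      # insert
]

once = upsert(silver_customers, updated_customers)
twice = upsert(once, updated_customers)  # simulate a retry of the same batch
print(once == twice)
```

Applying the same batch twice yields the same set of rows, whereas an append would have duplicated them. In the Delta merge shown earlier, `updated_at` would still be refreshed on each run because it is set to `current_timestamp()`.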

---

## 2. Data Quality Framework

Build robust data quality checks into your pipelines.


In [None]:
# Data Quality Check Framework
class DataQualityValidator:
    """Simple data quality validation framework"""
    
    def __init__(self, df):
        self.df = df
        self.checks = []
        
    def check_not_null(self, column):
        """Ensure column has no nulls"""
        null_count = self.df.filter(col(column).isNull()).count()
        self.checks.append({
            "check": f"{column} NOT NULL",
            "passed": null_count == 0,
            "failed_records": null_count
        })
        return self
    
    def check_range(self, column, min_val, max_val):
        """Ensure numeric column is within range"""
        out_of_range = self.df.filter(
            (col(column) < min_val) | (col(column) > max_val)
        ).count()
        self.checks.append({
            "check": f"{column} BETWEEN {min_val} AND {max_val}",
            "passed": out_of_range == 0,
            "failed_records": out_of_range
        })
        return self
    
    def check_unique(self, column):
        """Ensure column values are unique"""
        total_count = self.df.count()
        unique_count = self.df.select(column).distinct().count()
        self.checks.append({
            "check": f"{column} UNIQUE",
            "passed": total_count == unique_count,
            "failed_records": total_count - unique_count
        })
        return self
    
    def report(self):
        """Print validation report"""
        print("="*70)
        print("DATA QUALITY VALIDATION REPORT")
        print("="*70)
        for check in self.checks:
            status = "✅ PASS" if check["passed"] else "❌ FAIL"
            print(f"{status} | {check['check']}")
            if not check["passed"]:
                print(f"     Failed records: {check['failed_records']}")
        print("="*70)
        return all(c["passed"] for c in self.checks)

# Example usage:
# validator = DataQualityValidator(spark.table("silver.orders"))
# validator.check_not_null("order_id") \
#          .check_not_null("customer_id") \
#          .check_range("discount_percent", 0, 100) \
#          .check_unique("order_id") \
#          .report()

print("✅ Data quality framework defined")


---

## 3. Performance Optimization

Speed up queries with partitioning, Z-ordering, and caching.

### Partitioning

Partition large tables by commonly filtered columns:


In [None]:
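-- Example: Partition orders by year and month (run in a SQL cell)
-- Partition columns must exist in the table schema; here they are
-- generated columns derived from order_date
-- CREATE TABLE silver.orders_partitioned (
--   order_id BIGINT,
--   customer_id BIGINT,
--   order_date TIMESTAMP,
--   status STRING,
--   -- other columns...
--   order_year INT GENERATED ALWAYS AS (YEAR(order_date)),
--   order_month INT GENERATED ALWAYS AS (MONTH(order_date))
-- ) USING DELTA
-- PARTITIONED BY (order_year, order_month);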


### Z-Ordering

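Z-ordering co-locates related values in the same files, so queries that filter on those columns can skip more data:


In [None]:
-- Z-order by columns frequently used in WHERE clauses
-- Works best on high-cardinality columns; the benefit drops with each extra column
-- OPTIMIZE silver.orders ZORDER BY (customer_id, status);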


### Caching

Cache frequently accessed DataFrames:


In [None]:
# Cache tables you'll query multiple times
# cache() is lazy: the data is materialized by the first action (e.g. count())
# customers_df = spark.table("silver.customers").cache()
# products_df = spark.table("silver.products").cache()

# Release the memory when done
# customers_df.unpersist()
# products_df.unpersist()

print("✅ Caching examples")


---

## 4. Slowly Changing Dimensions (SCD Type 2)

Track historical changes to dimension tables.


In [None]:
# Example: Customer SCD Type 2 implementation
# 
# SCD Type 2 maintains full history by:
# - Adding effective_date and end_date columns
# - Adding is_current flag
# - Closing old records and inserting new ones on change

# Schema for SCD Type 2:
# - customer_id
# - first_name, last_name, email, etc.
# - effective_date (when this version became active)
# - end_date (when this version was superseded, NULL if current)
# - is_current (TRUE if this is the current version)

print("""
Example SCD Type 2 Logic:

1. New records: Insert with is_current=True, end_date=NULL
2. Changed records: 
   - Update old record: Set is_current=False, end_date=today
   - Insert new record: Set is_current=True, end_date=NULL, effective_date=today
3. Unchanged records: No action needed

This allows you to query:
- Current state: WHERE is_current = True
- Historical state: WHERE effective_date <= '2023-01-01' AND (end_date > '2023-01-01' OR end_date IS NULL)
""")

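The three rules above can be sketched in plain Python. This is an illustration under assumptions, not a Delta Lake implementation: `apply_scd2`, `as_of`, the `TRACKED` attribute list, and the sample rows are all hypothetical, and the dimension table is modeled as a list of dicts.

```python
from datetime import date

# Attributes whose changes create a new version (assumed for this sketch)
TRACKED = ("first_name", "last_name", "email")

def apply_scd2(dimension, incoming, today):
    """Return a new dimension list after applying incoming rows with SCD Type 2 rules."""
    result = [dict(r) for r in dimension]  # copy rows so the input is not mutated
    current = {r["customer_id"]: r for r in result if r["is_current"]}
    for row in incoming:
        existing = current.get(row["customer_id"])
        if existing is not None and all(existing[c] == row[c] for c in TRACKED):
            continue  # rule 3: unchanged, no action
        if existing is not None:
            # rule 2: close the old version
            existing["is_current"] = False
            existing["end_date"] = today
        # rules 1 and 2: insert a fresh current version
        new_version = {**row, "effective_date": today, "end_date": None, "is_current": True}
        result.append(new_version)
        current[row["customer_id"]] = new_version
    return result

def as_of(dimension, day):
    """Rows that were active on `day` (same predicate as the historical query)."""
    return [r for r in dimension
            if r["effective_date"] <= day and (r["end_date"] is None or r["end_date"] > day)]

v1 = {"customer_id": 1, "first_name": "Ada", "last_name": "Byron", "email": "ada@example.com"}
v2 = {"customer_id": 1, "first_name": "Ada", "last_name": "Lovelace", "email": "ada@example.com"}

dim = apply_scd2([], [v1], date(2022, 6, 1))
dim = apply_scd2(dim, [v2], date(2023, 3, 1))
print([r["last_name"] for r in as_of(dim, date(2023, 1, 1))])
```

Re-applying an unchanged row leaves the dimension untouched, which keeps the load safe to re-run. On Delta tables the same rules are usually expressed as a `MERGE` that closes matched rows plus an insert of the new versions.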

---

## 5. Error Handling & Dead Letter Queue

Gracefully handle bad records.


In [None]:
# Pattern: Separate good and bad records

# # Example: Validate and split records
# all_records = spark.table("bronze.orders")
# 
# # A record is valid when all required fields are present
# is_valid = (
#     col("order_id").isNotNull() &
#     col("customer_id").isNotNull() &
#     col("order_date").isNotNull()
# )
# 
# # Good records pass validation
# good_records = all_records.filter(is_valid)
# 
# # Bad records fail validation - send to quarantine table
# # (isNotNull() never yields NULL, so ~is_valid is the exact complement
# # and avoids the extra comparison pass that exceptAll would need)
# bad_records = all_records.filter(~is_valid) \
#     .withColumn("rejection_reason", lit("Missing required fields")) \
#     .withColumn("rejected_at", current_timestamp())
# 
# # Write bad records to dead letter queue for investigation
# bad_records.write.mode("append").saveAsTable("quarantine.orders_dlq")
# 
# # Process only good records
# good_records.write.mode("append").saveAsTable("silver.orders")

print("✅ Error handling pattern")

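A quarantine table is easier to investigate when each record says which check it failed. The sketch below models that idea in plain Python; `split_records`, `REQUIRED_FIELDS`, and the sample orders are hypothetical names for illustration, and the lists stand in for the bronze, silver, and quarantine tables.

```python
REQUIRED_FIELDS = ("order_id", "customer_id", "order_date")

def split_records(records, required=REQUIRED_FIELDS):
    """Split records into (good, bad); each bad record carries a specific rejection_reason."""
    good, bad = [], []
    for rec in records:
        missing = [f for f in required if rec.get(f) is None]
        if missing:
            bad.append({**rec, "rejection_reason": "Missing: " + ", ".join(missing)})
        else:
            good.append(rec)
    return good, bad

# Hypothetical input batch
orders = [
    {"order_id": 1, "customer_id": 10, "order_date": "2024-01-01"},
    {"order_id": 2, "customer_id": None, "order_date": None},
]

good, bad = split_records(orders)
print(bad[0]["rejection_reason"])
```

Every input record lands in exactly one of the two outputs, so row counts can be reconciled after the split (`len(good) + len(bad) == len(orders)`).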

---

## 6. Monitoring & Observability

Track pipeline metrics for production monitoring.


In [None]:
# Log pipeline metrics to a monitoring table

# from datetime import datetime
# 
# # Explicit schema: type inference fails when error_message is None on every row
# METRICS_SCHEMA = (
#     "pipeline_name STRING, execution_time TIMESTAMP, status STRING, "
#     "records_processed BIGINT, error_message STRING, spark_app_id STRING"
# )
# 
# def log_pipeline_metrics(pipeline_name, status, records_processed, error_message=None):
#     """Log pipeline execution metrics"""
#     # Tuple order must match METRICS_SCHEMA
#     metrics = [(
#         pipeline_name,
#         datetime.now(),
#         status,  # 'SUCCESS', 'FAILED', 'PARTIAL'
#         records_processed,
#         error_message,
#         spark.sparkContext.applicationId,
#     )]
#     
#     spark.createDataFrame(metrics, schema=METRICS_SCHEMA) \
#         .write.mode("append").saveAsTable("monitoring.pipeline_runs")
# 
# # Usage:
# try:
#     records_count = process_orders()
#     log_pipeline_metrics("orders_pipeline", "SUCCESS", records_count)
# except Exception as e:
#     log_pipeline_metrics("orders_pipeline", "FAILED", 0, str(e))
#     raise

print("✅ Monitoring pattern")

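The try/except logging shown in the usage example can be wrapped once and reused across pipelines. The following is a minimal sketch assuming a context manager fits the pipeline's structure: `track_run` is a hypothetical helper, and the `runs` list is a stand-in for a monitoring table.

```python
import time
from contextlib import contextmanager

@contextmanager
def track_run(pipeline_name, sink):
    """Record status, duration, and error for one pipeline run, then re-raise any failure."""
    run = {"pipeline_name": pipeline_name, "status": "SUCCESS",
           "records_processed": 0, "error_message": None}
    start = time.monotonic()
    try:
        yield run  # the pipeline body sets run["records_processed"]
    except Exception as exc:
        run["status"] = "FAILED"
        run["error_message"] = str(exc)
        raise
    finally:
        run["duration_seconds"] = time.monotonic() - start
        sink.append(run)  # hypothetical sink; a real one would write to a table

runs = []

# Successful run
with track_run("orders_pipeline", runs) as run:
    run["records_processed"] = 42

# Failed run: the failure is logged and still propagates to the caller
try:
    with track_run("orders_pipeline", runs):
        raise ValueError("bad input file")
except ValueError:
    pass

print([r["status"] for r in runs])
```

Writing the record in `finally` means a run is logged whether it succeeds or fails, and the bare `raise` keeps the failure visible to the job scheduler instead of swallowing it.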

---

## 7. Key Takeaways

### Production-Ready Checklist:

- ✅ **Idempotent**: Can safely re-run without duplicating data
- ✅ **Incremental**: Only processes new/changed data
- ✅ **Validated**: Data quality checks at each layer
- ✅ **Monitored**: Logs metrics for observability
- ✅ **Resilient**: Handles errors gracefully with DLQ
- ✅ **Performant**: Optimized with partitioning and Z-ordering
- ✅ **Documented**: Clear lineage and metadata
- ✅ **Tested**: Unit and integration tests

### Common Anti-Patterns to Avoid:

- ❌ Full table scans on every run
- ❌ No data quality checks
- ❌ Silent failures (no error handling)
- ❌ Hardcoded values instead of parameters
- ❌ No monitoring or alerting
- ❌ Overwriting data without history
- ❌ No partitioning or Z-ordering on large tables

### Resources:

- [Delta Lake Best Practices](https://docs.delta.io/latest/best-practices.html)
- [Databricks Performance Tuning](https://docs.databricks.com/optimizations/index.html)
- [Data Quality on Databricks](https://www.databricks.com/blog/2022/01/19/data-quality-at-scale-with-databricks.html)

---

**Continue learning and building robust data pipelines! 🚀**
