# Databricks Operational Workspace Setup Guide
## Data Engineering Best Practices

This notebook collects operational patterns for a Databricks workspace, with a working example for each of the following areas:

* **Notebooks Structure** - Organized folder hierarchy and naming conventions
* **Cluster Policies** - Standardized compute configurations and cost controls
* **Secrets Management** - Secure credential access patterns
* **Environment Separation** - Dev/Staging/Production isolation strategies

## 1. Notebooks Structure & Organization

### Recommended Folder Hierarchy
```
/Workspace/
├── Shared/
│   ├── libraries/          # Reusable functions and utilities
│   ├── configs/            # Configuration notebooks
│   └── templates/          # Starter templates
├── Projects/
│   ├── project_name/
│   │   ├── bronze/         # Raw data ingestion
│   │   ├── silver/         # Cleaned/transformed data
│   │   ├── gold/           # Business-level aggregates
│   │   ├── orchestration/  # Workflow definitions
│   │   └── tests/          # Unit and integration tests
└── Users/                  # Individual development workspaces
```

### Naming Conventions
* **Notebooks:** `01_ingest_source_data.py`, `02_transform_customers.sql`
* **Tables:** `{env}_{layer}_{domain}_{entity}` (e.g., `prod_silver_sales_orders`)
* **Jobs:** `{env}_{project}_{pipeline}` (e.g., `prod_sales_daily_etl`)
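
The table and job conventions above can be enforced with small helper functions. The sketch below is a minimal illustration: the helper names (`build_table_name`, `build_job_name`) and the allowed-value sets are assumptions made for this example, not part of any Databricks API.

```python
import re

# Assumed allowed values, taken from the conventions listed above
VALID_ENVS = {"dev", "staging", "prod"}
VALID_LAYERS = {"bronze", "silver", "gold"}
_NAME_PART = re.compile(r"^[a-z][a-z0-9_]*$")


def _check_part(label: str, value: str) -> str:
    """Require lowercase snake_case so generated names stay consistent."""
    if not _NAME_PART.match(value):
        raise ValueError(f"{label} must be lowercase snake_case, got {value!r}")
    return value


def build_table_name(env: str, layer: str, domain: str, entity: str) -> str:
    """Build a table name following {env}_{layer}_{domain}_{entity}."""
    if env not in VALID_ENVS:
        raise ValueError(f"Unknown environment: {env!r}")
    if layer not in VALID_LAYERS:
        raise ValueError(f"Unknown layer: {layer!r}")
    return "_".join(
        [env, layer, _check_part("domain", domain), _check_part("entity", entity)]
    )


def build_job_name(env: str, project: str, pipeline: str) -> str:
    """Build a job name following {env}_{project}_{pipeline}."""
    if env not in VALID_ENVS:
        raise ValueError(f"Unknown environment: {env!r}")
    return "_".join(
        [env, _check_part("project", project), _check_part("pipeline", pipeline)]
    )


print(build_table_name("prod", "silver", "sales", "orders"))  # prod_silver_sales_orders
print(build_job_name("prod", "sales", "daily_etl"))           # prod_sales_daily_etl
```

Generating names through one helper keeps typos and ad-hoc variants out of job definitions.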

In [0]:
# Standard notebook header template for data engineering

# ============================================
# NOTEBOOK: Data Ingestion Template
# PURPOSE: Ingest raw data from source systems
# AUTHOR: Data Engineering Team
# ============================================

# Import standard libraries
from pyspark.sql import functions as F
from datetime import datetime, timedelta
import json

# Notebook parameters (for job orchestration)
dbutils.widgets.text("environment", "dev", "Environment (dev/staging/prod)")
dbutils.widgets.text("run_date", "", "Run Date (YYYY-MM-DD)")

env = dbutils.widgets.get("environment")
run_date = dbutils.widgets.get("run_date") or datetime.now().strftime("%Y-%m-%d")

print(f"Environment: {env}")
print(f"Run Date: {run_date}")
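
The header above accepts any string as `run_date`. A job can fail earlier, and with a clearer message, if the value is parsed before use. The following is a minimal sketch; `resolve_run_date` is a hypothetical helper, and the `today` parameter exists only to make the fallback testable.

```python
from datetime import date, datetime
from typing import Optional


def resolve_run_date(raw: str, today: Optional[date] = None) -> date:
    """Parse a YYYY-MM-DD widget value; fall back to today's date when empty."""
    cleaned = raw.strip()
    if not cleaned:
        return today or date.today()
    try:
        return datetime.strptime(cleaned, "%Y-%m-%d").date()
    except ValueError as exc:
        raise ValueError(f"run_date must be YYYY-MM-DD, got {raw!r}") from exc


print(resolve_run_date("2024-01-15"))
print(resolve_run_date("", today=date(2024, 2, 1)))
```

In a notebook, the widget value would be passed in as `resolve_run_date(dbutils.widgets.get("run_date"))`.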

In [0]:
# Centralized configuration pattern using Python dictionaries

class WorkspaceConfig:
    """Centralized configuration for environment-specific settings"""
    
    ENVIRONMENTS = {
        "dev": {
            "catalog": "dev_catalog",
            "schema": "engineering",
            "storage_path": "/mnt/dev/data",
            "checkpoint_path": "/mnt/dev/checkpoints",
            "cluster_policy": "dev-policy-id"
        },
        "staging": {
            "catalog": "staging_catalog",
            "schema": "engineering",
            "storage_path": "/mnt/staging/data",
            "checkpoint_path": "/mnt/staging/checkpoints",
            "cluster_policy": "staging-policy-id"
        },
        "prod": {
            "catalog": "prod_catalog",
            "schema": "engineering",
            "storage_path": "/mnt/prod/data",
            "checkpoint_path": "/mnt/prod/checkpoints",
            "cluster_policy": "prod-policy-id"
        }
    }
    
    @staticmethod
    def get_config(environment: str) -> dict:
        """Get configuration for specified environment"""
        if environment not in WorkspaceConfig.ENVIRONMENTS:
            raise ValueError(f"Invalid environment: {environment}. Must be one of {list(WorkspaceConfig.ENVIRONMENTS.keys())}")
        return WorkspaceConfig.ENVIRONMENTS[environment]

# Usage example
config = WorkspaceConfig.get_config(env)
print(f"Catalog: {config['catalog']}")
print(f"Storage Path: {config['storage_path']}")
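
The three environment blocks above differ only in the environment name, so the same values could be generated from a single template. The sketch below assumes that this naming pattern holds for every setting. In practice the policy IDs are opaque identifiers, so the `cluster_policy` value stays a placeholder here, as it is in the dictionary above. `build_env_config` is a hypothetical helper name.

```python
SUPPORTED_ENVIRONMENTS = ("dev", "staging", "prod")


def build_env_config(environment: str) -> dict:
    """Derive environment-specific settings from one naming template."""
    if environment not in SUPPORTED_ENVIRONMENTS:
        raise ValueError(
            f"Invalid environment: {environment}. "
            f"Must be one of {list(SUPPORTED_ENVIRONMENTS)}"
        )
    return {
        "catalog": f"{environment}_catalog",
        "schema": "engineering",
        "storage_path": f"/mnt/{environment}/data",
        "checkpoint_path": f"/mnt/{environment}/checkpoints",
        # Placeholder, mirroring the dictionary above; real policy IDs are not derivable
        "cluster_policy": f"{environment}-policy-id",
    }


for name in SUPPORTED_ENVIRONMENTS:
    print(name, build_env_config(name)["storage_path"])
```

The explicit dictionary is easier to read and to override per environment. The template avoids copy-paste drift when a new setting is added.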

## 2. Cluster Policies & Compute Management

### Policy Strategy
Cluster policies enforce standardized configurations and cost controls across environments.

### Recommended Policies

**Development Policy:**
* Single-node or small clusters (1-3 workers)
* Auto-termination: 30 minutes
* Spot instances enabled
* DBR: Latest LTS version

**Production Policy:**
* Fixed-size or autoscaling clusters
* Auto-termination: 120 minutes
* On-demand instances
* DBR: Stable LTS version
* Enhanced monitoring enabled

**Interactive Policy:**
* For ad-hoc analysis
* Auto-termination: 60 minutes
* Photon enabled for SQL workloads

In [0]:
# Development Cluster Policy (JSON format)
# Apply via Databricks Admin Console > Compute > Policies

dev_cluster_policy = {
    "name": "Data Engineering - Development",
    "definition": {
        "spark_version": {
            "type": "fixed",
            "value": "15.4.x-scala2.12"
        },
        "node_type_id": {
            "type": "allowlist",
            "values": ["n2-highmem-4", "n2-standard-4"],
            "defaultValue": "n2-standard-4"
        },
        "num_workers": {
            "type": "range",
            "minValue": 1,
            "maxValue": 3,
            "defaultValue": 1
        },
        "autotermination_minutes": {
            "type": "fixed",
            "value": 30
        },
        "gcp_attributes.use_preemptible_executors": {
            "type": "fixed",
            "value": True
        },
        "data_security_mode": {
            "type": "fixed",
            "value": "USER_ISOLATION"
        }
        # No fixed "spark_conf.spark.databricks.cluster.profile": "singleNode" entry here:
        # that profile is meant for clusters with num_workers = 0, so it would clash with
        # the 1-3 worker range above. Single-node compute is better served by its own policy.
    }
}

print(json.dumps(dev_cluster_policy, indent=2))
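
To make the `fixed`, `allowlist`, and `range` policy types concrete, the sketch below checks a requested cluster configuration against a trimmed-down definition of the same shape. It only illustrates what the three types mean and is not how Databricks enforces policies. `check_against_policy` and both sample dictionaries are assumptions made for this example.

```python
def check_against_policy(definition: dict, cluster_spec: dict) -> list:
    """Return readable violations of a policy definition by a flat cluster spec."""
    violations = []
    for attribute, rule in definition.items():
        if attribute not in cluster_spec:
            continue  # this sketch ignores defaults and required attributes
        value = cluster_spec[attribute]
        rule_type = rule["type"]
        if rule_type == "fixed" and value != rule["value"]:
            violations.append(f"{attribute}: expected {rule['value']!r}, got {value!r}")
        elif rule_type == "allowlist" and value not in rule["values"]:
            violations.append(f"{attribute}: {value!r} not in {rule['values']}")
        elif rule_type == "range":
            low = rule.get("minValue", float("-inf"))
            high = rule.get("maxValue", float("inf"))
            if not low <= value <= high:
                violations.append(f"{attribute}: {value!r} outside allowed range")
    return violations


# Hypothetical, shortened policy definition and cluster request
sample_definition = {
    "node_type_id": {"type": "allowlist", "values": ["n2-highmem-4", "n2-standard-4"]},
    "num_workers": {"type": "range", "maxValue": 3},
    "autotermination_minutes": {"type": "fixed", "value": 30},
}
requested = {
    "node_type_id": "n2-highmem-8",
    "num_workers": 5,
    "autotermination_minutes": 30,
}

for problem in check_against_policy(sample_definition, requested):
    print(problem)
```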

In [0]:
# Production Cluster Policy (JSON format)

prod_cluster_policy = {
    "name": "Data Engineering - Production",
    "definition": {
        "spark_version": {
            "type": "fixed",
            "value": "15.4.x-scala2.12"  # Use stable LTS
        },
        "node_type_id": {
            "type": "allowlist",
            "values": ["n2-highmem-4", "n2-highmem-8"],
            "defaultValue": "n2-highmem-4"
        },
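        # Policy attribute paths are flattened, so each autoscale bound gets its own rule
        "autoscale.min_workers": {
            "type": "fixed",
            "value": 2
        },
        "autoscale.max_workers": {
            "type": "fixed",
            "value": 10
        },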
        "autotermination_minutes": {
            "type": "fixed",
            "value": 120
        },
        "gcp_attributes.use_preemptible_executors": {
            "type": "fixed",
            "value": False  # On-demand for production
        },
        "data_security_mode": {
            "type": "fixed",
            "value": "USER_ISOLATION"
        },
        "spark_conf.spark.databricks.delta.optimizeWrite.enabled": {
            "type": "fixed",
            "value": "true"
        },
        "spark_conf.spark.databricks.delta.autoCompact.enabled": {
            "type": "fixed",
            "value": "true"
        }
    }
}

print(json.dumps(prod_cluster_policy, indent=2))
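
The Cluster Policies REST API expects `definition` as a JSON-encoded string rather than a nested object, so dictionaries shaped like the ones above need one serialization step before they are submitted. The sketch below shows only that step. `to_policy_payload` is a hypothetical helper, the sample policy is shortened, and sending the request is deliberately left out.

```python
import json


def to_policy_payload(policy: dict) -> dict:
    """Convert a {'name', 'definition'} dict into a create-policy request body."""
    return {
        "name": policy["name"],
        # The definition travels as a JSON string, not as a nested object
        "definition": json.dumps(policy["definition"]),
    }


# Shortened sample in the same shape as the policies above
sample_policy = {
    "name": "Data Engineering - Example",
    "definition": {
        "autotermination_minutes": {"type": "fixed", "value": 30},
        "gcp_attributes.use_preemptible_executors": {"type": "fixed", "value": True},
    },
}

payload = to_policy_payload(sample_policy)
print(type(payload["definition"]).__name__)  # str
print(payload["definition"])
```

Python's `True` is serialized as JSON `true` by `json.dumps`, so boolean policy values survive the conversion.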

## 3. Secrets Management & Secure Access

### Databricks Secrets Architecture
Use **Databricks Secrets** to securely store credentials, API keys, and connection strings.

### Setup Steps

**1. Create Secret Scopes** (via Databricks CLI or UI):
```bash
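# Using the legacy Databricks CLI syntax (--scope/--key flags), as do the commands below.
# Newer CLI versions take positional arguments instead,
# e.g. `databricks secrets create-scope dev-secrets`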
databricks secrets create-scope --scope dev-secrets
databricks secrets create-scope --scope prod-secrets
```

**2. Add Secrets to Scopes:**
```bash
# Example: Store database password
databricks secrets put --scope dev-secrets --key db-password

# Example: Store API key
databricks secrets put --scope prod-secrets --key api-key
```

**3. Grant Access via ACLs:**
```bash
# Grant read access to a group
databricks secrets put-acl --scope prod-secrets --principal data-eng-team --permission READ
```

### Best Practices
* Separate scopes per environment (dev/staging/prod)
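* For production, consider keeping credentials in GCP Secret Manager and reading them through its client library (secret scopes on GCP are Databricks-backed)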
* Never hardcode credentials in notebooks
* Rotate secrets regularly
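
The "never hardcode credentials" rule can be backed by a simple check in CI. The sketch below scans notebook source for quoted literals assigned to credential-like names. The regular expression and the helper name `find_hardcoded_credentials` are assumptions made for this example. A pattern like this catches only the most obvious cases and is no substitute for a dedicated secret scanner.

```python
import re
from typing import List, Tuple

# Assumed heuristic: a credential-like variable name assigned a quoted literal
_SUSPICIOUS = re.compile(
    r"""\b\w*(?:password|secret|api_key|token)\w*\s*=\s*["'][^"']+["']""",
    re.IGNORECASE,
)


def find_hardcoded_credentials(source: str) -> List[Tuple[int, str]]:
    """Return (line_number, line) pairs that look like hardcoded credentials."""
    hits = []
    for number, line in enumerate(source.splitlines(), start=1):
        if _SUSPICIOUS.search(line):
            hits.append((number, line.strip()))
    return hits


sample_source = (
    'jdbc_password = "hunter2"\n'
    'jdbc_password = dbutils.secrets.get(scope="dev-secrets", key="db-password")\n'
)

for number, line in find_hardcoded_credentials(sample_source):
    print(f"line {number}: {line}")
```

Only the first sample line is flagged. The second reads the value from a secret scope, which is the pattern this section recommends.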

In [0]:
# Secure secrets access pattern in notebooks

class SecretsManager:
    """Wrapper for secure credential access"""
    
    def __init__(self, environment: str):
        self.scope = f"{environment}-secrets"
    
    def get_secret(self, key: str) -> str:
        """Retrieve secret from Databricks secret scope"""
        try:
            return dbutils.secrets.get(scope=self.scope, key=key)
        except Exception as e:
            # Chain the original exception so the root cause stays in the traceback
            raise ValueError(
                f"Failed to retrieve secret '{key}' from scope '{self.scope}': {e}"
            ) from e
    
    def get_jdbc_connection(self, db_type: str) -> dict:
        """Get database connection parameters"""
        return {
            "url": self.get_secret(f"{db_type}-url"),
            "user": self.get_secret(f"{db_type}-user"),
            "password": self.get_secret(f"{db_type}-password")
        }

# Usage example
secrets = SecretsManager(env)

# Access individual secrets
# api_key = secrets.get_secret("external-api-key")

# Access database credentials
# db_config = secrets.get_jdbc_connection("postgres")

print(f"Secrets scope: {secrets.scope}")
print("✓ Secrets manager initialized")

In [0]:
# Example: Connecting to external database using secrets

def read_from_external_db(table_name: str, environment: str):
    """Read data from external database using secure credentials"""
    
    secrets = SecretsManager(environment)
    
    # Retrieve connection details from secrets
    # (keys: postgres-url, postgres-user, postgres-password)
    conn = secrets.get_jdbc_connection("postgres")
    
    # Read data using JDBC
    df = (spark.read
        .format("jdbc")
        .option("url", conn["url"])
        .option("dbtable", table_name)
        .option("user", conn["user"])
        .option("password", conn["password"])
        .option("driver", "org.postgresql.Driver")
        .load()
    )
    
    return df

# Example usage (commented to avoid execution without secrets)
# df = read_from_external_db("public.customers", env)
# display(df.limit(5))

print("✓ Secure connection function defined")

## 4. Environment Separation Strategy

### Multi-Environment Architecture

**Isolation Levels:**

| Component | Dev | Staging | Production |
|-----------|-----|---------|------------|
| **Catalog** | `dev_catalog` | `staging_catalog` | `prod_catalog` |
| **Storage** | `/mnt/dev/` | `/mnt/staging/` | `/mnt/prod/` |
| **Secrets** | `dev-secrets` | `staging-secrets` | `prod-secrets` |
| **Clusters** | Spot/Preemptible | Mixed | On-Demand |
| **Access** | All engineers | Limited team | Restricted |

### Promotion Strategy

1. **Development:** Individual experimentation and feature development
2. **Staging:** Integration testing with production-like data volumes
3. **Production:** Automated deployments via CI/CD

### Unity Catalog Setup
```sql
-- Create environment-specific catalogs
CREATE CATALOG IF NOT EXISTS dev_catalog;
CREATE CATALOG IF NOT EXISTS staging_catalog;
CREATE CATALOG IF NOT EXISTS prod_catalog;

-- Create schemas within catalogs
CREATE SCHEMA IF NOT EXISTS dev_catalog.bronze;
CREATE SCHEMA IF NOT EXISTS dev_catalog.silver;
CREATE SCHEMA IF NOT EXISTS dev_catalog.gold;
```
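
The SQL above creates schemas only for `dev_catalog`. The same statements for every environment and layer can be generated rather than written by hand. The sketch below is a minimal illustration; `build_catalog_ddl` is a hypothetical helper that only builds the statement strings.

```python
ENVIRONMENTS = ["dev", "staging", "prod"]
LAYERS = ["bronze", "silver", "gold"]


def build_catalog_ddl(environments: list, layers: list) -> list:
    """Return CREATE CATALOG / CREATE SCHEMA statements for each environment."""
    statements = []
    for env in environments:
        catalog = f"{env}_catalog"
        statements.append(f"CREATE CATALOG IF NOT EXISTS {catalog}")
        for layer in layers:
            statements.append(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{layer}")
    return statements


for statement in build_catalog_ddl(ENVIRONMENTS, LAYERS):
    print(statement)
```

In a notebook, each statement could then be passed to `spark.sql(statement)` by a user with permission to create catalogs.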

In [0]:
# Environment-aware data access pattern

class DataAccessLayer:
    """Abstraction layer for environment-aware data access"""
    
    def __init__(self, environment: str):
        self.config = WorkspaceConfig.get_config(environment)
        self.catalog = self.config['catalog']
        # Kept for reference only: table names below use the medallion layer as the schema
        self.schema = self.config['schema']
    
    def get_table_path(self, layer: str, table_name: str) -> str:
        """Generate the three-level table name: catalog.layer.table"""
        return f"{self.catalog}.{layer}.{table_name}"
    
    def read_table(self, layer: str, table_name: str):
        """Read table from specified layer"""
        table_path = self.get_table_path(layer, table_name)
        print(f"Reading from: {table_path}")
        return spark.table(table_path)
    
    def write_table(self, df, layer: str, table_name: str, mode: str = "overwrite"):
        """Write DataFrame to specified layer"""
        table_path = self.get_table_path(layer, table_name)
        print(f"Writing to: {table_path} (mode: {mode})")
        
        (df.write
            .format("delta")
            .mode(mode)
            .option("mergeSchema", "true")
            .saveAsTable(table_path)
        )
        
        return table_path

# Usage example
data_access = DataAccessLayer(env)

# Example table paths
print("Bronze layer:", data_access.get_table_path("bronze", "raw_events"))
print("Silver layer:", data_access.get_table_path("silver", "cleaned_events"))
print("Gold layer:", data_access.get_table_path("gold", "daily_metrics"))

In [0]:
# CI/CD deployment pattern for notebooks
# (validation logic only; the deploy step itself is left to the Databricks CLI/API)

class DeploymentManager:
    """Manage notebook deployments across environments"""
    
    def __init__(self, source_env: str, target_env: str):
        self.source_env = source_env
        self.target_env = target_env
    
    def validate_deployment(self) -> bool:
        """Pre-deployment validation checks"""
        checks = {
            "source_env_valid": self.source_env in ["dev", "staging"],
            "target_env_valid": self.target_env in ["staging", "prod"],
            "promotion_path_valid": self._validate_promotion_path()
        }
        
        print("Deployment Validation:")
        for check, result in checks.items():
            status = "✓" if result else "✗"
            print(f"  {status} {check}: {result}")
        
        return all(checks.values())
    
    def _validate_promotion_path(self) -> bool:
        """Ensure proper promotion path (dev->staging->prod)"""
        valid_paths = [
            ("dev", "staging"),
            ("staging", "prod")
        ]
        return (self.source_env, self.target_env) in valid_paths
    
    def deploy(self, notebook_path: str):
        """Deploy notebook to target environment"""
        if not self.validate_deployment():
            raise ValueError("Deployment validation failed")
        
        print(f"\nDeploying {notebook_path}")
        print(f"  From: {self.source_env}")
        print(f"  To: {self.target_env}")
        print("\n[Deployment would execute via Databricks CLI/API]")

# Example usage
deployment = DeploymentManager(source_env="dev", target_env="staging")
deployment.validate_deployment()

In [0]:
# Unit testing pattern for data pipelines

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from datetime import datetime

class DataQualityTests:
    """Data quality testing framework"""
    
    def __init__(self, df):
        self.df = df
        self.results = []
    
    def test_not_empty(self, test_name: str = "Not Empty"):
        """Test that DataFrame is not empty"""
        count = self.df.count()
        passed = count > 0
        self.results.append({"test": test_name, "passed": passed, "details": f"Row count: {count}"})
        return self
    
    def test_no_nulls(self, columns: list, test_name: str = "No Nulls"):
        """Test that specified columns have no null values"""
        for col in columns:
            null_count = self.df.filter(F.col(col).isNull()).count()
            passed = null_count == 0
            self.results.append({
                "test": f"{test_name} - {col}",
                "passed": passed,
                "details": f"Null count: {null_count}"
            })
        return self
    
    def test_unique(self, column: str, test_name: str = "Unique Values"):
        """Test that column has unique values"""
        total_count = self.df.count()
        distinct_count = self.df.select(column).distinct().count()
        passed = total_count == distinct_count
        self.results.append({
            "test": f"{test_name} - {column}",
            "passed": passed,
            "details": f"Total: {total_count}, Distinct: {distinct_count}"
        })
        return self
    
    def report(self):
        """Print test results"""
        print("\n" + "="*60)
        print("DATA QUALITY TEST RESULTS")
        print("="*60)
        
        passed_count = sum(1 for r in self.results if r["passed"])
        total_count = len(self.results)
        
        for result in self.results:
            status = "✓ PASS" if result["passed"] else "✗ FAIL"
            print(f"{status} | {result['test']}: {result['details']}")
        
        print("="*60)
        print(f"Results: {passed_count}/{total_count} tests passed")
        print("="*60 + "\n")
        
        return passed_count == total_count

# Example usage with sample data
sample_schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("created_at", TimestampType(), False)
])

sample_data = [
    (1, "Record A", datetime.now()),
    (2, "Record B", datetime.now()),
    (3, "Record C", datetime.now())
]

sample_df = spark.createDataFrame(sample_data, sample_schema)

# Run tests
tests = DataQualityTests(sample_df)
tests.test_not_empty().test_no_nulls(["id", "name"]).test_unique("id").report()
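
`report()` only prints and returns a boolean, so a scheduled job would still finish successfully when checks fail. One option is to raise an exception from the collected results. The sketch below works on the same result dictionaries (`test`, `passed`, `details`). `DataQualityError` and `raise_on_failures` are hypothetical names, and the sample results are made up for illustration.

```python
class DataQualityError(Exception):
    """Raised when at least one data quality test has failed."""


def raise_on_failures(results: list) -> None:
    """Raise DataQualityError summarizing every failed test in results."""
    failed = [r for r in results if not r["passed"]]
    if failed:
        summary = "; ".join(f"{r['test']} ({r['details']})" for r in failed)
        raise DataQualityError(f"{len(failed)} data quality test(s) failed: {summary}")


# Illustrative results in the shape produced by DataQualityTests
sample_results = [
    {"test": "Not Empty", "passed": True, "details": "Row count: 3"},
    {"test": "No Nulls - name", "passed": False, "details": "Null count: 2"},
]

try:
    raise_on_failures(sample_results)
except DataQualityError as error:
    print(error)
```

With the class above, this would be called as `raise_on_failures(tests.results)` after the checks have run, so the failure propagates to the job run status.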

In [0]:
# Pipeline monitoring and logging pattern

from datetime import datetime

class PipelineMonitor:
    """Monitor pipeline execution and log metrics"""
    
    def __init__(self, pipeline_name: str, environment: str):
        self.pipeline_name = pipeline_name
        self.environment = environment
        self.start_time = datetime.now()
        self.metrics = {}
    
    def log_metric(self, metric_name: str, value):
        """Log a pipeline metric"""
        self.metrics[metric_name] = value
        print(f"[METRIC] {metric_name}: {value}")
    
    def log_stage(self, stage_name: str, status: str = "started"):
        """Log pipeline stage"""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"[{timestamp}] [{self.environment.upper()}] {stage_name}: {status}")
    
    def finalize(self, status: str = "success"):
        """Finalize monitoring and log summary"""
        end_time = datetime.now()
        duration = (end_time - self.start_time).total_seconds()
        
        print("\n" + "="*60)
        print(f"PIPELINE EXECUTION SUMMARY: {self.pipeline_name}")
        print("="*60)
        print(f"Environment: {self.environment}")
        print(f"Status: {status.upper()}")
        print(f"Duration: {duration:.2f} seconds")
        print(f"Start Time: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"End Time: {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
        
        if self.metrics:
            print("\nMetrics:")
            for metric, value in self.metrics.items():
                print(f"  - {metric}: {value}")
        
        print("="*60 + "\n")
        
        # In production, send metrics to monitoring system
        # self._send_to_monitoring_system()
    
    def _send_to_monitoring_system(self):
        """Send metrics to external monitoring (e.g., Datadog, CloudWatch)"""
        # Implementation would integrate with your monitoring platform
        pass

# Example usage
monitor = PipelineMonitor("customer_etl", env)

monitor.log_stage("Data Ingestion", "started")
monitor.log_metric("records_ingested", 10000)
monitor.log_stage("Data Ingestion", "completed")

monitor.log_stage("Data Transformation", "started")
monitor.log_metric("records_transformed", 9850)
monitor.log_stage("Data Transformation", "completed")

monitor.log_stage("Data Quality Checks", "started")
monitor.log_metric("quality_score", 98.5)
monitor.log_stage("Data Quality Checks", "completed")

monitor.finalize(status="success")
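
The monitor above writes free-form text with `print`. If the output is later shipped to a log aggregator, one JSON object per line is usually easier to parse. The sketch below emits a metric through the standard `logging` module. The field names, the logger name, and `format_metric_event` are assumptions made for this example, not a format required by any monitoring system.

```python
import json
import logging
from datetime import datetime, timezone
from typing import Optional

logger = logging.getLogger("pipeline_monitor")


def format_metric_event(
    pipeline_name: str,
    environment: str,
    metric_name: str,
    value,
    timestamp: Optional[datetime] = None,
) -> str:
    """Serialize one pipeline metric as a single JSON line."""
    event = {
        "timestamp": (timestamp or datetime.now(timezone.utc)).isoformat(),
        "pipeline": pipeline_name,
        "environment": environment,
        "metric": metric_name,
        "value": value,
    }
    return json.dumps(event, sort_keys=True)


logging.basicConfig(level=logging.INFO, format="%(message)s")
logger.info(format_metric_event("customer_etl", "dev", "records_ingested", 10000))
```

A method such as `log_metric` could call this helper instead of `print`, keeping the in-memory `metrics` dictionary unchanged.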