# Vehicle Maintenance Pipeline Integration Tests

This notebook contains end-to-end tests for the vehicle maintenance data pipeline:
1. Test Data Generation
2. Bronze Layer Validation
3. Silver Layer Transformation Tests
4. Gold Layer Analytics Validation
5. Data Quality Checks
6. Performance Metrics

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *
import json
from datetime import datetime, timedelta
import random
import pytest

# Initialize Spark session (getOrCreate reuses an already-active session)
spark = (
    SparkSession.builder
    .appName("VehicleMaintenance-Tests")
    .getOrCreate()
)

# Set database so unqualified statements such as SHOW TABLES run against it
spark.sql("USE vehicle_maintenance")

def test_setup():
    """Verify environment setup and connectivity.

    Two checks are performed:

    1. ``SHOW TABLES`` on the current database returns at least one table.
    2. ``/mnt/vehicle-data`` (or a path nested under it) appears among the
       mount points reported by ``dbutils.fs.mounts()``.

    Returns:
        bool: True when both checks pass. On any failure the reason is
        printed and False is returned instead of raising, so the caller can
        gate later tests on the result.
    """
    try:
        # Check that the current database exposes at least one table
        tables = spark.sql("SHOW TABLES").collect()
        assert len(tables) > 0, "No tables found in database"

        # Check GCS mount. Compare on path boundaries rather than by substring
        # so that a sibling mount such as "/mnt/vehicle-data-old" does not
        # satisfy the check by accident.
        mount_point = "/mnt/vehicle-data"
        mounted_paths = [
            mount.mountPoint.rstrip("/") for mount in dbutils.fs.mounts()
        ]
        assert any(
            path == mount_point or path.startswith(mount_point + "/")
            for path in mounted_paths
        ), "GCS mount point not found"

        print("✓ Environment setup verified")
        return True
    except Exception as e:
        # Deliberately broad: any failure (including a missing `dbutils`)
        # is reported as "setup not OK" rather than aborting the notebook.
        print(f"✗ Setup verification failed: {str(e)}")
        return False

# Run setup test; the bronze ingestion step further down only runs when
# this flag is True.
setup_ok = test_setup()

In [None]:
# Generate test data
def generate_test_data(num_records=100):
    """Generate synthetic maintenance records for testing.

    Every record gets a random vehicle id, service type, technician, mileage
    and cost, plus a maintenance date drawn from the 365 days before "now".
    ``parts_used`` is looked up with the same index as the service type.

    Args:
        num_records: Number of rows to generate. ``0`` yields an empty
            DataFrame with the full schema.

    Returns:
        A Spark DataFrame with one row per generated record.
    """
    vehicles = [f"VIN{i:03d}" for i in range(10)]
    service_types = [
        "Oil Change", "Brake Repair", "Tire Rotation",
        "Engine Tune-up", "Filter Replacement", "Inspection"
    ]
    technicians = ["John", "Alice", "Bob", "Carol", "David"]
    # Indexed in parallel with service_types
    parts = [
        ["Oil Filter", "Engine Oil"],
        ["Brake Pads", "Brake Fluid"],
        ["Air Filter"],
        ["Spark Plugs", "Ignition Coil"],
        ["Fuel Filter", "Air Filter"],
        []
    ]

    # Explicit schema: inference from dicts fails on an empty record list and
    # cannot type `parts_used` when every sampled value is an empty list.
    # Fields are listed alphabetically, which is the column order Spark
    # produces when it infers a schema from dicts, so the layout is unchanged.
    schema = StructType([
        StructField("cost", DoubleType()),
        StructField("maintenance_date", StringType()),
        StructField("mileage", LongType()),
        StructField("notes", StringType()),
        StructField("parts_used", ArrayType(StringType())),
        StructField("service_type", StringType()),
        StructField("technician", StringType()),
        StructField("vehicle_id", StringType()),
    ])

    records = []
    base_date = datetime.now() - timedelta(days=365)

    for _ in range(num_records):
        vehicle = random.choice(vehicles)
        service_idx = random.randrange(len(service_types))
        date = base_date + timedelta(days=random.randint(0, 365))
        service = service_types[service_idx]

        records.append({
            "vehicle_id": vehicle,
            "maintenance_date": date.isoformat(),
            "service_type": service,
            "mileage": random.randint(5000, 100000),
            "cost": round(random.uniform(50, 1000), 2),
            "technician": random.choice(technicians),
            "notes": f"Regular {service}",
            "parts_used": parts[service_idx],
        })

    # Build the DataFrame straight from the local list; no RDD round-trip
    # through sparkContext is needed.
    return spark.createDataFrame(records, schema=schema)

def test_bronze_ingestion(test_df):
    """Test Bronze layer ingestion and validation.

    Appends ``test_df`` to ``vehicle_maintenance.test_bronze_maintenance``,
    then checks that the table holds at least as many rows as ``test_df`` and
    exposes the required columns.

    Args:
        test_df: Spark DataFrame of maintenance records to ingest.

    Returns:
        bool: True when all checks pass; False on any failure (the reason is
        printed instead of raised).
    """
    try:
        # Write test data to bronze
        test_df.write.format("delta") \
            .mode("append") \
            .saveAsTable("vehicle_maintenance.test_bronze_maintenance")

        # Verify data. The write appends, so the table may already contain
        # rows from earlier runs; hence ">=" rather than "==".
        bronze_df = spark.table("vehicle_maintenance.test_bronze_maintenance")
        # count() runs on the cluster; collect() would pull every row to the
        # driver just to measure its length.
        expected_rows = test_df.count()
        assert bronze_df.count() >= expected_rows, "Record count mismatch"

        # Check schema
        required_cols = ["vehicle_id", "maintenance_date", "service_type", "mileage"]
        available = set(bronze_df.columns)
        missing = [name for name in required_cols if name not in available]
        assert not missing, f"Missing required columns: {missing}"

        print("✓ Bronze layer tests passed")
        return True
    except Exception as e:
        print(f"✗ Bronze layer test failed: {str(e)}")
        return False

# Generate and test bronze
# Reset the flag first so that re-running this cell after a failed setup
# cannot leave a stale True from a previous run.
bronze_ok = False
if setup_ok:
    test_data = generate_test_data()
    bronze_ok = test_bronze_ingestion(test_data)

In [None]:
# Test Silver layer transformations
def test_silver_transformations():
    """Exercise the Silver layer cleaning and enrichment on bronze test data.

    Reads the bronze test table, derives the silver columns, appends the
    result to the silver test table and then validates the derived columns.

    Returns:
        tuple: ``(True, silver_df)`` when every check passes, otherwise
        ``(False, None)`` after printing the failure reason.
    """
    try:
        # Load the bronze test data
        source_df = spark.table("vehicle_maintenance.test_bronze_maintenance")

        # Column expressions, named for readability
        service = col("service_type")
        category_expr = (
            when(service.isin(["Oil Change", "Filter Replacement"]), "Routine")
            .when(service.isin(["Brake Repair", "Engine Tune-up"]), "Repair")
            .when(service == "Inspection", "Inspection")
            .otherwise("Other")
        )
        six_months_later = add_months(col("maintenance_date"), 6)
        status_expr = (
            when(current_date() > six_months_later, "Due")
            .otherwise("Current")
        )

        # Apply the transformations one column at a time; the date conversion
        # comes first so later expressions see the converted column.
        silver_df = source_df.withColumn(
            "maintenance_date", to_date("maintenance_date")
        )
        silver_df = silver_df.withColumn("service_category", category_expr)
        silver_df = silver_df.withColumn("labor_cost", col("cost") * 0.6)
        silver_df = silver_df.withColumn("parts_cost", col("cost") * 0.4)
        silver_df = silver_df.withColumn("maintenance_status", status_expr)
        silver_df = silver_df.withColumn("next_maintenance_date", six_months_later)

        # Persist to the silver test table
        writer = silver_df.write.format("delta").mode("append")
        writer.saveAsTable("vehicle_maintenance.test_silver_maintenance")

        # Validate the derived columns
        produced = silver_df.columns
        assert "service_category" in produced, "Missing service categorization"
        assert "labor_cost" in produced, "Missing cost breakdown"
        uncategorized = silver_df.filter(col("service_category").isNull()).count()
        assert uncategorized == 0, "Found uncategorized services"

        print("✓ Silver layer tests passed")
        return True, silver_df
    except Exception as e:
        print(f"✗ Silver layer test failed: {str(e)}")
        return False, None

# Run silver tests if bronze passed
# Give the results defaults first so they are always defined and never
# carry stale values over from an earlier run of this cell.
silver_ok, silver_df = False, None
if 'bronze_ok' in locals() and bronze_ok:
    silver_ok, silver_df = test_silver_transformations()