In [0]:
%pip install requests

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import requests

def extract_products():
    """
    Extract 50 users and 100 products from FakerAPI.
    """
    product_url = "https://fakerapi.it/api/v2/products?_quantity=100"
    
    print("Fetching products from API...")
    product_response = requests.get(product_url)
    product_response.raise_for_status()
    
    data = product_response.json()["data"]
    products_data = []

    for product in data:
      products_data.append({
          "id": product["id"],
          "name": product["name"],
          "description": product["description"],
          "upc": product["upc"],
          "image": product["image"],
          "net_price": product["net_price"],
          "taxes": product["taxes"],
          "price": product["price"],
          "categories": product["categories"]
      })
    
    return products_data

In [0]:
def test_extraction():
    """
    Unit test to validate schema and data types from user and product APIs.
    """
    print("=== UNIT TEST: extract_users_and_products ===")
    
    products = extract_products()
    
    for product in products:
        assert "id" in product and isinstance(product["id"], int)
        assert "name" in product and isinstance(product["name"], str)
        assert "description" in product and isinstance(product["description"], str)
        assert "upc" in product and isinstance(product["upc"], str)
        assert "image" in product and isinstance(product["image"], str)
        assert "net_price" in product and isinstance(product["net_price"], (int, float))
        assert "taxes" in product and isinstance(product["taxes"], (int, float))
        assert "price" in product and isinstance(product["price"], (int, float))
        assert "categories" in product and isinstance(product["categories"], list)

    print("✅ All schema and data type tests PASSED")

In [0]:
test_extraction()

=== UNIT TEST: extract_users_and_products ===
Fetching products from API...
✅ All schema and data type tests PASSED


In [0]:
def transform_product_data(raw_products):
    """
    Transform API product data to our internal format.
    Skips invalid or corrupt records.
    """
    transformed_products = []
    for product in raw_products:
        try:
            if not product.get('id') and isinstance(product.get('id'), int):
              print(f"Skipping product ID {product['id']}: Missing supplier_id")
              continue

            price = float(product['price'])
            net_price = float(product.get('net_price', 0))
            if price <= 0:
                print(f"Skipping product ID {product['id']}: Non-positive price ({price})")
                continue

            clean_product = {
                'product_id': product['id'],
                'price': price,
                'net_price': net_price,
                'description': product['description'],
                'taxes': product.get('taxes')
            }
            transformed_products.append(clean_product)

        except (ValueError, TypeError) as e:
            print(f"Skipping product ID {product.get('id', 'Unknown')}: Invalid price format → {e}")
            continue

    return transformed_products

In [0]:
def test_transform_product_data():
    print("\n=== UNIT TEST: transform_product_data ===")

    raw_data = [
      # ✅ Valid product
      {
          "id": 1,
          "name": "Valid Product 1",
          "description": "All values present and valid.",
          "ean": "1234567890123",
          "upc": "123456789012",
          "price": 199.99,
          "net_price": 179.99,
          "taxes": 25
      },

      # ❌ Negative price
      {
          "id": 2,
          "name": "Negative Price Product",
          "description": "Price is negative.",
          "ean": "1234567890125",
          "upc": "123456789014",
          "price": -89.99,
          "net_price": -79.99,
          "taxes": 25
      },

      # ❌ Zero price
      {
          "id": 3,
          "name": "Zero Price Product",
          "description": "Zero price should be skipped.",
          "ean": "1234567890126",
          "upc": "123456789015",
          "price": 0.0,
          "net_price": 0.0,
          "taxes": 25
      },

      # ✅ Another valid product
      {
          "id": 4,
          "name": "Valid Product 2",
          "description": "Everything is clean.",
          "ean": "1234567890127",
          "upc": "123456789016",
          "price": 59.5,
          "net_price": 50.0,
          "taxes": 25
      },

      # ❌ Price as a string (invalid type)
      {
          "id": 5,
          "name": "Invalid Type Product",
          "description": "Price is a string.",
          "ean": "1234567890128",
          "upc": "123456789017",
          "price": "free",
          "net_price": "zero",
          "taxes": 25
      },

      # ✅ Valid product
      {
          "id": 8,
          "name": "Valid Product 3",
          "description": "Great product with full data.",
          "ean": "1234567890130",
          "upc": "123456789019",
          "price": 300.0,
          "net_price": 280.0,
          "taxes": 25
      }
  ]


    expected_ids = [1, 4, 8]  # Only these should be transformed successfully

    result = transform_product_data(raw_data)
    result_ids = [r["product_id"] for r in result]

    assert result_ids == expected_ids, f"❌ Transform test FAILED. Expected: {expected_ids}, Got: {result_ids}"
    print("✅ Transform test PASSED")
    return True


In [0]:
test_transform_product_data()


=== UNIT TEST: transform_product_data ===
Skipping product ID 2: Non-positive price (-89.99)
Skipping product ID 3: Non-positive price (0.0)
Skipping product ID 5: Invalid price format → could not convert string to float: 'free'
✅ Transform test PASSED


True

In [0]:
def simulate_problematic_api_data(real_api_data):
    """
    Simulate edge cases that might occur in real API responses.
    These represent scenarios not covered in unit testing.
    """
    # Start with a copy of real data
    problematic_data = real_api_data.copy()
    
    # Edge Case 1: Completely corrupted product structure
    problematic_data.append({
        "id": None,  # Null ID
        "name": "",  # Empty name
        "description": None,  # Null description
        "categories": "not_a_list",  # Wrong type
        # Missing all price-related fields
    })
    
    # Edge Case 2: Product with nested price structure (API change)
    problematic_data.append({
        "id": 99999,
        "name": "Nested Price Product",
        "description": "Product with complex pricing",
        "price": {  # Price is now an object instead of number
            "amount": 150.0,
            "currency": "USD"
        },
        "net_price": [129.99],  # Net price is now a list
    })
    
    # Edge Case 3: Product with extreme data values
    problematic_data.append({
        "id": "not_a_number",  # ID as string instead of int
        "name": "A" * 1000,  # Extremely long name
        "description": None,
        "price": float('inf'),  # Infinite price
        "net_price": float('-inf'),  # Negative infinite net price
        "taxes": 20
    })
    
    # Edge Case 4: Product with special characters causing encoding issues
    problematic_data.append({
        "id": None,
        "name": "Product with 特殊字符 and émojis 🚀💰",
        "description": "Test\x00null\x00byte",  # Null bytes in string
        "price": "€199.99",  # Price with currency symbol
        "net_price": "£150,50",  # Price with comma decimal separator
        "taxes": 10
    })
    
    return problematic_data


In [0]:
def test_integration_extract_and_transform():
    """
    Integration test that combines extract_products() and transform_product_data().
    This test is designed to FAIL due to realistic edge cases not handled by the functions.
    """
    print("\n" + "="*60)
    print("🧪 INTEGRATION TEST: extract_products + transform_product_data")
    print("="*60)
    
    try:
        # Step 1: Extract real data from API
        print("Step 1: Extracting data from API...")
        real_api_data = extract_products()
        print(f"Successfully extracted {len(real_api_data)} products from API")
        
        # Step 2: Simulate problematic data that might occur in production
        print("\nStep 2: Simulating edge cases not covered in unit testing...")
        problematic_data = simulate_problematic_api_data(real_api_data)
        print(f"Created {len(problematic_data)} products with edge cases")
        
        # Step 3: Attempt transformation (this should reveal failures)
        print("\nStep 3: Attempting to transform problematic data...")
        transformed_data = transform_product_data(problematic_data)
        
        # Test whether length of data taken as an input is equal to the length of output data
        assert len(problematic_data) == len(transformed_data), "❌ Data is skipped due to None values in product_id, or due to other reason!"
        
        # This assertion will FAIL - we expect all products to have required fields
        for product in transformed_data:
            assert 'product_id' in product, "Missing product_id in transformed data"
            assert 'price' in product, "Missing price in transformed data" 
            assert product['price'] > 0, "Invalid price in transformed data"
        
        print("✅ Integration test PASSED (unexpected)")
        return True
        
    except AssertionError as e:
        print(f"\nINTEGRATION TEST FAILED: {e}")
        print("\n🔍 Analysis of failures:")
        print("- Missing 'supplier_id' field in API response (not provided by FakerAPI)")
        print("- Missing 'price' field in some products (simulated API inconsistency)")
        print("- Corrupted data structures not handled by transform function")
        print("- Special data types and edge values causing transformation failures")
        print("\n💡 These failures represent realistic production scenarios!")
        return False
        
    # except Exception as e:
    #     print(f"\nINTEGRATION TEST CRASHED: {e}")
    #     print("This indicates the functions cannot handle unexpected data formats")
    #     return False

In [0]:
# Extract + Transform with edge cases
test_integration_extract_and_transform()


🧪 INTEGRATION TEST: extract_products + transform_product_data
Step 1: Extracting data from API...
Fetching products from API...
Successfully extracted 100 products from API

Step 2: Simulating edge cases not covered in unit testing...
Created 104 products with edge cases

Step 3: Attempting to transform problematic data...


[0;31m---------------------------------------------------------------------------[0m
[0;31mKeyError[0m                                  Traceback (most recent call last)
File [0;32m<command-8085892093823539>, line 2[0m
[1;32m      1[0m [38;5;66;03m# Extract + Transform with edge cases[39;00m
[0;32m----> 2[0m test_integration_extract_and_transform()

File [0;32m<command-7829083480830920>, line 23[0m, in [0;36mtest_integration_extract_and_transform[0;34m()[0m
[1;32m     21[0m [38;5;66;03m# Step 3: Attempt transformation (this should reveal failures)[39;00m
[1;32m     22[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;130;01m\n[39;00m[38;5;124mStep 3: Attempting to transform problematic data...[39m[38;5;124m"[39m)
[0;32m---> 23[0m transformed_data [38;5;241m=[39m transform_product_data(problematic_data)
[1;32m     25[0m [38;5;66;03m# Test whether length of data taken as an input is equal to the length of output data[39;00m
[1;32m     26[0m [38;5;28;0


The above error clearly states that we forgot to deal with the case that what will happen if price is empty, the price doesn't exists, and that's why there is no logic to handle this situation, so from our end we were sure that records will contain price, but in realistic some situation can come where the column that we expect doesn't exists and in that case it can leads to failures or big impact, and thus to identify that issue for reporting, we perform integration testing


### First start with the below simple use case, and then proceed with the above use case

In [0]:
def extract_source_data():
    """Simulates extracting raw data."""
    print("EXTRACT: Getting raw customer data...")

    # Assume that this data is fetched from the database
    raw_data = [
        {"id": 1, "name": "Alice Wonderland", "email": "alice@example.com"},
        {"id": 2, "name": "", "email": "bob@example.com"},  # missing name
        {"id": 3, "name": "Charlie Brown", "email": None},  # missing email
        {"id": 4, "name": "David", "email": "david@company.org"}  # normal
    ]
    return raw_data

In [0]:
def transform_customer_data(customer_list):
    """Transforms customer data, e.g., adds a domain field and makes name uppercase."""
    print("TRANSFORM: Processing customer data...")
    transformed_list = []
    if not customer_list: # Basic check for empty input
        print("TRANSFORM: No data to transform.")
        return []
    for customer in customer_list:
        try:
            name_parts = customer.get("name", "").split(" ", 1)
            first_name = name_parts[0] if name_parts else "Unknown"
            
            email_domain = customer.get("email", "").split("@")[-1] if "@" in customer.get("email", "") else "unknown"
            
            transformed_list.append({
                "customer_id": customer.get("id"),
                "full_name_upper": customer.get("name", "").upper(),
                "first_name": first_name,
                "email_domain": email_domain
            })
        except Exception as e:
            print(f"TRANSFORM: Error processing customer {customer.get('id')}: {e}")
            
    return transformed_list

In [0]:
# --- Integration Test ---
def test_extraction_and_transformation():
    print("\n--- Running Integration Test: Extraction & Transformation ---")
    
    # 1. Call the first component
    extracted_data = extract_source_data()
    
    # 2. Pass its output to the second component
    transformed_data = transform_customer_data(extracted_data)
    
    # 3. Validate the interaction and final output
    print("\nVALIDATE: Checking transformed data...")
    
    # Check if we got any data back
    assert len(transformed_data) == len(extracted_data), f"Expected {len(extracted_data)} records, got {len(transformed_data)}"
    print(f"Test Passed: Got {len(transformed_data)} records as expected.")

    # Check specifics of the first transformed record (example)
    first_record = transformed_data[0]
    expected_first_record_name = "ALICE WONDERLAND"

    assert first_record["full_name_upper"] == expected_first_record_name, \
        f"Expected name '{expected_first_record_name}', got '{first_record['full_name_upper']}'"
    print(f"Test Passed: First record name '{first_record['full_name_upper']}' is correct.")
    
    expected_first_record_domain = "example.com"
    assert first_record["email_domain"] == expected_first_record_domain, \
        f"Expected domain '{expected_first_record_domain}', got '{first_record['email_domain']}'"
    print(f"Test Passed: First record email domain '{first_record['email_domain']}' is correct.")
    
    print("--- Integration Test Completed Successfully! ---")
    # For display in notebook:
    print("\nFinal Transformed Data:")
    for record in transformed_data:
        print(record)

In [0]:
test_extraction_and_transformation()


--- Running Integration Test: Extraction & Transformation ---
EXTRACT: Getting raw customer data...
TRANSFORM: Processing customer data...
TRANSFORM: Error processing customer 3: argument of type 'NoneType' is not iterable

VALIDATE: Checking transformed data...


[0;31m---------------------------------------------------------------------------[0m
[0;31mAssertionError[0m                            Traceback (most recent call last)
File [0;32m<command-8085892093823536>, line 1[0m
[0;32m----> 1[0m test_extraction_and_transformation()

File [0;32m<command-8085892093823535>, line 15[0m, in [0;36mtest_extraction_and_transformation[0;34m()[0m
[1;32m     12[0m [38;5;28mprint[39m([38;5;124m"[39m[38;5;130;01m\n[39;00m[38;5;124mVALIDATE: Checking transformed data...[39m[38;5;124m"[39m)
[1;32m     14[0m [38;5;66;03m# Check if we got any data back[39;00m
[0;32m---> 15[0m [38;5;28;01massert[39;00m [38;5;28mlen[39m(transformed_data) [38;5;241m==[39m [38;5;28mlen[39m(extracted_data), [38;5;124mf[39m[38;5;124m"[39m[38;5;124mExpected [39m[38;5;132;01m{[39;00m[38;5;28mlen[39m(extracted_data)[38;5;132;01m}[39;00m[38;5;124m records, got [39m[38;5;132;01m{[39;00m[38;5;28mlen[39m(transformed_data)[38;5;132;01m


**The above test case indicates that due to problem in bad data, that was not identified in extract or transformed data, the record of that bad data was skipped in transformation, and that problem identified in the Integration Testing, that we expect 4 records but got only 3 records means some mistake occured in transformation**


## Boundary Testing

#### WHAT IS BOUNDARY TESTING?

**SIMPLE ANALOGY:** Restaurant Kitchen
- INTERNAL BOUNDARY: Kitchen staff passing food between cooking stations
  (Small change = Minor impact: Chef changes garnish style)
  
- EXTERNAL BOUNDARY: Kitchen receiving ingredients from suppliers
  (Small change = MAJOR impact: Supplier sends wrong ingredients)

IN DATA PIPELINES:
- INTERNAL: Between our pipeline steps (we control both sides)
- EXTERNAL: Between our pipeline and external systems (we control only one side)

KEY INSIGHT: External boundary changes cause bigger problems!

In [0]:
from pyspark.sql.functions import col, when
from pyspark.sql import Row

print("\nPART 1: BOUNDARY TESTING")
print("=" * 50)

print("""

""")

def create_sample_orders():
    """
    Create sample order data (simulating internal data)
    """
    orders = [
        Row(order_id="ORD001", customer_id="CUST001", product="Laptop", amount=1200.0, status="completed"),
        Row(order_id="ORD002", customer_id="CUST002", product="Phone", amount=800.0, status="completed"),
        Row(order_id="ORD003", customer_id="CUST003", product="Tablet", amount=400.0, status="pending")
    ]
    return spark.createDataFrame(orders)

# internal boundary step 1
def categorize_orders_by_amount(orders_df):
    """
    Categorizes each order into 'Premium', 'Standard', or 'Basic' based on the order amount.

    - 'Premium' for orders with amount >= 1000
    - 'Standard' for orders with amount >= 500 and < 1000
    - 'Basic' for orders with amount < 500
    """
    print("Categorizing orders based on amount")
    
    categorized_df = orders_df.withColumn(
        "order_category",
        when(col("amount") >= 1000, "Premium")
        .when(col("amount") >= 500, "Standard")
        .otherwise("Basic")
    )
    
    return categorized_df


def apply_discounts_by_category(categorized_df):
    """
    Applies discount rates based on the order category and calculates the final amount:

    - 'Premium' orders receive a 10% discount
    - 'Standard' orders receive a 5% discount
    - 'Basic' orders receive a 2% discount

    Adds two new columns:
    - 'discount_rate': The applied discount rate
    - 'final_amount': The amount after applying the discount
    """
    print("Applying discount rates based on order category")
    
    final_df = categorized_df.withColumn(
        "discount_rate",
        when(col("order_category") == "Premium", 0.10)
        .when(col("order_category") == "Standard", 0.05)
        .otherwise(0.02)
    ).withColumn(
        "final_amount",
        col("amount") * (1 - col("discount_rate"))
    )
    
    return final_df

def test_internal_boundary_change():
    """
    Test what happens when we change internal boundary (low impact)
    """
    print("\nTESTING INTERNAL BOUNDARY CHANGES")
    print("-" * 40)
    
    # Original pipeline
    print("Original Internal Pipeline:")
    orders = create_sample_orders()
    step1_result = categorize_orders_by_amount(orders)
    step2_result = apply_discounts_by_category(step1_result)
    step2_result.show()
    
    # Small change in internal boundary (change category threshold)
    print("\nAfter Internal Change (Premium threshold 800 → 1000):")
    
    def modified_internal_step1(orders_df):
        # Changed: Premium threshold from 1000 to 800
        return orders_df.withColumn(
            "order_category",
            when(col("amount") >= 800, "Premium")  # Changed threshold
            .when(col("amount") >= 500, "Standard")
            .otherwise("Basic")
        )
    
    modified_step1 = modified_internal_step1(orders)
    modified_step2 = apply_discounts_by_category(modified_step1)
    modified_step2.show()
    
    print("RESULT: Internal change has predictable, manageable impact")

# Run internal boundary test
orders_data = create_sample_orders()
test_internal_boundary_change()


PART 1: BOUNDARY TESTING




TESTING INTERNAL BOUNDARY CHANGES
----------------------------------------
Original Internal Pipeline:
Categorizing orders based on amount
Applying discount rates based on order category
+--------+-----------+-------+------+---------+--------------+-------------+------------+
|order_id|customer_id|product|amount|   status|order_category|discount_rate|final_amount|
+--------+-----------+-------+------+---------+--------------+-------------+------------+
|  ORD001|    CUST001| Laptop|1200.0|completed|       Premium|          0.1|      1080.0|
|  ORD002|    CUST002|  Phone| 800.0|completed|      Standard|         0.05|       760.0|
|  ORD003|    CUST003| Tablet| 400.0|  pending|         Basic|         0.02|       392.0|
+--------+-----------+-------+------+---------+--------------+-------------+------------+


After Internal Change (Premium threshold 800 → 1000):
Applying discount rates based on order category
+--------+-----------+-------+------+---------+--

In [0]:
from pyspark.sql import Row

def load_external_customer_data_v1():
    """
    Loads customer data from the external system - Version 1.
    Schema:
    - cust_id
    - name
    - email
    """
    customers = [
        Row(cust_id="CUST001", name="John Smith", email="john@email.com"),
        Row(cust_id="CUST002", name="Sarah Johnson", email="sarah@email.com"),
        Row(cust_id="CUST003", name="Mike Brown", email="mike@email.com")
    ]
    return spark.createDataFrame(customers)

def load_external_customer_data_v2():
    """
    Loads customer data from the updated external system - Version 2.
    Schema changed:
    - customer_id (was cust_id)
    - full_name (was name)
    - contact_email (was email)
    - phone (new field)
    """
    customers = [
        Row(customer_id="CUST001", full_name="John Smith", contact_email="john@email.com", phone="123-456-7890"),
        Row(customer_id="CUST002", full_name="Sarah Johnson", contact_email="sarah@email.com", phone="234-567-8901"),
        Row(customer_id="CUST003", full_name="Mike Brown", contact_email="mike@email.com", phone="345-678-9012")
    ]
    return spark.createDataFrame(customers)

def join_orders_with_customers(orders_df, customers_df, version="v1"):
    """
    Joins orders with customer data depending on the external system version.
    
    Parameters:
    - orders_df: DataFrame containing orders
    - customers_df: DataFrame containing customer data (schema depends on version)
    - version: 'v1' for legacy format, 'v2' for updated format

    Returns:
    - Tuple: (joined DataFrame, join success flag)
    """
    try:
        if version == "v1":
            joined_df = orders_df.join(customers_df, orders_df.customer_id == customers_df.cust_id, "left") \
                .select(
                    orders_df["*"],
                    customers_df["name"].alias("customer_name"),
                    customers_df["email"].alias("customer_email")
                )
        elif version == "v2":
            joined_df = orders_df.join(customers_df, orders_df.customer_id == customers_df.customer_id, "left") \
                .select(
                    orders_df["*"],
                    customers_df["full_name"].alias("customer_name"),
                    customers_df["contact_email"].alias("customer_email"),
                    customers_df["phone"]
                )
        else:
            raise ValueError("Unsupported version passed to join logic.")
        
        return joined_df, True

    except Exception as e:
        print(f"❌ Join failed for version '{version}': {e}")
        return orders_df, False

def test_external_customer_schema_change():
    """
    Simulates and tests the impact of external customer schema change on the pipeline.
    """
    print("\n\n🧪 TESTING EXTERNAL SYSTEM SCHEMA CHANGE")
    print("-" * 40)

    orders = create_sample_orders()  # Assume this is defined elsewhere

    # ✅ Test with external system v1
    print("✅ Testing with External Customer Data V1:")
    customers_v1 = load_external_customer_data_v1()
    joined_v1, success_v1 = join_orders_with_customers(orders, customers_v1, version="v1")
    if success_v1:
        joined_v1.select("order_id", "customer_id", "product", "customer_name", "customer_email").show()

    # ❌ Test with external system v2
    print("\n❌ Testing with External Customer Data V2 (Changed Format):")
    customers_v2 = load_external_customer_data_v2()
    joined_v2, success_v2 = join_orders_with_customers(orders, customers_v2, version="v2")
    
    if not success_v2:
        print("💥 PIPELINE BROKEN! Could not join with new customer schema.")
    else:
        joined_v2.select("order_id", "customer_id", "product", "customer_name", "customer_email", "phone").show()

    print("\n🚨 RESULT: External schema changes require pipeline adjustments.")

# Run the test
test_external_customer_schema_change()



🧪 TESTING EXTERNAL SYSTEM SCHEMA CHANGE
----------------------------------------
✅ Testing with External Customer Data V1:
+--------+-----------+-------+-------------+---------------+
|order_id|customer_id|product|customer_name| customer_email|
+--------+-----------+-------+-------------+---------------+
|  ORD001|    CUST001| Laptop|   John Smith| john@email.com|
|  ORD002|    CUST002|  Phone|Sarah Johnson|sarah@email.com|
|  ORD003|    CUST003| Tablet|   Mike Brown| mike@email.com|
+--------+-----------+-------+-------------+---------------+


❌ Testing with External Customer Data V2 (Changed Format):
+--------+-----------+-------+-------------+---------------+------------+
|order_id|customer_id|product|customer_name| customer_email|       phone|
+--------+-----------+-------+-------------+---------------+------------+
|  ORD001|    CUST001| Laptop|   John Smith| john@email.com|123-456-7890|
|  ORD002|    CUST002|  Phone|Sarah Johnson|sarah@email.com|234-567-8901|
|  ORD003|    CUS

In [0]:
from pyspark.sql import Row

def load_external_customer_data_v1():
    """
    Loads customer data from the external system - Version 1.
    Schema:
    - cust_id
    - name
    - email
    """
    customers = [
        Row(cust_id="CUST001", name="John Smith", email="john@email.com"),
        Row(cust_id="CUST002", name="Sarah Johnson", email="sarah@email.com"),
        Row(cust_id="CUST003", name="Mike Brown", email="mike@email.com")
    ]
    return spark.createDataFrame(customers)

def load_external_customer_data_v2():
    """
    Loads customer data from the updated external system - Version 2.
    Schema changed:
    - customer_id (was cust_id)
    - full_name (was name)
    - contact_email (was email)
    - phone (new field)
    """
    customers = [
        Row(id="CUST001", full_name="John Smith", contact_email="john@email.com", phone="123-456-7890"),
        Row(id="CUST002", full_name="Sarah Johnson", contact_email="sarah@email.com", phone="234-567-8901"),
        Row(id="CUST003", full_name="Mike Brown", contact_email="mike@email.com", phone="345-678-9012")
    ]
    return spark.createDataFrame(customers)

def join_orders_with_customers(orders_df, customers_df):
    """
    Joins orders with customer data depending on the external system version.
    
    Parameters:
    - orders_df: DataFrame containing orders
    - customers_df: DataFrame containing customer data (schema depends on version)

    Returns:
    - Tuple: (joined DataFrame, join success flag)
    """
    try:
        joined_df = orders_df.join(customers_df, orders_df.customer_id == customers_df.cust_id, "left") \
                .select(
                    orders_df["*"],
                    customers_df["name"].alias("customer_name"),
                    customers_df["email"].alias("customer_email")
        )
        
        return joined_df, True

    except Exception as e:
        print(f"❌ Join failed: {e}")
        return orders_df, False

def test_external_customer_schema_change():
    """
    Simulates and tests the impact of external customer schema change on the pipeline.
    """
    print("\n\n🧪 TESTING EXTERNAL SYSTEM SCHEMA CHANGE")
    print("-" * 40)

    orders = create_sample_orders()  # Assume this is defined elsewhere

    # ✅ Test with external system v1
    print("✅ Testing with External Customer Data V1:")
    customers_v1 = load_external_customer_data_v1()
    joined_v1, success_v1 = join_orders_with_customers(orders, customers_v1)
    if success_v1:
        joined_v1.select("order_id", "customer_id", "product", "customer_name", "customer_email").show()

    # ❌ Test with external system v2
    print("\n❌ Testing with External Customer Data V2 (Changed Format):")
    customers_v2 = load_external_customer_data_v2()
    joined_v2, success_v2 = join_orders_with_customers(orders, customers_v2)
    
    if not success_v2:
        print("💥 PIPELINE BROKEN! Could not join with new customer schema.")
    else:
        joined_v2.select("order_id", "customer_id", "product", "customer_name", "customer_email", "phone").show()

    print("\n🚨 RESULT: External schema changes require pipeline adjustments.")

# Run the test
test_external_customer_schema_change()



🧪 TESTING EXTERNAL SYSTEM SCHEMA CHANGE
----------------------------------------
✅ Testing with External Customer Data V1:
+--------+-----------+-------+-------------+---------------+
|order_id|customer_id|product|customer_name| customer_email|
+--------+-----------+-------+-------------+---------------+
|  ORD001|    CUST001| Laptop|   John Smith| john@email.com|
|  ORD002|    CUST002|  Phone|Sarah Johnson|sarah@email.com|
|  ORD003|    CUST003| Tablet|   Mike Brown| mike@email.com|
+--------+-----------+-------+-------------+---------------+


❌ Testing with External Customer Data V2 (Changed Format):
❌ Join failed: [ATTRIBUTE_NOT_SUPPORTED] Attribute `cust_id` is not supported.
💥 PIPELINE BROKEN! Could not join with new customer schema.

🚨 RESULT: External schema changes require pipeline adjustments.



## Dependency Testing

WHAT IS DEPENDENCY TESTING?

SIMPLE ANALOGY: Assembly Line Factory
- Raw materials arrive in specific format (schema)
- If supplier changes material format → assembly line breaks
- If materials arrive late → production delays

IN DATA PIPELINES:
- Upstream system changes schema → our transformations break
- Data arrives late → downstream processes affected
- New columns added → logic might ignore them

KEY TYPES:
1. Schema Changes (column names, data types)
2. Data Freshness (timing of data arrival)

In [0]:
from pyspark.sql.functions import sum, count

def create_upstream_data_old_schema():
    """
    Upstream system original schema
    """
    transactions = [
        Row(txn_id="TXN001", customer="CUST001", amount=100.50, date="2024-01-15"),
        Row(txn_id="TXN002", customer="CUST002", amount=250.75, date="2024-01-16"),
        Row(txn_id="TXN003", customer="CUST003", amount=75.25, date="2024-01-17")
    ]
    return spark.createDataFrame(transactions)

def create_upstream_data_new_schema():
    """
    Upstream system changed schema! Added new columns, changed types
    """
    transactions = [
        Row(transaction_id="TXN001", customer_id="CUST001", transaction_amount=100.50, 
            transaction_date="2024-01-15", transaction_type="purchase", currency="USD"),
        Row(transaction_id="TXN002", customer_id="CUST002", transaction_amount=250.75, 
            transaction_date="2024-01-16", transaction_type="purchase", currency="USD"),
        Row(transaction_id="TXN003", customer_id="CUST003", transaction_amount=75.25, 
            transaction_date="2024-01-17", transaction_type="refund", currency="USD")
    ]
    return spark.createDataFrame(transactions)

def our_transformation_logic_v1(transactions_df):
    """
    Our original transformation logic (expects old schema)
    """
    try:
        # Our logic expects: txn_id, customer, amount, date
        summary = transactions_df.groupBy("customer").agg(
            sum("amount").alias("total_amount"),
            count("txn_id").alias("transaction_count")
        )
        return summary, True
    except Exception as e:
        print(f"❌ Transformation failed: {e}")
        return None, False

def our_transformation_logic_v2(transactions_df):
    """
    Updated transformation logic (handles schema changes gracefully)
    """
    try:
        # Check which schema we're dealing with
        columns = transactions_df.columns
        
        if "txn_id" in columns:
            # Old schema
            print("📋 Detected old schema format")
            customer_col = "customer"
            amount_col = "amount"
            id_col = "txn_id"
        elif "transaction_id" in columns:
            # New schema
            print("📋 Detected new schema format")
            customer_col = "customer_id"
            amount_col = "transaction_amount"
            id_col = "transaction_id"
        else:
            raise Exception("Unknown schema format!")
        
        # Flexible transformation
        summary = transactions_df.groupBy(customer_col).agg(
            sum(amount_col).alias("total_amount"),
            count(id_col).alias("transaction_count")
        )
        
        return summary, True
    except Exception as e:
        print(f"❌ Flexible transformation failed: {e}")
        return None, False

def test_schema_dependency():
    """
    Test how schema changes affect our pipeline
    """
    print("\nTESTING SCHEMA DEPENDENCY")
    print("-" * 40)
    
    # Test with old schema
    print("Testing with Old Schema:")
    old_data = create_upstream_data_old_schema()
    print("Old Schema Columns:", old_data.columns)
    old_data.show(3)
    
    result_old, success_old = our_transformation_logic_v1(old_data)
    if success_old:
        print("Transformation Result:")
        result_old.show()
    
    # Test with new schema using old logic
    print("\nTesting New Schema with Old Logic:")
    new_data = create_upstream_data_new_schema()
    print("New Schema Columns:", new_data.columns)
    new_data.show(3)
    
    result_new, success_new = our_transformation_logic_v1(new_data)
    
    # Test with new schema using flexible logic
    print("\nTesting New Schema with Flexible Logic:")
    result_flexible, success_flexible = our_transformation_logic_v2(new_data)
    if success_flexible:
        print("Flexible Transformation Result:")
        result_flexible.show()

test_schema_dependency()


TESTING SCHEMA DEPENDENCY
----------------------------------------
Testing with Old Schema:
Old Schema Columns: ['txn_id', 'customer', 'amount', 'date']
+------+--------+------+----------+
|txn_id|customer|amount|      date|
+------+--------+------+----------+
|TXN001| CUST001| 100.5|2024-01-15|
|TXN002| CUST002|250.75|2024-01-16|
|TXN003| CUST003| 75.25|2024-01-17|
+------+--------+------+----------+

Transformation Result:
+--------+------------+-----------------+
|customer|total_amount|transaction_count|
+--------+------------+-----------------+
| CUST001|       100.5|                1|
| CUST002|      250.75|                1|
| CUST003|       75.25|                1|
+--------+------------+-----------------+


Testing New Schema with Old Logic:
New Schema Columns: ['transaction_id', 'customer_id', 'transaction_amount', 'transaction_date', 'transaction_type', 'currency']
+--------------+-----------+------------------+----------------+----------------+--------+
|transaction_id|cust


So we do schema testing in boundary testing also, but in that our main aim is to focus on the data accuracy, completeness, data types, and the data other component accepts is properly passed by the previous component. These all testing takes place in Boundary testing. So normally data types checks, schema checks etc. we do in internal boundary testing and external boundary testing both. But the **goal in boundary testing** is: To ensure the **handoff** between components is reliable — meaning the _correct data_ is passed _in the expected structure_, with proper **accuracy**, **completeness**, and **types**.


But schema checking plays a critical role in dependency testing, as the main aim of dependency testing is to test that any change in external source doesn't crash the pipeline. So if there are any unexpected changes like change in structure, change in API urls, network connections, credentials changes, etc. So any changes on which the system depends shouldn't crash the entire pipeline, instead it should handle the cases with proper logs

In **dependency testing**, schema checking is not just about _validation_ — it’s about **guarding against breakage** due to changes in **external systems** your pipeline depends on.

The aim is:

> ⚠️ _"If the schema of an external source/API changes, your pipeline should either adapt or fail gracefully — not crash silently or corrupt data."_

That’s why **dependency testing** also involves:

- **Error handling**
- **Logging**
- **Graceful fallback or fail-fast mechanisms**
- **Alerts**
    
So you nailed it — schema testing becomes **critical** in dependency testing because **external systems are brittle** and **outside your control**.


## Idempotence Testing

WHAT IS END-TO-END TESTING?

SIMPLE ANALOGY: Bank Account Balance
- You deposit $100 → Balance should increase by $100
- You run same deposit again → Balance should NOT change again (idempotent)
- Only new transactions should change balance

IN DATA PIPELINES:
- Same source data → Same target result (idempotent)
- Only approved source changes → Target changes
- Complete pipeline validation (schema + data + business rules)

KEY CONCEPTS:
1. Idempotence (repeatable results)
2. Change approval (controlled updates)
3. Complete data journey validation

In [0]:
def create_source_orders():
    """
    Source order data that should produce consistent results
    """
    orders = [
        Row(customer_id=1, orders=2),
        Row(customer_id=3, orders=4)
    ]
    return spark.createDataFrame(orders)

def create_existing_target():
    """
    Existing aggregated target data
    """
    target = [
        Row(customer_id=1, total_orders=5),
        Row(customer_id=2, total_orders=10),
        Row(customer_id=3, total_orders=15)
    ]
    return spark.createDataFrame(target)

def upsert_logic(source_df, target_df):
    """
    Upsert logic: Update existing customers, insert new ones
    """
    # This logic should be idempotent
    
    # Join source with target to find updates
    joined = source_df.alias("src").join(
        target_df.alias("tgt"), 
        col("src.customer_id") == col("tgt.customer_id"), 
        "full_outer"
    )
    
    # Calculate new totals
    result = joined.select(
        when(col("src.customer_id").isNotNull(), col("src.customer_id"))
        .otherwise(col("tgt.customer_id")).alias("customer_id"),
        
        when(col("src.customer_id").isNotNull(), 
             col("tgt.total_orders") + col("src.orders"))
        .otherwise(col("tgt.total_orders")).alias("total_orders")
    ).filter(col("customer_id").isNotNull())
    
    return result

def test_idempotence():
    """
    Test that running pipeline multiple times gives same result
    """
    print("\n🔄 TESTING IDEMPOTENCE")
    print("-" * 40)
    
    source = create_source_orders()
    initial_target = create_existing_target()
    
    print("📊 Source Data:")
    source.show()
    
    print("📊 Initial Target Data:")
    initial_target.show()
    
    # First run
    print("🔄 First Pipeline Run:")
    result1 = upsert_logic(source, initial_target)
    result1 = result1.orderBy("customer_id")
    result1.show()
    
    # Second run with same source (should be idempotent)
    print("🔄 Second Pipeline Run (Same Source):")
    result2 = upsert_logic(source, result1)
    result2 = result2.orderBy("customer_id")
    result2.show()
    
    # Check if results are identical
    result1_data = [row.asDict() for row in result1.collect()]
    result2_data = [row.asDict() for row in result2.collect()]
    
    is_idempotent = result1_data == result2_data
    print(f"🎯 Idempotence Test: {'✅ PASSED' if is_idempotent else '❌ FAILED'}")
    
    if not is_idempotent:
        print("💥 PROBLEM: Pipeline is not idempotent! Results change on re-run")
    else:
        print("✅ GOOD: Pipeline is idempotent - same input gives same output")

test_idempotence()


🔄 TESTING IDEMPOTENCE
----------------------------------------
📊 Source Data:
+-----------+------+
|customer_id|orders|
+-----------+------+
|          1|     2|
|          3|     4|
+-----------+------+

📊 Initial Target Data:
+-----------+------------+
|customer_id|total_orders|
+-----------+------------+
|          1|           5|
|          2|          10|
|          3|          15|
+-----------+------------+

🔄 First Pipeline Run:
+-----------+------------+
|customer_id|total_orders|
+-----------+------------+
|          1|           7|
|          2|          10|
|          3|          19|
+-----------+------------+

🔄 Second Pipeline Run (Same Source):
+-----------+------------+
|customer_id|total_orders|
+-----------+------------+
|          1|           9|
|          2|          10|
|          3|          23|
+-----------+------------+

🎯 Idempotence Test: ❌ FAILED
💥 PROBLEM: Pipeline is not idempotent! Results change on re-run


In [0]:
class MockDatabase:
    """Simple mock database for testing ELT operations"""
    
    def __init__(self):
        # In-memory storage to simulate database tables
        self.tables = {
            'customers': [
                {'id': 1, 'name': 'John Doe', 'email': 'john@email.com'},
                {'id': 2, 'name': 'Jane Smith', 'email': 'jane@email.com'}
            ],
            'orders': [
                {'order_id': 101, 'customer_id': 1, 'amount': 250.00, 'status': 'completed'},
                {'order_id': 102, 'customer_id': 2, 'amount': 150.50, 'status': 'pending'}
            ]
        }
        self.is_connected = False
    
    def connect(self):
        """Simulate database connection"""
        print("Connecting to database...")
        self.is_connected = True
        return "Connection successful"
    
    def disconnect(self):
        """Simulate database disconnection"""
        print("Disconnecting from database...")
        self.is_connected = False
    
    def execute_query(self, query):
        """Simulate SQL query execution"""
        if not self.is_connected:
            raise Exception("Database not connected")
        
        # Simple query simulation
        if "SELECT * FROM customers" in query:
            return self.tables['customers']
        elif "SELECT * FROM orders" in query:
            return self.tables['orders']
        else:
            return []
    
    def insert_data(self, table_name, data):
        """Simulate data insertion"""
        if not self.is_connected:
            raise Exception("Database not connected")
        
        if table_name in self.tables:
            self.tables[table_name].append(data)
            return f"Data inserted into {table_name}"
        else:
            raise Exception(f"Table {table_name} not found")

# ============================================================================
# 2. Mock REST API Service
# ============================================================================

class MockAPIService:
    """Simple mock REST API for testing external service calls"""
    
    def __init__(self):
        # Mock API data
        self.api_data = {
            'users': [
                {'user_id': 1, 'username': 'alice', 'active': True},
                {'user_id': 2, 'username': 'bob', 'active': False}
            ],
            'products': [
                {'product_id': 'P001', 'name': 'Laptop', 'price': 999.99},
                {'product_id': 'P002', 'name': 'Mouse', 'price': 25.00}
            ]
        }
        self.request_count = 0
    
    def get_data(self, endpoint):
        """Simulate GET request to API endpoint"""
        self.request_count += 1
        print(f"API Request #{self.request_count} to endpoint: {endpoint}")
        
        if endpoint == '/users':
            return {'status': 200, 'data': self.api_data['users']}
        elif endpoint == '/products':
            return {'status': 200, 'data': self.api_data['products']}
        else:
            return {'status': 404, 'error': 'Endpoint not found'}
    
    def post_data(self, endpoint, data):
        """Simulate POST request to API"""
        self.request_count += 1
        print(f"API POST Request #{self.request_count} to {endpoint}")
        
        if endpoint == '/users':
            new_id = len(self.api_data['users']) + 1
            data['user_id'] = new_id
            self.api_data['users'].append(data)
            return {'status': 201, 'message': 'User created', 'data': data}
        else:
            return {'status': 400, 'error': 'Invalid endpoint'}

# ============================================================================
# 3. Mock File System
# ============================================================================

class MockFileSystem:
    """Simple mock file system for testing file operations"""
    
    def __init__(self):
        # In-memory file storage
        self.files = {
            '/data/input/customers.csv': 'id,name,email\n1,John,john@email.com\n2,Jane,jane@email.com',
            '/data/input/sales.json': '{"sales": [{"id": 1, "amount": 100}, {"id": 2, "amount": 200}]}'
        }
    
    def file_exists(self, file_path):
        """Check if file exists"""
        return file_path in self.files
    
    def read_file(self, file_path):
        """Read file content"""
        if file_path in self.files:
            return self.files[file_path]
        else:
            raise FileNotFoundError(f"File not found: {file_path}")
    
    def write_file(self, file_path, content):
        """Write content to file"""
        self.files[file_path] = content
        return f"File written: {file_path}"
    
    def list_files(self, directory):
        """List files in directory"""
        files_in_dir = []
        for file_path in self.files.keys():
            if file_path.startswith(directory):
                files_in_dir.append(file_path)
        return files_in_dir

# ============================================================================
# 4. Integration Test Examples
# ============================================================================

def test_database_integration():
    """Example integration test for database operations"""
    print("=== Testing Database Integration ===")
    
    # Create mock database
    db = MockDatabase()
    
    # Test connection
    db.connect()
    
    # Test data retrieval
    customers = db.execute_query("SELECT * FROM customers")
    print(f"Retrieved {len(customers)} customers")
    
    # Test data insertion
    new_customer = {'id': 3, 'name': 'Bob Johnson', 'email': 'bob@email.com'}
    result = db.insert_data('customers', new_customer)
    print(result)
    
    # Verify insertion
    updated_customers = db.execute_query("SELECT * FROM customers")
    print(f"Now have {len(updated_customers)} customers")
    
    db.disconnect()
    print("Database test completed\n")

def test_api_integration():
    """Example integration test for API service"""
    print("=== Testing API Integration ===")
    
    # Create mock API service
    api = MockAPIService()
    
    # Test GET request
    response = api.get_data('/users')
    print(f"API Response: {response['status']}")
    print(f"Users retrieved: {len(response['data'])}")
    
    # Test POST request
    new_user = {'username': 'charlie', 'active': True}
    post_response = api.post_data('/users', new_user)
    print(f"POST Response: {post_response['status']}")
    
    print(f"Total API requests made: {api.request_count}")
    print("API test completed\n")

def test_file_system_integration():
    """Example integration test for file operations"""
    print("=== Testing File System Integration ===")
    
    # Create mock file system
    fs = MockFileSystem()
    
    # Test file existence
    csv_file = '/data/input/customers.csv'
    if fs.file_exists(csv_file):
        print(f"File exists: {csv_file}")
        
        # Test file reading
        content = fs.read_file(csv_file)
        lines = content.split('\n')
        print(f"File has {len(lines)} lines")
    
    # Test file writing
    output_file = '/data/output/processed_data.txt'
    fs.write_file(output_file, 'Processing completed successfully')
    
    # Test directory listing
    input_files = fs.list_files('/data/input/')
    print(f"Files in input directory: {len(input_files)}")
    
    print("File system test completed\n")

# ============================================================================
# 5. Complete ELT Pipeline Mock Test
# ============================================================================

def test_complete_elt_pipeline():
    """Example of testing complete ELT pipeline with mocks"""
    print("=== Testing Complete ELT Pipeline ===")
    
    # Initialize all mock systems
    db = MockDatabase()
    api = MockAPIService()
    fs = MockFileSystem()
    
    try:
        # Step 1: Extract data from API
        print("Step 1: Extracting data from API...")
        api_data = api.get_data('/users')
        users = api_data['data']
        
        # Step 2: Load data from file system
        print("Step 2: Loading data from file system...")
        csv_content = fs.read_file('/data/input/customers.csv')
        
        # Step 3: Transform data (simple example)
        print("Step 3: Transforming data...")
        active_users = [user for user in users if user['active']]
        print(f"Found {len(active_users)} active users")
        
        # Step 4: Load transformed data to database
        print("Step 4: Loading data to database...")
        db.connect()
        for user in active_users:
            customer_data = {
                'id': user['user_id'] + 100,  # Transform ID
                'name': user['username'],
                'email': f"{user['username']}@company.com"
            }
            db.insert_data('customers', customer_data)
        
        # Step 5: Verify results
        print("Step 5: Verifying results...")
        all_customers = db.execute_query("SELECT * FROM customers")
        print(f"Total customers in database: {len(all_customers)}")
        
        # Step 6: Write summary report
        summary = f"ELT Pipeline completed successfully\nProcessed {len(active_users)} active users"
        fs.write_file('/data/output/pipeline_summary.txt', summary)
        
        print("ELT Pipeline test completed successfully!")
        
    except Exception as e:
        print(f"Pipeline test failed: {e}")
    
    finally:
        db.disconnect()

In [0]:
print("Mock External Systems for Integration Testing")
print("=" * 50)
    
# Run individual tests
test_database_integration()
test_api_integration()
test_file_system_integration()
    
# Run complete pipeline test
test_complete_elt_pipeline()
    
print("\nAll integration tests completed!")
print("\nKey Learning Points:")
print("1. Mock objects simulate real external systems")
print("2. Tests can run without actual external dependencies")
print("3. Easy to control test data and scenarios")
print("4. Fast execution and reliable results")
print("5. Can test error conditions safely")

Mock External Systems for Integration Testing
=== Testing Database Integration ===
Connecting to database...
Retrieved 2 customers
Data inserted into customers
Now have 3 customers
Disconnecting from database...
Database test completed

=== Testing API Integration ===
API Request #1 to endpoint: /users
API Response: 200
Users retrieved: 2
API POST Request #2 to /users
POST Response: 201
Total API requests made: 2
API test completed

=== Testing File System Integration ===
File exists: /data/input/customers.csv
File has 3 lines
Files in input directory: 2
File system test completed

=== Testing Complete ELT Pipeline ===
Step 1: Extracting data from API...
API Request #1 to endpoint: /users
Step 2: Loading data from file system...
Step 3: Transforming data...
Found 1 active users
Step 4: Loading data to database...
Connecting to database...
Step 5: Verifying results...
Total customers in database: 3
ELT Pipeline test completed successfully!
Disconnecting from database...

All integration

In [0]:
%pip install faker

Collecting faker
  Obtaining dependency information for faker from https://files.pythonhosted.org/packages/ce/99/045b2dae19a01b9fbb23b9971bc04f4ef808e7f3a213d08c81067304a210/faker-37.3.0-py3-none-any.whl.metadata
  Downloading faker-37.3.0-py3-none-any.whl.metadata (15 kB)
Downloading faker-37.3.0-py3-none-any.whl (1.9 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.9 MB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.9 MB[0m [31m1.5 MB/s[0m eta [36m0:00:02[0m
[2K   [91m━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.1/1.9 MB[0m [31m1.1 MB/s[0m eta [36m0:00:02[0m
[2K   [91m━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.1/1.9 MB[0m [31m1.4 MB/s[0m eta [36m0:00:02[0m
[2K   [91m━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.3/1.9 MB[0m [31m2.3 MB/s[0m eta [36m0:00:01[0m
[2K   [91m━━━━━━━━━━━━[0m[90m╺[0

In [0]:
from pyspark.sql import Row
from pyspark.sql.functions import col, sum as spark_sum, count, when, lit, avg, max as spark_max
from pyspark.sql.types import *

# Import Faker for realistic data generation
from faker import Faker
from faker.providers import internet, person, company, automotive, date_time
import random
from datetime import datetime, timedelta
from unittest.mock import MagicMock
import json

In [0]:
# Initialize Faker
fake = Faker()

In [0]:
def demonstrate_faker_basics():
    """
    Show basic Faker capabilities for generating realistic data
    """
    print("🎲 Basic Faker Demonstration")
    print("-" * 30)
    
    print("🧑 Personal Information:")
    for i in range(3):
        print(f"  Name: {fake.name()}")
        print(f"  Email: {fake.email()}")
        print(f"  Phone: {fake.phone_number()}")
        print(f"  Address: {fake.address()}")
        print(f"  Birthday: {fake.date_of_birth()}")
        print()
    
    print("🏢 Business Information:")
    for i in range(3):
        print(f"  Company: {fake.company()}")
        print(f"  Job: {fake.job()}")
        print(f"  Department: {fake.bs()}")
        print()
    
    print("💰 Financial Information:")
    for i in range(3):
        print(f"  Credit Card: {fake.credit_card_number()}")
        print(f"  Bank Account: {fake.iban()}")
        print(f"  Currency: {fake.currency_code()}")
        print()
    
    print("✅ Faker generates realistic, random data every time!")

demonstrate_faker_basics()

🎲 Basic Faker Demonstration
------------------------------
🧑 Personal Information:
  Name: Brandon Murray
  Email: joannesmith@example.com
  Phone: (972)230-1849x34170
  Address: 7556 Terrell Pines Apt. 607
Nguyenhaven, WI 09297
  Birthday: 2003-11-05

  Name: Andrew Leblanc
  Email: gonzalezanthony@example.net
  Phone: 278-736-4747
  Address: 330 Kristin Union
Patrickport, PA 79662
  Birthday: 2001-08-06

  Name: Chelsea Morton
  Email: lauraking@example.org
  Phone: (347)797-9449x4936
  Address: 4755 Valerie Mission
Davidstad, IL 62529
  Birthday: 1997-10-09

🏢 Business Information:
  Company: Gordon Inc
  Job: Counselling psychologist
  Department: engage world-class users

  Company: Duncan-Edwards
  Job: Engineer, automotive
  Department: whiteboard next-generation action-items

  Company: Meyers, Perez and Jackson
  Job: Drilling engineer
  Department: deploy magnetic experiences

💰 Financial Information:
  Credit Card: 2223701888523248
  Bank Account: GB70SIUA24617499227231
  Cu

In [0]:
def generate_customer_data(num_customers=100):
    """
    Generate realistic customer data using Faker
    """
    print(f"🎲 Generating {num_customers} realistic customers...")
    
    customers = []
    for i in range(num_customers):
        customer = Row(
            customer_id=f"CUST{i+1:05d}",
            first_name=fake.first_name(),
            last_name=fake.last_name(),
            email=fake.email(),
            phone=fake.phone_number(),
            address=fake.address().replace('\n', ', '),
            city=fake.city(),
            country=fake.country(),
            date_of_birth=fake.date_of_birth(minimum_age=18, maximum_age=80),
            registration_date=fake.date_between(start_date='-2y', end_date='today'),
            account_status=fake.random_element(elements=('active', 'inactive', 'suspended')),
            credit_score=fake.random_int(min=300, max=850),
            annual_income=fake.random_int(min=25000, max=200000)
        )
        customers.append(customer)
    
    return spark.createDataFrame(customers)

In [0]:
def generate_transaction_data(num_transactions=500, customer_ids=None):
    """
    Generate realistic transaction data using Faker
    """
    print(f"🎲 Generating {num_transactions} realistic transactions...")
    
    if customer_ids is None:
        customer_ids = [f"CUST{i+1:05d}" for i in range(100)]
    
    transactions = []
    for i in range(num_transactions):
        transaction = Row(
            transaction_id=f"TXN{i+1:08d}",
            customer_id=fake.random_element(elements=customer_ids),
            transaction_date=fake.date_between(start_date='-1y', end_date='today'),
            transaction_time=fake.time(),
            amount=round(fake.random.uniform(5.0, 2000.0), 2),
            transaction_type=fake.random_element(elements=('purchase', 'refund', 'transfer')),
            merchant_name=fake.company(),
            category=fake.random_element(elements=('groceries', 'electronics', 'clothing', 'dining', 'travel', 'entertainment')),
            payment_method=fake.random_element(elements=('credit_card', 'debit_card', 'bank_transfer', 'digital_wallet')),
            status=fake.random_element(elements=('completed', 'pending', 'failed'))
        )
        transactions.append(transaction)
    
    return spark.createDataFrame(transactions)

In [0]:
def generate_product_data(num_products=200):
    """
    Generate realistic product catalog using Faker
    """
    print(f"🎲 Generating {num_products} realistic products...")
    
    categories = ['Electronics', 'Clothing', 'Home & Garden', 'Books', 'Sports', 'Beauty']
    
    products = []
    for i in range(num_products):
        category = fake.random_element(elements=categories)
        
        # Generate category-specific product names
        if category == 'Electronics':
            product_name = fake.random_element(elements=['Smartphone', 'Laptop', 'Tablet', 'Headphones', 'TV', 'Camera'])
        elif category == 'Clothing':
            product_name = fake.random_element(elements=['T-Shirt', 'Jeans', 'Dress', 'Jacket', 'Shoes', 'Hat'])
        else:
            product_name = fake.word().title() + " " + fake.word().title()
        
        product = Row(
            product_id=f"PROD{i+1:05d}",
            product_name=product_name + " " + fake.color_name(),
            category=category,
            brand=fake.company(),
            price=round(fake.random.uniform(9.99, 999.99), 2),
            cost=round(fake.random.uniform(5.0, 500.0), 2),
            stock_quantity=fake.random_int(min=0, max=1000),
            supplier=fake.company(),
            weight=round(fake.random.uniform(0.1, 50.0), 2),
            dimensions=f"{fake.random_int(1,50)}x{fake.random_int(1,50)}x{fake.random_int(1,50)} cm",
            rating=round(fake.random.uniform(1.0, 5.0), 1),
            review_count=fake.random_int(min=0, max=5000)
        )
        products.append(product)
    
    return spark.createDataFrame(products)

In [0]:
# Generate sample datasets
print("\n📊 Generating Sample Datasets:")
sample_customers = generate_customer_data(10)
sample_transactions = generate_transaction_data(20, [f"CUST{i+1:05d}" for i in range(10)])
sample_products = generate_product_data(15)

print("\n📋 Sample Customer Data:")
sample_customers.show(5)

print("\n📋 Sample Transaction Data:")
sample_transactions.show(5)

print("\n📋 Sample Product Data:")
sample_products.show(5)


📊 Generating Sample Datasets:
🎲 Generating 10 realistic customers...
🎲 Generating 20 realistic transactions...
🎲 Generating 15 realistic products...

📋 Sample Customer Data:
+-----------+----------+---------+--------------------+--------------------+--------------------+-----------------+---------+-------------+-----------------+--------------+------------+-------------+
|customer_id|first_name|last_name|               email|               phone|             address|             city|  country|date_of_birth|registration_date|account_status|credit_score|annual_income|
+-----------+----------+---------+--------------------+--------------------+--------------------+-----------------+---------+-------------+-----------------+--------------+------------+-------------+
|  CUST00001|     Bryan|   Carter|danielmendez@exam...|  (680)746-4055x2076|94788 Anderson Tr...|   Sandersborough|Greenland|   1953-06-23|       2024-10-20|      inactive|         413|       181584|
|  CUST00002|      Lisa| 

In [0]:
def extract_customer_data_from_source(num_customers=1000):
    """
    EXTRACTION: Simulate extracting customer data from external system
    """
    print("📥 STEP 1: Extracting Customer Data")
    print("-" * 40)
    
    try:
        # Simulate data extraction from external CRM system
        print("🔄 Connecting to CRM system...")
        print("🔄 Executing customer data query...")
        
        customers_df = generate_customer_data(num_customers)
        
        print(f"✅ Successfully extracted {customers_df.count()} customer records")
        print("📊 Sample extracted data:")
        customers_df.show(3)
        
        return customers_df, True
        
    except Exception as e:
        print(f"❌ Customer extraction failed: {e}")
        return None, False

def extract_transaction_data_from_source(customer_ids, num_transactions=5000):
    """
    EXTRACTION: Simulate extracting transaction data from external system
    """
    print("\n📥 STEP 2: Extracting Transaction Data")
    print("-" * 40)
    
    try:
        # Simulate data extraction from external transaction system
        print("🔄 Connecting to transaction database...")
        print("🔄 Executing transaction data query...")
        
        transactions_df = generate_transaction_data(num_transactions, customer_ids)
        
        print(f"✅ Successfully extracted {transactions_df.count()} transaction records")
        print("📊 Sample extracted data:")
        transactions_df.show(3)
        
        return transactions_df, True
        
    except Exception as e:
        print(f"❌ Transaction extraction failed: {e}")
        return None, False

def extract_product_data_from_source(num_products=1000):
    """
    EXTRACTION: Simulate extracting product data from external system
    """
    print("\n📥 STEP 3: Extracting Product Data")
    print("-" * 40)
    
    try:
        # Simulate data extraction from external product catalog
        print("🔄 Connecting to product catalog system...")
        print("🔄 Executing product data query...")
        
        products_df = generate_product_data(num_products)
        
        print(f"✅ Successfully extracted {products_df.count()} product records")
        print("📊 Sample extracted data:")
        products_df.show(3)
        
        return products_df, True
        
    except Exception as e:
        print(f"❌ Product extraction failed: {e}")
        return None, False

In [0]:
def transform_customer_data(customers_df):
    """
    TRANSFORMATION: Clean and enrich customer data
    """
    print("⚙️ STEP 1: Transforming Customer Data")
    print("-" * 40)
    
    try:
        from pyspark.sql.functions import upper, lower, regexp_replace, concat, lit, current_date, datediff
        
        # Data cleaning and standardization
        cleaned_customers = customers_df.withColumn(
            "full_name", concat(col("first_name"), lit(" "), col("last_name"))
        ).withColumn(
            "email_clean", lower(col("email"))
        ).withColumn(
            "phone_clean", regexp_replace(col("phone"), "[^0-9]", "")
        ).withColumn(
            "age", (datediff(current_date(), col("date_of_birth")) / 365.25).cast("int")
        )
        
        # Customer segmentation based on credit score and income
        segmented_customers = cleaned_customers.withColumn(
            "customer_segment",
            when((col("credit_score") >= 750) & (col("annual_income") >= 75000), "Premium")
            .when((col("credit_score") >= 650) & (col("annual_income") >= 50000), "Standard")
            .when(col("credit_score") >= 550, "Basic")
            .otherwise("Risk")
        ).withColumn(
            "risk_category",
            when(col("credit_score") >= 750, "Low Risk")
            .when(col("credit_score") >= 650, "Medium Risk")
            .otherwise("High Risk")
        )
        
        print(f"✅ Transformed {segmented_customers.count()} customer records")
        print("📊 Customer segmentation summary:")
        segmented_customers.groupBy("customer_segment").count().show()
        
        return segmented_customers, True
        
    except Exception as e:
        print(f"❌ Customer transformation failed: {e}")
        return customers_df, False

def transform_transaction_data(transactions_df):
    """
    TRANSFORMATION: Process and aggregate transaction data
    """
    print("\n⚙️ STEP 2: Transforming Transaction Data")
    print("-" * 40)
    
    try:
        from pyspark.sql.functions import year, month, dayofweek, hour, when
        
        # Add time-based features
        enhanced_transactions = transactions_df.withColumn(
            "transaction_year", year(col("transaction_date"))
        ).withColumn(
            "transaction_month", month(col("transaction_date"))
        ).withColumn(
            "day_of_week", dayofweek(col("transaction_date"))
        ).withColumn(
            "is_weekend", when(col("day_of_week").isin([1, 7]), True).otherwise(False)
        )
        
        # Categorize transaction amounts
        categorized_transactions = enhanced_transactions.withColumn(
            "amount_category",
            when(col("amount") >= 500, "High Value")
            .when(col("amount") >= 100, "Medium Value")
            .when(col("amount") >= 20, "Low Value")
            .otherwise("Micro Transaction")
        )
        
        # Add business logic flags
        final_transactions = categorized_transactions.withColumn(
            "is_large_transaction", col("amount") >= 1000
        ).withColumn(
            "requires_review", 
            (col("amount") >= 1000) | (col("transaction_type") == "refund")
        )
        
        print(f"✅ Transformed {final_transactions.count()} transaction records")
        print("📊 Transaction amount distribution:")
        final_transactions.groupBy("amount_category").count().show()
        
        return final_transactions, True
        
    except Exception as e:
        print(f"❌ Transaction transformation failed: {e}")
        return transactions_df, False

def create_customer_analytics(customers_df, transactions_df):
    """
    TRANSFORMATION: Create customer analytics by joining customer and transaction data
    """
    print("\n⚙️ STEP 3: Creating Customer Analytics")
    print("-" * 40)
    
    try:
        # Calculate customer transaction metrics
        customer_metrics = transactions_df.filter(
            col("status") == "completed"
        ).groupBy("customer_id").agg(
            spark_sum("amount").alias("total_spent"),
            count("transaction_id").alias("transaction_count"),
            avg("amount").alias("avg_transaction_amount"),
            spark_max("amount").alias("max_transaction_amount"),
            count(when(col("transaction_type") == "refund", 1)).alias("refund_count")
        )
        
        # Join customer data with transaction metrics
        customer_analytics = customers_df.join(
            customer_metrics, "customer_id", "left"
        ).fillna({
            "total_spent": 0.0,
            "transaction_count": 0,
            "avg_transaction_amount": 0.0,
            "max_transaction_amount": 0.0,
            "refund_count": 0
        })
        
        # Add customer value scoring
        customer_analytics = customer_analytics.withColumn(
            "customer_value_score",
            (col("total_spent") * 0.4) + 
            (col("transaction_count") * 10) + 
            (col("credit_score") * 0.1) - 
            (col("refund_count") * 50)
        ).withColumn(
            "customer_tier",
            when(col("customer_value_score") >= 1000, "Platinum")
            .when(col("customer_value_score") >= 500, "Gold")
            .when(col("customer_value_score") >= 200, "Silver")
            .otherwise("Bronze")
        )
        
        print(f"✅ Created analytics for {customer_analytics.count()} customers")
        print("📊 Customer tier distribution:")
        customer_analytics.groupBy("customer_tier").count().show()
        
        return customer_analytics, True
        
    except Exception as e:
        print(f"❌ Customer analytics creation failed: {e}")
        return customers_df, False

In [0]:
def load_to_data_warehouse(data_df, table_name, warehouse_connection):
    """
    LOADING: Save processed data to data warehouse (mocked)
    """
    print(f"💾 Loading data to warehouse table: {table_name}")
    print("-" * 40)
    
    try:
        # Convert DataFrame to records for warehouse loading
        record_count = data_df.count()
        
        # Simulate data validation before loading
        print("🔍 Validating data quality...")
        
        # Check for required fields
        null_ids = data_df.filter(col("customer_id").isNull()).count()
        if null_ids > 0:
            raise Exception(f"Found {null_ids} records with null customer_id")
        
        # Check for duplicate records
        total_records = data_df.count()
        unique_records = data_df.distinct().count()
        if total_records != unique_records:
            print(f"⚠️ Warning: Found {total_records - unique_records} duplicate records")
        
        # Mock warehouse operations
        print("🔄 Connecting to data warehouse...")
        warehouse_connection.create_table_if_not_exists(table_name)
        
        print("🔄 Inserting records...")
        warehouse_connection.insert_batch(table_name, record_count)
        
        print("🔄 Creating indexes...")
        warehouse_connection.create_indexes(table_name)
        
        print("🔄 Committing transaction...")
        warehouse_connection.commit()
        
        print(f"✅ Successfully loaded {record_count} records to {table_name}")
        return record_count, True
        
    except Exception as e:
        print(f"❌ Loading to {table_name} failed: {e}")
        warehouse_connection.rollback()
        return 0, False

def load_to_analytics_store(analytics_df, analytics_connection):
    """
    LOADING: Save analytics data to analytics store (mocked)
    """
    print("💾 Loading data to analytics store")
    print("-" * 40)
    
    try:
        record_count = analytics_df.count()
        
        # Mock analytics store operations
        print("🔄 Connecting to analytics store...")
        analytics_connection.prepare_analytics_tables()
        
        print("🔄 Loading customer analytics...")
        analytics_connection.load_customer_metrics(record_count)
        
        print("🔄 Updating dashboards...")
        analytics_connection.refresh_dashboards()
        
        print(f"✅ Successfully loaded {record_count} analytics records")
        return record_count, True
        
    except Exception as e:
        print(f"❌ Analytics loading failed: {e}")
        return 0, False


In [0]:
def run_complete_e2e_pipeline(num_customers=1000, num_transactions=5000):
    """
    Complete end-to-end pipeline: Extract → Transform → Load
    """
    print("🚀 Starting Complete E2E Data Pipeline")
    print("=" * 50)
    
    pipeline_results = {
        "extraction_success": False,
        "transformation_success": False,
        "loading_success": False,
        "total_customers": 0,
        "total_transactions": 0,
        "analytics_records": 0
    }
    
    try:
        # STEP 1: EXTRACTION
        print("\n📥 PHASE 1: DATA EXTRACTION")
        print("=" * 30)
        
        # Extract customers
        customers_df, customer_extract_success = extract_customer_data_from_source(num_customers)
        if not customer_extract_success:
            raise Exception("Customer extraction failed")
        
        # Get customer IDs for transaction extraction
        customer_ids = [row.customer_id for row in customers_df.select("customer_id").collect()]
        
        # Extract transactions
        transactions_df, transaction_extract_success = extract_transaction_data_from_source(
            customer_ids, num_transactions
        )
        if not transaction_extract_success:
            raise Exception("Transaction extraction failed")
        
        pipeline_results["extraction_success"] = True
        pipeline_results["total_customers"] = customers_df.count()
        pipeline_results["total_transactions"] = transactions_df.count()
        
        # STEP 2: TRANSFORMATION
        print("\n⚙️ PHASE 2: DATA TRANSFORMATION")
        print("=" * 30)
        
        # Transform customer data
        transformed_customers, customer_transform_success = transform_customer_data(customers_df)
        if not customer_transform_success:
            raise Exception("Customer transformation failed")
        
        # Transform transaction data
        transformed_transactions, transaction_transform_success = transform_transaction_data(transactions_df)
        if not transaction_transform_success:
            raise Exception("Transaction transformation failed")
        
        # Create customer analytics
        customer_analytics, analytics_success = create_customer_analytics(
            transformed_customers, transformed_transactions
        )
        if not analytics_success:
            raise Exception("Analytics creation failed")
        
        pipeline_results["transformation_success"] = True
        pipeline_results["analytics_records"] = customer_analytics.count()
        
        # STEP 3: LOADING (with mocks)
        print("\n💾 PHASE 3: DATA LOADING")
        print("=" * 30)
        
        # Create mock connections
        mock_warehouse = MagicMock()
        mock_warehouse.create_table_if_not_exists.return_value = None
        mock_warehouse.insert_batch.return_value = None
        mock_warehouse.create_indexes.return_value = None
        mock_warehouse.commit.return_value = None
        mock_warehouse.rollback.return_value = None
        
        mock_analytics = MagicMock()
        mock_analytics.prepare_analytics_tables.return_value = None
        mock_analytics.load_customer_metrics.return_value = None
        mock_analytics.refresh_dashboards.return_value = None
        
        # Load data
        customer_load_count, customer_load_success = load_to_data_warehouse(
            transformed_customers, "customers", mock_warehouse
        )
        
        transaction_load_count, transaction_load_success = load_to_data_warehouse(
            transformed_transactions, "transactions", mock_warehouse
        )
        
        analytics_load_count, analytics_load_success = load_to_analytics_store(
            customer_analytics, mock_analytics
        )
        
        pipeline_results["loading_success"] = (
            customer_load_success and transaction_load_success and analytics_load_success
        )
        
        # PIPELINE SUMMARY
        print("\n📊 PIPELINE EXECUTION SUMMARY")
        print("=" * 40)
        
        all_phases_success = (
            pipeline_results["extraction_success"] and 
            pipeline_results["transformation_success"] and 
            pipeline_results["loading_success"]
        )
        
        print(f"📥 Extraction: {'✅ SUCCESS' if pipeline_results['extraction_success'] else '❌ FAILED'}")
        print(f"⚙️ Transformation: {'✅ SUCCESS' if pipeline_results['transformation_success'] else '❌ FAILED'}")
        print(f"💾 Loading: {'✅ SUCCESS' if pipeline_results['loading_success'] else '❌ FAILED'}")
        print(f"🎯 Overall Pipeline: {'✅ SUCCESS' if all_phases_success else '❌ FAILED'}")
        
        print(f"\n📊 Data Processed:")
        print(f"  👥 Customers: {pipeline_results['total_customers']}")
        print(f"  💳 Transactions: {pipeline_results['total_transactions']}")
        print(f"  📈 Analytics Records: {pipeline_results['analytics_records']}")
        
        return pipeline_results, customer_analytics
        
    except Exception as e:
        print(f"\n❌ PIPELINE FAILED: {e}")
        return pipeline_results, None

In [0]:
def test_e2e_pipeline_small_dataset():
    """
    Integration Test 1: Small dataset (happy path)
    """
    print("🧪 INTEGRATION TEST 1: Small Dataset")
    print("-" * 40)
    
    results, analytics_data = run_complete_e2e_pipeline(num_customers=50, num_transactions=200)
    
    # Validate results
    test_passed = (
        results["extraction_success"] and
        results["transformation_success"] and
        results["loading_success"] and
        results["total_customers"] == 50 and
        results["total_transactions"] == 200
    )
    
    print(f"\n🎯 Small Dataset Test: {'✅ PASSED' if test_passed else '❌ FAILED'}")
    return test_passed

def test_e2e_pipeline_large_dataset():
    """
    Integration Test 2: Large dataset (performance test)
    """
    print("\n🧪 INTEGRATION TEST 2: Large Dataset")
    print("-" * 40)
    
    import time
    start_time = time.time()
    
    results, analytics_data = run_complete_e2e_pipeline(num_customers=2000, num_transactions=10000)
    
    end_time = time.time()
    execution_time = end_time - start_time
    
    # Validate results and performance
    test_passed = (
        results["extraction_success"] and
        results["transformation_success"] and
        results["loading_success"] and
        execution_time < 120  # Should complete within 2 minutes
    )
    
    print(f"\n⏱️ Execution Time: {execution_time:.2f} seconds")
    print(f"🎯 Large Dataset Test: {'✅ PASSED' if test_passed else '❌ FAILED'}")
    return test_passed

def test_e2e_pipeline_data_quality():
    """
    Integration Test 3: Data quality validation
    """
    print("\n🧪 INTEGRATION TEST 3: Data Quality Validation")
    print("-" * 40)
    
    # Run pipeline with moderate dataset
    results, analytics_data = run_complete_e2e_pipeline(num_customers=500, num_transactions=2000)
    
    if analytics_data is None:
        print("❌ No data to validate")
        return False
    
    # Data quality checks
    quality_checks = {}
    
    # Check 1: No null customer IDs
    null_customer_ids = analytics_data.filter(col("customer_id").isNull()).count()
    quality_checks["no_null_ids"] = (null_customer_ids == 0)
    
    # Check 2: All customers have valid email format
    invalid_emails = analytics_data.filter(~col("email_clean").contains("@")).count()
    quality_checks["valid_emails"] = (invalid_emails == 0)
    
    # Check 3: Credit scores in valid range
    invalid_credit_scores = analytics_data.filter(
        (col("credit_score") < 300) | (col("credit_score") > 850)
    ).count()
    quality_checks["valid_credit_scores"] = (invalid_credit_scores == 0)
    
    # Check 4: Customer segments assigned
    unassigned_segments = analytics_data.filter(col("customer_segment").isNull()).count()
    quality_checks["segments_assigned"] = (unassigned_segments == 0)
    
    # Check 5: Age calculations reasonable
    invalid_ages = analytics_data.filter((col("age") < 18) | (col("age") > 100)).count()
    quality_checks["reasonable_ages"] = (invalid_ages == 0)
    
    print("\n🔍 Data Quality Results:")
    for check_name, passed in quality_checks.items():
        status = "✅ PASSED" if passed else "❌ FAILED"
        print(f"  {check_name}: {status}")
    
    all_quality_passed = all(quality_checks.values())
    print(f"\n🎯 Data Quality Test: {'✅ PASSED' if all_quality_passed else '❌ FAILED'}")
    return all_quality_passed

def test_e2e_pipeline_business_logic():
    """
    Integration Test 4: Business logic validation
    """
    print("\n🧪 INTEGRATION TEST 4: Business Logic Validation")
    print("-" * 40)
    
    # Run pipeline
    results, analytics_data = run_complete_e2e_pipeline(num_customers=300, num_transactions=1500)
    
    if analytics_data is None:
        print("❌ No data to validate")
        return False
    
    # Business logic checks
    business_checks = {}
    
    # Check 1: Premium customers have high credit scores
    premium_customers = analytics_data.filter(col("customer_segment") == "Premium")
    low_credit_premium = premium_customers.filter(col("credit_score") < 750).count()
    business_checks["premium_credit_logic"] = (low_credit_premium == 0)
    
    # Check 2: Customer tiers align with value scores
    platinum_customers = analytics_data.filter(col("customer_tier") == "Platinum")
    low_value_platinum = platinum_customers.filter(col("customer_value_score") < 1000).count()
    business_checks["platinum_value_logic"] = (low_value_platinum == 0)
    
    # Check 3: Risk categories align with credit scores
    low_risk_customers = analytics_data.filter(col("risk_category") == "Low Risk")
    low_credit_low_risk = low_risk_customers.filter(col("credit_score") < 750).count()
    business_checks["risk_category_logic"] = (low_credit_low_risk == 0)
    
    # Check 4: Total spent is non-negative
    negative_spending = analytics_data.filter(col("total_spent") < 0).count()
    business_checks["positive_spending"] = (negative_spending == 0)
    
    print("\n🏢 Business Logic Results:")
    for check_name, passed in business_checks.items():
        status = "✅ PASSED" if passed else "❌ FAILED"
        print(f"  {check_name}: {status}")
    
    all_business_passed = all(business_checks.values())
    print(f"\n🎯 Business Logic Test: {'✅ PASSED' if all_business_passed else '❌ FAILED'}")
    return all_business_passed

def test_e2e_pipeline_with_extraction_failure():
    """
    Integration Test 5: Extraction failure scenario
    """
    print("\n🧪 INTEGRATION TEST 5: Extraction Failure")
    print("-" * 40)
    
    # Simulate extraction failure by causing Faker to fail
    try:
        # This will fail because we're passing invalid parameters
        fake.random_int(min=1000, max=10)  # min > max causes failure
        test_passed = False
    except:
        print("✅ Successfully simulated extraction failure")
        test_passed = True
    
    print(f"🎯 Extraction Failure Test: {'✅ PASSED' if test_passed else '❌ FAILED'}")
    return test_passed

# ===============================================
# PART 8: RUN COMPLETE INTEGRATION TEST SUITE
# ===============================================

print("\n\n🏆 PART 8: COMPLETE INTEGRATION TEST SUITE")
print("=" * 60)

def run_complete_integration_test_suite():
    """
    Run all integration tests and provide comprehensive results
    """
    print("🧪 Running Complete Integration Test Suite")
    print("=" * 50)
    
    test_results = {}
    
    # Run all integration tests
    print("\n🚀 Starting Integration Test Execution...")
    
    test_results["small_dataset"] = test_e2e_pipeline_small_dataset()
    test_results["large_dataset"] = test_e2e_pipeline_large_dataset()
    test_results["data_quality"] = test_e2e_pipeline_data_quality()
    test_results["business_logic"] = test_e2e_pipeline_business_logic()
    test_results["extraction_failure"] = test_e2e_pipeline_with_extraction_failure()
    
    # Calculate overall results
    total_tests = len(test_results)
    passed_tests = sum(test_results.values())
    pass_rate = (passed_tests / total_tests) * 100
    
    # Print comprehensive summary
    print("\n" + "=" * 60)
    print("📊 INTEGRATION TEST SUITE SUMMARY")
    print("=" * 60)
    
    for test_name, passed in test_results.items():
        status = "✅ PASSED" if passed else "❌ FAILED"
        print(f"{test_name:20} : {status}")
    
    print(f"\n📈 OVERALL RESULTS:")
    print(f"Total Tests: {total_tests}")
    print(f"Passed: {passed_tests}")
    print(f"Failed: {total_tests - passed_tests}")
    print(f"Pass Rate: {pass_rate:.1f}%")
    
    if passed_tests == total_tests:
        print("\n🎉 ALL INTEGRATION TESTS PASSED!")
        print("✅ Your E2E pipeline is robust and production-ready!")
    else:
        print(f"\n⚠️ {total_tests - passed_tests} test(s) failed")
        print("🔧 Pipeline needs improvements before production deployment")
    
    return test_results

# Run the complete test suite
final_test_results = run_complete_integration_test_suite()



🏆 PART 8: COMPLETE INTEGRATION TEST SUITE
🧪 Running Complete Integration Test Suite

🚀 Starting Integration Test Execution...
🧪 INTEGRATION TEST 1: Small Dataset
----------------------------------------
🚀 Starting Complete E2E Data Pipeline

📥 PHASE 1: DATA EXTRACTION
📥 STEP 1: Extracting Customer Data
----------------------------------------
🔄 Connecting to CRM system...
🔄 Executing customer data query...
🎲 Generating 50 realistic customers...
✅ Successfully extracted 50 customer records
📊 Sample extracted data:
+-----------+----------+---------+--------------------+--------------------+--------------------+------------+----------+-------------+-----------------+--------------+------------+-------------+
|customer_id|first_name|last_name|               email|               phone|             address|        city|   country|date_of_birth|registration_date|account_status|credit_score|annual_income|
+-----------+----------+---------+--------------------+--------------------+-----------