# Python Technical Interview - AI Agent Developer Position

## Instructions
This notebook contains 10 questions designed to test your Python skills and ability to work with AI-generated code. Each question has:
- **Problem Description** - What you need to accomplish
- **Code Cell** - Where you write your solution
- **Test Cell** - Automated tests to verify your solution

**Guidelines:**
- Read each question carefully
- You may use any libraries or packages
- Some questions provide starter code, others start from scratch
- Focus on writing clean, readable, and robust code
- Code should run from top to bottom after clearing all outputs
- All test cells should pass when you're done

## Question 1: Debug AI-Generated Code (Lists & Logic)

**Scenario:** An AI generated this code to filter products by price range, but it has several bugs. Fix the code so it works correctly.

**Requirements:**
- Filter products where price is between min_price and max_price (inclusive)
- Handle edge cases gracefully
- Maintain the original function signature

In [22]:
def filter_products_by_price(products, min_price, max_price):
    """
    Filter products by price range.
    
    Args:
        products: List of dicts with 'name' and 'price' keys
        min_price: Minimum price (inclusive)
        max_price: Maximum price (inclusive)
    
    Returns:
        List of products within price range
    """
    # Fixed: Changed to >= and <= for inclusive bounds
    filtered = []
    for product in products:
        if min_price <= product['price'] <= max_price:
            filtered.append(product)
    return filtered

# Test your solution here
products = [
    {'name': 'Laptop', 'price': 1000},
    {'name': 'Mouse', 'price': 25},
    {'name': 'Keyboard', 'price': 75},
    {'name': 'Monitor', 'price': 300}
]

result = filter_products_by_price(products, 25, 300)
print("Filtered products:", result)

Filtered products: [{'name': 'Mouse', 'price': 25}, {'name': 'Keyboard', 'price': 75}, {'name': 'Monitor', 'price': 300}]


In [23]:
# Test Cell
def test_question_1():
    products = [
        {'name': 'Laptop', 'price': 1000},
        {'name': 'Mouse', 'price': 25},
        {'name': 'Keyboard', 'price': 75},
        {'name': 'Monitor', 'price': 300}
    ]
    
    # Test inclusive bounds
    result = filter_products_by_price(products, 25, 300)
    expected_names = ['Mouse', 'Keyboard', 'Monitor']
    actual_names = [p['name'] for p in result]
    assert set(actual_names) == set(expected_names), f"Expected {expected_names}, got {actual_names}"
    
    # Test edge case - empty list
    assert filter_products_by_price([], 0, 100) == []
    
    # Test no matches
    assert filter_products_by_price(products, 2000, 3000) == []
    
    print("✓ Question 1 tests passed!")

test_question_1()

✓ Question 1 tests passed!
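
The tests above cover only well-formed input. The "handle edge cases gracefully" requirement could also be read as covering malformed products. Below is a minimal sketch under assumptions the question does not state: products with a missing or non-numeric `price` are skipped, and `None` is treated as an empty list. The name `filter_products_by_price_safe` is hypothetical.

```python
from numbers import Real

def filter_products_by_price_safe(products, min_price, max_price):
    """Hypothetical defensive variant: skip malformed products instead of raising."""
    filtered = []
    for product in products or []:  # assumption: None behaves like an empty list
        price = product.get('price') if isinstance(product, dict) else None
        # Skip missing or non-numeric prices (bool is excluded on purpose)
        if isinstance(price, bool) or not isinstance(price, Real):
            continue
        if min_price <= price <= max_price:
            filtered.append(product)
    return filtered

sample = [
    {'name': 'Mouse', 'price': 25},
    {'name': 'Cable'},                  # no price key
    {'name': 'Desk', 'price': 'n/a'},   # non-numeric price
]
print(filter_products_by_price_safe(sample, 0, 100))
```

With reversed bounds (`min_price > max_price`) the chained comparison is never true, so the sketch returns an empty list rather than raising.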


## Question 2: Fix API Integration (Error Handling)

**Scenario:** This AI-generated code fetches user data from an API but lacks proper error handling. Add robust error handling and improve the code.

**Requirements:**
- Handle network timeouts
- Handle HTTP errors (4xx, 5xx)
- Handle JSON parsing errors
- Return None on any error, don't let exceptions bubble up
- Add appropriate logging

In [24]:
import requests
import json
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def get_user_data(user_id):
    """
    Fetch user data from API with proper error handling.
    
    Args:
        user_id: User ID to fetch
        
    Returns:
        dict: User data if successful, None if any error occurs
    """
    url = f"https://jsonplaceholder.typicode.com/users/{user_id}"
    
    try:
        # Set timeout to handle network delays
        response = requests.get(url, timeout=10)
        
        # Handle HTTP errors (4xx, 5xx)
        response.raise_for_status()
        
        # Parse JSON with error handling
        data = response.json()
        
        # Validate response has data
        if not data:
            logger.warning(f"Empty response for user_id: {user_id}")
            return None
            
        logger.info(f"Successfully fetched data for user_id: {user_id}")
        return data
        
    except requests.exceptions.Timeout:
        logger.error(f"Timeout error fetching user {user_id}")
        return None
        
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP error for user {user_id}: {e}")
        return None
        
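    except json.JSONDecodeError as e:
        # Checked before RequestException: recent requests releases raise a
        # JSON error that is also a RequestException, so order decides the branch
        logger.error(f"JSON parsing error for user {user_id}: {e}")
        return None
        
    except requests.exceptions.RequestException as e:
        logger.error(f"Network error fetching user {user_id}: {e}")
        return None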
        
    except Exception as e:
        logger.error(f"Unexpected error fetching user {user_id}: {e}")
        return None

# Test your solution here
user_data = get_user_data(1)
print("User data:", user_data)

INFO:__main__:Successfully fetched data for user_id: 1


User data: {'id': 1, 'name': 'Leanne Graham', 'username': 'Bret', 'email': 'Sincere@april.biz', 'address': {'street': 'Kulas Light', 'suite': 'Apt. 556', 'city': 'Gwenborough', 'zipcode': '92998-3874', 'geo': {'lat': '-37.3159', 'lng': '81.1496'}}, 'phone': '1-770-736-8031 x56442', 'website': 'hildegard.org', 'company': {'name': 'Romaguera-Crona', 'catchPhrase': 'Multi-layered client-server neural-net', 'bs': 'harness real-time e-markets'}}


In [25]:
# Test Cell
import unittest.mock as mock

def test_question_2():
    # Test successful request
    user_data = get_user_data(1)
    assert user_data is not None
    assert 'name' in user_data
    
    # Test invalid user ID
    user_data = get_user_data(999999)
    assert user_data is None
    
    # Test with mock to simulate network error
    with mock.patch('requests.get') as mock_get:
        mock_get.side_effect = requests.exceptions.RequestException("Network error")
        result = get_user_data(1)
        assert result is None
    
    # Test with mock to simulate timeout
    with mock.patch('requests.get') as mock_get:
        mock_get.side_effect = requests.exceptions.Timeout("Timeout")
        result = get_user_data(1)
        assert result is None
    
    print("✓ Question 2 tests passed!")

test_question_2()

INFO:__main__:Successfully fetched data for user_id: 1
ERROR:__main__:HTTP error for user 999999: 404 Client Error: Not Found for url: https://jsonplaceholder.typicode.com/users/999999
ERROR:__main__:Network error fetching user 1: Network error
ERROR:__main__:Timeout error fetching user 1


✓ Question 2 tests passed!
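
The test cell above never exercises the JSON parsing path. One way to cover it without a network call is to mock the response object. This is a sketch only: `safe_json` is a hypothetical helper, not part of the solution above, and it catches `ValueError` because `json.JSONDecodeError` is a subclass of it.

```python
import json
import logging
from unittest import mock

logger = logging.getLogger(__name__)

def safe_json(response):
    """Hypothetical helper: return parsed JSON, or None if the body is not valid JSON."""
    try:
        return response.json()
    except ValueError as e:  # json.JSONDecodeError subclasses ValueError
        logger.error("JSON parsing error: %s", e)
        return None

# Simulated response whose body cannot be parsed as JSON
bad_response = mock.Mock()
bad_response.json.side_effect = json.JSONDecodeError("Expecting value", "<html>", 0)

# Simulated response with a valid JSON body
good_response = mock.Mock()
good_response.json.return_value = {"id": 1}

print(safe_json(bad_response))
print(safe_json(good_response))
```

The same `side_effect` approach could be applied to a patched `requests.get` return value to test `get_user_data` directly.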


## Question 3: Code from Scratch (Data Structures)

**Scenario:** Create a `TaskManager` class to manage a simple todo list.

**Requirements:**
- Add tasks with priority (1=high, 2=medium, 3=low)
- Mark tasks as complete
- Get tasks filtered by completion status and/or priority
- Get task count by status

In [26]:
class TaskManager:
    """
    A simple task manager for tracking todo items.
    """
    
    def __init__(self):
        """Initialize empty task manager."""
        self.tasks = []
        self.next_id = 1
    
    def add_task(self, description, priority=2):
        """
        Add a new task.
        
        Args:
            description (str): Task description
            priority (int): Priority level (1=high, 2=medium, 3=low)
        """
        task = {
            'id': self.next_id,
            'description': description,
            'priority': priority,
            'completed': False
        }
        self.tasks.append(task)
        self.next_id += 1
    
    def complete_task(self, task_id):
        """
        Mark a task as complete.
        
        Args:
            task_id: Unique identifier for the task
            
        Returns:
            bool: True if task was found and completed, False otherwise
        """
        for task in self.tasks:
            if task['id'] == task_id:
                task['completed'] = True
                return True
        return False
    
    def get_tasks(self, completed=None, priority=None):
        """
        Get tasks filtered by status and/or priority.
        
        Args:
            completed (bool, optional): Filter by completion status
            priority (int, optional): Filter by priority level
            
        Returns:
            list: List of matching tasks
        """
        # Shallow copy so callers cannot reorder or truncate the internal list
        filtered_tasks = list(self.tasks)
        
        # Filter by completed status if specified
        if completed is not None:
            filtered_tasks = [t for t in filtered_tasks if t['completed'] == completed]
        
        # Filter by priority if specified
        if priority is not None:
            filtered_tasks = [t for t in filtered_tasks if t['priority'] == priority]
        
        return filtered_tasks
    
    def get_task_count(self, completed=None):
        """
        Get count of tasks by completion status.
        
        Args:
            completed (bool, optional): Count completed (True) or pending (False) tasks
            
        Returns:
            int: Number of matching tasks
        """
        if completed is None:
            return len(self.tasks)
        
        return len([t for t in self.tasks if t['completed'] == completed])

tm = TaskManager()
tm.add_task("Fix bug in login", 1)  # High priority
tm.add_task("Update documentation", 3)  # Low priority
tm.add_task("Code review", 2)  # Medium priority

print("All tasks:", len(tm.get_tasks()))
print("High priority tasks:", len(tm.get_tasks(priority=1)))

All tasks: 3
High priority tasks: 1


In [27]:
# Test Cell
def test_question_3():
    tm = TaskManager()
    
    # Test adding tasks
    tm.add_task("Task 1", 1)
    tm.add_task("Task 2", 2)
    tm.add_task("Task 3", 3)
    
    # Test get all tasks
    all_tasks = tm.get_tasks()
    assert len(all_tasks) == 3
    
    # Test priority filtering
    high_priority = tm.get_tasks(priority=1)
    assert len(high_priority) == 1
    
    # Test task completion
    task_id = all_tasks[0]['id']  # Assuming tasks have 'id' field
    success = tm.complete_task(task_id)
    assert success == True
    
    # Test completion filtering
    completed_tasks = tm.get_tasks(completed=True)
    assert len(completed_tasks) == 1
    
    pending_tasks = tm.get_tasks(completed=False)
    assert len(pending_tasks) == 2
    
    # Test task counts
    assert tm.get_task_count() == 3
    assert tm.get_task_count(completed=True) == 1
    assert tm.get_task_count(completed=False) == 2
    
    print("✓ Question 3 tests passed!")

test_question_3()

✓ Question 3 tests passed!


## Question 4: Optimize AI Code (Performance)

**Scenario:** This AI code finds common elements between multiple lists, but it's very inefficient. Optimize it for better performance.

**Requirements:**
- Same functionality as original
- Significantly better time complexity
- Handle edge cases (empty lists, no common elements)

In [28]:
def find_common_elements_slow(lists):
    """
    Find elements that appear in ALL provided lists.
    AI-generated inefficient version - OPTIMIZE THIS!
    
    Args:
        lists: List of lists to find common elements in
        
    Returns:
        list: Elements that appear in all lists
    """
    if not lists:
        return []
    
    common = []
    for item in lists[0]:
        is_common = True
        for other_list in lists[1:]:
            found = False
            for other_item in other_list:
                if item == other_item:
                    found = True
                    break
            if not found:
                is_common = False
                break
        if is_common and item not in common:
            common.append(item)
    
    return common

# Optimized version - implement this
def find_common_elements_fast(lists):
    """
    Find elements that appear in ALL provided lists.
    Optimized version with better time complexity.
    
    Args:
        lists: List of lists to find common elements in
        
    Returns:
        list: Elements that appear in all lists
    """
    # Handle edge cases
    if not lists:
        return []
    
    if len(lists) == 1:
        # Return a new, de-duplicated list (order kept), as the slow version does
        return list(dict.fromkeys(lists[0]))
    
    # Convert first list to set for fast lookup
    common_set = set(lists[0])
    
    # Intersect with each subsequent list
    for lst in lists[1:]:
        common_set = common_set.intersection(set(lst))
        
        # Early exit if no common elements remain
        if not common_set:
            return []
    
    # Convert back to list
    return list(common_set)

# Test both versions
test_lists = [
    [1, 2, 3, 4, 5],
    [3, 4, 5, 6, 7],
    [4, 5, 7, 8, 9]
]

print("Slow version:", find_common_elements_slow(test_lists))
print("Fast version:", find_common_elements_fast(test_lists))

Slow version: [4, 5]
Fast version: [4, 5]


In [29]:
# Test Cell

import time

def test_question_4():
    # Basic functionality test
    test_lists = [
        [1, 2, 3, 4, 5],
        [3, 4, 5, 6, 7],
        [4, 5, 7, 8, 9]
    ]
    
    slow_result = find_common_elements_slow(test_lists)
    fast_result = find_common_elements_fast(test_lists)
    
    assert set(slow_result) == set(fast_result), "Results don't match"
    assert set(fast_result) == {4, 5}, f"Expected {{4, 5}}, got {set(fast_result)}"
    
    # Edge cases
    assert find_common_elements_fast([]) == []
    assert find_common_elements_fast([[1, 2], []]) == []
    assert find_common_elements_fast([[1, 2, 3]]) == [1, 2, 3]
    
    # Performance test (rough)
    large_lists = [[i for i in range(1000)] for _ in range(10)]
    
    start_time = time.time()
    find_common_elements_fast(large_lists)
    fast_time = time.time() - start_time
    
    # Fast version should complete in reasonable time
    assert fast_time < 1.0, "Optimized version is still too slow"
    
    print("✓ Question 4 tests passed!")

test_question_4()

✓ Question 4 tests passed!
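
The set-based version returns its result in arbitrary order, while the slow version keeps the order of the first list. The tests compare sets, so both pass. If order matters to a caller, the following sketch keeps first-list order while still using set lookups. The name `find_common_elements_ordered` is hypothetical, and the sketch assumes all elements are hashable.

```python
def find_common_elements_ordered(lists):
    """Hypothetical variant: common elements in first-list order, without duplicates."""
    if not lists:
        return []
    # One set intersection across all lists; membership checks are O(1) on average
    common = set(lists[0]).intersection(*lists[1:])
    # dict.fromkeys de-duplicates while keeping first-seen order
    return [item for item in dict.fromkeys(lists[0]) if item in common]

print(find_common_elements_ordered([[5, 1, 4, 4], [4, 5, 7], [9, 5, 4]]))  # [5, 4]
```

With n total elements across all lists, this runs in roughly O(n) time, compared with the nested loops of the slow version.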


## Question 5: Fix Function with Edge Cases

**Scenario:** This AI function calculates statistics for a list of numbers, but fails on various edge cases. Make it robust.

**Requirements:**
- Handle empty lists
- Handle non-numeric values gracefully
- Handle division by zero
- Return meaningful error messages or default values

In [30]:
def calculate_stats(numbers):
    """
    Calculate basic statistics for a list of numbers.
    Fixed version that handles edge cases gracefully.
    
    Args:
        numbers: List of numbers
        
    Returns:
        dict: Statistics including mean, median, mode, std_dev
    """
    # Keep only real numbers; bool is excluded because it is a subclass of int
    valid_numbers = [
        num for num in numbers
        if isinstance(num, (int, float)) and not isinstance(num, bool)
    ]
    
    # Handle empty list or no valid numbers
    if not valid_numbers:
        return {
            'error': 'No valid numeric data',
            'mean': None,
            'median': None,
            'mode': None,
            'std_dev': None,
            'count': 0
        }
    
    # Sort for median calculation
    sorted_nums = sorted(valid_numbers)
    
    # Mean
    mean = sum(valid_numbers) / len(valid_numbers)
    
    # Median
    n = len(sorted_nums)
    if n % 2 == 0:
        median = (sorted_nums[n//2 - 1] + sorted_nums[n//2]) / 2
    else:
        median = sorted_nums[n//2]
    
    # Mode (most frequent)
    from collections import Counter
    counts = Counter(valid_numbers)
    mode = counts.most_common(1)[0][0]
    
    # Standard deviation
    variance = sum((x - mean) ** 2 for x in valid_numbers) / len(valid_numbers)
    std_dev = variance ** 0.5
    
    return {
        'mean': mean,
        'median': median,
        'mode': mode,
        'std_dev': std_dev,
        'count': len(valid_numbers)
    }

# Test your solution
test_cases = [
    [1, 2, 3, 4, 5],           # Normal case
    [],                        # Empty list
    [1],                       # Single item
    [1, 1, 1],                # All same
    [1, 'invalid', 3],         # Mixed types
    [1, 2, None, 4]           # None values
]

for i, case in enumerate(test_cases):
    print(f"Test case {i+1}: {case}")
    try:
        result = calculate_stats(case)
        print(f"  Result: {result}")
    except Exception as e:
        print(f"  Error: {e}")
    print()

Test case 1: [1, 2, 3, 4, 5]
  Result: {'mean': 3.0, 'median': 3, 'mode': 1, 'std_dev': 1.4142135623730951, 'count': 5}

Test case 2: []
  Result: {'error': 'No valid numeric data', 'mean': None, 'median': None, 'mode': None, 'std_dev': None, 'count': 0}

Test case 3: [1]
  Result: {'mean': 1.0, 'median': 1, 'mode': 1, 'std_dev': 0.0, 'count': 1}

Test case 4: [1, 1, 1]
  Result: {'mean': 1.0, 'median': 1, 'mode': 1, 'std_dev': 0.0, 'count': 3}

Test case 5: [1, 'invalid', 3]
  Result: {'mean': 2.0, 'median': 2.0, 'mode': 1, 'std_dev': 1.0, 'count': 2}

Test case 6: [1, 2, None, 4]
  Result: {'mean': 2.3333333333333335, 'median': 2, 'mode': 1, 'std_dev': 1.247219128924647, 'count': 3}



In [31]:
# Test Cell
def test_question_5():
    # Normal case
    result = calculate_stats([1, 2, 3, 4, 5])
    assert result['mean'] == 3.0
    assert result['median'] == 3.0
    assert result['count'] == 5
    
    # Single item
    result = calculate_stats([42])
    assert result['mean'] == 42
    assert result['median'] == 42
    assert result['mode'] == 42
    assert result['std_dev'] == 0
    
    # Empty list - should handle gracefully
    result = calculate_stats([])
    assert 'error' in result or all(v is None or v == 0 for v in result.values())
    
    # Mixed types - should handle gracefully
    result = calculate_stats([1, 'invalid', 3])
    assert 'error' in result or result['count'] == 2  # Only valid numbers counted
    
    # All same values
    result = calculate_stats([5, 5, 5, 5])
    assert result['mean'] == 5
    assert result['std_dev'] == 0
    
    print("✓ Question 5 tests passed!")

test_question_5()

✓ Question 5 tests passed!
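
The hand-written statistics can be cross-checked against the standard library's `statistics` module. The sketch below is illustrative rather than a replacement. It shows two details: `calculate_stats` divides the variance by n, which corresponds to `pstdev` (population) rather than `stdev` (sample), and when several values tie for most frequent, `multimode` returns all of them instead of only the first.

```python
import math
import statistics

values = [1, 2, 3, 4, 5]

mean = statistics.mean(values)
median = statistics.median(values)
modes = statistics.multimode(values)        # every value tied for most frequent
population_sd = statistics.pstdev(values)   # divides by n, like calculate_stats
sample_sd = statistics.stdev(values)        # divides by n - 1

print(mean, median, modes)
print(round(population_sd, 4), round(sample_sd, 4))
```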


## Question 6: Complete Partial Implementation (Pandas/Data)

### Goal
Implement `analyze_sales_data(df, group_by_column)`.

### Input
A pandas DataFrame `df` with columns:
- `product`
- `category`
- `sales`
- `profit`

### Output (must match exactly)
- Return a DataFrame **indexed by `group_by_column`** (do not reset the index).
- Include exactly these columns (names must match):
  - `sales_sum` — sum of `sales`
  - `sales_mean` — mean of `sales`
  - `profit_sum` — sum of `profit`
  - `profit_mean` — mean of `profit`
  - `profit_margin` — `profit_sum / sales_sum` (use `NaN` if `sales_sum == 0`)
- Handle missing values: treat missing `sales` or `profit` as `0` before aggregation.
- Sorting is **not required**.

### Edge Behavior
- If `df` is empty or `group_by_column` is missing, return an empty DataFrame with the required column names.

In [32]:
import pandas as pd
import numpy as np

def analyze_sales_data(df, group_by_column):
    """
    Analyze sales data by grouping and calculating statistics.
    
    Args:
        df: DataFrame with columns ['product', 'category', 'sales', 'profit']
        group_by_column: Column name to group by
        
    Returns:
        DataFrame with aggregated statistics indexed by group_by_column
    """
    # Handle edge cases
    if df.empty or group_by_column not in df.columns:
        return pd.DataFrame(columns=['sales_sum', 'sales_mean', 'profit_sum', 'profit_mean', 'profit_margin'])
    
    # Create a copy to avoid modifying original
    df_copy = df.copy()
    
    # Handle missing values: treat as 0
    df_copy['sales'] = df_copy['sales'].fillna(0)
    df_copy['profit'] = df_copy['profit'].fillna(0)
    
    # Group by the specified column
    grouped = df_copy.groupby(group_by_column)
    
    # Calculate aggregations
    result = pd.DataFrame({
        'sales_sum': grouped['sales'].sum(),
        'sales_mean': grouped['sales'].mean(),
        'profit_sum': grouped['profit'].sum(),
        'profit_mean': grouped['profit'].mean()
    })
    
    # Calculate profit margin (profit/sales), handle division by zero
    result['profit_margin'] = result.apply(
        lambda row: row['profit_sum'] / row['sales_sum'] if row['sales_sum'] != 0 else np.nan,
        axis=1
    )
    
    # Index is already set to group_by_column from groupby
    return result

# Create sample data for testing
sample_data = pd.DataFrame({
    'product': ['A', 'B', 'C', 'A', 'B', 'C', 'A'],
    'category': ['Electronics', 'Electronics', 'Clothing', 'Electronics', 'Electronics', 'Clothing', 'Electronics'],
    'sales': [100, 200, 150, 120, np.nan, 180, 110],
    'profit': [20, 50, 30, 25, 40, 35, 22]
})

print("Sample data:")
print(sample_data)
print("\nAnalysis by product:")
result = analyze_sales_data(sample_data, 'product')
print(result)

Sample data:
  product     category  sales  profit
0       A  Electronics  100.0      20
1       B  Electronics  200.0      50
2       C     Clothing  150.0      30
3       A  Electronics  120.0      25
4       B  Electronics    NaN      40
5       C     Clothing  180.0      35
6       A  Electronics  110.0      22

Analysis by product:
         sales_sum  sales_mean  profit_sum  profit_mean  profit_margin
product                                                               
A            330.0       110.0          67    22.333333        0.20303
B            200.0       100.0          90    45.000000        0.45000
C            330.0       165.0          65    32.500000        0.19697


In [33]:
# Test Cell
def test_question_6():
    # Create test data
    test_data = pd.DataFrame({
        'product': ['A', 'B', 'A', 'B', 'A'],
        'category': ['Cat1', 'Cat2', 'Cat1', 'Cat2', 'Cat1'],
        'sales': [100, 200, 150, 300, 50],
        'profit': [20, 40, 30, 60, 10]
    })
    
    # Test grouping by product
    result = analyze_sales_data(test_data, 'product')
    
    # Check structure
    assert isinstance(result, pd.DataFrame), "Should return DataFrame"
    assert len(result) == 2, "Should have 2 groups (A and B)"
    
    # Check required columns exist
    required_cols = ['sales_sum', 'sales_mean', 'profit_sum', 'profit_mean', 'profit_margin']
    for col in required_cols:
        assert col in result.columns, f"Missing column: {col}"
    
    # Check calculations for product A
    product_a = result.loc['A'] if 'A' in result.index else result[result.index == 'A'].iloc[0]
    assert product_a['sales_sum'] == 300, "Product A sales sum should be 300"
    assert product_a['profit_sum'] == 60, "Product A profit sum should be 60"
    
    print("✓ Question 6 tests passed!")

test_question_6()

✓ Question 6 tests passed!
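
The test cell does not check the `sales_sum == 0` case from the specification. The sketch below isolates that rule on a small hand-built frame (the values are made up for illustration) and uses a vectorized `Series.where` in place of a row-wise `apply`. It is one possible formulation, not the only correct one.

```python
import numpy as np
import pandas as pd

# Hypothetical aggregated totals; group 'B' has zero sales
totals = pd.DataFrame(
    {'sales_sum': [330.0, 0.0], 'profit_sum': [67.0, 5.0]},
    index=pd.Index(['A', 'B'], name='product'),
)

# Replace zero denominators with NaN so the division yields NaN rather than inf
safe_sales = totals['sales_sum'].where(totals['sales_sum'] != 0)
totals['profit_margin'] = totals['profit_sum'] / safe_sales

print(totals)
```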


## Question 7: Refactor Messy AI Code (Clean Code)

**Scenario:** This AI code works but is poorly structured and hard to maintain. Refactor it following clean code principles.

**Requirements:**
- Improve readability and maintainability
- Add proper documentation
- Follow naming conventions
- Break down large functions
- Add type hints if possible

In [34]:
def process_data(data):
    """Messy AI-generated code that works but needs refactoring - CLEAN IT UP!"""
    result = {}
    for item in data:
        if 'type' in item and item['type'] == 'user':
            if 'active' in item and item['active']:
                if 'age' in item:
                    if item['age'] >= 18:
                        if 'email' in item and '@' in item['email']:
                            category = 'adult'
                            if item['age'] >= 65:
                                category = 'senior'
                            elif item['age'] >= 25:
                                category = 'adult'
                            else:
                                category = 'young_adult'
                            
                            if category not in result:
                                result[category] = {'count': 0, 'emails': [], 'total_age': 0}
                            
                            result[category]['count'] += 1
                            result[category]['emails'].append(item['email'])
                            result[category]['total_age'] += item['age']
    
    # Calculate averages
    for cat in result:
        result[cat]['avg_age'] = result[cat]['total_age'] / result[cat]['count']
        del result[cat]['total_age']
    
    return result

# Test data
test_data = [
    {'type': 'user', 'active': True, 'age': 25, 'email': 'user1@test.com'},
    {'type': 'user', 'active': True, 'age': 70, 'email': 'user2@test.com'},
    {'type': 'user', 'active': False, 'age': 30, 'email': 'user3@test.com'},
    {'type': 'admin', 'active': True, 'age': 35, 'email': 'admin@test.com'},
    {'type': 'user', 'active': True, 'age': 20, 'email': 'invalid-email'},
    {'type': 'user', 'active': True, 'age': 40, 'email': 'user4@test.com'},
]

# Your refactored version should produce the same results
original_result = process_data(test_data)
print("Original result:", original_result)

# Refactored version with clean code principles
def is_valid_user(user: dict) -> bool:
    """
    Check if user meets basic validation criteria.
    
    Args:
        user: User dictionary to validate
        
    Returns:
        bool: True if user is valid, False otherwise
    """
    return (
        user.get('type') == 'user' and
        bool(user.get('active', False)) and
        user.get('age', 0) >= 18 and
        is_valid_email(user.get('email', ''))
    )

def is_valid_email(email: str) -> bool:
    """
    Basic email validation.
    
    Args:
        email: Email string to validate
        
    Returns:
        bool: True if email contains '@', False otherwise
    """
    return isinstance(email, str) and '@' in email

def get_age_category(age: int) -> str:
    """
    Determine age category for a given age.
    
    Args:
        age: User's age
        
    Returns:
        str: Age category ('senior', 'adult', or 'young_adult')
    """
    if age >= 65:
        return 'senior'
    elif age >= 25:
        return 'adult'
    else:
        return 'young_adult'

def initialize_category_stats() -> dict:
    """
    Create empty statistics dictionary for a category.
    
    Returns:
        dict: Empty stats with count, emails, and total_age
    """
    return {
        'count': 0,
        'emails': [],
        'total_age': 0
    }

def add_user_to_category(category_stats: dict, user: dict) -> None:
    """
    Add a user to category statistics.
    
    Args:
        category_stats: Statistics dictionary for the category
        user: User dictionary to add
    """
    category_stats['count'] += 1
    category_stats['emails'].append(user['email'])
    category_stats['total_age'] += user['age']

def calculate_average_ages(results: dict) -> None:
    """
    Calculate average age for each category and remove total_age.
    
    Args:
        results: Results dictionary to update in-place
    """
    for category in results:
        results[category]['avg_age'] = results[category]['total_age'] / results[category]['count']
        del results[category]['total_age']

def process_user_data_clean(data: list) -> dict:
    """
    Process user data and categorize by age groups.
    
    Filters for active adult users with valid emails, then groups them
    into age categories (young_adult: 18-24, adult: 25-64, senior: 65+).
    
    Args:
        data: List of user dictionaries
        
    Returns:
        dict: Statistics for each age category including count, emails, and avg_age
    """
    results = {}
    
    for user in data:
        # Skip invalid users
        if not is_valid_user(user):
            continue
        
        # Determine age category
        category = get_age_category(user['age'])
        
        # Initialize category if needed
        if category not in results:
            results[category] = initialize_category_stats()
        
        # Add user to category
        add_user_to_category(results[category], user)
    
    # Calculate average ages
    calculate_average_ages(results)
    
    return results

# Test both versions
clean_result = process_user_data_clean(test_data)
print("Clean result:", clean_result)

Original result: {'adult': {'count': 2, 'emails': ['user1@test.com', 'user4@test.com'], 'avg_age': 32.5}, 'senior': {'count': 1, 'emails': ['user2@test.com'], 'avg_age': 70.0}}
Clean result: {'adult': {'count': 2, 'emails': ['user1@test.com', 'user4@test.com'], 'avg_age': 32.5}, 'senior': {'count': 1, 'emails': ['user2@test.com'], 'avg_age': 70.0}}


In [35]:
# Test Cell
def test_question_7():
    test_data = [
        {'type': 'user', 'active': True, 'age': 25, 'email': 'user1@test.com'},
        {'type': 'user', 'active': True, 'age': 70, 'email': 'user2@test.com'},
        {'type': 'user', 'active': False, 'age': 30, 'email': 'user3@test.com'},
        {'type': 'user', 'active': True, 'age': 20, 'email': 'user4@test.com'},
    ]
    
    original_result = process_data(test_data)
    clean_result = process_user_data_clean(test_data)
    
    # Results should be functionally equivalent
    assert set(original_result.keys()) == set(clean_result.keys()), "Categories don't match"
    
    for category in original_result:
        assert original_result[category]['count'] == clean_result[category]['count'], f"Count mismatch for {category}"
        assert abs(original_result[category]['avg_age'] - clean_result[category]['avg_age']) < 0.01, f"Average age mismatch for {category}"
    
    print("✓ Question 7 tests passed!")

test_question_7()

✓ Question 7 tests passed!


## Question 8: Debug Complex Logic (Algorithms)

**Scenario:** This AI implementation of binary search has subtle bugs. Find and fix all the issues.

**Requirements:**
- Fix the binary search algorithm
- Handle edge cases properly
- Maintain O(log n) time complexity
- Return correct index or -1 if not found

In [36]:
def binary_search_buggy(arr, target):
    """
    Binary search implementation - FIXED VERSION
    
    Args:
        arr: Sorted list of integers
        target: Value to search for
        
    Returns:
        int: Index of target if found, -1 otherwise
    """
    left = 0
    right = len(arr) - 1  # Fixed: was len(arr), should be len(arr) - 1
    
    while left <= right:  # Fixed: was left < right, should be left <= right
        mid = (left + right) // 2
        
        if arr[mid] == target:
            return mid
        elif arr[mid] < target:
            left = mid + 1  # Fixed: was left = mid, should be left = mid + 1
        else:
            right = mid - 1  # Fixed: was right = mid, should be right = mid - 1
    
    return -1

# Test cases
test_arrays = [
    ([1, 3, 5, 7, 9, 11], 7),    # Should find at index 3
    ([1, 3, 5, 7, 9, 11], 1),    # Should find at index 0
    ([1, 3, 5, 7, 9, 11], 11),   # Should find at index 5
    ([1, 3, 5, 7, 9, 11], 6),    # Should return -1
    ([5], 5),                     # Single element found
    ([5], 3),                     # Single element not found
    ([], 5),                      # Empty array
]

for arr, target in test_arrays:
    result = binary_search_buggy(arr, target)
    print(f"Searching for {target} in {arr}: {result}")

Searching for 7 in [1, 3, 5, 7, 9, 11]: 3
Searching for 1 in [1, 3, 5, 7, 9, 11]: 0
Searching for 11 in [1, 3, 5, 7, 9, 11]: 5
Searching for 6 in [1, 3, 5, 7, 9, 11]: -1
Searching for 5 in [5]: 0
Searching for 3 in [5]: -1
Searching for 5 in []: -1


In [37]:
# Test Cell
def test_question_8():
    # Test cases with expected results
    test_cases = [
        ([1, 3, 5, 7, 9, 11], 7, 3),      # Found at index 3
        ([1, 3, 5, 7, 9, 11], 1, 0),      # Found at index 0
        ([1, 3, 5, 7, 9, 11], 11, 5),     # Found at index 5
        ([1, 3, 5, 7, 9, 11], 6, -1),     # Not found
        ([1, 3, 5, 7, 9, 11], 0, -1),     # Less than min
        ([1, 3, 5, 7, 9, 11], 12, -1),    # Greater than max
        ([5], 5, 0),                       # Single element found
        ([5], 3, -1),                      # Single element not found
        ([], 5, -1),                       # Empty array
    ]
    
    for arr, target, expected in test_cases:
        result = binary_search_buggy(arr, target)
        assert result == expected, f"Failed for {target} in {arr}: expected {expected}, got {result}"
    
    # Sanity check on a larger sorted array (verifies the result, not the running time)
    large_array = list(range(0, 10000, 2))  # [0, 2, 4, 6, ..., 9998]
    result = binary_search_buggy(large_array, 5000)
    assert result == 2500, "Should find 5000 at index 2500"
    
    print("✓ Question 8 tests passed!")

test_question_8()

✓ Question 8 tests passed!
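
The large-array check in the test cell confirms the returned index but does not measure how many steps the search takes. A minimal sketch of one way to check that, assuming a hypothetical `CountingList` helper and a stand-in `binary_search` that mirrors the fixed logic above (neither name is part of the original notebook):

```python
import math
from typing import List


class CountingList(list):
    """List subclass that counts index reads (hypothetical test helper)."""

    def __init__(self, *args):
        super().__init__(*args)
        self.reads = 0

    def __getitem__(self, index):
        self.reads += 1
        return super().__getitem__(index)


def binary_search(arr: List[int], target: int) -> int:
    """Stand-in with the same logic as the fixed function above."""
    left, right = 0, len(arr) - 1
    while left <= right:
        mid = (left + right) // 2
        value = arr[mid]  # exactly one element read per iteration
        if value == target:
            return mid
        if value < target:
            left = mid + 1
        else:
            right = mid - 1
    return -1


data = CountingList(range(0, 10000, 2))  # 5000 sorted even numbers
index = binary_search(data, 5000)

# A binary search over n elements needs at most floor(log2(n)) + 1 reads.
max_reads = math.floor(math.log2(len(data))) + 1
print(f"index={index}, reads={data.reads}, allowed={max_reads}")
```

A linear scan would need thousands of reads on this input, so an assertion such as `data.reads <= max_reads` separates the two approaches without relying on timing.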


## Question 9: Add Missing Functionality

**Scenario:** This AI-generated code provides a basic cache implementation but is missing several key features. Add the missing functionality to make it production-ready.

**Requirements:**
- Add TTL (time-to-live) support for automatic expiration
- Add size limit with LRU (Least Recently Used) eviction
- Add cache statistics tracking (hits, misses, evictions)
- Add methods for cache management (clear, size, cleanup)
- Handle thread safety considerations
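
For the thread-safety requirement, one common approach is to guard every public method with a single lock. The following is a minimal sketch of that idea, not a full solution: `ThreadSafeLRU` is a hypothetical name, it covers only LRU eviction (no TTL or statistics), and it assumes `max_size >= 1`.

```python
import threading
from collections import OrderedDict
from typing import Any, Optional


class ThreadSafeLRU:
    """Hypothetical sketch: an LRU map whose public methods share one lock."""

    def __init__(self, max_size: int = 100) -> None:
        self.max_size = max_size  # assumed to be >= 1
        self._data: "OrderedDict[str, Any]" = OrderedDict()
        # RLock allows a locked method to call another locked method
        # on the same thread without deadlocking.
        self._lock = threading.RLock()

    def get(self, key: str) -> Optional[Any]:
        with self._lock:
            if key not in self._data:
                return None
            self._data.move_to_end(key)  # mark as most recently used
            return self._data[key]

    def set(self, key: str, value: Any) -> None:
        with self._lock:
            if key in self._data:
                self._data.move_to_end(key)
            elif len(self._data) >= self.max_size:
                self._data.popitem(last=False)  # evict least recently used
            self._data[key] = value

    def size(self) -> int:
        with self._lock:
            return len(self._data)


# Usage: several threads write concurrently; the size limit still holds.
shared = ThreadSafeLRU(max_size=50)


def worker(worker_id: int) -> None:
    for i in range(200):
        shared.set(f"k{worker_id}-{i}", i)
        shared.get(f"k{worker_id}-{i}")


threads = [threading.Thread(target=worker, args=(n,)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"size after concurrent writes: {shared.size()}")
```

A single coarse lock is the simplest design to reason about. Finer-grained locking can reduce contention but is easier to get wrong, because `get` also mutates the LRU order.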

In [38]:
import time
from typing import Any, Optional, Dict
from collections import OrderedDict

class SimpleCache:
    """
    Enhanced cache with TTL, LRU eviction, and statistics tracking.

    Not thread-safe: concurrent callers must serialize access themselves
    (for example with a lock).
    """
    
    def __init__(self, max_size: int = 100, default_ttl: Optional[int] = None):
        """
        Initialize cache with size limit and default TTL.
        
        Args:
            max_size: Maximum number of items to store
            default_ttl: Default time-to-live in seconds (None = no expiration)
        """
        self.max_size = max_size
        self.default_ttl = default_ttl
        
        # OrderedDict for LRU tracking (maintains insertion/access order)
        self._data = OrderedDict()
        
        # TTL tracking: key -> expiration timestamp
        self._expiration_times = {}
        
        # Statistics
        self._stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0
        }
    
    def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.
        
        Args:
            key: Cache key
            
        Returns:
            Cached value or None if not found/expired
        """
        # Check if key exists
        if key not in self._data:
            self._stats['misses'] += 1
            return None
        
        # Check if expired
        if self._is_expired(key):
            self.delete(key)
            self._stats['misses'] += 1
            return None
        
        # Update LRU order (move to end = most recently used)
        self._data.move_to_end(key)
        
        # Update statistics
        self._stats['hits'] += 1
        
        return self._data[key]
    
    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        """
        Set value in cache.
        
        Args:
            key: Cache key
            value: Value to cache
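            ttl: Time-to-live in seconds; overrides default_ttl when given.
                None falls back to default_ttl, so it disables expiration
                only if default_ttl is also None.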
        """
        # If key exists, remove it first (will re-add at end for LRU)
        if key in self._data:
            del self._data[key]
            if key in self._expiration_times:
                del self._expiration_times[key]
        
        # Check if cache is full and evict LRU items
        if len(self._data) >= self.max_size:
            self._evict_lru(1)
        
        # Store value
        self._data[key] = value
        
        # Calculate and store expiration time if TTL provided
        effective_ttl = ttl if ttl is not None else self.default_ttl
        if effective_ttl is not None:
            self._expiration_times[key] = time.time() + effective_ttl
    
    def delete(self, key: str) -> bool:
        """Delete key from cache."""
        if key in self._data:
            del self._data[key]
            if key in self._expiration_times:
                del self._expiration_times[key]
            return True
        return False
    
    def clear(self) -> None:
        """Clear all items from cache (hit/miss/eviction counters are kept)."""
        self._data.clear()
        self._expiration_times.clear()
    
    def size(self) -> int:
        """Return current number of items in cache."""
        return len(self._data)
    
    def get_stats(self) -> Dict[str, int]:
        """
        Get cache statistics.
        
        Returns:
            Dict with keys: hits, misses, evictions, current_size
        """
        return {
            'hits': self._stats['hits'],
            'misses': self._stats['misses'],
            'evictions': self._stats['evictions'],
            'current_size': len(self._data)
        }
    
    def cleanup_expired(self) -> int:
        """
        Remove expired items from cache.
        
        Returns:
            Number of items removed
        """
        expired_keys = []
        current_time = time.time()
        
        for key, expiration_time in self._expiration_times.items():
            if current_time >= expiration_time:
                expired_keys.append(key)
        
        for key in expired_keys:
            self.delete(key)
        
        return len(expired_keys)
    
    def _evict_lru(self, count: int = 1) -> int:
        """
        Evict least recently used items.
        
        Args:
            count: Number of items to evict
            
        Returns:
            Number of items actually evicted
        """
        evicted = 0
        for _ in range(count):
            if not self._data:
                break
            
            # Remove first item (least recently used in OrderedDict)
            key, _ = self._data.popitem(last=False)
            
            if key in self._expiration_times:
                del self._expiration_times[key]
            
            evicted += 1
            self._stats['evictions'] += 1
        
        return evicted
    
    def _is_expired(self, key: str) -> bool:
        """Check if a cache entry has expired."""
        if key not in self._expiration_times:
            return False
        
        return time.time() >= self._expiration_times[key]

# Test your enhanced implementation
if __name__ == "__main__":
    # Test TTL functionality
    cache = SimpleCache(max_size=3, default_ttl=1)  # 1 second TTL
    
    print("=== Testing TTL ===")
    cache.set("temp_key", "temp_value")
    print(f"Immediately after set: {cache.get('temp_key')}")
    time.sleep(1.1)
    print(f"After TTL expired: {cache.get('temp_key')}")
    
    print("\n=== Testing Size Limits & LRU ===")
    cache.clear()
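    # Note: ttl=None falls back to default_ttl (1 second for this cache),
    # so these entries still expire; they are not permanent.
    cache.set("a", 1, ttl=None)
    cache.set("b", 2, ttl=None)
    cache.set("c", 3, ttl=None)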
    print(f"Cache size after adding 3 items: {cache.size()}")
    
    # Access 'a' to make it recently used
    cache.get("a")
    
    # Add 'd' which should evict 'b' (least recently used)
    cache.set("d", 4, ttl=None)
    print(f"After adding 'd': a={cache.get('a')}, b={cache.get('b')}, c={cache.get('c')}, d={cache.get('d')}")
    
    print("\n=== Testing Statistics ===")
    stats = cache.get_stats()
    print(f"Cache statistics: {stats}")
    
    print("\n=== Testing Cleanup ===")
    cache.set("expire_me", "value", ttl=1)
    time.sleep(1.1)
    removed_count = cache.cleanup_expired()
    print(f"Expired items removed: {removed_count}")

=== Testing TTL ===
Immediately after set: temp_value
After TTL expired: None

=== Testing Size Limits & LRU ===
Cache size after adding 3 items: 3
After adding 'd': a=1, b=None, c=3, d=4

=== Testing Statistics ===
Cache statistics: {'hits': 5, 'misses': 2, 'evictions': 1, 'current_size': 3}

=== Testing Cleanup ===
Expired items removed: 3


In [39]:
# Test Cell 
import time

def test_question_9():
    print("Testing enhanced cache implementation...")
    
    # Test 1: Basic functionality
    cache = SimpleCache(max_size=3, default_ttl=60)
    
    cache.set("key1", "value1")
    cache.set("key2", "value2")
    
    assert cache.get("key1") == "value1", "Basic get/set failed"
    assert cache.get("key2") == "value2", "Basic get/set failed"
    assert cache.size() == 2, f"Expected size 2, got {cache.size()}"
    
    # Test 2: TTL expiration
    cache.clear()
    cache.set("ttl_key", "ttl_value", ttl=1)  # 1 second TTL
    assert cache.get("ttl_key") == "ttl_value", "TTL key should be accessible immediately"
    
    time.sleep(1.1)  # Wait for expiration
    assert cache.get("ttl_key") is None, "TTL key should be expired and return None"
    
    # Test 3: Size limits and LRU eviction
    cache.clear()
    cache.set("a", 1)
    cache.set("b", 2) 
    cache.set("c", 3)  # Cache is now full (max_size=3)
    
    # Access 'a' to make it recently used
    cache.get("a")
    
    # Add 'd' which should evict 'b' (least recently used)
    cache.set("d", 4)
    
    assert cache.get("a") == 1, "Recently used 'a' should not be evicted"
    assert cache.get("b") is None, "Least recently used 'b' should be evicted"
    assert cache.get("c") == 3, "'c' should still be in cache"
    assert cache.get("d") == 4, "Newly added 'd' should be in cache"
    assert cache.size() == 3, "Cache size should remain at max_size"
    
    # Test 4: Statistics tracking
    cache.clear()
    cache.set("stat_key", "stat_value")
    cache.get("stat_key")  # Hit
    cache.get("nonexistent")  # Miss
    
    stats = cache.get_stats()
    required_stats = ["hits", "misses", "evictions", "current_size"]
    for stat in required_stats:
        assert stat in stats, f"Missing statistic: {stat}"
    
    assert stats["hits"] > 0, "Should have recorded hits"
    assert stats["misses"] > 0, "Should have recorded misses"
    assert stats["current_size"] == 1, "Should track current size"
    
    # Test 5: Manual cleanup
    cache.clear()
    cache.set("expire1", "value1", ttl=1)
    cache.set("expire2", "value2", ttl=1)
    cache.set("keep", "value3", ttl=None)  # Falls back to default_ttl (60 s), so it outlives the wait
    
    time.sleep(1.1)  # Wait for expiration
    removed_count = cache.cleanup_expired()
    
    assert removed_count == 2, f"Should have removed 2 expired items, removed {removed_count}"
    assert cache.get("keep") == "value3", "Non-expiring item should remain"
    assert cache.size() == 1, "Only one item should remain after cleanup"
    
    # Test 6: Edge cases
    cache.clear()
    assert cache.size() == 0, "Cache should be empty after clear"
    assert cache.get("nonexistent") is None, "Getting non-existent key should return None"
    assert cache.delete("nonexistent") == False, "Deleting non-existent key should return False"
    
    # Test delete functionality
    cache.set("delete_me", "value")
    assert cache.delete("delete_me") == True, "Deleting existing key should return True"
    assert cache.get("delete_me") is None, "Deleted key should not be accessible"
    
    print("✓ All Question 9 tests passed!")

test_question_9()


Testing enhanced cache implementation...
✓ All Question 9 tests passed!


## Question 10: Integration Challenge (Multiple Components)

**Scenario:** You have three separate AI-generated modules that need to work together in a data processing pipeline, but they have interface mismatches and compatibility issues. Your job is to create the integration layer that makes them work together seamlessly.

**Requirements:**
- Create adapter/wrapper functions to handle data format conversions
- Build a unified pipeline that chains all three components
- Add comprehensive error handling for the integration
- Handle edge cases and invalid data gracefully
- Create helper functions for data transformation
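
The mismatch between the components is mostly about types: a dict has to become a JSON string, and each analysis result has to be a `(summary, metrics)` tuple. A minimal sketch of an adapter that reports a conversion failure in that same tuple shape instead of hiding it, assuming a hypothetical helper name `to_json_or_error` that is not part of the starter code:

```python
import json
from typing import Any, Dict, Optional, Tuple

# Same shape the report stage expects: (summary, metrics_or_error_message)
AnalysisResult = Tuple[Optional[str], Any]


def to_json_or_error(
    data: Dict[str, Any],
) -> Tuple[Optional[str], Optional[AnalysisResult]]:
    """Return (json_string, None) on success or (None, error_result) on failure."""
    try:
        return json.dumps(data), None
    except (TypeError, ValueError) as exc:
        # A None summary marks the section as failed for the report stage.
        return None, (None, f"Serialization error: {exc}")


# A serializable payload converts cleanly.
ok_json, ok_err = to_json_or_error({'processed_items': [{'processed_value': 20}]})

# A set is not JSON-serializable, so the adapter returns an error tuple.
bad_json, bad_err = to_json_or_error({'processed_items': [{'processed_value': {1, 2}}]})

print(ok_json)
print(bad_err)
```

Returning the error in the result shape lets a pipeline append it to its results list and continue with the next dataset, so one bad dataset shows up as a failed section rather than disappearing.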


In [40]:
import json
from typing import List, Dict, Any, Tuple, Optional, Union

# Component 1: Data Processor (returns dict with specific structure)
class DataProcessor:
    """AI Component 1 - processes raw data and returns structured dict"""
    
    def process_data(self, raw_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Process raw data and return structured dict."""
        if not isinstance(raw_data, list):
            raise ValueError("Expected list input")
        
        result = {
            'total_items': len(raw_data),
            'processed_items': [],
            'metadata': {'processing_time': 0.1, 'timestamp': '2024-01-01T12:00:00Z'}
        }
        
        for item in raw_data:
            if isinstance(item, dict) and 'value' in item:
                result['processed_items'].append({
                    'id': item.get('id', 'unknown'),
                    'processed_value': item['value'] * 2,
                    'original_value': item['value'],
                    'status': 'processed'
                })
            else:
                result['processed_items'].append({
                    'id': 'error',
                    'processed_value': 0,
                    'original_value': None,
                    'status': 'failed'
                })
        
        return result

# Component 2: Analytics Engine (expects JSON string, returns tuple)
class AnalyticsEngine:
    """AI Component 2 - performs analytics on data, expects JSON string input"""
    
    def analyze(self, json_data_string: str) -> Tuple[Optional[str], Union[Dict[str, float], str]]:
        """Analyze data from JSON string, return (summary, metrics) tuple."""
        try:
            data = json.loads(json_data_string)
        except json.JSONDecodeError:
            return None, "Invalid JSON format"
        
        if not isinstance(data, dict) or 'processed_items' not in data:
            return None, "Missing processed_items in data structure"
        
        items = data['processed_items']
        if not isinstance(items, list):
            return None, "processed_items must be a list"
        
        # Extract numeric values for analysis
        values = []
        failed_count = 0
        
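        for item in items:
            is_processed = isinstance(item, dict) and item.get('status') == 'processed'
            value = item.get('processed_value') if is_processed else None
            if isinstance(value, (int, float)):
                values.append(value)
            else:
                # Count failed items and processed items without a numeric
                # value, so successful + failed always equals len(items)
                failed_count += 1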
        
        if not values:
            return None, "No valid numeric data found for analysis"
        
        summary = f"Analyzed {len(items)} items ({len(values)} successful, {failed_count} failed)"
        metrics = {
            'avg_value': sum(values) / len(values),
            'max_value': max(values),
            'min_value': min(values),
            'total_value': sum(values),
            'success_rate': len(values) / len(items) if items else 0.0
        }
        
        return summary, metrics

# Component 3: Report Generator (expects list of tuples, returns formatted string)
class ReportGenerator:
    """AI Component 3 - generates reports from analytics results"""
    
    def generate_report(self, analytics_results_list: List[Tuple[Optional[str], Union[Dict, str]]]) -> str:
        """Generate report from list of (summary, metrics) tuples."""
        if not isinstance(analytics_results_list, list):
            return "Error: Expected list input for report generation"
        
        if not analytics_results_list:
            return "Error: No data provided for report generation"
        
        report_lines = [
            "=" * 50,
            "           ANALYSIS REPORT",
            "=" * 50
        ]
        
        for i, result in enumerate(analytics_results_list):
            if not isinstance(result, tuple) or len(result) != 2:
                report_lines.append(f"\nSection {i+1}: Invalid data format - expected (summary, metrics) tuple")
                continue
            
            summary, metrics = result
            
            if summary is None:
                report_lines.append(f"\nSection {i+1}: Analysis failed")
                report_lines.append(f"  Error: {metrics}")
                continue
            
            report_lines.append(f"\nSection {i+1}: {summary}")
            
            if isinstance(metrics, dict):
                report_lines.append("  Metrics:")
                for key, value in metrics.items():
                    if isinstance(value, float):
                        report_lines.append(f"    {key}: {value:.2f}")
                    else:
                        report_lines.append(f"    {key}: {value}")
            else:
                report_lines.append(f"  Metrics: {metrics}")
        
        report_lines.append("\n" + "=" * 50)
        return "\n".join(report_lines)

# Integration functions implementation

def dict_to_json_adapter(data_dict: Dict[str, Any]) -> str:
    """
    Convert dictionary to JSON string for AnalyticsEngine.
    
    Args:
        data_dict: Dictionary from DataProcessor
        
    Returns:
        JSON string suitable for AnalyticsEngine
    """
    try:
        return json.dumps(data_dict)
    except (TypeError, ValueError):
        # Fall back to an empty but valid structure; AnalyticsEngine then
        # reports "No valid numeric data found for analysis" for this dataset
        return json.dumps({'processed_items': []})

def validate_and_clean_raw_data(raw_data: Any) -> List[Dict[str, Any]]:
    """
    Validate and clean raw input data.
    
    Args:
        raw_data: Input data of any type
        
    Returns:
        Cleaned list of dictionaries
    """
    # Handle non-list inputs
    if not isinstance(raw_data, list):
        return []
    
    cleaned = []
    for item in raw_data:
        # Only keep dictionary items
        if isinstance(item, dict):
            cleaned.append(item)
    
    return cleaned

def integrated_pipeline(raw_data_list: List[Any]) -> str:
    """
    Integrate all three components to process data end-to-end.
    
    This function:
    1. Validates and cleans each raw dataset
    2. Processes each dataset through DataProcessor
    3. Converts results to format expected by AnalyticsEngine
    4. Runs analytics on each processed dataset
    5. Collects all analytics results
    6. Generates final report using ReportGenerator
    7. Handles all errors gracefully
    
    Args:
        raw_data_list: List of raw data sets to process
        
    Returns:
        str: Final report combining all analyses
    """
    # Initialize components
    processor = DataProcessor()
    analytics = AnalyticsEngine()
    reporter = ReportGenerator()
    
    # Handle empty input
    if not raw_data_list:
        return reporter.generate_report([])
    
    # Process each dataset
    analytics_results = []
    
    for raw_data in raw_data_list:
        try:
            # Step 1: Validate and clean the data
            cleaned_data = validate_and_clean_raw_data(raw_data)
            
            # Step 2: Process through DataProcessor
            try:
                processed_data = processor.process_data(cleaned_data)
            except Exception as e:
                # If processing fails, add error result
                analytics_results.append((None, f"Data processing error: {str(e)}"))
                continue
            
            # Step 3: Convert to JSON for AnalyticsEngine
            json_data = dict_to_json_adapter(processed_data)
            
            # Step 4: Run analytics
            try:
                analysis_result = analytics.analyze(json_data)
                analytics_results.append(analysis_result)
            except Exception as e:
                # If analysis fails, add error result
                analytics_results.append((None, f"Analysis error: {str(e)}"))
                
        except Exception as e:
            # Catch any unexpected errors
            analytics_results.append((None, f"Unexpected error: {str(e)}"))
    
    # Step 6: Generate final report from the results collected above
    try:
        return reporter.generate_report(analytics_results)
    except Exception as e:
        return f"Error generating report: {str(e)}"

def create_sample_data() -> List[List[Dict[str, Any]]]:
    """Create sample test data for the pipeline."""
    return [
        # Dataset 1: Normal data
        [
            {'id': 'A1', 'value': 10},
            {'id': 'A2', 'value': 20},
            {'id': 'A3', 'value': 15}
        ],
        # Dataset 2: Smaller dataset
        [
            {'id': 'B1', 'value': 5},
            {'id': 'B2', 'value': 25}
        ],
        # Dataset 3: Mixed data with issues
        [
            {'id': 'C1', 'value': 30},
            {'id': 'C2'},  # Missing value
            {'value': 40},  # Missing id
            {'id': 'C4', 'value': 'invalid'},  # Invalid value type
        ]
    ]

# Test the integration
if __name__ == "__main__":
    print("Testing component integration...")
    
    # Test individual components first
    print("\n=== Testing Individual Components ===")
    
    processor = DataProcessor()
    analytics = AnalyticsEngine()
    reporter = ReportGenerator()
    
    # Test DataProcessor
    test_data = [{'id': 'test', 'value': 10}]
    processed = processor.process_data(test_data)
    print(f"DataProcessor output: {processed}")
    
    # Test AnalyticsEngine
    json_data = json.dumps(processed)
    analysis_result = analytics.analyze(json_data)
    print(f"AnalyticsEngine output: {analysis_result}")
    
    # Test ReportGenerator
    report = reporter.generate_report([analysis_result])
    print(f"ReportGenerator output:\n{report}")
    
    print("\n=== Testing Integrated Pipeline ===")
    
    # Test full pipeline
    sample_datasets = create_sample_data()
    
    try:
        final_report = integrated_pipeline(sample_datasets)
        print("Integration successful!")
        print(final_report)
    except Exception as e:
        print(f"Integration failed: {e}")
        import traceback
        traceback.print_exc()

Testing component integration...

=== Testing Individual Components ===
DataProcessor output: {'total_items': 1, 'processed_items': [{'id': 'test', 'processed_value': 20, 'original_value': 10, 'status': 'processed'}], 'metadata': {'processing_time': 0.1, 'timestamp': '2024-01-01T12:00:00Z'}}
AnalyticsEngine output: ('Analyzed 1 items (1 successful, 0 failed)', {'avg_value': 20.0, 'max_value': 20, 'min_value': 20, 'total_value': 20, 'success_rate': 1.0})
ReportGenerator output:
           ANALYSIS REPORT

Section 1: Analyzed 1 items (1 successful, 0 failed)
  Metrics:
    avg_value: 20.00
    max_value: 20
    min_value: 20
    total_value: 20
    success_rate: 1.00


=== Testing Integrated Pipeline ===
Integration successful!
           ANALYSIS REPORT

Section 1: Analyzed 3 items (3 successful, 0 failed)
  Metrics:
    avg_value: 30.00
    max_value: 40
    min_value: 20
    total_value: 90
    success_rate: 1.00

Section 2: Analyzed 2 items (2 successful, 0 failed)
  Metrics:
    avg

In [41]:
# Test Cell
def test_question_10():
    print("Testing integrated pipeline...")
    
    # Test 1: Individual component functionality
    processor = DataProcessor()
    analytics = AnalyticsEngine()
    reporter = ReportGenerator()
    
    # Test DataProcessor
    test_data = [{'id': 'test1', 'value': 10}, {'id': 'test2', 'value': 20}]
    processed = processor.process_data(test_data)
    
    assert isinstance(processed, dict), "DataProcessor should return dict"
    assert 'total_items' in processed, "Missing total_items in processed data"
    assert 'processed_items' in processed, "Missing processed_items in processed data"
    assert processed['total_items'] == 2, "Should count items correctly"
    
    # Test AnalyticsEngine
    json_data = json.dumps(processed)
    summary, metrics = analytics.analyze(json_data)
    
    assert summary is not None, "Analytics should return valid summary"
    assert isinstance(metrics, dict), "Analytics should return metrics dict"
    assert 'avg_value' in metrics, "Missing avg_value in metrics"
    
    # Test ReportGenerator
    report = reporter.generate_report([(summary, metrics)])
    
    assert isinstance(report, str), "Report should be string"
    assert "ANALYSIS REPORT" in report, "Report should contain header"
    assert "Section 1" in report, "Report should contain section"
    
    # Test 2: Data validation and cleaning
    cleaned_data = validate_and_clean_raw_data([
        {'id': 'valid', 'value': 10},
        {'value': 20},  # Missing id
        {'id': 'invalid'},  # Missing value
        'invalid_format'  # Wrong format
    ])
    
    assert isinstance(cleaned_data, list), "Should return list"
    # Should handle invalid data gracefully
    
    # Test 3: Integration adapters
    test_dict = {'processed_items': [{'processed_value': 10}]}
    json_str = dict_to_json_adapter(test_dict)
    
    assert isinstance(json_str, str), "Should return JSON string"
    # Should be valid JSON
    parsed = json.loads(json_str)
    assert parsed == test_dict, "Should preserve data structure"
    
    # Test 4: Full pipeline integration
    sample_datasets = [
        [{'id': 'A1', 'value': 10}, {'id': 'A2', 'value': 20}],
        [{'id': 'B1', 'value': 5}],
        []  # Empty dataset
    ]
    
    final_report = integrated_pipeline(sample_datasets)
    
    assert isinstance(final_report, str), "Pipeline should return string report"
    assert "ANALYSIS REPORT" in final_report, "Should contain report header"
    
    # Should handle multiple sections
    assert "Section 1" in final_report, "Should have first section"
    assert "Section 2" in final_report, "Should have second section"
    
    # Test 5: Error handling
    # Test with invalid input
    error_report = integrated_pipeline([])
    assert isinstance(error_report, str), "Should handle empty input gracefully"
    
    # Test with malformed data
    malformed_report = integrated_pipeline([["not", "a", "dict", "list"]])
    assert isinstance(malformed_report, str), "Should handle malformed data"
    
    # Test 6: Edge cases
    edge_cases = [
        [{'id': 'only_id'}],  # Missing value
        [{'value': 42}],      # Missing id
        [{}],                 # Empty dict
    ]
    
    edge_report = integrated_pipeline(edge_cases)
    assert isinstance(edge_report, str), "Should handle edge cases"
    assert "ANALYSIS REPORT" in edge_report, "Should still generate report structure"
    
    print("✓ All Question 10 tests passed!")

# Run the test
test_question_10()

Testing integrated pipeline...
✓ All Question 10 tests passed!


## Final Submission Instructions

### Before You Submit:

**Code Quality Checklist:**
- All test cells pass without errors
- Code follows Python best practices and conventions  
- Functions include appropriate documentation
- Error handling is implemented where required
- Edge cases are handled appropriately
- Code is clean, readable, and maintainable

**Save Your Work:**
- **Save all code outputs** - Run all cells and keep the output visible
- Save the notebook file (Ctrl+S / Cmd+S)
- Verify all your implementations are in the correct code cells
- Double-check that test cells show "tests passed!" messages

### Submission Format:
Submit your completed `firstname_lastname.ipynb` file with **all outputs preserved**. We want to see:
- Your code implementations
- Test results (passed/failed)
- Any debugging output or print statements
- Cell execution numbers


