In [12]:
import json
from typing import List, Dict, Any, Tuple, Optional, Union
import pandas as pd

# Component 1: Data Processor (returns dict with specific structure)
class DataProcessor:
    """AI Component 1 - processes raw data and returns structured dict"""

    def process_data(self, raw_data: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Process raw data and return structured dict."""
        if not isinstance(raw_data, list):
            raise ValueError("Expected list input")

        result = {
            'total_items': len(raw_data),
            'processed_items': [],
            'metadata': {'processing_time': 0.1, 'timestamp': '2024-01-01T12:00:00Z'}
        }

        for item in raw_data:
            if isinstance(item, dict) and 'value' in item:
                result['processed_items'].append({
                    'id': item.get('id', 'unknown'),
                    'processed_value': item['value'] * 2,
                    'original_value': item['value'],
                    'status': 'processed'
                })
            else:
                result['processed_items'].append({
                    'id': 'error',
                    'processed_value': 0,
                    'original_value': None,
                    'status': 'failed'
                })

        return result

# Component 2: Analytics Engine (expects JSON string, returns tuple)
class AnalyticsEngine:
    """AI Component 2 - performs analytics on data, expects JSON string input"""

    def analyze(self, json_data_string: str) -> Tuple[Optional[str], Union[Dict[str, float], str]]:
        """Analyze data from JSON string, return (summary, metrics) tuple."""
        try:
            data = json.loads(json_data_string)
        except json.JSONDecodeError:
            return None, "Invalid JSON format"

        if not isinstance(data, dict) or 'processed_items' not in data:
            return None, "Missing processed_items in data structure"

        items = data['processed_items']
        if not isinstance(items, list):
            return None, "processed_items must be a list"

        # Extract numeric values for analysis
        values = []
        failed_count = 0

        for item in items:
            if isinstance(item, dict) and item.get('status') == 'processed':
                if 'processed_value' in item and isinstance(item['processed_value'], (int, float)):
                    values.append(item['processed_value'])
            else:
                failed_count += 1

        if not values:
            return None, "No valid numeric data found for analysis"

        summary = f"Analyzed {len(items)} items ({len(values)} successful, {failed_count} failed)"
        metrics = {
            'avg_value': sum(values) / len(values),
            'max_value': max(values),
            'min_value': min(values),
            'total_value': sum(values),
            'success_rate': len(values) / len(items) if items else 0.0
        }

        return summary, metrics

# Component 3: Report Generator (expects list of tuples, returns formatted string)
class ReportGenerator:
    """AI Component 3 - generates reports from analytics results"""

    def generate_report(self, analytics_results_list: List[Tuple[Optional[str], Union[Dict, str]]]) -> str:
        """Generate report from list of (summary, metrics) tuples."""
        if not isinstance(analytics_results_list, list):
            return "Error: Expected list input for report generation"

        if not analytics_results_list:
            return "Error: No data provided for report generation"

        report_lines = [
            "=" * 50,
            "           ANALYSIS REPORT",
            "=" * 50
        ]

        for i, result in enumerate(analytics_results_list):
            if not isinstance(result, tuple) or len(result) != 2:
                report_lines.append(f"\nSection {i+1}: Invalid data format - expected (summary, metrics) tuple")
                continue

            summary, metrics = result

            if summary is None:
                report_lines.append(f"\nSection {i+1}: Analysis failed")
                report_lines.append(f"  Error: {metrics}")
                continue

            report_lines.append(f"\nSection {i+1}: {summary}")

            if isinstance(metrics, dict):
                report_lines.append("  Metrics:")
                for key, value in metrics.items():
                    if isinstance(value, float):
                        report_lines.append(f"    {key}: {value:.2f}")
                    else:
                        report_lines.append(f"    {key}: {value}")
            else:
                report_lines.append(f"  Metrics: {metrics}")

        report_lines.append("\n" + "=" * 50)
        return "\n".join(report_lines)

# Integration Functions

def dict_to_json_adapter(data_dict: Dict[str, Any]) -> str:
    """
    Convert dictionary to JSON string for AnalyticsEngine.

    Args:
        data_dict: Dictionary from DataProcessor

    Returns:
        JSON string suitable for AnalyticsEngine
    """
    try:
        # ✅ Validate input type
        if not isinstance(data_dict, dict):
            raise ValueError("Input must be a dictionary")

        # ✅ Convert to JSON with proper error handling
        json_string = json.dumps(data_dict, indent=2, default=str)
        return json_string

    except (TypeError, ValueError) as e:
        print(f"JSON conversion error: {e}")
        return json.dumps({"error": f"JSON conversion failed: {str(e)}"})

def validate_and_clean_raw_data(raw_data: Any) -> List[Dict[str, Any]]:
    """
    Validate and clean raw input data.

    Args:
        raw_data: Input data of any type

    Returns:
        Cleaned list of dictionaries
    """
    # ✅ Handle None or empty input
    if raw_data is None:
        return []

    # ✅ Convert to list if not already
    if not isinstance(raw_data, list):
        return []

    cleaned_data = []

    for item in raw_data:
        # ✅ Handle different data types
        if isinstance(item, dict):
            # Ensure required structure
            cleaned_item = {
                'id': item.get('id', 'unknown'),
                'value': item.get('value', 0)
            }
            cleaned_data.append(cleaned_item)
        elif isinstance(item, (int, float)):
            # Convert simple numbers to dict format
            cleaned_data.append({'id': 'auto_generated', 'value': item})
        # Skip other invalid types

    return cleaned_data

def integrated_pipeline(raw_data_list: List[Any]) -> str:
    """
    Integrate all three components to process data end-to-end.

    This function should:
    1. Validate and clean each raw dataset
    2. Process each dataset through DataProcessor
    3. Convert results to format expected by AnalyticsEngine
    4. Run analytics on each processed dataset
    5. Collect all analytics results
    6. Generate final report using ReportGenerator
    7. Handle all errors gracefully

    Args:
        raw_data_list: List of raw data sets to process

    Returns:
        str: Final report combining all analyses
    """
    # ✅ Initialize components
    processor = DataProcessor()
    analytics = AnalyticsEngine()
    reporter = ReportGenerator()

    analytics_results = []

    # ✅ Process each dataset
    for i, raw_data in enumerate(raw_data_list):
        print(f"Processing dataset {i+1}...")

        try:
            # Step 1: Validate and clean data
            cleaned_data = validate_and_clean_raw_data(raw_data)

            if not cleaned_data:
                print(f"  Warning: Dataset {i+1} has no valid data after cleaning")
                analytics_results.append((None, "No valid data to process"))
                continue

            # Step 2: Process through DataProcessor
            processed_data = processor.process_data(cleaned_data)

            # Step 3: Convert to JSON for AnalyticsEngine
            json_data = dict_to_json_adapter(processed_data)

            # Step 4: Run analytics
            analysis_result = analytics.analyze(json_data)

            # Step 5: Collect results
            analytics_results.append(analysis_result)

            print(f"  Successfully processed dataset {i+1}")

        except Exception as e:
            # ✅ Handle any errors gracefully
            error_msg = f"Error processing dataset {i+1}: {str(e)}"
            print(f"  {error_msg}")
            analytics_results.append((None, error_msg))

    # Step 6: Generate final report
    final_report = reporter.generate_report(analytics_results)

    return final_report

def create_sample_data() -> List[List[Dict[str, Any]]]:
    """Create sample test data for the pipeline."""
    return [
        # Dataset 1: Normal data
        [
            {'id': 'A1', 'value': 10},
            {'id': 'A2', 'value': 20},
            {'id': 'A3', 'value': 15}
        ],
        # Dataset 2: Smaller dataset
        [
            {'id': 'B1', 'value': 5},
            {'id': 'B2', 'value': 25}
        ],
        # Dataset 3: Mixed data with issues
        [
            {'id': 'C1', 'value': 30},
            {'id': 'C2'},  # Missing value
            {'value': 40},  # Missing id
            {'id': 'C4', 'value': 'invalid'},  # Invalid value type
        ],
        # Dataset 4: Empty dataset
        []
    ]

# Enhanced test with error cases
def create_comprehensive_test_data():
    """Create comprehensive test data including edge cases."""
    return [
        # Normal data
        [{'id': 'X1', 'value': 100}, {'id': 'X2', 'value': 200}],
        # Mixed valid/invalid
        [{'id': 'Y1', 'value': 50}, {'invalid': 'data'}, 123],
        # All invalid
        [None, 'string', [], {}],
        # Empty
        []
    ]

# Test the integration
if __name__ == "__main__":
    print("Testing component integration...")

    # Test individual components first
    print("\n" + "="*60)
    print("TESTING INDIVIDUAL COMPONENTS")
    print("="*60)

    processor = DataProcessor()
    analytics = AnalyticsEngine()
    reporter = ReportGenerator()

    # Test DataProcessor
    test_data = [{'id': 'test', 'value': 10}]
    processed = processor.process_data(test_data)
    print(f"✓ DataProcessor output keys: {list(processed.keys())}")

    # Test AnalyticsEngine
    json_data = dict_to_json_adapter(processed)
    analysis_result = analytics.analyze(json_data)
    print(f"✓ AnalyticsEngine output: {analysis_result[0]}")

    # Test ReportGenerator
    report = reporter.generate_report([analysis_result])
    print(f"✓ ReportGenerator output length: {len(report)} characters")

    print("\n" + "="*60)
    print("TESTING INTEGRATED PIPELINE")
    print("="*60)

    # Test full pipeline
    sample_datasets = create_sample_data()

    try:
        final_report = integrated_pipeline(sample_datasets)
        print("🎉 Integration successful!")
        print("\nFINAL REPORT:")
        print(final_report)
    except Exception as e:
        print(f"❌ Integration failed: {e}")
        import traceback
        traceback.print_exc()

    print("\n" + "="*60)
    print("TESTING EDGE CASES")
    print("="*60)

    # Test edge cases
    edge_cases = create_comprehensive_test_data()
    edge_report = integrated_pipeline(edge_cases)
    print("Edge cases report generated successfully!")
    print(f"Report preview: {edge_report[:200]}...")

# Additional utility functions
def pipeline_with_metrics(raw_data_list: List[Any]) -> Dict[str, Any]:
    """
    Enhanced pipeline that returns metrics along with report.

    Returns:
        Dict containing report, success_rate, and processing_stats
    """
    processor = DataProcessor()
    analytics = AnalyticsEngine()
    reporter = ReportGenerator()

    analytics_results = []
    processing_stats = {
        'total_datasets': len(raw_data_list),
        'successful_processing': 0,
        'failed_processing': 0,
        'total_items_processed': 0
    }

    for i, raw_data in enumerate(raw_data_list):
        try:
            cleaned_data = validate_and_clean_raw_data(raw_data)
            processing_stats['total_items_processed'] += len(cleaned_data)

            if cleaned_data:
                processed_data = processor.process_data(cleaned_data)
                json_data = dict_to_json_adapter(processed_data)
                analysis_result = analytics.analyze(json_data)

                if analysis_result[0] is not None:  # Success case
                    processing_stats['successful_processing'] += 1
                else:
                    processing_stats['failed_processing'] += 1

                analytics_results.append(analysis_result)
            else:
                processing_stats['failed_processing'] += 1
                analytics_results.append((None, "No valid data"))
        except Exception as e:
            processing_stats['failed_processing'] += 1
            analytics_results.append((None, f"Processing error: {str(e)}"))

    final_report = reporter.generate_report(analytics_results)

    return {
        'report': final_report,
        'processing_stats': processing_stats,
        'success_rate': processing_stats['successful_processing'] / processing_stats['total_datasets'] if processing_stats['total_datasets'] > 0 else 0
    }
# Test Output:
# Quick test
sample_data = create_sample_data()
result = integrated_pipeline(sample_data)
print(result)

# Test with metrics
metrics_result = pipeline_with_metrics(sample_data)
print(f"\nSuccess Rate: {metrics_result['success_rate']:.2%}")
print(f"Processing Stats: {metrics_result['processing_stats']}")


Testing component integration...

TESTING INDIVIDUAL COMPONENTS
✓ DataProcessor output keys: ['total_items', 'processed_items', 'metadata']
✓ AnalyticsEngine output: Analyzed 1 items (1 successful, 0 failed)
✓ ReportGenerator output length: 345 characters

TESTING INTEGRATED PIPELINE
Processing dataset 1...
  Successfully processed dataset 1
Processing dataset 2...
  Successfully processed dataset 2
Processing dataset 3...
  Successfully processed dataset 3
Processing dataset 4...
🎉 Integration successful!

FINAL REPORT:
           ANALYSIS REPORT

Section 1: Analyzed 3 items (3 successful, 0 failed)
  Metrics:
    avg_value: 30.00
    max_value: 40
    min_value: 20
    total_value: 90
    success_rate: 1.00

Section 2: Analyzed 2 items (2 successful, 0 failed)
  Metrics:
    avg_value: 30.00
    max_value: 50
    min_value: 10
    total_value: 60
    success_rate: 1.00

Section 3: Analyzed 4 items (3 successful, 0 failed)
  Metrics:
    avg_value: 46.67
    max_value: 80
    min_valu