# Vehicle Damage Detection - Advanced Usage

This notebook demonstrates advanced features of the Vehicle Damage Detection System including:
- Batch processing
- Async tasks with Celery (simulated in this notebook with mock task IDs)
- Custom model configuration
- Performance monitoring
- Error handling strategies

In [None]:
import requests
import json
import time
import concurrent.futures
from pathlib import Path
import pandas as pd
from datetime import datetime
import matplotlib.pyplot as plt
import seaborn as sns

# Base URL of the Vehicle Damage Detection API; every request in this notebook
# is built from it.
API_URL = "http://localhost:8000"

# Notebook banner: title line followed by a 50-character rule.
print("Advanced Vehicle Damage Detection Demo", "=" * 50, sep="\n")

## 1. Performance Benchmarking

In [None]:
def benchmark_analysis(image_paths, description="Benchmark"):
    """
    Benchmark analysis performance for multiple images.

    Images are uploaded one at a time to ``{API_URL}/api/analyze`` and the
    wall-clock duration of each upload/response round trip is recorded.

    Args:
        image_paths: List of image file paths
        description: Description for the benchmark

    Returns:
        Tuple ``(results_df, metrics)`` where ``results_df`` is a DataFrame
        with one row per image (status ``success``, ``failed`` or ``error``)
        and ``metrics`` is a dictionary of aggregate performance figures.
        An empty ``image_paths`` yields an empty DataFrame and zeroed rates.
    """
    # Local import keeps this cell self-contained.
    import statistics

    total_images = len(image_paths)

    print(f"\nüöÄ Starting {description}")
    print(f"Images to process: {total_images}")

    results = []
    start_time = time.time()

    for i, image_path in enumerate(image_paths, 1):
        print(f"Processing {i}/{total_images}: {Path(image_path).name}")

        try:
            img_start = time.time()

            with open(image_path, "rb") as f:
                files = {"file": (image_path, f, "image/jpeg")}
                response = requests.post(f"{API_URL}/api/analyze", files=files, timeout=30)

            img_time = time.time() - img_start

            if response.status_code == 200:
                result = response.json()
                results.append({
                    'image': Path(image_path).name,
                    'status': 'success',
                    'processing_time': img_time,
                    'detections': result.get('detection', {}).get('num_detections', 0),
                    'severity': result.get('classification', {}).get('severity'),
                    'estimated_cost': result.get('cost_estimate', {}).get('estimated_cost', 0)
                })
                print(f"   ‚úÖ Completed in {img_time:.2f}s")
            else:
                results.append({
                    'image': Path(image_path).name,
                    'status': 'failed',
                    'processing_time': img_time,
                    'error': response.status_code
                })
                print(f"   ‚ùå Failed: {response.status_code}")

        except Exception as e:
            # Per-image boundary: one bad image (missing file, network error,
            # malformed response) must not abort the whole benchmark.
            results.append({
                'image': Path(image_path).name,
                'status': 'error',
                'processing_time': 0,
                'error': str(e)
            })
            print(f"   ‚ùå Error: {str(e)[:50]}...")

    total_time = time.time() - start_time

    # Calculate metrics
    successful = [r for r in results if r['status'] == 'success']

    metrics = {
        'total_images': total_images,
        'successful': len(successful),
        'failed': len(results) - len(successful),
        # Guard the per-image ratios so an empty input does not divide by zero.
        'success_rate': len(successful) / total_images * 100 if total_images else 0.0,
        'total_time': total_time,
        'avg_time_per_image': total_time / total_images if total_images else 0.0,
        'images_per_second': len(successful) / total_time if total_time > 0 else 0
    }

    if successful:
        processing_times = [r['processing_time'] for r in successful]
        metrics.update({
            'min_time': min(processing_times),
            'max_time': max(processing_times),
            # statistics.median averages the two middle values for even-length
            # samples instead of returning the upper one.
            'median_time': statistics.median(processing_times)
        })

    print(f"\nüìä BENCHMARK RESULTS:")
    print(f"   ‚Ä¢ Total time: {total_time:.2f}s")
    print(f"   ‚Ä¢ Success rate: {metrics['success_rate']:.1f}%")
    print(f"   ‚Ä¢ Avg time per image: {metrics['avg_time_per_image']:.2f}s")
    print(f"   ‚Ä¢ Throughput: {metrics['images_per_second']:.2f} images/sec")

    return pd.DataFrame(results), metrics

# Example usage (uncomment and replace with your image paths)
# image_paths = [
#     "path/to/image1.jpg",
#     "path/to/image2.jpg",
#     "path/to/image3.jpg"
# ]
# results_df, metrics = benchmark_analysis(image_paths, "Test Benchmark")

## 2. Async Task Monitoring

In [None]:
def submit_async_task(image_path, task_type="single"):
    """
    Pretend to queue an analysis job and hand back a mock task descriptor.

    No request is sent: the real Celery-backed endpoints are only sketched
    in the comments below, so ``image_path`` is currently unused.

    Args:
        image_path: Path to image or list of paths
        task_type: "single" or "batch"; any other value is treated as "single"

    Returns:
        Dict with a time-based mock ``task_id`` and ``status`` "submitted"
    """
    # Note: This assumes you have Celery endpoints implemented.
    # Until then the submission is simulated.
    is_batch = task_type == "batch"

    # Intended endpoints once the backend exists:
    #   batch:  requests.post(f"{API_URL}/api/batch-analyze", json={"image_paths": image_path})
    #   single: requests.post(f"{API_URL}/api/analyze-async", json={"image_path": image_path})
    announcement = "Submitting batch task..." if is_batch else "Submitting single async task..."
    print(announcement)

    # Mock task ID derived from the current Unix time in whole seconds.
    mock_descriptor = {
        "task_id": f"mock_task_{int(time.time())}",
        "status": "submitted",
    }
    return mock_descriptor

def monitor_task(task_id, max_wait=300, poll_interval=2):
    """
    Monitor task progress.

    The status check is currently simulated: the task is reported as
    completed once more than 10 seconds have elapsed.

    Args:
        task_id: Task ID to monitor
        max_wait: Maximum wait time in seconds
        poll_interval: Seconds to wait between status checks (must be > 0)

    Returns:
        Dict with ``task_id`` and ``status`` ("completed" plus a mock
        ``result``, or "timeout" when ``max_wait`` elapsed first)

    Raises:
        ValueError: If ``poll_interval`` is not positive
    """
    if poll_interval <= 0:
        raise ValueError("poll_interval must be positive")

    print(f"Monitoring task: {task_id}")
    start_time = time.time()

    while time.time() - start_time < max_wait:
        # This would check task status via Celery
        # response = requests.get(f"{API_URL}/api/task-status/{task_id}")

        # For now, simulate task completion. Never sleep past the deadline,
        # so the timeout is reported at max_wait rather than up to one full
        # polling interval later.
        remaining = max_wait - (time.time() - start_time)
        time.sleep(min(poll_interval, max(0.0, remaining)))
        elapsed = time.time() - start_time
        print(f"   Task still running... ({elapsed:.0f}s elapsed)")

        # Simulate completion after 10 seconds
        if elapsed > 10:
            print("‚úÖ Task completed!")
            return {"task_id": task_id, "status": "completed", "result": "mock_result"}

    print("‚è∞ Task timeout")
    return {"task_id": task_id, "status": "timeout"}

# Example async workflow
# task = submit_async_task("path/to/image.jpg", "single")
# result = monitor_task(task["task_id"])

## 3. Error Analysis and Retry Logic

In [None]:
def analyze_with_retry(image_path, max_retries=3, backoff_factor=2):
    """
    Analyze image with retry logic for robust processing.

    The image is uploaded to ``{API_URL}/api/analyze``. Failed attempts are
    retried after a delay that depends on the failure type:

    * HTTP 503: ``backoff_factor ** attempt`` seconds
    * HTTP 429: ``5 * backoff_factor ** attempt`` seconds
    * request timeout: ``3 ** attempt`` seconds
    * connection error: ``5 ** attempt`` seconds
    * any other HTTP status or unexpected error: 1 second

    No delay is applied after the final attempt, and an image file that
    cannot be opened is not retried at all.

    Args:
        image_path: Path to image
        max_retries: Maximum number of retries
        backoff_factor: Backoff factor for exponential backoff

    Returns:
        Analysis result or None if all retries failed
    """
    total_attempts = max_retries + 1

    for attempt in range(total_attempts):
        print(f"Attempt {attempt + 1}/{total_attempts}: {Path(image_path).name}")

        # A missing or unreadable file is a local, non-transient problem;
        # retrying it would only burn time.
        try:
            image_file = open(image_path, "rb")
        except OSError as e:
            print(f"   ‚ùå Cannot read image: {str(e)[:50]}...")
            return None

        failure = None  # short description for failures that use backoff
        delay = 1       # seconds to wait before the next attempt

        try:
            with image_file:
                files = {"file": (image_path, image_file, "image/jpeg")}
                response = requests.post(f"{API_URL}/api/analyze", files=files, timeout=60)

            if response.status_code == 200:
                print(f"   ‚úÖ Success on attempt {attempt + 1}")
                return response.json()
            elif response.status_code == 503:
                failure = "   ‚ö†Ô∏è  Service unavailable"
                delay = backoff_factor ** attempt
            elif response.status_code == 429:
                failure = "   ‚ö†Ô∏è  Rate limited"
                delay = 5 * backoff_factor ** attempt
            else:
                print(f"   ‚ùå HTTP {response.status_code}: {response.text[:100]}")

        except requests.exceptions.Timeout:
            failure = "   ‚è∞ Timeout"
            delay = 3 ** attempt
        except requests.exceptions.ConnectionError:
            failure = "   üîå Connection error"
            delay = 5 ** attempt
        except Exception as e:
            print(f"   üí• Unexpected error: {str(e)[:50]}...")

        if attempt == max_retries:
            # Last attempt: report the failure but do not sleep before giving up.
            if failure:
                print(failure)
            break

        if failure:
            print(f"{failure}, retrying in {delay}s...")
        time.sleep(delay)

    print(f"   ‚ùå All {max_retries + 1} attempts failed")
    return None

def batch_analyze_with_retry(image_paths, max_workers=3):
    """
    Run ``analyze_with_retry`` over many images on a thread pool.

    Rows are collected in completion order, not input order.

    Args:
        image_paths: List of image paths
        max_workers: Maximum number of concurrent workers

    Returns:
        DataFrame with one row per image and a ``status`` of ``success``,
        ``failed`` (no result returned) or ``error`` (an exception was
        raised; its message is stored in the ``error`` column)
    """
    print(f"üöÄ Batch processing {len(image_paths)} images with retry logic")

    rows = []

    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Queue every image first and remember which future belongs to which path.
        pending = {}
        for image_path in image_paths:
            pending[pool.submit(analyze_with_retry, image_path)] = image_path

        # Harvest each future as soon as it finishes.
        for finished in concurrent.futures.as_completed(pending):
            source = pending[finished]
            try:
                analysis = finished.result()

                if not analysis:
                    # The worker gave up (or returned nothing usable).
                    row = {
                        'image': Path(source).name,
                        'status': 'failed',
                        'damage_count': 0,
                        'severity': None,
                        'estimated_cost': 0,
                        'processing_time': 0
                    }
                else:
                    # Pull the headline figures out of the API response.
                    severity_info = analysis.get('classification', {})
                    cost_info = analysis.get('cost_estimate', {})
                    row = {
                        'image': Path(source).name,
                        'status': 'success',
                        'damage_count': severity_info.get('damage_count', 0),
                        'severity': severity_info.get('severity'),
                        'estimated_cost': cost_info.get('estimated_cost', 0),
                        'processing_time': analysis.get('total_processing_time', 0)
                    }

            except Exception as exc:
                row = {
                    'image': Path(source).name,
                    'status': 'error',
                    'damage_count': 0,
                    'severity': None,
                    'estimated_cost': 0,
                    'processing_time': 0,
                    'error': str(exc)
                }

            rows.append(row)

    return pd.DataFrame(rows)

# Example usage
# results_df = batch_analyze_with_retry(image_paths)

## 4. Performance Visualization

In [None]:
def visualize_results(results_df):
    """
    Plot a 2x2 dashboard of the successful analyses and print a summary.

    Panels: processing-time histogram, damage-count bar chart, severity pie
    chart and estimated-cost histogram. Nothing is drawn when there are no
    rows with status ``success``.

    Args:
        results_df: DataFrame with analysis results
    """
    # Nothing to draw without at least one successful row.
    if results_df.empty or not (results_df['status'] == 'success').any():
        print("No successful results to visualize")
        return

    ok_rows = results_df[results_df['status'] == 'success'].copy()

    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle('Vehicle Damage Analysis Results', fontsize=16)
    time_ax, damage_ax = axes[0, 0], axes[0, 1]
    severity_ax, cost_ax = axes[1, 0], axes[1, 1]

    # Top-left: how long each analysis took
    time_ax.hist(ok_rows['processing_time'], bins=10, alpha=0.7, color='skyblue')
    time_ax.set_title('Processing Time Distribution')
    time_ax.set_xlabel('Time (seconds)')
    time_ax.set_ylabel('Frequency')

    # Top-right: number of images per damage count
    per_damage_count = ok_rows['damage_count'].value_counts().sort_index()
    damage_ax.bar(per_damage_count.index, per_damage_count.values, color='lightcoral')
    damage_ax.set_title('Damage Count Distribution')
    damage_ax.set_xlabel('Number of Damages')
    damage_ax.set_ylabel('Frequency')

    # Bottom-left: share of each severity level; unknown levels are drawn gray
    per_severity = ok_rows['severity'].value_counts()
    palette = {'minor': 'green', 'moderate': 'yellow', 'severe': 'orange', 'critical': 'red'}
    wedge_colors = []
    for level in per_severity.index:
        wedge_colors.append(palette.get(level, 'gray'))
    severity_ax.pie(per_severity.values, labels=per_severity.index,
                    colors=wedge_colors, autopct='%1.1f%%')
    severity_ax.set_title('Damage Severity Distribution')

    # Bottom-right: estimated costs, ignoring zero/negative estimates
    costs = ok_rows.loc[ok_rows['estimated_cost'] > 0, 'estimated_cost']
    has_costs = len(costs) > 0
    if has_costs:
        cost_ax.hist(costs, bins=10, alpha=0.7, color='gold')
        cost_ax.set_xlabel('Cost (USD)')
        cost_ax.set_ylabel('Frequency')
    else:
        cost_ax.text(0.5, 0.5, 'No cost data available',
                     ha='center', va='center', transform=cost_ax.transAxes)
    cost_ax.set_title('Cost Distribution')

    plt.tight_layout()
    plt.show()

    # Text summary below the figure
    total_rows = len(results_df)
    print("\nüìä SUMMARY STATISTICS:")
    print(f"   ‚Ä¢ Total images processed: {total_rows}")
    print(f"   ‚Ä¢ Success rate: {(len(ok_rows) / total_rows * 100):.1f}%")
    print(f"   ‚Ä¢ Avg processing time: {ok_rows['processing_time'].mean():.2f}s")
    print(f"   ‚Ä¢ Avg damage count: {ok_rows['damage_count'].mean():.1f}")

    if has_costs:
        print(f"   ‚Ä¢ Avg estimated cost: ${costs.mean():.2f}")
        print(f"   ‚Ä¢ Max estimated cost: ${costs.max():.2f}")

    # Row count per status value
    print(f"\nüìã PROCESSING STATUS:")
    for status, count in results_df['status'].value_counts().items():
        print(f"   ‚Ä¢ {status}: {count}")

# Example usage
# if 'results_df' in locals():
#     visualize_results(results_df)

## 5. Export Results

In [None]:
def export_results(results_df, filename=None, format='excel'):
    """
    Export analysis results to various formats.

    The Excel export writes two sheets: ``Results`` (the raw rows) and
    ``Summary`` (aggregate metrics over rows whose status is ``success``).
    Summary metrics whose source column is missing are written as NaN.

    Args:
        results_df: DataFrame with results
        filename: Output filename without extension (optional; defaults to a
            timestamped ``vehicle_damage_analysis_<YYYYmmdd_HHMMSS>`` name)
        format: 'excel', 'csv', or 'json' (case-insensitive)

    Returns:
        Path of the file that was written

    Raises:
        ValueError: If ``format`` is not one of the supported formats
    """
    export_format = format.lower()
    # Fail fast: previously an unknown format fell through to the final print
    # and crashed with UnboundLocalError on ``output_file``.
    if export_format not in ('excel', 'csv', 'json'):
        raise ValueError(
            f"Unsupported export format: {format!r} (expected 'excel', 'csv', or 'json')"
        )

    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"vehicle_damage_analysis_{timestamp}"

    if export_format == 'excel':
        output_file = f"{filename}.xlsx"
        successful = results_df[results_df['status'] == 'success']

        def _mean_or_nan(column):
            # Not every results frame carries every metric column (e.g. the
            # benchmark frame has no 'damage_count').
            return successful[column].mean() if column in successful.columns else float('nan')

        with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
            # Main results
            results_df.to_excel(writer, sheet_name='Results', index=False)

            # Summary statistics
            summary = pd.DataFrame({
                'Metric': ['Total Images', 'Success Rate (%)', 'Avg Processing Time (s)',
                           'Avg Damage Count', 'Avg Cost (USD)'],
                'Value': [
                    len(results_df),
                    (results_df['status'] == 'success').mean() * 100,
                    _mean_or_nan('processing_time'),
                    _mean_or_nan('damage_count'),
                    _mean_or_nan('estimated_cost')
                ]
            })
            summary.to_excel(writer, sheet_name='Summary', index=False)

    elif export_format == 'csv':
        output_file = f"{filename}.csv"
        results_df.to_csv(output_file, index=False)

    else:  # 'json'
        output_file = f"{filename}.json"
        results_df.to_json(output_file, orient='records', indent=2)

    print(f"üìÅ Results exported to: {output_file}")
    return output_file

# Example usage
# export_file = export_results(results_df, format='excel')

## 6. Complete Advanced Workflow

Putting it all together - a comprehensive analysis workflow:

In [None]:
def comprehensive_analysis(image_paths, max_workers=3, export=True):
    """
    Complete advanced analysis workflow.

    Steps: API health check, quick benchmark on the first five images,
    concurrent analysis of all images with retries, visualization and an
    optional Excel export.

    Args:
        image_paths: List of image paths
        max_workers: Number of concurrent workers
        export: Whether to export results

    Returns:
        Analysis results DataFrame, or None when the health check fails.
        With no input images an empty DataFrame is returned and the API is
        not contacted.
    """
    print("üéØ COMPREHENSIVE VEHICLE DAMAGE ANALYSIS")
    print("=" * 60)

    # Nothing to do: skip the network round trips and avoid the empty-input
    # failures further down (division by zero in the benchmark, missing
    # 'status' column in the final summary).
    if len(image_paths) == 0:
        print("No images supplied; nothing to analyze")
        return pd.DataFrame(columns=['image', 'status', 'damage_count', 'severity',
                                     'estimated_cost', 'processing_time'])

    # Step 1: Health check
    print("\n1. üîç System Health Check...")
    try:
        response = requests.get(f"{API_URL}/health", timeout=5)
        if response.status_code == 200:
            health = response.json()
            print(f"   ‚úÖ System: {health.get('status')}")
            print(f"   ‚úÖ Model: {'Ready' if health.get('model_loaded') else 'Not loaded'}")
        else:
            print("   ‚ùå System health check failed")
            return None
    except Exception as e:
        # Workflow boundary: any failure to reach or parse the health
        # endpoint aborts the run instead of raising into the notebook.
        print(f"   ‚ùå Cannot connect to API: {e}")
        return None

    # Step 2: Performance benchmark (its printed report is the output)
    print(f"\n2. üìä Performance Benchmark...")
    benchmark_analysis(image_paths[:5], "Quick Benchmark")

    # Step 3: Full analysis with retry logic
    print(f"\n3. üîÑ Full Analysis with Retry Logic...")
    full_results = batch_analyze_with_retry(image_paths, max_workers=max_workers)

    # Step 4: Results visualization
    print(f"\n4. üìà Results Visualization...")
    visualize_results(full_results)

    # Step 5: Export results
    if export:
        print(f"\n5. üíæ Exporting Results...")
        export_results(full_results, format='excel')

    succeeded = full_results['status'] == 'success'

    print(f"\n‚úÖ ANALYSIS COMPLETE!")
    print(f"   ‚Ä¢ Processed: {len(full_results)} images")
    print(f"   ‚Ä¢ Success rate: {succeeded.mean()*100:.1f}%")
    if succeeded.any():
        print(f"   ‚Ä¢ Average cost: ${full_results[succeeded]['estimated_cost'].mean():.2f}")
    else:
        # The mean of an empty selection is NaN; say so explicitly instead.
        print("   ‚Ä¢ Average cost: n/a (no successful analyses)")

    return full_results

# Example usage - replace with your image paths
# image_paths = [
#     "path/to/car1.jpg",
#     "path/to/car2.jpg", 
#     "path/to/car3.jpg"
# ]
# results = comprehensive_analysis(image_paths)

## 7. Best Practices Summary

### Performance Optimization
1. **Concurrent Processing**: Use ThreadPoolExecutor for parallel API calls
2. **Retry Logic**: Implement exponential backoff for transient failures
3. **Timeout Management**: Set appropriate timeouts based on image size
4. **Batch Processing**: Use async tasks for large workloads

### Error Handling
1. **Network Issues**: Implement connection retry logic
2. **Service Unavailability**: Handle 503 responses gracefully
3. **Rate Limiting**: Respect rate limits with backoff
4. **Validation**: Check image format and size before upload

### Monitoring & Observability
1. **Performance Metrics**: Track processing time and success rates
2. **Error Tracking**: Log failures with detailed error information
3. **Resource Usage**: Monitor memory and CPU usage
4. **Health Checks**: Regular system health verification

### Data Management
1. **Result Storage**: Export results to Excel/CSV for analysis
2. **Historical Data**: Track analysis history for trends
3. **Quality Control**: Review failed analyses for patterns
4. **Backup**: Regular backup of analysis results

## Next Steps

1. **Model Fine-tuning**: Train custom models on your specific dataset
2. **API Enhancement**: Add custom endpoints for specific use cases
3. **Real-time Processing**: Implement WebSocket for real-time updates
4. **Integration**: Connect with insurance systems or databases
5. **Mobile App**: Develop mobile application for field inspections