# Train Carriage Crowd Detection using YOLO + ZIP-EBC

## Complete Implementation for Counting People Including Occluded Persons

This notebook implements a crowd counting system for train carriages that combines:
- **YOLO**: For detecting visible persons
- **ZIP-EBC**: For estimating occluded persons using density maps
- **ROI Spatial Fusion**: For combining both methods adaptively

---

## 📦 Section 1: Installation and Imports

In [None]:
# Install required packages
!pip install ultralytics opencv-python pillow numpy scipy matplotlib torch torchvision

In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from scipy.ndimage import gaussian_filter
from ultralytics import YOLO
import os
from pathlib import Path
from typing import Tuple, List, Dict
import json
import warnings
warnings.filterwarnings('ignore')

print("✅ All packages imported successfully!")
print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")

## ⚙️ Section 2: Configuration

In [None]:
class Config:
    """Configuration parameters for the detection system"""
    
    # YOLO Configuration
    YOLO_MODEL = 'yolov8n.pt'  # Options: yolov8n.pt, yolov8s.pt, yolov8m.pt, yolov8l.pt, yolov8x.pt
    YOLO_CONF_THRESHOLD = 0.25  # Confidence threshold for YOLO detection
    YOLO_IOU_THRESHOLD = 0.45   # IoU threshold for NMS
    
    # Density Estimation Configuration
    LOW_DENSITY_THRESHOLD = 5   # YOLO-only path requires at most this many detections...
    HIGH_CONFIDENCE_THRESHOLD = 0.6  # ...and an average confidence of at least this value
    
    # ZIP-EBC (Density Map) Configuration
    DENSITY_SIGMA = 15  # Gaussian kernel sigma for density map
    DENSITY_THRESHOLD = 0.1  # Threshold for valid density regions
    
    # Spatial Fusion Parameters
    FUSION_WEIGHT_YOLO = 0.6
    FUSION_WEIGHT_DENSITY = 0.4
    
    # Image Configuration
    IMAGE_FILE = 'train.jpeg'  # Input image filename
    
    # Visualization
    SHOW_DETECTIONS = True
    SAVE_RESULTS = True
    OUTPUT_DIR = 'output'

config = Config()

# Create output directory
os.makedirs(config.OUTPUT_DIR, exist_ok=True)

print("✅ Configuration loaded!")
print(f"📸 Input image: {config.IMAGE_FILE}")
print(f"📁 Output directory: {config.OUTPUT_DIR}")

## 🗺️ Section 3: Density Map Generation (ZIP-EBC Implementation)

In [None]:
class DensityMapEstimator:
    """
    ZIP-EBC (Zero-Inflated Poisson - Empirical Bayesian Counting)
    Generates density maps to estimate occluded persons
    """
    
    def __init__(self, sigma=15):
        self.sigma = sigma
    
    def generate_gaussian_kernel(self, height, width, center_x, center_y, sigma):
        """Generate a 2D Gaussian kernel centered at (center_x, center_y)"""
        x = np.arange(0, width, 1, float)
        y = np.arange(0, height, 1, float)
        y = y[:, np.newaxis]
        
        x0 = center_x
        y0 = center_y
        
        return np.exp(-((x - x0)**2 + (y - y0)**2) / (2 * sigma**2))
    
    def create_density_map(self, image_shape, detections):
        """
        Create density map from YOLO detections
        
        Args:
            image_shape: (height, width, channels)
            detections: List of detection boxes [x1, y1, x2, y2, conf, class]
        
        Returns:
            density_map: 2D array representing person density
        """
        height, width = image_shape[:2]
        density_map = np.zeros((height, width), dtype=np.float32)
        
        for det in detections:
            x1, y1, x2, y2 = map(int, det[:4])
            center_x = (x1 + x2) // 2
            center_y = (y1 + y2) // 2
            
            # Generate Gaussian blob at person center
            gaussian = self.generate_gaussian_kernel(height, width, center_x, center_y, self.sigma)
            density_map += gaussian
        
        # Apply Gaussian smoothing for better density estimation
        density_map = gaussian_filter(density_map, sigma=self.sigma/2)
        
        return density_map
    
    def estimate_from_density(self, density_map, roi_mask=None):
        """
        Estimate count from density map using ZIP-EBC principle
        
        Args:
            density_map: 2D density map
            roi_mask: Optional binary mask for region of interest
        
        Returns:
            estimated_count: Estimated number of people
        """
        if roi_mask is not None:
            density_map = density_map * roi_mask
        
        # Integrate density over the region
        total_density = np.sum(density_map)
        
        # Convert integrated density to a count.
        # Each unnormalised Gaussian blob (peak value 1) sums to about 2*pi*sigma^2,
        # and the smoothing in create_density_map preserves that sum.
        estimated_count = total_density / (2 * np.pi * self.sigma**2)
        
        return int(np.round(estimated_count))

print("✅ DensityMapEstimator class defined!")

## 🎯 Section 4: YOLO Detection Wrapper

In [None]:
class YOLODetector:
    """Wrapper for YOLO person detection"""
    
    def __init__(self, model_path='yolov8n.pt', conf_threshold=0.25, iou_threshold=0.45):
        print(f"Loading YOLO model: {model_path}...")
        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.person_class_id = 0  # COCO dataset person class
        print("✅ YOLO model loaded successfully!")
    
    def detect(self, image):
        """
        Detect persons in image
        
        Args:
            image: numpy array (BGR format)
        
        Returns:
            detections: List of [x1, y1, x2, y2, confidence, class_id]
            results: Raw YOLO results object
        """
        results = self.model(image, conf=self.conf_threshold, iou=self.iou_threshold, verbose=False)
        
        detections = []
        for result in results:
            boxes = result.boxes
            for box in boxes:
                # Filter only person class
                if int(box.cls[0]) == self.person_class_id:
                    x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                    conf = float(box.conf[0])
                    detections.append([x1, y1, x2, y2, conf, self.person_class_id])
        
        return detections, results[0]
    
    def get_average_confidence(self, detections):
        """Calculate average confidence of detections"""
        if len(detections) == 0:
            return 0.0
        # Cast to a plain float so downstream comparisons and JSON export get built-in types
        return float(np.mean([det[4] for det in detections]))

print("✅ YOLODetector class defined!")

## 🔀 Section 5: ROI Spatial Fusion

In [None]:
class SpatialFusion:
    """
    Implements ROI Spatial Fusion combining YOLO and Density Estimation
    
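    Formula: C_total = Σ_{i ∈ D_YOLO} I(pos(i) ∉ P) + ∫∫_P M(x,y) dx dy
    where:
    - D_YOLO is the set of YOLO detections
    - pos(i) is the position of detection i
    - P is the polygon/region
    - I is an indicator function
    - M(x,y) is the masked density map
    
    Note: fuse_counts approximates this with a confidence-weighted average
    of the YOLO count and the density count.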
    """
    
    def __init__(self, yolo_weight=0.6, density_weight=0.4):
        self.yolo_weight = yolo_weight
        self.density_weight = density_weight
    
    def create_roi_mask(self, image_shape, detections, expansion_factor=1.5):
        """
        Create ROI mask for high-density regions
        
        Args:
            image_shape: (height, width)
            detections: YOLO detections
            expansion_factor: Factor to expand bounding boxes
        
        Returns:
            mask: Binary mask of ROI regions
        """
        height, width = image_shape[:2]
        mask = np.zeros((height, width), dtype=np.uint8)
        
        # Create mask around detected persons (potential occlusion areas)
        for det in detections:
            x1, y1, x2, y2 = map(int, det[:4])
            
            # Expand box to capture nearby occluded persons
            w = x2 - x1
            h = y2 - y1
            expand_w = int(w * (expansion_factor - 1) / 2)
            expand_h = int(h * (expansion_factor - 1) / 2)
            
            x1_exp = max(0, x1 - expand_w)
            y1_exp = max(0, y1 - expand_h)
            x2_exp = min(width, x2 + expand_w)
            y2_exp = min(height, y2 + expand_h)
            
            mask[y1_exp:y2_exp, x1_exp:x2_exp] = 1
        
        return mask
    
    def fuse_counts(self, yolo_count, density_count, yolo_confidence):
        """
        Fuse YOLO and density-based counts
        
        Args:
            yolo_count: Count from YOLO
            density_count: Count from density estimation
            yolo_confidence: Average confidence of YOLO detections
        
        Returns:
            final_count: Fused count
        """
        # Adaptive weighting based on YOLO confidence
        adaptive_yolo_weight = self.yolo_weight * yolo_confidence
        adaptive_density_weight = self.density_weight * (1 + (1 - yolo_confidence))
        
        # Normalize weights
        total_weight = adaptive_yolo_weight + adaptive_density_weight
        adaptive_yolo_weight /= total_weight
        adaptive_density_weight /= total_weight
        
        # Weighted fusion
        fused_count = (adaptive_yolo_weight * yolo_count + 
                      adaptive_density_weight * density_count)
        
        return int(np.round(fused_count))

print("✅ SpatialFusion class defined!")

## 🚀 Section 6: Main Crowd Counting Pipeline

In [None]:
class TrainCrowdCounter:
    """
    Main pipeline for counting people in train carriages
    Combines YOLO and ZIP-EBC for robust counting
    """
    
    def __init__(self, config):
        self.config = config
        self.yolo_detector = YOLODetector(
            model_path=config.YOLO_MODEL,
            conf_threshold=config.YOLO_CONF_THRESHOLD,
            iou_threshold=config.YOLO_IOU_THRESHOLD
        )
        self.density_estimator = DensityMapEstimator(sigma=config.DENSITY_SIGMA)
        self.spatial_fusion = SpatialFusion(
            yolo_weight=config.FUSION_WEIGHT_YOLO,
            density_weight=config.FUSION_WEIGHT_DENSITY
        )
        
        # Create output directory
        if config.SAVE_RESULTS:
            os.makedirs(config.OUTPUT_DIR, exist_ok=True)
    
    def process_image(self, image_path):
        """
        Process a single image to count people
        
        Args:
            image_path: Path to input image, or an already-loaded BGR numpy array
        
        Returns:
            results: Dictionary containing all results
        """
        # Load image (accept str or pathlib.Path; anything else is treated as a BGR array)
        if isinstance(image_path, (str, Path)):
            image = cv2.imread(str(image_path))
        else:
            image = image_path
        
        if image is None:
            raise ValueError(f"Could not load image: {image_path}")
        
        # Step 1: YOLO Detection
        print("\n" + "="*70)
        print("Step 1: Running YOLO detection...")
        print("="*70)
        detections, yolo_results = self.yolo_detector.detect(image)
        yolo_count = len(detections)
        avg_confidence = self.yolo_detector.get_average_confidence(detections)
        
        print(f"  ✓ YOLO detected: {yolo_count} persons")
        print(f"  ✓ Average confidence: {avg_confidence:.2%}")
        
        # Step 2: Decision - Use YOLO only or apply fusion?
        use_fusion = (yolo_count > self.config.LOW_DENSITY_THRESHOLD or 
                     avg_confidence < self.config.HIGH_CONFIDENCE_THRESHOLD)
        
        if not use_fusion:
            print("\n  ℹ️  High confidence & low density -> Using YOLO count only")
            final_count = yolo_count
            density_count = 0
            density_map = None
        else:
            print("\n" + "="*70)
            print("Step 2: Applying ROI Spatial Fusion (YOLO + ZIP-EBC)...")
            print("="*70)
            
            # Generate density map
            density_map = self.density_estimator.create_density_map(
                image.shape, detections
            )
            
            # Create ROI mask for high-density regions
            roi_mask = self.spatial_fusion.create_roi_mask(
                image.shape, detections, expansion_factor=1.5
            )
            
            # Estimate count from density in ROI
            density_count = self.density_estimator.estimate_from_density(
                density_map, roi_mask
            )
            
            print(f"  ✓ Density estimation: {density_count} persons")
            
            # Fuse counts
            final_count = self.spatial_fusion.fuse_counts(
                yolo_count, density_count, avg_confidence
            )
            
            print(f"  ✓ Fused count: {final_count} persons")
        
        # Prepare results
        results = {
            'image': image,
            'detections': detections,
            'yolo_count': yolo_count,
            'density_count': density_count,
            'final_count': final_count,
            'avg_confidence': avg_confidence,
            'density_map': density_map,
            'used_fusion': use_fusion
        }
        
        return results
    
    def visualize_results(self, results, save_path=None):
        """
        Visualize detection results
        
        Args:
            results: Results dictionary from process_image
            save_path: Optional path to save visualization
        """
        image = results['image'].copy()
        detections = results['detections']
        density_map = results['density_map']
        
        # Create figure with subplots
        if density_map is not None:
            fig, axes = plt.subplots(1, 3, figsize=(20, 6))
        else:
            fig, axes = plt.subplots(1, 2, figsize=(14, 6))
            axes = [axes[0], axes[1], None]
        
        # Plot 1: Original image with YOLO detections
        img_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        for det in detections:
            x1, y1, x2, y2, conf = det[:5]
            x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
            cv2.rectangle(img_rgb, (x1, y1), (x2, y2), (0, 255, 0), 3)
            cv2.putText(img_rgb, f'{conf:.2f}', (x1, max(y1 - 10, 15)),  # keep label inside the image
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
        
        axes[0].imshow(img_rgb)
        axes[0].set_title(f'YOLO Detections: {results["yolo_count"]} persons\n'
                         f'Avg Confidence: {results["avg_confidence"]:.2%}', 
                         fontsize=14, weight='bold')
        axes[0].axis('off')
        
        # Plot 2: Detection summary
        axes[1].text(0.5, 0.75, f'YOLO Count: {results["yolo_count"]}',
                    ha='center', va='center', fontsize=18, weight='bold')
        
        if results['used_fusion']:
            axes[1].text(0.5, 0.55, f'Density Count: {results["density_count"]}',
                        ha='center', va='center', fontsize=18, weight='bold')
            axes[1].text(0.5, 0.3, f'🎯 FINAL COUNT: {results["final_count"]}',
                        ha='center', va='center', fontsize=24, weight='bold',
                        color='red')
            axes[1].text(0.5, 0.1, '(Using ROI Spatial Fusion)',
                        ha='center', va='center', fontsize=13, style='italic')
        else:
            axes[1].text(0.5, 0.3, f'🎯 FINAL COUNT: {results["final_count"]}',
                        ha='center', va='center', fontsize=24, weight='bold',
                        color='green')
            axes[1].text(0.5, 0.1, '(YOLO Only - High Confidence)',
                        ha='center', va='center', fontsize=13, style='italic')
        
        axes[1].set_xlim(0, 1)
        axes[1].set_ylim(0, 1)
        axes[1].axis('off')
        axes[1].set_facecolor('#f0f0f0')
        
        # Plot 3: Density map (if available)
        if density_map is not None and axes[2] is not None:
            im = axes[2].imshow(density_map, cmap='hot', interpolation='bilinear')
            axes[2].set_title('Density Map (ZIP-EBC)', fontsize=14, weight='bold')
            axes[2].axis('off')
            plt.colorbar(im, ax=axes[2], fraction=0.046, pad=0.04)
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path, dpi=150, bbox_inches='tight')
            print(f"\n📁 Visualization saved to: {save_path}")
        
        plt.show()
        
        return fig

print("✅ TrainCrowdCounter class defined!")

## 🎬 Section 7: Initialize System

In [None]:
print("="*70)
print("🚆 Train Carriage Crowd Detection - YOLO + ZIP-EBC")
print("="*70)

# Initialize counter
counter = TrainCrowdCounter(config)

print("\n✅ System initialized successfully!")
print(f"📊 Ready to process: {config.IMAGE_FILE}")

## 📸 Section 8: Process Train Image

In [None]:
# Check if image exists
if not os.path.exists(config.IMAGE_FILE):
    print(f"⚠️  Warning: {config.IMAGE_FILE} not found!")
    print("Please upload your train carriage image and name it 'train.jpeg'")
    print("Or change the IMAGE_FILE in the Config class to your image filename.")
else:
    print(f"✅ Found image: {config.IMAGE_FILE}")
    
    # Load and display original image
    original_img = cv2.imread(config.IMAGE_FILE)
    img_rgb = cv2.cvtColor(original_img, cv2.COLOR_BGR2RGB)
    
    plt.figure(figsize=(12, 8))
    plt.imshow(img_rgb)
    plt.title('Original Train Carriage Image', fontsize=16, weight='bold')
    plt.axis('off')
    plt.tight_layout()
    plt.show()
    
    print(f"📏 Image size: {original_img.shape[1]}x{original_img.shape[0]}")

## 🔍 Section 9: Run Detection & Count People

In [None]:
# Process the image
if os.path.exists(config.IMAGE_FILE):
    results = counter.process_image(config.IMAGE_FILE)
    
    print("\n" + "="*70)
    print("✅ DETECTION COMPLETE!")
    print("="*70)

## 📊 Section 10: Visualize Results

In [None]:
# Visualize results
if os.path.exists(config.IMAGE_FILE):
    save_path = os.path.join(config.OUTPUT_DIR, 'detection_result.jpg')
    counter.visualize_results(results, save_path=save_path)

## 📈 Section 11: Detailed Results Summary

In [None]:
# Print detailed summary
if os.path.exists(config.IMAGE_FILE):
    print("\n" + "="*70)
    print("📋 DETECTION SUMMARY")
    print("="*70)
    print(f"\n🎯 YOLO Detection Results:")
    print(f"   • Persons detected: {results['yolo_count']}")
    print(f"   • Average confidence: {results['avg_confidence']:.2%}")
    print(f"   • Total detections: {len(results['detections'])}")
    
    if results['used_fusion']:
        print(f"\n🗺️  ZIP-EBC Density Estimation:")
        print(f"   • Estimated count: {results['density_count']} persons")
        print(f"   • Method: ROI Spatial Fusion")
        print(f"   • Fusion weights: YOLO={config.FUSION_WEIGHT_YOLO}, Density={config.FUSION_WEIGHT_DENSITY}")
    else:
        print(f"\n✓ Using YOLO only (High confidence detection)")
    
    print(f"\n{'='*70}")
    print(f"🎯 FINAL COUNT: {results['final_count']} PERSONS")
    print(f"{'='*70}")
    
    # Detection confidence distribution
    if len(results['detections']) > 0:
        confidences = [det[4] for det in results['detections']]
        
        plt.figure(figsize=(10, 4))
        plt.subplot(1, 2, 1)
        plt.hist(confidences, bins=10, color='skyblue', edgecolor='black')
        plt.xlabel('Confidence Score', fontsize=12)
        plt.ylabel('Number of Detections', fontsize=12)
        plt.title('Detection Confidence Distribution', fontsize=14, weight='bold')
        plt.grid(True, alpha=0.3)
        
        plt.subplot(1, 2, 2)
        labels = ['YOLO\nCount', 'Density\nCount', 'Final\nCount']
        counts = [results['yolo_count'], results['density_count'], results['final_count']]
        colors = ['#3498db', '#e74c3c', '#2ecc71']
        bars = plt.bar(labels, counts, color=colors, edgecolor='black', linewidth=2)
        plt.ylabel('Person Count', fontsize=12)
        plt.title('Count Comparison', fontsize=14, weight='bold')
        plt.grid(True, alpha=0.3, axis='y')
        
        # Add value labels on bars
        for bar in bars:
            height = bar.get_height()
            plt.text(bar.get_x() + bar.get_width()/2., height,
                    f'{int(height)}',
                    ha='center', va='bottom', fontsize=14, weight='bold')
        
        plt.tight_layout()
        plt.savefig(os.path.join(config.OUTPUT_DIR, 'statistics.jpg'), dpi=150)
        plt.show()

## 💾 Section 12: Save Results to JSON

In [None]:
# Save results to JSON
if os.path.exists(config.IMAGE_FILE):
    results_json = {
        'image_file': config.IMAGE_FILE,
        'yolo_count': results['yolo_count'],
        'density_count': results['density_count'],
        'final_count': results['final_count'],
        'avg_confidence': float(results['avg_confidence']),
        'used_fusion': bool(results['used_fusion']),  # cast: a NumPy bool is not JSON serializable
        'detections': [
            {
                'bbox': [float(x) for x in det[:4]],
                'confidence': float(det[4])
            } for det in results['detections']
        ]
    }
    
    json_path = os.path.join(config.OUTPUT_DIR, 'results.json')
    with open(json_path, 'w') as f:
        json.dump(results_json, f, indent=2)
    
    print(f"✅ Results saved to: {json_path}")
    print("\nJSON Content:")
    print(json.dumps(results_json, indent=2))

## 🎥 Section 13: Bonus - Video Processing (Optional)

In [None]:
# Remove the surrounding triple quotes to enable video processing
"""
def process_video(video_path, output_path=None, sample_rate=30):
    cap = cv2.VideoCapture(video_path)
    
    if not cap.isOpened():
        raise ValueError(f"Could not open video: {video_path}")
    
    # Keep fractional rates such as 29.97; 30.0 is an assumed fallback when FPS is unknown
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    
    print(f"Video: {width}x{height} @ {fps}fps, {total_frames} frames")
    
    if output_path:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    
    frame_results = []
    frame_idx = 0
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        if frame_idx % sample_rate == 0:
            print(f"Processing frame {frame_idx}/{total_frames}...")
            
            try:
                result = counter.process_image(frame)
                frame_results.append(result)
                
                # Draw count on frame
                cv2.putText(frame, f'Count: {result["final_count"]}',
                          (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.5,
                          (0, 255, 0), 3)
                
                for det in result['detections']:
                    x1, y1, x2, y2 = map(int, det[:4])
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            
            except Exception as e:
                print(f"Error: {e}")
        
        if output_path:
            out.write(frame)
        
        frame_idx += 1
    
    cap.release()
    if output_path:
        out.release()
        print(f"Output saved: {output_path}")
    
    return frame_results

# Example usage:
# video_results = process_video('train_video.mp4', 'output_video.mp4', sample_rate=30)
"""

print("Video processing function defined (commented out)")
print("Uncomment and run to process videos")

---

## 📚 Documentation & Usage Guide

### How It Works:

1. **YOLO Detection**: Detects visible persons with bounding boxes
2. **Decision Logic**: 
   - If low density + high confidence → Use YOLO only
   - If high density OR low confidence → Apply fusion
3. **ZIP-EBC Density Estimation**: Creates density map for occluded persons
4. **ROI Spatial Fusion**: Combines both methods using weighted fusion
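
The decision logic in step 2 can be sketched as a standalone function. This mirrors the condition used in `process_image`; the function name `should_use_fusion` is hypothetical, and the default thresholds are the values from `Config`.

```python
def should_use_fusion(yolo_count, avg_confidence,
                      low_density_threshold=5,
                      high_confidence_threshold=0.6):
    """Return True when the scene is crowded or YOLO is unsure of itself."""
    crowded = yolo_count > low_density_threshold
    unsure = avg_confidence < high_confidence_threshold
    return bool(crowded or unsure)

# (count, average confidence) pairs covering the three interesting cases
for count, conf in [(3, 0.85), (3, 0.40), (12, 0.85)]:
    mode = "fusion" if should_use_fusion(count, conf) else "YOLO only"
    print(f"count={count:2d}, confidence={conf:.2f} -> {mode}")
```

Note that a scene with exactly `LOW_DENSITY_THRESHOLD` detections and sufficient confidence still takes the YOLO-only path, because the comparison is strict.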

### Formula:
```
C_total = Σ_{i ∈ D_YOLO} I(pos(i) ∉ P) + ∫∫_P M(x,y) dx dy
```
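
As a rough illustration of the formula, the sketch below counts detections whose position falls outside the region P and adds the density mass inside P. It rests on several assumptions that the notebook does not state: P is given as a boolean mask, `pos(i)` is taken to be the box centre, and the density map is normalised so that each person integrates to 1. The function name `roi_fusion_count` is hypothetical.

```python
import numpy as np

def roi_fusion_count(centers, density_map, roi_mask):
    """
    centers:     list of (x, y) integer positions of YOLO detections
    density_map: 2D array whose sum over a region approximates a person count
    roi_mask:    2D boolean array, True inside the region P
    """
    # First term: detections located outside P are counted one by one.
    outside = sum(1 for (x, y) in centers if not roi_mask[y, x])
    # Second term: inside P, the count is the integral of the density map.
    inside = float(density_map[roi_mask].sum())
    return outside + inside

h, w = 100, 200
roi = np.zeros((h, w), dtype=bool)
roi[:, 100:] = True  # the right half of the image is the crowded region P

density = np.zeros((h, w), dtype=np.float64)
density[20:40, 120:160] = 3.0 / (20 * 40)  # three people's worth of mass inside P

centers = [(10, 50), (50, 50), (150, 30)]  # two outside P, one inside
total = roi_fusion_count(centers, density, roi)
print(f"C_total = {total:.2f}")
```

Unlike the weighted average in `fuse_counts`, this form never counts the same region with both methods.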

### Configuration Options:
- `YOLO_MODEL`: Choose model size (n/s/m/l/x)
- `YOLO_CONF_THRESHOLD`: Minimum detection confidence (0-1)
- `DENSITY_SIGMA`: Standard deviation, in pixels, of the Gaussian placed at each detection
- `FUSION_WEIGHT_YOLO/DENSITY`: Fusion weights
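
To see how the fusion weights interact with YOLO's average confidence, here is a worked example that reproduces the arithmetic of `SpatialFusion.fuse_counts` as a standalone function. The input counts (10 and 14) are made-up numbers for illustration, and the function returns the unrounded value, whereas the notebook rounds to the nearest integer.

```python
def fuse_counts(yolo_count, density_count, yolo_confidence,
                yolo_weight=0.6, density_weight=0.4):
    """Return (normalised YOLO weight, normalised density weight, fused count)."""
    w_yolo = yolo_weight * yolo_confidence
    w_density = density_weight * (1 + (1 - yolo_confidence))
    total = w_yolo + w_density
    w_yolo, w_density = w_yolo / total, w_density / total
    fused = w_yolo * yolo_count + w_density * density_count
    return w_yolo, w_density, fused

# Same counts, two confidence levels: lower confidence shifts weight to density.
for conf in (0.8, 0.5):
    w_y, w_d, fused = fuse_counts(10, 14, conf)
    print(f"confidence={conf:.1f}: w_yolo={w_y:.3f}, "
          f"w_density={w_d:.3f}, fused={fused:.2f}")
```

With the default weights, a confidence of 0.8 splits the weight evenly, and anything lower favours the density count.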

### Output Files:
- `output/detection_result.jpg`: Visualization with all detections
- `output/statistics.jpg`: Confidence distribution and count comparison
- `output/results.json`: Detailed results in JSON format

### Requirements:
- Place your train image as `train.jpeg` in the same directory
- Or change `config.IMAGE_FILE` to your image filename

---

## ✅ Completed!

Your train carriage crowd detection system is ready! 🚆
