In [None]:
# Mount Google Drive into the Colab runtime. The model checkpoint and the
# output directory configured further down both live under
# /content/drive/MyDrive, so this cell has to run before anything touches them.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
from torchvision.ops import nms
import scipy.io as sio
import numpy as np
import cv2
import os
import time
import json
from datetime import datetime
import argparse
import logging

import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.cluster import DBSCAN

In [None]:
# Setup logging for emergency response.
#
# Records go both to a log file in the working directory and to the notebook
# output. `force=True` is required in notebooks: `basicConfig` silently does
# nothing when the root logger already has handlers, which is the usual
# situation on Colab and whenever this cell is re-run, so without it the file
# handler would never be installed. (`force` needs Python 3.8+.)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('emergency_response.log'),
        logging.StreamHandler()
    ],
    force=True,
)
logger = logging.getLogger(__name__)

class EmergencyResponseConfig:
    """Central bundle of tunable settings for the crowd analyzer.

    Instantiating the class has one side effect: the output directory
    (``OUTPUT_DIR``) is created if it does not exist yet.
    """

    def __init__(self):
        # --- Compute device: GPU when one is visible, CPU otherwise ---
        use_cuda = torch.cuda.is_available()
        self.device = torch.device("cuda" if use_cuda else "cpu")

        # --- Network input geometry (frames are resized to this before
        #     inference) and the stride used to map feature-map cells back
        #     to input pixels ---
        self.IMG_HEIGHT, self.IMG_WIDTH = 256, 256
        self.STRIDE = 8

        # --- Detection: minimum confidence for a feature-map cell to count,
        #     and the DBSCAN settings used to merge nearby detections ---
        self.CONFIDENCE_THRESHOLD = 0.2
        self.DBSCAN_EPS = 15
        self.DBSCAN_MIN_SAMPLES = 1

        # --- Emergency classification: per-zone people counts at which a
        #     zone becomes "high" / "critical", and the zone edge in pixels ---
        self.HIGH_DENSITY_THRESHOLD = 50
        self.CRITICAL_DENSITY_THRESHOLD = 100
        self.EVACUATION_ZONE_SIZE = 100

        # --- Locations on the mounted Drive ---
        self.MODEL_PATH = '/content/drive/MyDrive/attention_denisity_based.pth'
        self.OUTPUT_DIR = '/content/drive/MyDrive/crowd_test_data'
        os.makedirs(self.OUTPUT_DIR, exist_ok=True)

        # --- Drawing colours per alert level, in OpenCV's BGR channel order ---
        self.COLORS = dict(
            safe=(0, 255, 0),          # green
            moderate=(0, 255, 255),    # yellow
            high=(0, 165, 255),        # orange
            critical=(0, 0, 255),      # red
            evacuation=(255, 0, 255),  # magenta
        )


# **Model Used**

In [None]:
class MultiScaleBlock(nn.Module):
    """Three parallel conv branches (3x3, 5x5, 7x7) fused back to the input width.

    Every branch keeps the spatial size (padding = kernel // 2) and the channel
    count; the concatenated branches are reduced to ``in_channels`` again by a
    1x1 convolution. Each convolution is followed by BatchNorm and ReLU.
    """

    def __init__(self, in_channels):
        super().__init__()
        # NOTE: attribute names and creation order are kept as-is because they
        # determine the state_dict keys of saved checkpoints.
        self.conv1 = nn.Conv2d(in_channels, in_channels, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.conv2 = nn.Conv2d(in_channels, in_channels, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm2d(in_channels)
        self.conv3 = nn.Conv2d(in_channels, in_channels, kernel_size=7, padding=3)
        self.bn3 = nn.BatchNorm2d(in_channels)
        self.fuse = nn.Conv2d(3 * in_channels, in_channels, kernel_size=1)
        self.bn_fuse = nn.BatchNorm2d(in_channels)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Return the fused multi-scale features; output shape equals input shape."""
        branches = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        per_scale = [self.relu(norm(conv(x))) for conv, norm in branches]
        stacked = torch.cat(per_scale, dim=1)
        fused = self.bn_fuse(self.fuse(stacked))
        return self.relu(fused)

class AttentionBlock(nn.Module):
    """Channel re-weighting gate.

    The input is globally average-pooled to one value per channel, passed
    through a bias-free 1x1-conv bottleneck (``in_channels // 8`` hidden
    channels) ending in a sigmoid, and the resulting per-channel weights are
    multiplied back onto the input.
    """

    def __init__(self, in_channels):
        super().__init__()
        hidden_channels = in_channels // 8
        # NOTE: attribute names and layer positions inside `fc` are kept as-is
        # because they determine the state_dict keys of saved checkpoints.
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Conv2d(in_channels, hidden_channels, kernel_size=1, bias=False),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden_channels, in_channels, kernel_size=1, bias=False),
            nn.Sigmoid(),
        )

    def forward(self, x):
        """Scale every channel of ``x`` by its learned weight in (0, 1)."""
        channel_weights = self.fc(self.avg_pool(x))
        return x * channel_weights

class APGCCPointNet(nn.Module):
    """APGCC Point-based crowd counting network.

    A truncated VGG16 encoder (the first 17 feature layers) feeds a
    multi-scale + attention block, followed by two 1x1 heads:

    * ``confidence_head`` - 1 channel of raw logits per feature-map cell
      (callers apply the sigmoid themselves);
    * ``offset_head`` - 2 channels per feature-map cell.

    Args:
        pretrained_backbone: When True (default, same as before) the encoder is
            initialised with torchvision's ImageNet VGG16 weights, which may
            trigger a download. Pass False when a full checkpoint is loaded
            right afterwards and the download would be wasted.
    """

    def __init__(self, pretrained_backbone=True):
        super().__init__()

        # `pretrained=` is deprecated since torchvision 0.13 in favour of
        # `weights=`; use the new API when present and fall back otherwise so
        # older torchvision releases keep working.
        if hasattr(models, "VGG16_Weights"):
            weights = models.VGG16_Weights.IMAGENET1K_V1 if pretrained_backbone else None
            vgg16_features = models.vgg16(weights=weights).features
        else:
            vgg16_features = models.vgg16(pretrained=pretrained_backbone).features

        # NOTE: attribute names below are state_dict keys of saved checkpoints
        # and must not be renamed.
        self.encoder = nn.Sequential(*list(vgg16_features)[:17])
        # Channel count passed to the blocks and heads below; it has to match
        # what the truncated encoder outputs.
        ENCODER_OUT_CHANNELS = 256

        self.multiscale_attn_block = nn.Sequential(
            MultiScaleBlock(ENCODER_OUT_CHANNELS),
            AttentionBlock(ENCODER_OUT_CHANNELS)
        )

        self.confidence_head = nn.Conv2d(ENCODER_OUT_CHANNELS, 1, 1, bias=True)
        self.offset_head = nn.Conv2d(ENCODER_OUT_CHANNELS, 2, 1, bias=True)

    def forward(self, x):
        """Return ``(confidence_logits, offsets)`` for a batch of images.

        Both outputs come from 1x1 convolutions over the same processed
        feature map, so they share their batch and spatial dimensions.
        """
        feature_map = self.encoder(x)
        processed_feature_map = self.multiscale_attn_block(feature_map)
        confidence_output = self.confidence_head(processed_feature_map)
        offset_output = self.offset_head(processed_feature_map)
        return confidence_output, offset_output

In [None]:
class EmergencyResponseAnalyzer:
    """Turns crowd-counting model output into per-zone emergency assessments.

    Typical use: construct with an ``EmergencyResponseConfig``, call
    ``load_model()``, then ``process_video(...)`` or ``process_live_feed(...)``.

    The whole class lives in a single cell on purpose: a class body cannot be
    continued in a later notebook cell, so splitting the methods across cells
    breaks "Restart & Run All".

    Attributes:
        config: The configuration object passed to the constructor.
        model: The loaded network, or None until ``load_model()`` succeeds.
        emergency_log: List of per-frame event dicts appended by
            ``log_emergency_event``; it is never cleared by this class.
    """

    # Frame rate written to the output file when the input does not report one.
    _FALLBACK_FPS = 30.0
    # A progress line is logged every this many processed frames.
    _PROGRESS_LOG_INTERVAL = 30
    # Overall level becomes 'moderate' above this total count when no single
    # zone reaches the high/critical thresholds.
    _MODERATE_TOTAL_COUNT = 20

    def __init__(self, config):
        """Store the config and build the frame preprocessing pipeline."""
        self.config = config
        self.model = None
        self.emergency_log = []

        # Initialize preprocessing pipeline: resize to the network input size,
        # normalise with the given per-channel mean/std, convert to a tensor.
        self.inference_transforms = A.Compose([
            A.Resize(config.IMG_HEIGHT, config.IMG_WIDTH, interpolation=cv2.INTER_AREA),
            A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
            ToTensorV2(),
        ])

        logger.info(f"Emergency Response Analyzer initialized on device: {config.device}")

    # ------------------------------------------------------------------
    # Loading Model
    # ------------------------------------------------------------------

    def load_model(self):
        """Load the trained crowd counting model from ``config.MODEL_PATH``.

        Returns:
            True when the weights were loaded and the model is in eval mode,
            False otherwise (missing file or any loading error, both logged).

        ``self.model`` is only set on success. Previously a failed load left a
        freshly constructed, untrained network behind, which let the
        "model not loaded" guards in the processing methods pass.
        """
        self.model = None
        try:
            if not os.path.exists(self.config.MODEL_PATH):
                logger.error(f"Model file not found: {self.config.MODEL_PATH}")
                return False

            model = APGCCPointNet().to(self.config.device)
            # NOTE(review): torch.load unpickles the file, which can execute
            # arbitrary code. Only load checkpoints from a trusted source, or
            # pass weights_only=True where the installed PyTorch supports it.
            state_dict = torch.load(self.config.MODEL_PATH, map_location=self.config.device)
            model.load_state_dict(state_dict)
            model.eval()
        except Exception as e:
            logger.error(f"Error loading model: {e}")
            return False

        self.model = model
        logger.info(f"Model loaded successfully from {self.config.MODEL_PATH}")
        return True

    def analyze_crowd_density(self, points, frame_shape):
        """Analyze crowd density and identify emergency zones.

        The frame is divided into square cells of
        ``config.EVACUATION_ZONE_SIZE`` pixels; every cell that contains at
        least one point becomes a zone.

        Args:
            points: Sequence of ``(x, y)`` pairs in frame pixel coordinates.
            frame_shape: Shape of the frame; only the first two entries
                (height, width) are used.

        Returns:
            ``(density_zones, emergency_level, total_count)`` where
            ``density_zones`` is a list of dicts with keys ``bbox``
            (x1, y1, x2, y2 clipped to the frame), ``count``, ``points``,
            ``center`` and ``level`` ('moderate', 'high' or 'critical'),
            ordered top-to-bottom then left-to-right; ``emergency_level`` is
            'safe', 'moderate', 'high' or 'critical'; ``total_count`` is
            ``len(points)``.
        """
        if len(points) == 0:
            return [], "safe", 0

        h, w = frame_shape[:2]
        grid_size = self.config.EVACUATION_ZONE_SIZE
        # Number of grid cells along each axis (ceiling division).
        n_rows = -(-h // grid_size)
        n_cols = -(-w // grid_size)

        # Single pass over the points instead of scanning all points once per
        # grid cell. A point belongs to cell (row, col) exactly when
        # col*grid <= x < (col+1)*grid and row*grid <= y < (row+1)*grid.
        cells = {}
        for point in points:
            px, py = point
            col = px // grid_size
            row = py // grid_size
            # Points outside the grid are skipped; NaN coordinates fail both
            # comparisons and are skipped as well.
            if 0 <= col < n_cols and 0 <= row < n_rows:
                cells.setdefault((int(row), int(col)), []).append(point)

        critical_at = self.config.CRITICAL_DENSITY_THRESHOLD
        high_at = self.config.HIGH_DENSITY_THRESHOLD

        density_zones = []
        for row, col in sorted(cells):
            zone_points = cells[(row, col)]
            x, y = col * grid_size, row * grid_size
            count = len(zone_points)

            # Classify density level
            if count >= critical_at:
                level = 'critical'
            elif count >= high_at:
                level = 'high'
            else:
                level = 'moderate'

            density_zones.append({
                'bbox': (x, y, min(x + grid_size, w), min(y + grid_size, h)),
                'count': count,
                'points': zone_points,
                'center': (x + grid_size // 2, y + grid_size // 2),
                'level': level,
            })

        # --- Determining Emergency Level ---
        # The overall level is driven by the busiest zone; the total count
        # only matters for the safe/moderate distinction.
        total_count = len(points)
        max_zone_density = max((zone['count'] for zone in density_zones), default=0)

        if max_zone_density >= critical_at:
            emergency_level = 'critical'
        elif max_zone_density >= high_at:
            emergency_level = 'high'
        elif total_count > self._MODERATE_TOTAL_COUNT:
            emergency_level = 'moderate'
        else:
            emergency_level = 'safe'

        return density_zones, emergency_level, total_count

    def log_emergency_event(self, frame_idx, total_count, emergency_level, density_zones):
        """Append one event record to ``self.emergency_log``.

        A warning is additionally emitted through the logger when the level is
        'high' or 'critical'.
        """
        high_zone_count = sum(1 for z in density_zones if z['level'] in ('high', 'critical'))
        critical_zone_count = sum(1 for z in density_zones if z['level'] == 'critical')

        self.emergency_log.append({
            'timestamp': datetime.now().isoformat(),
            'frame': frame_idx,
            'total_count': total_count,
            'emergency_level': emergency_level,
            'high_density_zones': high_zone_count,
            'critical_zones': critical_zone_count,
        })

        if emergency_level in ['high', 'critical']:
            logger.warning(f"EMERGENCY ALERT - Frame {frame_idx}: {emergency_level.upper()} density detected - {total_count} people")

    # ------------------------------------------------------------------
    # Visualization
    # ------------------------------------------------------------------

    def draw_emergency_visualization(self, frame, points, density_zones, emergency_level, total_count, frame_idx):
        """Draw comprehensive emergency response visualization.

        Works on a copy of ``frame`` (the input is not modified) and returns
        the annotated copy: zone rectangles with counts, one marker per
        detected person, a status panel in the top-left corner and, for
        'high'/'critical' levels, instruction text at the bottom.
        """
        overlay = frame.copy()
        font = cv2.FONT_HERSHEY_SIMPLEX

        # Draw density zones
        for zone in density_zones:
            x1, y1, x2, y2 = zone['bbox']
            color = self.config.COLORS[zone['level']]
            cv2.rectangle(overlay, (x1, y1), (x2, y2), color, 2)

            # Put the label above the box, or just inside it for zones that
            # touch the top edge (a label at y1 - 10 would be off-screen).
            label_y = y1 - 10 if y1 >= 25 else y1 + 15
            cv2.putText(overlay, f"{zone['count']} people", (x1, label_y),
                        font, 0.5, color, 1)

        # Draw individual detected people: white dot with a black ring
        for point in points:
            x, y = int(point[0]), int(point[1])
            cv2.circle(overlay, (x, y), 3, (255, 255, 255), -1)
            cv2.circle(overlay, (x, y), 5, (0, 0, 0), 1)

        # Emergency status panel: black fill, border in the alert colour
        panel_height = 120
        panel_color = self.config.COLORS[emergency_level]
        cv2.rectangle(overlay, (10, 10), (400, panel_height), (0, 0, 0), -1)
        cv2.rectangle(overlay, (10, 10), (400, panel_height), panel_color, 3)

        # Status text
        high_zone_count = sum(1 for z in density_zones if z['level'] in ('high', 'critical'))
        status_texts = [
            f"EMERGENCY STATUS: {emergency_level.upper()}",
            f"Total People Detected: {total_count}",
            f"High-Density Zones: {high_zone_count}",
            f"Frame: {frame_idx} | Time: {datetime.now().strftime('%H:%M:%S')}"
        ]
        for i, text in enumerate(status_texts):
            cv2.putText(overlay, text, (20, 30 + i * 20),
                        font, 0.6, (255, 255, 255), 1)

        # Emergency instructions
        if emergency_level in ['high', 'critical']:
            instruction_y = frame.shape[0] - 60
            instructions = [
                "EMERGENCY RESPONSE REQUIRED",
                "Deploy rescue teams to high-density zones",
                "Coordinate evacuation routes"
            ]
            for i, instruction in enumerate(instructions):
                cv2.putText(overlay, instruction, (20, instruction_y + i * 20),
                            font, 0.6, (0, 0, 255), 2)

        return overlay

    # ------------------------------------------------------------------
    # Inference
    # ------------------------------------------------------------------

    def _detect_points(self, frame):
        """Run the model on one BGR frame; return person points in frame pixels.

        Shared by ``process_video`` and ``process_live_feed``. Steps:
        preprocess, forward pass, keep feature-map cells whose confidence
        reaches the threshold, convert them to network-input pixel
        coordinates, merge nearby candidates with DBSCAN (cluster means), and
        rescale to the size of ``frame``.

        Returns:
            List of ``[x, y]`` pairs (possibly empty).
        """
        cfg = self.config

        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image_tensor = self.inference_transforms(image=frame_rgb)['image']
        image_tensor = image_tensor.unsqueeze(0).to(cfg.device)

        with torch.no_grad():
            conf_logits, offsets = self.model(image_tensor)
        conf_map = torch.sigmoid(conf_logits).squeeze(0).squeeze(0).cpu().numpy()
        offset_map = offsets.squeeze(0).permute(1, 2, 0).cpu().numpy()

        # Row-major indices of all cells above the threshold (same visiting
        # order as a nested y/x loop).
        cell_y, cell_x = np.nonzero(conf_map >= cfg.CONFIDENCE_THRESHOLD)
        if cell_y.size == 0:
            return []

        # Cell centre plus the predicted offset (expressed in strides).
        half_stride = cfg.STRIDE / 2
        candidates = np.stack(
            [
                cell_x * cfg.STRIDE + half_stride + offset_map[cell_y, cell_x, 0] * cfg.STRIDE,
                cell_y * cfg.STRIDE + half_stride + offset_map[cell_y, cell_x, 1] * cfg.STRIDE,
            ],
            axis=1,
        ).astype(np.float32)

        labels = DBSCAN(eps=cfg.DBSCAN_EPS, min_samples=cfg.DBSCAN_MIN_SAMPLES).fit(candidates).labels_

        # Scale from network-input coordinates to the frame's own size.
        scale_x = frame.shape[1] / cfg.IMG_WIDTH
        scale_y = frame.shape[0] / cfg.IMG_HEIGHT

        points = []
        for label in np.unique(labels):
            if label == -1:  # DBSCAN noise label
                continue
            center = candidates[labels == label].mean(axis=0)
            points.append([center[0] * scale_x, center[1] * scale_y])
        return points

    def process_video(self, video_path = "/content/drive/MyDrive/crowd_test_data/1.mp4", output_path= "/content/drive/MyDrive/crowd_test_data"):
        """Process video for emergency response crowd analysis.

        Args:
            video_path: Input video file.
            output_path: Output video file. When None,
                a timestamped file is created in ``config.OUTPUT_DIR``; when it
                is an existing directory (as the default is), a timestamped
                file is created inside that directory.

        Returns:
            True when the video was processed, False when the model is not
            loaded or the input/output could not be opened.

        Side effects: writes the annotated video, appends one event per frame
        to ``self.emergency_log`` and dumps that log as JSON into
        ``config.OUTPUT_DIR``.
        """
        if self.model is None:
            logger.error("Model not loaded. Call load_model() first.")
            return False

        # Setup video capture
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            logger.error(f"Could not open video: {video_path}")
            cap.release()
            return False

        out = None
        frame_idx = 0
        emergency_alerts = 0

        try:
            # Get video properties
            fps = cap.get(cv2.CAP_PROP_FPS)
            frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

            logger.info(f"Processing video: {video_path}")
            logger.info(f"Video properties: {frame_width}x{frame_height}, {fps} FPS, {total_frames} frames")

            # Some inputs report 0 or NaN here, which the writer cannot use.
            if not fps > 0:
                logger.warning(f"Input reports no valid FPS; using {self._FALLBACK_FPS}")
                fps = self._FALLBACK_FPS

            # Setup output video writer. A directory cannot be opened as a
            # video file, so derive a file name inside it instead.
            if output_path is None or os.path.isdir(output_path):
                target_dir = self.config.OUTPUT_DIR if output_path is None else output_path
                output_path = os.path.join(target_dir, f'emergency_analysis_{int(time.time())}.mp4')

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

            if not out.isOpened():
                logger.error("Could not create output video writer")
                return False

            logger.info("Starting emergency response analysis...")

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                scaled_points = self._detect_points(frame)

                # Analyze crowd density for emergency response
                density_zones, emergency_level, total_count = self.analyze_crowd_density(scaled_points, frame.shape)

                # Log emergency events
                self.log_emergency_event(frame_idx, total_count, emergency_level, density_zones)

                if emergency_level in ['high', 'critical']:
                    emergency_alerts += 1

                # Create visualization (drawn on a copy, `frame` stays intact)
                visualized_frame = self.draw_emergency_visualization(
                    frame, scaled_points, density_zones, emergency_level, total_count, frame_idx
                )

                # Write frame to output video
                out.write(visualized_frame)

                frame_idx += 1

                # Progress logging
                if frame_idx % self._PROGRESS_LOG_INTERVAL == 0:
                    if total_frames > 0:
                        progress = (frame_idx / total_frames) * 100
                        logger.info(f"Progress: {progress:.1f}% - Frame {frame_idx}/{total_frames} - Emergency alerts: {emergency_alerts}")
                    else:
                        # Frame count unknown: avoid dividing by zero.
                        logger.info(f"Progress: Frame {frame_idx} - Emergency alerts: {emergency_alerts}")
        finally:
            # Cleanup, also on early return or error
            cap.release()
            if out is not None:
                out.release()

        # Save emergency log
        log_path = os.path.join(self.config.OUTPUT_DIR, f'emergency_log_{int(time.time())}.json')
        with open(log_path, 'w') as f:
            json.dump(self.emergency_log, f, indent=2)

        logger.info(f"Analysis complete!")
        logger.info(f"Output video: {output_path}")
        logger.info(f"Emergency log: {log_path}")
        logger.info(f"Total emergency alerts: {emergency_alerts}")
        logger.info(f"Processed {frame_idx} frames")

        return True

    def process_live_feed(self, source=0):
        """Process live camera feed for real-time emergency monitoring.

        Frames are analysed and shown in an OpenCV window until the stream
        ends or 'q' is pressed. Unlike ``process_video`` nothing is written to
        ``self.emergency_log`` or to disk.

        NOTE(review): ``cv2.imshow`` needs a local display; hosted notebook
        runtimes such as Colab typically cannot open one - confirm before
        relying on this method there.

        Returns:
            True after monitoring stopped normally, False when the model is
            not loaded or the source could not be opened.
        """
        if self.model is None:
            logger.error("Model not loaded. Call load_model() first.")
            return False

        cap = cv2.VideoCapture(source)
        if not cap.isOpened():
            logger.error(f"Could not open camera source: {source}")
            cap.release()
            return False

        logger.info("Starting live emergency monitoring. Press 'q' to quit.")

        frame_idx = 0

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                scaled_points = self._detect_points(frame)

                # Analyze density
                density_zones, emergency_level, total_count = self.analyze_crowd_density(scaled_points, frame.shape)

                # Create visualization
                visualized_frame = self.draw_emergency_visualization(
                    frame, scaled_points, density_zones, emergency_level, total_count, frame_idx
                )

                # Display frame
                cv2.imshow('Emergency Response - Live Monitoring', visualized_frame)

                frame_idx += 1

                # Check for quit
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
        finally:
            # Release the camera and close the window even if a frame fails.
            cap.release()
            cv2.destroyAllWindows()

        logger.info("Live monitoring stopped.")
        return True

def main(argv=None):
    """Main function for emergency response crowd analysis.

    Args:
        argv: Optional list of command-line arguments. When None (default),
            the process arguments are used, as before.

    Parses the options, builds the config and analyzer, loads the model and
    runs either live monitoring (``--live``) or video processing (``--video``).
    """
    parser = argparse.ArgumentParser(description='Emergency Response Crowd Density Analyzer')
    parser.add_argument('--video', type=str, help='Path to input video file')
    parser.add_argument('--live', action='store_true', help='Use live camera feed')
    parser.add_argument('--output', type=str, help='Output video path')
    # No default here: a non-empty default would always override
    # config.MODEL_PATH and make the `if args.model` check below pointless.
    parser.add_argument('--model', type=str, default=None,
                        help='Path to model weights (default: MODEL_PATH from the configuration)')

    # parse_known_args instead of parse_args: inside a Jupyter kernel the
    # process arguments contain the kernel's own "-f <connection file>", which
    # parse_args rejects with SystemExit.
    args, unknown_args = parser.parse_known_args(argv)
    if unknown_args:
        logger.debug(f"Ignoring unrecognized arguments: {unknown_args}")

    # Validate the requested mode before doing any expensive work.
    if not args.live and not args.video:
        logger.error("Please specify either --video or --live option")
        parser.print_help()
        return

    # Initialize configuration
    config = EmergencyResponseConfig()
    if args.model:
        config.MODEL_PATH = args.model

    # Initialize analyzer
    analyzer = EmergencyResponseAnalyzer(config)

    # Load model
    if not analyzer.load_model():
        logger.error("Failed to load model. Exiting.")
        return

    logger.info("=" * 60)
    logger.info("EMERGENCY RESPONSE CROWD DENSITY ANALYZER")
    logger.info("Optimized for disaster and evacuation scenarios")
    logger.info("=" * 60)

    try:
        if args.live:
            # Live camera monitoring
            analyzer.process_live_feed()
        else:
            # Video file processing
            analyzer.process_video(args.video, args.output)

    except KeyboardInterrupt:
        logger.info("Process interrupted by user")
    except Exception as e:
        logger.error(f"An error occurred: {e}")

    logger.info("Emergency response analysis completed.")

# Run the command-line entry point when this code is executed as the main
# module (which is also the case when the cell is run inside a notebook).
if __name__ == "__main__":
    main()