# In [None]:
import os
import cv2
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report, confusion_matrix, f1_score, accuracy_score, precision_score, roc_curve, auc
from sklearn.preprocessing import label_binarize
import matplotlib.pyplot as plt
import seaborn as sns
from ultralytics import YOLO
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from PIL import Image
from tqdm import tqdm

# Seed both NumPy and PyTorch so repeated runs are comparable
np.random.seed(42)
torch.manual_seed(42)

# Dataset locations
DATASET_PATH = "C:/Users/ASUS/Desktop/fall/Dataset1(Annotated)"  # Your prestructured dataset
TRAIN_PATH = os.path.join(DATASET_PATH, "train")
# VAL_PATH stays None when the dataset has no "val" directory
VAL_PATH = os.path.join(DATASET_PATH, "val")
if not os.path.exists(VAL_PATH):
    VAL_PATH = None

# Training configuration
IMG_SIZE = 224  # Side length (pixels) images are resized to for the CNN
BATCH_SIZE = 32
EPOCHS = 20
LEARNING_RATE = 1e-3
YOLO_MODEL = "yolov8n.pt"  # Pretrained YOLOv8 nano model
# Use the GPU when one is available, otherwise the CPU
DEVICE = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"

def _plot_metric_panel(metrics, position, series, title):
    """Draw one panel of the 2x2 YOLO metrics figure.

    Args:
        metrics: Mapping from metric key to a scalar or a sequence of values.
        position: 1-based index of the panel inside the 2x2 grid.
        series: Iterable of ``(metric_key, legend_label)`` pairs to draw.
        title: Title shown above the panel.

    Returns:
        True if at least one series was drawn; False if none of the keys
        are present in ``metrics`` (no subplot is created in that case).
    """
    present = [(key, label) for key, label in series if key in metrics]
    if not present:
        return False

    plt.subplot(2, 2, position)
    for key, label in present:
        # A value may be a single scalar rather than a per-epoch sequence.
        # A lone point has no line segment, so a marker keeps it visible.
        values = np.atleast_1d(np.asarray(metrics[key], dtype=float))
        plt.plot(values, marker='o', label=label)
    plt.xlabel('Epoch')
    plt.ylabel('Score')
    plt.title(title)
    plt.legend()
    return True


def plot_yolo_metrics(results):
    """Plot YOLO metrics found in ``results.results_dict``.

    Up to four panels are drawn (precision/recall and mAP, for the plain and
    the ``val/``-prefixed keys). Panels whose keys are missing are skipped
    instead of raising ``KeyError``. The figure is saved to
    ``yolo_training_metrics.png`` and then shown.

    Args:
        results: Object returned by YOLO training. It is only used through
            its ``results_dict`` attribute; if that attribute is missing a
            warning is printed and nothing is plotted.

    Returns:
        None.
    """
    if not hasattr(results, 'results_dict'):
        print("Warning: No metrics found in YOLO results")
        return

    metrics = results.results_dict
    fig = plt.figure(figsize=(15, 10))

    panels = [
        (1, 'YOLO Training Precision & Recall',
         [('metrics/precision(B)', 'Train Precision'),
          ('metrics/recall(B)', 'Train Recall')]),
        (2, 'YOLO Training mAP Metrics',
         [('metrics/mAP50(B)', 'Train mAP@0.5'),
          ('metrics/mAP50-95(B)', 'Train mAP@0.5:0.95')]),
        (3, 'YOLO Validation Precision & Recall',
         [('val/metrics/precision(B)', 'Val Precision'),
          ('val/metrics/recall(B)', 'Val Recall')]),
        (4, 'YOLO Validation mAP Metrics',
         [('val/metrics/mAP50(B)', 'Val mAP@0.5'),
          ('val/metrics/mAP50-95(B)', 'Val mAP@0.5:0.95')]),
    ]

    # Evaluate every panel (list, not generator) so all available ones are drawn
    drawn = [
        _plot_metric_panel(metrics, position, series, title)
        for position, title, series in panels
    ]

    if not any(drawn):
        # Nothing to show: avoid saving an empty image
        plt.close(fig)
        print("Warning: No metrics found in YOLO results")
        return

    plt.tight_layout()
    plt.savefig("yolo_training_metrics.png")
    plt.show()

def train_yolo_model():
    """Train a YOLOv8 model on the dataset and return the best checkpoint path.

    A temporary ``temp_dataset.yaml`` describing the dataset is written to the
    current directory, used for training, and always removed afterwards (also
    when training or the model lookup fails).

    Returns:
        str: Path to the trained ``best.pt`` weights file.

    Raises:
        FileNotFoundError: If no ``best.pt`` can be found after training.
    """
    # Load pretrained YOLO model
    model = YOLO(YOLO_MODEL)

    # Without a validation split, validate on the training images
    train_dir = os.path.join(DATASET_PATH, 'train')
    val_dir = os.path.join(DATASET_PATH, 'val') if VAL_PATH else train_dir

    yaml_lines = [
        f"train: {train_dir}",
        f"val: {val_dir}",
        "nc: 3",
        "names: ['fall', 'not_fall', 'sitting']",
    ]

    yaml_path = "temp_dataset.yaml"
    with open(yaml_path, 'w') as f:
        f.write("\n".join(yaml_lines) + "\n")

    try:
        # Train the model
        results = model.train(
            data=yaml_path,
            epochs=50,
            imgsz=640,
            batch=16,
            name="yolov8_person_detection",
            patience=10,
            save=True,
            save_period=5
        )

        # Plotting is best-effort and must not abort the pipeline
        try:
            plot_yolo_metrics(results)
        except Exception as e:
            print(f"Could not plot YOLO metrics: {e}")

        possible_paths = []

        # Prefer the directory of THIS run: the fixed run name below may
        # belong to an older run when the run directory name was incremented.
        # NOTE(review): relies on a ``save_dir`` attribute on the training
        # result or trainer; skipped transparently when it is absent.
        save_dir = getattr(results, "save_dir", None)
        if save_dir is None:
            save_dir = getattr(getattr(model, "trainer", None), "save_dir", None)
        if save_dir:
            possible_paths.append(os.path.join(str(save_dir), "weights", "best.pt"))

        # Fall back to the conventional output locations
        possible_paths.extend([
            os.path.join("runs", "detect", "yolov8_person_detection", "weights", "best.pt"),
            os.path.join("yolov8_person_detection", "weights", "best.pt"),
            os.path.join("yolov8_person_detection", "best.pt"),
        ])

        found_model = next((p for p in possible_paths if os.path.exists(p)), None)
        if not found_model:
            raise FileNotFoundError(
                "Could not find the trained YOLO model after training. Checked paths: "
                + ", ".join(possible_paths)
            )

        return found_model  # Return the path to the .pt model directly
    finally:
        # Clean up temporary yaml file on success and on failure
        if os.path.exists(yaml_path):
            os.remove(yaml_path)

def plot_roc_curve(y_true, y_scores, class_names, filename="roc_curve.png"):
    """Plot per-class and micro-averaged ROC curves and return their AUCs.

    Args:
        y_true: Sequence of integer class indices in ``range(len(class_names))``.
        y_scores: Array-like of shape ``(n_samples, len(class_names))`` holding
            one score per class for every sample.
        class_names: Class names, ordered by class index; used in the legend.
        filename: Path the figure is saved to.

    Returns:
        dict: AUC per class index, plus the key ``"micro"`` for the
        micro-averaged curve.

    Raises:
        ValueError: If ``y_scores`` does not have one column per class.
    """
    y_scores = np.asarray(y_scores)

    # Binarize the output
    y_true_bin = label_binarize(y_true, classes=np.arange(len(class_names)))
    if y_true_bin.shape[1] == 1 and len(class_names) == 2:
        # label_binarize returns a single column for two classes; expand it so
        # column i lines up with the score column of class i
        y_true_bin = np.hstack([1 - y_true_bin, y_true_bin])
    n_classes = y_true_bin.shape[1]

    if y_scores.ndim != 2 or y_scores.shape[1] != n_classes:
        raise ValueError(
            f"y_scores must have shape (n_samples, {n_classes}), got {y_scores.shape}"
        )

    # Compute ROC curve and ROC area for each class
    fpr = {}
    tpr = {}
    roc_auc = {}
    for i in range(n_classes):
        fpr[i], tpr[i], _ = roc_curve(y_true_bin[:, i], y_scores[:, i])
        roc_auc[i] = auc(fpr[i], tpr[i])

    # Compute micro-average ROC curve and ROC area
    fpr["micro"], tpr["micro"], _ = roc_curve(y_true_bin.ravel(), y_scores.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    # Plot all ROC curves
    plt.figure(figsize=(10, 8))
    colors = ['blue', 'red', 'green', 'darkorange']

    # First plot the micro-average ROC curve
    plt.plot(fpr["micro"], tpr["micro"],
             label='micro-average ROC curve (AUC = {0:0.2f})'
                   ''.format(roc_auc["micro"]),
             color='deeppink', linestyle=':', linewidth=4)

    # Then plot each class's ROC curve; colors repeat when there are more
    # classes than palette entries so that no class is silently left out
    for i in range(n_classes):
        plt.plot(fpr[i], tpr[i], color=colors[i % len(colors)], lw=2,
                 label='ROC curve of class {0} (AUC = {1:0.2f})'
                 ''.format(class_names[i], roc_auc[i]))

    plt.plot([0, 1], [0, 1], 'k--', lw=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Multi-class ROC Curve')
    plt.legend(loc="lower right")
    plt.savefig(filename)
    plt.show()

    return roc_auc
class ActivityDataset(Dataset):
    """Image dataset backed by parallel lists of file paths and labels.

    Args:
        image_paths: Sequence of image file paths.
        labels: Sequence of labels; ``labels[i]`` belongs to ``image_paths[i]``.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, image_paths, labels, transform=None):
        # Only references are stored; images are read lazily in __getitem__
        self.transform = transform
        self.labels = labels
        self.image_paths = image_paths

    def __len__(self):
        # The number of image paths defines the dataset size
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``.

        The image is opened and converted to RGB; the transform, if one was
        given, is applied to it before it is returned.
        """
        rgb_image = Image.open(self.image_paths[idx]).convert("RGB")
        target = self.labels[idx]
        sample = self.transform(rgb_image) if self.transform else rgb_image
        return sample, target

class FallDetectionCNN(nn.Module):
    """ResNet-18 backbone with a small fully connected classification head.

    Args:
        num_classes: Number of output classes of the final linear layer.
        pretrained: If True (default), initialise the backbone with ImageNet
            weights. Pass False when a checkpoint is loaded right afterwards,
            so that no pretrained weights need to be fetched.
    """

    def __init__(self, num_classes=3, pretrained=True):
        super().__init__()

        # ``pretrained=`` is deprecated in newer torchvision releases in
        # favour of ``weights=``; fall back to it only when the weights enum
        # is not available (older torchvision).
        weights_enum = getattr(models, "ResNet18_Weights", None)
        if weights_enum is not None:
            weights = weights_enum.IMAGENET1K_V1 if pretrained else None
            self.model = models.resnet18(weights=weights)
        else:
            self.model = models.resnet18(pretrained=pretrained)

        # Replace the ImageNet classifier with a head for ``num_classes``.
        # The attribute name ``model`` is kept so existing checkpoints load.
        self.model.fc = nn.Sequential(
            nn.Linear(self.model.fc.in_features, 512),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        """Return the class logits produced by the backbone and head for ``x``."""
        return self.model(x)

def _collect_class_images(split_dir, class_mapping, missing_label):
    """Collect image paths and labels from ``<split_dir>/<class>/images``.

    Args:
        split_dir: Directory of one dataset split.
        class_mapping: Mapping from class directory name to integer label.
        missing_label: Text used in the warning printed for a missing class
            directory (e.g. ``"Class directory"``).

    Returns:
        tuple: ``(paths, labels)`` as two parallel lists.
    """
    paths = []
    labels = []
    for class_name, class_idx in class_mapping.items():
        class_dir = os.path.join(split_dir, class_name, "images")
        if not os.path.exists(class_dir):
            print(f"Warning: {missing_label} not found at {class_dir}")
            continue

        # os.listdir order is arbitrary; sort for reproducible datasets
        for img_file in sorted(os.listdir(class_dir)):
            if img_file.lower().endswith(('.png', '.jpg', '.jpeg')):
                paths.append(os.path.join(class_dir, img_file))
                labels.append(class_idx)
    return paths, labels


def prepare_cnn_data():
    """Build the training and validation data loaders for the CNN.

    Images are read from ``<split>/<class>/images`` for the classes ``fall``,
    ``not_fall`` and ``sitting``. When there is no validation split, or it
    contains no images, the training images are reused for validation
    (without augmentation).

    Returns:
        tuple: ``(train_loader, val_loader, train_size, val_size)``.

    Raises:
        FileNotFoundError: If the dataset directory does not exist.
        ValueError: If no training images are found.
    """
    class_mapping = {"fall": 0, "not_fall": 1, "sitting": 2}

    # Check if dataset exists
    if not os.path.exists(DATASET_PATH):
        raise FileNotFoundError(f"Dataset directory not found at {DATASET_PATH}")

    # Process training data
    train_paths, train_labels = _collect_class_images(
        TRAIN_PATH, class_mapping, "Class directory"
    )
    if not train_paths:
        raise ValueError("No training images found in the dataset")

    # Process validation data if exists
    val_paths, val_labels = [], []
    if VAL_PATH:
        val_paths, val_labels = _collect_class_images(
            VAL_PATH, class_mapping, "Validation class directory"
        )
        if not val_paths:
            # An empty validation loader would break the averaging done by
            # the training loop, so fall back to the training images
            print("Warning: No validation images found; using training data for validation")

    if not val_paths:
        val_paths, val_labels = train_paths, train_labels

    # Data transformations (augmentation only for training)
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    train_transform = transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ToTensor(),
        normalize,
    ])
    val_transform = transforms.Compose([
        transforms.Resize((IMG_SIZE, IMG_SIZE)),
        transforms.ToTensor(),
        normalize,
    ])

    # Create datasets
    train_dataset = ActivityDataset(train_paths, train_labels, train_transform)
    val_dataset = ActivityDataset(val_paths, val_labels, val_transform)

    # Create dataloaders
    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    return train_loader, val_loader, len(train_dataset), len(val_dataset)

def _train_one_epoch(model, loader, criterion, optimizer, desc):
    """Run one optimisation pass over ``loader``; return ``(mean_loss, accuracy)``."""
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for images, labels in tqdm(loader, desc=desc):
        images, labels = images.to(DEVICE), labels.to(DEVICE)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        predicted = outputs.argmax(dim=1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    mean_loss = running_loss / max(len(loader), 1)
    accuracy = correct / total if total else 0.0
    return mean_loss, accuracy


def _evaluate_cnn(model, loader, criterion=None, collect=False):
    """Evaluate ``model`` on ``loader`` without gradient tracking.

    Returns ``(mean_loss, accuracy, labels, preds, probs)``. The loss is 0.0
    when no ``criterion`` is given; the three lists are only filled when
    ``collect`` is True.
    """
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    all_labels = []
    all_preds = []
    all_probs = []

    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            outputs = model(images)
            if criterion is not None:
                total_loss += criterion(outputs, labels).item()
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            if collect:
                all_preds.extend(predicted.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
                all_probs.extend(torch.softmax(outputs, dim=1).cpu().numpy())

    mean_loss = total_loss / max(len(loader), 1)
    accuracy = correct / total if total else 0.0
    return mean_loss, accuracy, all_labels, all_preds, all_probs


def train_cnn_model():
    """Train the CNN classifier, evaluate it and return the best model.

    The model is trained for ``EPOCHS`` epochs. After every epoch it is
    evaluated on the validation loader and the weights with the highest
    validation accuracy are written to ``best_cnn_fall_detection.pth``.
    Afterwards the training curves, a classification report, a confusion
    matrix and ROC curves for the best weights are printed/saved.

    Returns:
        FallDetectionCNN: The model carrying the best weights of this run.
    """
    class_names = ["fall", "not_fall", "sitting"]
    class_indices = list(range(len(class_names)))
    checkpoint_path = "best_cnn_fall_detection.pth"

    train_loader, val_loader, _, _ = prepare_cnn_data()

    # Initialize model, loss function, and optimizer
    model = FallDetectionCNN(num_classes=3).to(DEVICE)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3, factor=0.1)

    # Start below any reachable accuracy so the first epoch always writes a
    # checkpoint, even if validation accuracy is 0.0
    best_accuracy = -1.0
    checkpoint_saved = False
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    for epoch in range(EPOCHS):
        train_loss, train_accuracy = _train_one_epoch(
            model, train_loader, criterion, optimizer, f"Epoch {epoch+1}/{EPOCHS}"
        )
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        # Validation (only loss/accuracy are needed per epoch)
        val_loss, val_accuracy, _, _, _ = _evaluate_cnn(model, val_loader, criterion)
        val_losses.append(val_loss)
        val_accuracies.append(val_accuracy)

        # Update learning rate
        scheduler.step(val_loss)

        print(f"\nEpoch {epoch+1}/{EPOCHS}")
        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.4f}")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.4f}")

        # Save best model
        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True
            print("Saved new best model")

    # Plot training curves
    plt.figure(figsize=(15, 5))

    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Val Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Training and Validation Loss')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label='Train Accuracy')
    plt.plot(val_accuracies, label='Val Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.legend()

    plt.tight_layout()
    plt.savefig("cnn_training_curves.png")
    plt.show()

    # Evaluate the best weights of this run. Only load the file if this run
    # wrote it; otherwise a stale checkpoint of an earlier run could be used.
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, map_location=DEVICE))
    _, _, all_labels, all_preds, all_probs = _evaluate_cnn(model, val_loader, collect=True)

    # Convert to numpy arrays
    all_labels = np.array(all_labels)
    all_probs = np.array(all_probs)

    # Calculate metrics
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average='weighted')
    f1 = f1_score(all_labels, all_preds, average='weighted')

    print("\nValidation Set Evaluation:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"F1 Score: {f1:.4f}")

    # Classification report; explicit labels keep it aligned with the class
    # names even if a class does not occur in the validation data
    print("\nClassification Report:")
    print(classification_report(all_labels, all_preds, labels=class_indices,
                                target_names=class_names))

    # Confusion matrix
    cm = confusion_matrix(all_labels, all_preds, labels=class_indices)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names,
                yticklabels=class_names)
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title('Confusion Matrix')
    plt.savefig("cnn_confusion_matrix.png")
    plt.show()

    # Plot ROC curve
    plot_roc_curve(all_labels, all_probs, class_names=class_names,
                   filename="cnn_roc_curve.png")

    return model


# Step 3: Test the complete system on sample images
class FallDetectionSystem:
    """Two-stage pipeline: YOLO detection followed by CNN activity classification.

    Args:
        yolo_model_path: Path to the detector weights. It is loaded with
            ``YOLO(...)`` first; if that fails, ``torch.jit.load`` is tried.
        cnn_model_path: Path to a ``state_dict`` of ``FallDetectionCNN``.

    Raises:
        ValueError: If the detector cannot be loaded by either method.
    """

    # Detections scoring below this confidence are ignored
    CONF_THRESHOLD = 0.5

    class _TorchScriptDetector:
        """Minimal callable wrapper around a TorchScript detector.

        NOTE(review): assumes the scripted model returns a tensor shaped
        ``[batch, boxes, 6]`` as (x1, y1, x2, y2, conf, cls) — confirm against
        the exported model; no resizing or letterboxing is applied here.
        """

        def __init__(self, model, device):
            self.model = model
            self.names = ['fall', 'not_fall', 'sitting']  # Update with your class names
            self.device = torch.device(device)

        def __call__(self, img, **kwargs):
            # HWC uint8 array -> normalised 1xCxHxW float tensor
            img_tensor = torch.from_numpy(img).permute(2, 0, 1).float().div(255.0).unsqueeze(0)
            img_tensor = img_tensor.to(self.device)

            with torch.no_grad():
                outputs = self.model(img_tensor)

            # One dict per batch element
            return [
                {
                    'boxes': outputs[i, :, :4],
                    'scores': outputs[i, :, 4],
                    'labels': outputs[i, :, 5],
                }
                for i in range(outputs.shape[0])
            ]

    def __init__(self, yolo_model_path, cnn_model_path):
        # Load YOLO model - handle both .pt and .torchscript formats
        try:
            self.yolo_model = YOLO(yolo_model_path)
        except Exception as e:
            print(f"Warning: Could not load YOLO model directly, trying alternative loading: {e}")
            try:
                # Load onto the same device the wrapper moves its inputs to
                scripted = torch.jit.load(yolo_model_path, map_location=DEVICE)
                scripted.eval()
                self.yolo_model = self._TorchScriptDetector(scripted, DEVICE)
            except Exception as jit_error:
                raise ValueError(
                    f"Failed to load YOLO model from {yolo_model_path}: {jit_error}"
                ) from jit_error

        # Load CNN model; map_location lets a checkpoint saved on a GPU be
        # loaded on a CPU-only machine
        self.cnn_model = FallDetectionCNN(num_classes=3).to(DEVICE)
        self.cnn_model.load_state_dict(torch.load(cnn_model_path, map_location=DEVICE))
        self.cnn_model.eval()

        # Same preprocessing as used for validation during CNN training
        self.transform = transforms.Compose([
            transforms.Resize((IMG_SIZE, IMG_SIZE)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        self.class_names = ["fall", "not_fall", "sitting"]
        # OpenCV colours are BGR
        self.class_colors = {
            "fall": (0, 0, 255),      # Red
            "not_fall": (0, 255, 0),   # Green
            "sitting": (255, 0, 0)     # Blue
        }

    def _detect(self, image_bgr, image_rgb):
        """Run the detector and return ``(x1, y1, x2, y2, conf)`` tuples."""
        detections = []
        if hasattr(self.yolo_model, 'predict'):
            # Ultralytics expects numpy input in OpenCV's BGR channel order
            results = self.yolo_model.predict(image_bgr)
            for box in results[0].boxes:
                x1, y1, x2, y2 = (int(v) for v in box.xyxy[0])
                detections.append((x1, y1, x2, y2, box.conf.item()))
        else:
            # The TorchScript wrapper does no channel conversion itself
            results = self.yolo_model(image_rgb)
            first = results[0]
            for coords, score in zip(first['boxes'], first['scores']):
                x1, y1, x2, y2 = (int(v) for v in coords[:4])
                detections.append((x1, y1, x2, y2, float(score)))
        return detections

    def detect_and_classify(self, image_path):
        """Detect objects in an image and classify the activity in each box.

        Args:
            image_path: Path of the image file to analyse.

        Returns:
            tuple: ``(output_image, classifications)`` where ``output_image``
            is a BGR copy of the input with boxes and labels drawn, and
            ``classifications`` lists the predicted class name per kept box.

        Raises:
            ValueError: If the image cannot be read.
            RuntimeError: If the detector fails.
        """
        # Read image
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"Could not read image at {image_path}")

        # RGB copy for cropping (the CNN was trained on RGB PIL images)
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        try:
            detections = self._detect(image, image_rgb)
        except Exception as e:
            raise RuntimeError(f"Failed to perform detection: {e}") from e

        # Process detections
        output_image = image.copy()
        classifications = []
        height, width = image.shape[:2]

        for x1, y1, x2, y2, conf in detections:
            # Only consider detections with high confidence
            if conf < self.CONF_THRESHOLD:
                continue

            # Clamp to the image: negative indices would wrap around in numpy
            x1, x2 = max(0, x1), min(width, x2)
            y1, y2 = max(0, y1), min(height, y2)

            # Crop person from image
            person_img = image_rgb[y1:y2, x1:x2]
            if person_img.size == 0:
                continue

            # Convert to PIL Image and apply transforms
            person_pil = Image.fromarray(person_img)
            person_tensor = self.transform(person_pil).unsqueeze(0).to(DEVICE)

            # Classify activity with CNN
            with torch.no_grad():
                outputs = self.cnn_model(person_tensor)
            class_idx = outputs.argmax(dim=1).item()
            class_name = self.class_names[class_idx]
            classifications.append(class_name)

            # Draw bounding box and label; keep the label inside the image
            # when the box touches the top edge
            color = self.class_colors.get(class_name, (255, 255, 255))
            cv2.rectangle(output_image, (x1, y1), (x2, y2), color, 2)
            cv2.putText(output_image, class_name, (x1, max(y1 - 10, 20)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

        return output_image, classifications

# Script entry point: train both models, then run the combined system
if __name__ == "__main__":
    # Stage 1: detector
    print("Training YOLOv8 model for person detection...")
    yolo_model_path = train_yolo_model()
    print(f"YOLOv8 model trained and saved at: {yolo_model_path}")

    # Stage 2: activity classifier
    print("\nTraining CNN model for activity classification...")
    cnn_model = train_cnn_model()
    print("CNN model trained and saved at: best_cnn_fall_detection.pth")

    # Stage 3: combined system
    print("\nTesting combined system on sample images...")
    fall_detector = FallDetectionSystem(yolo_model_path, "best_cnn_fall_detection.pth")

    # Take one sample image per class, from the validation split when it
    # exists and from the training split otherwise
    sample_root = VAL_PATH if VAL_PATH else TRAIN_PATH
    test_images = []
    for class_name in ("fall", "not_fall", "sitting"):
        class_dir = os.path.join(sample_root, class_name, "images")
        if not os.path.exists(class_dir):
            continue
        image_files = [
            os.path.join(class_dir, f)
            for f in os.listdir(class_dir)
            if f.lower().endswith(('.png', '.jpg', '.jpeg'))
        ]
        test_images.extend(image_files[:1])  # 1 image per class

    for img_path in test_images[:3]:  # Test on max 3 images
        print(f"\nTesting on image: {img_path}")
        output_image, classifications = fall_detector.detect_and_classify(img_path)

        # Write the annotated image next to the script and report it
        output_path = f"result_{os.path.basename(img_path)}"
        cv2.imwrite(output_path, output_image)
        print(f"Classifications: {classifications}")
        print(f"Result saved to {output_path}")

        # Show the annotated image (converted from BGR for matplotlib)
        plt.figure(figsize=(10, 10))
        plt.imshow(cv2.cvtColor(output_image, cv2.COLOR_BGR2RGB))
        plt.axis('off')
        plt.title('Fall Detection Result')
        plt.show()