In [39]:
!pip install ultralytics torch opencv-python scikit-learn pyyaml



In [40]:
import torch
print("GPU Available:", torch.cuda.is_available())
print("GPU Name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No GPU detected")


GPU Available: True
GPU Name: NVIDIA GeForce RTX 4050 Laptop GPU


In [41]:
import os
import yaml
import torch
from ultralytics import YOLO
from sklearn.model_selection import train_test_split
import numpy as np
import cv2
import shutil

In [42]:
def load_data(root_dir):
    """
    Load data from the specified root directory
    
    Args:
        root_dir (str): Root directory containing subject folders
    
    Returns:
        tuple: Lists of image paths and corresponding labels
    """
    data = []
    labels = []
    for subject in os.listdir(root_dir):
        subject_path = os.path.join(root_dir, subject)
        if os.path.isdir(subject_path):
            for activity_type in os.listdir(subject_path):
                label = 1 if activity_type == 'fall' else 0
                activity_path = os.path.join(subject_path, activity_type)
                for activity_class in os.listdir(activity_path):
                    class_path = os.path.join(activity_path, activity_class)
                    for file in os.listdir(class_path):
                        data.append(os.path.join(class_path, file))
                        labels.append(label)
    return data, labels

In [43]:
def prepare_yolo_dataset(data_paths, labels, output_dir):
    """
    Prepare dataset structure for YOLO training
    
    Args:
        data_paths (list): List of image file paths
        labels (list): Corresponding labels
        output_dir (str): Directory to save prepared dataset
    
    Returns:
        dict: Paths to train and validation directories
    """
    # Create absolute paths
    output_dir = os.path.abspath(output_dir)
    
    # Ensure all necessary directories exist
    os.makedirs(os.path.join(output_dir, 'images', 'train'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'images', 'val'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'labels', 'train'), exist_ok=True)
    os.makedirs(os.path.join(output_dir, 'labels', 'val'), exist_ok=True)
    
    # Split data into train and validation sets
    train_paths, val_paths, train_labels, val_labels = train_test_split(
        data_paths, labels, test_size=0.2, stratify=labels, random_state=42
    )
    
    def copy_and_create_labels(image_paths, labels, split):
        """
        Copy images and create YOLO-format label files
        
        Args:
            image_paths (list): Paths to image files
            labels (list): Corresponding labels
            split (str): 'train' or 'val'
        """
        for img_path, label in zip(image_paths, labels):
            # Copy image
            img_filename = os.path.basename(img_path)
            dest_img_path = os.path.join(output_dir, 'images', split, img_filename)
            shutil.copy(img_path, dest_img_path)
            
            # Create label file
            label_filename = os.path.splitext(img_filename)[0] + '.txt'
            label_path = os.path.join(output_dir, 'labels', split, label_filename)
            
            # For fall detection, we'll create a full-image label
            # YOLO format: <class> <x_center> <y_center> <width> <height>
            with open(label_path, 'w') as f:
                # Assuming label 0 is no fall, 1 is fall
                f.write(f"{label} 0.5 0.5 1.0 1.0")
    
    # Prepare train and validation sets
    copy_and_create_labels(train_paths, train_labels, 'train')
    copy_and_create_labels(val_paths, val_labels, 'val')
    
    return {
        'train': os.path.join(output_dir, 'images', 'train'),
        'val': os.path.join(output_dir, 'images', 'val')
    }

In [44]:
def create_yolo_config(dataset_paths, config_path='fall_detection.yaml'):
    """
    Create YOLO configuration YAML file
    
    Args:
        dataset_paths (dict): Paths to train and validation datasets
        config_path (str): Path to save the configuration file
    """
    # Ensure absolute paths
    train_path = os.path.abspath(dataset_paths['train'])
    val_path = os.path.abspath(dataset_paths['val'])
    
    yolo_config = {
        'train': train_path,
        'val': val_path,
        'nc': 2,  # Number of classes (fall and no fall)
        'names': ['no_fall', 'fall']
    }
    
    # Ensure the directory exists
    os.makedirs(os.path.dirname(os.path.abspath(config_path)), exist_ok=True)
    
    with open(config_path, 'w') as f:
        yaml.dump(yolo_config, f)

In [45]:
import os
import shutil
import yaml
import torch
from ultralytics import YOLO
from sklearn.model_selection import train_test_split

def train_yolo_model(root_dir, output_dir='yolov8n/train', epochs=50):
    """
    Comprehensive debugging for YOLO model training
    """
    # Print current working directory and relevant paths
    print("Current Working Directory:", os.getcwd())
    print("Root Directory:", root_dir)
    print("Output Directory:", output_dir)

    # Check input directory contents
    print("\n--- Input Directory Contents ---")
    try:
        print(os.listdir(root_dir))
    except Exception as e:
        print(f"Error listing root directory: {e}")

    # Load data
    data_paths, labels = load_data(root_dir)
    print(f"\nLoaded {len(data_paths)} samples.")
    
    # Prepare dataset for YOLO
    dataset_paths = prepare_yolo_dataset(data_paths, labels, output_dir)
    
    # Create YOLO configuration
    config_path = 'yolov8n/fall_detection.yaml'
    create_yolo_config(dataset_paths, config_path)

    # Initialize model
    model = YOLO('yolov8n.pt')
    
    # Attempt training
    try:
        results = model.train(
            data=config_path,
            epochs=epochs,
            imgsz=640,
            batch=16,
            device=0,  # Use GPU if available
            project='yolov8n',  # Explicitly set project directory
            name='fall_detection',  # Set a specific name for the run
            verbose=True
        )
        
        # Search for model files
        print("\n--- Searching for model files ---")
        for root, dirs, files in os.walk('yolov8n'):
            for file in files:
                if file == 'best.pt':
                    model_path = os.path.join(root, file)
                    print(f"Found model at: {model_path}")
                    
                    # Attempt to load the model
                    try:
                        loaded_model = YOLO(model_path)
                        print("Model successfully loaded!")
                        return model_path
                    except Exception as e:
                        print(f"Error loading model: {e}")
        
        print("No 'best.pt' model found in yolov8n directory")
        return None

    except Exception as e:
        print(f"Training failed with error: {e}")
        return None

In [46]:
import albumentations as A
import cv2  # OpenCV is needed for image reading/writing

def augment_and_save_from_root(root_dir, output_dir, augment_ratio=2, save_original=True):
    """
    Perform data augmentation on a dataset organized by subject/activity type.

    Args:
        root_dir (str): Root directory containing subject/activity folders.
        output_dir (str): Directory to save augmented images.
        augment_ratio (int): Number of augmented images to generate per input image.
        save_original (bool): Whether to save the original image alongside augmented ones.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Define augmentation pipeline
    augmentation = A.Compose([
        # A.RandomRotate90(p=0.5),
        A.HorizontalFlip(p=0.5),
        # A.RandomBrightnessContrast(p=0.3),
        # A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=15, p=0.7),
        # A.OpticalDistortion(distort_limit=0.05, shift_limit=0.05, p=0.3),
        # A.RandomResizedCrop(height=640, width=640, scale=(0.8, 1.0), p=0.5),
        # A.GaussianBlur(blur_limit=3, p=0.2),
        # A.CLAHE(p=0.2),
        # A.ToGray(p=0.1),  # Convert some images to grayscale for variety
    ])

    augmented_data = []
    augmented_labels = []
    errors = []

    for subject in os.listdir(root_dir):
        subject_path = os.path.join(root_dir, subject)
        if os.path.isdir(subject_path):
            for activity_type in os.listdir(subject_path):
                label = 1 if activity_type == 'fall' else 0
                activity_path = os.path.join(subject_path, activity_type)
                for activity_class in os.listdir(activity_path):
                    class_path = os.path.join(activity_path, activity_class)
                    for file in os.listdir(class_path):
                        img_path = os.path.join(class_path, file)
                        image = cv2.imread(img_path)
                        if image is None:
                            errors.append(img_path)
                            continue

                        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

                        # Create output directory structure
                        output_class_dir = os.path.join(output_dir, subject, activity_type, activity_class)
                        os.makedirs(output_class_dir, exist_ok=True)

                        # Save original image if required
                        if save_original:
                            save_path = os.path.join(output_class_dir, f"{file.split('.')[0]}_original.jpg")
                            cv2.imwrite(save_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
                            augmented_data.append(save_path)
                            augmented_labels.append(label)

                        # Generate augmented images
                        for i in range(augment_ratio):
                            augmented = augmentation(image=image)
                            augmented_image = augmented["image"]

                            # Save augmented image
                            aug_save_path = os.path.join(output_class_dir, f"{file.split('.')[0]}_aug{i}.jpg")
                            cv2.imwrite(aug_save_path, cv2.cvtColor(augmented_image, cv2.COLOR_RGB2BGR))
                            augmented_data.append(aug_save_path)
                            augmented_labels.append(label)

    # Log errors if any
    if errors:
        print("\n--- Errors During Augmentation ---")
        for err in errors:
            print(f"Could not process image: {err}")

    print(f"\nSaved {len(augmented_data)} images to {output_dir}")
    return augmented_data, augmented_labels
import albumentations as A
import cv2  # OpenCV is needed for image reading/writing


In [None]:
if __name__ == "__main__":
    root_dir = 'train'  # Replace with the path to your root directory
    # augmented_dir = 'augmentedtrain'  # Replace with the output directory path

    # # Apply data augmentation
    # print("Applying data augmentation...")
    # augmented_data, augmented_labels = augment_and_save_from_root(root_dir, augmented_dir, augment_ratio=1)
    # print(f"Augmented data saved to: {augmented_dir}")

    # # Step 2: Train YOLO model on augmented data
    # print("Training YOLO model on augmented data...")
    # best_model_path = train_yolo_model(augmented_dir)
    # print(f"Best model saved at: {best_model_path}")

    best_model_path = train_yolo_model(root_dir)
    print(f"Best model saved at: {best_model_path}")

Current Working Directory: c:\Users\tohru\data-slayer-2-0-machine-learning-competition\Prediction_program
Root Directory: train
Output Directory: yolov8n/train

--- Input Directory Contents ---
['subject-1', 'subject-2', 'subject-3', 'subject-4']

Loaded 4294 samples.
New https://pypi.org/project/ultralytics/8.3.51 available  Update with 'pip install -U ultralytics'
Ultralytics 8.3.50  Python-3.12.4 torch-2.4.1+cu121 CUDA:0 (NVIDIA GeForce RTX 4050 Laptop GPU, 6140MiB)
[34m[1mengine\trainer: [0mtask=detect, mode=train, model=yolov8n.pt, data=yolov8n/fall_detection.yaml, epochs=50, time=None, patience=100, batch=16, imgsz=640, save=True, save_period=-1, cache=False, device=0, workers=8, project=yolov8n, name=fall_detection3, exist_ok=False, pretrained=True, optimizer=auto, verbose=True, seed=0, deterministic=True, single_cls=False, rect=False, cos_lr=False, close_mosaic=10, resume=False, amp=True, fraction=1.0, profile=False, freeze=None, multi_scale=False, overlap_mask=True, mask_ra

[34m[1mtrain: [0mScanning C:\Users\tohru\data-slayer-2-0-machine-learning-competition\Prediction_program\yolov8n\train\labels\train.cache... 249 images, 0 backgrounds, 0 corrupt: 100%|██████████| 249/249 [00:00<?, ?it/s]

[34m[1malbumentations: [0mBlur(p=0.01, blur_limit=(3, 7)), MedianBlur(p=0.01, blur_limit=(3, 7)), ToGray(p=0.01, num_output_channels=3, method='weighted_average'), CLAHE(p=0.01, clip_limit=(1.0, 4.0), tile_grid_size=(8, 8))





In [48]:
# import os
# import pandas as pd
# import matplotlib.pyplot as plt

# def plot_training_metrics(best_model_path):
#     """
#     Automatically locate the `results.csv` file and plot YOLO training metrics.
    
#     Args:
#         best_model_path (str): Path to the best YOLO model (e.g., best.pt).
    
#     Returns:
#         None
#     """
#     # Derive the path to `results.csv`
#     base_dir = os.path.dirname(best_model_path)  # Get the directory of best.pt
#     results_csv_path = os.path.join(os.path.dirname(base_dir), 'results.csv')
    
#     # Check if results.csv exists
#     if not os.path.exists(results_csv_path):
#         raise FileNotFoundError(f"Results CSV not found at {results_csv_path}")

#     # Load training results
#     try:
#         results = pd.read_csv(results_csv_path)
#     except Exception as e:
#         raise ValueError(f"Failed to read {results_csv_path}: {e}")
    
#     # Validate required columns
#     required_columns = [
#         'epoch', 'train/box_loss', 'train/cls_loss', 'train/dfl_loss',
#         'val/box_loss', 'val/cls_loss', 'val/dfl_loss',
#         'metrics/precision(B)', 'metrics/recall(B)', 'metrics/mAP50(B)', 'metrics/mAP50-95(B)',
#         'lr/pg0', 'lr/pg1', 'lr/pg2'
#     ]
#     for col in required_columns:
#         if col not in results.columns:
#             raise ValueError(f"Missing required column '{col}' in results.csv")
    
#     # Plot training metrics
#     plt.figure(figsize=(18, 10))

#     # Training Losses
#     plt.subplot(2, 3, 1)
#     plt.plot(results['epoch'], results['train/box_loss'], label='Box Loss', color='blue')
#     plt.plot(results['epoch'], results['train/cls_loss'], label='Class Loss', color='orange')
#     plt.plot(results['epoch'], results['train/dfl_loss'], label='DFL Loss', color='green')
#     plt.xlabel('Epochs')
#     plt.ylabel('Loss')
#     plt.title('Training Loss')
#     plt.legend()

#     # Validation Losses
#     plt.subplot(2, 3, 2)
#     plt.plot(results['epoch'], results['val/box_loss'], label='Val Box Loss', color='blue')
#     plt.plot(results['epoch'], results['val/cls_loss'], label='Val Class Loss', color='orange')
#     plt.plot(results['epoch'], results['val/dfl_loss'], label='Val DFL Loss', color='green')
#     plt.xlabel('Epochs')
#     plt.ylabel('Loss')
#     plt.title('Validation Loss')
#     plt.legend()

#     # Metrics: Precision, Recall, mAP50, mAP50-95
#     plt.subplot(2, 3, 3)
#     plt.plot(results['epoch'], results['metrics/precision(B)'], label='Precision', color='blue')
#     plt.plot(results['epoch'], results['metrics/recall(B)'], label='Recall', color='orange')
#     plt.plot(results['epoch'], results['metrics/mAP50(B)'], label='mAP50', color='green')
#     plt.plot(results['epoch'], results['metrics/mAP50-95(B)'], label='mAP50-95', color='red')
#     plt.xlabel('Epochs')
#     plt.ylabel('Metrics')
#     plt.title('Validation Metrics')
#     plt.legend()

#     # Learning Rates
#     plt.subplot(2, 3, 4)
#     plt.plot(results['epoch'], results['lr/pg0'], label='LR pg0', color='purple')
#     plt.plot(results['epoch'], results['lr/pg1'], label='LR pg1', color='blue')
#     plt.plot(results['epoch'], results['lr/pg2'], label='LR pg2', color='green')
#     plt.xlabel('Epochs')
#     plt.ylabel('Learning Rate')
#     plt.title('Learning Rate Schedule')
#     plt.legend()

#     # Epoch vs Time
#     plt.subplot(2, 3, 5)
#     plt.plot(results['epoch'], results['time'], label='Time (s)', color='brown')
#     plt.xlabel('Epochs')
#     plt.ylabel('Time (seconds)')
#     plt.title('Epoch Time')
#     plt.legend()

#     plt.tight_layout()
#     plt.show()
#     print(f"Plots successfully generated from: {results_csv_path}")


# # Example usage
# # Replace `your_model_path` with the actual path to `best.pt`
# plot_training_metrics(best_model_path)


TypeError: expected str, bytes or os.PathLike object, not NoneType

In [49]:
import os
import numpy as np
import pandas as pd
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import load_img, img_to_array
def load_test_images_in_batches(test_dir, target_size=(640, 640), batch_size=32):
    """
    Loads and preprocesses images from the test folder in batches.

    Args:
        test_dir (str): Directory containing test images.
        target_size (tuple): Desired image size (width, height).
        batch_size (int): Number of images to process per batch.

    Yields:
        tuple: Batch of images as a NumPy array and corresponding image IDs.
    """
    image_files = [f for f in os.listdir(test_dir) if f.endswith(('png', 'jpg', 'jpeg'))]

    for i in range(0, len(image_files), batch_size):
        batch_files = image_files[i:i + batch_size]
        batch_images = []
        batch_ids = []

        for file in batch_files:
            img_path = os.path.join(test_dir, file)
            try:
                # Load and preprocess the image
                img = load_img(img_path, target_size=target_size)
                img_array = img_to_array(img) / 255.0  # Novrmalize to [0, 1]
                batch_images.append(img_array)
                batch_ids.append(file)
            except Exception as e:
                print(f"Error processing {file}: {e}")

        yield np.array(batch_images), batch_ids

# Example usage
test_dir = 'test'
for batch_images, batch_ids in load_test_images_in_batches(test_dir):
    print(f"Processed batch of {len(batch_images)} images.")


Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed batch of 32 images.
Processed 

In [50]:
import os
import numpy as np
import pandas as pd
from ultralytics import YOLO
def predict_yolo(model_path, test_dir, threshold=0.25, imgsz=640, device=None):
    """
    Perform predictions using a YOLO model on test images.

    Args:
        model_path (str): Path to the YOLO model file (e.g., 'best.pt').
        test_dir (str): Path to the directory containing test images.
        threshold (float): Confidence threshold for predictions (default 0.25).
        imgsz (int): Image size for YOLO predictions (default 640).
        device (str): Device to use for predictions (e.g., 'cuda', 'cpu', or None for auto-detection).

    Returns:
        pd.DataFrame: DataFrame containing 'image_id' and 'predicted_class'.
    """
    # Load the YOLO model
    try:
        model = YOLO(model_path)
    except Exception as e:
        raise ValueError(f"Error loading model: {e}")

    # List all image files in the test directory
    image_files = [f for f in os.listdir(test_dir) if f.endswith(('png', 'jpg', 'jpeg'))]
    if not image_files:
        raise ValueError("No image files found in the test directory.")

    predictions = []  # List to store results
    errors = []  # List to log errors

    for img_file in image_files:
        img_path = os.path.join(test_dir, img_file)

        try:
            # Run prediction
            results = model.predict(img_path, conf=threshold, imgsz=imgsz, device=device, verbose=False)

            # Handle detections
            if len(results[0].boxes) > 0:
                # Extract confidence scores and classes
                scores = results[0].boxes.conf.cpu().numpy()  # Confidence scores
                classes = results[0].boxes.cls.cpu().numpy().astype(int)  # Predicted classes

                # Debugging confidence scores for verification
                # print(f"{img_file} - Confidences: {scores}")

                # Filter valid detections based on the confidence threshold
                valid_indices = np.where(scores >= threshold)[0]

                if len(valid_indices) > 0:
                    # Pick the class with the highest confidence after filtering
                    best_class = classes[valid_indices[np.argmax(scores[valid_indices])]]
                else:
                    best_class = 0  # Assign 0 if no valid detections (non-detection class)
            else:
                best_class = 0  # No detection → Class = 0

            # Append results
            predictions.append({"image_id": img_file, "predicted_class": best_class})

        except Exception as e:
            # Log error and continue
            errors.append(f"Error processing {img_file}: {e}")
            predictions.append({"image_id": img_file, "predicted_class": 0})

    # Log errors if any
    if errors:
        print("\n--- Errors During Prediction ---")
        for error in errors:
            print(error)

    # Return predictions as a DataFrame
    return pd.DataFrame(predictions)


In [51]:
# Define paths
test_dir = 'test'  # Test images directory
model_path = best_model_path  # YOLO model path
# model_path = '/kaggle/working/fall_detection3/weights/best.pt'  # YOLO model path

threshold = 0.5  # Adjust sensitivity here

results_df = predict_yolo(model_path, test_dir, threshold=threshold)
output_csv_path = 'yolov8n/predictionswithyoelchan_scaler_10epochs_testsize30_.csv'
results_df.to_csv(output_csv_path, index=False)
print(f"Predictions saved to {output_csv_path}")

ValueError: Error loading model: argument should be a str or an os.PathLike object where __fspath__ returns a str, not 'NoneType'

In [16]:
import os


def load_labels_from_directory(base_dir):
    data = []  # To store file paths
    labels = []  # To store labels (1 for fall, 0 for nonfall)

    # Recursively traverse directories
    for root, dirs, files in os.walk(base_dir):  # os.walk handles nested directories
        for category in dirs:  # Look at subdirectories
            category_path = os.path.join(root, category)

            # Assign label based on the folder name
            if category.lower().startswith("fall_"):
                label = 1
            elif category.lower().startswith("nonfall_"):
                label = 0
            else:
                # Skip folders that don't match the expected naming pattern
                print(f"Skipping unknown category: {category}")
                continue

            # Debug: Print category and label
            print(f"Processing folder: {category}, Assigned Label: {label}")

            # Traverse files in this subdirectory
            for file_name in os.listdir(category_path):
                file_path = os.path.join(category_path, file_name)
                
                # Ensure it's a valid file (e.g., JPG)
                if os.path.isfile(file_path) and file_name.lower().endswith(".jpg"):
                    data.append(file_path)
                    labels.append(label)
    
    return data, labels
# Example usage
base_directory = '/kaggle/input/test-labeldiri'  # Replace with your directory path
data, labels = load_labels_from_directory(base_directory)

# Debug: Check if data was loaded
print(f"Total files loaded: {len(data)}")
print(f"Total labels loaded: {len(labels)}")

# Only print a sample if data is available
if data and labels:
    for i in range(min(5, len(data))):  # Show up to 5 items
        print(f"File: {data[i]}, Label: {labels[i]}")
else:
    print("No files or labels were loaded. Please check the directory structure.")


Total files loaded: 0
Total labels loaded: 0
No files or labels were loaded. Please check the directory structure.


In [17]:
import csv

def export_to_csv(file_paths, labels, output_file):
    """Exports file paths and labels to a CSV file."""
    with open(output_file, mode='w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile, delimiter=',')
        
        # Write the header
        csv_writer.writerow(['id', 'label'])
        
        # Write each file path and its corresponding label
        for file_path, label in zip(file_paths, labels):
            csv_writer.writerow([file_path, label])
    
    print(f"Data exported to {output_file}")

# Specify the output CSV file path
output_csv = '/kaggle/working/file_labels.csv'

# Call the function to export data
export_to_csv(data, labels, output_csv)


Data exported to /kaggle/working/file_labels.csv


In [36]:
import csv
from pathlib import Path
import matplotlib.pyplot as plt
from PIL import Image

# Function to load CSV data
def load_csv(file_path):
    """Loads a CSV file and returns a dictionary with photo ID as key and label as value."""
    data_dict = {}
    with open(file_path, mode='r') as csvfile:
        csv_reader = csv.reader(csvfile, delimiter=',')
        next(csv_reader)  # Skip the header
        
        for row in csv_reader:
            # Check if the row contains a valid format
            if len(row) != 2:
                print(f"Skipping invalid row: {row}")
                continue
            
            full_path_or_id, label = row
            # Extract photo ID from the full path if necessary
            photo_id = Path(full_path_or_id).name  # This ensures we only get the filename
            data_dict[photo_id] = int(label)  # Convert label to integer
    
    return data_dict

# Function to calculate accuracy and log incorrect predictions
def calculate_accuracy_and_log_incorrect(predicted_csv, ground_truth_csv):
    """
    Compares the predicted labels to the ground truth labels using the photo IDs as keys.
    Logs incorrect predictions for visualization.
    """
    # Load both CSVs into dictionaries
    predicted = load_csv(predicted_csv)
    ground_truth = load_csv(ground_truth_csv)
    
    # Initialize counters
    correct = 0
    total = 0
    incorrect_predictions = []  # Store tuples of (photo_id, predicted_label, true_label)
    
    for photo_id, true_label in ground_truth.items():
        # Check if the photo ID exists in the predicted data
        if photo_id in predicted:
            total += 1  # Increment total for every matched ID
            if predicted[photo_id] == true_label:
                correct += 1  # Increment correct if labels match
            else:
                incorrect_predictions.append((photo_id, predicted[photo_id], true_label))
        else:
            print(f"Warning: Photo ID {photo_id} not found in predictions.")
    
    # Calculate accuracy
    accuracy = correct / total if total > 0 else 0
    return accuracy, correct, total, incorrect_predictions

import matplotlib.pyplot as plt
from pathlib import Path
from PIL import Image
import math

def visualize_incorrect_predictions_to_file(incorrect_predictions, image_dir, grid_size=5, resize_to=(100, 100), output_path="incorrect_predictions_grid.png"):
    num_images = min(len(incorrect_predictions), grid_size * grid_size*4)
    rows = math.ceil(num_images / grid_size)
    cols = grid_size

    fig, axes = plt.subplots(rows, cols, figsize=(cols * 2, rows * 2))
    axes = axes.flatten()

    for idx, (photo_id, predicted_label, true_label) in enumerate(incorrect_predictions[:num_images]):
        image_path = Path(image_dir) / photo_id
        if not image_path.exists():
            axes[idx].axis("off")
            continue
        try:
            img = Image.open(image_path).resize(resize_to, Image.Resampling.LANCZOS)
            axes[idx].imshow(img)
            axes[idx].axis("off")
            axes[idx].set_title(f"Pred: {predicted_label}\nTrue: {true_label}", fontsize=8)
        except Exception as e:
            axes[idx].axis("off")
    for idx in range(num_images, len(axes)):
        axes[idx].axis("off")
    
    plt.tight_layout(pad=1.0)
    plt.savefig(output_path)
    print(f"Grid saved to {output_path}")



# File paths for comparison
predicted_csv = output_csv_path  # Replace with your predicted CSV path
ground_truth_csv = 'ground_truth.csv'  # Replace with your ground truth CSV path
image_directory = 'test'  # Replace with the directory containing the images




In [None]:
# Calculate accuracy and log incorrect predictions
accuracy, correct, total, incorrect_predictions = calculate_accuracy_and_log_incorrect(predicted_csv, ground_truth_csv)
print(f"Accuracy: {accuracy * 100:.2f}% ({correct}/{total} correct predictions)")
# Save the visualization
visualize_incorrect_predictions_to_file(incorrect_predictions, image_directory)
# Visualize incorrect predictions
# if incorrect_predictions:
#     print(f"Visualizing {len(incorrect_predictions)} incorrect predictions...")
#     visualize_incorrect_predictions(incorrect_predictions, image_directory)
# else:
#     print("No incorrect predictions to visualize.")

Accuracy: 90.06% (1938/2152 correct predictions)
Grid saved to incorrect_predictions_grid.png
Visualizing 214 incorrect predictions...


<Figure size 1000x4000 with 100 Axes>

<Figure size 1000x2000 with 50 Axes>

In [38]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import numpy as np

def plot_confusion_matrix(y_true, y_pred, labels, title="Confusion Matrix"):
    """
    Generate and plot a confusion matrix with a heatmap.
    """
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    
    plt.figure(figsize=(6, 5))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=labels, yticklabels=labels, cbar=False)
    plt.title(title)
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.show()

def calculate_accuracy_and_log_incorrect(predicted_csv, ground_truth_csv):
    """
    Compares the predicted labels to the ground truth labels and calculates accuracy.
    Also logs incorrect predictions.
    """
    # Load CSVs into dictionaries
    predicted = load_csv(predicted_csv)
    ground_truth = load_csv(ground_truth_csv)
    
    # Initialize counters and lists for incorrect predictions
    correct = 0
    total = 0
    incorrect_predictions = []
    
    y_true = []  # True labels
    y_pred = []  # Predicted labels
    
    for photo_id, true_label in ground_truth.items():
        # Check if the photo ID exists in the predicted data
        if photo_id in predicted:
            total += 1  # Increment total for every matched ID
            predicted_label = predicted[photo_id]
            y_true.append(true_label)
            y_pred.append(predicted_label)
            
            if predicted_label == true_label:
                correct += 1  # Increment correct if labels match
            else:
                # Log the incorrect prediction
                incorrect_predictions.append((photo_id, predicted_label, true_label))
    
    accuracy = correct / total if total > 0 else 0
    return accuracy, correct, total, incorrect_predictions, y_true, y_pred

# Example usage
predicted_csv = output_csv_path  # Replace with your predicted CSV path
ground_truth_csv = 'ground_truth.csv'  # Replace with your ground truth CSV path

# Get accuracy and incorrect predictions
accuracy, correct, total, incorrect_predictions, y_true, y_pred = calculate_accuracy_and_log_incorrect(predicted_csv, ground_truth_csv)

# Print accuracy
print(f"Accuracy: {accuracy * 100:.2f}% ({correct}/{total} correct predictions)")

# Plot the confusion matrix
labels = [0, 1]  # Change if you have more classes
plot_confusion_matrix(y_true, y_pred, labels)



Accuracy: 90.06% (1938/2152 correct predictions)


<Figure size 600x500 with 1 Axes>