In [None]:
import os
import json
import shutil
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from pycocotools.coco import COCO
from collections import defaultdict
import torch
import torch.utils.data as data
from PIL import Image
import torchvision.transforms as transforms
import random
from sklearn.model_selection import train_test_split

# 1. Filter Classes and Re-label Data

In [None]:
def print_IDs(path):
    """Print the ``{category_id: category_name}`` mapping of a COCO annotation file.

    Args:
        path: Path to a COCO-format JSON file containing a "categories" list
            whose entries have "id" and "name" keys.
    """
    with open(path, "r") as handle:
        category_entries = json.load(handle)["categories"]

    id_to_name = {}
    for entry in category_entries:
        id_to_name[entry["id"]] = entry["name"]

    print(id_to_name)

In [None]:
# Show the category id -> name mapping of the original COCO 2017 train annotations.
print_IDs("/mnt/datassd0/coco-2017/annotations/instances_train2017.json")

In [None]:
# Show the category id -> name mapping of the original COCO 2017 val annotations.
print_IDs("/mnt/datassd0/coco-2017/annotations/instances_val2017.json")

In [None]:
################# KEEP IMAGES WITH 0 ANNOTATIONS AND ASSIGN "UNLABELED" CATEGORY #################
def filter_and_relabel_coco(input_json, output_json, dropped_classes):
    """Drop selected categories from a COCO annotation file and renumber the rest.

    The surviving categories are renumbered 1..N in their original order and an
    extra "unlabeled" category with id N + 1 is appended. Annotations of dropped
    categories are removed. Every image is kept; an image left without any
    annotation receives one placeholder annotation of the "unlabeled" category.

    Args:
        input_json: Path of the COCO-format annotation file to read.
        output_json: Path the filtered annotation file is written to.
        dropped_classes: Iterable of category names to remove.
    """
    with open(input_json, "r") as f:
        coco_data = json.load(f)

    dropped_names = set(dropped_classes)

    # Surviving categories, relabelled 1..N (ids start at 1, not 0).
    selected_categories = [c for c in coco_data["categories"] if c["name"] not in dropped_names]
    selected_cat_ids = {c["id"]: new_id for new_id, c in enumerate(selected_categories, start=1)}

    # The "unlabeled" category takes the id right after the last surviving one.
    unlabeled_id = len(selected_categories) + 1
    new_categories = [{"id": new_id, "name": c["name"]} for new_id, c in enumerate(selected_categories, start=1)]
    new_categories.append({"id": unlabeled_id, "name": "unlabeled"})

    # Keep annotations of surviving categories only, with the remapped category id.
    new_annotations = []
    annotated_image_ids = set()
    for ann in coco_data["annotations"]:
        new_cat_id = selected_cat_ids.get(ann["category_id"])
        if new_cat_id is None:
            continue
        # Copy instead of mutating the loaded record; key order is preserved.
        new_annotations.append(dict(ann, category_id=new_cat_id))
        annotated_image_ids.add(ann["image_id"])

    # Placeholder ids must not collide with ids already present in the file.
    # Real COCO annotation ids are not 1..len(annotations), so counting the kept
    # annotations (as was done before) could hand out an id that is already in use.
    next_annotation_id = max((ann.get("id", 0) for ann in coco_data["annotations"]), default=0) + 1

    # Give every image that lost all of its annotations a dummy "unlabeled" one.
    # Iterating over the image list (rather than a set) keeps the output order stable.
    for img in coco_data["images"]:
        img_id = img["id"]
        if img_id in annotated_image_ids:
            continue
        new_annotations.append({
            "id": next_annotation_id,  # Unique within this file
            "image_id": img_id,
            "category_id": unlabeled_id,  # Assign to "unlabeled"
            "bbox": [0, 0, 1, 1],  # Placeholder bounding box
            "area": 1,
            "iscrowd": 0
        })
        next_annotation_id += 1
        annotated_image_ids.add(img_id)  # Guards against duplicated image entries

    new_coco_data = {
        "info": coco_data.get("info", {}),
        "licenses": coco_data.get("licenses", []),
        "images": coco_data["images"],  # All images are kept
        "annotations": new_annotations,
        "categories": new_categories,
    }

    with open(output_json, "w") as f:
        json.dump(new_coco_data, f, indent=None)

    print(f"Filtered COCO dataset saved as {output_json}")

# Category names removed from the dataset before training.
# NOTE: every line must end with a comma. Adjacent string literals are silently
# concatenated by Python, so a missing comma after "boat" used to produce the
# bogus name "boatdining table" and neither "boat" nor "dining table" was dropped.
# Filtered annotation files and cached label arrays created before this fix
# need to be regenerated.
dropped_classes = [
    "person", # Highest class label
    "wine glass", "cup", "bicycle", "potted plant", "bowl", # Repetitive classes
    "snowboard", "surfboard", "baseball glove", "baseball bat", # Sports category (highly associated with person)
    "tennis racket", "kite", "frisbee", "skis", "sports ball", "skateboard", # Sports category (highly associated with person)
    "backpack", "umbrella", "handbag", "tie", "suitcase", # Accessories category (highly associated with person)
    "car", "motorcycle", "airplane", "bus", "train", "truck", "boat", # Vehicles category (highly associated with person)
    "dining table", # Single class highly associated with person
    "traffic light", "fire hydrant", "stop sign", "parking meter", "bench", # Outdoor objects
    "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe", # Outdoor wild animals
]


# Apply the filtering function to the original train and val annotation files.
# Each call writes a new "*_filtered.json" file next to the original annotations.
filter_and_relabel_coco("/mnt/datassd0/coco-2017/annotations/instances_train2017.json", "/mnt/datassd0/coco-2017/annotations/instances_train2017_filtered.json", dropped_classes)
filter_and_relabel_coco("/mnt/datassd0/coco-2017/annotations/instances_val2017.json", "/mnt/datassd0/coco-2017/annotations/instances_val2017_filtered.json", dropped_classes)

In [None]:
# Check the relabelled category id -> name mapping written to the filtered train file.
print_IDs("/mnt/datassd0/coco-2017/annotations/instances_train2017_filtered.json")

In [None]:
# Check the relabelled category id -> name mapping written to the filtered val file.
print_IDs("/mnt/datassd0/coco-2017/annotations/instances_val2017_filtered.json")

# 2. Split Data into Train and Test Sets (80-20)

### This cell splits the annotations 80-20

In [None]:
def combine_and_split_coco(train_json, val_json, output_train_json, output_val_json, train_ratio=0.8):
    """Merge two COCO annotation files and re-split their images into train/validation.

    Images and annotations of both files are pooled, then the image ids are split
    with a stratified ``train_test_split`` (fixed ``random_state=42``). The
    stratification label of an image is the category id of its first annotation,
    or -1 when the image has no annotation. Both output files reuse the
    "info", "licenses" and "categories" sections of the first input file.

    Args:
        train_json: Path of the first COCO annotation file.
        val_json: Path of the second COCO annotation file.
        output_train_json: Destination path of the new training annotations.
        output_val_json: Destination path of the new validation annotations.
        train_ratio: Fraction of images assigned to the training split.
    """

    def _read_json(path):
        # Load one COCO annotation file.
        with open(path, "r") as fh:
            return json.load(fh)

    def _stratify_label(image_annotations):
        # First category id of the image, or -1 for an image without annotations.
        category_ids = [a["category_id"] for a in image_annotations]
        return category_ids[0] if category_ids else -1

    train_data = _read_json(train_json)
    val_data = _read_json(val_json)

    # Pool both datasets; the category list is taken from the first file only.
    all_images = train_data["images"] + val_data["images"]
    all_annotations = train_data["annotations"] + val_data["annotations"]
    categories = train_data["categories"]

    # Group annotations per image; every image starts with an empty list.
    annotations_by_image = {}
    for image in all_images:
        annotations_by_image[image["id"]] = []
    for annotation in all_annotations:
        annotations_by_image[annotation["image_id"]].append(annotation)

    all_image_ids = list(annotations_by_image)
    image_labels = [_stratify_label(annotations_by_image[image_id]) for image_id in all_image_ids]

    # Stratified split of the image ids.
    train_ids, val_ids = train_test_split(
        all_image_ids, train_size=train_ratio, stratify=image_labels, random_state=42
    )

    def _select(chosen_ids):
        # Images and annotations whose image id falls into the given split.
        id_set = set(chosen_ids)
        images = [image for image in all_images if image["id"] in id_set]
        annotations = [a for a in all_annotations if a["image_id"] in id_set]
        return images, annotations

    def _write(path, images, annotations):
        # Serialise one split, reusing the metadata of the first input file.
        payload = {
            "info": train_data.get("info", {}),
            "licenses": train_data.get("licenses", []),
            "images": images,
            "annotations": annotations,
            "categories": categories,
        }
        with open(path, "w") as fh:
            json.dump(payload, fh, indent=None)

    new_train_images, new_train_annotations = _select(train_ids)
    new_val_images, new_val_annotations = _select(val_ids)

    _write(output_train_json, new_train_images, new_train_annotations)
    _write(output_val_json, new_val_images, new_val_annotations)

    print(f"New train dataset: {len(new_train_images)} images, {len(new_train_annotations)} annotations")
    print(f"New validation dataset: {len(new_val_images)} images, {len(new_val_annotations)} annotations")

# Define file paths: the inputs are the filtered annotation files written by
# filter_and_relabel_coco; the outputs are the re-split "balanced" files.
train_json_path = "/mnt/datassd0/coco-2017/annotations/instances_train2017_filtered.json"
val_json_path = "/mnt/datassd0/coco-2017/annotations/instances_val2017_filtered.json"
output_train_json = "/mnt/datassd0/coco-2017/annotations/instances_train2017_balanced.json"
output_val_json = "/mnt/datassd0/coco-2017/annotations/instances_val2017_balanced.json"

# Perform combination and split (80% of the pooled images go to the train file)
combine_and_split_coco(train_json_path, val_json_path, output_train_json, output_val_json, train_ratio=0.8)

### This cell re-splits the original train/val image files from the 95-5 split to an 80-20 split to match the new annotations
#### This takes some time to copy all images into new folders

In [None]:
# # Original image directories
# original_train_dir = "train2017"
# original_val_dir = "val2017"

# # New balanced image directories
# new_train_dir = "balanced_train2017"
# new_val_dir   = "balanced_val2017"

# # Create new directories if they don't already exist
# os.makedirs(new_train_dir, exist_ok=True)
# os.makedirs(new_val_dir, exist_ok=True)

# # Paths to the balanced JSON files
# balanced_train_json = "annotations/instances_train2017_balanced.json"
# balanced_val_json   = "annotations/instances_val2017_balanced.json"

# # Load balanced train JSON
# with open(balanced_train_json, "r") as f:
#     train_data = json.load(f)

# # Load balanced validation JSON
# with open(balanced_val_json, "r") as f:
#     val_data = json.load(f)

# def copy_images(image_list, destination_dir):
#     for img in image_list:
#         file_name = img["file_name"]
#         src_path = None

#         # Check if the image exists in the original training folder
#         train_path = os.path.join(original_train_dir, file_name)
#         if os.path.exists(train_path):
#             src_path = train_path
#         else:
#             # Otherwise, check the original validation folder
#             val_path = os.path.join(original_val_dir, file_name)
#             if os.path.exists(val_path):
#                 src_path = val_path

#         if src_path is None:
#             print(f"Warning: Image {file_name} not found in either directory.")
#             continue

#         dst_path = os.path.join(destination_dir, file_name)
#         shutil.copy2(src_path, dst_path)

# # Copy images for the new training set
# copy_images(train_data["images"], new_train_dir)

# # Copy images for the new validation set
# copy_images(val_data["images"], new_val_dir)

# print("Images have been copied to the new balanced directories.")

### This cell checks the class distribution (histogram) of the train/val annotations

In [None]:
# Read the balanced train / validation annotation files.
with open("/mnt/datassd0/coco-2017/annotations/instances_train2017_balanced.json", "r") as f:
    train_data = json.load(f)

with open("/mnt/datassd0/coco-2017/annotations/instances_val2017_balanced.json", "r") as f:
    val_data = json.load(f)

# Category id -> name lookup, built from the train file and used for both splits.
category_mapping = {}
for cat in train_data["categories"]:
    category_mapping[cat["id"]] = cat["name"]

# Per-category annotation tallies, starting at zero for every known category.
train_counts = dict.fromkeys(category_mapping, 0)
val_counts = dict.fromkeys(category_mapping, 0)
for counts, split_data in ((train_counts, train_data), (val_counts, val_data)):
    for ann in split_data["annotations"]:
        counts[ann["category_id"]] += 1

# Fix one category order so names and both bar series line up.
sorted_ids = sorted(category_mapping)
sorted_names = [category_mapping[cid] for cid in sorted_ids]
train_values = [train_counts[cid] for cid in sorted_ids]
val_values = [val_counts[cid] for cid in sorted_ids]

# Grouped bar chart: one pair of bars (train, validation) per category.
bar_width = 0.35
indices = np.arange(len(sorted_ids))

plt.figure(figsize=(12, 6))
plt.bar(indices, train_values, bar_width, label="Train", alpha=0.7, color="blue")
# Validation bars sit directly to the right of the train bars.
plt.bar(indices + bar_width, val_values, bar_width, label="Validation", alpha=0.7, color="orange")

plt.title("Class Distribution in Balanced Train vs. Validation Datasets")
plt.xlabel("Category")
plt.ylabel("Number of Annotations")
# Centre each tick label under its pair of bars.
plt.xticks(indices + bar_width / 2, sorted_names, rotation=90)
plt.legend()
plt.tight_layout()

plt.show()

# 3. Prepare Dataset

In [None]:
class CustomCocoDataset(data.Dataset):
    """Multi-label image classification dataset built from a COCO-format annotation file.

    Each item is ``(image, label)`` where ``label`` is a float32 multi-hot vector of
    length ``num_classes``: entry ``k`` is 1 when the image has at least one
    annotation of the category listed at position ``k`` of the file's
    "categories" list.

    Args:
        image_dir: Directory containing the image files named in the annotation file.
        anno_path: Path of the COCO-format annotation JSON.
        labels_path: Optional ``.npy`` path used to cache the label matrix. If the
            file exists and has the expected shape it is loaded; otherwise the
            labels are generated and, when a path is given, saved there.
        transform: Optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, image_dir, anno_path, labels_path=None, transform=None):
        self.image_dir = image_dir
        self.transform = transform
        self.labels_path = labels_path

        # Load COCO JSON
        with open(anno_path, "r") as f:
            coco_data = json.load(f)

        self.images = {img["id"]: img["file_name"] for img in coco_data["images"]}
        self.annotations = coco_data["annotations"]

        # category id -> 1-based position in the category list (label column + 1)
        self.category_map = {c["id"]: i for i, c in enumerate(coco_data["categories"], start=1)}
        self.category_id_to_name = {c["id"]: c["name"] for c in coco_data["categories"]}
        self.num_classes = len(self.category_map)

        # Organize annotations by image_id
        self.image_to_annotations = {img_id: [] for img_id in self.images}
        for ann in self.annotations:
            self.image_to_annotations[ann["image_id"]].append(ann)

        # Store image IDs as dataset index
        self.image_ids = list(self.images.keys())

        # Load cached labels when they are usable, otherwise (re)generate them.
        expected_shape = (len(self.image_ids), self.num_classes)
        cached_labels = None
        if self.labels_path and os.path.exists(self.labels_path):
            print(f"Loading precomputed labels from {self.labels_path}...")
            cached_labels = np.load(self.labels_path)
            if cached_labels.shape != expected_shape:
                # A cache written for a different annotation file (other image or
                # category count) would silently give wrong labels; discard it.
                print(
                    f"Cached labels have shape {cached_labels.shape}, expected "
                    f"{expected_shape}. Regenerating labels..."
                )
                cached_labels = None
        else:
            print("No precomputed label file found. Generating labels...")

        if cached_labels is not None:
            self.labels = cached_labels
        else:
            self.labels = self.generate_labels()
            if self.labels_path:
                # dirname is "" for a bare file name, and os.makedirs("") raises.
                labels_dir = os.path.dirname(self.labels_path)
                if labels_dir:
                    os.makedirs(labels_dir, exist_ok=True)
                self.save_labels(self.labels_path)

    def generate_labels(self):
        """Return the multi-hot label matrix of shape ``(len(image_ids), num_classes)``."""
        labels = np.zeros((len(self.image_ids), self.num_classes))
        for i, img_id in enumerate(self.image_ids):
            for ann in self.image_to_annotations[img_id]:
                category_id = ann["category_id"]
                # category_map positions are 1-based; label columns are 0-based.
                labels[i][self.category_map[category_id] - 1] = 1
        return labels

    def save_labels(self, labels_path):
        """Save the current label matrix to ``labels_path`` with ``np.save``."""
        np.save(labels_path, self.labels)
        print(f"Labels saved to {labels_path}")

    def __getitem__(self, index):
        """Return ``(image, label)`` for the image at position ``index``."""
        img_id = self.image_ids[index]
        img_path = os.path.join(self.image_dir, self.images[img_id])

        # convert() returns a fully loaded copy, so the file handle can be closed
        # right away instead of staying open until garbage collection.
        with Image.open(img_path) as img:
            image = img.convert("RGB")
        if self.transform:
            image = self.transform(image)

        # Label row from the precomputed matrix
        label = torch.tensor(self.labels[index], dtype=torch.float32)

        return image, label

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.image_ids)

In [None]:
# Both splits read their images from the same directory; only the annotation
# file and the label-cache path differ between train and validation.
train_images = "/mnt/datassd0/coco-2017/images/all_images"
train_annotations = "/mnt/datassd0/coco-2017/annotations/instances_train2017_balanced.json"
train_labels_npy = "/mnt/datassd0/coco-2017/annotations/train_labels.npy"  # Path to save labels

val_images = "/mnt/datassd0/coco-2017/images/all_images"
val_annotations = "/mnt/datassd0/coco-2017/annotations/instances_val2017_balanced.json"
val_labels_npy = "/mnt/datassd0/coco-2017/annotations/val_labels.npy"  # Path to save labels

# No transform is passed, so items are returned as RGB PIL images.
train_dataset = CustomCocoDataset(train_images, train_annotations, labels_path=train_labels_npy)
val_dataset = CustomCocoDataset(val_images, val_annotations, labels_path=val_labels_npy)

In [None]:
# Look up the id of the "unlabeled" category in the training dataset's categories.
unlabeled_id = next(
    (
        cat_id
        for cat_id, cat_name in train_dataset.category_id_to_name.items()
        if cat_name == "unlabeled"
    ),
    None,
)
if unlabeled_id is None:
    raise ValueError("Error: 'unlabeled' category not found in dataset categories.")

# Total number of images across both splits.
total_images = len(train_dataset) + len(val_dataset)

# An image counts as unlabeled when it has no annotations at all or when every
# annotation belongs to the "unlabeled" category (all() is True for an empty list,
# so both cases are covered by the same check).
unlabeled_count = 0
for dataset in (train_dataset, val_dataset):
    for img_id in dataset.image_ids:
        annotations = dataset.image_to_annotations[img_id]
        if all(ann["category_id"] == unlabeled_id for ann in annotations):
            unlabeled_count += 1

# Share of unlabeled images, in percent.
unlabeled_percentage = (unlabeled_count / total_images) * 100

print(f"Total images: {total_images}")
print(f"Unlabeled images: {unlabeled_count}")
print(f"Unlabeled Percentage: {unlabeled_percentage:.2f}%")

In [None]:
# Get image and label from dataset
image, label = train_dataset[1]

# Convert label tensor to numpy array
label_array = label.numpy()
print(label_array)

# Map each label column back to its category name through the dataset's own
# category_map (category id -> 1-based column position). Looking names up with
# "column + 1" as the category id is only correct when ids happen to be 1..N.
index_to_name = {
    position - 1: train_dataset.category_id_to_name[cat_id]
    for cat_id, position in train_dataset.category_map.items()
}

# Get names of active labels (where value is 1)
active_labels = [index_to_name[i] for i, value in enumerate(label_array) if value == 1]

# Convert image to PIL if it's a tensor
if isinstance(image, torch.Tensor):
    image = transforms.ToPILImage()(image)

# Display the image with label names
plt.imshow(image)
plt.axis("off")
plt.title(f"Labels: {', '.join(active_labels)}")  # Display category names instead of numbers
plt.show()