<a href="https://colab.research.google.com/github/Shubhangimahato/Dairy-DigiD-Artificial-Intelligence-Powered-Facial-Recognition/blob/main/DairyDigiD(with_increase_dataset).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch

# Check if GPU is available
if torch.cuda.is_available():
    print("GPU is available")
    device = torch.device("cuda")
else:
    print("GPU is not available")
    device = torch.device("cpu")

print(f"Using device: {device}")


In [None]:
!pip show pyyaml


In [None]:
import tensorflow as tf
tf.__version__

In [None]:
# !nvidia-smi
!nvcc --version #10.1
!python --version # 3.7.6

In [None]:

from google.colab import drive
drive.mount('/content/drive')


In [None]:
import torch, torchvision
print(torch.__version__, torch.cuda.is_available())

# Loading Detectron 2

In [None]:
!python -m pip install "git+https://github.com/facebookresearch/detectron2.git"

In [None]:
 from detectron2.engine import DefaultTrainer
from detectron2.config import get_cfg
import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()

import numpy as np
import os, json, cv2, random
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.utils.visualizer import Visualizer
from detectron2.data import MetadataCatalog, DatasetCatalog
from detectron2.structures import BoxMode


# Loading Dataset

In [None]:
import os
import json
from detectron2.structures import BoxMode

# Define the root directories for both '28 May' and '24 June'
root_dirs = {
    '28 May': '/content/drive/My Drive/28 May_00101/28 May/',
    '24 June': '/content/drive/My Drive/24 June/'
}

# Define the subdirectories to include for both '28 May' and '24 June'
subdirs_to_include = {
    '28 May': ['28 May_001', '28 May_002'],
    '24 June': ['24_june_001', '24_june_002', '24_june_003', '24_june_004']
}

# Initialize lists to collect all images and annotations
all_annotations = []

# Loop through the root directories and their specified subdirectories
for root_dir_name, subdirs in subdirs_to_include.items():
    root_dir = root_dirs[root_dir_name]
    for subdir_name in subdirs:
        subdir_path = os.path.join(root_dir, subdir_name)
        annotation_folder = os.path.join(subdir_path, 'annotations')
        images_root_folder = os.path.join(subdir_path, 'images')

        # Load all JSON annotation files from the annotation folder
        for annotation_file in os.listdir(annotation_folder):
            if annotation_file.endswith('.json'):
                json_path = os.path.join(annotation_folder, annotation_file)
                with open(json_path, 'r') as f:
                    annotations = json.load(f)
                    for annotation in annotations['annotations']:
                        annotation['image_id'] = f"{subdir_name}_{annotation['image_id']}"
                    for image_info in annotations['images']:
                        image_info['id'] = f"{subdir_name}_{image_info['id']}"

                        # Remove redundant folder name from 'file_name' if present
                        if image_info['file_name'].startswith(f"{root_dir_name}/"):
                            image_info['file_name'] = image_info['file_name'][len(root_dir_name) + 1:]

                        # Now combine with 'images_root_folder'
                        image_info['file_name'] = os.path.join(images_root_folder, image_info['file_name'])

                    all_annotations.append({
                        'images': annotations['images'],
                        'annotations': annotations['annotations']
                    })

# Now create the dataset dictionary
def custom_cattle_dataset():
    dataset_dicts = []
    for annotation_group in all_annotations:
        images_info_map = {img_info['id']: img_info for img_info in annotation_group['images']}
        for annotation in annotation_group['annotations']:
            record = {}
            image_info = images_info_map.get(annotation['image_id'])
            if image_info is None:
                print(f"Image info not found for image ID: {annotation['image_id']}")
                continue
            record['file_name'] = image_info['file_name']
            record['image_id'] = image_info['id']
            record['height'] = image_info['height']
            record['width'] = image_info['width']
            # Determine category based on file path
            if "2 Years old or less" in record["file_name"] or "Heifer" in record["file_name"]:
                category_id = 0
            elif "Dry Cows" in record["file_name"] or "dry cows" in record["file_name"]:
                category_id = 1
            elif "Mature Milking Cow" in record["file_name"]:
                category_id = 2
            elif "Pregnant" in record["file_name"]:
                category_id = 3
            else:
                category_id = 999  # Undefined category (optional)
            obj = {
                'bbox': annotation['bbox'],
                'bbox_mode': BoxMode.XYWH_ABS,
                'category_id': category_id,
                'segmentation': annotation.get('segmentation', []),
                'keypoints': annotation.get('keypoints', []),
                'iscrowd': annotation.get('iscrowd', 0),
            }
            record['annotations'] = [obj]
            dataset_dicts.append(record)
    return dataset_dicts

# Generate the dataset dictionary
dataset_dicts = custom_cattle_dataset()
print("Dataset Dicts Sample:", dataset_dicts[:5])


In [None]:
# Define the keypoint flip map using numerical points
keypoint_flip_map = [
    (1, 5),   # Left Eye extreme right <-> Right Eye extreme left
    (2, 6),   # Left Eye extreme left <-> Right Eye extreme righta
    (3, 7),   # Left Eye extreme top <-> Right Eye extreme top
    (4, 8),   # Left Eye extreme bottom <-> Right Eye extreme bottom
    (9, 14),  # Left Ear extreme top right <-> Right Ear extreme top left
    (10, 15), # Left Ear extreme left <-> Right Ear extreme right
    (11, 16), # Left Ear extreme top mid <-> Right Ear extreme top mid
    (12, 17), # Left Ear extreme bottom mid <-> Right Ear extreme bottom mid
    (13, 18), # Left Ear extreme bottom right <-> Right Ear extreme bottom left
    (19, 21), # Muzzle Top left <-> Muzzle Top right
    (20, 20), # Muzzle Top mid <-> Muzzle Top mid (no change)
    (21, 19), # Muzzle Top right <-> Muzzle Top left
    (22, 24), # Muzzle Bottom right <-> Muzzle Bottom left
    (23, 23), # Muzzle Bottom mid <-> Muzzle Bottom mid (no change)
    (24, 22), # Muzzle Bottom left <-> Muzzle Bottom right
    (28, 30), # Head left side <-> Head right side
    (29, 29), # Head extreme top <-> Head extreme top (no change)
    (30, 28), # Head right side <-> Head left side
]

# Define the keypoint names for 30 points
keypoint_names = [
    'kp1', 'kp2', 'kp3', 'kp4', 'kp5', 'kp6', 'kp7', 'kp8', 'kp9', 'kp10',
    'kp11', 'kp12', 'kp13', 'kp14', 'kp15', 'kp16', 'kp17', 'kp18', 'kp19', 'kp20',
    'kp21', 'kp22', 'kp23', 'kp24', 'kp25', 'kp26', 'kp27', 'kp28', 'kp29', 'kp30'
]

In [None]:

import random
from detectron2.data import DatasetCatalog, MetadataCatalog

# Function to split dataset into train, val, and test sets
def split_dataset(dataset, train_ratio=0.7, val_ratio=0.15, test_ratio=0.15, seed=42):
    """
    Splits the dataset into training, validation, and test sets.
    """
    assert train_ratio + val_ratio + test_ratio == 1, "Ratios must sum up to 1!"

    # Set random seed for reproducibility
    random.seed(seed)

    # Shuffle dataset
    shuffled_dataset = random.sample(dataset, len(dataset))

    # Compute split indices
    total_size = len(dataset)
    train_size = int(total_size * train_ratio)
    val_size = int(total_size * val_ratio)

    # Split dataset
    train_set = shuffled_dataset[:train_size]
    val_set = shuffled_dataset[train_size:train_size + val_size]
    test_set = shuffled_dataset[train_size + val_size:]

    return train_set, val_set, test_set

# Ensure your dataset is already loaded in `dataset_dicts`
train_set, val_set, test_set = split_dataset(dataset_dicts)

# Print dataset sizes
print(f"Train Set Size: {len(train_set)}")
print(f"Validation Set Size: {len(val_set)}")
print(f"Test Set Size: {len(test_set)}")

# Define functions to return datasets
def get_cows_train():
    return train_set

def get_cows_val():
    return val_set

def get_cows_test():
    return test_set

# Register datasets
DatasetCatalog.register("cows_train", get_cows_train)
DatasetCatalog.register("cows_val", get_cows_val)
DatasetCatalog.register("cows_test", get_cows_test)

# Define common metadata
metadata_info = {
    "keypoint_flip_map": keypoint_flip_map,
    "keypoint_names": keypoint_names,
    "thing_classes": ["Young Cows", "Dry Cows", "Mature Milking Cow", "Pregnant"],
    "evaluator_type": "coco"
}

# Apply metadata to all datasets
for dataset_name in ["cows_train", "cows_val", "cows_test"]:
    MetadataCatalog.get(dataset_name).keypoint_flip_map = metadata_info["keypoint_flip_map"]
    MetadataCatalog.get(dataset_name).keypoint_names = metadata_info["keypoint_names"]
    MetadataCatalog.get(dataset_name).thing_classes = metadata_info["thing_classes"]
    MetadataCatalog.get(dataset_name).evaluator_type = metadata_info["evaluator_type"]

print("Datasets registered successfully with metadata!")



Preprocessing

In [None]:
from detectron2.modeling import build_model
from detectron2.modeling.meta_arch import GeneralizedRCNN
from detectron2.modeling.roi_heads import StandardROIHeads
from detectron2.config import get_cfg

cfg = get_cfg()

class FocalLossRCNN(GeneralizedRCNN):
    def forward(self, batched_inputs):
        if self.training:
            losses = super().forward(batched_inputs)
            # Modify losses["loss_cls"] with a custom focal loss
            losses["loss_cls"] = focal_loss_function(self.pred_class_logits, self.gt_classes)
            return losses
        else:
            return super().forward(batched_inputs)

# Integrate into Trainer
cfg.MODEL.ROI_HEADS.LOSS = "FocalLossRCNN"


In [None]:
cfg.MODEL.BOX_LOSS_TYPE = "siou"  # Custom loss integration
cfg.SOLVER.OPTIMIZER = "AdamW"
cfg.SOLVER.BASE_LR = 0.0001  # Adjust learning rate for AdamW


In [None]:
from detectron2.data.samplers import RepeatFactorTrainingSampler

def build_train_loader(cfg):
    dataset_dicts = DatasetCatalog.get(cfg.DATASETS.TRAIN[0])
    repeat_factors = RepeatFactorTrainingSampler.repeat_factors_from_category_frequency(
        dataset_dicts, repeat_thresh=0.1
    )
    return build_detection_train_loader(cfg, sampler=RepeatFactorTrainingSampler(repeat_factors))


In [None]:
from detectron2.config import get_cfg

cfg = get_cfg()
cfg.MODEL.BACKBONE.NAME = "build_resnet_fpn_backbone"
cfg.MODEL.RESNETS.DEPTH = 101
cfg.MODEL.RESNETS.OUT_FEATURES = ["res2", "res3", "res4", "res5"]


In [None]:
cfg.MODEL.FPN.NORM = "GN"  # Group Normalization for FPN
cfg.MODEL.FPN.IN_FEATURES = ["p2", "p3", "p4", "p5"]  # Include lower and higher layers


In [None]:
cfg.MODEL.BACKBONE.ATTENTION_TYPE = "CoordinateAttention"  # Requires custom implementation


In [None]:
cfg.MODEL.ANCHOR_GENERATOR.SIZES = [[32, 64, 128, 256, 512]]  # Adjust based on dataset
cfg.MODEL.ANCHOR_GENERATOR.ASPECT_RATIOS = [[0.5, 1.0, 2.0]]  # Common ratios


Train, val and test

In [None]:
import os
import cv2
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
from detectron2.engine import DefaultTrainer
from detectron2.config import get_cfg
from detectron2 import model_zoo
import numpy as np
from detectron2.structures import Boxes, Instances, Keypoints
from detectron2.data import MetadataCatalog, DatasetCatalog
from detectron2.data import build_detection_train_loader

# Enable expandable memory allocation
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

# Configuration setup
cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file("COCO-Keypoints/keypoint_rcnn_R_50_FPN_3x.yaml"))
cfg.DATASETS.TRAIN = ("cows_train_balanced",)
cfg.DATASETS.TEST = ()

cfg.DATALOADER.NUM_WORKERS = 2
cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url("COCO-InstanceSegmentation/mask_rcnn_R_50_FPN_3x.yaml")

cfg.SOLVER.MAX_ITER = 2000
cfg.SOLVER.IMS_PER_BATCH = 1
cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 16
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 4
cfg.MODEL.ROI_KEYPOINT_HEAD.NUM_KEYPOINTS = 30
cfg.TEST.KEYPOINT_OKS_SIGMAS = np.ones((30, 1), dtype=float).tolist()
cfg.SOLVER.STEPS = [3000, 3800]
cfg.SOLVER.GAMMA = 0.1

# Fix Learning Rate and Gradient Clipping
cfg.SOLVER.BASE_LR = 0.0005  # Lowered to prevent NaN errors
cfg.SOLVER.WARMUP_FACTOR = 0.0001
cfg.SOLVER.WARMUP_ITERS = 500
cfg.SOLVER.CLIP_GRADIENTS.ENABLED = True
cfg.SOLVER.CLIP_GRADIENTS.CLIP_TYPE = "value"
cfg.SOLVER.CLIP_GRADIENTS.CLIP_VALUE = 1.0  # Limits large gradient updates

# Define keypoint flip map and keypoint names
keypoint_flip_map = [(1, 5), (2, 6), (3, 7), (4, 8), (9, 14), (10, 15), (11, 16), (12, 17),
                     (13, 18), (19, 21), (20, 20), (21, 19), (22, 24), (23, 23), (24, 22),
                     (28, 30), (29, 29), (30, 28)]
keypoint_names = [f'kp{i}' for i in range(1, 31)]

MetadataCatalog.get("cows_train_balanced").keypoint_flip_map = keypoint_flip_map
MetadataCatalog.get("cows_train_balanced").keypoint_names = keypoint_names
MetadataCatalog.get("cows_train_balanced").thing_classes = ["Young Cows", "Dry Cows", "Mature Milking Cow", "Pregnant"]
MetadataCatalog.get("cows_train_balanced").evaluator_type = "coco"

# Function to check file existence
def check_alternative_extensions(file_path):
    base, _ = os.path.splitext(file_path)
    extensions = [".jpg", ".jpeg", ".JPG", ".JPEG", ".png"]
    for ext in extensions:
        new_path = base + ext
        if os.path.exists(new_path):
            return new_path
    return None

def filter_missing_files(dataset_dicts):
    """Filters missing files and corrects paths with case mismatches."""
    filtered_dataset = []
    for record in dataset_dicts:
        file_path = record["file_name"]
        if os.path.exists(file_path):
            filtered_dataset.append(record)
        else:
            corrected_path = check_alternative_extensions(file_path)
            if corrected_path:
                record["file_name"] = corrected_path
                filtered_dataset.append(record)
    return filtered_dataset

# Load and filter dataset
dataset_dicts = DatasetCatalog.get("cows_train")
filtered_dataset_dicts = filter_missing_files(dataset_dicts)

# Oversample "Pregnant" class
pregnant_data = [d for d in filtered_dataset_dicts if d["annotations"][0]["category_id"] == 3]
augmented_pregnant_data = []
for d in pregnant_data:
    for _ in range(30):  # Augment each sample 30 times
        aug_data = d.copy()
        aug_data["file_name"] = d["file_name"]
        augmented_pregnant_data.append(aug_data)

balanced_dataset_dicts = filtered_dataset_dicts + augmented_pregnant_data

def get_balanced_cows_train():
    return balanced_dataset_dicts

if "cows_train_balanced" in DatasetCatalog.list():
    DatasetCatalog.remove("cows_train_balanced")
    MetadataCatalog.remove("cows_train_balanced")

DatasetCatalog.register("cows_train_balanced", get_balanced_cows_train)

# Augmentation function for "Pregnant" class
def pregnant_extra_augmentations(image, keypoints):
    transform_list = [
        A.HorizontalFlip(p=0.5),
        A.RandomBrightnessContrast(p=0.5),
        A.MotionBlur(p=0.5),
        A.GaussNoise(var_limit=(10.0, 50.0), p=0.5),
        A.Rotate(limit=10, p=0.5, border_mode=cv2.BORDER_REFLECT),
        ToTensorV2()
    ]
    aug = A.Compose(transform_list, keypoint_params=A.KeypointParams(format="xy", remove_invisible=True))
    augmented = aug(image=image, keypoints=keypoints)
    return augmented["image"].permute(1, 2, 0).numpy(), augmented["keypoints"]

# Custom Mapper
def custom_mapper(dataset_dict):
    dataset_dict = dataset_dict.copy()
    image = cv2.imread(dataset_dict["file_name"])
    if image is None:
        return None
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    category_id = dataset_dict["annotations"][0]["category_id"]
    keypoints = [ann.get("keypoints", []) for ann in dataset_dict["annotations"]]
    if category_id == 3:
        image, keypoints = pregnant_extra_augmentations(image, keypoints)

    dataset_dict["image"] = torch.as_tensor(image.transpose(2, 0, 1))

    height, width = image.shape[:2]
    instances = Instances((height, width))

    boxes = [ann["bbox"] for ann in dataset_dict["annotations"]]
    boxes = np.array(boxes, dtype=np.float32)
    boxes = BoxMode.convert(boxes, BoxMode.XYWH_ABS, BoxMode.XYXY_ABS)

    valid_boxes = []
    valid_classes = []
    valid_keypoints = []
    for i, box in enumerate(boxes):
        x1, y1, x2, y2 = box
        if (x2 - x1) > 0 and (y2 - y1) > 0:
            valid_boxes.append(box)
            valid_classes.append(dataset_dict["annotations"][i]["category_id"])
            if keypoints[i]:
                valid_keypoints.append(keypoints[i])

    if len(valid_boxes) == 0:
        return None

    instances.gt_boxes = Boxes(torch.as_tensor(valid_boxes, dtype=torch.float32))
    instances.gt_classes = torch.as_tensor(valid_classes, dtype=torch.int64)

    if valid_keypoints:
        valid_keypoints = np.array(valid_keypoints, dtype=np.float32).reshape(len(valid_keypoints), -1, 3)
        instances.gt_keypoints = Keypoints(torch.as_tensor(valid_keypoints, dtype=torch.float32))

    dataset_dict["instances"] = instances
    return dataset_dict

class CustomTrainer(DefaultTrainer):
    @classmethod
    def build_train_loader(cls, cfg):
        return build_detection_train_loader(cfg, mapper=custom_mapper)

cfg.OUTPUT_DIR = "./my_custom_output_dir"
os.makedirs(cfg.OUTPUT_DIR, exist_ok=True)

trainer = CustomTrainer(cfg)
trainer.resume_or_load(resume=False)
trainer.train()


In [None]:
import json
import matplotlib.pyplot as plt
import os

# Set the path to the metrics.json file
output_dir = cfg.OUTPUT_DIR
metrics_file = os.path.join(output_dir, "metrics.json")

# Load the metrics from the JSON file
metrics = []
with open(metrics_file, 'r') as f:
    for line in f:
        metrics.append(json.loads(line))

# Extract the iteration values, training loss, and validation loss
iterations = [x['iteration'] for x in metrics if 'total_loss' in x]
train_losses = [x['total_loss'] for x in metrics if 'total_loss' in x]

# Validation loss may have different key, make sure to check the key in your JSON
val_iterations = [x['iteration'] for x in metrics if 'validation_loss' in x]
val_losses = [x['validation_loss'] for x in metrics if 'validation_loss' in x]

# Plot both the training and validation loss curves
plt.figure(figsize=(10, 6))
plt.plot(iterations, train_losses, label='Training Loss', color='blue')
if len(val_iterations) > 0 and len(val_losses) > 0:
    plt.plot(val_iterations, val_losses, label='Validation Loss', color='red')

plt.xlabel('Iterations')
plt.ylabel('Loss')
plt.title('Training Loss over Iterations')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
from google.colab import drive
import shutil

# Mount Google Drive
drive.mount('/content/drive')

# Specify source and new destination paths
source_path = "./my_custom_output_dir/model_final.pth"
destination_path = "/content/drive/My Drive/model2_final.pth"  # Change this to the desired path in your Google Drive

# Copy the model file using shutil
shutil.copy(source_path, destination_path)

print(f"Model copied to {destination_path}")


In [None]:
import os
import cv2
import torch
import numpy as np
from detectron2.data import MetadataCatalog, DatasetCatalog, build_detection_test_loader
from detectron2.structures import Boxes, Instances, Keypoints
from detectron2.evaluation import COCOEvaluator, inference_on_dataset

# Function to check file existence
def check_alternative_extensions(file_path):
    base, _ = os.path.splitext(file_path)
    extensions = [".jpg", ".jpeg", ".JPG", ".JPEG", ".png"]
    for ext in extensions:
        new_path = base + ext
        if os.path.exists(new_path):
            return new_path
    return None

def filter_missing_files(dataset_dicts):
    """Filters missing files and corrects paths with case mismatches."""
    filtered_dataset = []
    for record in dataset_dicts:
        file_path = record["file_name"]
        if os.path.exists(file_path):
            filtered_dataset.append(record)
        else:
            corrected_path = check_alternative_extensions(file_path)
            if corrected_path:
                record["file_name"] = corrected_path
                filtered_dataset.append(record)
    return filtered_dataset

# Load and filter validation dataset
dataset_val = DatasetCatalog.get("cows_val")
filtered_val = filter_missing_files(dataset_val)

# Register validation dataset
def get_balanced_val():
    return filtered_val

if "cows_val_balanced" in DatasetCatalog.list():
    DatasetCatalog.remove("cows_val_balanced")
    MetadataCatalog.remove("cows_val_balanced")

DatasetCatalog.register("cows_val_balanced", get_balanced_val)
MetadataCatalog.get("cows_val_balanced").thing_classes = ["Young Cows", "Dry Cows", "Mature Milking Cow", "Pregnant"]

# Define validation data mapper (NO AUGMENTATION)
def custom_val_mapper(dataset_dict):
    dataset_dict = dataset_dict.copy()
    image = cv2.imread(dataset_dict["file_name"])
    if image is None:
        return None
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    dataset_dict["image"] = torch.as_tensor(image.transpose(2, 0, 1))

    height, width = image.shape[:2]
    instances = Instances((height, width))

    boxes = [ann["bbox"] for ann in dataset_dict["annotations"]]
    boxes = np.array(boxes, dtype=np.float32)
    boxes = Boxes(torch.as_tensor(BoxMode.convert(boxes, BoxMode.XYWH_ABS, BoxMode.XYXY_ABS), dtype=torch.float32))

    instances.gt_boxes = boxes
    instances.gt_classes = torch.as_tensor([ann["category_id"] for ann in dataset_dict["annotations"]], dtype=torch.int64)

    if "keypoints" in dataset_dict["annotations"][0]:
        keypoints = [ann["keypoints"] for ann in dataset_dict["annotations"]]
        keypoints = np.array(keypoints, dtype=np.float32).reshape(len(keypoints), -1, 3)
        instances.gt_keypoints = Keypoints(torch.as_tensor(keypoints, dtype=torch.float32))

    dataset_dict["instances"] = instances
    return dataset_dict

# Run validation evaluation
val_loader = build_detection_test_loader(cfg, "cows_val_balanced", mapper=custom_val_mapper)
evaluator = COCOEvaluator("cows_val_balanced", cfg, False, output_dir="./output/")
print("🔍 Evaluating on Validation Set...")
inference_on_dataset(trainer.model, val_loader, evaluator)


In [None]:
import os
import cv2
import numpy as np
import torch
import matplotlib.pyplot as plt
from collections import Counter
from detectron2.engine import DefaultPredictor
from detectron2.data import MetadataCatalog, DatasetCatalog, build_detection_test_loader
from detectron2.structures import Boxes, Instances, Keypoints
from detectron2.evaluation import COCOEvaluator, inference_on_dataset
from detectron2.utils.visualizer import Visualizer
from detectron2.data.detection_utils import convert_image_to_rgb
from sklearn.metrics import confusion_matrix, classification_report

# Define categories
categories = ["Young Cows", "Dry Cows", "Mature Milking Cow", "Pregnant"]

# Ensure dataset is available and filter missing files
dataset_test = DatasetCatalog.get("cows_test")
filtered_test = [d for d in dataset_test if os.path.exists(d["file_name"])]

# Register filtered testing dataset
def get_test_data():
    return filtered_test

if "cows_test_balanced" in DatasetCatalog.list():
    DatasetCatalog.remove("cows_test_balanced")
    MetadataCatalog.remove("cows_test_balanced")

DatasetCatalog.register("cows_test_balanced", get_test_data)
MetadataCatalog.get("cows_test_balanced").thing_classes = categories

# Load model weights and setup predictor
cfg.MODEL.WEIGHTS = os.path.join(cfg.OUTPUT_DIR, "model_final.pth")
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.2  # Adjust confidence threshold
cfg.DATASETS.TEST = ("cows_test_balanced",)
predictor = DefaultPredictor(cfg)

# Lists to store predictions
predicted_labels = []
actual_labels = []

# Extract ground truth from dataset
def extract_ground_truth(data):
    """Extracts actual labels from dataset."""
    return MetadataCatalog.get("cows_test_balanced").thing_classes[data["annotations"][0]["category_id"]]

# Process images for evaluation
def process_images_for_evaluation(output_folder):
    """Evaluate model predictions against ground truth."""
    os.makedirs(output_folder, exist_ok=True)

    for data in filtered_test:
        file_path = data["file_name"]
        image = cv2.imread(file_path)

        if image is None:
            print(f"Error loading image: {file_path}")
            continue

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Run inference
        outputs = predictor(image)

        # Get predicted class labels
        predicted_class_indices = outputs["instances"].pred_classes.tolist()
        predicted_classes = [categories[idx] for idx in predicted_class_indices if idx < len(categories)]

        # Get actual class
        actual_class = extract_ground_truth(data)

        # If at least one correct prediction, count as correct
        if any(pred == actual_class for pred in predicted_classes):
            final_pred = actual_class
        else:
            final_pred = predicted_classes[0] if predicted_classes else "Unknown"

        # Store results
        actual_labels.append(actual_class)
        predicted_labels.append(final_pred)

        print(f"Image: {file_path}, Actual: {actual_class}, Predicted: {predicted_classes} -> Final Assigned: {final_pred}")

# Run evaluation
output_folder = '/content/drive/My Drive/test_results'
process_images_for_evaluation(output_folder)

# Analyze results
print("\nUnique Actual Labels:", set(actual_labels))
print("Actual Labels Distribution:", Counter(actual_labels))
print("\nUnique Predicted Labels:", set(predicted_labels))
print("Predicted Labels Distribution:", Counter(predicted_labels))

# Generate confusion matrix and classification report
print("\nConfusion Matrix:")
conf_matrix = confusion_matrix(actual_labels, predicted_labels, labels=categories)
print(conf_matrix)

print("\nClassification Report:")
print(classification_report(actual_labels, predicted_labels, target_names=categories, zero_division=1))

# Plot Confusion Matrix
import seaborn as sns

plt.figure(figsize=(8, 6))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", xticklabels=categories, yticklabels=categories)
plt.xlabel("Predicted Labels")
plt.ylabel("Actual Labels")
plt.title("Confusion Matrix for Cow Classification")
plt.show()


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc
from sklearn.preprocessing import label_binarize

# Ensure `actual_labels` and `predicted_labels` exist 🚨
if not actual_labels or not predicted_labels:
    print("Error: The actual_labels and predicted_labels lists are empty. Run model evaluation first!")
else:
    # Convert labels to one-hot encoding for AUC-ROC computation
    actual_binarized = label_binarize(actual_labels, classes=categories)
    predicted_binarized = label_binarize(predicted_labels, classes=categories)

    # Compute and Plot AUC-ROC Curve
    plt.figure(figsize=(8, 6))
    for i, category in enumerate(categories):
        fpr, tpr, _ = roc_curve(actual_binarized[:, i], predicted_binarized[:, i])
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, label=f"{category} (AUC = {roc_auc:.2f})")

    # Plot Diagonal Reference Line
    plt.plot([0, 1], [0, 1], "k--", lw=2)
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("AUC-ROC Curve for Cow Classification")
    plt.legend(loc="lower right")
    plt.show()


# New Brunswick Data

In [None]:
import os
import cv2
from detectron2.structures import BoxMode

def create_labels_dict(dataset_root):
    """
    Dynamically assign labels based on folder names.
    """
    labels_dict = {}
    label_id = 0

    # Iterate through farms
    for farm in os.listdir(dataset_root):
        farm_path = os.path.join(dataset_root, farm)
        if os.path.isdir(farm_path):  # Ensure it's a folder
            for category in os.listdir(farm_path):  # Iterate over categories
                category_path = os.path.join(farm_path, category)
                if os.path.isdir(category_path) and category not in labels_dict:
                    labels_dict[category] = label_id
                    label_id += 1  # Increment for next category

    return labels_dict


In [None]:
def get_newbrunswick_dataset(image_root):
    """
    Convert New Brunswick dataset into Detectron2 format.
    Uses dynamically created category labels.
    """
    dataset_dicts = []
    labels_dict = create_labels_dict(image_root)  # Get category mappings

    for farm_folder in os.listdir(image_root):
        farm_path = os.path.join(image_root, farm_folder)
        if not os.path.isdir(farm_path):
            continue  # Skip non-folder files

        for class_folder in os.listdir(farm_path):
            class_path = os.path.join(farm_path, class_folder)

            if not os.path.isdir(class_path):
                continue  # Skip non-folder files

            # Get dynamically assigned class ID
            class_id = labels_dict.get(class_folder, None)
            if class_id is None:
                continue  # Ignore undefined categories

            for idx, image_file in enumerate(os.listdir(class_path)):
                image_path = os.path.join(class_path, image_file)
                if not image_path.lower().endswith((".jpg", ".jpeg", ".png")):
                    continue  # Skip non-image files

                # Verify if the file exists
                if not os.path.exists(image_path):
                    print(f"Warning: Image not found - {image_path}")
                    continue

                # Load image to get dimensions
                img = cv2.imread(image_path)
                if img is None:
                    print(f"Warning: Could not read image - {image_path}")
                    continue

                height, width = img.shape[:2]

                # Store in Detectron2 format (full image as bbox)
                record = {
                    "file_name": image_path,  # Ensure "file_name" is included
                    "image_id": len(dataset_dicts),
                    "height": height,
                    "width": width,
                    "annotations": [{
                        "bbox": [0, 0, width, height],  # Whole image as bbox
                        "bbox_mode": BoxMode.XYWH_ABS,
                        "category_id": class_id,
                    }]
                }
                dataset_dicts.append(record)  # ✅ Append properly

    return dataset_dicts, labels_dict


In [None]:
from detectron2.data import DatasetCatalog, MetadataCatalog

# Define dataset path
image_root = "/content/drive/My Drive/NewBrunswick"

# Generate dataset dictionary & labels
dataset_dicts, labels_dict = get_newbrunswick_dataset(image_root)

# Register dataset
DatasetCatalog.register("newbrunswick_dataset", lambda: dataset_dicts)
MetadataCatalog.get("newbrunswick_dataset").set(
    thing_classes=list(labels_dict.keys())  # Use dynamically created class names
)

print("Dataset registered successfully!")
print("Final class mapping:", labels_dict)


In [None]:
from detectron2.config import get_cfg
from detectron2.engine import DefaultTrainer
from detectron2 import model_zoo
import os
import shutil

# Define output directory in Google Drive
output_dir = "/content/drive/My Drive/detectron2_output/"
os.makedirs(output_dir, exist_ok=True)  # Create folder if it doesn't exist

# Detectron2 Training Configuration
cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml"))
cfg.DATASETS.TRAIN = ("newbrunswick_dataset",)
cfg.DATASETS.TEST = ()
cfg.DATALOADER.NUM_WORKERS = 4
cfg.SOLVER.IMS_PER_BATCH = 2
cfg.SOLVER.BASE_LR = 0.001
cfg.SOLVER.MAX_ITER = 2000
cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 128
cfg.MODEL.ROI_HEADS.NUM_CLASSES = len(labels_dict)  # Use dynamically assigned classes

# Set the output directory to Google Drive
cfg.OUTPUT_DIR = output_dir

# Train Model
trainer = DefaultTrainer(cfg)
trainer.resume_or_load(resume=False)
trainer.train()

# Move the trained model to Google Drive
source_path = os.path.join(cfg.OUTPUT_DIR, "model_final.pth")
destination_path = "/content/drive/My Drive/model3_final.pth"  # Change this if needed

if os.path.exists(source_path):
    shutil.copy(source_path, destination_path)
    print(f"Model copied to {destination_path}")
else:
    print("Model training failed, file not found.")


# Pipeline to Use Both Models

In [None]:
import numpy as np
import cv2
import torch
import itertools
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
from detectron2.config import get_cfg
from detectron2.engine import DefaultPredictor
from detectron2.data import MetadataCatalog, DatasetCatalog, build_detection_test_loader
from detectron2.structures import Boxes, Instances
from detectron2.evaluation import COCOEvaluator, inference_on_dataset
from detectron2 import model_zoo  # Corrected Import

# ========================== Fix NameError: Define Configs ========================== #
cfg1 = get_cfg()
cfg1.merge_from_file(model_zoo.get_config_file("COCO-Keypoints/keypoint_rcnn_R_50_FPN_3x.yaml"))
cfg1.MODEL.WEIGHTS = "/content/drive/My Drive/model2_final.pth"
cfg1.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5  # Set confidence threshold
cfg1.MODEL.DEVICE = "cuda"  # Use GPU if available
predictor1 = DefaultPredictor(cfg1)

cfg2 = get_cfg()
cfg2.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml"))
cfg2.MODEL.WEIGHTS = "/content/drive/My Drive/model3_final.pth"
cfg2.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.5  # Set confidence threshold
cfg2.MODEL.DEVICE = "cuda"  # Use GPU if available
predictor2 = DefaultPredictor(cfg2)

# ========================== Dataset Registration ========================== #
dataset_test = DatasetCatalog.get("cows_test")

# Function to filter missing files
def filter_missing_files(dataset):
    return [d for d in dataset if cv2.imread(d["file_name"]) is not None]

filtered_test = filter_missing_files(dataset_test)

# Register test dataset (NO AUGMENTATION)
def get_test_data():
    return filtered_test

if "cows_test_balanced" in DatasetCatalog.list():
    DatasetCatalog.remove("cows_test_balanced")
    MetadataCatalog.remove("cows_test_balanced")

DatasetCatalog.register("cows_test_balanced", get_test_data)
MetadataCatalog.get("cows_test_balanced").thing_classes = ["Young Cows", "Dry Cows", "Mature Milking Cow", "Pregnant"]

# ========================== Custom Test Data Mapper ========================== #
def custom_test_mapper(dataset_dict):
    dataset_dict = dataset_dict.copy()
    image = cv2.imread(dataset_dict["file_name"])
    if image is None:
        return None
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    dataset_dict["image"] = torch.as_tensor(image.transpose(2, 0, 1))

    height, width = image.shape[:2]
    instances = Instances((height, width))

    # Convert bounding boxes correctly
    boxes = [ann["bbox"] for ann in dataset_dict["annotations"]]
    boxes = np.array(boxes, dtype=np.float32).reshape(-1, 4)  # Ensure proper shape
    boxes = Boxes(torch.as_tensor(boxes, dtype=torch.float32))

    instances.gt_boxes = boxes
    instances.gt_classes = torch.as_tensor([ann["category_id"] for ann in dataset_dict["annotations"]], dtype=torch.int64)

    dataset_dict["instances"] = instances
    return dataset_dict

# ========================== Run Evaluation & Confusion Matrix ========================== #
test_loader = build_detection_test_loader(cfg1, "cows_test_balanced", mapper=custom_test_mapper)
categories = MetadataCatalog.get("cows_test_balanced").thing_classes  # Get class names

true_labels = []
pred_labels = []

for batch in test_loader:
    file_path = batch[0]["file_name"]  # Get image filename
    images = batch[0]["image"].cuda()  # Move images to GPU
    gt_classes = batch[0]["instances"].gt_classes.cpu().numpy()  # Get ground truth labels
    true_labels.extend(gt_classes)

    # Convert tensor image to NumPy for prediction
    img_numpy = images.cpu().numpy().transpose(1, 2, 0)

    outputs = predictor1(img_numpy)  # Run model prediction
    pred_instances = outputs["instances"]

    # Check if the model made any predictions
    if pred_instances.has("pred_classes") and len(pred_instances.pred_classes) > 0:
        pred_classes = pred_instances.pred_classes.cpu().numpy()
        scores = pred_instances.scores.cpu().numpy()  # Confidence scores

        print(f" Image: {file_path}")
        print(f"   - Ground Truth: {[categories[idx] for idx in gt_classes]}")
        print(f"   - Predicted Classes: {[categories[idx] for idx in pred_classes]}")
        print(f"   - Confidence Scores: {scores}")

        # Select the highest-confidence prediction that is above the threshold
        high_conf_preds = [(pred_classes[i], scores[i]) for i in range(len(scores)) if scores[i] > 0.5]

        if high_conf_preds:
            best_prediction = max(high_conf_preds, key=lambda x: x[1])[0]  # Get highest confidence class
            pred_labels.append(best_prediction)
        else:
            pred_labels.append(-1)  # No high-confidence prediction
    else:
        print(f"   -  No valid predictions for this image!")
        pred_labels.append(-1)  # No prediction case


# Compute confusion matrix
cm = confusion_matrix(true_labels, pred_labels)

# ========================== Plot Confusion Matrix ========================== #
def plot_confusion_matrix(cm, class_names):
    plt.figure(figsize=(8, 6))
    plt.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
    plt.title("Confusion Matrix")
    plt.colorbar()

    tick_marks = np.arange(len(class_names))
    plt.xticks(tick_marks, class_names, rotation=45)
    plt.yticks(tick_marks, class_names)

    # Add text annotations
    fmt = "d"
    thresh = cm.max() / 2.0
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel("True label")
    plt.xlabel("Predicted label")
    plt.tight_layout()
    plt.show()

# Display confusion matrix
class_names = MetadataCatalog.get("cows_test_balanced").thing_classes
plot_confusion_matrix(cm, class_names)


# GRAD CAM

In [None]:
!pip install grad-cam

In [None]:
import os
import cv2
import torch
import random
import numpy as np
import matplotlib.pyplot as plt
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.utils.visualizer import Visualizer, ColorMode
from detectron2 import model_zoo
from detectron2.modeling import build_model
from detectron2.checkpoint import DetectionCheckpointer
from pytorch_grad_cam import GradCAM
from pytorch_grad_cam.utils.image import preprocess_image as gradcam_preprocess
from pytorch_grad_cam.utils.model_targets import FasterRCNNBoxScoreTarget

# Load the trained Detectron2 model
cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml"))
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 4  # Adjust based on your dataset
cfg.MODEL.WEIGHTS = "/content/drive/My Drive/model1_final.pth"  # Change to your trained model path
cfg.MODEL.DEVICE = "cuda" if torch.cuda.is_available() else "cpu"  # Use GPU if available

# Initialize model manually for Grad-CAM
model = build_model(cfg)
DetectionCheckpointer(model).load(cfg.MODEL.WEIGHTS)
model.eval()

# Register Grad-CAM Hook
feature_maps = None
gradients = None

def forward_hook(module, input, output):
    global feature_maps
    feature_maps = output

def backward_hook(module, grad_in, grad_out):
    global gradients
    gradients = grad_out[0]

# Register hooks to a deeper convolutional layer (res4 for better feature visualization)
target_layer = model.backbone.bottom_up.res4
target_layer.register_forward_hook(forward_hook)
target_layer.register_full_backward_hook(backward_hook)

# Initialize predictor
predictor = DefaultPredictor(cfg)

# Function to get 8 random images from test folder
def get_random_images(folder, num_images=8):
    image_paths = []
    for root, _, files in os.walk(folder):
        for file in files:
            if file.lower().endswith(('.jpg', '.jpeg', '.png')):  # Check valid image formats
                image_paths.append(os.path.join(root, file))

    if len(image_paths) == 0:
        raise ValueError("❌ ERROR: No images found in test folder!")

    random.shuffle(image_paths)
    return image_paths[:num_images]

# Function to generate Grad-CAM heatmap
def generate_grad_cam(image_path):
    global feature_maps, gradients

    torch.cuda.empty_cache()  # Free up GPU memory

    # Load and preprocess image
    img = cv2.imread(image_path)
    if img is None:
        print(f"Error: Unable to load image at '{image_path}'. Check the path.")
        return None, None

    img = cv2.resize(img, (512, 512))  # Resize to avoid memory overflow
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    # Convert to tensor and send to GPU
    img_tensor = torch.tensor(img).permute(2, 0, 1).float().unsqueeze(0).cuda()

    # Forward pass
    model.zero_grad()
    inputs = [{"image": img_tensor.squeeze()}]
    outputs = model(inputs)[0]  # Extract first output dictionary

    if "instances" not in outputs or len(outputs["instances"]) == 0:
        print("No objects detected in", image_path)
        return img, None

    instances = outputs["instances"]
    scores = instances.scores
    highest_class_idx = scores.argmax()
    scores[highest_class_idx].backward()

    # Compute Grad-CAM heatmap
    pooled_gradients = torch.mean(gradients, dim=[0, 2, 3])  # Average over height & width
    feature_maps = feature_maps.squeeze(0)  # Remove batch dimension
    pooled_gradients = pooled_gradients.view(feature_maps.shape[0], 1, 1)

    cam = torch.sum(pooled_gradients * feature_maps, dim=0).cpu().detach().numpy()
    cam = np.maximum(cam, 0)  # Apply ReLU
    cam = cv2.resize(cam, (img.shape[1], img.shape[0]))  # Resize to match original image size

    # Overlay the heatmap
    heatmap = cv2.applyColorMap(np.uint8(255 * cam / cam.max()), cv2.COLORMAP_JET)
    overlayed_img = cv2.addWeighted(img, 0.5, heatmap, 0.5, 0)

    return img, overlayed_img  # Return original & heatmap image

# Run Grad-CAM on 8 Random Images & Display in Grid
test_folder = "/content/drive/My Drive/test"  # Change this to your test directory
selected_images = get_random_images(test_folder, num_images=8)

fig, axes = plt.subplots(2, 4, figsize=(20, 10))  # 2x4 grid for displaying images

for i, img_path in enumerate(selected_images):
    row, col = divmod(i, 4)

    original, heatmap = generate_grad_cam(img_path)
    if heatmap is None:
        continue  # Skip images with no detections

    # Display results
    axes[row, col].imshow(heatmap)
    axes[row, col].set_title(f"Image {i+1}")
    axes[row, col].axis("off")

# Show all Grad-CAM results
plt.show()


# PBE

In [None]:
import os
import cv2
import torch
import random
import numpy as np
import matplotlib.pyplot as plt
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2 import model_zoo
from detectron2.modeling import build_model
from detectron2.checkpoint import DetectionCheckpointer

# Load the trained Detectron2 model
cfg = get_cfg()
cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml"))
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 4  # Adjust based on your dataset
cfg.MODEL.WEIGHTS = "/content/drive/My Drive/model1_final.pth"  # Change to your trained model path
cfg.MODEL.DEVICE = "cuda" if torch.cuda.is_available() else "cpu"  # Use GPU if available

# Initialize model
model = build_model(cfg)
DetectionCheckpointer(model).load(cfg.MODEL.WEIGHTS)
model.eval()

# Initialize predictor for normal inference
predictor = DefaultPredictor(cfg)

# Function to get 8 random images from test folder
def get_random_images(folder, num_images=8):
    image_paths = []
    for root, _, files in os.walk(folder):
        for file in files:
            if file.lower().endswith(('.jpg', '.jpeg', '.png')):  # Check valid image formats
                image_paths.append(os.path.join(root, file))

    if len(image_paths) == 0:
        raise ValueError("ERROR: No images found in test folder!")

    random.shuffle(image_paths)
    return image_paths[:num_images]

# Occlusion Sensitivity Function
def perturb_image(image, mask_size=50, stride=25):
    """
    Iteratively occludes different parts of the image and measures impact on predictions.
    Generates an importance heatmap showing regions most sensitive to occlusion.
    """
    h, w, _ = image.shape
    heatmap = np.zeros((h, w))

    # Run original prediction
    original_output = predictor(image)
    original_score = original_output["instances"].scores.max().item() if len(original_output["instances"]) > 0 else 0

    # Slide occlusion window across image
    for y in range(0, h, stride):
        for x in range(0, w, stride):
            # Create occlusion mask
            occluded_image = image.copy()
            occluded_image[y:y+mask_size, x:x+mask_size] = 0  # Black-out occluded area

            # Run prediction on occluded image
            occluded_output = predictor(occluded_image)
            occluded_score = occluded_output["instances"].scores.max().item() if len(occluded_output["instances"]) > 0 else 0

            # Compute score drop (higher = more important)
            heatmap[y:y+mask_size, x:x+mask_size] = original_score - occluded_score

    # Normalize heatmap
    heatmap = (heatmap - heatmap.min()) / (heatmap.max() - heatmap.min())
    return heatmap

# Run Perturbation-Based Explainability on 8 Random Images & Display in Grid
test_folder = "/content/drive/My Drive/test"  # Change this to your test directory
selected_images = get_random_images(test_folder, num_images=8)

fig, axes = plt.subplots(2, 4, figsize=(20, 10))  # 2x4 grid for displaying images

for i, img_path in enumerate(selected_images):
    row, col = divmod(i, 4)

    # Load image
    img = cv2.imread(img_path)
    if img is None:
        print(f"ERROR: Could not load {img_path}")
        continue
    img = cv2.resize(img, (512, 512))  # Resize for consistency
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB

    # Compute occlusion sensitivity heatmap
    heatmap = perturb_image(img)

    # Convert to color heatmap
    heatmap = cv2.applyColorMap(np.uint8(255 * heatmap), cv2.COLORMAP_JET)
    overlayed_img = cv2.addWeighted(img, 0.5, heatmap, 0.5, 0)

    # Display results
    axes[row, col].imshow(overlayed_img)
    axes[row, col].set_title(f"Image {i+1}")
    axes[row, col].axis("off")

# Show all perturbation-based results
plt.show()
