Please install the necessary libraries using your terminal/shell in your env before running the code. You can check all the libraries mentioned in the next code block. To install any library you can use either one of the code mentioned below

pip install library_name

pip3 install library_name

In [1]:
import os
import glob
import yaml
from ultralytics import YOLO
import cv2
import matplotlib.pyplot as plt
import json
from matplotlib.backends.backend_pdf import PdfPages
import warnings
import numpy as np
from yolo_cam.eigen_cam import EigenCAM
from yolo_cam.utils.image import show_cam_on_image

warnings.filterwarnings('ignore')

class_name = { 0: 'Mass',
1: 'Spiculation',
2: 'Suspicious Calcification',
3: 'Architectural Distortion',
4: 'Asymmetry',
5: 'Focal Asymmetry',
6: 'Skin Thickening',
7: 'Global Asymmetry',
8: 'Suspicious Lymph Node',
9: 'Skin Retraction',
10: 'Nipple Retraction'
}

color_dict = {
    0: (255, 0, 0),      # Red
    1: (0, 255, 0),      # Green
    2: (0, 0, 255),      # Blue
    3: (255, 255, 0),    # Yellow
    4: (255, 165, 0),    # Orange
    5: (128, 0, 128),    # Purple
    6: (0, 255, 255),    # Cyan
    7: (255, 192, 203),  # Pink
    8: (128, 128, 0),    # Olive
    9: (0, 0, 0),        # Black
    10: (169, 169, 169)  # Dark Grey
}

output_labels_dir = "results/labels"  # Directory to save ground truth images
output_pred_dir = "results/predictions"  # Directory to save predicted images

Mention dataset.yaml file path and run

In [None]:
path = "dataset.yaml"

def remove_cache_files(directory):
    cache_files = glob.glob(os.path.join(directory, "*.cache"))
    for cache_file in cache_files:
        os.remove(cache_file)
        print(f"Removed: {cache_file}")
        
with open(path, 'r') as stream:
    data_loaded = yaml.safe_load(stream)

remove_cache_files(os.path.dirname(data_loaded['train']))
remove_cache_files(os.path.dirname(data_loaded['val']))

Change the model and hyperparameters.

* Epochs = keep minimum 200
* imgsz = change acc to your model assigned
* patience = number of epochs model will check for accuracy improvement and then stop if accuracy is not improving
* set device = [0,1] if you have 2 GPUs requested and device = 0 if only 1 GPU/CPU
* save = True, this will help you save the model checkpoints on its own 
* save_period = 10, this is the number of epochs after which model will save the checkpoints
* resume = True, if kernel crashes, model will continue training from the previous checkpoint
* iou = keep iou between 0.5-0.7 and check for best metrics (hyperparameter to play with)
* optimzer  = AdamW is the best for our model from what I have noticed. you can experiment if you want
* learning rate, momentum, and weight decay are few more hyperparamerts you can play with

In [None]:
model = YOLO('yolov8s.yaml')
results = model.train(data = path, epochs = 200, imgsz = 1024, batch = 10, name = 'checkpoint', device = [0,1], patience = 25, save = True, save_period = 10, exist_ok = True, resume = True, iou = 0.5, optimizer = 'AdamW')

Implemented Tensorboard. Code will redirect you to a link which should be opened in **Google Chrome** if nothing is getting displayed on safari.  You can choose not to run this code block if you want to not see the dashboard and save time.

In [None]:
#!kill 1869322
%load_ext tensorboard
%tensorboard --logdir runs/detect/checkpoint

Mention test_dataset.yaml file path and run

In [None]:
test_path = "test_dataset.yaml"

with open(test_path, 'r') as stream:
    data_loaded = yaml.safe_load(stream)

remove_cache_files(os.path.dirname(data_loaded['train']))
remove_cache_files(os.path.dirname(data_loaded['val']))

Predicting on test data. Please set the prediction_progress.json to 0 by opening everytime when you want to predict on the test data from the start or it will resume from where it stopped last

In [None]:
progress_file = 'prediction_progress.json'

def save_progress(current_index):
    with open(progress_file, 'w') as f:
        json.dump({'last_processed': current_index}, f)

def load_progress():
    if os.path.exists(progress_file):
        with open(progress_file, 'r') as f:
            return json.load(f)['last_processed']
    return 0

# Loading the best model
best_model = YOLO('runs/detect/checkpoint/weights/best.pt')

test_img_path = data_loaded['val']

start_index = load_progress()

chunk_size = 48  # Adjust based on your available memory
for i in range(start_index, len(test_img_path), chunk_size):
    chunk_end = min(i + chunk_size, len(test_img_path))
    current_chunk = test_img_path[i:chunk_end]
    
    try:
        results = best_model.predict(source = current_chunk, save = True, save_txt = True, conf = 0.10, batch = chunk_size//4, stream = True)
        
        # code for post - processing results for later
        for r in results:
            pass  
            
        save_progress(chunk_end)
        
    except Exception as e:
        save_progress(i)
        print(f"Prediction stopped at image {i}. Progress saved.")
        raise e


In [6]:
# best_model = YOLO('runs/detect/checkpoint/weights/best.pt')
# test_img_path = data_loaded['val']
# results = best_model.predict(source = test_img_path, save = True,  save_txt = True,  conf = 0.10, batch = 16)

Run this cell as it is and take screenshot of the output. This code is for calculating the metrics for test data

u can play with conf keep between 0.05-0.2 (for our use case). lower value means it predicts more but with less accuracy. 

In [None]:
metrics = best_model.val(data = test_path, conf = 0.10)

In [None]:
def get_prediction_dirs():
    base_dir = "runs/detect"
    
    predict_dirs = [d for d in os.listdir(base_dir) if d.startswith('predict')]
    predict_dirs.sort(key=lambda x: int(x.replace('predict', '')) if x != 'predict' else 0)
    
    return f"{base_dir}/{predict_dirs[-1]}/labels"

predictions_dir = get_prediction_dirs()
predictions_dir

In [None]:
test_images_dir = data_loaded['val']
test_labels_dir = os.path.join(os.path.dirname(data_loaded['val']), 'labels')

# Create output directory if it doesn't exist
os.makedirs(output_labels_dir, exist_ok=True)
os.makedirs(output_pred_dir, exist_ok=True)

# Function to read YOLO format labels
def read_yolo_labels(label_file):
    labels = []
    with open(label_file, "r") as f:
        for line in f:
            parts = line.strip().split()
            class_id, x_center, y_center, width, height = map(float, parts[:5])
            confidence = float(parts[5]) if len(parts) > 5 else None
            labels.append((int(class_id), x_center, y_center, width, height, confidence))
    return labels

# Function to draw bounding boxes on images
def draw_boxes(image, boxes):
    h, w, _ = image.shape
    for box in boxes:
        class_id, x_center, y_center, width, height, confidence = box
        x_min = int((x_center - width / 2) * w)
        y_min = int((y_center - height / 2) * h)
        x_max = int((x_center + width / 2) * w)
        y_max = int((y_center + height / 2) * h)
        # Draw rectangle
        cv2.rectangle(image, (x_min, y_min), (x_max, y_max), color_dict[class_id], 2)
    return image

# Loop through each image
for image_file in os.listdir(test_images_dir):
    if image_file.endswith((".jpg", ".png", ".jpeg")):
        base_name = os.path.splitext(image_file)[0]
        image_path = os.path.join(test_images_dir, image_file)

        # Load the image
        image = cv2.imread(image_path)
        if image is None:
            print(f"Error loading {image_file}")
            continue

        # Read ground truth labels
        ground_truth_file = os.path.join(test_labels_dir, f"{base_name}.txt")
        ground_truth_boxes = read_yolo_labels(ground_truth_file) if os.path.exists(ground_truth_file) else []

        # Read prediction labels
        prediction_file = os.path.join(predictions_dir, f"{base_name}.txt")
        prediction_boxes = read_yolo_labels(prediction_file) if os.path.exists(prediction_file) else []

        # Draw ground truth (green) and predictions (blue)
        image_with_boxes_gt = draw_boxes(image.copy(), ground_truth_boxes)
        image_with_boxes_pt = draw_boxes(image.copy(), prediction_boxes)

        # Save annotated image
        gt_output_path = os.path.join(output_labels_dir, image_file)
        cv2.imwrite(gt_output_path, image_with_boxes_gt)

        pt_output_path = os.path.join(output_pred_dir, image_file)
        cv2.imwrite(pt_output_path, image_with_boxes_pt)

print("Done saving the predictions")

Code to combine images in a single pdf. This code take upto 20mins to execute

In [None]:
%matplotlib inline

output_labels_file_names = sorted(os.listdir(output_labels_dir))
output_pred_file_names = sorted(os.listdir(output_pred_dir))
c = 0
pdf_file_path = "combined_images.pdf"

# model = YOLO('/home/rshah133/bcd/v8n_1024/runs/detect/checkpoint/weights/best.pt')
model = model.cpu()
target_layers = [model.model.model[-4]]
cam = EigenCAM(model, target_layers, task='od')


with PdfPages(pdf_file_path) as pdf:
    for label_file, pred_file in zip(output_labels_file_names, output_pred_file_names):
        label_img = cv2.imread(os.path.join(output_labels_dir, label_file))
        pred_img = cv2.imread(os.path.join(output_pred_dir, pred_file))
        
        img = cv2.resize(pred_img, (640, 640))
        rgb_img = img.copy()
        img = np.float32(img) / 255
        
        grayscale_cam = cam(rgb_img)[0, :, :]
        
        grayscale_cam_resized = cv2.resize(grayscale_cam, (img.shape[1], img.shape[0]))
        cam_image = show_cam_on_image(img, grayscale_cam_resized, use_rgb=True)
        
        fig, axs = plt.subplots(1, 3, figsize=(16, 6))
        axs[0].imshow(cv2.cvtColor(label_img, cv2.COLOR_BGR2RGB))
        axs[0].set_title('Ground Truth')
        axs[0].axis('off')
        axs[1].imshow(cv2.cvtColor(pred_img, cv2.COLOR_BGR2RGB))
        axs[1].set_title('Prediction')
        axs[1].axis('off')
        axs[2].imshow(cv2.cvtColor(cam_image, cv2.COLOR_BGR2RGB))
        axs[2].set_title('EigenCAM Explainable AI')
        axs[2].axis('off')
        
        c+=1
        pdf.savefig(fig)
        plt.close(fig)
    
print(f"{c} prediction images saved in {pdf_file_path}")

In [None]:
# c = 0

# output_labels_file_names = sorted(os.listdir(output_labels_dir))
# output_pred_file_names = sorted(os.listdir(output_pred_dir))

# pdf_file_path = "combined_images.pdf"
# with PdfPages(pdf_file_path) as pdf:
#     for label_file, pred_file in zip(output_labels_file_names[:5], output_pred_file_names[:5]):
#         label_img = cv2.imread(os.path.join(output_labels_dir, label_file))
#         pred_img = cv2.imread(os.path.join(output_pred_dir, pred_file))

#         fig, axs = plt.subplots(1, 2, figsize=(16, 6))
#         fig.suptitle(f"Image: {label_file}", fontsize=12)
#         axs[0].imshow(cv2.cvtColor(label_img, cv2.COLOR_BGR2RGB))
#         axs[0].set_title('Ground Truth')
#         axs[0].axis('off')
#         axs[1].imshow(cv2.cvtColor(pred_img, cv2.COLOR_BGR2RGB))
#         axs[1].set_title('Prediction')
#         axs[1].axis('off')
#         plt.tight_layout()
#         c += 1
        
#         pdf.savefig(fig)
#         plt.close(fig)

# print(f"{c} prediction images saved in {pdf_file_path}")