1- loading data set using PyTorch

In [None]:
import os
import json
import matplotlib.pyplot as plt
import cv2
import numpy as np
from torch.utils.data import Dataset, DataLoader

In [None]:
class ImageLabelDataset(Dataset):
    """Index image files together with their JSON label files.

    For every entry of ``data_types`` the images are read from
    ``<root_dir>/<data_type>/images`` and the labels from
    ``<root_dir>/<data_type>/labels/json``. An image is kept only when a
    ``<stem>.json`` file with the same stem exists. Items are
    ``(image_path, label_path)`` string pairs; nothing is decoded here.
    """

    def __init__(self, root_dir, data_types):
        """Scan the folders of each data type and record matching pairs.

        Args:
            root_dir: Directory containing one sub-folder per data type.
            data_types: Iterable of sub-folder names (e.g. ``["Training"]``).
        """
        self.data = {}
        for data_type in data_types:
            data_path = os.path.join(root_dir, data_type)
            image_folder_path = os.path.join(data_path, "images")
            label_folder_path = os.path.join(data_path, "labels", "json")
            images = []
            labels = []
            # Sorted so that indices are stable across runs and platforms.
            for image_file in sorted(os.listdir(image_folder_path)):
                # splitext keeps dots inside the stem ("a.v2.jpg" -> "a.v2").
                stem = os.path.splitext(image_file)[0]
                label_path = os.path.join(label_folder_path, stem + '.json')
                if os.path.exists(label_path):
                    images.append(os.path.join(image_folder_path, image_file))
                    labels.append(label_path)
            self.data[data_type] = {"images": images, "labels": labels}

    def __len__(self):
        """Return the number of image/label pairs over all data types."""
        return sum(len(split["images"]) for split in self.data.values())

    def __getitem__(self, idx):
        """Return the ``(image_path, label_path)`` pair at position ``idx``.

        Data types are concatenated in insertion order. Negative indices
        count from the end of the whole dataset.

        Raises:
            IndexError: If ``idx`` is outside the dataset.
        """
        total = len(self)
        if idx < 0:
            idx += total
        if not 0 <= idx < total:
            raise IndexError("Index out of range.")
        for split in self.data.values():
            count = len(split["images"])
            if idx < count:
                return split["images"][idx], split["labels"][idx]
            idx -= count
        raise IndexError("Index out of range.")


2- visualizing some of the labeled images

In [None]:
def visualize_images(data_loader, num_batches):
    """Display the first ``num_batches`` batches with their labelled boxes.

    Args:
        data_loader: Iterable yielding ``(image_paths, label_paths)`` batches.
        num_batches: Number of batches to display before stopping.

    Each label file is read as a JSON list of dicts with the keys ``Left``,
    ``Top``, ``Right``, ``Bottom`` and ``ObjectClassName``. Images that
    OpenCV cannot read are reported and skipped.
    """
    # Colours are applied after the BGR->RGB conversion, so they are RGB.
    colors = {"bin": (255, 0, 0), "dolly": (0, 255, 0), "jack": (0, 0, 255)}

    for batch_idx, (image_paths, label_paths) in enumerate(data_loader):
        if batch_idx >= num_batches:
            break

        for image_path, label_path in zip(image_paths, label_paths):
            image_cv2 = cv2.imread(image_path)
            if image_cv2 is None:
                # cv2.imread signals failure by returning None.
                print(f"Warning: unable to read image: {image_path}")
                continue
            image_cv2 = cv2.cvtColor(image_cv2, cv2.COLOR_BGR2RGB)

            with open(label_path, 'r') as f:
                annotations = json.load(f)

            for bbox in annotations:
                left = bbox["Left"]
                top = bbox["Top"]
                right = bbox["Right"]
                bottom = bbox["Bottom"]
                class_name = bbox["ObjectClassName"]
                # Unknown classes are drawn in black.
                color = colors.get(class_name, (0, 0, 0))

                cv2.rectangle(image_cv2, (left, top), (right, bottom), color, 2)
                cv2.putText(image_cv2, class_name, (left, top - 5), cv2.FONT_HERSHEY_SIMPLEX, 1.2, color, 4)

            # Show once, after all boxes have been drawn onto the image.
            plt.figure(figsize=(8, 6))
            plt.imshow(image_cv2)
            plt.title(image_path)
            plt.show()
           


# Build one path-pair dataset and loader per split, then preview a batch of each.
root_dir = "data"
batch_size = 2
num_batches_to_visualize = 1

dataset_train = ImageLabelDataset(root_dir, ["Training"])
dataset_test = ImageLabelDataset(root_dir, ["Testing"])

train_loader = DataLoader(dataset=dataset_train, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(dataset=dataset_test, batch_size=batch_size, shuffle=False)

for banner, split_loader in (
    ("Visualizing training images:", train_loader),
    ("Visualizing testing images:", test_loader),
):
    print(banner)
    visualize_images(split_loader, num_batches_to_visualize)

3- adding augmentations

In [None]:
import albumentations as A

def augment_and_save_images(data_loader, output_dir, num_batches=1):
    """Write one randomly augmented copy of each image in the first batches.

    Args:
        data_loader: Iterable yielding ``(image_paths, label_paths)`` batches.
        output_dir: Directory receiving the augmented ``<stem>_a.jpg`` images.
        num_batches: Number of batches to process before stopping.

    The augmented labels are written as ``<stem>_a.json`` next to the
    original label file. Unreadable images, missing or unparsable label
    files are skipped; label entries lacking a required key are dropped.
    """
    # Built once: the pipeline is the same for every batch.
    augmentation = A.Compose([
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomRotate90(p=0.5),
        A.RandomBrightnessContrast(p=0.5),
        A.RGBShift(r_shift_limit=25, g_shift_limit=25, b_shift_limit=25, p=0.5),
        A.RandomGamma(gamma_limit=(80, 120), p=0.5),
        A.Blur(blur_limit=(3, 7), p=0.5),
    ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['class_labels', 'class_ids']))

    def _plain(value):
        # Unwrap NumPy scalars (if the library returns them) for json.dump.
        return value.item() if hasattr(value, "item") else value

    for batch_idx, (image_paths, label_paths) in enumerate(data_loader):
        if batch_idx >= num_batches:
            break

        for image_path, label_path in zip(image_paths, label_paths):
            image = cv2.imread(image_path)
            if image is None:
                # cv2.imread signals failure by returning None.
                print(f"Error: Unable to read image: {image_path}")
                continue

            filename = os.path.splitext(os.path.basename(image_path))[0]
            output_img_path = os.path.join(output_dir, f"{filename}_a.jpg")
            output_label_path = os.path.join(os.path.dirname(label_path), f"{filename}_a.json")

            if not os.path.exists(label_path):
                continue
            with open(label_path, 'r') as f:
                try:
                    original_labels = json.load(f)
                except json.JSONDecodeError:
                    print(f"Error: Unable to parse JSON file: {label_path}")
                    continue

            bboxes = []
            class_labels = []
            class_ids = []
            for bbox in original_labels:
                try:
                    class_name = bbox["ObjectClassName"]
                    class_id = bbox["ObjectClassId"]
                    left = bbox["Left"]
                    top = bbox["Top"]
                    right = bbox["Right"]
                    bottom = bbox["Bottom"]
                except KeyError:
                    print(f"Error: Malformed label in file: {label_path}")
                    continue
                bboxes.append([left, top, right, bottom])
                class_labels.append(class_name)
                class_ids.append(class_id)

            augmented = augmentation(image=image, bboxes=bboxes, class_labels=class_labels, class_ids=class_ids)

            # Use the label fields returned by the pipeline: they stay aligned
            # with the returned boxes even if a box is dropped or duplicated,
            # which the pre-augmentation lists do not guarantee.
            augmented_labels = [
                {
                    "Left": int(box[0]),
                    "Top": int(box[1]),
                    "Right": int(box[2]),
                    "Bottom": int(box[3]),
                    "ObjectClassName": _plain(name),
                    "ObjectClassId": _plain(class_id),
                }
                for box, name, class_id in zip(
                    augmented['bboxes'], augmented['class_labels'], augmented['class_ids']
                )
            ]

            cv2.imwrite(output_img_path, augmented['image'])
            with open(output_label_path, 'w') as f_out:
                json.dump(augmented_labels, f_out)

# Augment the first batch of training images; outputs land in the training image folder.
root_dir = "data"
output_dir = "data/Training/images"
batch_size = 3
num_batches_to_augment = 1

dataset_train = ImageLabelDataset(root_dir, ["Training"])
train_loader = DataLoader(dataset=dataset_train, batch_size=batch_size, shuffle=False)

print("Augmenting and saving training images:")
augment_and_save_images(
    data_loader=train_loader,
    output_dir=output_dir,
    num_batches=num_batches_to_augment,
)


converting to yolo format

In [None]:
import os
import json

def convert_to_yolo(bbox):
    """Format one label dict as ``"<class id> <x_center> <y_center> <width> <height>"``.

    ``bbox`` must provide ``Left``, ``Right``, ``Top``, ``Bottom`` and
    ``ObjectClassId``. The four geometry values keep the units of the input
    (no division by the image size happens here) and are printed with six
    decimals.
    """
    left, right = bbox['Left'], bbox['Right']
    top, bottom = bbox['Top'], bbox['Bottom']
    geometry = (
        (left + right) / 2,   # x centre
        (top + bottom) / 2,   # y centre
        right - left,         # width
        bottom - top,         # height
    )
    return f"{bbox['ObjectClassId']} " + " ".join(f"{value:.6f}" for value in geometry)

# Convert every JSON label file into a text file with one box per line.
input_folder = "data/Training/labels/json"

output_folder = "data/Training/labels/yolo"
os.makedirs(output_folder, exist_ok=True)

for json_file in os.listdir(input_folder):
    if not json_file.endswith('.json'):
        continue
    with open(os.path.join(input_folder, json_file), 'r') as f:
        data = json.load(f)
    yolo_data = [convert_to_yolo(bbox) for bbox in data]
    output_filename = os.path.splitext(json_file)[0] + '.txt'
    with open(os.path.join(output_folder, output_filename), 'w') as f:
        f.write('\n'.join(yolo_data))

print("Conversion completed.")


Normalizing


In [None]:
import os
import cv2

def normalize_yolo_labels(image_dir, label_dir):
    """Divide the box values of each label file by its image size, in place.

    Args:
        image_dir: Folder scanned for ``.jpg`` images.
        label_dir: Folder holding ``<image stem>.txt`` label files whose
            lines read ``class x_center y_center width height``.

    For every ``.jpg`` with a matching label file, x/width are divided by
    the image width and y/height by the image height, clamped to [0, 1],
    and the file is rewritten. Images that cannot be read and blank label
    lines are skipped.

    NOTE: the division is applied on every call, so running this twice on
    the same folder shrinks the values a second time.
    """
    def _clamp01(value):
        return max(min(value, 1.0), 0.0)

    for filename in os.listdir(image_dir):
        if not filename.endswith(".jpg"):
            continue
        image_path = os.path.join(image_dir, filename)
        label_path = os.path.join(label_dir, os.path.splitext(filename)[0] + ".txt")
        if not os.path.exists(label_path):
            continue

        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None; leave labels untouched.
            print(f"Warning: unable to read image, skipping: {image_path}")
            continue
        image_height, image_width = image.shape[:2]

        with open(label_path, "r") as file:
            lines = file.readlines()

        normalized_lines = []
        for line in lines:
            fields = line.split()
            if not fields:
                # Blank lines carry no box and would break the unpacking below.
                continue
            class_id, x_center, y_center, box_width, box_height = map(float, fields)

            x_center_norm = _clamp01(x_center / image_width)
            y_center_norm = _clamp01(y_center / image_height)
            box_width_norm = _clamp01(box_width / image_width)
            box_height_norm = _clamp01(box_height / image_height)

            normalized_lines.append(
                f"{int(class_id)} {x_center_norm:.6f} {y_center_norm:.6f} {box_width_norm:.6f} {box_height_norm:.6f}\n"
            )

        with open(label_path, "w") as file:
            file.writelines(normalized_lines)

# Example usage
# Rewrites the label files in place. normalize_yolo_labels divides by the
# image size on every call, so run this cell only once per conversion.
image_directory = "data/Training/images"
label_directory = "data/Training/labels/yolo"
normalize_yolo_labels(image_directory, label_directory)


4- splitting the dataset

In [None]:
import os
import shutil
import random

def create_directories(base_folder, subfolders):
    """Create each of ``subfolders`` beneath ``base_folder``.

    Missing parent directories are created as well; directories that
    already exist are left as they are.
    """
    targets = [os.path.join(base_folder, name) for name in subfolders]
    for target in targets:
        os.makedirs(target, exist_ok=True)

def copy_files_to_folder(file_list, dest_folder):
    """Copy every path in ``file_list`` into ``dest_folder`` using ``shutil.copy``."""
    for source_path in file_list:
        shutil.copy(src=source_path, dst=dest_folder)

def split_dataset(image_folder, label_folder, split_ratio=0.2, seed=None):
    """Randomly split the files of ``image_folder`` into train and validation.

    Args:
        image_folder: Folder whose entries are treated as images.
        label_folder: Folder used to build the ``<stem>.txt`` label path of
            each image.
        split_ratio: Fraction of the files (rounded down) assigned to
            validation; must lie in [0, 1].
        seed: Optional seed. When given, the split is reproducible and the
            global ``random`` state is left untouched; when ``None`` the
            global generator is used.

    Returns:
        ``(train_dataset, val_dataset)``, each a dict with the parallel lists
        ``"images"`` and ``"labels"``.

    Raises:
        ValueError: If ``split_ratio`` is outside [0, 1].

    NOTE: label paths are derived from the image names and are not checked
    for existence here.
    """
    if not 0.0 <= split_ratio <= 1.0:
        raise ValueError(f"split_ratio must be within [0, 1], got {split_ratio}")

    # Sort first so that a given seed always yields the same split,
    # regardless of the order os.listdir happens to return.
    image_files = sorted(os.listdir(image_folder))
    if seed is None:
        random.shuffle(image_files)
    else:
        random.Random(seed).shuffle(image_files)

    num_val_images = int(split_ratio * len(image_files))

    def _as_dataset(names):
        return {
            "images": [os.path.join(image_folder, name) for name in names],
            "labels": [
                os.path.join(label_folder, os.path.splitext(name)[0] + '.txt')
                for name in names
            ],
        }

    val_dataset = _as_dataset(image_files[:num_val_images])
    train_dataset = _as_dataset(image_files[num_val_images:])
    return train_dataset, val_dataset

# Source folders holding the images and their text labels.
image_folder = "data/Training/images"
label_folder = "data/Training/labels/yolo"

# 80% of the files go to training, 20% to validation.
train_dataset, val_dataset = split_dataset(image_folder, label_folder, split_ratio=0.2)

# Destination folders, each with an "images" and a "labels" sub-folder.
train_folder = "data/Training/train"
val_folder = "data/Training/validation"
for split_folder in (train_folder, val_folder):
    create_directories(split_folder, ["images", "labels"])

# Copy the files of each split: training first, then validation.
for split_folder, split_files in ((train_folder, train_dataset), (val_folder, val_dataset)):
    for kind in ("images", "labels"):
        copy_files_to_folder(split_files[kind], os.path.join(split_folder, kind))

# Report the size of each split.
print("Number of images in training set:", len(train_dataset["images"]))
print("Number of images in validation set:", len(val_dataset["images"]))


Updating the class IDs for YOLOv7

In [None]:
def update_class_ids(label_dir, id_map=None):
    """Remap the class id at the start of every line of each ``.txt`` file.

    Args:
        label_dir: Folder whose ``.txt`` files are rewritten in place.
        id_map: Mapping ``old id -> new id``. Defaults to
            ``{4: 0, 5: 1, 7: 2}``. Ids absent from the mapping are kept.

    The class id is always written as an integer and the remaining fields
    of a line are copied unchanged. Blank lines are dropped.
    """
    if id_map is None:
        id_map = {4: 0, 5: 1, 7: 2}

    for filename in os.listdir(label_dir):
        if not filename.endswith(".txt"):
            continue
        label_path = os.path.join(label_dir, filename)
        with open(label_path, "r") as file:
            lines = file.readlines()

        updated_lines = []
        for line in lines:
            fields = line.split()
            if not fields:
                continue
            # int(float(...)) also accepts ids stored as "4.0".
            class_id = int(float(fields[0]))
            updated_class_id = id_map.get(class_id, class_id)
            updated_lines.append(" ".join([str(updated_class_id), *fields[1:]]) + "\n")

        with open(label_path, "w") as file:
            file.writelines(updated_lines)

# Example usage: remap the class ids in every label folder, in this order.
for label_directory in (
    "data/yolov7-custom/data/train/labels",
    "data/yolov7-custom/data/validation/labels",
    "data/Training/train/labels",
    "data/Training/validation/labels",
    "data/Training/labels/yolo",
):
    update_class_ids(label_directory)

In [None]:
# Shell command: run train.py on the CPU for 20 epochs with batch size 8 and
# 640x640 images, using the custom data/hyper-parameter/model config files and
# yolov7.pt as the initial weights. Presumably run from the YOLOv7 repository
# root, since all paths are relative — confirm.
!python train.py --workers 1 --device cpu --batch-size 8 --epochs 20 --img 640 640 --data data/custom_data.yaml --hyp data/hyp.scratch.custom.yaml --cfg cfg/training/yolov7-custom.yaml  --weights yolov7.pt


exporting yolov7 to onnx

In [None]:
# Shell command: run the export script on data/best.pt with the flags below.
# Presumably this writes data/best.onnx, the file opened in the next cell — confirm.
!python data/yolov7-custom/export.py --weights data/best.pt --img-size 640 --batch-size 1 --dynamic --include-nms


In [28]:
# Shell command: open data/best.onnx with the netron command-line tool.
!netron  data/best.onnx

My own model from scratch

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image, ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True
from albumentations.pytorch import ToTensorV2
import cv2
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from tqdm import tqdm
import torchvision.transforms as transforms

# IoU function
def iou(box1, box2, is_pred=True):
    """Compute the intersection over union of two sets of boxes.

    With ``is_pred=True`` the last dimension of both inputs is read as
    ``(x_center, y_center, width, height)`` and the result keeps a trailing
    dimension of size 1. With ``is_pred=False`` only ``(width, height)`` is
    read and the boxes are compared as if they shared one corner.
    """
    if not is_pred:
        w1, h1 = box1[..., 0], box1[..., 1]
        w2, h2 = box2[..., 0], box2[..., 1]
        overlap = torch.min(w1, w2) * torch.min(h1, h2)
        combined = w1 * h1 + w2 * h2 - overlap
        return overlap / combined

    def corners(box):
        # Centre/size -> (x1, y1, x2, y2); slices keep the last dimension.
        center_x, center_y = box[..., 0:1], box[..., 1:2]
        half_w, half_h = box[..., 2:3] / 2, box[..., 3:4] / 2
        return center_x - half_w, center_y - half_h, center_x + half_w, center_y + half_h

    b1_x1, b1_y1, b1_x2, b1_y2 = corners(box1)
    b2_x1, b2_y1, b2_x2, b2_y2 = corners(box2)

    # Overlap extents are clamped at zero for boxes that do not intersect.
    overlap_w = (torch.min(b1_x2, b2_x2) - torch.max(b1_x1, b2_x1)).clamp(0)
    overlap_h = (torch.min(b1_y2, b2_y2) - torch.max(b1_y1, b2_y1)).clamp(0)
    intersection = overlap_w * overlap_h

    area1 = abs((b1_x2 - b1_x1) * (b1_y2 - b1_y1))
    area2 = abs((b2_x2 - b2_x1) * (b2_y2 - b2_y1))
    union = area1 + area2 - intersection

    # The small constant avoids a division by zero for empty boxes.
    return intersection / (union + 1e-6)

# non-maximum suppression function to remove overlapping bounding boxes
def nms(bboxes, iou_threshold, threshold):
    """Keep the best-scoring boxes and drop same-class boxes overlapping them.

    Args:
        bboxes: List of ``[class, score, x, y, width, height]`` lists.
        iou_threshold: A box is suppressed when it has the same class as a
            kept box and their IoU is at least this value.
        threshold: Boxes whose score is not above this value are discarded
            before suppression.

    Returns:
        The kept boxes, ordered by descending score.
    """
    candidates = sorted(
        (box for box in bboxes if box[1] > threshold),
        key=lambda box: box[1],
        reverse=True,
    )
    kept = []

    while candidates:
        # The highest-scoring remaining box always survives ...
        best = candidates.pop(0)
        kept.append(best)
        # ... and removes every same-class box that overlaps it too much.
        candidates = [
            box
            for box in candidates
            if box[0] != best[0]
            or iou(torch.tensor(best[2:]), torch.tensor(box[2:])) < iou_threshold
        ]
    return kept

# to convert cells to bounding boxes
def convert_cells_to_bboxes(predictions, anchors, s, is_predictions=True):
    """Convert per-cell values into boxes relative to the whole image.

    Args:
        predictions: Tensor indexed as ``(batch, anchor, row, column, values)``.
            Value 0 is the objectness, values 1-4 the box. When
            ``is_predictions`` is true the remaining values are class
            scores, otherwise value 5 is the class index.
        anchors: Anchor sizes of this scale, one ``(width, height)`` pair
            per anchor. Only used when ``is_predictions`` is true.
        s: Grid size of this scale.
        is_predictions: Whether the values are raw network outputs that
            still need sigmoid/exp decoding.

    Returns:
        A nested list with one entry per batch element, each holding
        ``num_anchors * s * s`` boxes ``[class, score, x, y, width, height]``.

    The input tensor is not modified.
    """
    batch_size = predictions.shape[0]
    num_anchors = len(anchors)
    # Clone: the slice is a view, and decoding in place would otherwise
    # overwrite the caller's tensor.
    box_predictions = predictions[..., 1:5].clone()

    if is_predictions:
        anchors = anchors.reshape(1, num_anchors, 1, 1, 2)
        box_predictions[..., 0:2] = torch.sigmoid(box_predictions[..., 0:2])
        box_predictions[..., 2:] = torch.exp(box_predictions[..., 2:]) * anchors
        scores = torch.sigmoid(predictions[..., 0:1])
        best_class = torch.argmax(predictions[..., 5:], dim=-1).unsqueeze(-1)
    else:
        scores = predictions[..., 0:1]
        best_class = predictions[..., 5:6]

    # Column index of every cell; permuting the two grid axes yields the row index.
    cell_indices = (
        torch.arange(s)
        .repeat(batch_size, num_anchors, s, 1)
        .unsqueeze(-1)
        .to(predictions.device)
    )

    x = 1 / s * (box_predictions[..., 0:1] + cell_indices)
    y = 1 / s * (box_predictions[..., 1:2] + cell_indices.permute(0, 1, 3, 2, 4))
    width_height = 1 / s * box_predictions[..., 2:4]

    converted_bboxes = torch.cat(
        (best_class, scores, x, y, width_height), dim=-1
    ).reshape(batch_size, num_anchors * s * s, 6)

    return converted_bboxes.tolist()

def plot_image(image, boxes):
    """Show ``image`` with one labelled rectangle per entry of ``boxes``.

    Each box is read as ``[class, score, x_center, y_center, width, height]``
    with the geometry given as fractions of the image size. Class names come
    from the module-level ``class_labels`` list.
    """
    colour_map = plt.get_cmap("tab20b")
    palette = [colour_map(position) for position in np.linspace(0, 1, len(class_labels))]

    img = np.array(image)
    img_height, img_width, _ = img.shape

    fig, ax = plt.subplots(1)
    ax.imshow(img)

    for box in boxes:
        class_idx = int(box[0])
        center_x, center_y, box_w, box_h = box[2], box[3], box[4], box[5]

        # Top-left corner in pixels, derived from the centre-based box.
        left_px = (center_x - box_w / 2) * img_width
        top_px = (center_y - box_h / 2) * img_height

        ax.add_patch(
            patches.Rectangle(
                (left_px, top_px),
                box_w * img_width,
                box_h * img_height,
                linewidth=2,
                edgecolor=palette[class_idx],
                facecolor="none",
            )
        )
        plt.text(
            left_px,
            top_px,
            s=class_labels[class_idx],
            color="white",
            verticalalignment="top",
            bbox={"color": palette[class_idx], "pad": 0},
        )
    plt.show()

def save_checkpoint(model, optimizer, filename="my_checkpoint.pth.tar"):
    """Save the model and optimizer state dicts to ``filename``.

    The file holds a dict with the keys ``"state_dict"`` and ``"optimizer"``.
    """
    print("==> Saving checkpoint")
    model_state = model.state_dict()
    optimizer_state = optimizer.state_dict()
    torch.save(dict(state_dict=model_state, optimizer=optimizer_state), filename)

def load_checkpoint(checkpoint_file, model, optimizer, lr):
    """Restore ``model`` and ``optimizer`` from a file written by ``save_checkpoint``.

    Tensors are mapped onto the module-level ``device``. After loading,
    every parameter group of the optimizer gets ``lr`` as its learning
    rate, replacing the value stored in the checkpoint.
    """
    print("==> Loading checkpoint")
    state = torch.load(checkpoint_file, map_location=device)
    model.load_state_dict(state["state_dict"])
    optimizer.load_state_dict(state["optimizer"])

    for group in optimizer.param_groups:
        group.update(lr=lr)

# Run on the GPU when one is available.
device = "cuda" if torch.cuda.is_available() else "cpu"
load_model = False
save_model = True
checkpoint_file = "checkpoint.pth.tar"

# Three (width, height) anchors per prediction scale, one row per scale,
# in the same order as the grid sizes in `s`.
ANCHORS = [
    [(0.28, 0.22), (0.38, 0.48), (0.9, 0.78)],
    [(0.07, 0.15), (0.15, 0.11), (0.14, 0.29)],
    [(0.02, 0.03), (0.04, 0.07), (0.08, 0.06)],
]
batch_size = 32
learning_rate = 1e-5
# Misspelled alias kept on purpose: other cells still refer to this name.
leanring_rate = learning_rate
epochs = 20
image_size = 416
# Grid size of each prediction scale (13, 26 and 52 for 416-pixel images).
s = [image_size // 32, image_size // 16, image_size // 8]

class_labels = [
    "bin", "dolly", "jack"
]

class Dataset(torch.utils.data.Dataset):
    """Detection dataset producing an image and one target grid per scale.

    Every image of ``image_dir`` is paired with ``<stem>.txt`` in
    ``label_dir``. Label rows are read as five space-separated numbers with
    the class first; presumably ``class x_center y_center width height``
    as fractions of the image size — confirm against the label writer.
    """

    def __init__(
        self, image_dir, label_dir, anchors,
        image_size=416, grid_sizes=None,
        num_classes=3, transform=None
    ):
        """Store the configuration and index the image/label pairs.

        Args:
            image_dir: Folder with the images.
            label_dir: Folder with the ``.txt`` label files.
            anchors: Three lists of ``(width, height)`` anchors, one per scale.
            image_size: Stored on the instance; not used by this class.
            grid_sizes: Grid size per scale; defaults to ``[13, 26, 52]``.
            num_classes: Stored on the instance; not used by this class.
            transform: Optional callable ``(image, bboxes) -> (image, bboxes)``.
        """
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.image_size = image_size
        self.transform = transform
        # A None default avoids sharing one mutable list between instances.
        self.grid_sizes = [13, 26, 52] if grid_sizes is None else list(grid_sizes)
        self.anchors = torch.tensor(
            anchors[0] + anchors[1] + anchors[2])
        self.num_anchors = self.anchors.shape[0]
        self.num_anchors_per_scale = self.num_anchors // 3
        self.num_classes = num_classes
        self.ignore_iou_thresh = 0.5
        self.data = self.load_data()

    def load_data(self):
        """Return ``{'train': {'images': [...], 'labels': [...]}}``.

        Both lists are built together so position ``i`` always refers to the
        same sample. Files rejected by ``is_valid_label_file`` and images
        without a label file are left out.
        """
        images = []
        labels = []
        for image_file in sorted(os.listdir(self.image_dir)):
            label_file = os.path.splitext(image_file)[0] + '.txt'
            if not self.is_valid_label_file(label_file):
                continue
            label_path = os.path.join(self.label_dir, label_file)
            if not os.path.isfile(label_path):
                continue
            images.append(os.path.join(self.image_dir, image_file))
            labels.append(label_path)
        return {'train': {'images': images, 'labels': labels}}

    def is_valid_label_file(self, file_name):
        """Reject names containing parentheses (e.g. ``"img (1).txt"``)."""
        return '(' not in file_name and ')' not in file_name

    def __len__(self):
        """Return the number of indexed samples."""
        return sum(len(split["images"]) for split in self.data.values())

    def __getitem__(self, idx):
        """Return ``(image, targets)`` for the sample at ``idx``.

        ``targets`` is a tuple with one tensor per scale, indexed as
        ``(anchor, row, column, values)`` where the six values are
        ``[objectness, x_cell, y_cell, width_cell, height_cell, class]``.
        Objectness is 1 for the anchor assigned to a box, -1 for further
        anchors whose shape IoU exceeds ``ignore_iou_thresh`` and 0 elsewhere.

        Raises:
            IndexError: If ``idx`` is outside the dataset.
        """
        if idx < 0 or idx >= len(self.data['train']['labels']):
            raise IndexError(f"Index {idx} is out of range.")

        label_path = self.data['train']['labels'][idx]
        # Rolling by 4 moves the leading class to the end: [x, y, w, h, class].
        bboxes = np.roll(np.loadtxt(fname=label_path, delimiter=" ", ndmin=2), 4, axis=1).tolist()
        img_path = self.data['train']['images'][idx]
        image = np.array(Image.open(img_path).convert("RGB"))

        if self.transform:
            image, bboxes = self.transform(image, bboxes)

        targets = [
            torch.zeros((self.num_anchors_per_scale, size, size, 6))
            for size in self.grid_sizes
        ]

        for box in bboxes:
            # Rank all anchors by how well their shape matches the box.
            iou_anchors = iou(torch.tensor(box[2:4]), self.anchors, is_pred=False)
            anchor_indices = iou_anchors.argsort(descending=True, dim=0)
            # Unpack in the post-roll order (class is last, not first).
            x, y, width, height, class_label = box

            has_anchor = [False] * len(self.grid_sizes)
            for anchor_idx in anchor_indices:
                anchor_idx = int(anchor_idx)
                scale_idx, anchor_on_scale = divmod(anchor_idx, self.num_anchors_per_scale)

                size = self.grid_sizes[scale_idx]
                # Cell containing the box centre, clamped to the grid.
                i, j = min(int(size * y), size - 1), min(int(size * x), size - 1)
                anchor_taken = targets[scale_idx][anchor_on_scale, i, j, 0]
                if not anchor_taken and not has_anchor[scale_idx]:
                    targets[scale_idx][anchor_on_scale, i, j, 0] = 1
                    x_cell, y_cell = size * x - j, size * y - i
                    width_cell, height_cell = (width * size, height * size)

                    box_coordinates = torch.tensor([x_cell, y_cell, width_cell, height_cell])
                    targets[scale_idx][anchor_on_scale, i, j, 1:5] = box_coordinates
                    targets[scale_idx][anchor_on_scale, i, j, 5] = int(class_label)
                    has_anchor[scale_idx] = True
                elif not anchor_taken and iou_anchors[anchor_idx] > self.ignore_iou_thresh:
                    # Good match, but the scale already has its anchor: ignore it.
                    targets[scale_idx][anchor_on_scale, i, j, 0] = -1
        return image, tuple(targets)

    def collate_fn(self, batch):
        """Collate a batch with the default collate function, skipping ``None`` items."""
        batch = [item for item in batch if item is not None]
        # Referenced through torch: the file never imports default_collate.
        return torch.utils.data.dataloader.default_collate(batch)

class CNNBlock(nn.Module):
    """Convolution followed, when enabled, by batch norm and LeakyReLU(0.1).

    With ``use_batch_norm=False`` the block returns the raw convolution
    output (no normalisation, no activation) and the convolution gets a
    bias. Extra keyword arguments are passed on to ``nn.Conv2d``.
    """

    def __init__(self, in_channels, out_channels, use_batch_norm=True, **kwargs):
        super().__init__()
        self.use_batch_norm = use_batch_norm
        # The bias is left out when batch norm follows the convolution.
        conv_has_bias = not use_batch_norm
        self.conv = nn.Conv2d(in_channels, out_channels, bias=conv_has_bias, **kwargs)
        self.bn = nn.BatchNorm2d(out_channels)
        self.activation = nn.LeakyReLU(0.1)

    def forward(self, x):
        features = self.conv(x)
        if not self.use_batch_norm:
            return features
        return self.activation(self.bn(features))

class ResidualBlock(nn.Module):
    """Stack of ``num_repeats`` bottleneck units with optional skip connections.

    Each unit is a 1x1 convolution halving the channels followed by a 3x3
    convolution restoring them, both with batch norm and LeakyReLU(0.1).
    When ``use_residual`` is true the unit input is added to its output.
    """

    def __init__(self, channels, use_residual=True, num_repeats=1):
        super().__init__()
        half_channels = channels // 2
        units = [
            nn.Sequential(
                nn.Conv2d(channels, half_channels, kernel_size=1),
                nn.BatchNorm2d(half_channels),
                nn.LeakyReLU(0.1),
                nn.Conv2d(half_channels, channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(channels),
                nn.LeakyReLU(0.1),
            )
            for _ in range(num_repeats)
        ]
        self.layers = nn.ModuleList(units)
        self.use_residual = use_residual
        self.num_repeats = num_repeats

    def forward(self, x):
        for unit in self.layers:
            transformed = unit(x)
            x = transformed + x if self.use_residual else transformed
        return x

class ScalePrediction(nn.Module):
    """Prediction head of one scale.

    Maps a ``(batch, in_channels, height, width)`` feature map to a tensor
    of shape ``(batch, 3, height, width, num_classes + 5)``: three anchors
    per cell, each with ``num_classes + 5`` values.
    """

    def __init__(self, in_channels, num_classes):
        super().__init__()
        hidden_channels = 2 * in_channels
        self.pred = nn.Sequential(
            nn.Conv2d(in_channels, hidden_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(hidden_channels),
            nn.LeakyReLU(0.1),
            nn.Conv2d(hidden_channels, (num_classes + 5) * 3, kernel_size=1),
        )
        self.num_classes = num_classes

    def forward(self, x):
        batch, height, width = x.size(0), x.size(2), x.size(3)
        raw = self.pred(x)
        # Split the channels into (anchor, values), then move values last.
        per_anchor = raw.view(batch, 3, self.num_classes + 5, height, width)
        return per_anchor.permute(0, 1, 3, 4, 2)

class my_model(nn.Module):
    """Three-scale detector built from CNNBlock, ResidualBlock and ScalePrediction.

    ``forward`` returns a list with one ScalePrediction output per scale, in
    the order the heads appear in ``self.layers``.
    """

    def __init__(self, in_channels=3, num_classes=3):
        super().__init__()
        self.num_classes = num_classes
        self.in_channels = in_channels

        # Downsampling stages; the two 8-repeat residual blocks feed the
        # concatenations performed after the upsampling layers below.
        backbone = [
            CNNBlock(in_channels, 32, kernel_size=3, stride=1, padding=1),
            CNNBlock(32, 64, kernel_size=3, stride=2, padding=1),
            ResidualBlock(64, num_repeats=1),
            CNNBlock(64, 128, kernel_size=3, stride=2, padding=1),
            ResidualBlock(128, num_repeats=2),
            CNNBlock(128, 256, kernel_size=3, stride=2, padding=1),
            ResidualBlock(256, num_repeats=8),
            CNNBlock(256, 512, kernel_size=3, stride=2, padding=1),
            ResidualBlock(512, num_repeats=8),
            CNNBlock(512, 1024, kernel_size=3, stride=2, padding=1),
            ResidualBlock(1024, num_repeats=4),
        ]
        # First head, on the most downsampled feature map.
        first_head = [
            CNNBlock(1024, 512, kernel_size=1, stride=1, padding=0),
            CNNBlock(512, 1024, kernel_size=3, stride=1, padding=1),
            ResidualBlock(1024, use_residual=False, num_repeats=1),
            CNNBlock(1024, 512, kernel_size=1, stride=1, padding=0),
            ScalePrediction(512, num_classes=num_classes),
        ]
        # Second head: upsample, concatenate with the 512-channel route (768 in).
        second_head = [
            CNNBlock(512, 256, kernel_size=1, stride=1, padding=0),
            nn.Upsample(scale_factor=2),
            CNNBlock(768, 256, kernel_size=1, stride=1, padding=0),
            CNNBlock(256, 512, kernel_size=3, stride=1, padding=1),
            ResidualBlock(512, use_residual=False, num_repeats=1),
            CNNBlock(512, 256, kernel_size=1, stride=1, padding=0),
            ScalePrediction(256, num_classes=num_classes),
        ]
        # Third head: upsample, concatenate with the 256-channel route (384 in).
        third_head = [
            CNNBlock(256, 128, kernel_size=1, stride=1, padding=0),
            nn.Upsample(scale_factor=2),
            CNNBlock(384, 128, kernel_size=1, stride=1, padding=0),
            CNNBlock(128, 256, kernel_size=3, stride=1, padding=1),
            ResidualBlock(256, use_residual=False, num_repeats=1),
            CNNBlock(256, 128, kernel_size=1, stride=1, padding=0),
            ScalePrediction(128, num_classes=num_classes),
        ]
        self.layers = nn.ModuleList(backbone + first_head + second_head + third_head)

    def forward(self, x):
        predictions = []
        skips = []

        for layer in self.layers:
            if isinstance(layer, ScalePrediction):
                # Heads branch off; x continues unchanged to the next layer.
                predictions.append(layer(x))
                continue

            x = layer(x)
            if isinstance(layer, nn.Upsample):
                # Merge with the most recently stored route feature map.
                x = torch.cat([x, skips.pop()], dim=1)
            elif isinstance(layer, ResidualBlock) and layer.num_repeats == 8:
                skips.append(x)
        return predictions

class loss_c(nn.Module):
    """Detection loss of one scale: box + objectness + no-object + class terms."""

    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()
        self.bce = nn.BCEWithLogitsLoss()
        self.cross_entropy = nn.CrossEntropyLoss()
        self.sigmoid = nn.Sigmoid()

    def forward(self, pred, target, anchors):
        """Return the summed loss for one scale.

        Args:
            pred: Raw network output, last dimension
                ``[objectness, x, y, w, h, class scores...]``.
            target: Target grid, last dimension
                ``[objectness, x, y, w, h, class]``.
            anchors: The three ``(width, height)`` anchors of this scale.

        Neither ``pred`` nor ``target`` is modified. When the batch has no
        object cell at this scale only the no-object term is returned,
        since the other terms would average over an empty selection.
        """
        obj = target[..., 0] == 1
        no_obj = target[..., 0] == 0

        no_object_loss = self.bce(
            pred[..., 0:1][no_obj], target[..., 0:1][no_obj],
        )
        if not obj.any():
            return no_object_loss

        anchors = anchors.reshape(1, 3, 1, 1, 2)
        xy_pred = self.sigmoid(pred[..., 1:3])

        # Decoded boxes are only used to compute the IoU that scales the
        # objectness target; detach keeps gradients out of that target.
        box_preds = torch.cat(
            [xy_pred, torch.exp(pred[..., 3:5]) * anchors], dim=-1)
        ious = iou(box_preds[obj], target[..., 1:5][obj]).detach()
        object_loss = self.mse(
            self.sigmoid(pred[..., 0:1][obj]),
            ious * target[..., 0:1][obj],
        )

        # Box regression: sigmoid on x/y, raw w/h compared with the log of
        # the target size relative to the anchor. Built out of place so the
        # caller's tensors keep their values.
        box_pred = torch.cat([xy_pred, pred[..., 3:5]], dim=-1)
        box_target = torch.cat(
            [target[..., 1:3], torch.log(1e-6 + target[..., 3:5] / anchors)],
            dim=-1,
        )
        box_loss = self.mse(box_pred[obj], box_target[obj])

        class_loss = self.cross_entropy(
            pred[..., 5:][obj], target[..., 5][obj].long())

        return (
            box_loss
            + object_loss
            + no_object_loss
            + class_loss
        )

def training_loop(loader, model, optimizer, loss_fn, scaler, scaled_anchors):
    """Train ``model`` for one pass over ``loader``.

    Args:
        loader: Iterable yielding ``(images, (y0, y1, y2))`` batches.
        model: Network returning one output per scale.
        optimizer: Optimizer stepped through ``scaler``.
        loss_fn: Callable ``(output, target, anchors) -> loss`` for one scale.
        scaler: Gradient scaler used for the mixed-precision backward pass.
        scaled_anchors: Anchors of each scale, indexed like the model outputs.

    Returns:
        The mean batch loss of the pass, or ``nan`` when ``loader`` is empty.
    """
    progress_bar = tqdm(loader, leave=True)
    losses = []

    for x, y in progress_bar:
        x = x.to(device)
        y0, y1, y2 = (
            y[0].to(device),
            y[1].to(device),
            y[2].to(device),
        )

        with torch.cuda.amp.autocast():
            outputs = model(x)
            loss = (
                loss_fn(outputs[0], y0, scaled_anchors[0])
                + loss_fn(outputs[1], y1, scaled_anchors[1])
                + loss_fn(outputs[2], y2, scaled_anchors[2])
            )

        losses.append(loss.item())
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # Show the running mean over the batches seen so far.
        progress_bar.set_postfix(loss=sum(losses) / len(losses))

    # The caller logs this value, so it has to be returned.
    return sum(losses) / len(losses) if losses else float("nan")
  
class CustomTransform:
    """Adapt an image-only transform to the ``(image, bboxes)`` call signature.

    The wrapped transform is applied to the image; the boxes are returned
    unchanged.
    """

    def __init__(self, transform):
        self.transform = transform

    def __call__(self, image, bboxes):
        return self.transform(image), bboxes

# Image-only preprocessing: resize to 416x416 and convert to a tensor.
transform = transforms.Compose(
    [transforms.ToPILImage(), transforms.Resize((416, 416)), transforms.ToTensor()]
)

model = my_model().to(device)
optimizer = optim.Adam(model.parameters(), lr=leanring_rate)
loss_fn = loss_c()
scaler = torch.cuda.amp.GradScaler()
custom_transform = CustomTransform(transform)

train_dataset = Dataset(
    image_dir="/content/drive/MyDrive/data/Training/images",
    label_dir="/content/drive/MyDrive/yolo",
    anchors=ANCHORS,
    transform=custom_transform,
)

train_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=batch_size,
    num_workers=2,
    shuffle=True,
    pin_memory=True,
    collate_fn=train_dataset.collate_fn,
)

# Multiply the anchors of each scale by that scale's grid size.
grid_scale = torch.tensor(s).unsqueeze(1).unsqueeze(1).repeat(1, 3, 2)
scaled_anchors = (torch.tensor(ANCHORS) * grid_scale).to(device)

from torch.utils.tensorboard import SummaryWriter

# Train for `epochs` passes, logging the loss and checkpointing after each one.
writer = SummaryWriter()
try:
    for e in range(1, epochs + 1):
        print("Epoch:", e)
        training_loss = training_loop(train_loader, model, optimizer, loss_fn, scaler, scaled_anchors)
        # Only log when the loop reports a loss; add_scalar cannot take None.
        if training_loss is not None:
            writer.add_scalar('Training Loss', training_loss, e)
        if save_model:
            # Same file the loading code reads back (checkpoint_file).
            save_checkpoint(model, optimizer, filename=checkpoint_file)
finally:
    # Flush and close the event file even if training is interrupted.
    writer.close()



testing my model

In [None]:
load_model = True

model = my_model().to(device)
optimizer = optim.Adam(model.parameters(), lr=leanring_rate)
loss_fn = loss_c()
scaler = torch.cuda.amp.GradScaler()

if load_model:
    load_checkpoint(checkpoint_file, model, optimizer, leanring_rate)

test_dataset = Dataset(
    image_dir="data/validation/images",
    label_dir="data/validation/labels",
    anchors=ANCHORS,
    transform=custom_transform,
)
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=1,
    num_workers=2,
    shuffle=True,
)

# Take one batch and predict on it.
x, y = next(iter(test_loader))
x = x.to(device)

model.eval()
with torch.no_grad():
    output = model(x)
    bboxes = [[] for _ in range(x.shape[0])]
    anchors = (
        torch.tensor(ANCHORS)
        * torch.tensor(s).unsqueeze(1).unsqueeze(1).repeat(1, 3, 2)
    ).to(device)
    for i in range(3):
        # Read only the grid size: unpacking the whole shape would overwrite
        # the globals `batch_size` and `A` (the albumentations alias).
        S = output[i].shape[2]
        anchor = anchors[i]
        boxes_scale_i = convert_cells_to_bboxes(
            output[i], anchor, s=S, is_predictions=True
        )
        for idx, box in enumerate(boxes_scale_i):
            bboxes[idx] += box
model.train()

# Filter the boxes of each image and draw what is left.
for i in range(x.shape[0]):
    nms_boxes = nms(bboxes[i], iou_threshold=0.5, threshold=0.6)
    plot_image(x[i].permute(1, 2, 0).detach().cpu(), nms_boxes)


exporting my model to onnx 

In [None]:
import torch.onnx

# Define a sample input tensor (you may need to adjust its size)
sample_input = torch.randn(1, 3, 416, 416).to(device)

# The testing cell switches the model back to train mode when it finishes;
# export in eval mode instead.
model.eval()

# Export the model to ONNX
onnx_file_path = "my_model.onnx"
torch.onnx.export(model, sample_input, onnx_file_path, verbose=True)

print("Model exported to:", onnx_file_path)


In [None]:
from torch.utils.tensorboard import SummaryWriter

# Write the model graph to torchlogs/. The input was `X`, which is not
# defined anywhere in the visible cells; `x` (the batch loaded in the
# testing cell) is presumably what was meant — confirm.
writer = SummaryWriter("torchlogs/")
writer.add_graph(model, x)
writer.close()

API

In [None]:
import requests

# POST a test image to the /models endpoint of the local service.
url = 'http://localhost:8000/models'
# The with block closes the image file once the request has been sent.
with open('data/Testing/images/0.jpg', 'rb') as image_file:
    files = {'file': image_file}
    response = requests.post(url, files=files)
print("Response content:", response.content)

In [None]:
import requests

# POST a test image to the /inference endpoint and print the JSON reply.
url = 'http://localhost:8000/inference'
# The with block closes the image file once the request has been sent.
with open('data/Testing/images/0.jpg', 'rb') as image_file:
    files = {'file': image_file}
    response = requests.post(url, files=files)

print(response.json())

In [None]:
import requests

# POST a test image to the /inference_with_overlay endpoint.
url = 'http://localhost:8000/inference_with_overlay'
# The with block closes the image file once the request has been sent.
with open('data/Testing/images/0.jpg', 'rb') as image_file:
    files = {'file': image_file}
    response = requests.post(url, files=files)
print("Response content:", response.content)