In [None]:
pwd

In [None]:
cd Rar

In [None]:
cd object-detection-nn

In [None]:
# cd ../../..

In [None]:
remote_mode = True

import os
import numpy as np
from nn.YOLO_VGG16_OBB.utils.constants import ANCHORS
from nn.YOLO_VGG16_OBB.prepare_data.dota_dataset_memory import DotaDataset
from nn.YOLO_VGG16_OBB.prepare_data.transforms import train_transform, test_transform
from nn.YOLO_VGG16_OBB.utils.helpers import convert_cells_to_bboxes, load_checkpoint, nms, plot_image, save_checkpoint
from nn.YOLO_VGG16_OBB.utils.constants import device, s, leanring_rate, save_model, checkpoint_file
from nn.YOLO_VGG16_OBB.model.YOLO_VGG16_OBB import YOLO_VGG16_OBB
import torch
import torch.optim as optim
from nn.YOLO_VGG16_OBB.model.loss import YOLOLoss
from tqdm import tqdm
from torch.utils.tensorboard import SummaryWriter
import torchvision.transforms as T

if remote_mode:
    model_path_base = f"/home/dcor/niskhizov/Rar/object-detection-nn/nn/YOLO_VGG16_OBB/notebooks/vgg_f_obb_v2_model"
else:
    model_path_base = f"nn/YOLO_VGG16_OBB/notebooks/"


In [None]:
categories = ['plane','ship', 'storage-tank', 'baseball-diamond', 'tennis-court', 'basketball-court', 'ground-track-field', 'harbor', 'bridge', 'large-vehicle', 'small-vehicle', 'helicopter', 'roundabout', 'soccer-ball-field', 'swimming-pool']


In [None]:
# Creating the model from YOLOv3 class 
load_model = True
save_model = True
model = YOLO_VGG16_OBB(num_classes=len(categories)).to(device) 

# Defining the optimizer 
optimizer = optim.Adam(model.parameters(), lr = leanring_rate) 

# Defining the loss function 
loss_fn = YOLOLoss() 

# Defining the scaler for mixed precision training 
scaler = torch.amp.GradScaler(device=device) 
# Loading the checkpoint 
if load_model: 
    load_checkpoint(model_path_base + f"e276_vgg16_{checkpoint_file}", model, optimizer, leanring_rate, device) 

# Initialize TensorBoard writer
writer = SummaryWriter(log_dir='runs/YOLO_VGG16_OBB_complete_v2')



In [None]:
dataset = DotaDataset( 
	categories=categories,
	grid_sizes=[13, 26, 52], 
	anchors=ANCHORS, 
	transform=train_transform 
) 

# Defining the train data loader 
train_loader = torch.utils.data.DataLoader( 
	dataset=dataset, 
	batch_size=8, 
	shuffle=True, 
	num_workers=4,
 	prefetch_factor=10,
) 


In [None]:
val_dataset = DotaDataset(
    categories=categories,
    grid_sizes=[13, 26, 52],
    anchors=ANCHORS,
    transform=test_transform,  # Use the same transform for validation
    data_base_path = f"nn/dotadataset/val"
)

# Create the validation data loader
val_loader = torch.utils.data.DataLoader(
    dataset=val_dataset,
    batch_size=8,
    shuffle=True,
    num_workers=4,
)

val_loader_iter = iter(val_loader)

In [None]:
# Scaling the anchors 
scaled_anchors = ( 
	torch.tensor(ANCHORS) *
	torch.tensor(s).unsqueeze(1).unsqueeze(1).repeat(1,3,2) 
).to(device) 

In [None]:
loss_fn = YOLOLoss()

In [None]:
e

In [None]:
save_checkpoint(model, optimizer, filename=model_path_base +f"e{e}_vgg16_checkpoint.pth.tar")

In [None]:
epochs = 100000000
error_counter = 0
save_model = True
# Training the model 
for e in range(297, epochs+1): 
	print("Epoch:", e) 
    ################# dos
    # Creating a progress bar 
	progress_bar = tqdm(train_loader, leave=True) 

	# Initializing a list to store the losses 
	losses = [] 
	# try:
		# Iterating over the training data 
	for batch_idx, (x, y) in enumerate(progress_bar):
			# try:
				print("batch_idx:", batch_idx)
				x = x.to(device) 
				y0, y1, y2 = ( 
					y[0].to(device), 
					y[1].to(device), 
					y[2].to(device), 
				) 

				with torch.amp.autocast(device_type=device): 
					# Getting the model predictions 
					outputs = model(x) 
					# Calculating the loss at each scale 
					loss = ( 
						loss_fn(outputs[0], y0, scaled_anchors[0]) 
						+ loss_fn(outputs[1], y1, scaled_anchors[1]) 
						+ loss_fn(outputs[2], y2, scaled_anchors[2]) 
					) 

				# Add the loss to the list 
				losses.append(loss.item()) 

				# Reset gradients 
				optimizer.zero_grad() 

				# Backpropagate the loss 
				scaler.scale(loss).backward() 

				# Optimization step 
				scaler.step(optimizer) 

				# Update the scaler for next iteration 
				scaler.update() 

				# update progress bar with loss 
				mean_loss = sum(losses) / len(losses) 
				progress_bar.set_postfix(loss=mean_loss)
		
				# Log the loss to TensorBoard
				writer.add_scalar('Loss/train', mean_loss, e * len(train_loader) + batch_idx)

				# # Log images to TensorBoard every 100 batches
				# if batch_idx % 200 == 0:
				# 	# Saving the model 
				# 	if save_model: 
				# 		save_checkpoint(model, optimizer, filename=model_path_base +f"b{batch_idx}_vgg16_checkpoint.pth.tar")
		
				# 	for i in range(0, batch_idx-3, 200):
				# 			if os.path.exists(model_path_base + f"b{i}_vgg16_checkpoint.pth.tar"):
				# 				os.remove(model_path_base + f"b{i}_vgg16_checkpoint.pth.tar")

	if e % 2 == 0:
		# Saving the model 
		if save_model: 
			save_checkpoint(model, optimizer, filename=model_path_base +f"e{e}_vgg16_checkpoint.pth.tar")
			# # delete batch checkpoints
			# for i in range(0, e-2):
			# 	if os.path.exists(model_path_base + f"e{e}_vgg16_checkpoint.pth.tar"):
			# 		os.remove(model_path_base + f"e{e}_vgg16_checkpoint.pth.tar")
			# for i in range(0,e-3):
			# 	if os.path.exists(model_path_base + f"e{i}_vgg16_checkpoint.pth.tar"):
			# 		os.remove(model_path_base + f"e{i}_vgg16_checkpoint.pth.tar")
	
		# model.eval()
	
		# # Getting a sample image from the test data loader 
		# try:
		# 	x, y = next(val_loader_iter)
		# except StopIteration:
		# 	val_loader_iter = iter(val_loader)
		# 	x, y = next(val_loader_iter)
		# x = x.to(device) 
		
		# print("###################################### display and report image ######################################")
		# with torch.no_grad():
		# 	scaled_anchors = ( 
		# 	torch.tensor(ANCHORS) *
		# 	torch.tensor(s).unsqueeze(1).unsqueeze(1).repeat(1,3,2) 
		# 	).to(device) 
		# 	output = model(x)
		# 	y0, y1, y2 = ( 
		# 		y[0].to(device), 
		# 		y[1].to(device), 
		# 		y[2].to(device), 
		# 	) 

		# 	with torch.amp.autocast(device_type=device): 
		# 		# Getting the model predictions 
		# 		outputs = model(x) 
		# 		# Calculating the loss at each scale 
		# 		loss = ( 
		# 			loss_fn(outputs[0], y0, scaled_anchors[0]) 
		# 			+ loss_fn(outputs[1], y1, scaled_anchors[1]) 
		# 			+ loss_fn(outputs[2], y2, scaled_anchors[2]) 
		# 		) 
	
		# 	# Print predictions
		# 	writer.add_scalar('Loss/val', loss.item(), e * len(train_loader) + batch_idx)

			# bboxes = [[] for _ in range(x.shape[0])]
			# for i in range(3):
			# 	batch_size, A, S, _, _ = output[i].shape
			# 	anchor = scaled_anchors[i]
			# 	boxes_scale_i = convert_cells_to_bboxes(output[i], anchor, s=S, is_predictions=True)
			# 	for idx, box in enumerate(boxes_scale_i):
			# 		bboxes[idx] += box
	
			# i = 0
			# print('bboxes[i] shape:', np.array(bboxes[i]).shape)
			# nms_boxes = nms(bboxes[i], iou_threshold=0.5, threshold=0.6)
			# img_with_boxes = plot_image(x[i].permute(1, 2, 0).detach().cpu(), nms_boxes, categories)
			# img_with_boxes = T.ToTensor()(img_with_boxes)
			# writer.add_image(f'Val/Image_{e}_{i}_{batch_idx}_before', img_with_boxes, e * len(train_loader) + batch_idx)

		model.train()
	# except Exception as error:
	# 	print(error)
	# 	error_counter += 1
	# 	if error_counter > 10:
	# 		raise error


    #################
	# training_loop(train_loader, model, optimizer, loss_fn, scaler, scaled_anchors) 


In [None]:
e

In [None]:
torch.cat([(pred[..., 1:3]),
                        torch.exp(pred[..., 3:5]) *
                        anchors, pred_angle.unsqueeze(-1)
                        ], dim=-1)

In [None]:
pred[..., 1:3].shape, pred[..., 3:5].shape, pred_angle.unsqueeze(-1).shape, anchors.shape

In [None]:
pred = outputs[1]
target = y1 
anchors = scaled_anchors[1]
pred_angle = torch.tanh(pred[..., 5])
box_preds = torch.cat([(pred[..., 1:3]),
                        torch.exp(pred[..., 3:5]) *
                        anchors, pred_angle.unsqueeze(-1)
                        ], dim=-1)
# Calculating intersection over union for prediction and target
ious = skew_iou(box_preds[obj], target[..., 1:6][obj]).detach()


In [None]:
import torch
from shapely.geometry import Polygon

def get_rotated_box_corners(boxes):
    """
    Convert rotated bounding boxes [x, y, w, h, theta] into their four corner points.
    
    Args:
    - boxes: (N, 5) Tensor containing (x, y, w, h, theta) in radians.
    
    Returns:
    - Tensor of shape (N, 4, 2) representing the four corner points of each box.
    """
    x, y, w, h, theta = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3], boxes[:, 4]
    
    # Compute half dimensions
    w_half, h_half = w / 2, h / 2

    # Define corner points relative to the center
    corners = torch.tensor([
        [-1, -1],
        [ 1, -1],
        [ 1,  1],
        [-1,  1]
    ], dtype=boxes.dtype, device=boxes.device)  # Shape (4, 2)

    # Scale corners by (w/2, h/2)
    scaled_corners = corners * torch.stack([w_half, h_half], dim=-1).unsqueeze(1)  # (N, 4, 2)

    # Compute rotation matrix
    cos_t, sin_t = torch.cos(theta), torch.sin(theta)
    rotation_matrix = torch.stack([
        torch.stack([cos_t, -sin_t], dim=-1),
        torch.stack([sin_t, cos_t], dim=-1)
    ], dim=-2)  # Shape (N, 2, 2)

    # Rotate corners and translate to (x, y)
    rotated_corners = torch.bmm(scaled_corners, rotation_matrix) + torch.stack([x, y], dim=-1).unsqueeze(1)
    
    return rotated_corners  # Shape (N, 4, 2)


def skew_iou(boxes1, boxes2):
    """
    Compute IoU between batches of rotated bounding boxes.
    
    Args:
    - boxes1: (N, 5) Tensor (x, y, w, h, theta) in radians.
    - boxes2: (N, 5) Tensor (x, y, w, h, theta) in radians.
    
    Returns:
    - Tensor (N,) containing IoU values for each pair.
    """
    N = boxes1.shape[0]
    assert boxes2.shape[0] == N, "Batch sizes must match"

    # Get corners for each box
    corners1 = get_rotated_box_corners(boxes1)
    corners2 = get_rotated_box_corners(boxes2)
    corners1 = corners1.detach().cpu().numpy()
    corners2 = corners2.detach().cpu().numpy()
    # Compute IoU using Shapely
    ious = []
    for i in range(N):
        poly1 = Polygon(corners1[i])
        poly2 = Polygon(corners2[i])
        
        if not poly1.is_valid or not poly2.is_valid:
            ious.append(0.0)
            continue
        
        inter_area = poly1.intersection(poly2).area
        union_area = poly1.area + poly2.area - inter_area

        ious.append(inter_area / union_area if union_area > 0 else 0.0)

    return torch.tensor(ious, dtype=boxes1.dtype, device=boxes1.device)


In [None]:
import torch
import torch.nn as nn

from nn.YOLO_VGG16_OBB.utils.helpers import iou

# Defining YOLO loss class

class YOLOLoss(nn.Module):
    def __init__(self):
        super().__init__()
        self.mse = nn.MSELoss()
        self.bce = nn.BCEWithLogitsLoss()
        self.cross_entropy = nn.CrossEntropyLoss()
        self.sigmoid = nn.Sigmoid()
        self.than = nn.Tanh()

    def forward(self, pred, target, anchors):
        # Identifying which cells in target have objects
        # and which have no objects
        obj = target[..., 0] == 1
        no_obj = target[..., 0] == 0

        # Calculating No object loss
        no_object_loss = self.bce(
            self.sigmoid(pred[..., 0:1][no_obj]), (target[..., 0:1][no_obj]),
        )
        
        # reg_object_loss = 10 * self.bce(
        #     self.sigmoid(pred[..., 0:1][obj]), (target[..., 0:1][obj]),
        # )

        # Reshaping anchors to match predictions
        anchors = anchors.reshape(1, 3, 1, 1, 2)
        pred_angle = torch.tanh(pred[..., 5])
        # Box prediction confidence
        box_preds = torch.cat([self.sigmoid(pred[..., 1:3]),
                               torch.exp(pred[..., 3:5]) *
                               anchors, pred_angle.unsqueeze(-1)
                               ], dim=-1)
        # Calculating intersection over union for prediction and target
        ious = iou(box_preds[obj], target[..., 1:6][obj]).detach()
        # Calculating Object loss
        object_loss = self.bce(self.sigmoid(pred[..., 0:1][obj]),
                               ious * target[..., 0:1][obj])

        # Predicted box coordinates
        pred[..., 1:3] = self.sigmoid(pred[..., 1:3])
        # Target box coordinates
        target[..., 3:5] = torch.log(1e-6 + target[..., 3:5] / anchors)
        # Calculating box coordinate loss

        box_loss = self.mse(pred[..., 1:5][obj],
                            target[..., 1:5][obj])
        
        box_loss_center = self.mse(pred[..., 1:3][obj],
                            target[..., 1:3][obj])

        # Calculate angle loss
        angle_loss = self.mse(torch.sin(pred_angle[obj] - target[..., 5][obj]), torch.zeros_like(pred_angle[obj])) # fine with rad?
        # Claculating class loss
        class_loss = self.cross_entropy((pred[..., 6:][obj]),
                                        target[..., 6][obj].long())

        # print('~~~~~~~~~~~~~ FIND UNCAPPED ~~~~~~~~~~~~~')
        # print('box_loss:', box_loss)
        # print('angle_loss:', angle_loss)
        # print('object_loss:', object_loss)
        # print('no_object_loss:', no_object_loss)
        # print('class_loss:', class_loss)
        # print('reg_object_loss', reg_object_loss)

        # Total loss
        return (
            box_loss
            + box_loss_center
            + angle_loss
            + object_loss
            + no_object_loss
            + class_loss
            # + reg_object_loss
        )


In [None]:
loss_fn = YOLOLoss()

In [None]:
loss_fn(outputs[0], y0, scaled_anchors[0])