In [None]:
# !pip install albumentations

In [None]:

from PIL import Image
import torch
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
import numpy as np
from model.network import Unsupervised_Object_Detection
import matplotlib.pyplot as plt
from model.utils import disp_activation, get_activation
import torchvision.transforms.functional as F


im = Image.open('images/bottle0.jpg')
target_list = {
    'boxes': torch.tensor([[103, 212, 495, 746]], dtype=torch.float32), # xmin, ymin, xmax, ymax
    'labels': torch.tensor([1], dtype=torch.int64),  # Class label should be int64
    # 'image_id': torch.tensor([1], dtype=torch.int64)  # Unique ID should be int64
}

im_test = Image.open('images/bottle2.jpg')
model = Unsupervised_Object_Detection()
# zi, zj, za = model(im)


# Train

In [None]:
import torch
from model.utils import random_rotate_image, random_augment
from model.loss_func import nt_xent_loss

# Pre-training function
def pre_train_one_image(model, image, optimizer, scheduler, loss_fn, epochs=10, patience=3, save_path='best_model.pth'):
    best_loss = float('inf')
    best_epoch = 0
    patience_counter = 0

    model.train()
    
    for epoch in range(epochs):
        new_im = random_augment(image)
        optimizer.zero_grad()

        # Forward pass
        zi, zj, za = model(new_im)
        loss = loss_fn(zi, zj, za)

        # Backward pass
        loss.backward()
        optimizer.step()

        # Step the scheduler with the current loss
        scheduler.step(loss.item())

        print(f'Epoch {epoch + 1}/{epochs}, Loss: {loss.item()}')

        # Check if the current loss is the best we've seen so far
        if loss.item() < best_loss:
            best_loss = loss.item()
            best_epoch = epoch
            patience_counter = 0

            # Save the best model
            torch.save(model.state_dict(), save_path)
            print(f"Best model saved with loss {best_loss} at epoch {best_epoch + 1}")
        else:
            patience_counter += 1

        # Early stopping
        if patience_counter >= patience:
            print(f"Early stopping triggered after {patience} epochs with no improvement.")
            break

    # Optionally load the best model after training
    model.load_state_dict(torch.load(save_path))
    print(f"Training completed. Best model from epoch {best_epoch + 1} loaded.")

# Define the necessary components
loss_fn = nt_xent_loss
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=2, verbose=True)

# Run the pre-training loop
pre_train_one_image(model, im, optimizer, scheduler, loss_fn, epochs=15, patience=3, save_path='best_model.pth')


Fine Tune

<hr>

# Load

In [20]:
# torch.save(model.state_dict(), 'best_model.pth')

# Load
model = Unsupervised_Object_Detection()
model.load_state_dict(torch.load('best_model.pth'))
model.eval()
print()




Plot

<hr>

In [24]:
from fine_tune_utiles import random_augment_ft

def fine_tune_retinanet(model, images, targetsOG):

    model.train()
    # Freeze the backbone
    for param in model.backbone.parameters():
        param.requires_grad = False
    # Train head
    for param in model.head.parameters():
        param.requires_grad = True

    # optimizer = torch.optim.Adam([param for param in model.parameters() if param.requires_grad], lr=0.0001)
    optimizer = torch.optim.SGD(
        [param for param in model.parameters() if param.requires_grad],
        lr=0.001,
        momentum=0.8,
        weight_decay=0.00001,
    )
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)
    
    num_epochs = 20

    
    new_im =  transforms.ToTensor()(images)

    for epoch in range(num_epochs):

        augmented_image, augmented_target = random_augment_ft(images, targetsOG.copy())
        augmented_image = transforms.ToTensor()(augmented_image)

        new_im = [augmented_image]  # List of one image tensor
        new_target = [{k: v for k, v in augmented_target.items()}]

        # # Forward pass
        loss_dict = model(new_im, new_target)
        losses = sum(loss for loss in loss_dict.values())

        # Backward pass
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=2.0)

        # new_im, new_target = random_augment_ft(images, targetsOG.copy())
        # new_im = transforms.ToTensor()(new_im)

        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {losses.item():.4f}")
    scheduler.step()
    

fine_tune_retinanet(model.P2.retinanet,im,target_list)

TypeError: pic should be PIL Image or ndarray. Got <class 'torch.Tensor'>


<hr>

In [None]:
rtina = model.P2.retinanet
rtina.eval()
import torchvision.transforms.functional as F

from PIL import Image, ImageDraw
def load_image(image_path):
    img = Image.open(image_path).convert("RGB")
    return img

def predict_and_plot(image_path, threshold=0.5):
    # Load image and convert to tensor
    img = load_image(image_path)
    img_tensor = F.to_tensor(img).unsqueeze(0)  # Add batch dimension

    # Perform prediction
    with torch.no_grad():
        predictions = rtina(img_tensor)[0]

    # Filter out boxes below the confidence threshold
    boxes = predictions['boxes']
    scores = predictions['scores']
    labels = predictions['labels']
    print( scores)

    selected_boxes = boxes[scores > threshold].numpy()

    # Draw boxes on the image
    draw = ImageDraw.Draw(img)
    for box in selected_boxes:
        draw.rectangle(box.tolist(), outline="red", width=3)

    # Display the image with bounding boxes
    plt.figure(figsize=(8, 8))
    plt.imshow(img)
    plt.axis('off')
    plt.show()

predict_and_plot('images/bottle0.jpg')

In [None]:
from model.utils import disp_activation, get_activation

tensor_im = transforms.ToTensor()(im)
output = model.P2.retinanet.backbone(tensor_im)
N_images = len(output)
fig, ax = plt.subplots(1,N_images, figsize = (12,12))
for i, (name,data) in enumerate(output.items()):
    print(name)
    disp_activation(data.mean(1), im,ax[i], name)



In [None]:
from testing_functions import livecam_test
from capture_box_from_image import draw_rectangle
# draw_rectangle(cv2.cvtColor(np.array(im), cv2.COLOR_RGB2BGR))

In [None]:
# livecam_test(model)

In [None]:
from fine_tune_utiles import flip_boxes_horizontally,flip_boxes_vertically,rotate_image_and_boxes, plot_image_with_boxes
import random
img = im.copy()
target_list = {
    'boxes': torch.tensor([[103, 212, 495, 746]], dtype=torch.float32), # xmin, ymin, xmax, ymax
    'labels': torch.tensor([1], dtype=torch.int64),  # Class label should be int64
    # 'image_id': torch.tensor([1], dtype=torch.int64)  # Unique ID should be int64
}

image_width, image_height = img.size
boxes = target_list['boxes'].clone()

img = F.hflip(img)
boxes = flip_boxes_horizontally(boxes, image_width)

img = F.vflip(img)
boxes = flip_boxes_vertically(boxes, image_height)

angle = random.choice([90, 180, 270])  # Don't rotate by 0
img, boxes = rotate_image_and_boxes(img, boxes, angle)
        
img = transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.2)(img)
img = transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5.0))(img)
        
img = transforms.ToTensor()(img)
img = F.to_pil_image(img)
new_transformed_target = target_list.copy()
new_transformed_target['boxes'] = boxes
plot_image_with_boxes(img, new_transformed_target['boxes'].numpy())

In [None]:
boxes

In [None]:
target_list['boxes']

In [None]:
images, new_target = random_augment_ft(im, target_list)
images = transforms.ToTensor()(images)
plot_image_with_boxes(F.to_pil_image(images), new_target['boxes'].numpy())

In [None]:
img