# Import

In [None]:
# Mount Google Drive inside the Colab runtime at /content/drive.
# NOTE(review): this only works when the notebook runs on Google Colab.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
import torch
import matplotlib.pyplot as plt
import cv2
import xml.etree.ElementTree as ET
import numpy as np
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torch.optim as optim
from tqdm.notebook import tqdm
# Preferred compute device: CUDA when a GPU is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Utils

In [None]:
class FeatureExtractor(nn.Module):
    """Truncated, pretrained ResNet-50 used as the image feature extractor.

    Only the first nine child modules of torchvision's ResNet-50 are kept.
    NOTE(review): for the stock torchvision model this is expected to drop
    just the final classification layer - confirm against the installed
    torchvision version.
    """

    def __init__(self):
        super().__init__()
        resnet = torchvision.models.resnet50(pretrained=True)
        # Children are returned in definition order; keep the leading nine.
        self.backbone = nn.Sequential(*list(resnet.children())[:9])
        # Mark every retained weight as trainable.
        for weight in self.backbone.parameters():
            weight.requires_grad = True

    def forward(self, img_data):
        """Return the backbone output for the batch ``img_data``."""
        return self.backbone(img_data)



class RPN(nn.Module):
    """Region-proposal head made of two parallel 3x3 convolutions.

    ``conv_obj`` emits one objectness logit per anchor and ``conv_bbox`` emits
    four box values per anchor, for every spatial position of the input map.
    """

    def __init__(self, in_channels, num_anchors):
        super().__init__()
        # Both heads share the same kernel geometry (3x3, "same" padding).
        conv_kwargs = dict(kernel_size=3, padding=1)
        self.conv_obj = nn.Conv2d(in_channels, num_anchors, **conv_kwargs)
        self.conv_bbox = nn.Conv2d(in_channels, 4 * num_anchors, **conv_kwargs)

    def forward(self, x):
        """Return ``(obj_scores, bbox_preds)`` for the feature map ``x``.

        ``obj_scores`` is flattened to ``(B, -1)`` and ``bbox_preds`` to
        ``(B, -1, 4)``, where ``B`` is ``x.shape[0]``. The flattening is a
        plain reshape of the convolution output (no permutation first).
        """
        batch = x.shape[0]
        obj_scores = self.conv_obj(x).reshape(batch, -1)
        bbox_preds = self.conv_bbox(x).reshape(batch, -1, 4)
        return obj_scores, bbox_preds



class FastRCNN1(nn.Module):
    """Detection head mapping pooled proposals to class scores and boxes.

    For each input row the head predicts 9 objects, each with 4 class logits
    and 4 box coordinates.
    """

    def __init__(self):
        super().__init__()
        # NOTE: conv2 and fcc1..fcc5 are not used by forward(). They are kept
        # (in the original creation order) so that state dicts saved from this
        # class still load with load_state_dict - presumably leftovers from an
        # earlier design; confirm before removing.
        self.conv2 = nn.Conv2d(6, 6, 5)
        self.fcc1 = nn.Linear(22326, 2048)
        self.fcc2 = nn.Linear(2048, 1024)
        self.fcc3 = nn.Linear(1024, 256)
        self.fcc4 = nn.Linear(256, 64)
        self.fcc5 = nn.Linear(64, 4 * 9)

        # Layers actually used by forward().
        self.fc1 = nn.Linear(64, 512)
        self.fc2 = nn.Linear(512, 512)
        self.cls_score = nn.Linear(512, 9 * 4)
        self.bbox_pred = nn.Linear(512, 9 * 4)

    def forward(self, x):
        """Return ``(class_scores, bbox_predictions)`` for ``x``.

        ``x`` is adaptively max-pooled to 8x8 over its last two dimensions and
        flattened into rows of 64 features. ``class_scores`` has shape
        ``(rows * 9, 4)`` and ``bbox_predictions`` has shape ``(rows, 9, 4)``.
        """
        x = F.adaptive_max_pool2d(x, (8, 8))
        # Flatten each pooled 8x8 map into one row. The row count used to be
        # hard-coded to 24, which failed for any other batch size (e.g. the
        # last, smaller batch of an epoch); inputs that worked before produce
        # exactly the same rows.
        x = x.reshape(-1, self.fc1.in_features)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))

        class_scores = self.cls_score(x).reshape(-1, 4)
        bbox_predictions = self.bbox_pred(x).reshape(-1, 9, 4)

        return class_scores, bbox_predictions



class CustomDataset(Dataset):
    """Index-aligned dataset of images, bounding boxes and class labels.

    ``data``, ``box`` and ``labels`` are indexed with the same position; an
    optional ``transform`` is applied to the image only.
    """

    def __init__(self, data, box, labels, transform=None):
        self.data, self.labels, self.box = data, labels, box
        self.transform = transform

    def __len__(self):
        """Number of samples, taken from ``data``."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, box, label)`` for position ``idx``."""
        image, label, box = self.data[idx], self.labels[idx], self.box[idx]
        if self.transform:
            image = self.transform(image)
        return image, box, label



def prepare_dataset(path, img_size):
    """Load every image/annotation pair in ``path`` into dense tensors.

    The sorted directory listing is consumed in pairs: the first entry of each
    pair is read as an image, the second as its Pascal-VOC style XML file.
    Images are resized to ``img_size`` x ``img_size``, converted to RGB and
    scaled to [0, 1]; box coordinates are rescaled by the same factors.

    Args:
        path: Directory containing the alternating image / XML files.
        img_size: Side length of the square output images.

    Returns:
        ``(images, boxes, classes)`` as float32 tensors of shape
        ``(n, 3, img_size, img_size)``, ``(n, 9, 4)`` and ``(n, 9)``. Each
        image is padded to 9 objects with all-zero boxes and the background
        label 3.

    Raises:
        ValueError: if the directory holds an odd number of entries, an image
            cannot be read, or an annotation lists more than 9 objects.
        KeyError: if an object name is not one of the known labels.
    """
    max_objects = 9
    background = 3
    labels_map = {'apple': 0, 'banana': 1, 'orange': 2, 'back': background}

    entries = sorted(os.listdir(path))
    if len(entries) % 2:
        raise ValueError(
            f"expected image/XML pairs in {path!r}, found {len(entries)} entries"
        )

    image_list, box_list, class_list = [], [], []
    for image_name, xml_name in zip(entries[0::2], entries[1::2]):
        image_file = os.path.join(path, image_name)
        image = cv2.imread(image_file)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise ValueError(f"could not read image {image_file!r}")

        # Scale factors must come from the original size, before resizing.
        scale_x = img_size / image.shape[1]
        scale_y = img_size / image.shape[0]
        image = cv2.resize(image, (img_size, img_size))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image_list.append(torch.from_numpy(image.transpose(2, 0, 1)).float() / 255)

        root = ET.parse(os.path.join(path, xml_name)).getroot()

        def scaled(tag, scale):
            nodes = root.findall("object/bndbox/" + tag)
            return [round(int(node.text) * scale) for node in nodes]

        rows = [
            list(corners)
            for corners in zip(
                scaled("xmin", scale_x),
                scaled("ymin", scale_y),
                scaled("xmax", scale_x),
                scaled("ymax", scale_y),
            )
        ]
        labels = [labels_map[node.text] for node in root.findall('object/name')]
        if len(rows) > max_objects or len(labels) > max_objects:
            raise ValueError(
                f"{xml_name!r} has more than {max_objects} objects"
            )

        # Pad to a fixed object count so that all samples stack into one tensor.
        rows += [[0, 0, 0, 0]] * (max_objects - len(rows))
        labels += [background] * (max_objects - len(labels))
        box_list.append(rows)
        class_list.append(labels)

    if not image_list:
        return (
            torch.empty(0, 3, img_size, img_size),
            torch.empty(0, max_objects, 4),
            torch.empty(0, max_objects),
        )

    # Build each tensor once instead of re-concatenating after every sample.
    images = torch.stack(image_list)
    boxes = torch.tensor(box_list, dtype=torch.float32)
    classes = torch.tensor(class_list, dtype=torch.float32)
    return images, boxes, classes


def anchors(img_size=None):
    """Build the fixed grid of anchor boxes as an int32 ``(540, 4)`` tensor.

    Anchors are generated for a 6x6 grid of centres, 5 base sizes and 3 aspect
    ratios (height / width). Each box is ``[xmin, ymin, xmax, ymax]``, clipped
    to ``[0, img_size]`` and truncated to integers.

    Args:
        img_size: Image side length used for clipping. When omitted, the
            module-level ``image_size`` is used.

    NOTE: every (base size, ratio) pair now derives its width from the base
    size itself. Previously the width variable was overwritten inside the
    ratio loop, so the second and third ratios were computed from the already
    rescaled width of the previous ratio and came out too large. Models
    trained against the old anchors should be retrained.
    """
    if img_size is None:
        img_size = image_size  # module-level setting of the notebook

    centers = [32, 64, 96, 128, 160, 190]
    base_sizes = [45, 72, 100, 140, 200]
    ratios = [0.33, 1, 3]

    anchor_boxes = []
    for x in centers:
        for y in centers:
            for base in base_sizes:
                for ratio in ratios:
                    # Keep the area at base**2 while height / width == ratio.
                    width = base / ratio ** 0.5
                    height = ratio * width
                    anchor_boxes.append([
                        max(x - width / 2, 0),
                        max(y - height / 2, 0),
                        min(x + width / 2, img_size),
                        min(y + height / 2, img_size),
                    ])
    return torch.tensor(anchor_boxes).int()


def build_target(t, tresh):
    """Return a float 0/1 mask: 1 where the maximum over dim 2 exceeds ``tresh``."""
    best_along_last = t.max(dim=2).values
    return best_along_last.gt(tresh).float()



def calculate_iou(box1, box2):
    """Intersection-over-union of two ``[xmin, ymin, xmax, ymax]`` tensors.

    Returns 0.0 when either tensor is empty or zero-dimensional, or when the
    boxes do not overlap with positive area; otherwise a Python number.
    """
    # Empty or scalar tensors cannot describe a box.
    if any(b.numel() == 0 or b.dim() == 0 for b in (box1, box2)):
        return 0.0

    ax1, ay1, ax2, ay2 = (box1[k].item() for k in range(4))
    bx1, by1, bx2, by2 = (box2[k].item() for k in range(4))

    # Corners of the overlap rectangle.
    left, top = max(ax1, bx1), max(ay1, by1)
    right, bottom = min(ax2, bx2), min(ay2, by2)

    if left >= right or top >= bottom:
        return 0.0

    overlap_w = max(0, right - left)
    overlap_h = max(0, bottom - top)
    overlap = overlap_w * overlap_h

    area_a = (ax2 - ax1) * (ay2 - ay1)
    area_b = (bx2 - bx1) * (by2 - by1)

    return overlap / (area_a + area_b - overlap)



def get_iou_mat(images, anchors, boxes):
    """Pairwise IoU between every anchor and every ground-truth box.

    Args:
        images: Batch tensor; only ``images.shape[0]`` (the batch size) is used.
        anchors: ``(nanc, 4)`` tensor of ``[xmin, ymin, xmax, ymax]`` anchors.
        boxes: ``(B, N, 4)`` tensor of ground-truth boxes in the same format.

    Returns:
        float32 tensor of shape ``(B, nanc, N)``. Pairs without a
        positive-area overlap (including all-zero padding boxes) get 0.

    The computation is one broadcast expression instead of a Python loop over
    every (image, anchor, box) triple. It runs in float32, so values may
    differ from a scalar computation in the last bits.
    """
    B = images.shape[0]
    anc = anchors.to(torch.float32)[None, :, None, :]   # (1, nanc, 1, 4)
    gt = boxes[:B].to(torch.float32)[:, None, :, :]     # (B, 1, N, 4)

    # Overlap rectangle; a negative extent means no overlap and clamps to 0.
    inter_w = (torch.minimum(anc[..., 2], gt[..., 2])
               - torch.maximum(anc[..., 0], gt[..., 0])).clamp(min=0)
    inter_h = (torch.minimum(anc[..., 3], gt[..., 3])
               - torch.maximum(anc[..., 1], gt[..., 1])).clamp(min=0)
    inter = inter_w * inter_h

    area_anc = (anc[..., 2] - anc[..., 0]) * (anc[..., 3] - anc[..., 1])
    area_gt = (gt[..., 2] - gt[..., 0]) * (gt[..., 3] - gt[..., 1])
    union = area_anc + area_gt - inter

    # Without overlap the numerator is 0; swap in a dummy denominator so the
    # division never evaluates 0 / 0.
    safe_union = torch.where(inter > 0, union, torch.ones_like(union))
    return inter / safe_union



def compute_targets(images, anchors, boxes):
    """Build RPN training targets for one batch.

    An anchor is positive when its best IoU with any ground-truth box of the
    same image exceeds 0.6. For positive anchors the regression target is the
    plain coordinate difference ``box - anchor`` with the best-matching box;
    all other anchors get a zero target.

    Args:
        images: Batch tensor; its first dimension is the batch size ``B``.
        anchors: ``(nanc, 4)`` tensor of anchor boxes.
        boxes: ``(B, N, 4)`` tensor of ground-truth boxes.

    Returns:
        ``(cls_targets, reg_targets)`` with shapes ``(B, nanc)`` and
        ``(B, nanc, 4)``; ``reg_targets`` is float32.
    """
    iou = get_iou_mat(images, anchors, boxes)
    cls_targets = build_target(iou, 0.6)

    # Index of the best-overlapping ground-truth box for every anchor.
    best_gt = torch.argmax(iou, dim=2)                        # (B, nanc)
    batch_idx = torch.arange(images.shape[0], device=best_gt.device).unsqueeze(1)
    matched = boxes[batch_idx, best_gt]                       # (B, nanc, 4)

    # One gather and one subtraction replace the per-anchor Python loop.
    offsets = (matched - anchors).to(torch.float32)
    positive = (cls_targets == 1).unsqueeze(-1)
    reg_targets = torch.where(positive, offsets, torch.zeros_like(offsets))

    return cls_targets, reg_targets



def get_proposals(out1, out2):
    """Turn RPN outputs into a zero-padded tensor of box proposals.

    Predicted offsets are added to the module-level ``anchors`` tensor. Every
    anchor whose sigmoid score exceeds 0.72 is then visited in index order and
    kept only if its IoU with each proposal already kept for the same image is
    below 0.7.

    Args:
        out1: ``(B, N)`` objectness logits.
        out2: ``(B, N, 4)`` predicted offsets relative to the anchors.

    Returns:
        Tensor of shape ``(B, K, 4)`` where ``K`` is the largest number of
        proposals kept for any image (at least 1); unused rows are zero.
    """
    B, N = out2.shape[0], out2.shape[1]

    # Decode all boxes at once instead of assigning coordinate by coordinate.
    box_cor = anchors[:N] + out2

    batch_ids, anchor_ids = torch.where(torch.sigmoid(out1) > 0.72)

    # One list per image. The bookkeeping used to be a fixed list of 24
    # counters, which raised IndexError for batches larger than 24.
    kept = [[] for _ in range(B)]
    for b, a in zip(batch_ids.tolist(), anchor_ids.tolist()):
        candidate = box_cor[b, a]
        if all(calculate_iou(candidate, previous) < 0.7 for previous in kept[b]):
            kept[b].append(candidate)

    width = max(1, max((len(per_image) for per_image in kept), default=0))
    proposals = torch.zeros(B, width, 4)
    for b, per_image in enumerate(kept):
        if per_image:
            proposals[b, :len(per_image)] = torch.stack(per_image)

    return proposals

# Faster R-CNN

In [None]:
# Directory with the training files. The trailing slash matters because
# prepare_dataset builds file names by appending them directly to this string.
path='drive/MyDrive/train_zip/train/'
# Side length every image is resized to; anchors() also reads this global
# when clipping anchor boxes.
image_size=256
# Load the whole training set into memory and wrap it for the DataLoader.
images,boxes,classes=prepare_dataset(path,image_size)
dataset=CustomDataset(images,boxes,classes)

In [None]:
batch_size = 24
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
# Pull one batch, e.g. for quick inspection.
images_b,boxes_b,classes_b=next(iter(dataloader))
# This assignment replaces the anchors() function with the tensor it returns
# (the rest of the notebook reads `anchors` as a tensor). Guard the call so
# that re-running this cell does not try to call the tensor.
if callable(anchors):
    anchors=anchors()

In [None]:
# Restore previously trained weights for the RPN and the Fast R-CNN head.
file_path = 'trained1.pth'
file_path1 = 'trained2.pth'
# map_location="cpu" lets checkpoints saved on a GPU load on CPU-only
# runtimes; the models below are created on the CPU.
modello= torch.load(file_path, map_location="cpu")
rp=RPN(2048,540)
rp.load_state_dict(modello)
FE=FeatureExtractor()
modello=torch.load(file_path1, map_location="cpu")
# The class is named FastRCNN1; the former spelling FASTRCNN1 raised NameError.
fast=FastRCNN1()
fast.load_state_dict(modello)



# Training

In [None]:

# Train the region-proposal network on top of the fixed feature extractor.
FE=FeatureExtractor()
rp=RPN(2048,540)

obj_criterion = nn.BCEWithLogitsLoss()
bbox_criterion = nn.SmoothL1Loss()

# Only the RPN parameters are optimised; the backbone is never updated.
optimizer = optim.Adam(rp.parameters(), lr=0.01)
num_epochs=30
losses=[]    # loss of every batch
losses1=[]   # mean loss of every epoch
num_batches = len(dataloader)

for epoch in tqdm(range(num_epochs), desc='Epochs'):
    lp = 0.0
    for images, boxes, classes in dataloader:
        optimizer.zero_grad()

        # Compute cls_targets and reg_targets for this batch.
        cls_targets, reg_targets = compute_targets(images, anchors, boxes)

        # The backbone is not optimised here, so skip building its autograd
        # graph; the forward values are unchanged.
        with torch.no_grad():
            features = FE(images)
        # Run the RPN on the extracted features.
        obj_scores, reg_scores = rp(features)
        # BCEWithLogitsLoss expects float targets.
        cls_targets=cls_targets.float()

        # Compute the losses.
        cls_loss = obj_criterion(obj_scores, cls_targets)
        reg_loss = bbox_criterion(reg_scores, reg_targets)
        total_loss = cls_loss + reg_loss
        batch_loss = total_loss.item()
        losses.append(batch_loss)

        # Backpropagate and update the RPN.
        total_loss.backward()
        optimizer.step()

        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {total_loss.item()}')

        # Accumulate a plain float so no autograd graph is kept alive.
        lp += batch_loss
    # Average over the real number of batches instead of a hard-coded 10.
    lp = lp / max(num_batches, 1)
    losses1.append(lp)
    print('lista2: ',losses1)

In [None]:
# Train the Fast R-CNN head on proposals produced by the existing FE and RPN.
fast=FastRCNN1()

obj_criterion = nn.CrossEntropyLoss()
bbox_criterion = nn.MSELoss()

# Only the head is optimised in this stage.
optimizer = optim.Adam(fast.parameters(), lr=0.001)
num_epochs=10

losses=[]    # loss of every batch
losses1=[]   # mean loss of every epoch
num_batches = len(dataloader)

for epoch in tqdm(range(num_epochs), desc='Epochs'):
    pl = 0.0
    for images, boxes, classes in dataloader:
        # CrossEntropyLoss needs one integer class index per predicted object.
        classes = classes.reshape(-1).long()
        optimizer.zero_grad()

        # FE and the RPN are not updated here; produce the proposals without
        # tracking gradients (same values, far less memory and time).
        with torch.no_grad():
            features = FE(images)
            obj_scores, reg_scores = rp(features)
            proposals = get_proposals(obj_scores, reg_scores)

        cl, bb = fast(proposals)

        cls_loss = obj_criterion(cl, classes)
        reg_loss = bbox_criterion(bb, boxes)
        total_loss = cls_loss/4 + reg_loss/9
        batch_loss = total_loss.item()
        losses.append(batch_loss)

        total_loss.backward()
        optimizer.step()

        # Accumulate a plain float so no autograd graph is kept alive.
        pl += batch_loss

    # Average over the real number of batches instead of a hard-coded 10.
    pl = pl / max(num_batches, 1)
    losses1.append(pl)
    print('mean: ',pl)