<a href="https://colab.research.google.com/github/divadde/computer_vision_project/blob/main/project_final.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Preparazione del dataset


Come primo passo, importo tutte le librerie che sono necessarie per la realizzazione del task di segmentazione e object detection. Dalla preparazione del dataset, all'addestramento del modello fino alla fase di testing.

In [None]:
import torch
import torchvision
import os
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
from google.colab import drive
from collections import defaultdict
import json
import pycocotools
from pycocotools import mask
from torchvision.tv_tensors import BoundingBoxes, BoundingBoxFormat, Mask
from torchvision.transforms.v2 import ConvertBoundingBoxFormat
import torch.nn as nn

Scrivo una funzione ausiliaria *createLabels* che verrà utilizzata all'interno del dataset custom per l'estrazione e la creazione delle labels in un formato adatto al training del modello. Si può osservare dal file json delle etichette, che il formato delle box è del tipo XYWH, per l'addestramento del modello è richiesto che il formato sia del tipo XYXY. Per raggiungere tale scopo verrà usato il convertitore messo a disposizione da torchvision. Inoltre si è notato che all'interno del training set, sono presenti delle box degenerate (ovvero quando ampiezza o altezza è pari a 0), per questa ragione viene effettuato un controllo e se viene riscontrata una box degenerata, viene incrementato di un numero piccolo (=0.1) l'ampiezza o l'altezza degenere.

In [None]:
def createLabels(json_file):
  data = json_file['annotations']
  grouped_data = defaultdict(list)

  [grouped_data[k['image_id']].append(k) for k in data]

  result_dict = dict(grouped_data)

  ret_dict = {}
  image_id = 0
  for image in result_dict.values():
    bbox = []
    seg = []
    cls = []
    for obj in image:
      image_id = obj['image_id']
      seg.append(mask.decode(mask.merge(mask.frPyObjects(obj['segmentation'],700,700))))
      b = obj['bbox']
      #print(b)
      #controllo se ci sono boxes degenerate
      if b[2]==0: b[2]=b[2]+0.1
      if b[3]==0: b[3]=b[3]+0.1
      box = [b[0],b[1],b[2],b[3]] #formato XYWH
      bbox.append(box)
      cls.append(obj['category_id']+1) #la prima classe non deve avere id 0.
    boundy_box = torchvision.tv_tensors.BoundingBoxes(data=np.array(bbox),format = BoundingBoxFormat.XYWH, canvas_size=(700, 700))
    converter = torchvision.transforms.v2.ConvertBoundingBoxFormat("XYXY")
    final_box = converter._transform(boundy_box, {})
    print(image_id)
    ret_dict[image_id] = {
        'boxes': final_box,
        'labels': torch.tensor(cls, dtype = torch.int64),
        'image_id': obj['image_id'],
        'area': torch.tensor(obj['area'], dtype=torch.float64),
        'iscrowd': torch.tensor(obj['iscrowd'], dtype=torch.uint8),
        'masks': Mask(seg)
    }

  return ret_dict

Definisco adesso, come estensione della classe Dataset di torch.utils, la nuova classe che ci permette di leggere il dataset (sia le immagini che le labels associate). Si può notare come nel costruttore della classe venga richiamata la funzione precedentemente definita *createLabels* per creare un grande dizionario che mappa ogni id dell'immagine con le sue label corrispondenti. Nel metodo getitem viene preso il percorso corrispondente all'immagine, viene letta l'immagine e restituita insieme alla label corrispondente.

In [None]:
class TTPLADataset(torch.utils.data.Dataset):
  def __init__(self,img_dir,labels_dir):
    self.img_dir = img_dir
    self.labels_json = json.load(open(labels_dir)) #apertura del file json delle etichette
    self.image_labels = createLabels(self.labels_json)
    self.categories = {category['name']: category['id'] for category in self.labels_json['categories']}

  def __getitem__(self,idx):
    img_path = os.path.join(self.img_dir, self.labels_json['images'][idx]['file_name'])
    image = (torchvision.io.read_image(img_path)/255)
    label = self.image_labels[idx]
    return (image,label)

  def __len__(self):
    return len(os.listdir(self.img_dir))


Carico il dataset di training attraverso il montaggio con Google Drive e sfrutto il dataset custom che ho scritto.

In [None]:
drive.mount('/content/drive',force_remount=True)

train_set = TTPLADataset('/content/drive/MyDrive/trainingset', '/content/drive/MyDrive/train.json')

Mounted at /content/drive
0


  return torch.as_tensor(data, dtype=dtype, device=device).requires_grad_(requires_grad)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277


# Definizione della Mask-RCNN

Il modello che verrà preso come riferimento per il completamento del task di segmentazione e object detection è la Mask-RCNN, modello studiato anche durante le lezioni dedicate alla segmentazione. La ragione della scelta risiede nel fatto che la Mask-RCNN è un modello molto popolare nella risoluzione di tali task oltre a presentare una struttura flessibile composta da backbone, testa di region proposal, testa di classificazione, testa di predizione delle box, e testa di predizione delle maschere per la segmentazione. Inoltre data proprio la sua flessibilità ne esistono numerose varianti che modificano la backbone e le varie teste per ottenere sempre prestazioni migliori. Nel mio caso, ho voluto costruire un modello che sfruttasse come backbone il modello ResNet152, il cui compito è quello di estrarre poi le feature che saranno da input per le teste della Mask-RCNN.

In [None]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

Di seguito è mostrata la definizione del modello della MaskRCNN. Si è deciso di trattare separatamente backbone, rpn_head, fastrcc_head e mask_head per una maggiore flessibilità e per poter provare a ogni esperimento delle strutture diverse.

In [None]:
from torchvision.models.detection import mask_rcnn, backbone_utils
from torchvision.models.detection.faster_rcnn import RPNHead
from torchvision.models.detection.anchor_utils import AnchorGenerator
from torchvision.models.resnet import resnet50, resnet101, resnet152
from torchvision.models.detection.backbone_utils import _resnet_fpn_extractor, _validate_trainable_layers
from torchvision.models.detection.faster_rcnn import _default_anchorgen, FasterRCNN, FastRCNNConvFCHead, RPNHead
from torchvision.models.detection.mask_rcnn import MaskRCNNHeads

backbone = resnet152(weights=torchvision.models.ResNet152_Weights.IMAGENET1K_V2)
backbone = _resnet_fpn_extractor(backbone, 5)

rpn_anchor_generator = _default_anchorgen()
rpn_head = RPNHead(backbone.out_channels, rpn_anchor_generator.num_anchors_per_location()[0], conv_depth=2)
box_head = FastRCNNConvFCHead(
        (backbone.out_channels, 7, 7), [256, 256, 256, 256], [1024], norm_layer=nn.BatchNorm2d
    )
mask_head = MaskRCNNHeads(backbone.out_channels, [256, 256, 256, 256], 1, norm_layer=nn.BatchNorm2d)

#print(backbone)

model = mask_rcnn.MaskRCNN(backbone=backbone,
                           num_classes=5,
                           rpn_anchor_generator=rpn_anchor_generator,
                           rpn_head=rpn_head,
                           box_head=box_head,
                           mask_head=mask_head) #rpn_head=rpn_head #si potrebbe implementare come testa di segmentazione deeplab

In [None]:
#caricamento del modello

model = torch.load('/content/drive/MyDrive/mask_rcnn_res50v2_70ep')

Predispongo adesso la funzione *train* per l'addestramento del modello. Imposto learning rate, ottimizzatore, batch size e preparo il dataloader del trainset. L'ottimizzatore scelto è AdamW, variante dell'ottimizzatore Adam, il quale è in grado di regolarizzare i pesi durante l'addestramento

In [None]:
learning_rate = 1e-4
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
batch_size = 4
train_dataloader = torch.utils.data.DataLoader(train_set, batch_size = batch_size, shuffle=True, collate_fn= lambda batch: tuple(zip(*batch)))
total_step = len(train_dataloader)

num_epochs = 100

train_losses = []
train_counter = []

def train(epoch,model,optimizer):
    loss_per_epoch = 0
    counter=0
    for batch_idx, (images, targets) in enumerate(train_dataloader):

        counter = counter + 1
        images = [image.to(device) for image in images]
        targets = [{
                    'boxes': target['boxes'].to(device),
                    'labels': target['labels'].to(device),
                    'image_id': target['image_id'],
                    'area': target['area'].to(device),
                    'iscrowd':target['boxes'].to(device),
                    'masks': target['masks'].to(device)
                    } for target in targets]

        # Forward pass
        optimizer.zero_grad()
        loss_dict = model(images, targets)

        #prova cambio penalità su loss_mask
        #loss_dict["loss_mask"]=loss_dict["loss_mask"]*1.5

        losses = sum(loss for loss in loss_dict.values())
        loss_per_epoch = loss_per_epoch + losses

        # Backward pass
        losses.backward()
        optimizer.step()
        torch.cuda.empty_cache()
        if (batch_idx+1) % 1 == 0:
            print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
                   .format(epoch, num_epochs, batch_idx+1, total_step, losses))

        if counter==total_step:
          med_loss = loss_per_epoch/counter
          print('Loss media: {:.4f}'.format(med_loss))
          counter=0
          loss_per_epoch=0

        train_losses.append(losses)
        train_counter.append(
        (batch_idx*batch_size) + ((epoch-1)*len(train_dataloader)))

In [None]:
model.train()
model.to(device)

for epoch in range(0,num_epochs+1):
  train(epoch = epoch, model = model, optimizer = optimizer)
  if (epoch%2==0):
    torch.save(model, '/content/drive/MyDrive/mask_rcnn_res152v2_{}ep'.format(epoch))

[1;30;43mOutput streaming troncato alle ultime 5000 righe.[0m
Epoch [28/100], Step [68/211], Loss: 0.4268
Epoch [28/100], Step [69/211], Loss: 0.1748
Epoch [28/100], Step [70/211], Loss: 0.2557
Epoch [28/100], Step [71/211], Loss: 0.4764
Epoch [28/100], Step [72/211], Loss: 0.2345
Epoch [28/100], Step [73/211], Loss: 0.5238
Epoch [28/100], Step [74/211], Loss: 0.3331
Epoch [28/100], Step [75/211], Loss: 0.3701
Epoch [28/100], Step [76/211], Loss: 0.5660
Epoch [28/100], Step [77/211], Loss: 0.3653
Epoch [28/100], Step [78/211], Loss: 0.4644
Epoch [28/100], Step [79/211], Loss: 0.3783
Epoch [28/100], Step [80/211], Loss: 0.2560
Epoch [28/100], Step [81/211], Loss: 0.4617
Epoch [28/100], Step [82/211], Loss: 0.2135
Epoch [28/100], Step [83/211], Loss: 0.1032
Epoch [28/100], Step [84/211], Loss: 0.2503
Epoch [28/100], Step [85/211], Loss: 0.2585
Epoch [28/100], Step [86/211], Loss: 0.4568
Epoch [28/100], Step [87/211], Loss: 0.2728
Epoch [28/100], Step [88/211], Loss: 0.3470
Epoch [28/10

# Fase di testing

Proseguiamo con il notebook di testing per il modello addestrato. Il file prende ispirazione dal file di esempio caricato nella cartella del progetto dai docenti con qualche modifica per essere reso effetivamente fruibile per il testing

In [None]:
test_set = TTPLADataset('/content/drive/MyDrive/testset', '/content/drive/MyDrive/test.json')

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
27

In questa fase viene predisposto il dataloader per il test set. Vengono effettuate le predizioni e vengono salvate in un file json sul drive. Questo file json sarà utile più avanti quando dovremo calcolare le metriche di nostro interesse come la mAP(50%) su box e segmentazione.

In [None]:
THRESHOLD_SEGM = 0.5
model.eval()
batch_size = 1
test_dataloader = torch.utils.data.DataLoader(test_set, batch_size = batch_size, shuffle = False, collate_fn =lambda batch: tuple(zip(*batch)) )

results = []
for images, labels in test_dataloader:

  images = [image.to(device) for image in images]
  prediction = model(images)[0]
  classes = prediction['labels'].cpu().numpy()
  scores = prediction['scores'].cpu().detach().numpy()
  boxes = prediction['boxes'].view(-1,4).cpu().detach().numpy()
  masks = torch.where(prediction['masks'].view(-1,700,700).cpu().detach()>THRESHOLD_SEGM,1,0).numpy()
  for i in range(masks.shape[0]): #per ogni oggetto identificato
    image_id = labels[0]['image_id']
    bbox = boxes[i]
    bbox = [bbox[0], bbox[1], bbox[2]-bbox[0], bbox[3]-bbox[1]]
    bbox = [round(float(x)*10)/10 for x in bbox]
    rle = pycocotools.mask.encode(np.asfortranarray(masks[i].astype(np.uint8)))
    rle['counts'] = rle['counts'].decode('ascii')
    results.append({
                'image_id': int(image_id),
                'category_id': int(classes[i])-1,
                'bbox': bbox,
                'segmentation': rle,
                'score': float(scores[i]),
            })

  torch.cuda.empty_cache()
with open('/content/drive/MyDrive/pred.json', 'w') as fp: #salvataggio del json di predizione sul drive
    json.dump(results, fp)

Sfrutto adesso le funzioni messe a disposizione di *pycocotools* per il calcolo dei risultati.

In [None]:
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
test = '/content/drive/MyDrive/test.json'
pred = '/content/drive/MyDrive/pred.json'
gt = COCO(test)
detections = gt.loadRes(pred)

imgIds=sorted(gt.getImgIds())

loading annotations into memory...
Done (t=0.07s)
creating index...
index created!
Loading and preparing results...
DONE (t=0.02s)
creating index...
index created!


In [None]:
annType = 'bbox'
cocoEval = COCOeval(gt, detections, annType)
cocoEval.params.imgIds  = imgIds
cocoEval.evaluate()
cocoEval.accumulate()
cocoEval.summarize()

Running per image evaluation...
Evaluate annotation type *bbox*
DONE (t=1.07s).
Accumulating evaluation results...
DONE (t=0.14s).
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=   all | maxDets=100 ] = 0.367
 Average Precision  (AP) @[ IoU=0.50      | area=   all | maxDets=100 ] = 0.563
 Average Precision  (AP) @[ IoU=0.75      | area=   all | maxDets=100 ] = 0.416
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.238
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=medium | maxDets=100 ] = 0.336
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= large | maxDets=100 ] = 0.384
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets=  1 ] = 0.280
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets= 10 ] = 0.430
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets=100 ] = 0.443
 Average Recall     (AR) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.255
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=medium | maxDets=100

In [None]:
annType = 'segm'
cocoEval = COCOeval(gt, detections, annType)
cocoEval.params.imgIds  = imgIds
cocoEval.evaluate()
cocoEval.accumulate()
cocoEval.summarize()

Running per image evaluation...
Evaluate annotation type *segm*
DONE (t=1.44s).
Accumulating evaluation results...
DONE (t=0.13s).
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=   all | maxDets=100 ] = 0.199
 Average Precision  (AP) @[ IoU=0.50      | area=   all | maxDets=100 ] = 0.354
 Average Precision  (AP) @[ IoU=0.75      | area=   all | maxDets=100 ] = 0.199
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.044
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=medium | maxDets=100 ] = 0.155
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= large | maxDets=100 ] = 0.263
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets=  1 ] = 0.207
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets= 10 ] = 0.269
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets=100 ] = 0.270
 Average Recall     (AR) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.058
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=medium | maxDets=100

# Codici di prova

In [None]:
#class logits, labels

class CustomLossFunction(nn.Module):
  def __init__(self, cls_loss, bb_loss, seg_loss, cls_weight, bb_weight, seg_weight):
    super(CustomLossFunction, self).__init__()
    self.cls_loss = cls_loss
    self.bb_loss = bb_loss
    self.seg_loss = seg_loss
    self.cls_weight = cls_weight
    self.bb_weight = bb_weight
    self.seg_weight = seg_weight

  def forward(self,predictions,labels):
    prediction = predictions[0]
    #loss di classificazione
    #scores di classificazione: uno per ogni oggetto predetto
    labels_cls = labels['labels']
    pred_cls = prediction['labels']
    loss_cls = self.cls_loss(pred_cls,labels_cls)
    #loss di bbox
    labels_bbox = labels['boxes']
    pred_bbox = prediction['boxes']
    loss_bbox = self.bb_loss(pred_bbox,labels_bbox)
    #loss per le maschere: sigmoide su tutti i pixel e poi cross entropy
    labels_masks = labels['masks']
    pred_masks = prediction['masks']
    loss_masks = self.seg_loss(pred_masks,labels_masks)

    return self.cls_weight*loss_cls + self.bb_weight*loss_bbox + self.seg_weight*loss_masks

NameError: name 'nn' is not defined