# Estrazione dei dati

## Descrizione del dataset

Abbiamo a disposizione due dataset:
- campioni positivi (dove c'è sicuramente un pedone)
- campioni negativi (dove invece il pedone non c'è)

Questi campioni sono organizzati in 3 insiemi disgiunti:
- Train Set
- Test Set
- Validation Set

Il nostro compito è quello di estrarre le immaggini di train, test e set per ogni campione positivo e negativo e creare per ogni immagine un "mapping" tale per cui assegniamo classe 1 o 0 se il pedone c'è o no.

In [1]:
# import delle librerie
import cv2
import numpy as np

## Campioni Positivi

Per quanto riguarda i campioni positivi, ogni immagine è "descritta" da delle boundary boxes che ci dicono dove si trovano i pedoni e a quale classe di pedone appartiene quella boundary box. A noi tuttavia interesserà sapere quelle appartenenti alla classe 1.

Per leggere le informazioni su queste boundary boxes prima devo leggere quali sono le "immagini" da tenere conto per ogni insieme (test, train e validation), ovvero devo prendere SOLO le boundary boxes delle annotations dei seguenti file txt:
- test_assignment.txt
- train_assignment.txt
- val_assignment.txt

Tuttavia, dal file di readMe leggiamo come ogni file di annotazione per ogni immagine è del tipo:

![image.png](attachment:image.png)

I tre file di test_assignment, train e val servono per capire quali 'immagini' considerare, quindi per quali immagini estrarre le boundary boxes dal corrispondente file di annotazione

In [2]:
def assign_sets():
    '''
    Questa funzione legge dalla cartella i seguenti file:
    - test_assignment.txt
    - train_assignment.txt
    - val_assignment.txt
    Poi ritorna tre array di valori per ogni categoria di set contente le annotazioni da vedere
    '''
    test_set = []
    train_set = []
    val_set = []
    # leggo prima Train
    with open('train_assignment.txt', 'r') as f:
        for row in f:
            row = row.strip()
            train_set.append(row)
    # poi test
    with open('test_assignment.txt', 'r') as f:
        for row in f:
            row = row.strip()
            test_set.append(row)
    # infine validation
    with open('val_assignment.txt', 'r') as f:
        for row in f:
            row = row.strip()
            val_set.append(row)
    
    print(f'Train set: {len(train_set)}')
    print(f'Test set: {len(test_set)}')
    print(f'Val set: {len(val_set)}')
    return train_set, test_set, val_set

In [3]:
def extract_boundary_box(valore, annotations):
    '''
    Questa funzione prende un valore in input, ovvero l'immagine dove cercare
    ed estrae le boundary box della classe 1 relative a quel valore.
    '''
    path_file = 'Annotations/' + valore + '.jpg.txt'
    # print(path_file)
    rows = []
    if path_file[12:] in annotations:
        # il file è presente in Annotations
        with open(path_file, 'r') as f:
            for row in f:
                row = row.strip()
                rows.append(row)
        rows = rows[1:] # faccio slicings
        bboxes = []
        for row in rows:
            row = row.split(' ')
            target,x,y,w,h = row
            if int(target) == 1:
                box = [x,y,w,h]
                bboxes.append(box)
        # print(f'Ho trovato {len(bboxes)} boundary boxes')
        return bboxes
    return None

Tuttavia dobbiamo prima di tutto 'esportare' i file da cui dobbiamo leggere

In [4]:
import os
directory = 'Annotations'
annotations = []
for root,dirs,files in os.walk(directory):
    for file in files:
        annotations.append(str(file))

In [5]:
annotations[0]

'012105.jpg.txt'

In [6]:
# creo i set
train, test, val = assign_sets()

positive_bboxes_train = []
for i in range(len(train)):
    bboxes = extract_boundary_box(train[i], annotations)
    if bboxes:
        # aggiungo alle Boundary Box anche il nome dell'immagine dove andare a cercare
        positive_bboxes_train.append((train[i], bboxes))

positive_bboxes_test = []
for i in range(len(test)):
    bboxes = extract_boundary_box(test[i], annotations)
    if bboxes:
        # aggiungo alle Boundary Box anche il nome dell'immagine dove andare a cercare
        positive_bboxes_test.append([test[i], bboxes])

positive_bboxes_val = []
for i in range(len(val)):
    bboxes = extract_boundary_box(val[i], annotations)
    if bboxes:
        # aggiungo alle Boundary Box anche il nome dell'immagine dove andare a cercare
        positive_bboxes_val.append([val[i], bboxes])


print(f'Ci sono in totale {len(positive_bboxes_train)} file di annotazioni per il train')
print(f'Ci sono in totale {len(positive_bboxes_test)} file di annotazioni per il test')
print(f'Ci sono in totale {len(positive_bboxes_val)} file di annotazioni per il validation')

Train set: 7000
Test set: 1000
Val set: 1000
Ci sono in totale 6919 file di annotazioni per il train
Ci sono in totale 983 file di annotazioni per il test
Ci sono in totale 991 file di annotazioni per il validation


In questo modo metto in evidenza l'immagine da cercare e le boundary box associate

In [7]:
positive_bboxes_train[0]

('000040',
 [['45', '235', '79', '318'],
  ['60', '209', '120', '356'],
  ['119', '214', '168', '336'],
  ['94', '220', '136', '326'],
  ['213', '201', '287', '381'],
  ['268', '201', '339', '378'],
  ['312', '192', '389', '380'],
  ['374', '226', '391', '267'],
  ['417', '225', '438', '279'],
  ['428', '222', '446', '266'],
  ['443', '224', '456', '258'],
  ['449', '227', '467', '271'],
  ['396', '217', '406', '241'],
  ['402', '210', '413', '238'],
  ['409', '217', '419', '241'],
  ['531', '222', '550', '287'],
  ['527', '217', '538', '245'],
  ['511', '213', '522', '240'],
  ['515', '211', '525', '237'],
  ['491', '216', '502', '243'],
  ['484', '209', '494', '235'],
  ['473', '211', '483', '235'],
  ['477', '211', '487', '236'],
  ['468', '212', '476', '235'],
  ['461', '210', '471', '236'],
  ['456', '212', '466', '240'],
  ['448', '209', '459', '237'],
  ['439', '210', '450', '237'],
  ['431', '211', '442', '241'],
  ['420', '213', '432', '242']])

In [11]:
directory_p = 'Images_Positive'

images_positive = []
for root,dirs,files in os.walk(directory_p):
    for file in files:
        images_positive.append(str(file))

print(f'Ci sono {len(annotations)} annotazioni per le boundary box')
print(f'Ci sono {len(images_positive)} immagini di campioni positivi')

Ci sono 8918 annotazioni per le boundary box
Ci sono 13259 immagini di campioni positivi


Ogni boundary box va letta come:[x,y,w,h], dove:
- x,y: coordinate del rettangolo in alto a sx dove inizia la boundary box
- w: larghezza (quindi x 'finale')
- h: altezza (quindi y 'finale')

L'idea è quella di usare il primo valore per 'aprire' l'immagine, poi usare le boundary box per fare slicing delle foto, quindi applicare il descrittore HOG ad ognuno di loro. 

Alla fine, dopo aver aperto l'immagine e aver calcolato il descrittore su ogni campione positivo, otterrò un vettore di label positive del tipo (descrittore, 1)

Definisco innanzitutto il descrittore HOG, utilizzando i parametri di Default e la dimensione della finestra fissata a h=128, w=64 pixel. Dalla documentazione vediamo che questi sono i valori di default.
Inoltre, per il momento, addestriamo il nostro descrittore su un modello di Default di OpenCV

### Descrittore HOG

#### Descrittore HOG

cv2.HOGDescriptor(
  
                win_size=(64, 128),
                block_size=(16, 16),
                block_stride=(8, 8),
                cell_size=(8, 8),
                nbins=9,
                win_sigma=DEFAULT_WIN_SIGMA,
                threshold_L2hys=0.2,
                gamma_correction=true,
                nlevels=DEFAULT_NLEVELS
                
              )

- win_size: dimensione della finestra di rilevamento in pixel (larghezza, altezza). Definisce la regione di interesse. Deve essere un multiplo intero della dimensione della cella.

- block_size: dimensione del blocco in pixel (larghezza, altezza). Definisce quante celle sono presenti in ciascun blocco. Deve essere un multiplo intero della dimensione della cella e deve essere inferiore alla finestra di rilevamento. Più piccolo è il blocco, maggiori saranno i dettagli che otterrai.

- block_stride: blocca il passo in pixel (orizzontale, verticale). Deve essere un multiplo intero della dimensione della cella. Il block_stride definisce la distanza tra i blocchi adiacenti, ad esempio, 8 pixel in orizzontale e 8 pixel in verticale. Block_strides più lunghi rendono l'algoritmo più veloce (perché vengono valutati meno blocchi) ma l'algoritmo potrebbe non funzionare altrettanto bene.

- cell_size: dimensione della cella in pixel (larghezza, altezza). Determina la dimensione della tua cella. Più piccola è la cella, più dettagli otterrai.

- nbins: numero di bin per gli istogrammi. Determina il numero di contenitori angolari utilizzati per creare gli istogrammi. Con più contenitori catturi più direzioni del gradiente. HOG utilizza gradienti senza segno, quindi i contenitori angolari avranno valori compresi tra 0 e 180 gradi.

- win_sigma: parametro della finestra di livellamento gaussiano. Le prestazioni dell'algoritmo HOG possono essere migliorate smussando i pixel vicino ai bordi dei blocchi applicando una finestra spaziale gaussiana a ciascun pixel prima di calcolare gli istogrammi.

- soglia_L2hys: ritiro del metodo di normalizzazione L2-Hys (norma L2 ritagliata in stile Lowe). Il metodo L2-Hys viene utilizzato per normalizzare i blocchi e consiste in una norma L2 seguita da ritaglio e rinormalizzazione. Il ritaglio limita il valore massimo del vettore descrittore per ciascun blocco in modo che abbia il valore della soglia specificata (0,2 per impostazione predefinita).

- gamma_correction: flag per specificare se è richiesta o meno la preelaborazione della correzione gamma. L'esecuzione della correzione gamma aumenta leggermente le prestazioni dell'algoritmo HOG.

- nlevels: aumenta il numero massimo di finestre di rilevamento.

![image.png](attachment:image.png)

In [12]:
# Specify the parameters for our HOG descriptor
win_size = (64, 128)
cell_size = (8, 8)
block_size = (16, 16)
block_stride = (8, 8)
num_bins = 9

# Set the parameters of the HOG descriptor using the variables defined above
hog = cv2.HOGDescriptor(win_size, block_size, block_stride, cell_size, num_bins)

### Estrazione dei campioni positivi

Definisco una funzione che, presa un elemento da un set (train, test o val):
- estrare le boundary boxes
- per ognuna di essere ritorna un descrittore

La funzione dovrà tornare quindi una lista di descrittori che useremo per costruire l'esempio di campioni positivi

In [10]:
images_positive[0]

'005566.jpg'

In [13]:
def extract_subset_bbox(valore):
    '''
    Presa in input l'immagine da 'analizzare', estraggo le boundary boxes e faccio slicing
    '''
    image_path, bboxes = valore
    if (f'{image_path}.jpg' in images_positive):
        image = cv2.imread(f'Images_Positive/{image_path}.jpg')
        sub_images = []
        for bbox in bboxes:
            # faccio il casting degli interi
            for i in range(len(bbox)):
                bbox[i] = int(bbox[i])
            x,y,w,h = bbox
            if w != x and h != y:
                # evito boundary box false
                sub_image = image[y:h, x:w].copy()
                # cv2.imshow(f'Boundary Box: {bbox}', sub_image)
                # cv2.waitKey(0)
                sub_images.append(sub_image)
        # cv2.destroyAllWindows()
        return sub_images
    return []

In [14]:
bboxes_0_example = extract_subset_bbox(positive_bboxes_train[0])
len(extract_subset_bbox(positive_bboxes_train[0])) == len(positive_bboxes_train[0][1])
# la dimensione corrisponde

True

#### Funzione di image processing

Realizzo anche una funzione di image proccessing per fare in modo che ogni immagine croppata dalla funzione precedente venga riscalata a 64x128 pixel e:
- prima viene convertita in scala di grigi
- eventualmente faccio equilizzazione dei colori per portarli tutti a livelli di contrasto e luminanza identici
- poi applico un filtro gaussiano
- infine faccio resize dell'immagine

L'idea è quella di addestrare il nostro modello con immagini che siano uguali in condizioni di valori di contrasto e luminanza. Qui l'euristica considerata è: *se un pedone è presente, deve esserlo in qualsiasi condizione di luce*

In [15]:
def subimage_processing(img):
    # faccio conversione in scala di grigi
    # cv2.imshow('Immagine originale', img)
    # cv2.waitKey(0)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # poi applico una equilizzazione
    img = cv2.equalizeHist(img)
    # cv2.imshow('Immagine equilizzata', img)
    # cv2.waitKey(0)
    # quindi applico un filtro gaussiano er ridurre eventuale rumore
    img = cv2.GaussianBlur(img,(3,3),0) # kernel size = 3x3, sigma_x = 0
    # cv2.imshow('Immagine con filtro', img)
    # cv2.waitKey(0)
    # infine faccio reshape
    img = cv2.resize(img, (64, 128))  # Larghezza x Altezza
    # cv2.imshow('Immagine con reshape', img)
    # cv2.waitKey(0)
    # cv2.destroyAllWindows()
    return img

Come si applicano i descrittori?

In [17]:
subimages = extract_subset_bbox(positive_bboxes_train[0])
descriptors = [] # definisco i descrittori per ogni immagine
for subimage in subimages:
    subimage = subimage_processing(subimage)
    descriptor = hog.compute(subimage)
    descriptors.append(descriptor)

In [18]:
descriptors[0].shape

(3780,)

In [19]:
descriptors[1].shape

(3780,)

Notiamo come tutti i descrittori hanno lo stesso numero di feature a parità di dimensione!

adesso che sappiamo che funziona per un campione, partiamo con la costruzione dei campioni positivi

### Estrazione campioni positivi dalle bboxes

Vista l'alta dimensionalità del dataset decidiamo di prendere un sottoesempio del campione. Per esempio decidiamo di prendere in totale qualcosa come 2000 campioni dal dataset.
Rispetto le proporzioni del 78% train, 11% test e 11% validazione. Su 2000 campioni avremo:
- 1560 campioni di train
- 220 campioni di test
- 220 campioni di validazione

In [20]:
# randomizziamo
import random

random.shuffle(positive_bboxes_train)
random.shuffle(positive_bboxes_test)
random.shuffle(positive_bboxes_val)

reduced_train = positive_bboxes_train[:2560]
reduced_test = positive_bboxes_test[:1200]
reduced_val = positive_bboxes_val[:2000]

In [21]:
positive_samples = []

### Train positivi

In [22]:
sub_sets = []
for i in range(len(reduced_train)):
    if (i%100 == 0):
        print(f'Index{i}')
    subimages = extract_subset_bbox(reduced_train[i])
    sub_sets.append(subimages)

Index0
Index100
Index200
Index300
Index400
Index500
Index600
Index700
Index800
Index900
Index1000
Index1100
Index1200
Index1300
Index1400
Index1500
Index1600
Index1700
Index1800
Index1900
Index2000
Index2100
Index2200
Index2300
Index2400
Index2500


In [23]:
pos_train_samples = []

In [24]:
train_descriptors = []

Salvo i descrittori di addestramento perché poi alla fine dovrei normalizzare i dati, considerando i valori di media e varianza dei descrittori di train!

Questo processo è la *standardizzazione*

In [25]:
for i in range(len(sub_sets)):
    if (i%100 == 0):
        print(f'Index {i}')
    if sub_sets[i]:
        subimages = sub_sets[i].copy()
        for subimage in subimages:
            if subimage is not None:
                subimage = subimage_processing(subimage)
                descriptor = hog.compute(subimage)
                train_descriptors.append(descriptor)
                new_sample = (descriptor,1)
                positive_samples.append(new_sample)
                pos_train_samples.append(new_sample)

Index 0
Index 100
Index 200
Index 300
Index 400
Index 500
Index 600
Index 700
Index 800
Index 900
Index 1000
Index 1100
Index 1200
Index 1300
Index 1400
Index 1500
Index 1600
Index 1700
Index 1800
Index 1900
Index 2000
Index 2100
Index 2200
Index 2300
Index 2400
Index 2500


In [26]:
print(f'Ci sono {len(pos_train_samples)} campioni per il train')
print(f'Ci sono {len(pos_train_samples[0][0])} descrittori per ogni immagine')

Ci sono 50265 campioni per il train
Ci sono 3780 descrittori per ogni immagine


### Test positivi

In [27]:
sub_sets = []
for i in range(len(reduced_test)):
    if (i%100 == 0):
        print(f'Index{i}')
    subimages = extract_subset_bbox(reduced_test[i])
    sub_sets.append(subimages)

Index0
Index100
Index200
Index300
Index400
Index500
Index600
Index700
Index800
Index900


In [28]:
pos_test_samples = []

In [29]:
for i in range(len(sub_sets)):
    if (i%100 == 0):
        print(f'Index {i}')
    if sub_sets[i]:
        subimages = sub_sets[i].copy()
        for subimage in subimages:
            if subimage is not None:
                subimage = subimage_processing(subimage)
                descriptor = hog.compute(subimage)
                new_sample = (descriptor,1)
                positive_samples.append(new_sample)
                pos_test_samples.append(new_sample)

Index 0
Index 100
Index 200
Index 300
Index 400
Index 500
Index 600
Index 700
Index 800
Index 900


In [30]:
print(f'Ci sono {len(pos_test_samples)} campioni per il test')
print(f'Ci sono {len(pos_test_samples[0][0])} descrittori per ogni immagine')

Ci sono 17508 campioni per il test
Ci sono 3780 descrittori per ogni immagine


### Val positivi

In [31]:
sub_sets = []
for i in range(len(reduced_val)):
    if (i%100 == 0):
        print(f'Index{i}')
    subimages = extract_subset_bbox(reduced_val[i])
    sub_sets.append(subimages)

Index0
Index100
Index200
Index300
Index400
Index500
Index600
Index700
Index800
Index900


In [32]:
pos_val_samples = []

In [33]:
for i in range(len(sub_sets)):
    if (i%100 == 0):
        print(f'Index {i}')
    if sub_sets[i]:
        subimages = sub_sets[i].copy()
        for subimage in subimages:
            if subimage is not None:
                subimage = subimage_processing(subimage)
                descriptor = hog.compute(subimage)
                new_sample = (descriptor,1)
                positive_samples.append(new_sample)
                pos_val_samples.append(new_sample)

Index 0


Index 100
Index 200
Index 300
Index 400
Index 500
Index 600
Index 700
Index 800
Index 900


In [34]:
print(f'Ci sono {len(pos_val_samples)} campioni per la validazione')
print(f'Ci sono {len(pos_val_samples[0][0])} descrittori per ogni immagine')

Ci sono 19668 campioni per la validazione
Ci sono 3780 descrittori per ogni immagine


## Campioni Negativi

I campioni negativi (ovvero quelli con classe 0) li trovo nella cartella Images_Negative

In [35]:
directory_n = 'Images_negative'

images_negative = []

In [36]:
train_n = directory_n + '/train_neg'
test_n = directory_n + '/test_neg'
val_n = directory_n + '/val_neg'

In [37]:
neg_train_img = []
neg_test_img = []
neg_val_img = []

In [38]:
for root,dirs,files in os.walk(train_n):
    for file in files:
        images_negative.append(str(file))
        neg_train_img.append(str(file))

for root,dirs,files in os.walk(test_n):
    for file in files:
        images_negative.append(str(file))
        neg_test_img.append(str(file))

for root,dirs,files in os.walk(val_n):
    for file in files:
        images_negative.append(str(file))
        neg_val_img.append(str(file))

In [39]:
print(f'Ci sono {len(images_negative)} immagini di campioni negativi')
print(f'Ci sono {len(neg_train_img)} immagini di train campioni negativi')
print(f'Ci sono {len(neg_test_img)} immagini di test campioni negativi')
print(f'Ci sono {len(neg_val_img)} immagini di validazione campioni negativi')

Ci sono 1671 immagini di campioni negativi
Ci sono 1015 immagini di train campioni negativi
Ci sono 453 immagini di test campioni negativi
Ci sono 203 immagini di validazione campioni negativi


Queste immagini sono particolari, difatti se applicassi un descrittore HOG e un multiscale non troverei nessun pedestrian nell'immagine

In [40]:
path = 'Images_negative/train_neg/'
neg_train_img[0]

'no_person__no_bike_184.png'

In [41]:
hog_n = cv2.HOGDescriptor()
hog_n.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())

In [42]:
# neg_example_img = cv2.imread(path + neg_train_img[0])
# cv2.imshow('Immagine esempio', neg_example_img)
# cv2.waitKey(0)
# cv2.destroyAllWindows()

qt.qpa.plugin: Could not find the Qt platform plugin "wayland" in "/home/gabriele/.local/lib/python3.10/site-packages/cv2/qt/plugins"


In [43]:
# locations, confidence = hog_n.detectMultiScale(neg_example_img, padding=(4,4), scale=1.2)

In [41]:
# len(locations)

0

Come previsto, il detector HOG non funziona!
Quindi, adesso, la cosa migliore è quella di selezionare dei campioni randomici dalle immagini per costruire dei campioni negativi ed estrarre comunque i descrittori

In [42]:
# import random

# h,w,_ = neg_example_img.shape
# portion_size = (200, 200) # da intedersi come (x,y)

# tries = 10

# sub_images = []
# for i in range(tries):
#     x = random.randint(0, w-portion_size[1]-1)
#     y = random.randint(0, h-portion_size[0]-1)
#     sub_neg_img = neg_example_img[y:y+portion_size[1], x:x+portion_size[0]].copy()
#     sub_images.append(sub_neg_img)

In [43]:
# for sub_image in sub_images:
#     cv2.imshow('Immagine esempio', sub_image)
#     cv2.waitKey(0)
# cv2.destroyAllWindows()

Ora che sappiamo che comunque riesco a prendere delle finestre casuali, posso procedere a randomizzare il tutto per campioni negativi di train, test e validation. 

In [44]:
negative_samples = []

### Train Negativi

Qui ci dobbiamo ricordare che per calcolare media e varianza del set di train dobbiamo incrementare la variabile dei descrittori!

In [45]:
len(train_descriptors)

50265

In [46]:
neg_train_samples = []

In [47]:
path_train = train_n + '/'
path_train

'Images_negative/train_neg/'

In [48]:
path_train + neg_train_img[0] # prende una immagine

'Images_negative/train_neg/no_person__no_bike_184.png'

In [49]:
import random
portion_size = (200, 200) # da intedersi come (y,x) sempre

In [50]:
for i in range(len(neg_train_img)):
    if (i%100 == 0):
        print(f'Index {i}')
    image_path = path_train + neg_train_img[i]
    image = cv2.imread(image_path) 
    h,w,_ = image.shape
    tries = random.randint(40, 50) # sceglie di prende un numero casuale di immagini        
    sub_images = []
    for j in range(tries):
        w_rand = random.randint(64,portion_size[1])
        h_rand = random.randint(128, portion_size[0])
        x = random.randint(0, w-w_rand-1)
        y = random.randint(0, h-h_rand-1)
        sub_neg_img = image[y:y+h_rand, x:x+w_rand].copy()
        sub_images.append(sub_neg_img)
    for subimage in sub_images:
        subimage = subimage_processing(subimage)
        descriptor = hog.compute(subimage)
        train_descriptors.append(descriptor)
        new_sample = (descriptor,0)
        negative_samples.append(new_sample)
        neg_train_samples.append(new_sample)

Index 0
Index 100




Index 200




Index 300




Index 400




Index 500




Index 600




Index 700




Index 800




Index 900




Index 1000


In [51]:
print(f'Ci sono {len(neg_train_samples)} esempi di addestramento negativi')

Ci sono 45675 esempi di addestramento negativi


Unisco i campioni negativi e i campioni positivi, creando i campioni per train

In [52]:
train_samples = pos_train_samples + neg_train_samples

In [53]:
print(f'In totale ci sono {len(train_samples)} campioni di addestramento')

In totale ci sono 95940 campioni di addestramento


### Test Negativi

In [54]:
neg_test_samples = []

In [55]:
path_test = test_n + '/'
path_test

'Images_negative/test_neg/'

In [56]:
for i in range(len(neg_test_img)):
    if (i%100 == 0):
        print(f'Index {i}')
    image_path = path_test + neg_test_img[i]
    image = cv2.imread(image_path) 
    h,w,_ = image.shape
    tries = random.randint(40, 50) # sceglie di prende un numero casuale di immagini        
    sub_images = []
    for j in range(tries):
        w_rand = random.randint(64,portion_size[1])
        h_rand = random.randint(128, portion_size[0])
        x = random.randint(0, w-w_rand-1)
        y = random.randint(0, h-h_rand-1)
        sub_neg_img = image[y:y+h_rand, x:x+w_rand].copy()
        sub_images.append(sub_neg_img)
    for subimage in sub_images:
        subimage = subimage_processing(subimage)
        descriptor = hog.compute(subimage)
        new_sample = (descriptor,0)
        negative_samples.append(new_sample)
        neg_test_samples.append(new_sample)

Index 0




Index 100




Index 200




Index 300




Index 400


In [57]:
print(f'Ci sono {len(neg_test_samples)} esempi di test negativi')

Ci sono 20459 esempi di test negativi


In [58]:
test_samples = pos_test_samples + neg_test_samples

In [59]:
print(f'Ci sono {len(test_samples)} esempi di test in totale')

Ci sono 37967 esempi di test in totale


### Validazione negativi

In [60]:
neg_val_samples = []

In [61]:
path_val = val_n + '/'
path_val

'Images_negative/val_neg/'

In [62]:
for i in range(len(neg_val_img)):
    if (i%100 == 0):
        print(f'Index {i}')
    image_path = path_val + neg_val_img[i]
    image = cv2.imread(image_path) 
    h,w,_ = image.shape
    tries = random.randint(40, 50) # sceglie di prende un numero casuale di immagini        
    sub_images = []
    for j in range(tries):
        w_rand = random.randint(64,portion_size[1])
        h_rand = random.randint(128, portion_size[0])
        x = random.randint(0, w-w_rand-1)
        y = random.randint(0, h-h_rand-1)
        sub_neg_img = image[y:y+h_rand, x:x+w_rand].copy()
        sub_images.append(sub_neg_img)
    for subimage in sub_images:
        subimage = subimage_processing(subimage)
        descriptor = hog.compute(subimage)
        new_sample = (descriptor,0)
        negative_samples.append(new_sample)
        neg_val_samples.append(new_sample)

Index 0




Index 100




Index 200


In [63]:
print(f'Ci sono {len(neg_val_samples)} campioni di validazione negativi')

Ci sono 9139 campioni di validazione negativi


In [64]:
val_samples = pos_val_samples + neg_val_samples

In [65]:
print(f'CI sono {len(val_samples)} campioni di validazione in totale')

CI sono 28807 campioni di validazione in totale


## Risultati finali della preparazione dei dati

Adesso, se volessimo utilizzare i campioni per addestrare un modello dobbiamo vedere gli array:
- train_samples
- test_samples
- val_samples

Salvo in un file denominato 'summary_parameters' i parametri finora trovati, come numero di campioni utilizzati in fase di train, quindi campioni etc

In [66]:
len(train_samples)

95940

In [67]:
len(test_samples)

37967

In [68]:
len(val_samples)

28807

In [69]:
len(positive_samples)

87441

In [70]:
len(negative_samples)

75273

In [71]:
def count_class_set(set_):
    class_0 = 0
    class_1 = 0
    for item in set_:
        if item[1] == 0:
            class_0 += 1
        else:
            class_1 += 1
    return (class_0, class_1)

In [72]:
suddivisione_train = count_class_set(train_samples)

In [73]:
print(f'Suddivisione: {suddivisione_train[0]} elementi di classe 0, {suddivisione_train[1]} elementi di classe 1')

Suddivisione: 45675 elementi di classe 0, 50265 elementi di classe 1


In [74]:
suddivisione_test = count_class_set(test_samples)
print(f'Suddivisione: {suddivisione_test[0]} elementi di classe 0, {suddivisione_test[1]} elementi di classe 1')

Suddivisione: 20459 elementi di classe 0, 17508 elementi di classe 1


In [75]:
suddivisione_val = count_class_set(val_samples)
print(f'Suddivisione: {suddivisione_val[0]} elementi di classe 0, {suddivisione_val[1]} elementi di classe 1')

Suddivisione: 9139 elementi di classe 0, 19668 elementi di classe 1


# Standardizzazione dati

Prima di procedere con l'implementazione del classificatore, procediamo con la standardizzazione dei dati. Per farlo usiamo i descrittori di train per determinare media e varianza con cui "normalizare" i dati da mandare in pasto al nostro classiicatore

In [76]:
len(train_descriptors)

95940

In [77]:
np_descriptors = np.array(train_descriptors)
np_descriptors.shape

(95940, 3780)

In [78]:
media = np_descriptors.mean()
media

0.13056211

In [79]:
std = np_descriptors.std()
std

0.10273082

In [80]:
type(train_samples[0][0])

numpy.ndarray

In [81]:
def standardizza(h):
    return (h-media)/(std)

In [82]:
# train_samples_np = np.array(train_samples)
# train_samples_np = (train_samples_np - media) / std
std_train_samples = []
for i in range(len(train_samples)):
    h, t = train_samples[i]
    h = standardizza(h)
    std_sample = (h,t)
    std_train_samples.append(std_sample)

In [83]:
len(std_train_samples)

96312

In [84]:
# test_samples_np = np.array(test_samples)
# test_samples_np = (test_samples_np - media) / std
std_test_samples = []
for i in range(len(test_samples)):
    h, t = test_samples[i]
    h = standardizza(h)
    std_sample = (h,t)
    std_test_samples.append(std_sample)

In [85]:
len(std_test_samples)

37909

In [86]:
# val_samples_np = np.array(val_samples)
# val_samples_np = (val_samples_np - media) / std
std_val_samples = []
for i in range(len(val_samples)):
    h, t = val_samples[i]
    h = standardizza(h)
    std_sample = (h,t)
    std_val_samples.append(std_sample)

In [87]:
len(std_val_samples)

28811

# Salvataggio dei valori finora trovati

Il file summary_data conterrà praticamente il numero di valori che stiamo usando per addestrare il nostro classificatore

In [None]:
with open('summary_data.txt', 'w') as f:
    print(f'Campioni per il train: {train_samples}', file=f)
    print(f'Campioni di test: {test_samples}', file=f)
    print(f'Campioni di validazione: {val_samples}', file=f)
    print(f'Campioni positivi: {positive_samples}', file=f)
    print(f'Campioni negativi: {negative_samples}', file=f)
    print(f'Suddivisione train: {suddivisione_train[0]} elementi di classe 0, {suddivisione_train[1]} elementi di classe 1', file=f)
    print(f'Suddivisione test: {suddivisione_test[0]} elementi di classe 0, {suddivisione_test[1]} elementi di classe 1', file=f)
    print(f'Suddivisione: {suddivisione_val[0]} elementi di classe 0, {suddivisione_val[1]} elementi di classe 1')
    

In un file chiamato invece 'parameters.txt' salvo i valori di media e deviazione standard che servono per standardizzare i dati. Difatti, per la natura 'variabile' dei dati, la standardizzazione valutata sull'insieme di train ci permette di avere valori centrati nello spazio delle features di training.

In [None]:
with open('parameters.txt', 'w') as f:
    print(media, file=f)
    print(std, file=f)

# Classificazione dati: costruzione del classificatore

In [88]:
# Importazione delle librerie necessarie
from sklearn.svm import LinearSVC
from sklearn.metrics import accuracy_score
import numpy as np
import pickle

In [89]:
# Preparazione dei dati di train
X_train = np.array([item[0] for item in std_train_samples])
y_train = np.array([item[1] for item in std_train_samples])

# Preparazione dei dati di validazione
X_val = np.array([item[0] for item in std_val_samples])
y_val = np.array([item[1] for item in std_val_samples])

# Preparazione dei dati di test
X_test = np.array([item[0] for item in std_test_samples])
y_test = np.array([item[1] for item in std_test_samples])

In [90]:
def salva_modello(model_svm):
    filename = "finalized_model.sav"
    pickle.dump(model_svm, open(filename, 'wb'))

def carica_modello(filename):
    model = pickle.load(open(filename, 'rb'))
    return model

Definisco una funzione peril calcolo delle metriche di TP, FP, FN

In [1]:
def stima_parametri_modello(y_pred, y_truth):
    TP = 0
    FP = 0
    FN = 0
    for i in range(len(y_pred)):
        if y_pred[i] == y_truth[i] and y_pred[i] == 1:
            TP += 1
        elif y_pred[i] != y_truth[i] and y_pred[i] == 1:
            FP += 1
        elif y_pred[i] != y_truth[i] and y_pred[i] == 0:
            FN += 1
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)
    f1_score = TP / (TP + (FN + FP)/2 )
    return TP, FP, FN, precision, recall, f1_score

In [2]:
# test
test_pred = [0,0,1,1,0,0]
test_truth = [0,0,1,0,0,0]
stima_parametri_modello(test_pred, test_truth)

(1, 1, 0, 0.5, 1.0, 0.6666666666666666)

Creo un dizionario per contenere gli score per ogni valore di C

In [3]:
best_c_scores = {}

In [91]:
# Iperparametri e score
classifier_scores = {}

c_values = [0.01, 0.1, 1, 10, 100]

for value in c_values:

    # Istanziazione del classificatore
    svm_classifier = LinearSVC(C=value)
    svm_classifier.fit(X_train, y_train)

    val_prediction = svm_classifier.predict(X_val)
    val_score = accuracy_score(y_val, val_prediction)

    # Qui bisogna calcolare TP, FP, TN e FN
    TP, FP, FN, precision, recall, f1_score = stima_parametri_modello(val_prediction, y_val)
    best_c_scores[f'{c_values}'] = [TP, FP, FN, precision, recall, f1_score]

    # Salvataggio del punteggio
    classifier_scores[value] = val_score

print(classifier_scores)

'# Iperparametri e score\nclassifier_scores = {}\n\nc_values = [0.01, 0.1, 1, 10, 100]\n\nfor value in c_values:\n\n    # Istanziazione del classificatore\n    svm_classifier = LinearSVC(C=value)\n    svm_classifier.fit(X_train, y_train)\n\n    val_prediction = svm_classifier.predict(X_val)\n    val_score = accuracy_score(y_val, val_prediction)\n\n    # Salvataggio del punteggio\n    classifier_scores[value] = val_score\n\nprint(classifier_scores)'

Salvo i parametri trovati in fase di addestramento e ricerca del miglior iperparametro

In [None]:
best_c_scores

In [None]:
with open('parametri_best_c.txt', 'w') as f:
    for key, values in best_c_scores.items():
        c_value = float(key)
        TP, FP, FN, precision, recall, f1_score = values
        print(f'{c_value}, {TP}, {FP}, {FN}, {precision}, {recall}, {f1_score}', file=f)

In [92]:
# Ricerca del miglior iperparametro
best_c = 0
best_score = 0

# for item in classifier_scores.items():
#     if item[1] > best_score:
#         best_c = item[0]
#         best_score = item[1]


for key, values in best_c_scores.items():
    c_value = float(key)
    TP, FP, FN, precision, recall, f1_score = values
    if f1_score >= best_score:
        best_score = f1_score
        best_c = c_value

# best_c = 1

# Istanziazione del classificatore
svm_classifier = LinearSVC(C=best_c)
svm_classifier.fit(X_train, y_train)

# Test sui dati di test
test_prediction = svm_classifier.predict(X_test)
test_score = accuracy_score(y_test, test_prediction)
print(f"Accuratezza del classificatore sui dati di test: {(test_score * 100):.2f}%")

'# Ricerca del miglior iperparametro\nbest_c = 0\nbest_score = 0\n\nfor item in classifier_scores.items():\n    if item[1] > best_score:\n        best_c = item[0]\n        best_score = item[1]\n\nbest_c = 1\n\n# Istanziazione del classificatore\nsvm_classifier = LinearSVC(C=best_c)\nsvm_classifier.fit(X_train, y_train)\n\n# Test sui dati di test\ntest_prediction = svm_classifier.predict(X_test)\ntest_score = accuracy_score(y_test, test_prediction)\nprint(f"Accuratezza del classificatore sui dati di test: {(test_score * 100):.2f}%")'

Salviamo il modello in maniera preventiva

In [93]:
salva_modello(svm_classifier)

Testiamo anche il fatto che dobbiamo caricare il modello

In [94]:
svm_classifier = carica_modello("finalized_model.sav")

# Test sui dati di test
test_prediction = svm_classifier.predict(X_test)
test_score = accuracy_score(y_test, test_prediction)
print(f"Accuratezza del classificatore sui dati di test: {(test_score * 100):.2f}%")

Accuratezza del classificatore sui dati di test: 90.89%


# Determinare le scale migliori

Per determinare le scale migliori utilizzo una euristica basata su:
- dimensione media delle finestre più grandi
- dimensione media delle finestre più piccole

Infine faccio una media del rapporto di scala della mia finestra win=(64,128) e la dimensione della media delle finestre più grandi e più piccole. La terza scala sarà semplicemente la stessa cosa, ma valutata come media tra i rapporti di scala che ho trovato

Per fare tutto ciò tuttavia necessito delle boundary boxes di train!

In [95]:
len(positive_bboxes_train)

6919

In [96]:
def find_boxes(bboxes):
    bboxes_area = []
    for bbox in bboxes:
        # faccio il casting degli interi
        for i in range(len(bbox)):
            bbox[i] = int(bbox[i])
        x1,y1,x2,y2 = bbox
        area = abs((x2-x1)*(y2-y1))
        bbox_area = (bbox,area)
        bboxes_area.append(bbox_area)
    # print(bboxes_area)
    bboxes_area.sort(key=lambda tup: tup[1])
    # print(bboxes_area)
    min_boundary = bboxes_area[0][0]
    max_bounday = bboxes_area[-1][0]
    medium = len(bboxes_area) // 2
    med_boundary = bboxes_area[medium][0]
    # print(min_boundary)
    # print(med_boundary)
    # print(max_bounday)
    return min_boundary, med_boundary, max_bounday


In [97]:
def calculate_long(bbox):
    x1, y1, x2, y2 = bbox
    delta_x = abs((x2-x1))
    delta_y = abs((y2-y1))
    return delta_x, delta_y

In [98]:
# min, med, max = find_boxes(positive_bboxes_train[0][1])

In [99]:
d_max = []
d_min = []
d_med = []

In [100]:
for i in range(len(positive_bboxes_train)):
    bboxes = positive_bboxes_train[i][1]
    min_v, med_v, max_v = find_boxes(bboxes)
    d_max.append(calculate_long(max_v))
    d_med.append(calculate_long(med_v))
    d_min.append(calculate_long(min_v))

In [101]:
d_max[:5]

[(166, 405), (77, 189), (99, 245), (105, 258), (19, 49)]

In [102]:
def calcola_media(d_bbooxes):
    x_boxes = []
    y_boxes = []
    for i in range(len(d_bbooxes)):
        x_boxes.append(d_bbooxes[i][0])
        y_boxes.append(d_bbooxes[i][1])
    media_x = sum(x_boxes) / len(x_boxes)
    media_y = sum(y_boxes) / len(y_boxes)
    return (media_x, media_y)

In [103]:
media_max = calcola_media(d_max)
media_med = calcola_media(d_med)
media_min = calcola_media(d_min)

In [104]:
media_max

(86.08194825841885, 213.02673796791444)

In [105]:
media_med

(48.39962422315364, 120.97528544587368)

In [106]:
media_min

(27.834369128486774, 72.40453822806764)

In [107]:
winSize = (64, 128)

In [108]:
def find_scala_ottima(media_value, winSize = (64, 128)):
    x_win = winSize[1]
    y_win = winSize[0]
    f_x = media_value[0] / x_win
    f_y = media_value[1] / y_win
    return (1/f_x), (1/f_y)
    # return (1/f_x)

In [109]:
f_max = find_scala_ottima(media_max)
f_max

(1.4869551931578355, 0.300431770258058)

In [110]:
f_med = find_scala_ottima(media_med)
f_med

(2.6446486321843543, 0.5290336762927883)

In [111]:
f_min = find_scala_ottima(media_min)
f_min

(4.598631260839314, 0.8839224938968037)

In [112]:
scale_x = [1/f_min[0], 1/f_med[0], 1/f_max[0]] # in questo modo prima prende i dettagli piccoli, verso quelli più grandi
scale_x

[0.21745600881630292, 0.3781220642433878, 0.6725152207688972]

In [113]:
best_scales = scale_x.copy()
best_scales.reverse()
print(best_scales)

[0.6725152207688972, 0.3781220642433878, 0.21745600881630292]


In [114]:
scale_y = [1/f_min[1], 1/f_med[1], 1/f_max[1]]
scale_y

[1.1313209098135568, 1.8902388350917763, 3.3285427807486636]

# MultiScale Sliding Window

Ricordate che per fare sliding window ogni volta che prendete una sottofinestra dovete fare:

In [417]:
def multiscale(image, stride, window_size, scales):

    # Estrazione della tupla delle dimensioni della finestra
    window_size_w, window_size_h = window_size

    detections = {}

    for scale in scales:

        image = cv2.GaussianBlur(image,(3,3),0)

        resized_image = cv2.resize(image, None, fx=scale, fy=scale)

        detections[scale] = []

        for y in range(0, resized_image.shape[0] - window_size_h, stride):
            for x in range(0, resized_image.shape[1] - window_size_w, stride):
        
                # Estrai la finestra
                window = resized_image[y:y+window_size_h,x:x+window_size_w]

                # Preprocessing
                window_subprocessed = subimage_processing(window)

                # Estrai i descrittori dalla finestra
                descriptor = np.array(hog.compute(window_subprocessed)).reshape(1,-1)

                # Normalizza i descrittori
                descriptor_normalized = (descriptor - media) / std

                if not np.isnan(descriptor_normalized).any():
                    pred = svm_classifier.predict(descriptor_normalized)
                    score = svm_classifier.decision_function(descriptor_normalized)            

                    if pred == 1 and score >= 0.7:
                        detection = (x, y, score)
                        detections[scale].append(detection)
    
    return detections

In [423]:
# image_path = "Images_Positive/000040.jpg"
# test_image = cv2.imread(image_path)

# dets = multiscale(test_image, 15, (64,128), best_scales)
# dets_scaled = function_scale(dets, best_scales, test_image)
# dets_nms = calcola_nms(dets_scaled)

# new_image = test_image.copy()

# for bbox in dets_nms:

#     x, y, w, h, c = bbox

#     cv2.rectangle(new_image, (x, y), (x + w, y + h), (0, 255, 0), 2)

#     print(bbox)

# cv2.imshow("Titolo", new_image)
# cv2.waitKey(0)
# cv2.destroyAllWindows()

h:413, w:550
Scale: 0.6725152207688972
h_r: 278, w_r: 370
Scale: 0.3781220642433878
h_r: 156, w_r: 208
Scale: 0.21745600881630292
h_r: 90, w_r: 120
14.973451327433628
[200, 200, 95, 190, 2.6454828706630544]
2.4845559845559846
[245, 200, 95, 190, 2.412860887748873]
1.672886124685325
[245, 178, 95, 190, 2.1870073655734497]
14.607436230004323
[89, 133, 95, 190, 1.6733551499155457]
0.7645908690976635
[289, 178, 95, 190, 1.5346560608089161]
1.520949720670391
[267, 200, 95, 190, 1.4183213151638707]
4.644152595372107
[200, 178, 95, 190, 1.3042771035423952]
11.363013698630137
[178, 178, 95, 190, 1.2637907474923231]
2.61
[312, 200, 95, 190, 0.7667793004488658]
1.1494492408454897
[245, 155, 95, 190, 0.7413954171717639]
4.803858520900322
[222, 200, 95, 190, 0.7005765199474241]
0.8646694214876033
[22, 155, 95, 190, 1.3497802749397567]
1.5530410183875532
[178, 200, 95, 190, 0.7323063083190027]
[289, 200, 95, 190, 2.8876528896785256]
[44, 200, 95, 190, 1.4552382401369557]
[89, 178, 95, 190, 1.000098

In [355]:
image_path = "Images_Positive/000040.jpg"
test_image = cv2.imread(image_path)

stride = 15

window_size = (64, 128)

window_size_w, window_size_h = window_size

#scales = [1, 0.7, 0.5]

scales = best_scales

detections = {}

for scale in scales:

    test_image = cv2.GaussianBlur(test_image,(3,3),0)

    resized_image = cv2.resize(test_image, None, fx=scale, fy=scale)

    detections[scale] = []

    for y in range(0, resized_image.shape[0] - window_size_h, stride):
        for x in range(0, resized_image.shape[1] - window_size_w, stride):
        
            # Estrai la finestra
            window = resized_image[y:y+window_size_h,x:x+window_size_w]

            # Preprocessing
            window_subprocessed = subimage_processing(window)

            # Estrai i descrittori dalla finestra
            descriptor = np.array(hog.compute(window_subprocessed)).reshape(1,-1)

            # Normalizza i descrittori
            descriptor_normalized = (descriptor - media) / std

            if not np.isnan(descriptor_normalized).any():
                pred = svm_classifier.predict(descriptor_normalized)
                score = svm_classifier.decision_function(descriptor_normalized)            

                if pred == 1 and score >= 0.7:
                    print(score)
                    detection = (x, y, score)
                    cv2.rectangle(resized_image, (x, y), (x + window_size_w, y + window_size_h), (0, 255, 0), 2)
                    detections[scale].append(detection)
        
            # Mostra l'immagine con la finestra scorrevole
            cv2.imshow("Sliding Window", resized_image)

            # WaitKey
            key = cv2.waitKey(25)
            if key == ord('q'):
                cv2.destroyAllWindows()
                break
    
    # Applica la non maxima suppression per la     area_inter = w_inter * h_interscala attuale
    #filtered_detections = non_maxima_suppression(detections[scale])
    
print("End of sliding window")
cv2.waitKey(0)
cv2.destroyAllWindows()

[1.67335515]
[1.31534581]
[0.74139542]
[1.14751504]
[0.95342819]
End of sliding window


# Funzione di scala

In [296]:
def function_scale(detected_bboxes_scale, scales, test_image):
    ''' 
    Questa funzione prende in input tutte le bboxes detected,
    le riscala per ogni scala all'immagine originale
    Restituisce in output una lista contenente le bbxes nel formato: x,y,w,h,confidence
    '''
    h,w,_ = test_image.shape
    print(f'h:{h}, w:{w}')
    finestra = (64,128)
    new_bboxes = []
    for scala in scales:
        print(f'Scale: {scala}')
        resized_test_image = cv2.resize(test_image, None, fx=scala, fy=scala)
        h_r, w_r, _ = resized_test_image.shape
        print(f'h_r: {h_r}, w_r: {w_r}')
        w_f = int(finestra[0] / scala)
        h_f = int(finestra[1] / scala) 
        detected_windows_x_scale = detected_bboxes_scale[scala]
        for bbox in detected_windows_x_scale:
            x1, y1, confidence = bbox
            x_new = int((x1 / w_r) * w)
            y_new = int((y1 / h_r) * h)
            new_box = [x_new, y_new, w_f, h_f, confidence[0]]
            new_bboxes.append(new_box)
    return new_bboxes

In [356]:
new_detections = function_scale(detections, scales, test_image)
new_detections

h:413, w:550
Scale: 0.6725152207688972
h_r: 278, w_r: 370
Scale: 0.3781220642433878
h_r: 156, w_r: 208
Scale: 0.21745600881630292
h_r: 90, w_r: 120


[[89, 133, 95, 190, 1.6733551499155457],
 [22, 155, 95, 190, 1.3153458094013326],
 [245, 155, 95, 190, 0.7413954171717639],
 [267, 155, 95, 190, 1.1475150361685276],
 [200, 178, 95, 190, 0.953428188235357]]

In [401]:
new_test_image = test_image.copy()

for bbox in new_detections:

    x, y, w, h, c = bbox

    cv2.rectangle(new_test_image, (x, y), (x + w, y + h), (0, 255, 0), 2)

    print(bbox)

cv2.imshow("Titolo", new_test_image)
cv2.waitKey(0)
cv2.destroyAllWindows()

[89, 133, 95, 190, 1.6733551499155457]
[22, 155, 95, 190, 1.3153458094013326]
[245, 155, 95, 190, 0.7413954171717639]
[267, 155, 95, 190, 1.1475150361685276]
[200, 178, 95, 190, 0.953428188235357]


# Maxima Suppression Windows

In [389]:
def calcola_iou(box1, box2):

    # Estraggo i punti e lo score
    x1, y1, w1, h1, _ = box1
    x2, y2, w2, h2, _ = box2

    # Calcolo il secondo punto del primo box
    a1 = w1 + x1
    b1 = h1 + y1

    # Calcolo il secondo punto del secondo box
    a2 = w2 + x2
    b2 = h2 + y2
    
    # Calcolo i valori massimi e minimi dell'intersezione
    x_inter = max(x1, x2)
    y_inter = max(y1, y2)
    w_inter = min(a1 + w1, a2 + w2) - x_inter
    h_inter = min(b1 + h1, b2 + h2) - y_inter

    # Calcolo aree dell'intersezione
    base_1 = abs(a1 - x1) 
    altezza_1 = abs(b1 - y1)
    area1 = base_1 * altezza_1
    base_2 = abs(a2 - x2) 
    altezza_2 = abs(b2 - y2)
    area2 = base_2 * altezza_2

    base = abs(w_inter - x_inter) 
    altezza = abs(h_inter -  y_inter) 

    area_inter = base * altezza
    
    area_union = area1 + area2 - area_inter
    
    iou = area_inter / area_union

    # factor_scale = area1 / area2 if area1 < area2 else area2 / area1
    return iou

def calcola_nms(detections, score_threshold=0.5):

    final_detections = []
    
    ordered_detections = sorted(detections, key=lambda x: x[-1], reverse=True)

    while ordered_detections:

        #ordered_detections = sorted(ordered_detections, key=lambda x: x[-1], reverse=True)

        head_detection = ordered_detections.pop(0)

        final_detections.append(head_detection)

        canceled_detections = []

        for detection in ordered_detections:

            iou = calcola_iou(head_detection, detection)

            if iou >= score_threshold:
                print(iou)
                print(detection)
                #final_detections.append(detection)
                canceled_detections.append(detection)
        
        new_ordered_detections = [item for item in ordered_detections if item not in canceled_detections]

        ordered_detections = new_ordered_detections.copy()
        
    return final_detections

In [391]:
filtered_detections = calcola_nms(new_detections)
# filtered_detections = calcola_nms(filtered_detections)
filtered_detections

1.1107408057066013
[200, 178, 95, 190, 0.953428188235357]
1.6112115732368897
[245, 155, 95, 190, 0.7413954171717639]


[[89, 133, 95, 190, 1.6733551499155457],
 [22, 155, 95, 190, 1.3153458094013326],
 [267, 155, 95, 190, 1.1475150361685276]]

In [392]:
new_test_image = test_image.copy()

for bbox in filtered_detections:

    x, y, w, h, c = bbox

    cv2.rectangle(new_test_image, (x, y), (x + w, y + h), (0, 255, 0), 2)

cv2.imshow("Titolo", new_test_image)
cv2.waitKey(0)
cv2.destroyAllWindows()


In [393]:
filtered_detections

[[89, 133, 95, 190, 1.6733551499155457],
 [22, 155, 95, 190, 1.3153458094013326],
 [267, 155, 95, 190, 1.1475150361685276]]

# Trovare il miglior parametro di Thresholding

La NMS utilizza un parametro, ovvero un parametro T che serve per 'filtrare le detection' sulla base del valore di IoU

Per determinare il miglior valore devo definire una funzione che trova l'F1-score del modello sul set di validazione utilizzando il nostro classificatore e valori diversi di IoU

## Funzione per lo score

In [6]:
def calcola_scores(detections, ground_truths, IoU_threshold):

    def estrai_box(box):

        box = list(box)

        if len(box) == 4:
            box = tuple(box + [1])
        else:
            box = tuple(box)
        
        return box

    # Inizializzazione delle variabili
    TP = 0 # True positives
    FP = 0 # False positives
    FN = 0 # False negatives, si assume inizialmente che siano tutti tali in quanto i ground truths non sono rilevati dall'algoritmo

    detection_to_remove = [] #deection matchate

    # Iterazione di tutte le detection
    for detection in detections:
        matched = False
        ground_to_remove = []
        # Si iterano tutti i ground truth
        for ground_truth in ground_truths:
            # Conversione dei box a delle tuple e aggiunta di uno score fittizio (serve semplicemente per non avere un errore nella funzione di calcolo IoU e non snaturare la medesima)
            box1 = estrai_box(detection)
            box2 = estrai_box(ground_truth)

            #box1 = tuple(list(detection) + [1])
            #box2 = tuple(list(ground_truth) + [1])
            
            # Calcolo della IOU tra i due box
            iou = calcola_iou(box1, box2)

            # Se l'IOU è maggiore della soglia, aumenta il numero di veri positivi
            if iou >= IoU_threshold:
                # TP += 1
                matched = True # c'è sovrapposizione con una vera annotazione
                ground_to_remove.apped(ground_truth)
                detection_to_remove.append(detection)
                break
        if matched:
            TP += 1
            ground_truths = [item for item in ground_truths if item not in ground_to_remove]
            # aggiorno le annotazioni restanti
    # adesso devo eliminare le detection che sono true positive
    detection = [item for item in detection if item not in detection_to_remove]
    FP = len(detection) # quelle che restano sono falsi positivi
    FN = len(ground_truths) # quelle che restano qua sono falsi negativi, perchè non li ho rilevati ma ci sono
    
    # Calcolo di precision e recall
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)

    # Calcolo dello score F1
    f1_score = (2 * precision * recall) / (precision + recall)
    
    # Ritorna tutto per ogni immagine, quindi devo incrementare
    return TP, FP, FN, precision, recall, f1_score

In [None]:
t_values = [0.3, 0.35, 0.4, 0.45, 0.5]
best_t = {}

In [None]:
subset_validation = positive_bboxes_val[:100]

In [None]:
subset_validation_dict = dict(subset_validation)

In [None]:
# # pipeline di esempio
# # path, annotations = subset_validation_dict
# # image_path = f"Images_Positive/{image[0]}.jpg"
# test_image = cv2.imread(image_path)
# dets = multiscale(test_image, 15, (64,128), best_scales)
# dets_scaled = function_scale(dets, best_scales, test_image)
# detected_boxes = calcola_nms(dets_scaled, t)

In [None]:
t_statitics = {}

In [None]:
for t in t_values:
    TP = 0
    FP = 0
    FN = 0
    for path, annotations in subset_validation_dict.items():
        image_path = f'Images_Positive/{path}.jpg'
        image = cv2.imread(image_path)
        dets = multiscale(image, 15, (64,128), best_scales)
        dets_scaled = function_scale(dets, best_scales, image)
        detected_boxes = calcola_nms(dets_scaled, t)
        # ora confronto per aggionrare TP, FP, FN
        TP_i, FP_i, FN_i, _, _, _ = calcola_scores(detected_boxes, annotations)
        TP += TP_i
        FP += FP_i
        FN += FN_i

    # Calcolo di precision e recall
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)
    # Calcolo dello score F1
    f1_score = (2 * precision * recall) / (precision + recall)
    t_statitics[f't'] = [TP, FP, FN, precision, recall, f1_score]


In [None]:
t_statitics

In [None]:
with open('best_iou_threshold.txt', 'w') as f:
    for key, value in t_statitics.items():
        t_value = float(key)
        TP, FP, FN, precision, recall, f1_score = values
        print(f'{t_value}, {TP}, {FP}, {FN}, {precision}, {recall}, {f1_score}', file=f)

Dopodiché qua devo prendere un insieme di boxes di validazione (o un sottoinsieme), di cui conosco le annotations, per fare in modo di utilizzare queste come valori veri per calcolare l'F1-Score

In [None]:
# Ricerca del miglior iperparametro
best_t = 0
best_score = 0

# for item in classifier_scores.items():
#     if item[1] > best_score:
#         best_c = item[0]
#         best_score = item[1]


for key, values in t_statitics.items():
    t_value = float(key)
    TP, FP, FN, precision, recall, f1_score = values
    if f1_score >= best_score:
        best_score = f1_score
        best_t = t_value

In [None]:
best_t

# Testing del classificatore

creo la tabella dove salvare i valori

In [394]:

#matrice = np.zeros((2,5))

#header = ['', 'TP','FP','FN','Prec', 'Recall']
#matrice[0,:] = ['MioClassificatore', 'TP','FP','FN','Prec', 'Recall']
#matrice[1,:] = ['Classificatore cv2', 'TP','FP','FN','Prec', 'Recall']


# Confronto col classificatore di OpenCV

In [395]:
# Istanziamento del classificatore di OpenCV
win_size = (64, 128)
cell_size = (8, 8)
block_size = (16, 16)
block_stride = (8, 8)
num_bins = 9

# Set the parameters of the HOG descriptor using the variables defined above
hog_cv = cv2.HOGDescriptor(win_size, block_size, block_stride, cell_size, num_bins)
hog_cv.setSVMDetector(cv2.HOGDescriptor_getDefaultPeopleDetector())
image_path = "Images_Positive/009278.jpg"
test2_image = cv2.imread(image_path)
(rects, weights) = hog_cv.detectMultiScale(test2_image, winStride=(4, 4), padding=(8, 8), scale=1.5)
for rect in rects:
    x, y, w, h = rect
    cv2.rectangle(test2_image, (x, y), (x + w, y + h), (0, 255, 0), 2)
cv2.imshow("Titolo", test2_image)
cv2.waitKey(0)
cv2.destroyAllWindows()

# Calcolo degli score

Conversione dei ground truths in un dizionario:

In [396]:
positive_bboxes_test_dict = dict(positive_bboxes_test)

Si considerano i ground truths dell'immagine di test:

In [397]:
positive_bboxes_test_dict['009278']

[[2, 97, 50, 215],
 [37, 92, 86, 215],
 [72, 91, 122, 214],
 [112, 91, 162, 215],
 [154, 95, 203, 217],
 [195, 93, 246, 217],
 [234, 99, 281, 213],
 [275, 98, 322, 213],
 [317, 94, 365, 214],
 [357, 95, 406, 217],
 [398, 94, 445, 210],
 [441, 101, 487, 215]]

Di seguito l'algoritmo:

In [426]:
# def calcola_scores(detections, ground_truths, iou_score):

#     def estrai_box(box):

#         box = list(box)

#         if len(box) == 4:
#             box = tuple(box + [1])
#         else:
#             box = tuple(box)
        
#         return box

#     # Inizializzazione delle variabili
#     TP = 0 # True positives
#     FP = 0 # False positives
#     FN = 0 # False negatives, si assume inizialmente che siano tutti tali in quanto i ground truths non sono rilevati dall'algoritmo

#     # Iterazione di tutte le detection
#     for detection in detections:

#         # Si iterano tutti i ground truth
#         for ground_truth in ground_truths:

#             # Conversione dei box a delle tuple e aggiunta di uno score fittizio (serve semplicemente per non avere un errore nella funzione di calcolo IoU e non snaturare la medesima)
#             box1 = estrai_box(detection)
#             box2 = estrai_box(ground_truth)

#             #box1 = tuple(list(detection) + [1])
#             #box2 = tuple(list(ground_truth) + [1])
            
#             # Calcolo della IOU tra i due box
#             iou = calcola_iou(box1, box2)

#             # Se l'IOU è maggiore della soglia, aumenta il numero di veri positivi
#             if iou >= iou_score:
#                 TP += 1
#             elif iou > 0 and iou < iou_score:
#                 FP += 1
#             else:
#                 FN += 1

#     # Calcolo di precision e recall
#     precision = TP / (TP + FP)
#     recall = TP / (TP + FN)

#     # Calcolo dello score F1
#     f1_score = (2 * precision * recall) / (precision + recall)
    
#     # Ritorna tutto
#     return TP, FP, FN, precision, recall, f1_score

## Score del classificatore di OpenCV

In [427]:
TP, FP, FN, precision, recall, f1_score = calcola_scores(rects, positive_bboxes_test_dict['009278'], 0.5)

In [428]:
print(f"True positives: {TP}. False positives: {FP}. False negatives: {FN}. Precision: {(precision * 100):.2f}%. Recall: {(recall * 100):.2f}%.")

True positives: 19. False positives: 8. False negatives: 9. Precision: 70.37%. Recall: 67.86%.


## Score del classificatore CCS

In [429]:
image_path = "Images_Positive/009278.jpg"
test_image = cv2.imread(image_path)

dets = multiscale(test_image, 15, (64,128), best_scales)
dets_scaled = function_scale(dets, best_scales, test_image)
dets_nms = calcola_nms(dets_scaled)

new_image = test_image.copy()

for bbox in dets_nms:

    x, y, w, h, c = bbox

    cv2.rectangle(new_image, (x, y), (x + w, y + h), (0, 255, 0), 2)

    print(bbox)

cv2.imshow("Titolo", new_image)
cv2.waitKey(0)
cv2.destroyAllWindows()

h:249, w:500
Scale: 0.6725152207688972
h_r: 167, w_r: 336
Scale: 0.3781220642433878
h_r: 94, w_r: 189
Scale: 0.21745600881630292
h_r: 54, w_r: 109
[312, 44, 95, 190, 1.8739419183909423]
[290, 0, 95, 190, 0.9510060377457481]


In [431]:
TP, FP, FN, precision, recall, f1_score = calcola_scores(dets_nms, positive_bboxes_test_dict['009278'], 0.5)

print(f"True positives: {TP}. False positives: {FP}. False negatives: {FN}. Precision: {(precision * 100):.2f}%. Recall: {(recall * 100):.2f}%.")

True positives: 11. False positives: 7. False negatives: 6. Precision: 61.11%. Recall: 64.71%.


# Previsioni del classificatore sull'insieme di test

Per paragonare il nostro classificatore con quello nativo di openCV devo:
- per ogni immagine dell'insieme di test
- determinare le bboxes del mio modello per quella immagine
- determinare le bboxes del classificatore opencv per quella immagine
- aggiornare i valori di TP, FP, FN
- alla fine di tutte le immagini, quando ho calcolato i valori di TP, FP, FN calcolo i valori di F1 score, accuracy, precision e e recall per entrambi i modelli

**Modello ccs**: Costa Castelli Sollazzo

**Modello cv**: modello nativo per i pedestrian

In [2]:
TP_ccs = 0
FP_ccs = 0
FN_ccs = 0

TP_cv = 0
FP_cv = 0
FN_cv = 0

Come idea possiamo pensare di scorrere prima le immagini di test positivi, poi quelle di test negativi

In [None]:
neg_test_img # immagini negative

In [None]:
image_path = path_test + neg_test_img[0]
image_path

In [None]:
positive_bboxes_test #annotazioni per le immagini positive

In [None]:
# positive_bboxes_test_dict = dict(positive_bboxes_test)

## Immagini positive di test

In [None]:
for path, annotations in positive_bboxes_test_dict.items():
    image_path = f'Images_Positive/{path}.jpg'
    image = cv2.imread(image_path)
    # applichiamo il nostro classificatore
    dets = multiscale(image, 15, (64,128), best_scales)
    dets_scaled = function_scale(dets, best_scales, image)
    detected_boxes_ccs = calcola_nms(dets_scaled, t)
    # applico il classificatore di opencv
    (detected_cv, weights) = hog_cv.detectMultiScale(image, winStride=(4, 4), padding=(8, 8), scale=1.5)
    # ora confronto per aggionrare TP, FP, FN
    TP_ccs_i, FP_ccs_i, FN_ccs_i, _, _, _ = calcola_scores(detected_boxes_ccs, annotations)
    TP_cv_i, FP_cv_i, FN_cv_i, _, _, _  = calcola_scores(detected_cv, annotations)
    TP_ccs += TP_ccs_i
    FP_ccs += FP_ccs_i
    FN_ccs += FN_ccs_i
    TP_cv += TP_cv_i
    FP_cv += FP_cv_i
    FN_cv += FN_cv_i

## Immagini negative di test

In [None]:
for i in range(len(neg_test_img)):
    annotations = [] # non ci sono pedoni
    image_path = path_test + neg_test_img[i]
    image = cv2.imread(image_path)
    # applichiamo il nostro classificatore
    dets = multiscale(image, 15, (64,128), best_scales)
    dets_scaled = function_scale(dets, best_scales, image)
    detected_boxes_ccs = calcola_nms(dets_scaled, t)
    # applico il classificatore di opencv
    (detected_cv, weights) = hog_cv.detectMultiScale(image, winStride=(4, 4), padding=(8, 8), scale=1.5)
    # ora confronto per aggionrare TP, FP, FN
    TP_ccs_i, FP_ccs_i, FN_ccs_i, _, _, _ = calcola_scores(detected_boxes_ccs, annotations)
    TP_cv_i, FP_cv_i, FN_cv_i, _, _, _  = calcola_scores(detected_cv, annotations)
    TP_ccs += TP_ccs_i
    FP_ccs += FP_ccs_i
    FN_ccs += FN_ccs_i
    TP_cv += TP_cv_i
    FP_cv += FP_cv_i
    FN_cv += FN_cv_i

Adesso che ho i valori di TP, FP, FN devo calcolare le metriche di f1-score, accuracy, precision, recall sia per il nostro modello che per quello di openCV

In [None]:
# Calcolo di precision e recall
# modello ccs
precision_ccs = TP_ccs / (TP_ccs + FP_ccs)
recall_ccs = TP_ccs / (TP_ccs + FN_ccs)
# Calcolo dello score F1
f1_score_ccs = (2 * precision_ccs * recall_ccs) / (precision_ccs + recall_ccs)

# modello cv
precision_cv = TP_cv / (TP_cv + FP_cv)
recall_cv = TP_cv / (TP_cv + FN_cv)
# Calcolo dello score F1
f1_score_cv = (2 * precision_cv * recall_cv) / (precision_cv + recall_cv)

In [None]:
with open('test_statitics.txt', 'w') as f:
    print(f'{precision_ccs}, {recall_ccs}, {f1_score_ccs}', file=f)
    print(f'{precision_cv}, {recall_cv}, {f1_score_cv}', file=f)