In [None]:
import torch
from PIL import Image
import torchvision.transforms as T
import matplotlib.pyplot as plt
import cv2
import numpy as np
import os
from scipy.optimize import linear_sum_assignment

In [None]:
# Load the pretrained DETR detector (ResNet-50 backbone) from torch.hub.
model = torch.hub.load('facebookresearch/detr:main', 'detr_resnet50', pretrained=True)
# torch.nn.Module instances start in training mode; switch to evaluation mode so
# that dropout layers are disabled and repeated runs on the same frame give the
# same detections.
model = model.eval()

Downloading: "https://github.com/facebookresearch/detr/zipball/main" to /root/.cache/torch/hub/main.zip
Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 106MB/s]
Downloading: "https://dl.fbaipublicfiles.com/detr/detr-r50-e632da11.pth" to /root/.cache/torch/hub/checkpoints/detr-r50-e632da11.pth
100%|██████████| 159M/159M [00:01<00:00, 105MB/s]


In [None]:
import numpy as np
from tensorflow.keras.applications import VGG16
from tensorflow.keras.preprocessing import image
from tensorflow.keras.applications.vgg16 import preprocess_input
from tensorflow.keras.models import Model
from scipy.spatial.distance import cosine

In [None]:
class sim_VGG16_net:
    """Appearance descriptor based on a pretrained VGG16 network.

    Images (or crops of images) are mapped to the activations of the ``fc2``
    layer of an ImageNet-pretrained VGG16, and two descriptors are compared
    with cosine similarity.
    """

    def __init__(self):
        # Load the pretrained VGG16 model (with its fully connected head).
        self.base_model = VGG16(weights='imagenet', include_top=True)
        # Expose the output of 'fc2', the fully connected layer that precedes
        # the final classification layer, as the feature vector.
        self.model = Model(inputs=self.base_model.input, outputs=self.base_model.get_layer('fc2').output)

    def load_and_preprocess_image(self, frame):
        """Turn a PIL image into a VGG16-ready batch of one image.

        Args:
            frame: PIL image (any mode; it is converted to RGB).

        Returns:
            Array produced by ``preprocess_input`` for a single 224x224 image
            with a leading batch axis.
        """
        # VGG16 expects 224x224 RGB inputs.
        frame = frame.convert('RGB')
        img_resized = frame.resize((224, 224))
        img_array = image.img_to_array(img_resized)
        img_array = np.expand_dims(img_array, axis=0)
        img_array = preprocess_input(img_array)
        return img_array

    def extract_features_nb(self, frame):
        """Extract the descriptor of a whole image ("nb" = no bounding box).

        Args:
            frame: PIL image.

        Returns:
            1-D array with the flattened ``fc2`` activations.
        """
        img = self.load_and_preprocess_image(frame)
        features = self.model.predict(img)
        return features.flatten()

    def extract_features(self, frame, bbox=None):
        """Extract the descriptor of a region of an image.

        Args:
            frame: PIL image.
            bbox: optional ``(left, upper, right, lower)`` box in pixel
                coordinates; values are truncated to int before cropping.
                When ``None`` the whole frame is used.

        Returns:
            1-D array with the flattened ``fc2`` activations.
        """
        if bbox is not None:
            # (left, upper) is the top-left corner, (right, lower) the
            # bottom-right corner of the crop.
            x1, y1, x2, y2 = (int(v) for v in bbox)
            frame = frame.crop((x1, y1, x2, y2))
        img = self.load_and_preprocess_image(frame)
        # verbose=0 silences the per-call Keras progress bar.
        features = self.model.predict(img, verbose=0)
        return features.flatten()

    def calculate_similarity(self, frame1, frame2, bbox1=None, bbox2=None):
        """Cosine similarity between the descriptors of two images.

        Args:
            frame1, frame2: PIL images.
            bbox1, bbox2: optional crop boxes for the respective image; the
                whole image is used when omitted.

        Returns:
            ``1 - cosine_distance`` of the two descriptors.
        """
        features1 = self.extract_features(frame1, bbox1)
        features2 = self.extract_features(frame2, bbox2)
        return self.calulate_similarity_features(features1, features2)

    def calculate_similarity_reid(self, frame1, features2, bbox1=None):
        """Cosine similarity between an image and a stored descriptor.

        Args:
            frame1: PIL image.
            features2: descriptor previously returned by ``extract_features``.
            bbox1: optional crop box for ``frame1``.

        Returns:
            ``1 - cosine_distance`` of the two descriptors.
        """
        features1 = self.extract_features(frame1, bbox1)
        return self.calulate_similarity_features(features1, features2)

    def calulate_similarity_features(self,features1, features2):
        """Cosine similarity (``1 - cosine distance``) of two descriptors."""
        similarity = 1 - cosine(features1, features2)
        return similarity


In [None]:
# Shared feature extractor instance; the constructor loads VGG16 with ImageNet weights.
vgg16 = sim_VGG16_net()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels.h5


In [None]:
# image_test1 = Image.open('/content/image_test_1.jpg')
# image_test2 = Image.open('/content/image_test_2.jpg')

In [None]:
# image_test1

In [None]:
# image_test2

In [None]:
# features1 = vgg16.extract_features_nb(image_test1) #nb stands for no bounding boxes
# features1.shape

In [None]:
# features2 = vgg16.extract_features_nb(image_test2)
# features2.shape

In [None]:
# vgg16.calulate_similarity_features(features1, features2)

In [None]:
# vgg16.calulate_similarity_features(features1, features1)

In [None]:
# vgg16.calulate_similarity_features(features2, features1)

In [None]:
# vgg16.calulate_similarity_features(features2, features2)

In [None]:
# RGB triples (components in [0, 1]) cycled through by plot_results to colour the boxes.
COLORS = [[0.000, 0.447, 0.741], [0.850, 0.325, 0.098], [0.929, 0.694, 0.125],
          [0.494, 0.184, 0.556], [0.466, 0.674, 0.188], [0.301, 0.745, 0.933]]

In [None]:
# Class names indexed by the label id predicted by the detector (used by
# plot_results); index 1 is 'person', the label detect_pedestrians filters on.
# 'N/A' marks label ids without a name (presumably unused COCO category ids).
CLASSES = [
    'N/A', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A',
    'stop sign', 'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse',
    'sheep', 'cow', 'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack',
    'umbrella', 'N/A', 'N/A', 'handbag', 'tie', 'suitcase', 'frisbee', 'skis',
    'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove',
    'skateboard', 'surfboard', 'tennis racket', 'bottle', 'N/A', 'wine glass',
    'cup', 'fork', 'knife', 'spoon', 'bowl', 'banana', 'apple', 'sandwich',
    'orange', 'broccoli', 'carrot', 'hot dog', 'pizza', 'donut', 'cake',
    'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table', 'N/A',
    'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard',
    'cell phone', 'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A',
    'book', 'clock', 'vase', 'scissors', 'teddy bear', 'hair drier',
    'toothbrush'
]

In [None]:
def box_cxcywh_to_xyxy(x):
    """Convert boxes from (center_x, center_y, width, height) to (x1, y1, x2, y2).

    Args:
        x: tensor whose dimension 1 holds the four box components.

    Returns:
        Tensor of the same layout holding the top-left and bottom-right corners.
    """
    center_x, center_y, width, height = x.unbind(1)
    half_w = 0.5 * width
    half_h = 0.5 * height
    corners = [center_x - half_w, center_y - half_h,
               center_x + half_w, center_y + half_h]
    return torch.stack(corners, dim=1)

'''def rescale_bboxes(out_bbox, size):
    img_w, img_h, _ = size
    b = box_cxcywh_to_xyxy(out_bbox)
    b = b * torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32)
    return b'''

def plot_results(pil_img, prob, boxes):
    """Draw detection boxes with their class label and score over an image.

    Args:
        pil_img: image to display.
        prob: per-box class score vectors; the arg-max entry gives the label.
        boxes: tensor of (xmin, ymin, xmax, ymax) boxes in pixel coordinates.
    """
    plt.figure(figsize=(16,10))
    plt.imshow(pil_img)
    axes = plt.gca()
    # Repeat the palette so that it is long enough to pair with the boxes.
    palette = COLORS * 100
    for scores, corners, colour in zip(prob, boxes.tolist(), palette):
        xmin, ymin, xmax, ymax = corners
        outline = plt.Rectangle((xmin, ymin), xmax - xmin, ymax - ymin,
                                fill=False, color=colour, linewidth=3)
        axes.add_patch(outline)
        best = scores.argmax()
        label = f'{CLASSES[best]}: {scores[best]:0.2f}'
        axes.text(xmin, ymin, label, fontsize=15,
                  bbox=dict(facecolor='yellow', alpha=0.5))
    plt.axis('off')
    plt.show()


'''def detect(model, im, transform = None, threshold_confidence = 0.7):
    if transform is None:
        # standard PyTorch mean-std input image normalization
        transform = T.Compose([
        T.Resize(800),
        T.ToTensor(),
        T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

    img = transform(im).unsqueeze(0)

    # demo model only support by default images with aspect ratio between 0.5 and 2
    # if you want to use images with an aspect ratio outside this range
    # rescale your image so that the maximum size is at most 1333 for best results
    assert img.shape[-2] <= 1600 and img.shape[-1] <= 1600, 'demo model only supports images up to 1600 pixels on each side'

    # propagate through the model
    outputs = model(img)

    # keep only predictions with a confidence > threshold_confidence
    probas = outputs['pred_logits'].softmax(-1)[0, :, :-1]
    keep = probas.max(-1).values > threshold_confidence

    # convert boxes from [0; 1] to image scales
    bboxes_scaled = rescale_bboxes(outputs['pred_boxes'][0, keep], im.size)
    return probas[keep], bboxes_scaled'''

"def detect(model, im, transform = None, threshold_confidence = 0.7):\n    if transform is None:\n        # standard PyTorch mean-std input image normalization\n        transform = T.Compose([\n        T.Resize(800),\n        T.ToTensor(),\n        T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])\n        ])\n\n    img = transform(im).unsqueeze(0)\n\n    # demo model only support by default images with aspect ratio between 0.5 and 2\n    # if you want to use images with an aspect ratio outside this range\n    # rescale your image so that the maximum size is at most 1333 for best results\n    assert img.shape[-2] <= 1600 and img.shape[-1] <= 1600, 'demo model only supports images up to 1600 pixels on each side'\n\n    # propagate through the model\n    outputs = model(img)\n\n    # keep only predictions with a confidence > threshold_confidence\n    probas = outputs['pred_logits'].softmax(-1)[0, :, :-1]\n    keep = probas.max(-1).values > threshold_confidence\n\n    # convert bo

In [None]:
def rescale_bboxes(boxes, size):
    """Convert normalised (cx, cy, w, h) boxes to pixel (x1, y1, x2, y2) boxes.

    Args:
        boxes: tensor of boxes in (center_x, center_y, width, height) format,
            with coordinates relative to the image size.
        size: ``(width, height)`` of the image in pixels.

    Returns:
        Tensor of boxes in (x1, y1, x2, y2) format scaled to pixel coordinates.
    """
    img_w, img_h = size
    b = box_cxcywh_to_xyxy(boxes)
    # Build the scale factors on the same device as the boxes so the product
    # also works when the model output is not on the CPU.
    scale = torch.tensor([img_w, img_h, img_w, img_h], dtype=torch.float32, device=b.device)
    return b * scale

In [None]:
# Funzione per rilevare i pedoni (modifica di quella della prof)
def detect_pedestrians(threshold_confidence, model, im, transform = None):
    """Detect pedestrians in an image with a DETR-style model.

    Args:
        threshold_confidence: minimum class score for a prediction to be kept.
        model: detector returning a dict with 'pred_logits' and 'pred_boxes'.
            It should already be in evaluation mode (``model.eval()``).
        im: PIL image.
        transform: optional preprocessing callable; defaults to resize to 800,
            tensor conversion and mean/std normalisation.

    Returns:
        Tuple ``(confidences, boxes)`` of numpy arrays: the score of each kept
        prediction and its (x1, y1, x2, y2) box in pixel coordinates of ``im``.

    Raises:
        AssertionError: if the transformed image is larger than 1600 pixels on
            either side.
    """
    # Label id the detector assigns to the 'person' class.
    person_label = 1

    if transform is None:

        # standard PyTorch mean-std input image normalization
        transform = T.Compose([
        T.Resize(800),
        T.ToTensor(),
        T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

    img = transform(im).unsqueeze(0)

    # demo model only support by default images with aspect ratio between 0.5 and 2
    # if you want to use images with an aspect ratio outside this range
    # rescale your image so that the maximum size is at most 1333 for best results
    assert img.shape[-2] <= 1600 and img.shape[-1] <= 1600, 'demo model only supports images up to 1600 pixels on each side'

    # Pure inference: no autograd graph is needed, which saves memory and time.
    with torch.no_grad():
        outputs = model(img)

    # Drop the last logit column before taking the best class for each query.
    probas = outputs['pred_logits'].softmax(-1)[0, :, :-1]
    max_probas = probas.max(-1).values
    labels = probas.argmax(-1)

    # keep only confident predictions whose best class is 'person'
    keep = (max_probas > threshold_confidence) & (labels == person_label)

    # Extract the confidences for the kept boxes
    confidences = max_probas[keep].detach().cpu().numpy()

    # convert boxes from [0; 1] to image scales
    bboxes_scaled = rescale_bboxes(outputs['pred_boxes'][0, keep].cpu(), im.size)
    return confidences, bboxes_scaled.detach().cpu().numpy()

In [None]:
# conf, bboxes_scaled = detect_pedestrians(model, Image.open('/content/im.jpg'))
# print(conf)
# print(bboxes_scaled)

In [None]:
# frame_test = Image.open('/content/im.jpg')
# features_test = []
# for bbox in bboxes_scaled:
#     feature = vgg16.extract_features(frame_test, bbox)
#     features_test.append(feature)
#     print(feature.shape)

In [None]:
# feature_frame0 = features_test[2]
# feature_frame1 = features_test[4]
# vgg16.calulate_similarity_features(feature_frame0, feature_frame1)

In [None]:
class Tracker:
    """Multi-object tracker: IoU association plus appearance re-identification.

    Each call to ``update_tracker`` processes the detections of one frame:

    1. Active tracks are matched to detections with the Hungarian algorithm on
       an ``1 - IoU`` cost; matches above ``theshold_det_track`` are rejected.
    2. Detections left without a track are compared with recently lost tracks
       using a mix of appearance distance and ``1 - IoU``; matches below
       ``theshold_reid`` revive the lost track, the rest start new tracks.
    3. Active tracks without a detection become lost.
    4. Lost tracks age by one frame; tracks lost for more than
       ``max_lost_frames`` frames are declared dead.

    A track is a dict with keys 'bbox', 'id', 'conf' and 'lost'. Tracks are
    never removed from ``self.trackers`` so a track id is always also its index
    in that list; callers select the visible tracks with ``track['lost'] == 0``.
    """

    # Weights of the appearance and overlap terms in the re-identification cost.
    reid_feature_weight = 0.6
    reid_iou_weight = 0.4

    def __init__(self):

        # Every track created so far, in creation order (index == track id).
        self.trackers = []

        # Counter used to hand out unique track ids.
        self.track_counter = 0

        # Number of frames a lost track is kept as a re-identification
        # candidate before it is declared dead.
        self.max_lost_frames = 10

        # Lost tracks still eligible for re-identification. Each entry is a
        # dict with keys 'id', 'bbox', 'conf' and 'feature' (appearance
        # descriptor computed when the track was lost).
        self.vanishing_tracks = []

        # Ids of tracks that stayed lost for more than max_lost_frames frames.
        self.dead_tracks = []

    def update_tracker(self, confidences, detections, frame, vgg16, theshold_det_track=1.0, theshold_reid=1.0):
        """Update the tracks with the detections of a new frame.

        Args:
            confidences: sequence with the score of each detection.
            detections: sequence of (x1, y1, x2, y2) boxes, aligned with
                ``confidences``.
            frame: image the detections come from; only passed through to
                ``vgg16.extract_features``.
            vgg16: feature extractor exposing ``extract_features(frame, bbox)``
                and ``calulate_similarity_features(f1, f2)``.
            theshold_det_track: maximum ``1 - IoU`` cost accepted when matching
                an active track to a detection.
            theshold_reid: maximum combined cost accepted when matching a lost
                track to a detection.
        """
        # No tracks yet: this is the first set of detections, so every
        # detection starts a new track.
        if not self.trackers:
            for detection, conf in zip(detections, confidences):
                self._start_track(detection, conf)
            return

        # Snapshots taken before anything is modified in this frame.
        active = [t for t in self.trackers if t['lost'] == 0]
        already_lost = [t for t in self.trackers if 0 < t['lost'] <= self.max_lost_frames]

        unmatched_detections, unmatched_trackers = self._associate(
            active, confidences, detections, frame, vgg16, theshold_det_track)

        self._reidentify(unmatched_detections, confidences, detections, frame, vgg16, theshold_reid)

        # Active tracks without any detection in this frame become lost. This
        # happens after re-identification so that they are only candidates
        # from the next frame on.
        for t_idx in unmatched_trackers:
            self._mark_lost(active[t_idx], frame, vgg16)

        # Tracks that were already lost and were not re-identified age by one
        # frame; without this step max_lost_frames would never be reached.
        for track in already_lost:
            if track['lost'] > 0:
                track['lost'] += 1

        self._retire_expired_tracks()

    def _start_track(self, bbox, conf):
        """Append a new active track for a detection and consume one id."""
        self.trackers.append({'bbox': bbox, 'id': self.track_counter, 'conf': conf, 'lost': 0})
        self.track_counter += 1

    def _mark_lost(self, track, frame, vgg16):
        """Flag a track as lost and store its descriptor for re-identification."""
        track['lost'] += 1
        if any(v['id'] == track['id'] for v in self.vanishing_tracks):
            return
        # The descriptor is computed at the last known box of the track.
        feature = vgg16.extract_features(frame, track['bbox'])
        self.vanishing_tracks.append(
            {'id': track['id'], 'bbox': track['bbox'], 'conf': track['conf'], 'feature': feature})

    @staticmethod
    def _assign(cost_matrix):
        """Solve the assignment problem; returns (rows, cols) as lists of ints."""
        if cost_matrix.size == 0:
            # Nothing to assign when either side is empty.
            return [], []
        rows, cols = linear_sum_assignment(cost_matrix)
        return rows.tolist(), cols.tolist()

    def _associate(self, active, confidences, detections, frame, vgg16, threshold):
        """Match active tracks to detections on the ``1 - IoU`` cost.

        Returns the sorted indices of detections and of active tracks that
        ended up without an accepted match.
        """
        cost_matrix = np.zeros((len(active), len(detections)))
        for t_idx, track in enumerate(active):
            for d_idx, detection in enumerate(detections):
                cost_matrix[t_idx, d_idx] = self.compute_cost(track['bbox'], detection)

        rows, cols = self._assign(cost_matrix)
        unmatched_detections = set(range(len(detections))) - set(cols)
        unmatched_trackers = set(range(len(active))) - set(rows)

        for t_idx, d_idx in zip(rows, cols):
            track = active[t_idx]
            # Lower cost is better (minimisation problem).
            if cost_matrix[t_idx, d_idx] <= threshold:
                track['bbox'] = detections[d_idx]
                track['conf'] = confidences[d_idx]
                track['lost'] = 0
            else:
                # Rejected pairing: the track is lost, and the detection must
                # stay available for re-identification or a new track instead
                # of being silently discarded.
                self._mark_lost(track, frame, vgg16)
                unmatched_detections.add(d_idx)

        return sorted(unmatched_detections), sorted(unmatched_trackers)

    def _reidentify(self, unmatched_detections, confidences, detections, frame, vgg16, threshold):
        """Match leftover detections to lost tracks, else start new tracks."""
        remaining_detection = [detections[d_idx] for d_idx in unmatched_detections]
        remaining_confidences = [confidences[d_idx] for d_idx in unmatched_detections]

        if not self.vanishing_tracks or not remaining_detection:
            # No lost track to compare with: every leftover detection is new.
            for detection, conf in zip(remaining_detection, remaining_confidences):
                self._start_track(detection, conf)
            return

        candidates = list(self.vanishing_tracks)
        # One descriptor per detection, computed once and reused for every
        # lost track.
        detection_features = [vgg16.extract_features(frame, det) for det in remaining_detection]

        # Rows are lost tracks, columns are leftover detections.
        cost_matrix_reid = np.zeros((len(candidates), len(remaining_detection)))
        for v_idx, lost_track in enumerate(candidates):
            for d_idx, detection in enumerate(remaining_detection):
                similarity = vgg16.calulate_similarity_features(lost_track['feature'], detection_features[d_idx])
                d_features = 1 - similarity
                d_iou = 1 - self.iou(lost_track['bbox'], detection)
                cost_matrix_reid[v_idx, d_idx] = (
                    self.reid_feature_weight * d_features + self.reid_iou_weight * d_iou)

        rows, cols = self._assign(cost_matrix_reid)

        reidentified = set()
        for v_idx, d_idx in zip(rows, cols):
            if cost_matrix_reid[v_idx, d_idx] <= threshold:
                # Revive the lost track with the matched detection.
                track_id = candidates[v_idx]['id']
                track = self.trackers[track_id]
                track['bbox'] = remaining_detection[d_idx]
                track['conf'] = remaining_confidences[d_idx]
                track['lost'] = 0
                reidentified.add(track_id)
            else:
                # Not similar enough: the detection is a new identity.
                self._start_track(remaining_detection[d_idx], remaining_confidences[d_idx])

        self.vanishing_tracks = [v for v in self.vanishing_tracks if v['id'] not in reidentified]

        # Detections that were not paired with any lost track are new too.
        for d_idx in sorted(set(range(len(remaining_detection))) - set(cols)):
            self._start_track(remaining_detection[d_idx], remaining_confidences[d_idx])

    def _retire_expired_tracks(self):
        """Record tracks lost for too long as dead and drop their candidates.

        Dead tracks stay in ``self.trackers`` (with ``lost`` above
        ``max_lost_frames``) so that ids remain valid list indices.
        """
        recoverable_ids = set()
        for track in self.trackers:
            if track['lost'] > self.max_lost_frames:
                if track['id'] not in self.dead_tracks:
                    self.dead_tracks.append(track['id'])
            elif track['lost'] > 0:
                recoverable_ids.add(track['id'])
        self.vanishing_tracks = [v for v in self.vanishing_tracks if v['id'] in recoverable_ids]

    def compute_cost(self, tracker, detection):
        """Association cost between two (x1, y1, x2, y2) boxes: ``1 - IoU``."""
        return 1 - self.iou(tracker, detection)

    def iou(self, box1, box2):
        """Intersection over union of two (x1, y1, x2, y2) boxes.

        Returns 0 when the union has zero area.
        """
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])
        inter_area = max(0, x2 - x1) * max(0, y2 - y1)
        box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
        box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
        union_area = box1_area + box2_area - inter_area
        return inter_area / union_area if union_area != 0 else 0

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import matplotlib.font_manager as fm
import numpy as np

def plot_image_w_detections(image, detections):
    """Show an image with one labelled green box per tracked detection.

    Args:
        image: image accepted by ``Axes.imshow``.
        detections: iterable of 10-element rows
            ``(frame, id, x, y, w, h, conf, _, _, _)`` where (x, y) is the
            top-left corner of the box and (w, h) its size.
    """
    fig, ax = plt.subplots()
    ax.imshow(image)

    label_box = dict(facecolor='white', alpha=0.5)
    for row in detections:
        _frame_idx, track_id, left, top, width, height, score, _, _, _ = row

        # Box outline.
        outline = patches.Rectangle((left, top), width, height,
                                    linewidth=1, edgecolor='green', facecolor='none')
        ax.add_patch(outline)

        # Label anchored slightly above the top-left corner of the box.
        label = f"id: {track_id}, conf:{score:.2f}"
        ax.text(left, top-10, label, fontsize=5, color='green',
                verticalalignment='top', horizontalalignment='left', bbox=label_box)

    # Hide the axes and display the figure.
    plt.axis('off')
    plt.show()

In [None]:
# def process_image_folder(folder_path, frame_size=(640, 360), detection_interval=2, frame_limit_flag = False, limit=5):
#     tracker = Tracker()
#     frame_files = sorted([os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith(('.png', '.jpg', '.jpeg'))])
#     frame_count = 0

#     detections_frame = []

#     detections_preload = []

#     idx = 0
#     # preload frames
#     for frame_file in frame_files:
#         frame = Image.open(frame_file)
#         print(idx)
#         confidences, detections = detect_pedestrians(im=frame, model=model,threshold_confidence=0.6)
#         detections_preload.append([confidences, detections])
#         idx += 1

#     print('Fine preload')
#     for frame_file in frame_files:
#         if frame_limit_flag and frame_count > limit:
#             break
#         frame = Image.open(frame_file)

#         if frame_count % detection_interval == 0:
#             # confidences, detections = detect_pedestrians(im=frame, model=model)
#             confidences, detections = detections_preload[frame_count]
#         tracker.update_tracker(confidences, detections, frame, vgg16, theshold_det_track = 0.4, theshold_reid=0.4)

#         actual_detections = [] # solo per print

#         for track in tracker.trackers:
#             if track['lost'] == 0:
#                 x1, y1, x2, y2 = map(int, track['bbox'])
#                 x = x1
#                 y = y1
#                 w = x2-x1
#                 h = y2-y1
#                 conf = track['conf']
#                 # poi format_detection deve essere stampato in un file
#                 format_detectetion = [frame_count, track['id'], x,y,w,h, track['conf'],-1,-1,-1]
#                 print(format_detectetion)
#                 actual_detections.append(format_detectetion) # solo per printing
#                 detections_frame.append(format_detectetion)
#         print(f'Frame: {frame_count}')
#         # plot_image_w_detections(frame, actual_detections)
#         frame_count += 1
#     return detections_frame

In [None]:
# Mount Google Drive at /content/drive (only works inside Google Colab); the
# detection files written further down are saved under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# path = '/content/images'
# detected_to_file = process_image_folder(path, frame_size=(1920, 1080), detection_interval=1, frame_limit_flag=True, limit=40)

In [None]:
def extract_detections(folder_path, t):
    """Run the pedestrian detector on every image of a folder.

    Frames are processed in sorted file-name order using the notebook-level
    ``model``.

    Args:
        folder_path: directory containing the frames (.png, .jpg or .jpeg).
        t: confidence threshold forwarded to ``detect_pedestrians``.

    Returns:
        List with one entry per frame; each entry is a list of
        ``[bbox, confidence]`` pairs for the pedestrians found in that frame.
    """
    frame_files = sorted(
        os.path.join(folder_path, f)
        for f in os.listdir(folder_path)
        if f.endswith(('.png', '.jpg', '.jpeg'))
    )

    detections_preload = []
    for idx, frame_file in enumerate(frame_files):
        # The context manager closes the image file once the frame has been
        # processed, so long sequences do not accumulate open file handles.
        with Image.open(frame_file) as frame:
            print(f'Frame: {idx}')
            confidences, detections = detect_pedestrians(t, im=frame, model=model)
        detections_preload.append(
            [[bbox, conf] for bbox, conf in zip(detections, confidences)])
    return detections_preload

In [None]:
# Detect pedestrians in every frame of the MOT17-13 sequence with a 0.6 confidence threshold.
path = '/content/MOT17-13-DPM'
detections = extract_detections(path, t=0.6)

Frame: 0
Frame: 1
Frame: 2
Frame: 3
Frame: 4
Frame: 5
Frame: 6
Frame: 7
Frame: 8
Frame: 9
Frame: 10
Frame: 11
Frame: 12
Frame: 13
Frame: 14
Frame: 15
Frame: 16
Frame: 17
Frame: 18
Frame: 19
Frame: 20
Frame: 21
Frame: 22
Frame: 23
Frame: 24
Frame: 25
Frame: 26
Frame: 27
Frame: 28
Frame: 29
Frame: 30
Frame: 31
Frame: 32
Frame: 33
Frame: 34
Frame: 35
Frame: 36
Frame: 37
Frame: 38
Frame: 39
Frame: 40
Frame: 41
Frame: 42
Frame: 43
Frame: 44
Frame: 45
Frame: 46
Frame: 47
Frame: 48
Frame: 49
Frame: 50
Frame: 51
Frame: 52
Frame: 53
Frame: 54
Frame: 55
Frame: 56
Frame: 57
Frame: 58
Frame: 59
Frame: 60
Frame: 61
Frame: 62
Frame: 63
Frame: 64
Frame: 65
Frame: 66
Frame: 67
Frame: 68
Frame: 69
Frame: 70
Frame: 71
Frame: 72
Frame: 73
Frame: 74
Frame: 75
Frame: 76
Frame: 77
Frame: 78
Frame: 79
Frame: 80
Frame: 81
Frame: 82
Frame: 83
Frame: 84
Frame: 85
Frame: 86
Frame: 87
Frame: 88
Frame: 89
Frame: 90
Frame: 91
Frame: 92
Frame: 93
Frame: 94
Frame: 95
Frame: 96
Frame: 97
Frame: 98
Frame: 99
Frame: 100

In [None]:
# Number of pedestrians detected in the first frame.
len(detections[0])

23

In [None]:
# Inspect the [bbox, confidence] pairs of the first frame.
detections[0]

[[array([620.2601 , 512.24786, 652.8814 , 591.8727 ], dtype=float32),
  0.7455426],
 [array([114.685074, 706.71155 , 166.86876 , 811.7171  ], dtype=float32),
  0.95309377],
 [array([1478.754 ,  531.1471, 1505.6257,  592.4311], dtype=float32),
  0.673282],
 [array([1520.455  ,  561.87054, 1573.9979 ,  676.2484 ], dtype=float32),
  0.9929877],
 [array([1188.8218 ,  480.20657, 1216.8882 ,  524.50116], dtype=float32),
  0.6201036],
 [array([1606.1079 ,  548.58936, 1661.0089 ,  672.6865 ], dtype=float32),
  0.96573234],
 [array([807.858  , 510.21274, 828.41016, 554.18964], dtype=float32),
  0.6713118],
 [array([1482.962 ,  530.7892, 1512.114 ,  593.698 ], dtype=float32),
  0.747867],
 [array([529.63086, 540.6116 , 570.0233 , 638.152  ], dtype=float32),
  0.91629726],
 [array([191.47488, 582.7254 , 235.80142, 723.93726], dtype=float32),
  0.91684836],
 [array([1324.0713 ,  498.18338, 1345.5214 ,  552.0971 ], dtype=float32),
  0.67825186],
 [array([1.3235271e-01, 6.6901166e+02, 5.1526787e+01,

In [None]:
# Save the MOT17-13 detections to Google Drive, one text line per detection:
# "<frame index>, <bbox>, <confidence>". The bbox is written with its default
# string conversion (a bracketed, space-separated array when it is a numpy array).
with open('/content/drive/MyDrive/master_unipa/MOT17-13-DPM/MOT17-13-DPM-DETR06.txt', 'w') as f:
    frame_count = 0
    for detections_frame in detections:
        # frame index, bounding box and confidence
        for i in range(len(detections_frame)):
            print(f'{frame_count}, {detections_frame[i][0]}, {detections_frame[i][1]}', file=f)
        frame_count += 1
        # for conf, detect in zip(confidences, detections):
        #     print(f'{detect},{conf}', file=f)

In [None]:
# Switch to the MOT17-09 sequence.
path = '/content/MOT17-09-DPM'

In [None]:
# Detections for the current `path` with a 0.7 confidence threshold.
# NOTE: the variable is spelled `detetions` (sic); later cells read it under this name.
detetions = extract_detections(path, t=0.7)

Frame: 0
Frame: 1
Frame: 2
Frame: 3
Frame: 4
Frame: 5
Frame: 6
Frame: 7
Frame: 8
Frame: 9
Frame: 10
Frame: 11
Frame: 12
Frame: 13
Frame: 14
Frame: 15
Frame: 16
Frame: 17
Frame: 18
Frame: 19
Frame: 20
Frame: 21
Frame: 22
Frame: 23
Frame: 24
Frame: 25
Frame: 26
Frame: 27
Frame: 28
Frame: 29
Frame: 30
Frame: 31
Frame: 32
Frame: 33
Frame: 34
Frame: 35
Frame: 36
Frame: 37
Frame: 38
Frame: 39
Frame: 40
Frame: 41
Frame: 42
Frame: 43
Frame: 44
Frame: 45
Frame: 46
Frame: 47
Frame: 48
Frame: 49
Frame: 50
Frame: 51
Frame: 52
Frame: 53
Frame: 54
Frame: 55
Frame: 56
Frame: 57
Frame: 58
Frame: 59
Frame: 60
Frame: 61
Frame: 62
Frame: 63
Frame: 64
Frame: 65
Frame: 66
Frame: 67
Frame: 68
Frame: 69
Frame: 70
Frame: 71
Frame: 72
Frame: 73
Frame: 74
Frame: 75
Frame: 76
Frame: 77
Frame: 78
Frame: 79
Frame: 80
Frame: 81
Frame: 82
Frame: 83
Frame: 84
Frame: 85
Frame: 86
Frame: 87
Frame: 88
Frame: 89
Frame: 90
Frame: 91
Frame: 92
Frame: 93
Frame: 94
Frame: 95
Frame: 96
Frame: 97
Frame: 98
Frame: 99
Frame: 100

In [None]:
# Persist the per-frame detections as text, one row per detection:
# "<frame index>, <bounding box>, <confidence>".
with open('/content/drive/MyDrive/master_unipa/MOT17-09-DPM/MOT17-09-DPM-DETR07.txt', 'w') as f:
    for frame_count, detections_frame in enumerate(detetions):
        for det in detections_frame:
            # det[0] is the bounding box, det[1] its confidence score.
            f.write(f'{frame_count}, {det[0]}, {det[1]}\n')

In [None]:
# Number of per-frame entries returned by extract_detections for this sequence.
len(detetions)

525

In [None]:
# def process_image_folder_only_detection(detections_preloaded, folder_path, frame_size=(640, 360), detection_interval=2, frame_limit_flag = False, limit=5):
#     tracker = Tracker()
#     frame_files = sorted([os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith(('.png', '.jpg', '.jpeg'))])
#     frame_count = 0

#     detections_frame = []

#     for frame_file in frame_files:
#         if frame_limit_flag and frame_count > limit:
#             break
#         frame = Image.open(frame_file)

#         if frame_count % detection_interval == 0:
#             # confidences, detections = detect_pedestrians(im=frame, model=model)
#             confidences, detections = detections_preloaded[frame_count]
#         tracker.update_tracker(confidences, detections, frame, vgg16, theshold_det_track = 0.4, theshold_reid=0.4)

#         actual_detections = [] # only used for printing

#         for track in tracker.trackers:
#             if track['lost'] == 0:
#                 x1, y1, x2, y2 = map(int, track['bbox'])
#                 x = x1
#                 y = y1
#                 w = x2-x1
#                 h = y2-y1
#                 conf = track['conf']
#                 # format_detection must then be written to a file
#                 format_detectetion = [frame_count, track['id'], x,y,w,h, track['conf'],-1,-1,-1]
#                 print(format_detectetion)
#                 actual_detections.append(format_detectetion) # only used for printing
#                 detections_frame.append(format_detectetion)
#         print(f'Frame: {frame_count}')
#         # plot_image_w_detections(frame, actual_detections)
#         frame_count += 1
#     return detections_frame

In [None]:
# detected_to_file_2 = process_image_folder_only_detection()

In [None]:
# with open('/content/drive/MyDrive/master_unipa/risultati.txt', 'w') as f:
#     for d in detected_to_file:
#         frame,id,x,y,w,h,conf,_,_,_ = d
#         print(f'{frame+1},{id},{x},{y},{w},{h},{conf},-1,-1,-1', file=f)

In [None]:
# Load the detections previously saved to disk for MOT17-13-DPM.
# Each row has the form "<frame>, [<v0> <v1> <v2> <v3>], <confidence>":
# the three fields are comma-separated, the box values are whitespace-separated.
path_data = '/content/drive/MyDrive/master_unipa/MOT17-13-DPM/MOT17-13-DPM-DETR06.txt'
data_preloaded = []
with open(path_data, 'r') as f:
    for row in f:
        if not row.strip():
            # Skip blank rows (e.g. a stray trailing newline) instead of
            # failing on the 3-way unpack below.
            continue
        frame, bboxes, confidences = row.split(',')
        frame = int(frame)
        # Values are kept in file order. Presumably that order is the corner
        # format (x1, y1, x2, y2), as the sample output of detections[0]
        # suggests — TODO confirm against extract_detections.
        x1, y1, x2, y2 = (float(value) for value in bboxes.strip('[] ').split())
        conf = float(confidences.strip())
        # One entry per detection: [frame index, [box values], confidence].
        data_preloaded.append([frame, [x1, y1, x2, y2], conf])

In [None]:
# data_preloaded

In [None]:
# Group the flat detection list by frame index:
# final_detection[frame] -> list of [bbox, confidence] pairs, in file order.
final_detection = {}
for frame_idx, box, score in data_preloaded:
    final_detection.setdefault(frame_idx, []).append([box, score])

In [None]:
# Number of distinct frame indices that have at least one detection.
len(final_detection)

In [None]:
# Detections recorded under frame key 1. Keys are the frame indices read from
# the file; the cell that writes the file starts counting at 0, so key 1 would
# be the second frame. Raises KeyError if no detection was saved for that frame.
detections_frame_1 = final_detection[1]
detections_frame_1