In [10]:
from os import listdir
from scipy.spatial import distance as dist
from collections import OrderedDict
import numpy as np
import cv2
from natsort import natsorted, ns
import os

In [25]:
class CentroidTracker():
    """Track objects across frames by greedily matching bounding-box centroids.

    Every tracked object has an integer ID, its last known centroid
    (``self.objects``) and a count of consecutive frames in which it was
    not matched to any detection (``self.disappeared``).

    Parameters
    ----------
    maxDisappeared : int
        An object is deregistered once its "disappeared" counter becomes
        strictly greater than this value.
    maxDistance : float or None
        Optional upper bound on the centroid distance of an accepted match.
        With the default ``None`` any distance is accepted.
    """
    #CREDIT FOR https://www.pyimagesearch.com/2018/07/23/simple-object-tracking-with-opencv/
    def __init__(self, maxDisappeared = 5, maxDistance = None):
        # initialize the next unique object ID along with two ordered
        # dictionaries used to keep track of mapping a given object
        # ID to its centroid and number of consecutive frames it has
        # been marked as "disappeared", respectively
        self.nextObjectID = 0
        self.objects = OrderedDict()
        self.disappeared = OrderedDict()

        # store the number of maximum consecutive frames a given
        # object is allowed to be marked as "disappeared" until we
        # need to deregister the object from tracking
        self.maxDisappeared = maxDisappeared

        # largest centroid distance accepted as a match (None = no limit)
        self.maxDistance = maxDistance

    def register(self, centroid):
        """Start tracking `centroid` under the next free object ID."""
        self.objects[self.nextObjectID] = centroid
        self.disappeared[self.nextObjectID] = 0
        self.nextObjectID += 1

    def deregister(self, objectID):
        """Stop tracking `objectID` by deleting it from both dictionaries."""
        del self.objects[objectID]
        del self.disappeared[objectID]

    def _markDisappeared(self, objectID):
        """Count one more missed frame for `objectID`; drop it if over the limit."""
        self.disappeared[objectID] += 1
        if self.disappeared[objectID] > self.maxDisappeared:
            self.deregister(objectID)

    def update(self, rects):
        """Feed the detections of one frame to the tracker.

        Parameters
        ----------
        rects : sequence of 4-tuples
            Bounding boxes, each unpacked as ``(startX, startY, endX, endY)``.

        Returns
        -------
        OrderedDict
            ``self.objects`` itself (object ID -> centroid), not a copy.
        """
        # no detections at all: every tracked object misses this frame
        if len(rects) == 0:
            # iterate over a snapshot because _markDisappeared may delete keys
            for objectID in list(self.disappeared.keys()):
                self._markDisappeared(objectID)
            return self.objects

        # derive the centroid of every input bounding box
        inputCentroids = np.zeros((len(rects), 2), dtype="float")
        for (i, (startX, startY, endX, endY)) in enumerate(rects):
            cX = (startX + endX) / 2.0
            cY = (startY + endY) / 2.0
            inputCentroids[i] = (cX, cY)

        # if we are currently not tracking any objects take the input
        # centroids and register each of them
        if len(self.objects) == 0:
            for i in range(0, len(inputCentroids)):
                self.register(inputCentroids[i])
            return self.objects

        # otherwise, we are currently tracking objects so we need to
        # try to match the input centroids to existing object centroids
        objectIDs = list(self.objects.keys())
        objectCentroids = list(self.objects.values())

        # D[r, c] = distance between tracked object r and input centroid c
        D = dist.cdist(np.array(objectCentroids), inputCentroids)

        # visit the rows in order of their smallest distance, each paired
        # with the column holding that smallest distance
        rows = D.min(axis=1).argsort()
        cols = D.argmin(axis=1)[rows]

        usedRows = set()
        usedCols = set()

        # TODO: replace this greedy pairing by an optimal assignment
        # (e.g. the auction algorithm); a row whose nearest column is
        # already taken is currently left unmatched for this frame.
        for (row, col) in zip(rows, cols):
            # each object and each detection may be matched at most once
            if row in usedRows or col in usedCols:
                continue

            # reject matches that are farther apart than the allowed distance
            if self.maxDistance is not None and D[row, col] > self.maxDistance:
                continue

            # accept the match: move the object and reset its counter
            objectID = objectIDs[row]
            self.objects[objectID] = inputCentroids[col]
            self.disappeared[objectID] = 0

            usedRows.add(row)
            usedCols.add(col)

        # compute both the row and column indexes we have NOT yet matched
        unusedRows = set(range(0, D.shape[0])).difference(usedRows)
        unusedCols = set(range(0, D.shape[1])).difference(usedCols)

        # Unmatched rows and unmatched columns can exist in the same frame
        # (two objects sharing one nearest detection), so both are always
        # handled, regardless of which side of D is larger. Sorting keeps
        # the assignment of new IDs deterministic.
        for row in sorted(unusedRows):
            self._markDisappeared(objectIDs[row])

        for col in sorted(unusedCols):
            self.register(inputCentroids[col])

        # return the set of trackable objects
        return self.objects

def readbboxFile(file, confidenceLevel = 0.1, minDistance = 5):
    """Read one detection text file and return its de-duplicated boxes.

    Every non-blank line of `file` is parsed with ``BBox``. A box is kept
    only if its confidence is at least `confidenceLevel`. If the centre of
    a new box lies closer than `minDistance` to the centre of an already
    kept box, the two are treated as the same object detected twice and
    only the one with the higher confidence survives (the earlier box
    wins a tie).

    Parameters
    ----------
    file : str
        Path of the detection text file.
    confidenceLevel : float
        Minimum confidence for a box to be considered.
    minDistance : float
        Centre distance below which two boxes count as duplicates.

    Returns
    -------
    tuple of lists
        ``(personbbox, carbbox, truckbbox, trafficLightbbox, stopSignbbox,
        allbbox)``. The first five hold the kept boxes whose ``type`` is
        'person', 'car', 'truck', 'trafficLight' and 'stopSign'
        respectively; ``allbbox`` holds every kept box of any type.
    """
    # read everything at once so the handle is closed before parsing
    with open(file) as handle:
        txts = handle.readlines()

    allbbox = []

    for txt in txts:
        # a blank line (e.g. a trailing newline) carries no detection
        if not txt.strip():
            continue

        bbox = BBox(txt)

        # first: evaluate the confidence of the detected object
        if bbox.confidence >= confidenceLevel:
            # second: make sure the object is not detected twice with two classes
            if len(allbbox) > 0:
                distances = dist.cdist([bbox.center], [b.center for b in allbbox])[0]
                nearest = int(np.argmin(distances))
                if distances[nearest] < minDistance:
                    # duplicate: keep whichever box is more confident
                    if allbbox[nearest].confidence < bbox.confidence:
                        allbbox[nearest] = bbox
                    continue
            allbbox.append(bbox)

    # split the kept boxes by class; other classes stay only in allbbox
    byType = {
        'person': [],
        'car': [],
        'truck': [],
        'trafficLight': [],
        'stopSign': [],
    }
    for b in allbbox:
        if b.type in byType:
            byType[b.type].append(b)

    return (byType['person'], byType['car'], byType['truck'],
            byType['trafficLight'], byType['stopSign'], allbbox)


In [26]:
class BBox():
    """One detection parsed from a single space-separated text line.

    The line is split on single spaces. Field 0 is stored as ``type`` and
    field 1 as the float ``confidence``. Fields 2-5 are four float
    coordinates, stored pairwise swapped in ``rect`` as
    ``(field3, field2, field5, field4)``. ``center`` is the midpoint of
    ``rect``: ``((rect[0] + rect[2]) / 2, (rect[1] + rect[3]) / 2)``.

    NOTE(review): the swap suggests the file lists each coordinate pair in
    the opposite order to the one the tracker expects -- confirm against
    the program that writes the detection files.
    """
    def __init__(self, txt):
        # split once and reuse the pieces
        fields = txt.split(' ')

        self.confidence = float(fields[1])
        self.type = fields[0]

        # coordinates are read in the order 3, 2, 5, 4
        first, second, third, fourth = (float(fields[k]) for k in (3, 2, 5, 4))
        self.rect = (first, second, third, fourth)

        midA = (self.rect[0] + self.rect[2]) / 2
        midB = (self.rect[1] + self.rect[3]) / 2
        self.center = (float(midA), float(midB))

In [27]:
#_____________________________________
#INPUT
bboxDirectory = 'yolo_frames/'  # Detection Images Directory (Text Files)
outputDirectory = 'yolo_tracking/' # Output Directory of Tracking files
# Create the output folder if it doesn't exist
os.makedirs(outputDirectory, exist_ok=True)
# Filtering thresholds. They are not read by any code in this cell; the
# large values mean "do not ignore anything in the image".
minimumYvalue = 0 #if the y value is less than this the bounding box is ignored
maxCenterYvalue = 1000000 # if the center of the box is more than this ignore it
maxCenterYvalue_xCheck = 1000000
#_____________________________________

# Collect the base name (file name without '.txt') of every detection file.
# Only the single flat directory `bboxDirectory` is scanned.
directory = bboxDirectory
files = listdir(directory)
images0 = [file[:-4] for file in files if file.endswith('.txt')]

print("total number of files: ", len(images0))

# Natural sort so that e.g. frame2 comes before frame10; the tracker
# relies on the frames being processed in temporal order.
images0 = natsorted(images0, alg=ns.IGNORECASE)

# Initialize the tracker.
# TODO: multiple trackers could be used for different classes of objects.
ct = CentroidTracker()

# number of frames processed so far
counter = 0
for name in images0:

    # read the bbox file of this frame
    personbbox, carbbox, truckbbox, trafficLightbbox, stopSignbbox, allbbox = readbboxFile(bboxDirectory + name + '.txt')

    # per-class rectangle lists (not used further down in this cell)
    personRect = [b.rect for b in personbbox]
    carRect = [b.rect for b in carbbox]
    truckRect = [b.rect for b in truckbbox]
    trafficLightRect = [b.rect for b in trafficLightbbox]
    stopSignRect = [b.rect for b in stopSignbbox]
    allRect = [b.rect for b in allbbox]

    # centers of this frame's boxes, index-aligned with allbbox
    allCenterList = [b.center for b in allbbox]

    # update the centroid tracker using the computed bounding box rectangles
    objects = ct.update(allRect)

    # Write a fresh annotation file for this frame. Mode 'w' (not append)
    # keeps the output identical when the notebook is re-run; the `with`
    # block closes the file even if a write fails.
    with open(outputDirectory + name + '_t.txt', 'w') as annotationFile:
        # loop over the tracked objects
        for (objectID, centroid) in objects.items():
            # Only objects whose centroid equals the center of a box from
            # THIS frame are written; objects currently marked as
            # disappeared keep an older centroid and are skipped.
            center = tuple(centroid)
            if center in allCenterList:
                dObject = allbbox[allCenterList.index(center)]
                # NOTE(review): the coordinates are written in BBox.rect
                # order, which swaps each pair relative to the input
                # line -- confirm that consumers expect this layout.
                annotationFile.write(' '.join([
                    dObject.type,
                    str(dObject.confidence),
                    str(dObject.rect[0]),
                    str(dObject.rect[1]),
                    str(dObject.rect[2]),
                    str(dObject.rect[3]),
                    str(objectID),
                ]) + '\n')

    counter += 1




total number of files:  4983
