In [1]:
import torch
import torch.nn as nn
import cv2
import csv
import json
import torchvision.models as models
import numpy as np
import pandas as pd
import statistics
import threading
# from object_detection import ObjectDetection
import math
import subprocess
from tqdm import tqdm
from torchvision import transforms
from object_tracking import *
# import maths

In [3]:
model = torch.hub.load('ultralytics/yolov5', 'yolov5m', pretrained=True)
model.eval();

class Image_Classifier(nn.Module):
    def init(self):
        super().init()
        self.model = nn.Sequential(
             Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1)),
             ReLU(),
             Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1)),
             ReLU(),
             MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False),
             Flatten(start_dim=1, end_dim=-1),
             Dropout(p=0.25, inplace=False),
             Linear(in_features=6272, out_features=132, bias=True),
             ReLU(),
             Dropout(p=0.5, inplace=False),
             Linear(in_features=132, out_features=11, bias=True),
        )

    def forward(self, x):
        return self.model(x)

color_classifier = torch.load("2layers_colour_model.pt")
color_classifier = color_classifier.cuda()
color_classifier.eval()

body_classifier = torch.load('model.pt')

Using cache found in C:\Users\bedo-/.cache\torch\hub\ultralytics_yolov5_master
YOLOv5  2023-4-18 Python-3.9.16 torch-2.0.0 CUDA:0 (NVIDIA GeForce GTX 1660, 6144MiB)

Fusing layers... 


[31m[1mrequirements:[0m C:\Users\bedo-\.cache\torch\hub\requirements.txt not found, check failed.


YOLOv5m summary: 290 layers, 21172173 parameters, 0 gradients
Adding AutoShape... 


In [5]:
if __name__ == '__main__':
    video_path = 'light traffic.mp4'
    output_path = 'track_test.mp4'
    cap = cv2.VideoCapture(video_path)

    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    ffmpeg_cmd = f"ffmpeg -y -f rawvideo -pix_fmt bgr24 -s {frame_width}x{frame_height} -r {fps} -i - -c:v libx264 -preset fast -crf 30 -pix_fmt nv12 -an -vcodec libx264 {output_path}"

    output_file = subprocess.Popen(ffmpeg_cmd.split(' '), stdin=subprocess.PIPE)
    
    mot_tracker = Sort(max_age=30, min_hits=60) 

    object_dict = {}

    frame_cut = 0
    frame_count = 0
    clf_state = False
    color_thread = threading.Thread(target=color_classifier)
    color_thread.start()

    body_thread = threading.Thread(target=body_classifier)
    body_thread.start()
    while True:
        ret, frame = cap.read()
        if not ret:
            print('Video processing completed')
            break

        frame_model = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        results = model(frame_model[frame_cut:])

        track_bbs_ids = mot_tracker.update(results.xyxy[0][:,:4].cpu())
        
        
        for x1, y1, x2, y2, obj_id in track_bbs_ids:
            cx1 = int((x1 + x2) / 2)
            cy1 = int((y1 + y2) / 2)
            width = abs(x2 - x1)
            height = abs(y2 - y1)
            diagonal = math.sqrt(width**2 + height**2)
            
            if diagonal > 60 :
                try:
                    car_image = frame_model[int(y1):int(y2), int(x1):int(x2)]
                    transform = transforms.Compose([transforms.ToTensor(),
                                                    transforms.Resize((32,32)),])
                    car_image = transform(car_image)
                    car_image = car_image.cuda()
                except:
                    print('no object detected')
                    continue
                with torch.no_grad():
                    color_output = color_classifier(car_image.unsqueeze(0))
                    color_prediction = torch.argmax(color_output).item()
                    color_name = ['black','blue','brown','green','grey','orange','pink','purple','red','white','yellow']
                    color_class_name = color_name[color_prediction]

                try:
                    body_car_image = frame_model[int(y1):int(y2), int(x1):int(x2)]
                    transform = transforms.Compose([transforms.ToTensor(),
                                                    transforms.Resize((256,256)),])
                    body_car_image = transform(body_car_image)
                    body_car_image = body_car_image.cuda()
                except:
                    print('no object detected')
                    continue

                with torch.no_grad():
                    
                    body_output = body_classifier(body_car_image.unsqueeze(0))
                    body_prediction = torch.argmax(body_output).item()
                    body_name = ['Heavy-Duty', 'Lorry', 'Luxury', 'Pickup', 'SUV', 'Sedan', 'Van']
                    body_class_name = body_name[body_prediction]
                    clf_state = True

            
                if obj_id not in object_dict:
                    object_dict[obj_id] = {
                        'bboxes': [(frame_count, x1, y1, x2, y2)],
                        'last_seen_frame': 0,
                        'color_classifier_preds': [color_prediction],
                        'body_classifier_preds': [body_prediction]
                    }
                else:
                    object_dict[obj_id]['bboxes'].append((frame_count, x1, y1, x2, y2))
                    object_dict[obj_id]['color_classifier_preds'].append(color_prediction)
                    object_dict[obj_id]['body_classifier_preds'].append(body_prediction)

                object_dict[obj_id]['last_seen_frame'] = frame_count

                # Calculate the mode prediction of the classifier for the tracked object
                color_mode_pred = statistics.mode(object_dict[obj_id]['color_classifier_preds'])
                object_dict[obj_id]['color_mode_pred'] = color_mode_pred

                body_mode_pred = statistics.mode(object_dict[obj_id]['body_classifier_preds'])
                object_dict[obj_id]['body_mode_pred'] = body_mode_pred

            cv2.putText(frame, str(obj_id), (cx1, cy1), 0, 0.5, (255, 255, 255), 2)
            if clf_state == True:
                cv2.putText(frame, color_name[color_mode_pred], (int(x1), int(y1)), cv2.FONT_HERSHEY_SIMPLEX, 1, (203, 192, 255), 2)
                cv2.putText(frame,  body_name[body_mode_pred], (int(cx1), int(y2)), cv2.FONT_HERSHEY_SIMPLEX, 1, (203, 192, 255), 2)

        output_file.stdin.write(frame.tobytes())

        for obj_id, obj_data in list(object_dict.items()):
            if frame_count - obj_data['last_seen_frame'] > mot_tracker.max_age:
                object_dict.pop(obj_id)

        frame_count += 1
    color_thread.join()
    body_thread.join()
    cap.release()
    output_file.stdin.close()
    output_file.wait()
    
    with open("object_tracks.json", "w") as f:
        json.dump(object_dict, f, indent=4)
        

Exception in thread Thread-7:
Traceback (most recent call last):
  File "C:\Users\bedo-\anaconda3\envs\pytorch2.0\lib\threading.py", line 980, in _bootstrap_inner
    self.run()
  File "C:\Users\bedo-\anaconda3\envs\pytorch2.0\lib\threading.py", line 917, in run
Exception in thread Thread-8:
Traceback (most recent call last):
  File "C:\Users\bedo-\anaconda3\envs\pytorch2.0\lib\threading.py", line 980, in _bootstrap_inner
    self._target(*self._args, **self._kwargs)
  File "C:\Users\bedo-\anaconda3\envs\pytorch2.0\lib\site-packages\torch\nn\modules\module.py", line 1501, in _call_impl
    self.run()
  File "C:\Users\bedo-\anaconda3\envs\pytorch2.0\lib\threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\bedo-\anaconda3\envs\pytorch2.0\lib\site-packages\torch\nn\modules\module.py", line 1501, in _call_impl
    return forward_call(*args, **kwargs)
TypeError: forward() missing 1 required positional argument: 'x'
    return forward_call(*args, **k

no object detected
no object detected
no object detected
no object detected
no object detected
no object detected
no object detected
no object detected
no object detected
no object detected
no object detected
no object detected
Video processing completed


In [None]:
results.xyxy[0][:,:4]

In [None]:
for x1,y1,x2,y2, obj_id in track_bbs_ids:
    print(x1,y1,x2,y2, obj_id)

In [None]:
import torchvision.transforms as T
from PIL import Image
transform = T.ToPILImage()
img = transform(car_image)
img.show()
output=classifier(car_image.unsqueeze(0))
prediction = torch.argmax(output).item()
print(prediction)