In [6]:
import cv2
import math
import numpy as np
from deep_sort_realtime.deepsort_tracker import DeepSort

In [7]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.transforms import functional as F

In [8]:
import torch

# Report whether CUDA is usable. The device name is only queried when CUDA is
# available, because torch.cuda.get_device_name raises otherwise and would stop a
# CPU-only run even though the model-loading cell falls back to "cpu".
print(torch.cuda.is_available())
if torch.cuda.is_available():
    print(torch.cuda.get_device_name(0))

True
NVIDIA GeForce RTX 2050


In [9]:
# Location of the trained detector checkpoint (passed to torch.load further down)
# and of the input video (passed to cv2.VideoCapture).
# NOTE(review): both are absolute, machine-specific Windows paths; the notebook will
# only run elsewhere after these two values are edited.
model_path = r"C:\Users\Rishabh Surana\Desktop\ATMS project\Custom model for Object Detection\CNN_obj_Traffic_detector_4.pth"
video_path = r"C:\Users\Rishabh Surana\Desktop\ATMS project\test_data\test_video\video 2.avi"

In [10]:
def get_model(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a custom-sized box head.

    The detector is created with ``pretrained=False`` and its box predictor is
    replaced by a fresh ``FastRCNNPredictor`` that outputs ``num_classes`` classes.

    Args:
        num_classes: Number of output classes for the replacement box predictor.
            NOTE(review): torchvision conventionally counts the background class
            in this number -- confirm against how the checkpoint was trained.

    Returns:
        The constructed detection model (weights still have to be loaded by the caller).
    """
    from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

    detector = fasterrcnn_resnet50_fpn(pretrained=False)
    # The new head must accept the same feature width the stock head consumed.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)
    return detector
# Size of the box-predictor head built by get_model; load_state_dict below will
# reject the checkpoint if its head has a different shape.
num_classes = 22
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


model = get_model(num_classes)
# map_location places the checkpoint tensors on the selected device while loading.
# NOTE(review): torch.load unpickles the file -- only load checkpoints from a trusted source.
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
# Switch to inference mode; as the last expression of the cell this also
# displays the model structure.
model.eval()
    

FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=1e-05)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=1e-05)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=1e-05)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=1e-05)
          (relu

In [11]:
# Tracker instance fed with per-frame detections in the main loop (tracker.update_tracks).
# NOTE(review): max_age is presumably the number of consecutive missed frames a
# track survives before it is dropped -- confirm against the deep_sort_realtime docs.
tracker = DeepSort(max_age=30)

  import pkg_resources


In [12]:
# Open the input video; frames are pulled from this handle by the main loop and
# the handle is released there, so this cell must be re-run before processing again.
cap = cv2.VideoCapture(video_path)

In [13]:
if not cap.isOpened():
    # Raise instead of calling exit(): inside a Jupyter kernel exit() shuts the
    # kernel down, whereas an exception stops "Run All" and keeps the session
    # (and the message) available for inspection.
    raise RuntimeError(f"Error : Could not open video: {video_path}")
else:
    print("Video is opened and working")

Video is opened and working


In [14]:
# Scale factor used by calculate_speed to convert pixel distances to metres.
# NOTE(review): 5/50 presumably means "5 metres span 50 pixels" in this camera
# view -- confirm the calibration; it is specific to the test video.
meters_per_pixel = 5/50
print(meters_per_pixel)

0.1


In [15]:
def obj_Detection(results, detections):
    """Append one tracker-style detection per box in ``results`` to ``detections``.

    Each appended entry is ``([left, top, width, height], confidence, class_name)``,
    with the box corners truncated to ints and the class name looked up in
    ``results.names``.

    Args:
        results: Object exposing ``boxes`` (iterable of boxes with ``xyxy``,
            ``cls`` and ``conf`` sequences) and a ``names`` mapping.
            NOTE(review): looks like an Ultralytics-style result object -- confirm.
        detections: List that is extended in place.

    Returns:
        None. ``detections`` is mutated.
    """
    for det in results.boxes:
        left, top, right, bottom = (int(v) for v in det.xyxy[0])
        label = results.names[int(det.cls[0])]
        score = float(det.conf[0])
        # Convert corner format (x1, y1, x2, y2) to (x, y, w, h).
        detections.append(([left, top, right - left, bottom - top], score, label))

In [16]:
stall_cars = []
def stall_movement_fn(car_coordinates, car_id):
    """Decide whether a track has stayed (almost) in place over its recent history.

    The last 10 recorded points of ``car_coordinates[car_id]`` are examined; the
    track counts as stalled when every one of them lies closer than 5 pixels
    (Euclidean distance) to the first point of that window.

    Args:
        car_coordinates: Mapping from track id to a list of ``[x, y]`` points.
        car_id: Key of the track to evaluate.

    Returns:
        True if the track is stalled, False otherwise. Tracks with 10 or fewer
        recorded points are never reported as stalled.
    """
    history = car_coordinates[car_id]
    # More than 10 samples are required before a verdict is given.
    if len(history) <= 10:
        return False

    window = history[-10:]
    anchor = window[0]
    for point in window[1:]:
        drift = math.sqrt((anchor[0] - point[0]) ** 2 + (anchor[1] - point[1]) ** 2)
        if drift >= 5:  # pixel_threshold
            return False
    return True
    
    

In [17]:
def alert(car_id, reason):
    """Print a one-line status message for a track.

    Args:
        car_id: Identifier printed at the start of the message.
        reason: One of "Opposite_direction", "Stalled" or "Overspeed"; any other
            value produces the "is ok" message.

    Returns:
        None. The message is written to stdout.
    """
    known_reasons = (
        ("Opposite_direction", "is moving in Opposite Direction."),
        ("Stalled", "is stalled."),
        ("Overspeed", "is Overspeeding."),
    )
    # First matching reason wins; unknown reasons fall back to "is ok".
    status = next((text for key, text in known_reasons if reason == key), "is ok")
    print(f"{car_id} {status}")

In [18]:
opposite_cars = []
def detect_car_movement(car_coordinates, expected = [0,-1], threshold = 0.4):
    """Flag stalled and wrong-way tracks based on their two most recent points.

    For every track with at least two recorded points:

    * if the last two points are identical and ``stall_movement_fn`` reports the
      track as stalled, the id is added to ``stall_cars`` (once) and a "Stalled"
      alert is printed;
    * the last displacement is compared with ``expected`` by cosine similarity;
      when the similarity is below ``-threshold`` the id is added to
      ``opposite_cars`` (once) and an "Opposite_direction" alert is printed.

    A zero displacement (or zero ``expected``) yields a similarity of 1, so it is
    never treated as wrong-way movement.

    Args:
        car_coordinates: Mapping from track id to a list of ``[x, y]`` points.
        expected: Two-component direction considered "correct". It is only read,
            never modified. NOTE(review): the default [0, -1] presumably means
            "towards the top of the image" -- confirm for the camera view.
        threshold: Positive margin; similarity must drop below its negative.

    Returns:
        None. ``stall_cars`` and ``opposite_cars`` are updated in place.
    """
    def _unit(vec):
        length = math.sqrt(vec[0]**2 + vec[1]**2)
        return [vec[0]/length, vec[1]/length] if length != 0 else [0, 0]

    def _cosine(a, b):
        len_a = math.sqrt((a[0]**2)+(a[1]**2))
        len_b = math.sqrt((b[0]**2)+(b[1]**2))
        if not (len_a and len_b):
            # Undefined angle: treat as perfectly aligned.
            return 1
        return ((a[0] * b[0]) + (a[1] * b[1])) / (len_a * len_b)

    for car_id, coords in car_coordinates.items():
        if len(coords) < 2:
            continue

        (prev_x, prev_y), (cur_x, cur_y) = coords[-2], coords[-1]
        dx, dy = cur_x - prev_x, cur_y - prev_y

        # The stall test is only attempted when the track did not move at all
        # between the last two samples.
        not_moving = dx == 0 and dy == 0
        if not_moving and stall_movement_fn(car_coordinates, car_id) and car_id not in stall_cars:
            stall_cars.append(car_id)
            alert(car_id, "Stalled")

        heading = _unit([dx, dy])
        if _cosine(heading, expected) < -threshold and car_id not in opposite_cars:
            alert(car_id, "Opposite_direction")
            opposite_cars.append(car_id)
        

In [19]:
def calculate_speed(car_coordinates, fps, meters_per_pixel):
    """Average speed in km/h over consecutive points of one track.

    Every pair of neighbouring points is treated as one frame apart: the pixel
    distance is scaled by ``meters_per_pixel``, divided by the frame interval
    ``1 / fps`` and converted from m/s to km/h. The mean of those per-step
    speeds is returned.

    Args:
        car_coordinates: Sequence of ``(x, y)`` points for a single track.
        fps: Frames per second assumed between consecutive points.
        meters_per_pixel: Scale factor from pixels to metres.

    Returns:
        The mean speed in km/h, or 0.0 when fewer than two points are given.
    """
    step_speeds = [
        math.hypot(bx - ax, by - ay) * meters_per_pixel / (1 / fps) * 3.6
        for (ax, ay), (bx, by) in zip(car_coordinates, car_coordinates[1:])
    ]
    if not step_speeds:
        return 0.0
    return sum(step_speeds) / len(step_speeds)

In [20]:
car_coordinates = {}
def car_coords_fn(track_id, x1, y1, x2, y2):
    """Record the centre of a track's bounding box and run the movement checks on it.

    The centre of the box ``(x1, y1)-(x2, y2)`` is appended to
    ``car_coordinates[track_id]``; only the 20 most recent centres are kept.
    Afterwards ``detect_car_movement`` is run for this track.

    Args:
        track_id: Key identifying the track in ``car_coordinates``.
        x1, y1, x2, y2: Corner coordinates of the bounding box.

    Returns:
        None. ``car_coordinates`` is updated in place.
    """
    center_x = int((x1 + x2)/2)
    center_y = int((y1 + y2)/2)
    history = car_coordinates.setdefault(track_id, [])
    # Keep a sliding window of at most 20 points per track.
    if len(history) >= 20:
        history.pop(0)
    history.append([center_x, center_y])
    # Only the track that just changed is re-checked. Every other track was
    # already evaluated right after its own last update and its points have not
    # changed since, so scanning the whole dictionary here would repeat the same
    # decisions and make the per-frame cost quadratic in the number of tracks.
    detect_car_movement({track_id: history})

In [21]:
overspeed = {}
def draw_Tracks(tracks, frame, fps=30, speed_limit_kmph=60):
    """Update per-track state and draw a labelled box for every confirmed track.

    For each confirmed track the bounding-box centre is recorded through
    ``car_coords_fn`` (which also runs the stall / wrong-way checks), the speed
    is estimated with ``calculate_speed`` and an "Overspeed" alert is raised the
    first time the speed exceeds ``speed_limit_kmph``. The box and a text label
    are drawn onto ``frame`` in place.

    Box colours (OpenCV BGR order): yellow for ids in ``stall_cars``, red for ids
    in ``opposite_cars``, orange while above the speed limit, green otherwise.

    Args:
        tracks: Iterable of tracker tracks exposing ``is_confirmed()``,
            ``track_id``, ``to_ltrb()`` and ``get_det_class()``.
        frame: Image the annotations are drawn on (modified in place).
        fps: Frame rate used for the speed estimate. Defaults to 30.
            NOTE(review): the real rate of the input video may differ -- consider
            passing the value reported by the capture.
        speed_limit_kmph: Speed above which a track counts as overspeeding.
            Defaults to 60.

    Returns:
        None.
    """
    for track in tracks:
        if not track.is_confirmed():
            continue
        track_id = track.track_id
        x1, y1, x2, y2 = map(int, track.to_ltrb())
        class_name = track.get_det_class()

        car_coords_fn(track_id, x1, y1, x2, y2)
        speed = calculate_speed(car_coordinates[track_id], fps=fps, meters_per_pixel=meters_per_pixel)
        is_speeding = speed > speed_limit_kmph
        # Alert only once per track; the stored value is the speed at first detection.
        if is_speeding and track_id not in overspeed:
            alert(track_id, "Overspeed")
            overspeed[track_id] = speed

        label = f"ID {track_id} | {class_name} | {speed:.1f} km/h"

        if track_id in stall_cars:
            color = (0, 255, 255)
        elif track_id in opposite_cars:
            color = (0, 0, 255)
        elif is_speeding:
            color = (0, 165, 255)
        else:
            color = (0, 255, 0)

        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 1)
        # Clamp the text baseline so labels of boxes touching the top edge stay visible.
        label_y = max(y1 - 10, 10)
        cv2.putText(frame, label, (x1, label_y), cv2.FONT_HERSHEY_SIMPLEX, 0.4, color, 1)
        

In [22]:
def predict_custom_model(model, frame, threshold = 0.5):
    """Run the detector on one frame and return detections for the tracker.

    The frame is resized to 512x512, converted to a tensor and passed through
    ``model``. Predictions with a score above ``threshold`` are kept and their
    boxes are scaled back to the original frame size.

    Args:
        model: Detection model returning, per image, a dict with ``boxes``
            (x1, y1, x2, y2), ``labels`` and ``scores``.
        frame: Image array of shape (height, width, channels).
            NOTE(review): frames read with cv2 are BGR and no channel swap is
            done here -- confirm this matches how the model was trained.
        threshold: Minimum score (exclusive) for a prediction to be kept.

    Returns:
        A list of ``([x, y, w, h], score, label)`` tuples in original-frame pixel
        coordinates, where ``score`` is a float and ``label`` an int.
    """
    image_resized = cv2.resize(frame, (512,512))
    image_tensor = F.to_tensor(image_resized).to(device)

    with torch.no_grad():
        outputs = model([image_tensor])[0]

    keep = outputs['scores'] > threshold
    # Convert to plain Python numbers once. Passing 0-dim tensors on (in
    # particular as the class label) makes the tracker store tensors, which then
    # show up as "tensor(k)" in the on-screen labels.
    boxes = outputs['boxes'][keep].cpu().tolist()
    labels = outputs['labels'][keep].cpu().tolist()
    scores = outputs['scores'][keep].cpu().tolist()

    # Factors mapping the 512x512 network input back to the original frame.
    scale_x = frame.shape[1]/512
    scale_y = frame.shape[0]/512

    detections = []
    for (bx1, by1, bx2, by2), score, label in zip(boxes, scores, labels):
        x1, y1 = int(bx1*scale_x), int(by1*scale_y)
        x2, y2 = int(bx2*scale_x), int(by2*scale_y)
        detections.append(([x1, y1, x2 - x1, y2 - y1], float(score), int(label)))
    return detections

In [23]:
# Main processing loop: detect, track, annotate and display every frame until the
# video ends or the user presses 'q' in the output window.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        detections = predict_custom_model(model, frame, threshold=0.6)
        tracks = tracker.update_tracks(detections, frame = frame)

        draw_Tracks(tracks,frame)

        cv2.imshow("Output", frame)
        # Bitwise AND keeps the low byte of the key code. The previous
        # `waitKey(1) and 0xFF == ord('q')` compared two constants and was
        # therefore always False, so 'q' could never stop the loop.
        if (cv2.waitKey(1) & 0xFF) == ord('q'):
            break
finally:
    # Release the capture and close the window even if a frame fails to process.
    cap.release()
    cv2.destroyAllWindows()
   
   

14 is Overspeeding.
17 is Overspeeding.
4 is stalled.
7 is stalled.
25 is moving in Opposite Direction.
29 is moving in Opposite Direction.
22 is stalled.
25 is stalled.
29 is stalled.
41 is Overspeeding.
17 is moving in Opposite Direction.
27 is moving in Opposite Direction.


In [24]:
# Summary of every track seen during the run: number of tracks, then for each
# track its stored centre points and the average speed over those points.
# car_coords_fn keeps at most 20 points per track, so the speed printed here
# covers only that most recent window, not the track's whole lifetime.
total_cars = len(car_coordinates)
print(f"Total Cars = {total_cars}")
print("--"*130)

for track_id, coords in car_coordinates.items():
    print(f"\nCar Id {track_id} -> {coords}")
    # NOTE(review): fps is hard-coded to 30 here -- confirm it matches the input video.
    speed = calculate_speed(coords, fps = 30, meters_per_pixel=meters_per_pixel)
    print(f"Speed of car Id {track_id} : {speed:.2f} km/h.")

Total Cars = 14
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Car Id 1 -> [[172, 110], [171, 109], [171, 107], [171, 106], [170, 104], [169, 103], [169, 101], [168, 99], [167, 98], [155, 95], [153, 93], [152, 91], [151, 90], [149, 88], [148, 86], [147, 85], [146, 83], [144, 81], [143, 80], [141, 78]]
Speed of car Id 1 : 27.48 km/h.

Car Id 4 -> [[253, 121], [253, 121], [253, 121], [252, 120], [252, 120], [252, 119], [252, 119], [251, 119], [251, 118], [251, 118], [250, 118], [250, 118], [250, 118], [250, 118], [250, 117], [249, 117], [249, 117], [248, 117], [249, 117], [248, 116]]
Speed of car Id 4 : 6.16 km/h.

Car Id 7 -> [[224, 116], [223, 116], [223, 115], [223, 114], [222, 113], [222, 112], [221, 112], [220, 111], [220, 110], [219, 109], [221, 109], [220, 109], [220, 1

In [25]:
# Final report: list the track ids collected in each alert category during the run.
# A track id can appear in more than one category.
print("\n---Stalled Vehicle---\n")
for s_car in stall_cars:
    print(f"Car Id {s_car} Stalled.")
print("\n\n---Opposite Vehicle---\n")
for op_car in opposite_cars:
    print(f"Car Id {op_car} is moving in opposite direction.")
print("\n\n---Overspeed Vehicle---\n")
# overspeed maps track id -> speed at first detection; only the ids are printed.
for overspd in overspeed:
    print(f"Car Id {overspd} is overspeeding.")


---Stalled Vehicle---

Car Id 4 Stalled.
Car Id 7 Stalled.
Car Id 22 Stalled.
Car Id 25 Stalled.
Car Id 29 Stalled.


---Opposite Vehicle---

Car Id 25 is moving in opposite direction.
Car Id 29 is moving in opposite direction.
Car Id 17 is moving in opposite direction.
Car Id 27 is moving in opposite direction.


---Overspeed Vehicle---

Car Id 14 is overspeeding.
Car Id 17 is overspeeding.
Car Id 41 is overspeeding.
