In [1]:
import json
import numpy as np
from math import isnan

In [82]:
def detect_turn_left(track_coordinates):
    if len(track_coordinates) < 3:
        return "Not enough data to determine"

    angles = []
    for i in range(1, len(track_coordinates) - 1):
        vector1 = track_coordinates[i] - track_coordinates[i - 1]
        vector2 = track_coordinates[i + 1] - track_coordinates[i]

        # Calculate the angle between the vectors
        angle = np.arccos(np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2)))

        if isnan(angle) == False:
            # Convert angle to degrees
            angle_deg = np.degrees(angle)
            if isnan(angle_deg) == False:
                angles.append(angle_deg)

    # Calculate the average angle
    avg_angle = np.mean(angles)


    # Determine if the car is turning or going straight
    if avg_angle >= 45 and avg_angle <= 135:
        return True
    else:
        return False


In [18]:
def detect_hard_brake(track_coordinates, time = 0.1):
    if len(track_coordinates) < 40:
        return False

    speeds = []
    for i in range(40, len(track_coordinates)):
        zeroIdx = i - 5
        brakeIdx = zeroIdx - 30
        highIdx = brakeIdx - 5
        distance_zero = np.linalg.norm(track_coordinates[i] - track_coordinates[zeroIdx])
        distance_high = np.linalg.norm(track_coordinates[highIdx] - track_coordinates[brakeIdx])

        if distance_zero < 30 and distance_high > 200:
            return True

    return False

In [26]:
def process(data, frame, track_num):
    res = []
    while frame < len(data):
        frame_info = data[frame]
        if frame_info == None:
            frame += 1
            continue
        flag = False
        for track_info in frame_info:
            if track_info['track_id'] == track_num:
                flag = True
                break
        if flag == False:
            break
        centroid = [0,0]
        centroid[0] = (track_info['left'] + track_info['right'])/2
        centroid[1] = (track_info['top'] + track_info['bottom'])/2
        res.append(centroid)
        frame += 1
    return np.array(res)
        

In [5]:
def event_detect(data, start_frame, track_num, event):
    # get the numpy array of all the coordinate sequences
    sequence = process(data, start_frame, track_num)

    # run left detection
    res = False
    if event == "turn-left":
        res = detect_turn_left(sequence)
    elif event == "hard-brake":
        res = detect_hard_brake(sequence)
    return res

In [10]:
from tqdm import tqdm
def run_check(file_path, event = "turn-left"):
    seen_track = []
    event_occurance = 0
    with open(file_path, 'r') as file:
        data = json.load(file)
    for i in tqdm(range(len(data))):
        frame = data[i]
        if frame == None:
            continue
        for track in frame:
            if track['track_id'] not in seen_track:
                if event_detect(data, i, track['track_id'], event):
                    event_occurance += 1
                seen_track.append(track['track_id'])
    print(event_occurance)
    return event_occurance, len(data)
    

In [114]:
event = "turn-left"
prefix = "warsaw/"
for i in range(6):
    file_path = prefix+str(i)+"-baseline.json"
    occurance, frame_num = run_check(file_path, event)
    print("For", event, "event, file ",i, "has average event occurance per frame of ", occurance/frame_num)

  angle = np.arccos(np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2)))
  angle = np.arccos(np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2)))
100%|███████████████████████████████████| 31000/31000 [00:03<00:00, 8788.95it/s]


787
For turn-left event, file  0 has average event occurance per frame of  0.025387096774193547


100%|███████████████████████████████████| 31000/31000 [00:05<00:00, 6134.19it/s]


951
For turn-left event, file  1 has average event occurance per frame of  0.03067741935483871


100%|███████████████████████████████████| 31000/31000 [00:08<00:00, 3718.42it/s]


1307
For turn-left event, file  2 has average event occurance per frame of  0.04216129032258065


100%|███████████████████████████████████| 31000/31000 [00:11<00:00, 2608.70it/s]


1814
For turn-left event, file  3 has average event occurance per frame of  0.058516129032258064


100%|███████████████████████████████████| 31000/31000 [00:14<00:00, 2089.27it/s]


2182
For turn-left event, file  4 has average event occurance per frame of  0.07038709677419355


100%|███████████████████████████████████| 25037/25037 [00:08<00:00, 2958.92it/s]

1619
For turn-left event, file  5 has average event occurance per frame of  0.0646642968406758





**BDD Dataset**

In [49]:
def process_bdd(data, frame, track_num):
    res = []
    while frame < len(data):
        frame_info = data[frame]["labels"]
        if frame_info == None:
            frame += 1
            continue
        flag = False
        for track_info in frame_info:
            if track_info["id"] == track_num:
                flag = True
                break
        if flag == False:
            break
        centroid = [0,0]
        box_info = track_info["box2d"]
        centroid[0] = (box_info['x1'] + box_info['x2'])/2
        centroid[1] = (box_info['y1'] + box_info['y2'])/2
        res.append(centroid)
        frame += 1

    return np.array(res)
    

In [81]:
import glob
from tqdm import tqdm

# return [{"car": [], "pedestrian":[]}, {}, {}]
all_tracks = []
total_frames = 0

for filepath in glob.iglob('bdd/val/*.json'):
    res = {"car": [], "pedestrian":[], "car_start": [], "pedestrian_start": []}
    with open(file_path, 'r') as file:
        frames = json.load(file)
        total_frames += len(frames)
        seen_id = []
        for frame_idx in range(len(frames)):
            frame = frames[frame_idx]
            objects = frame["labels"]
            for object in objects:
                if (object["category"] == "car" or object["category"] == "pedestrian") and (object["id"] not in seen_id):
                    seen_id.append(object["id"])
                    res[object["category"]].append(process_bdd(frames, frame_idx, object["id"]))
                    res[object["category"]+"_start"].append(frame_idx)
    all_tracks.append(res)
                    
            
total_frames

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



40400

In [110]:
cars = []
for video in all_tracks:
    cars.extend(video["car"])

event_occurance = 0
for car_track in cars:
    if detect_turn_left(car_track):
        event_occurance += 1

event_occurance/total_frames

  angle = np.arccos(np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2)))


0.10891089108910891

In [111]:
def detect_hard_brake_bdd(track_coordinates, time = 0.1):
    if len(track_coordinates) < 15:
        return False

    speeds = []
    for i in range(40, len(track_coordinates)):
        zeroIdx = i - 5
        brakeIdx = zeroIdx - 5
        highIdx = brakeIdx - 5
        distance_zero = np.linalg.norm(track_coordinates[i] - track_coordinates[zeroIdx])
        distance_high = np.linalg.norm(track_coordinates[highIdx] - track_coordinates[brakeIdx])

        if distance_zero < 10 and distance_high > 50:
            return True

    return False

In [80]:
event_occurance = 0
for car_track in cars:
    if detect_hard_brake_bdd(car_track):
        event_occurance += 1

event_occurance/total_frames

0.0049504950495049506

In [109]:
from itertools import product

event_occurance = 0

for video in all_tracks:
    start = list(product(video["car_start"], video["pedestrian_start"]))
    for i in range(len(list(product(video["car"], video["pedestrian"])))):
        pair = list(product(video["car"], video["pedestrian"]))[i]
        
        car_list = pair[0]
        pedestrian_list = pair[1]

        car_start = start[i][0]
        car_end = car_start + len(car_list)

        pedestrian_start = start[i][1]
        pedestrian_end = pedestrian_start + len(pedestrian_list)

        if car_start > pedestrian_start:
            if car_start - pedestrian_start > len(pedestrian_list):
                continue
            pedestrian_list = pedestrian_list[car_start - pedestrian_start:]
        else:
            if pedestrian_start - car_start > len(car_list):
                continue
            car_list = car_list[pedestrian_start - car_start:]

        if car_end < pedestrian_end:
            if pedestrian_end - car_end > len(pedestrian_list):
                continue
            pedestrian_list = pedestrian_list[:-(pedestrian_end - car_end)]
        else:
            if car_end - pedestrian_end > len(car_list):
                continue
            car_list = car_list[:-(car_end - pedestrian_end)]
        if len(car_list) == 0 or len(pedestrian_list) == 0:
            continue
        assert len(car_list) == len(pedestrian_list)

        if detect_pedestrian_car(pedestrian_list, car_list):
            event_occurance += 1
event_occurance/total_frames          

  distance = np.average(np.sum(np.linalg.norm(i) for i in difference_coordinates))
  angle = np.arccos(np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2)))


0.25742574257425743

In [108]:
def detect_pedestrian_car(pedestrian, car, distance_threshold = 5000):

    difference_coordinates = car - pedestrian
    distance = np.average(np.sum(np.linalg.norm(i) for i in difference_coordinates))
    if distance >= distance_threshold:
        return False
    angles = []
    for i in range(1, len(difference_coordinates) - 1):
        vector1 = difference_coordinates[i] - difference_coordinates[i - 1]
        vector2 = difference_coordinates[i + 1] - difference_coordinates[i]

        # Calculate the angle between the vectors
        angle = np.arccos(np.dot(vector1, vector2) / (np.linalg.norm(vector1) * np.linalg.norm(vector2)))

        if isnan(angle) == False:
            # Convert angle to degrees
            angle_deg = np.degrees(angle)
            if isnan(angle_deg) == False:
                angles.append(angle_deg)

    # Calculate the average angle
    avg_angle = np.mean(angles)


    # Determine if the car is turning or going straight
    if avg_angle >= 30 and avg_angle <= 150:
        return True
    else:
        return False