In [1]:
import cv2
import os
import json
import numpy as np
from PIL import Image
from tqdm import tqdm
from collections import Counter


# JSON file holding the list of training samples (one dict per sample).
training_samples_path = "training_samples.json"
# Read through a context manager so the file handle is closed deterministically
# instead of lingering until garbage collection.
with open(training_samples_path, encoding="utf-8") as samples_file:
    training_samples_data = json.load(samples_file)

In [2]:
# Take the first training sample as a worked example and display it.
data_point = training_samples_data[0]
data_point

{'video_name': '20230707_12_SN17_T1_vehicle_view',
 'video_path': '/Users/minhnam/Desktop/playground/aic2024-t2/datasets/videos/train/20230707_12_SN17_T1/vehicle_view/20230707_12_SN17_T1_vehicle_view.mp4',
 'timestamp': [0, 32],
 'speed': 10.0,
 'sentence': "The vehicle is positioned diagonally to the right in front of the pedestrian, at a close distance. The pedestrian is visible within the vehicle's field of view. The vehicle is going straight ahead at a speed of 10 km/h. Meanwhile, in the environment, there is a male pedestrian in his 30s, standing at a height of 170 cm. He is wearing a black T-shirt and black slacks. The weather is clear with bright lighting, and the road surface is dry and level. The road is a residential road with two-way traffic and does not have sidewalks on both sides. There are no roadside strips, but the street lights are on. Overall, the vehicle is in a normal traffic situation, with clear visibility of the pedestrian and suitable road conditions for its sp

In [3]:
# Unpack the fields of the example sample that the following cells rely on.
start, end = data_point["timestamp"]
speed = data_point["speed"]
video_path = data_point["video_path"]

In [32]:
# Open the video and keep only the frames with index in [start, end).
# NOTE(review): `start`/`end` are used as frame indices here and are assumed to be
# non-negative; confirm that "timestamp" is not expressed in seconds (fps is read
# but never used to convert).
cap = cv2.VideoCapture(video_path)
try:
    fps = cap.get(cv2.CAP_PROP_FPS)
    frames, idx = [], 0
    # Stop decoding at `end` rather than loading the entire video into memory
    # and slicing afterwards; the resulting list is the same as frames[start:end].
    while cap.isOpened() and idx < end:
        ret, frame = cap.read()
        if not ret:
            # End of stream (or a decode failure): nothing more to read.
            break
        if idx >= start:
            frames.append(frame)
        idx += 1
finally:
    # Always free the capture handle, even if decoding raised.
    cap.release()

In [7]:
def extract_optical_flow(video_path: str, start: int, end: int) -> list:
    """Compute dense Farneback optical flow over a frame range and render it as BGR images.

    Source: https://opencv24-python-tutorials.readthedocs.io/en/latest/py_tutorials/py_video/py_lucas_kanade/py_lucas_kanade.html

    The video is decoded from its first frame. For every frame index ``idx``
    with ``start <= idx <= end`` (and ``idx >= 1``, since frame 0 has no
    predecessor) the flow between frame ``idx - 1`` and frame ``idx`` is
    computed and encoded as an image: flow direction drives the hue, flow
    magnitude drives the value channel, and saturation is fixed at 255.

    Args:
        video_path: Path of the video file to open with ``cv2.VideoCapture``.
        start: First frame index (inclusive) for which flow is produced.
        end: Last frame index (inclusive) for which flow is produced.

    Returns:
        A list of BGR images, one per processed frame, each with the same
        shape and dtype as the decoded video frames. The list is shorter than
        the requested range if the video ends early.

    Raises:
        OSError: If not even the first frame of the video can be read.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        ret, first_frame = cap.read()
        if not ret:
            # Fail with a clear message instead of letting cvtColor choke on None.
            raise OSError(f"Could not read any frame from video: {video_path}")

        prev_gray = cv2.cvtColor(first_frame, cv2.COLOR_BGR2GRAY)
        hsv = np.zeros_like(first_frame)
        hsv[..., 1] = 255  # full saturation; hue and value are filled per frame

        frames, idx = [], 1

        while cap.isOpened() and idx <= end:
            ret, frame = cap.read()
            if not ret:
                break

            # Grayscale is needed for every frame so that the frame just before
            # `start` is available as the "previous" image.
            curr_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            if idx >= start:
                # flow has shape (h, w, 2): per-pixel displacement in x and y
                flow = cv2.calcOpticalFlowFarneback(prev_gray, curr_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)

                # Encode the flow as colour: angle -> hue, magnitude -> value.
                mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])
                # Radians -> degrees, halved to fit OpenCV's 8-bit hue range.
                hsv[..., 0] = ang * 180 / np.pi / 2
                # NOTE(review): magnitude is min-max normalised per frame, so the
                # absolute flow magnitude is not preserved in the image — confirm
                # this is intended for the downstream speed prediction.
                hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)
                bgr = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)

                frames.append(bgr)

            prev_gray = curr_gray
            idx += 1

        return frames
    finally:
        # Release the capture handle on every exit path, including errors.
        cap.release()

In [12]:
# Render the optical flow of the example sample and dump each image to disk
# so the result can be inspected by eye.
opt_flow_frames = extract_optical_flow(video_path, start, end)
os.makedirs("opt_flow_frames", exist_ok=True)
for idx, frame in enumerate(opt_flow_frames):
    frame_path = f"opt_flow_frames/{idx}.jpg"
    cv2.imwrite(frame_path, frame)

In [9]:
# Build the optical-flow image dataset: for every training sample, render its
# flow images into dataset/images/<video_name>/ and record one label entry per
# image, then write all labels to a single JSON file.
optical_flow_save_dir = "dataset/images"
os.makedirs(optical_flow_save_dir, exist_ok=True)
optical_flow_labels = {}

for data_point in tqdm(training_samples_data):
    video_name = data_point["video_name"]
    video_path = data_point["video_path"]
    # NOTE(review): int() truncates any fractional speed — confirm speeds are whole numbers.
    speed = int(data_point["speed"])
    start, end = data_point["timestamp"]
    event_index = data_point["event_index"]

    opt_flow_frames = extract_optical_flow(video_path, start, end)

    video_optical_flow_save_dir = os.path.join(optical_flow_save_dir, video_name)
    os.makedirs(video_optical_flow_save_dir, exist_ok=True)
    for idx, frame in enumerate(opt_flow_frames):
        filename = f"{video_name}_event{event_index}_{idx}.jpg"
        # cv2.imwrite signals failure through its return value; only label
        # images that actually exist on disk so labels.json never points at a
        # missing file.
        if not cv2.imwrite(os.path.join(video_optical_flow_save_dir, filename), frame):
            tqdm.write(f"Failed to write {filename}; skipping its label")
            continue
        optical_flow_labels[filename] = {
            "speed": speed,
            "event_index": event_index,
            "video_name": video_name
        }

with open("./dataset/labels.json", "w") as f:
    json.dump(optical_flow_labels, f, indent=4)

100%|██████████| 490/490 [2:20:33<00:00, 17.21s/it]  


In [23]:
# Tally how many flow images carry each speed label to inspect class balance.
cnt_speeds = Counter(label["speed"] for label in optical_flow_labels.values())
cnt_speeds

Counter({0.0: 13391,
         10.0: 7245,
         20.0: 6978,
         5.0: 3732,
         30.0: 1924,
         15.0: 782,
         25.0: 136})

In [25]:
import torch

class SpeedPredictionDataset(torch.utils.data.Dataset):
    """Dataset pairing optical-flow images with a speed class index.

    The labels file is a JSON object keyed by image filename; each value must
    provide ``'speed'`` and ``'video_name'``. Images are looked up at
    ``<images_dir>/<video_name>/<image_filename>``.

    Attributes:
        image_paths: Image file path for every sample, in labels-file order.
        labels: Raw speed value for every sample, aligned with ``image_paths``.
        label_to_idx: Maps each distinct speed value to a class index.
        idx_to_label: Inverse of ``label_to_idx``.
    """

    def __init__(self, images_dir, labels_path, transform=None):
        """Index the dataset from a labels file.

        Args:
            images_dir: Root directory containing one sub-directory per video.
            labels_path: Path to the JSON labels file.
            transform: Optional callable applied to each loaded PIL image.
        """
        self.transform = transform

        self.image_paths = []
        self.labels = []
        # Context manager so the labels file handle is closed promptly.
        with open(labels_path) as labels_file:
            labels_data = json.load(labels_file)
        for image_filename, data in labels_data.items():
            self.labels.append(data['speed'])
            self.image_paths.append(
                os.path.join(images_dir, data['video_name'], image_filename)
            )

        # Sort the distinct speeds so class indices are deterministic and
        # ordered by speed, rather than following set iteration order.
        distinct_labels = sorted(set(self.labels))
        self.label_to_idx = {label: idx for idx, label in enumerate(distinct_labels)}
        self.idx_to_label = {idx: label for label, idx in self.label_to_idx.items()}

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(image, class_index_tensor)`` for sample ``idx``.

        The image is passed through ``transform`` when one was supplied;
        otherwise the PIL image is returned as loaded.
        """
        img_path = self.image_paths[idx]
        image = Image.open(img_path)
        # Image.open is lazy; force the pixel data to be read now so the
        # underlying file is not left open on the returned image.
        image.load()
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(self.label_to_idx[label])

In [26]:
# Instantiate the dataset over the generated flow images; no transform is
# passed, so samples come back as raw PIL images.
dataset = SpeedPredictionDataset(labels_path='./dataset/labels.json', images_dir='./dataset/images/')

In [28]:
# Fetch the first (image, class-index tensor) pair as a quick sanity check.
dataset[0]

(<PIL.JpegImagePlugin.JpegImageFile image mode=RGB size=1920x1080>, tensor(2))