<a href="https://colab.research.google.com/github/JK-the-Ko/AI-and-DL/blob/main/Day1/LSTM/%5B2%5DReal_Time_Vehicle_Trajectory_Prediction_with_YOLOv11.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Ultralytics YOLOv11를 활용한 Vehicle Trajectory Prediction**

## 1. 영상 다운로드

In [None]:
import gdown

# Google Drive file ID of the zipped video archive
file_id = "1nLSUY4Oqb_aI3fkVeTPQ2MLEuooMjT6n"
# Local filename the downloaded archive is written to (unzipped in the next section)
output_file = "video.zip"

# Fetch the archive from Google Drive into output_file
gdown.download(f"https://drive.google.com/uc?id={file_id}", output_file)

## 2. 영상 재생

In [None]:
!unzip "/content/video.zip"

In [None]:
!pip3 install ipywidgets

In [None]:
import ipywidgets
from ipywidgets import Image
from IPython.display import display

In [None]:
# Embed the train/val clip in the cell output so it can be played back inline
video_widget = ipywidgets.Video.from_file("/content/train_val.mp4")
display(video_widget)

## 3. 데이터셋 생성

### A. Ultralytics 설치

In [None]:
!pip3 install ultralytics

### B. Hyperparameter 설정

In [None]:
# Shared configuration dictionary read by every later cell:
#   video_size      - passed to YOLO as imgsz, used as the VideoWriter frame size
#                     and as the upper bound of the min-max normalization
#   seed            - value handed to fix_seed
#   train_val_ratio - fraction of extracted samples assigned to the training split
#   input_frame     - number of observed (x, y) points fed to the model per sample
#   target_frame    - number of future (x, y) points the model is scored on
#   conf, iou       - thresholds forwarded to yolo.track
#   in_channels, hid_channels, out_channels, num_layer, p
#                   - LSTM model input/hidden/output sizes, depth and dropout
#   batch_size, total_epoch, lr, decay_rate
#                   - training setup (scheduler eta_min = lr * decay_rate)
opt = {"video_size": 640, "seed":42, "train_val_ratio":0.9, "input_frame":40, "target_frame":10, "conf":0.05, "iou":0.05,
       "in_channels":2, "hid_channels":256, "out_channels":2, "num_layer":4, "p":0.1,
       "batch_size":256, "total_epoch":10, "lr":1e-4, "decay_rate":1e-2}

### C. Frame 별로 영상을 나누어 데이터셋 생성

#### i. 라이브러리 불러오기

In [None]:
from os import makedirs

import random
from random import shuffle

from collections import defaultdict

import cv2

import numpy as np
import pandas as pd

import torch

from ultralytics import YOLO

#### ii. 필요 함수 설정

In [None]:
def fix_seed(seed):
  """Seed Python, NumPy and PyTorch (CPU + CUDA) and pin cuDNN settings.

  Args:
    seed: Integer seed applied to every random number generator.
  """
  # Apply the same seed to each generator, in a fixed order
  seeders = (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed,
    torch.cuda.manual_seed_all,
  )
  for apply_seed in seeders:
    apply_seed(seed)

  # Request deterministic cuDNN kernels and disable the auto-tuner
  cudnn = torch.backends.cudnn
  cudnn.deterministic = True
  cudnn.benchmark = False

In [None]:
class AverageMeter:
  """Keep the latest value and the running weighted mean of a metric."""

  def __init__(self):
    self.reset()

  def reset(self):
    """Set the last value, mean, weighted sum and count back to zero."""
    self.val = self.avg = self.sum = self.count = 0

  def update(self, val, n=1):
    """Record `val` with weight `n` and refresh the running mean.

    Args:
      val: Newly observed value.
      n: Weight of the observation (e.g. number of samples it summarizes).
    """
    self.val = val
    self.sum, self.count = self.sum + val*n, self.count + n
    self.avg = self.sum / self.count

#### iii. 영상 재생을 위한 Image 인스턴스 생성

In [None]:
# Empty ipywidgets Image; the tracking loop below overwrites its `value` with JPEG bytes for every frame
video = Image()

#### iv. YOLOv11을 활용한 Vehicle 좌표 추출

In [None]:
# Load Pretrained YOLOv11 Model Weight
yolo = YOLO("yolo11m.pt")
yolo.info()

# Load Video
video_path = "/content/train_val.mp4"
cap = cv2.VideoCapture(video_path)

# Save Object Tracking Results
# NOTE(review): the writer is opened with a (video_size, video_size) frame size and the
# box centers are later normalized by video_size - this presumably assumes the clip is
# already video_size x video_size; confirm against the actual video resolution.
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter("/content/tracking_results.mp4", fourcc, cap.get(cv2.CAP_PROP_FPS), (opt["video_size"], opt["video_size"]))

# COCO class ids treated as vehicles
vehicle_cls_ids = {2, 5, 7} # Car / Bus / Truck

# Number of consecutive center points that make up one sample
seq_len = opt["input_frame"] + opt["target_frame"] # Input Frames + Target Frames

# Per-track sliding window of center points & list of extracted samples
track_history = defaultdict(list)
data = []

display(video)

try :
  with torch.no_grad() :
    while cap.isOpened() :
      # Retrieve Frame
      ret, frame = cap.read()
      if not ret :
        break

      # Get Object Tracking Results
      results = yolo.track(frame, persist=True, imgsz=opt["video_size"], conf=opt["conf"], iou=opt["iou"], verbose=False)

      # Visualize the Results on the Frame
      annotated_frame = results[0].plot()
      out.write(annotated_frame)
      video.value = cv2.imencode(".jpg", annotated_frame)[1].tobytes()

      # Frames without any tracked object carry no track ids; nothing to record
      detections = results[0].boxes
      if detections is None or detections.id is None :
        continue

      # Get the Boxes, Track IDs and Class IDs
      boxes = detections.xywh.cpu()
      track_ids = detections.id.int().cpu().tolist()
      cls_ids = detections.cls.int().cpu().tolist()

      # Update the sliding window of every tracked vehicle
      for box, track_id, cls_id in zip(boxes, track_ids, cls_ids) :
        if cls_id not in vehicle_cls_ids :
          continue

        track = track_history[track_id]
        track.append((float(box[0]), float(box[1]))) # (x, y) Center Point

        if len(track) > seq_len :
          # Drop the oldest point, then store a COPY of the window: appending the
          # live list itself would make every sample of this track alias one
          # object that keeps being mutated on later frames
          track.pop(0)
          data.append(list(track))
finally :
  # Release the capture and finalize the output video file
  cap.release()
  out.release()

### D. Frame을 기준으로 데이터셋 생성

In [None]:
# Seed the RNGs so the shuffle (and therefore the split) is reproducible
fix_seed(opt["seed"])

# Randomize the sample order in place
shuffle(data)

# Number of samples that belong to the training split
train_len = int(len(data)*opt["train_val_ratio"])

# Positional column index -> "frame_<i>" for every point of a sample
col_name = {i: f"frame_{i}" for i in range(opt["input_frame"] + opt["target_frame"])}

# One DataFrame per split, with the readable column names applied
train_df = pd.DataFrame(data=data[:train_len]).rename(columns=col_name)
val_df = pd.DataFrame(data=data[train_len:]).rename(columns=col_name)

# Make sure the output directory exists
save_dir = "/content/csv"
makedirs(save_dir, exist_ok=True)

# Write both splits to CSV without the row index
train_df.to_csv(f"{save_dir}/train_dataset.csv", index=False)
val_df.to_csv(f"{save_dir}/val_dataset.csv", index=False)

### E. 데이터셋 살펴보기

In [None]:
# Preview the first 10 rows of the training split
train_df.head(10)

## 4. Custom Dataloader 생성

In [None]:
from ast import literal_eval

from torch.utils.data import Dataset

In [None]:
class LoadDataset(Dataset):
  """Serve (input, target) trajectory pairs read from the train/val CSV.

  Every CSV cell is parsed with `literal_eval` and its first two items are used
  as the (x, y) point of one frame. The first `opt["input_frame"]` columns of a
  row form the input sequence, the remaining columns form the target sequence.
  """

  def __init__(self, opt, for_val):
    """
    Args:
      opt: Configuration dict; reads "input_frame" and "video_size".
      for_val: Load the validation CSV when truthy, the training CSV otherwise.
    """
    super().__init__()

    self.opt = opt

    csv_path = "/content/csv/train_dataset.csv"
    if for_val:
      csv_path = "/content/csv/val_dataset.csv"
    self.data = self.load_csv(csv_path)

  def load_csv(self, csv_dir):
    """Read the CSV at `csv_dir` into a pandas DataFrame."""
    return pd.read_csv(csv_dir)

  def __getitem__(self, index):
    """Return the normalized (input, target) tensors of row `index`.

    Returns:
      Tuple of two tensors shaped [#Seq, 2] holding (x, y) divided by video_size.
    """
    n_input = self.opt["input_frame"]
    frame_size = self.opt["video_size"]

    # Parse every cell of the row once and keep its (x, y) pair
    parsed = (literal_eval(cell) for cell in self.data.iloc[index])
    points = torch.tensor([(point[0], point[1]) for point in parsed])

    # Min-max normalize both coordinates with the range [0, video_size]
    points = self.min_max_norm(points, 0, frame_size)

    # Leading points are the model input, the rest are the prediction target
    return points[:n_input], points[n_input:]

  def __len__(self):
    """Return the number of rows (samples) in the loaded CSV."""
    return len(self.data)

  def min_max_norm(self, input, min, max):
    """Linearly rescale `input` so that `min` maps to 0 and `max` maps to 1."""
    return (input - min) / (max - min)

## 5. LSTM Model

### A. Trajectory 예측을 위한 모델 생성

In [None]:
from torch import nn

In [None]:
class LSTM(nn.Module):
  """Trajectory model that fuses a stacked LSTM with a stacked bidirectional LSTM.

  The input points are projected to the hidden size separately for each branch,
  the two branch outputs are concatenated and reduced by two FC + ReLU +
  LayerNorm stages, a linear projection of the raw input is added as a skip
  connection, and a final FC layer maps to `out_channels` for every time step.
  """

  def __init__(self, opt):
    """
    Args:
      opt: Configuration dict; reads "in_channels", "hid_channels",
        "out_channels", "num_layer" and "p".
    """
    super().__init__()

    self.opt = opt

    in_dim, hid_dim, out_dim = opt["in_channels"], opt["hid_channels"], opt["out_channels"]
    depth = opt["num_layer"]
    # Dropout is switched off when the stack has a single layer
    drop = 0 if depth == 1 else opt["p"]

    # Recurrent branches (the bidirectional one uses half the width per direction)
    self.lstm = nn.LSTM(hid_dim, hid_dim, num_layers=depth, bidirectional=False, batch_first=True, dropout=drop)
    self.bilstm = nn.LSTM(hid_dim, hid_dim//2, num_layers=depth, bidirectional=True, batch_first=True, dropout=drop)

    # Input projections, fusion layers and output head
    self.input2lstm = nn.Linear(in_dim, hid_dim)
    self.input2bilstm = nn.Linear(in_dim, hid_dim)
    self.input2output = nn.Linear(in_dim, hid_dim)
    self.fc0 = nn.Linear(hid_dim*2, hid_dim, bias=False)
    self.fc1 = nn.Linear(hid_dim, hid_dim, bias=False)
    self.fc2 = nn.Linear(hid_dim, out_dim)

    # Layer normalization after each fusion stage
    self.norm0 = nn.LayerNorm(hid_dim)
    self.norm1 = nn.LayerNorm(hid_dim)

    # Shared activation
    self.act = nn.ReLU(inplace=True)

  def forward(self, input):
    """Return one `out_channels`-sized prediction per time step of `input`."""
    depth, hid_dim = self.opt["num_layer"], self.opt["hid_channels"]
    batch, where = input.size(0), input.device

    # Project the raw points into each branch's feature space
    uni_in = self.input2lstm(input)
    bi_in = self.input2bilstm(input)

    # Zero initial (hidden, cell) states for both branches
    uni_state = (torch.zeros(depth, batch, hid_dim, device=where),
                 torch.zeros(depth, batch, hid_dim, device=where))
    bi_state = (torch.zeros(depth*2, batch, hid_dim//2, device=where),
                torch.zeros(depth*2, batch, hid_dim//2, device=where))

    uni_out, _ = self.lstm(uni_in, uni_state)
    bi_out, _ = self.bilstm(bi_in, bi_state)

    # Fuse both branches, then add the skip connection from the raw input
    fused = torch.cat([uni_out, bi_out], dim=-1)
    fused = self.norm0(self.act(self.fc0(fused)))
    fused = self.norm1(self.act(self.fc1(fused))) + self.input2output(input)

    return self.fc2(fused)

## 6. Trajectory 예측 모델 훈련 진행

In [None]:
import matplotlib.pyplot as plt

from torch import optim
from torch.utils.data import DataLoader

from tqdm import tqdm

In [None]:
# Fix Seed
fix_seed(opt["seed"])

# Load Training Dataset (drop_last=True : every training batch holds exactly batch_size samples)
train_dataset = LoadDataset(opt, for_val=False)
train_loader = DataLoader(train_dataset, batch_size=opt["batch_size"], drop_last=True, shuffle=True)

# Load Validation Dataset (drop_last=False : the final batch may be smaller than batch_size)
val_dataset = LoadDataset(opt, for_val=True)
val_loader = DataLoader(val_dataset, batch_size=opt["batch_size"], drop_last=False, shuffle=False)

# Fix Seed again so the weight initialization starts from a freshly seeded RNG
fix_seed(opt["seed"])

# Create Model Instance
model = LSTM(opt)

# Compute Number of Parameters
num_parameter = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"# Parameters : {num_parameter:,}")

# Create Optimizer Instance
optimizer = optim.Adam(model.parameters(), lr=opt["lr"])

# Create Scheduler Instance (stepped once per iteration, hence T_max counts iterations)
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer,
                                                 T_max=opt["total_epoch"]*len(train_loader),
                                                 eta_min=opt["lr"]*opt["decay_rate"])

# Create Loss Function Instance
criterion = nn.L1Loss()

# Determine Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device Type : {device}")

# Assign Device
model = model.to(device)

# Create Average Meter Instance
train_loss, val_loss = AverageMeter(), AverageMeter()

# Create List Instance
train_loss_list, val_loss_list = [], []

# Create Directory
ckpt_dir, graph_dir = "ckpt/lstm", "result/lstm"
makedirs(ckpt_dir, exist_ok=True)
makedirs(graph_dir, exist_ok=True)

# Set Best loss
best_loss = np.inf

# Start Training
for epoch in range(1, opt["total_epoch"]+1) :
  # Create TQDM Dataloader Instance
  train_bar = tqdm(train_loader)

  # Reset Average Meter Instance
  train_loss.reset()

  # Set Training Mode
  model.train()

  # Training Phase
  # (the batch is unpacked into inputs/targets so the notebook-level `data`
  #  sample list and the builtin `input` are not overwritten)
  for inputs, targets in train_bar :
    # Assign Device
    inputs, targets = inputs.to(device), targets.to(device)

    # Set Gradient to 0
    optimizer.zero_grad()

    # Get Prediction
    pred = model(inputs)

    # Compute Loss on the last target_frame outputs of the sequence
    loss = criterion(pred[:,-opt["target_frame"]:,:], targets)

    # Back-Propagation
    loss.backward()

    # Update Weight
    optimizer.step()

    # Update Learning Rate Scheduler
    scheduler.step()

    # Compute Averaged Loss, weighted by the actual number of samples in the batch
    train_loss.update(loss.item(), inputs.size(0))

    # Update Progress Bar Status
    train_bar.set_description(desc=f"[{epoch}/{opt['total_epoch']}] [Train] < Loss:{train_loss.avg:.4f} >")

  # Add Training Loss
  train_loss_list.append(train_loss.avg)

  # Create TQDM Dataloader Instance
  val_bar = tqdm(val_loader)

  # Reset Average Meter Instance
  val_loss.reset()

  # Set Validation Mode
  model.eval()

  # Validation Phase (no gradients are needed for either the forward pass or the loss)
  with torch.no_grad() :
    for inputs, targets in val_bar :
      # Assign Device
      inputs, targets = inputs.to(device), targets.to(device)

      # Get Prediction
      pred = model(inputs)

      # Compute Loss
      loss = criterion(pred[:,-opt["target_frame"]:,:], targets)

      # Compute Averaged Loss; the last batch can be smaller than batch_size,
      # so weight by the real batch length to keep the mean unbiased
      val_loss.update(loss.item(), inputs.size(0))

      # Update Progress Bar Status
      val_bar.set_description(desc=f"[{epoch}/{opt['total_epoch']}] [Val] < Loss:{val_loss.avg:.4f} >")

  # Add Validation Loss
  val_loss_list.append(val_loss.avg)

  # Save Network (best-so-far on validation loss, plus the most recent epoch)
  if val_loss.avg < best_loss :
    best_loss = val_loss.avg
    torch.save(model.state_dict(), f"{ckpt_dir}/best.pth")
  torch.save(model.state_dict(), f"{ckpt_dir}/latest.pth")

  # Plot Training vs. Validation Loss Graph (x-axis uses the 1-based epoch number)
  epochs_done = np.arange(1, epoch+1)
  plt.clf()
  plt.plot(epochs_done, train_loss_list, label="Training Loss")
  plt.plot(epochs_done, val_loss_list, label="Validation Loss")
  plt.title("Loss (Training vs. Validation)")
  plt.xlabel("Epoch")
  plt.ylabel("Loss")
  plt.legend(loc="best")
  plt.savefig(f"{graph_dir}/loss.png")

## 7. Valid 데이터셋을 활용하여 Inference 진행

### A. Pretrained Model 다운로드 (Best Model @ Epoch-50)

In [None]:
# Google Drive file ID of the pretrained LSTM checkpoint
file_id = "1ij2rk2C4XQvj1Nj2GyoYkD3foDVeqwuE"
# Local filename the checkpoint is written to (loaded by the inference cells below)
output_file = "best_epoch_50.pth"

# Fetch the checkpoint from Google Drive into output_file
gdown.download(f"https://drive.google.com/uc?id={file_id}", output_file)

In [None]:
# Override the batch size used by the DataLoader created in the inference cell below
opt["batch_size"] = 8

In [None]:
# Fix Seed
fix_seed(opt["seed"])

# Load Validation Dataset
val_dataset = LoadDataset(opt, for_val=True)
val_loader = DataLoader(val_dataset, batch_size=opt["batch_size"], drop_last=False, shuffle=False)

# Fix Seed
fix_seed(opt["seed"])

# Create Model Instance
model = LSTM(opt)

# Determine Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device Type : {device}")

# Load Pretrained Weight (map_location lets a GPU-saved checkpoint load on a CPU-only runtime)
model.load_state_dict(torch.load("/content/best_epoch_50.pth", map_location=device), strict=True)

# Assign Device & switch to evaluation mode so LSTM dropout is disabled during inference
model = model.to(device)
model.eval()

# Create Directory
graph_dir = "result/val"
makedirs(graph_dir, exist_ok=True)

# Factor that undoes the dataset's min-max normalization (which divides by video_size)
scale = opt["video_size"]

# Create TQDM Dataloader Instance
val_bar = tqdm(val_loader)

# Start Test Phase
# (the batch is unpacked into inputs/targets so the notebook-level `data`
#  sample list and the builtin `input` are not overwritten)
for index, (inputs, targets) in enumerate(val_bar) :
  # Assign Device
  inputs = inputs.to(device)

  with torch.no_grad() :
    # Get Prediction for the last target_frame steps, limited to the normalized range
    pred = model(inputs)[:,-opt["target_frame"]:,:].clamp(0,1)

  # Map normalized coordinates back to integer pixel coordinates
  input_px = (inputs.cpu().numpy()*scale).astype("int32")
  pred_px = (pred.cpu().numpy()*scale).astype("int32")
  target_px = (targets.numpy()*scale).astype("int32")

  # Update Progress Bar Status
  val_bar.set_description(desc="[Test] < Updating Results >")

  # Result Visualization : one figure per sample, observed path followed by the future path
  for i in range(pred_px.shape[0]) :
    plt.figure(figsize=(10, 5))
    plt.plot(list(input_px[i,:,0])+list(pred_px[i,:,0]), list(input_px[i,:,1])+list(pred_px[i,:,1]), "r", label="Prediction")
    plt.plot(list(input_px[i,:,0])+list(target_px[i,:,0]), list(input_px[i,:,1])+list(target_px[i,:,1]), "g", label="Ground-Truth")
    plt.xlabel("Local X Coordinate")
    plt.ylabel("Local Y Coordinate")
    plt.title("Trajectory Tracking Prediction")
    plt.legend(loc="best")
    plt.savefig(f"{graph_dir}/trajectory_index_{index}_batch_{i}.png")
    plt.close()

  # Stop once batches 0..10 have been visualized
  if index >= 10 :
    break

### B. Trajectory Prediction 결과 시각화 (그래프)

In [None]:
# Load one of the saved trajectory plots
graph_path = "/content/result/val/trajectory_index_0_batch_0.png"
result_graph = cv2.imread(graph_path)

# cv2.imread returns None instead of raising when the file cannot be read
if result_graph is None :
  raise FileNotFoundError(f"Could not read image: {graph_path}")

# OpenCV decodes to BGR while matplotlib expects RGB; convert so the red
# prediction line is not displayed in blue
result_graph = cv2.cvtColor(result_graph, cv2.COLOR_BGR2RGB)

# Show the image without frame or axis ticks
plt.figure(figsize=(10, 5))
plt.box(False)
plt.xticks([])
plt.yticks([])
plt.imshow(result_graph)
plt.show()

## 8. Test 데이터셋을 활용하여 추론 및 결과 시각화

In [None]:
# Fresh ipywidgets Image; the test-video loop below overwrites its `value` with JPEG bytes for every frame
video = Image()

In [None]:
# Load Pretrained YOLOv11 Model Weight
yolo = YOLO("yolo11m.pt")
yolo.info()

# Load Video
video_path = "/content/test.mp4"
cap = cv2.VideoCapture(video_path)

# Create Directory for Saving Results
save_dir = "result/lstm"
makedirs(save_dir, exist_ok=True)
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(f"{save_dir}/tracking_prediction.mp4", fourcc, cap.get(cv2.CAP_PROP_FPS), (opt["video_size"], opt["video_size"]))

# Create LSTM Model Instance
lstm = LSTM(opt).eval()

# Load Pretrained LSTM Model Weight
weights = torch.load("/content/best_epoch_50.pth")
lstm.load_state_dict(weights, strict=True)

# Determine Device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device Type : {device}")

# Assign Device
lstm = lstm.to(device)

# Create Dictionary Instance
track_history = defaultdict(lambda: [])
lstm_track_history = defaultdict(lambda: [])

display(video)

with torch.no_grad() :
  while cap.isOpened() :
    # Retrieve Frame
    ret, frame = cap.read()

    if ret :
      # Get Object Tracking Results
      results = yolo.track(frame, persist=True, imgsz=opt["video_size"], conf=opt["conf"], iou=opt["iou"], verbose=False)

      # Get the Boxes and Track IDs
      boxes = results[0].boxes.xywh.cpu()
      track_ids = results[0].boxes.id.int().cpu().tolist()
      cls_ids = results[0].boxes.cls.int().cpu().tolist()

      # Visualize the Results on the Frame
      annotated_frame = results[0].plot()

      # Plot the tracks
      for box, track_id, cls_id in zip(boxes, track_ids, cls_ids) :
        if cls_id == 2 or cls_id == 5 or cls_id == 7 : # Car / Truck / Bus
          x, y, w, h = box # Bounding Box Info
          track = track_history[track_id]
          lstm_track = lstm_track_history[track_id]
          track.append((float(x), float(y))) # (x, y) Center Point
          if len(track) > opt["input_frame"] :
            track.pop(0)
            input = np.hstack(track).astype(np.int32).reshape((1, -1, 2)) # Get Input Data
            input = torch.tensor(input).to(device)/639 # Min-Max Norm Input Data
            pred = lstm(input)[:,-opt["target_frame"]:,:].clamp(0,1).cpu().detach().numpy().reshape(-1, 2)*639 # Inference & Affine Prediction
            for i in range(pred.shape[0]) :
                lstm_track.append((float(pred[i][0]), float(pred[i][1]))) # Add Predictions

          # Draw the Tracking Lines
          points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
          cv2.polylines(annotated_frame, [points], isClosed=False, color=(0, 0, 255), thickness=2)

          # Draw the Predicted Tracking Lines
          if len(lstm_track) == opt["target_frame"] :
            pred_points = np.hstack(lstm_track).astype(np.int32).reshape((-1, 1, 2))
            cv2.polylines(annotated_frame, [pred_points], isClosed=False, color=(255, 0, 0), thickness=2)
          lstm_track_history[track_id] = []

      # Show Object Tracking Results
      out.write(annotated_frame)
      annotated_frame = cv2.imencode(".jpg", annotated_frame)[1].tobytes()
      video.value = annotated_frame

    else:
        break

cap.release()