**Import Libraries**

In [1]:
import os
# Set before the heavy numeric imports in the next cell.
# NOTE(review): presumably a workaround for the "OpenMP runtime already initialized"
# crash seen in some conda/PyTorch setups -- confirm it is still required.
os.environ['KMP_DUPLICATE_LIB_OK'] = 'True'


In [2]:
import sys
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.datasets as datasets
from torch.utils.data import Dataset, DataLoader
from torchvision import models
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import matplotlib as mpl
from glob import glob
import cv2
import re
import time
from datetime import datetime
from sort import Sort
from torch.amp import autocast
from torch.amp import GradScaler


**Set the chunksize to avoid OverflowError in matplotlib**

In [3]:
# Let the Agg renderer draw very long line paths in chunks (value: 10000) instead of in one piece.
mpl.rcParams['agg.path.chunksize'] = 10000


In [4]:
# Turn on cuDNN benchmark mode.
# NOTE(review): this is normally only a win when input shapes stay constant between
# iterations -- frames are resized to a fixed size further down, but confirm.
torch.backends.cudnn.benchmark = True


**Load Detections**

In [5]:
def load_detections(images_dir, labels_dir):
    """Load per-frame bounding boxes from YOLO-style label files.

    Every entry of ``images_dir`` (in sorted filename order) is treated as one
    frame. The label file with the same stem and a ``.txt`` extension is looked
    up in ``labels_dir``; each usable line holds five whitespace-separated
    tokens ``class x_center y_center width height`` where the four box values
    are fractions of the image width/height. Lines with a different token count
    are skipped.

    Args:
        images_dir: Directory containing the frame images.
        labels_dir: Directory containing the matching ``.txt`` label files.

    Returns:
        A tuple ``(detections, img_paths)``. ``detections[i]`` is a float array
        of shape ``(N_i, 5)`` with rows ``[x1, y1, x2, y2, score]`` in pixel
        coordinates; it is always 2-D, i.e. ``(0, 5)`` when frame ``i`` has no
        boxes (missing label file *or* a label file without usable lines).
        ``img_paths[i]`` is the path of frame ``i``.
    """
    detections = []
    img_paths = []

    for img_file in sorted(os.listdir(images_dir)):
        img_path = os.path.join(images_dir, img_file)
        img_name, _ = os.path.splitext(img_file)

        # Corresponding label file
        lbl_path = os.path.join(labels_dir, img_name + '.txt')

        # Only the size is needed; the context manager releases the file handle immediately
        with Image.open(img_path) as img:
            width, height = img.size

        frame_detections = []
        if os.path.exists(lbl_path):
            with open(lbl_path, 'r') as f:
                for line in f:
                    tokens = line.split()
                    if len(tokens) != 5:
                        continue
                    int(tokens[0])  # class id: validated but not used by the tracker
                    x_center = float(tokens[1]) * width
                    y_center = float(tokens[2]) * height
                    half_w = float(tokens[3]) * width / 2
                    half_h = float(tokens[4]) * height / 2

                    # The detection format for SORT is [x1, y1, x2, y2, score];
                    # ground-truth boxes get a confidence score of 1.0
                    frame_detections.append([
                        x_center - half_w,
                        y_center - half_h,
                        x_center + half_w,
                        y_center + half_h,
                        1.0,
                    ])

        # reshape(-1, 5) keeps the array 2-D even when no boxes were collected
        detections.append(np.asarray(frame_detections, dtype=float).reshape(-1, 5))
        img_paths.append(img_path)
    return detections, img_paths


**Apply SORT to Assign IDs**

In [6]:
def track_objects(detections):
    """Run SORT over a sequence of per-frame detections and group results by track.

    Args:
        detections: Sequence with one array per frame, rows being
            ``[x1, y1, x2, y2, score]``. Frames without boxes may be given as
            any empty array; they are normalized to shape ``(0, 5)`` before
            being handed to the tracker.

    Returns:
        Dict mapping each track id (as returned by the tracker) to a list of
        dicts with keys ``'frame_idx'``, ``'bbox'`` (``[x1, y1, x2, y2]``) and
        ``'center'`` (``[cx, cy]``), in the order the frames were processed.
    """
    mot_tracker = Sort()  # Create instance of the SORT tracker
    trajectories = {}

    for frame_idx, dets in enumerate(detections):
        dets = np.asarray(dets)
        if dets.size == 0:
            # The tracker must still be called on empty frames, with a 2-D (0, 5) array
            dets = np.empty((0, 5))

        for track in mot_tracker.update(dets):
            x1, y1, x2, y2, track_id = track
            trajectories.setdefault(track_id, []).append({
                'frame_idx': frame_idx,
                'bbox': [x1, y1, x2, y2],
                'center': [(x1 + x2) / 2, (y1 + y2) / 2]
            })
    return trajectories


**Collect Trajectories**

In [7]:
def get_trajectories(images_dir, labels_dir):
    """Load detections for a split and turn them into per-track trajectories.

    Args:
        images_dir: Directory with the frame images.
        labels_dir: Directory with the matching label files.

    Returns:
        ``(trajectories, img_paths)``: the result of ``track_objects`` on the
        loaded detections, and the frame paths from ``load_detections``.
    """
    per_frame_dets, frame_paths = load_detections(images_dir, labels_dir)
    return track_objects(per_frame_dets), frame_paths


**Paths to images and labels**

In [8]:
# Images and label files live in parallel train/val folders under ./DETRAC_Upload
# (paths are relative to the notebook's working directory).
train_images_dir = './DETRAC_Upload/images/train'
train_labels_dir = './DETRAC_Upload/labels/train'
val_images_dir = './DETRAC_Upload/images/val'
val_labels_dir = './DETRAC_Upload/labels/val'


**Get trajectories for training and validation data**

In [9]:
# Each call reads every image/label pair of the split and runs the tracker over them.
# *_trajectories: dict of track id -> list of per-frame entries; *_image_files: frame paths
# indexed by the 'frame_idx' stored in those entries.
train_trajectories, train_image_files = get_trajectories(train_images_dir, train_labels_dir)
val_trajectories, val_image_files = get_trajectories(val_images_dir, val_labels_dir)


**Visualize Sample Trajectory**

In [10]:
if len(train_trajectories) == 0:
    print("No trajectories found in the training data.")
else:
    # Take the first track (dict insertion order) as the example
    sample_track_id = next(iter(train_trajectories))
    sample_trajectory = train_trajectories[sample_track_id]

    # One (x, y) center per tracked frame
    positions = np.array([step['center'] for step in sample_trajectory])
    print(f"Number of points in trajectory: {len(positions)}")

    # Cap the number of plotted points; adjust max_points as needed
    max_points = 1000
    if len(positions) > max_points:
        positions = positions[:max_points]
        print(f"Trajectory is too long; plotting first {max_points} points.")

    fig, ax = plt.subplots(figsize=(6, 6))
    ax.plot(positions[:, 0], positions[:, 1], marker='o')
    ax.set_title('Sample Vehicle Trajectory')
    ax.set_xlabel('X Position')
    ax.set_ylabel('Y Position')
    ax.invert_yaxis()  # image coordinates: y grows downwards
    ax.grid()
    plt.show()


Number of points in trajectory: 157


**Custom Dataset Class**

In [11]:
class UADETRACDataset(Dataset):
    """Sliding-window dataset over tracked vehicle trajectories.

    Each sample pairs ``input_length`` consecutive track entries (their frame
    images and box centers) with the centers of the following ``pred_length``
    entries of the same track.

    Args:
        images_dir: Stored on the instance; not read by this class.
        trajectories: Dict of track id -> list of dicts carrying at least
            ``'frame_idx'`` and ``'center'``.
        image_paths: Sequence of frame paths indexed by ``'frame_idx'``.
        input_length: Number of observed entries per sample.
        pred_length: Number of future entries per sample.
        transform: Optional callable applied to each RGB PIL image; when it is
            falsy, ``transforms.ToTensor()`` is applied instead.
    """

    def __init__(self, images_dir, trajectories, image_paths, input_length, pred_length, transform=None):
        self.images_dir = images_dir
        self.image_paths = image_paths
        self.transform = transform
        self.input_length = input_length
        self.pred_length = pred_length
        self.sequences = []
        self.targets = []  # not populated by this class
        self.create_sequences(trajectories)

    def create_sequences(self, trajectories):
        """Append every (input window, target window) pair of every track to ``self.sequences``."""
        window = self.input_length + self.pred_length
        for track in trajectories.values():
            ordered = sorted(track, key=lambda det: det['frame_idx'])
            # range() is empty for tracks shorter than one full window
            for start in range(len(ordered) - window + 1):
                split = start + self.input_length
                self.sequences.append((ordered[start:split], ordered[split:start + window]))

    def __len__(self):
        """Number of windows collected across all tracks."""
        return len(self.sequences)

    def __getitem__(self, idx):
        """Return ``(img_seq, pos_seq, target_seq)`` for window ``idx``.

        ``img_seq`` stacks one transformed frame per input entry -- the whole
        frame is loaded, no crop around the box is taken here. ``pos_seq`` has
        shape ``(input_length, 2)`` and ``target_seq`` ``(pred_length, 2)``;
        both hold the stored centers as float tensors, without any
        normalization by the image size.
        """
        past, future = self.sequences[idx]

        frames = []
        for step in past:
            image = Image.open(self.image_paths[step['frame_idx']]).convert('RGB')
            image = self.transform(image) if self.transform else transforms.ToTensor()(image)
            frames.append(image)

        img_seq = torch.stack(frames)  # (input_length, ...) + per-frame tensor shape
        pos_seq = torch.tensor([step['center'] for step in past]).float()
        target_seq = torch.tensor([step['center'] for step in future]).float()
        return img_seq, pos_seq, target_seq


**Data Transformation**

In [12]:
# Channel statistics used for normalization (ImageNet means / stds)
_IMAGENET_MEAN = [0.485, 0.456, 0.406]
_IMAGENET_STD = [0.229, 0.224, 0.225]

# Per-frame preprocessing: shrink to 64x64, convert to a tensor, then normalize per channel
_preprocess_steps = [
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_IMAGENET_MEAN, std=_IMAGENET_STD),
]
transform = transforms.Compose(_preprocess_steps)


**Create Datasets and DataLoaders**

In [13]:
# Window sizes (in track entries) and mini-batch size
input_length = 10
pred_length = 5
batch_size = 64

train_dataset = UADETRACDataset(train_images_dir, train_trajectories, train_image_files, input_length, pred_length, transform=transform)
val_dataset = UADETRACDataset(val_images_dir, val_trajectories, val_image_files, input_length, pred_length, transform=transform)

# The training loop copies batches with `.to(device, non_blocking=True)`; those copies can
# only be asynchronous when the batches come from pinned host memory, so pin whenever CUDA
# is available (pinning is pointless on CPU-only runs).
pin_memory = torch.cuda.is_available()

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=pin_memory)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, pin_memory=pin_memory)


# Define Model Architecture

**Define Feature Extractor**

We'll use a pre-trained ResNet50 model for feature extraction.

In [14]:
class FeatureExtractor(nn.Module):
    """Pre-trained ResNet-50 backbone followed by a linear projection to 128 features.

    The backbone is the torchvision ResNet-50 with its final fully connected
    layer removed; the new ``fc`` layer maps the backbone's feature width
    (``resnet.fc.in_features``, 2048 for ResNet-50) down to 128.
    """

    def __init__(self):
        super().__init__()
        # `weights=` replaces the deprecated `pretrained=True`; IMAGENET1K_V1 is the
        # checkpoint that `pretrained=True` used to load, so the weights are unchanged.
        resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        # Keep everything except the last fully connected layer
        self.resnet = nn.Sequential(*list(resnet.children())[:-1])
        self.fc = nn.Linear(resnet.fc.in_features, 128)

    def forward(self, x):
        """Map a batch of images ``(batch_size, C, H, W)`` to ``(batch_size, 128)`` features."""
        x = self.resnet(x)       # (batch_size, 2048, 1, 1) after the backbone's pooling layer
        x = torch.flatten(x, 1)  # (batch_size, 2048)
        return self.fc(x)        # (batch_size, 128)


**Define Trajectory Predictor**

We'll use an LSTM model for trajectory prediction.

In [15]:
class TrajectoryPredictor(nn.Module):
    """LSTM that maps a sequence of (image features, position) to future positions.

    Args:
        feature_dim: Size of the per-frame feature vector.
        hidden_size: LSTM hidden state size.
        num_layers: Number of stacked LSTM layers.
        pred_length: Number of future (x, y) positions to predict. Defaults to
            the notebook-level ``pred_length`` at class-definition time.
    """

    def __init__(self, feature_dim=128, hidden_size=256, num_layers=1, pred_length=pred_length):
        super().__init__()
        # Remember the horizon this instance was built for, so forward() does not
        # depend on the module-level variable (which may differ or change later).
        self.pred_length = pred_length
        self.lstm = nn.LSTM(input_size=feature_dim + 2, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, pred_length * 2)

    def forward(self, features, positions):
        """Predict future positions.

        Args:
            features: ``(batch_size, seq_length, feature_dim)``.
            positions: ``(batch_size, seq_length, 2)``.

        Returns:
            Tensor of shape ``(batch_size, pred_length, 2)``.
        """
        x = torch.cat([features, positions], dim=2)  # (batch_size, seq_length, feature_dim + 2)
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])  # last time step -> (batch_size, pred_length * 2)
        return out.view(-1, self.pred_length, 2)


**Combine Models**

In [16]:
class VehicleTrajectoryModel(nn.Module):
    """End-to-end model: per-frame feature extraction followed by trajectory prediction."""

    def __init__(self):
        super().__init__()
        self.feature_extractor = FeatureExtractor()
        self.predictor = TrajectoryPredictor()

    def forward(self, img_seq, pos_seq):
        """Run the model on a batch of image sequences and their positions.

        Args:
            img_seq: ``(batch_size, seq_length, C, H, W)`` image sequences.
            pos_seq: Positions passed through unchanged to the predictor.

        Returns:
            Whatever ``self.predictor`` returns for the per-frame features and ``pos_seq``.
        """
        n_batch, n_steps, channels, height, width = img_seq.shape
        # Fold time into the batch axis so every frame goes through the extractor at once
        flat_frames = img_seq.view(n_batch * n_steps, channels, height, width)
        per_frame = self.feature_extractor(flat_frames)
        # Unfold back to (batch, time, features) for the sequence model
        return self.predictor(per_frame.view(n_batch, n_steps, -1), pos_seq)


**Load Model onto the GPU (falls back to CPU if CUDA is unavailable)**

In [17]:
# Use the GPU when CUDA is available, otherwise the CPU, and move the whole model there.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = VehicleTrajectoryModel().to(device)




In [18]:
# Show which device was selected above
print(device)


cuda


In [19]:
# Print the module tree (long output: the full ResNet backbone is listed)
print(model)


VehicleTrajectoryModel(
  (feature_extractor): FeatureExtractor(
    (resnet): Sequential(
      (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (2): ReLU(inplace=True)
      (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (4): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (rel

# Train Model

**Loss Function and Optimizer**

In [20]:
# Mean squared error between predicted and ground-truth positions
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# Gradient scaling only applies to CUDA mixed precision. `device` may be the CPU
# fallback, in which case the scaler is disabled and acts as a pass-through.
scaler = GradScaler('cuda', enabled=(device.type == 'cuda'))
num_epochs = 10


**Training Loop**

In [None]:
# Training loop
# Mixed precision is only enabled on CUDA; on the CPU fallback autocast is a no-op.
use_amp = device.type == 'cuda'

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0  # sum of per-sample losses over the epoch
    samples_seen = 0
    for img_seq, pos_seq, target_seq in train_loader:
        img_seq = img_seq.to(device, non_blocking=True)
        pos_seq = pos_seq.to(device, non_blocking=True)
        target_seq = target_seq.to(device, non_blocking=True)

        optimizer.zero_grad()

        with autocast(device_type=device.type, enabled=use_amp):
            outputs = model(img_seq, pos_seq)
            loss = criterion(outputs, target_seq)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        # `loss` is a batch mean; weight it by the batch size so a shorter final
        # batch does not count as much as a full one in the epoch average.
        batch_n = target_seq.size(0)
        running_loss += loss.item() * batch_n
        samples_seen += batch_n

    # An empty loader gives NaN rather than a ZeroDivisionError
    epoch_loss = running_loss / samples_seen if samples_seen else float('nan')
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}')


Epoch [1/10], Loss: 47715.6865


**Evaluate Model**

In [None]:
# Evaluate on the validation loader: no gradient tracking, layers in inference mode
model.eval()
test_loss = 0.0  # accumulates the sum of per-sample losses
eval_samples = 0
with torch.no_grad():
    for img_seq, pos_seq, target_seq in val_loader:
        img_seq = img_seq.to(device)
        pos_seq = pos_seq.to(device)
        target_seq = target_seq.to(device)

        outputs = model(img_seq, pos_seq)
        loss = criterion(outputs, target_seq)

        # Weight the batch mean by the batch size so the result is a true per-sample average
        batch_n = target_seq.size(0)
        test_loss += loss.item() * batch_n
        eval_samples += batch_n

# An empty loader gives NaN rather than a ZeroDivisionError
test_loss = test_loss / eval_samples if eval_samples else float('nan')
print(f'Test Loss: {test_loss:.4f}')


**Visualize Model Results**

In [None]:
# Get a batch of validation data and predict on it.
# eval() + no_grad(): make sure BatchNorm uses running statistics and no autograd graph
# is built, regardless of which cell was run last.
model.eval()
img_seq, pos_seq, target_seq = next(iter(val_loader))
with torch.no_grad():
    outputs = model(img_seq.to(device), pos_seq.to(device))

# Everything below works on CPU tensors
img_seq = img_seq.cpu()
pos_seq = pos_seq.cpu()
outputs = outputs.cpu()
target_seq = target_seq.cpu()

# Plotting: never ask for more samples than the batch actually contains
num_samples = min(5, pos_seq.size(0))
for i in range(num_samples):
    input_positions = pos_seq[i].numpy()
    # Observed positions followed by the true / predicted continuation
    true_positions = np.concatenate((input_positions, target_seq[i].numpy()))
    pred_positions = np.concatenate((input_positions, outputs[i].numpy()))

    fig, ax = plt.subplots(figsize=(6, 6))
    ax.plot(true_positions[:, 0], true_positions[:, 1], 'bo-', label='True Trajectory')
    ax.plot(pred_positions[:, 0], pred_positions[:, 1], 'ro--', label='Predicted Trajectory')
    ax.legend()
    ax.set_xlabel('X Position')
    ax.set_ylabel('Y Position')
    ax.set_title(f'Sample {i+1}')
    ax.invert_yaxis()  # image coordinates: y grows downwards
    ax.grid()
    plt.show()
