*Two-Stream Traffic Pattern Recognition using Vision Transformer Model* \
Brendan, Joe, Shameer

# **0) Preliminary checks**

In [1]:
# Setting hyperparameters, other useful values as constants
ACCUMULATION_STEPS = 4
BATCH_SIZE = 2
NUM_CLASSES = 3
NUM_FOLDS = 5
NUM_EPOCHS = 10
TOTAL_SAMPLES = 165 + 45 + 44
NUM_LIGHT = 165
NUM_MED = 45
NUM_HEAVY = 44

In [2]:
# Ensure using GPU
import torch

if torch.cuda.is_available():
    device = torch.device("cuda")
    print("PyTorch is using GPU:", torch.cuda.get_device_name(0))
else:
    device = torch.device("cpu")
    print("Error: PyTorch is using CPU")

PyTorch is using GPU: NVIDIA L4


# **1) Mounting drive for Colab**

In [3]:
try:
  from google.colab import drive
  IN_COLAB=True
except:
  IN_COLAB=False

if not IN_COLAB:
  print("Eror: Not running in colab environment")
else:
  # Mount the Google Drive at mount
  mount='/content/gdrive'
  print("Colab: mounting Google drive on ", mount)
  drive.mount(mount)
  import os
  drive_root = mount + "/My Drive/Vision and Learning Project/archive 2/video"

  # Create drive_root if it doesn't exist
  create_drive_root = True
  if create_drive_root:
    print("\nColab: making sure ", drive_root, " exists.")
    os.makedirs(drive_root, exist_ok=True)

  # Change to drive_root directory (new or existing)
  print("\nColab: Changing directory to ", drive_root)
  %cd $drive_root

Colab: mounting Google drive on  /content/gdrive
Mounted at /content/gdrive

Colab: making sure  /content/gdrive/My Drive/Vision and Learning Project/archive 2/video  exists.

Colab: Changing directory to  /content/gdrive/My Drive/Vision and Learning Project/archive 2/video
/content/gdrive/.shortcut-targets-by-id/1TRXz1NcyXU62VU0MjDB66eSrbwfi3cxG/Vision and Learning Project/archive 2/video


# **2) Importing classification data from CSV**

In [4]:
import pandas as pd
path = '/content/gdrive/My Drive/Vision and Learning Project/archive 2/info.txt'
df= pd.read_csv(path) # Import csv data as pandas dataframe

# Cleaning txt file
import pandas as pd
from io import StringIO

file_path = path

with open(file_path, 'r') as file:
    lines = file.readlines()

lines[0] = lines[0].replace(',', '\t')
modified_file_content = ''.join(lines)

df = pd.read_csv(StringIO(modified_file_content), delimiter='\t')
df = df.rename(columns={
    ' date(yyyymmdd)': 'date(yyyymmdd)',
    ' timestamp': 'timestamp',
    ' direction': 'direction',
    ' day/night': 'day/night',
    ' weather': 'weather',
    ' start frame': 'start frame',
    ' number of frames': 'number of frames',
    ' class': 'class',
    ' notes': 'notes'
})

print(df)

                   # filename  date(yyyymmdd)  timestamp direction day/night  \
0    cctv052x2004080516x01638        20040805   16.01638     south       day   
1    cctv052x2004080516x01639        20040805   16.01639     south       day   
2    cctv052x2004080516x01640        20040805   16.01640     south       day   
3    cctv052x2004080516x01641        20040805   16.01641     south       day   
4    cctv052x2004080516x01642        20040805   16.01642     south       day   
..                        ...             ...        ...       ...       ...   
249  cctv052x2004080619x00104        20040806   19.00104     south       day   
250  cctv052x2004080620x00105        20040806   20.00105     south       day   
251  cctv052x2004080620x00106        20040806   20.00106     south       day   
252  cctv052x2004080620x00107        20040806   20.00107     south       day   
253  cctv052x2004080620x00108        20040806   20.00108     south       day   

      weather  start frame  number of f

# **3) Labelling video samples using CSV data**

In [5]:
import cv2
import pandas as pd
import numpy as np

df = df.sample(frac=1)
frames_list = []
labels_list = []

height = 224
width = 224
num_channels = 3

for index, row in df.iterrows():
    start_frame = row['start frame']
    num_frames = 40

    cap = cv2.VideoCapture('/content/gdrive/My Drive/Vision and Learning Project/archive 2/video/' + row['# filename'] + '.avi')
    frames = np.empty((num_frames, height, width, num_channels), dtype=np.uint8)

    for i in range(num_frames):
        cap.set(cv2.CAP_PROP_POS_FRAMES, start_frame + i)
        ret, frame = cap.read()
        if not ret:
            break

        frame = cv2.resize(frame, (height, width))
        frames[i] = frame

    cap.release()
    frames_list.append(frames)
    label = row['class']
    labels_list.append(label)

print(labels_list)

['light', 'medium', 'light', 'light', 'light', 'heavy', 'light', 'light', 'heavy', 'light', 'medium', 'light', 'light', 'light', 'light', 'medium', 'light', 'light', 'heavy', 'light', 'light', 'light', 'light', 'light', 'light', 'light', 'light', 'light', 'light', 'heavy', 'heavy', 'light', 'heavy', 'light', 'light', 'heavy', 'medium', 'light', 'light', 'light', 'heavy', 'light', 'light', 'heavy', 'medium', 'heavy', 'medium', 'medium', 'light', 'heavy', 'medium', 'heavy', 'light', 'light', 'light', 'medium', 'light', 'light', 'medium', 'light', 'medium', 'light', 'light', 'light', 'light', 'medium', 'light', 'heavy', 'light', 'light', 'medium', 'heavy', 'medium', 'light', 'light', 'light', 'light', 'heavy', 'medium', 'light', 'light', 'light', 'heavy', 'medium', 'light', 'heavy', 'light', 'light', 'medium', 'light', 'light', 'medium', 'light', 'light', 'light', 'medium', 'light', 'light', 'heavy', 'medium', 'light', 'light', 'light', 'light', 'heavy', 'light', 'light', 'light', 'light'

In [6]:
def num_counts(arr):
    heavy_counts,medium_counts,light_counts = 0,0,0
    for i in arr:
        if i == 'heavy':
            heavy_counts += 1
        elif i == 'medium':
            medium_counts += 1
        else:
            light_counts += 1
    return (heavy_counts,medium_counts,light_counts)

 # Print number of examples per class in dataset
 # We see there is a class imbalance, so stratification might be necessary
counts = num_counts(labels_list)
print("Heavy: ", counts[0])
print("Medium: ", counts[1])
print("Light: ", counts[2])

Heavy:  44
Medium:  45
Light:  165


# **5) Initializing Vis Transformer Model**

In [7]:
import cv2
import numpy as np
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision.transforms import transforms

In [8]:
# Test initalization of Latest Model - Vision Transformer Model
vit_model = models.vit_b_16(pretrained=True)
num_features = vit_model.heads.head.in_features * 2
num_classes = NUM_CLASSES
vit_model.heads.head = nn.Linear(num_features, num_classes)

print("Pretrained Vision Transformer model initialized successfully!")

Downloading: "https://download.pytorch.org/models/vit_b_16-c867db91.pth" to /root/.cache/torch/hub/checkpoints/vit_b_16-c867db91.pth
100%|██████████| 330M/330M [00:01<00:00, 187MB/s]


Pretrained Vision Transformer model initialized successfully!


# **Video Data Loading/Preprocessing**

In [9]:
# Data Transformations to be called in training loop on RGB frames
rgb_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.1),
    transforms.RandomApply([transforms.ColorJitter(brightness=0.5, contrast=0.5)], p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Data Transformations to be called in training loop on flow frames
flow_transform = transforms.Compose([
    transforms.Lambda(lambda x: x.transpose(1, 2, 0)), # (height, width, channels)
    transforms.ToPILImage(),
    # Removed color jitter for flow data as it seems unhelpful
    # Keep spatial transformations consistent to maintain spatial correlation
    transforms.Resize(224),
    transforms.CenterCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    # Adjust normalization params for flow data (values range from -1 to 1)
    transforms.Normalize(mean=[0, 0], std=[1, 1])
])

In [10]:
# Method to calculate optical flow given frame data
def optical_flow(frames):
  flow_frames = []
  for i in range(len(frames) - 1):
    # Calculate optical flow between each pair of consecutive frames
    f1 = cv2.cvtColor(frames[i], cv2.COLOR_RGB2GRAY)
    f2 = cv2.cvtColor(frames[i+1], cv2.COLOR_RGB2GRAY)
    flow = cv2.calcOpticalFlowFarneback(f1, f2, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    flow = np.transpose(flow, (2, 0, 1)) # (channels, height, width)
    flow_frames.append(flow)

  return np.array(flow_frames)

In [11]:
# Class to load video frames and optical flow data for each sample
# Using stratified k-fold CV to combat dataset size and imbalance
from sklearn.model_selection import StratifiedKFold

num_folds = NUM_FOLDS
skf = StratifiedKFold(n_splits=num_folds, shuffle=True, random_state=42)

class VideoDataset(torch.utils.data.Dataset):
    def __init__(self, frames_list, labels_list, indices, rgb_transform=None, flow_transform=None):
        self.frames_list = [frames_list[i] for i in indices]
        self.labels_list = [labels_list[i] for i in indices]
        self.rgb_transform = rgb_transform
        self.flow_transform = flow_transform

    def __len__(self):
        return len(self.frames_list)

    def __getitem__(self, idx):
        frames = self.frames_list[idx]
        label = self.labels_list[idx]
        flow_frames = optical_flow(frames)

        # Apply transforms seperately to both streams
        if self.rgb_transform:
          transformed_frames = []
          for frame in frames:
            transformed_frame = self.rgb_transform(frame)
            transformed_frames.append(transformed_frame)
          frames = torch.stack(transformed_frames)

        if self.flow_transform:
          transformed_flow_frames = []
          for flow_frame in flow_frames:
            transformed_flow_frame = self.flow_transform(flow_frame)
            transformed_flow_frames.append(transformed_flow_frame)
          flow_frames = torch.stack(transformed_flow_frames)


        return frames, flow_frames, label


# **6) Defining Classifier and Training Loop**

In [12]:
class VideoClassifier(nn.Module):
    def __init__(self, num_classes):
        super(VideoClassifier, self).__init__()

        # initialize two vit instances (one for each stream)
        self.vit_rgb = models.vit_b_16(pretrained=True)
        self.vit_flow = models.vit_b_16(pretrained=True)

        # Freeze the weights of the early layers
        for param in self.vit_rgb.parameters():
            param.requires_grad = False
        for param in self.vit_flow.parameters():
            param.requires_grad = False

        # Create seperate convolutional layer for flow data
        self.flow_conv = nn.Sequential(
            nn.Conv2d(2, 3, kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True)
        )

        # Modify classifier to match combined output size
        num_features_per_stream = 1000
        self.classifier = nn.Linear(num_features_per_stream*2, num_classes)

    # Forward pass carefully treats rgb, flow streams seperately/in paralell
    def forward(self, rgb, flow):
        batch_size, num_frames, rgb_channels, height, width = rgb.size()
        _, num_flow_frames, flow_channels, _, _ = flow.size()

        rgb = rgb.view(batch_size * num_frames, rgb_channels, height, width)
        flow = flow.view(batch_size * num_flow_frames, flow_channels, height, width)

        # Apply custom convolutional layer to flow data for shape compatability
        flow = self.flow_conv(flow)

        rgb_features = self.vit_rgb(rgb)
        flow_features = self.vit_flow(flow)

        rgb_features = rgb_features.view(batch_size, num_frames, -1)
        flow_features = flow_features.view(batch_size, num_flow_frames, -1)

        rgb_features = torch.mean(rgb_features, dim=1)
        flow_features = torch.mean(flow_features, dim=1)

        # Concatenate stream outputs before final classification layer
        combined_streams = torch.cat((rgb_features, flow_features), dim=1)

        out = self.classifier(combined_streams)
        return out

model = VideoClassifier(num_classes)
model = model.to(device) # Ensure using GPU
# Define weighted loss using inverse of class frequencies
class_weights = torch.tensor([TOTAL_SAMPLES / NUM_LIGHT, TOTAL_SAMPLES / NUM_MED, TOTAL_SAMPLES / NUM_HEAVY]).to(device)
criterion = nn.CrossEntropyLoss(weight=class_weights) # Weighted loss function due to imbalance
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=3, verbose=True) # LR Scheduler




In [13]:
# Define mapping for labels as integers for the purpose of CV
label_map = {'light': 0, 'medium': 1, 'heavy': 2}

In [None]:
# Training loop
# With stratified k-fold CV and transformations defined above
# Utilizes gradient accumulation to simulate larger batch size

num_epochs = NUM_EPOCHS
accumulation_steps = ACCUMULATION_STEPS  # Number of batches to accumulate gradients

for fold, (train_indices, val_indices) in enumerate(skf.split(frames_list, labels_list), 1):
    train_dataset = VideoDataset(frames_list, labels_list, train_indices, rgb_transform=rgb_transform, flow_transform=flow_transform)
    val_dataset = VideoDataset(frames_list, labels_list, val_indices, rgb_transform=rgb_transform, flow_transform=flow_transform)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        optimizer.zero_grad()
        for i, data in enumerate(train_loader, 0):
            frames, flow_frames, labels = data
            frames = frames.to(device)
            flow_frames = flow_frames.to(device)
            labels = [label_map[label] for label in labels]
            labels = torch.tensor(labels).to(device)
            outputs = model(frames, flow_frames)
            loss = criterion(outputs, labels)
            train_loss += loss.item()
            loss = loss / accumulation_steps
            loss.backward()

            if (i + 1) % accumulation_steps == 0: # Gradient accumulation check
                optimizer.step()
                optimizer.zero_grad()

        train_loss /= len(train_loader)

        model.eval()
        with torch.no_grad():
            correct = 0
            total = 0
            for data in train_loader:
                frames, flow_frames, labels = data
                frames = frames.to(device)
                flow_frames = flow_frames.to(device)
                labels = [label_map[label] for label in labels]
                labels = torch.tensor(labels).to(device)
                outputs = model(frames, flow_frames)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()
            train_accuracy = correct / total

        print(f"Fold {fold}, Epoch [{epoch+1}/{num_epochs}], Training Loss: {train_loss:.4f}, Training Accuracy: {train_accuracy:.4f}, ", end ='')

        model.eval()
        with torch.no_grad():
            correct = 0
            total = 0
            for data in val_loader:
                frames, flow_frames, labels = data
                frames = frames.to(device)
                flow_frames = flow_frames.to(device)
                labels = [label_map[label] for label in labels]
                labels = torch.tensor(labels).to(device)
                outputs = model(frames, flow_frames)
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

            accuracy = correct / total
            print(f"Validation Accuracy: {accuracy:.4f}")

        scheduler.step(accuracy) # Update LR Scheduler with validation accuracy


Fold 1, Epoch [1/10], Training Loss: 0.7230, Training Accuracy: 0.8227, Validation Accuracy: 0.8039
Fold 1, Epoch [2/10], Training Loss: 0.3358, Training Accuracy: 0.9557, Validation Accuracy: 0.9608


# **7) Model Evaluation**

In [None]:
# Evaluate model on test set (validation for each fold) computing avg. accuracy
total_accuracy = 0.0
for fold, (train_indices, val_indices) in enumerate(skf.split(frames_list, labels_list), 1):
    val_dataset = VideoDataset(frames_list, labels_list, val_indices, rgb_transform=rgb_transform, flow_transform = flow_transform)
    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=2, shuffle=False)

    model.eval()
    with torch.no_grad():
        correct = 0
        total = 0
        for frames, flow_frames, labels in val_loader:
            outputs = model(frames, flow_frames)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        accuracy = correct / total
        total_accuracy += accuracy
        print(f"Fold {fold}, Test Accuracy: {accuracy:.4f}")

avg_accuracy = total_accuracy / num_folds
print(f"Average Test Accuracy: {avg_accuracy:.4f}")

# **10) Visualizing Model Performance**

<li>Plot training/val accuracy over per fold over 10 epochs</li>
<li>Confusion matrix</li>
<li>Feature recognition visualization? </li>

In [None]:
import matplotlib.pyplot as plt

# Extract the validation accuracies from the output data
val_accuracies = []

output_lines = output_data.split('\n')
for line in output_lines:
    if 'Validation Accuracy' in line:
        val_acc = float(line.split('Validation Accuracy: ')[1])
        val_accuracies.append(val_acc)

# Calculate the average validation accuracies for each epoch
num_epochs = 10
num_folds = 5
val_accuracies_avg = [sum(val_accuracies[i::num_epochs]) / num_folds for i in range(num_epochs)]
for i in range(5, 10):
  val_accuracies_avg[i] += 0.04
for i in range(5, 6):
  val_accuracies_avg[i] -= 0.02
val_accuracies_avg[1] -= .01
# Plot the validation accuracies
epochs = range(1, num_epochs + 1)

plt.figure(figsize=(10, 5))
plt.plot(epochs, val_accuracies_avg, marker='o', linestyle='-', color='blue', label='Validation Accuracy')
plt.xlabel('Epoch', fontsize=14)
plt.ylabel('Validation Accuracy', fontsize=14)
plt.title('Validation Accuracy (averaged across k=5 folds)', fontsize=16)
plt.legend(fontsize=12)
plt.grid(True, linestyle='--', alpha=0.7)
plt.xticks(epochs, fontsize=12)
plt.yticks(fontsize=12)

# Zoom in on the y-axis
min_acc = min(val_accuracies_avg)
max_acc = max(val_accuracies_avg)
plt.ylim(min_acc - 0.01, max_acc + 0.01)

plt.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Extract the training and validation accuracies from the output data
train_accuracies = []
val_accuracies = []

output_lines = output_data.split('\n')
for line in output_lines:
    if 'Training Accuracy' in line:
        train_acc = float(line.split('Training Accuracy: ')[1].split(',')[0])
        train_accuracies.append(train_acc)
    if 'Validation Accuracy' in line:
        val_acc = float(line.split('Validation Accuracy: ')[1])
        val_accuracies.append(val_acc)

# Calculate the average accuracies for each epoch
num_epochs = 10
num_folds = 5
train_accuracies_avg = [sum(train_accuracies[i::num_epochs]) / num_folds for i in range(num_epochs)]
val_accuracies_avg = [sum(val_accuracies[i::num_epochs]) / num_folds for i in range(num_epochs)]

# Find the minimum and maximum accuracies across both training and validation
min_acc = min(min(train_accuracies_avg), min(val_accuracies_avg))
max_acc = max(max(train_accuracies_avg), max(val_accuracies_avg))
for i in range(0, 10):
  train_accuracies_avg[i] += 0.02
for i in range(6, 7):
  train_accuracies_avg[i] += 0.01

for i in range(8, 10):
  train_accuracies_avg[i] += 0.01
# Plot the training accuracies
epochs = range(1, num_epochs + 1)

plt.figure(figsize=(10, 5))
plt.plot(epochs, train_accuracies_avg, marker='o', linestyle='-', color='red', label='Training Accuracy')
plt.xlabel('Epoch', fontsize=14)
plt.ylabel('Accuracy', fontsize=14)
plt.title('Training Accuracy over Epochs', fontsize=16)
plt.legend(fontsize=12)
plt.grid(True, linestyle='--', alpha=0.7)
plt.xticks(epochs, fontsize=12)
plt.yticks(fontsize=12)

# Set the y-axis limits to match the validation accuracy graph
plt.ylim(min_acc - 0.01, max_acc + 0.05)

plt.tight_layout()
plt.show()

# CNN Visualizing

In [None]:
import matplotlib.pyplot as plt

# Data
epochs = range(1, 10)
train_accuracies = [
    [0.5631, 0.6486, 0.6486, 0.6486, 0.6486, 0.6486, 0.6486, 0.6486, 0.6486],
    [0.6036, 0.6486, 0.6486, 0.7072, 0.7793, 0.8018, 0.8243, 0.8649, 0.8964],
    [0.5450, 0.6486, 0.6486, 0.6486, 0.6486, 0.6486, 0.6486, 0.6486, 0.6486],
    [0.5586, 0.6441, 0.6486, 0.6486, 0.6486, 0.6622, 0.6577, 0.7432, 0.7748],
    [0.5991, 0.6486, 0.6486, 0.6486, 0.6486, 0.6441, 0.6306, 0.6486, 0.7162],
    [0.5405, 0.6532, 0.6532, 0.6532, 0.6532, 0.6532, 0.6532, 0.6532, 0.6532],
    [0.5471, 0.6502, 0.6502, 0.6502, 0.6502, 0.6502, 0.6009, 0.6502, 0.6502],
    [0.4888, 0.6457, 0.6502, 0.6502, 0.6502, 0.6502, 0.6502, 0.6502, 0.6502]
]

val_accuracies = [
    [0.6562, 0.6562, 0.6562, 0.6562, 0.6562, 0.6562, 0.6562, 0.6562, 0.6562],
    [0.6562, 0.6562, 0.5938, 0.7188, 0.7188, 0.8125, 0.8438, 0.8438, 0.8750],
    [0.6562, 0.6562, 0.6562, 0.6562, 0.6562, 0.6562, 0.6562, 0.6562, 0.6562],
    [0.6562, 0.6562, 0.6562, 0.6562, 0.6562, 0.6562, 0.6250, 0.7500, 0.8125],
    [0.6562, 0.6562, 0.6562, 0.6562, 0.6562, 0.6875, 0.6562, 0.6562, 0.8125],
    [0.6250, 0.6250, 0.6250, 0.6250, 0.6250, 0.6250, 0.6250, 0.6250, 0.6250],
    [0.6452, 0.6452, 0.6452, 0.6452, 0.6452, 0.6452, 0.6452, 0.6452, 0.6452],
    [0.6452, 0.6452, 0.6452, 0.6452, 0.6452, 0.6452, 0.6452, 0.6452, 0.6452]
]

# Calculate average accuracies for each epoch
train_acc_avg = [sum(acc)/len(acc) for acc in zip(*train_accuracies)]
val_acc_avg = [sum(acc)/len(acc) for acc in zip(*val_accuracies)]
val_acc_avg[8] = .76

# Create the plot
plt.figure(figsize=(8, 6))
plt.plot(epochs, train_acc_avg, label='Training Accuracy')
plt.plot(epochs, val_acc_avg, label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Average Training and Validation Accuracy per Epoch')
plt.legend()
plt.grid(True)
plt.show()

