## Traffic Light Detection using FasterRCNN
In this notebook, I walk through the source code of a traffic light detection algorithm. In summary, the model starts from the FasterRCNN architecture pre-trained on the COCO dataset and is then fine-tuned on the LISA Traffic Light Dataset, which contains 44 minutes of annotated traffic light data collected in San Diego, California.

### 1. Importing Necessary Libraries
Importing necessary libraries and packages for this project:

In [37]:
import importlib
import importlib.util

def install_and_import_packages(package_list):
    for p in package_list:
        # Install package with pip3 if not already installed.
        # Note: this assumes the pip name matches the import name,
        # which is not true for every package (e.g. cv2 is published as opencv-python).
        if importlib.util.find_spec(p) is None:
            print(f"{p} is not installed. Installing ...")
            !pip3 install {p}
            # Make the freshly installed package visible to the import system
            importlib.invalidate_caches()
        else:
            print(f"Package exists: {p}")

        # Equivalent to "import package"
        try:
            globals()[p] = importlib.import_module(p)
        except ImportError as e:
            print(f"Failed to import {p}: {e}")
    print("All packages successfully installed and imported.")

In [36]:
# Installing the CUDA 11.7 build of torch on Windows 11
!pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu117

Looking in indexes: https://download.pytorch.org/whl/cu117


In [38]:
package_list = [
    'albumentations',
    'cv2', 
    'datetime',
    'fastai',
    'matplotlib',
    'numpy', 
    'os', 
    'pynvml', 
    'pandas',
    'seaborn',
    'time',
    'torch',
    'torchvision',
    'tqdm',
    'warnings'
]

install_and_import_packages(package_list)

Package exists: albumentations
Package exists: cv2
Package exists: datetime
Package exists: fastai
Package exists: matplotlib
Package exists: numpy
Package exists: os
Package exists: pynvml
Package exists: pandas
Package exists: seaborn
Package exists: time
Package exists: torch
Package exists: torchvision
Package exists: tqdm
All packages successfully installed and imported.


In [32]:
torch.cuda.is_available()

True

In [39]:
# General
warnings.filterwarnings("ignore")

import datetime
import pandas as pd
import numpy as np

from time import time
from tqdm import tqdm

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns

plt.style.use('fivethirtyeight')
%matplotlib inline

# Replace with fastai library
from sklearn.model_selection import train_test_split, GroupKFold, StratifiedKFold
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.ops import nms
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import SequentialSampler

Here, we create a seed for reproducibility.
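No seeding cell is shown at this point, so the following is only a minimal sketch of what one could look like. The helper name `seed_everything` and the seed value 42 are assumptions, not part of the original notebook.

```python
import random

import numpy as np


def seed_everything(seed: int = 42) -> None:
    """Seed the common random number generators (hypothetical helper)."""
    random.seed(seed)
    np.random.seed(seed)
    try:
        import torch
    except ImportError:
        return  # torch is not installed, so there is nothing more to seed
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # safe to call without a GPU


seed_everything(42)
```

Seeding NumPy matters here because the clip-level split later in the notebook draws its test clips with `np.random.choice`.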

### 2. Load Data
The [LISA Traffic Light Dataset](https://www.kaggle.com/datasets/mbornoe/lisa-traffic-light-dataset) is organized into multiple folders of clips, each with its own annotation files. The dataset is structured as follows:

In [34]:
path = 'archive'
os.listdir(path)

['Annotations',
 'daySequence1',
 'daySequence2',
 'dayTrain',
 'nightSequence1',
 'nightSequence2',
 'nightTrain',
 'sample-dayClip6',
 'sample-nightClip1']

In [None]:
DAY_TRAIN_PATH = 'archive/Annotations/Annotations/dayTrain/'
NIGHT_TRAIN_PATH = 'archive/Annotations/Annotations/nightTrain/'

#### Merge all different annotation files into a single file
Here we will be merging all the data into one dataframe. 

*Note: we will also add the "isNight" feature to split the data such that there is a balance of day and night clips in both the train and test sets.*

In [None]:
train_day = []
for clipName in tqdm(sorted(os.listdir(DAY_TRAIN_PATH))):
    if 'dayClip' not in clipName:
        continue
    df = pd.read_csv(os.path.join(DAY_TRAIN_PATH,clipName,'frameAnnotationsBOX.csv'),sep=';')
    train_day.append(df)
    
train_day_df = pd.concat(train_day,axis=0)
train_day_df['isNight'] = 0
    
train_night = []
for clipName in tqdm(sorted(os.listdir(NIGHT_TRAIN_PATH))):
    if 'nightClip' not in clipName:
        continue
    df = pd.read_csv(os.path.join(NIGHT_TRAIN_PATH,clipName,'frameAnnotationsBOX.csv'),sep=';')
    train_night.append(df)

train_night_df = pd.concat(train_night,axis=0)
train_night_df['isNight'] = 1

df = pd.concat([train_day_df,train_night_df],axis=0)

In [None]:
df.head()

### 3. Data Preprocessing
Here we preprocess the data such that it is easier to work with. We will delete duplicate columns, change the "Filename" column in the dataframe to the full path of the image file, and simplify the annotations to only stop (RED), go (GREEN), and warning (YELLOW).

In [None]:
# Duplicate Columns
np.all(df['Origin file'] == df['Origin track']), np.all(df['Origin frame number'] == df['Origin track frame number'])

In [None]:
# Dropping duplicate columns & "Origin file" as we don't need it
df = df.drop(['Origin file','Origin track','Origin track frame number'],axis=1)

In [None]:
# The Filename column does not match the image location on disk -> rewrite it to the full path
# Ex. dayTraining/dayClip1--00000.jpg -> dayTrain/dayTrain/dayClip1/frames/dayClip1--00000.jpg
# Note: DATA_PATH (the dataset root directory) must be defined before this cell runs

def changeFilename(x):
    filename = x.Filename
    isNight = x.isNight
    
    splitted = filename.split('/')
    clipName = splitted[-1].split('--')[0]
    if isNight:
        return os.path.join(DATA_PATH,f'nightTrain/nightTrain/{clipName}/frames/{splitted[-1]}')
    else:
        return os.path.join(DATA_PATH,f'dayTrain/dayTrain/{clipName}/frames/{splitted[-1]}')

df['Filename'] = df.apply(changeFilename,axis=1)

The current dataframe is populated with the following annotations:

In [None]:
df['Annotation tag'].unique()

Simplifying the annotations:

In [None]:
# We will change annotations to only -> stop (RED), go (GREEN) & warning (YELLOW)
label_to_idx = {'go':1, 'warning':2, 'stop': 3}
idx_to_label = {v:k for k,v in label_to_idx.items()}

def changeAnnotation(x):
    if 'go' in x['Annotation tag']:
        return label_to_idx['go']
    elif 'warning' in x['Annotation tag']:
        return label_to_idx['warning']
    elif 'stop' in x['Annotation tag']:
        return label_to_idx['stop']
    
df['Annotation tag'] = df.apply(changeAnnotation,axis=1)
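The substring matching above can be tried on a few tag strings in isolation. This is a minimal sketch: `simplify_tag` is a hypothetical standalone version of `changeAnnotation`, and the example tags such as `goLeft` and `stopLeft` are assumed for illustration.

```python
label_to_idx = {"go": 1, "warning": 2, "stop": 3}


def simplify_tag(tag):
    """Map a detailed tag to its coarse class index, or None if nothing matches."""
    for name in ("go", "warning", "stop"):  # same order as the if/elif chain
        if name in tag:
            return label_to_idx[name]
    return None


# Example tags are assumptions, not read from the dataset
for tag in ["go", "goLeft", "warning", "stopLeft"]:
    print(tag, "->", simplify_tag(tag))
```

A tag that matches none of the three substrings maps to `None`, so it is worth checking the column for missing values after the conversion.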

The annotation tags are now represented by integer values: 1 for 'go', 2 for 'warning', and 3 for 'stop':

In [None]:
annotation_tags = df['Annotation tag'].unique()
annotation_tags

Let's shorten the column names:

In [None]:
# Changing Column Names
df.columns = ['image_id','label','x_min','y_min','x_max','y_max','frame','isNight']

Let's take a look at our preprocessed data:

In [None]:
df.head()

Let's also take a look at some of the data in the dataset:

In [None]:
fig, ax = plt.subplots(len(annotation_tags),1,figsize=(15,10*len(annotation_tags)))

for i, tag in enumerate(annotation_tags):
    sample = df[df['label']==tag].sample(1)
    bbox = sample[['x_min','y_min','x_max','y_max']].values[0]
    
    image = cv2.imread(sample.image_id.values[0])
    image = cv2.cvtColor(image,cv2.COLOR_BGR2RGB)
    
    cv2.rectangle(image,(bbox[0],bbox[1]),(bbox[2],bbox[3]),(220, 0, 0), 2)
    
    ax[i].set_title(idx_to_label[tag])
    ax[i].set_axis_off()
    ax[i].imshow(image)

Lastly, let's get an idea of how many unique images exist in the dataset:

In [None]:
print("Number of Unique Images: ",df.image_id.nunique(),'/',df.shape[0])

#### Validation Scheme
Since we have video clips (sets of images) that each correspond to a single drive, we need to ensure that every clip lands in its entirety in either our **train set** or our **test set**. This eliminates any overlap between the train and test data. Below is each clip in our dataset:

In [None]:
# The clip name is the directory two levels above the image file: .../<clipName>/frames/<image>.jpg
df['clipNames'] = df['image_id'].map(lambda x: x.split('/')[-3])
df['clipNames'].unique()

There exist 13 daytime clips and 5 nighttime clips.

In [None]:
def split(df,p=0.25):
    clipNames = sorted(df['clipNames'].unique())

    nightClips = [name for name in clipNames if 'night' in name]
    dayClips = [name for name in clipNames if 'day' in name]

    # replace=False so the same clip cannot be drawn twice
    testNightClipNames = list(np.random.choice(nightClips,int(len(nightClips)*p),replace=False))
    testDayClipNames = list(np.random.choice(dayClips,int(len(dayClips)*p),replace=False))
    testClipNames = testNightClipNames + testDayClipNames

    trainDayClipNames = list(set(dayClips) - set(testDayClipNames))
    trainNightClipNames = list(set(nightClips) - set(testNightClipNames))
    trainClipNames = trainNightClipNames + trainDayClipNames
    
    train_df = df[df.clipNames.isin(trainClipNames)]
    test_df = df[df.clipNames.isin(testClipNames)]
    
    return train_df, test_df

Using the above function, we split our dataframe into a train and test set by placing approximately a quarter of both `nightClips` and `dayClips` into the test set while placing the remainder into the train set.

In [None]:
train_df, test_df = split(df)
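A quick sanity check for a clip-level split is to confirm that no clip name appears on both sides. The sketch below uses a hypothetical helper, `clip_overlap`, and made-up clip names.

```python
def clip_overlap(train_clips, test_clips):
    """Return the set of clip names that appear in both collections."""
    return set(train_clips) & set(test_clips)


# Toy example with made-up clip names
train_clips = ["dayClip1", "dayClip2", "nightClip1"]
test_clips = ["dayClip3", "nightClip2"]
print(clip_overlap(train_clips, test_clips))
```

With the notebook's dataframes, the equivalent call would be `clip_overlap(train_df['clipNames'], test_df['clipNames'])`, which should return an empty set.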

Here is what our data looks like:

In [None]:
train_df.head()

In [None]:
test_df.head()

Confirming that roughly a quarter of the clips ended up in the test dataframe (the row counts only approximate this ratio, since clips differ in length):

In [None]:
print("Train shape: ",train_df.shape)
print("Test shape: ",test_df.shape)

#### Train and Validation Split
We further designate data from our train dataframe as our validation dataframe.

In [None]:
train_df, val_df = split(train_df)

In [None]:
train_df.head()

In [None]:
val_df.head()

### 3. Utils
Before we can start fine-tuning, we need to prepare a couple of items.

#### Declare a couple of constants:

In [None]:
EPOCHS = 3
BATCH_SIZE = 4

#### Set up our GPU if available:

In [None]:
!nvidia-smi

In [None]:
if torch.backends.mps.is_available():
    device = torch.device('mps')
elif torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

device

In [None]:
import gc
def report_gpu():
    print(torch.cuda.list_gpu_processes())
    gc.collect()
    torch.cuda.empty_cache()

In [None]:
report_gpu()

#### Create a custom dataset object:
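Here, we import our custom traffic lights dataset, which is defined in a separate file. Keeping the class in its own module avoids problems with `num_workers > 0`: the `DataLoader` workers are separate processes and need to be able to import the dataset class.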

In [None]:
from traffic_lights_dataset import TrafficLightsDataset

#### Average loss

In [None]:
# Average loss -> (Total-Loss / Total-Iterations)
class LossAverager:
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count
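To make the arithmetic behind `avg` concrete, here is a small standalone sketch of the same weighted running mean. The function name `weighted_mean` and the loss values are made up for illustration.

```python
def weighted_mean(values, counts):
    """Mean of `values` where each value is weighted by its sample count."""
    total = sum(v * n for v, n in zip(values, counts))
    return total / sum(counts)


# Three batches with losses 1.0, 0.5, 0.25 and sizes 4, 4, 2 (made-up numbers)
# (1.0*4 + 0.5*4 + 0.25*2) / (4 + 4 + 2) = 6.5 / 10
print(weighted_mean([1.0, 0.5, 0.25], [4, 4, 2]))
```

Calling `update(val, n)` once per batch leaves the same quantity in `avg`, with `n` set to the batch size.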

#### Collate function to specify batching
`collate_fn` is used by PyTorch's `DataLoader` to specify how samples from the dataset are combined into a batch. By default, `DataLoader` tries to stack the samples of a batch into tensors, which does not work when each image has a different number of boxes. The custom collate function instead transposes the batch: a list of `BATCH_SIZE` per-sample tuples becomes one tuple per field (images, targets, image IDs), each of length `BATCH_SIZE`. This makes it easy to separate the images and targets later on. Like our custom traffic lights dataset class, this function is defined in a separate file, `collate_fn.py`.

In [None]:
# Custom batching: the samples arrive at the collate function as a list of per-sample tuples:
# [(img_0, targets_0, id_0), (img_1, targets_1, id_1), ...]
# The collate function transposes them into one tuple per field:
# ((img_0, img_1, ...), (targets_0, targets_1, ...), (id_0, id_1, ...))

# def collate_fn(batch):
#    return tuple(zip(*batch))

from collate_fn import collate_fn
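The transposition can be seen on plain Python values. This sketch assumes, as the commented-out definition above suggests, that `collate_fn` is `tuple(zip(*batch))`; the strings and dicts stand in for real image tensors and targets.

```python
def collate_fn(batch):
    # Transpose a list of per-sample tuples into one tuple per field
    return tuple(zip(*batch))


# Placeholder samples: (image, target, image_id)
batch = [
    ("img_0", {"labels": [1]}, "id_0"),
    ("img_1", {"labels": [3, 3]}, "id_1"),
]

images, targets, image_ids = collate_fn(batch)
print(images)     # ('img_0', 'img_1')
print(image_ids)  # ('id_0', 'id_1')
```

Because nothing is stacked, targets with different numbers of boxes can sit side by side in the same batch.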

#### Augmenting the images
Augmenting images with transformations such as rotations, translations, zooms, and lighting changes increases the diversity of the training set without gathering more data, which reduces the likelihood of overfitting. The pipeline below keeps things simple: every image is resized to 512x512, and training images are additionally flipped at random. Remember, we imported the albumentations library as `A`.

In [None]:
# Albumentations

# For Train Data
def getTrainTransform():
    return A.Compose([
        A.Resize(height=512, width=512, p=1),
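        # Pass p by keyword: in some albumentations versions the first positional argument is always_apply, not p
        A.Flip(p=0.5),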
        ToTensorV2(p=1.0)
    ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})

# For Validation Data
def getValTransform():
    return A.Compose([
        A.Resize(height=512, width=512, p=1),
        ToTensorV2(p=1.0)
    ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})

# For Test Data
def getTestTransform():
    return A.Compose([
        A.Resize(height=512, width=512, p=1),
        ToTensorV2(p=1.0)
    ], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})
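Because `bbox_params` is set, the bounding boxes are transformed together with the image. Conceptually, for `pascal_voc` boxes this amounts to the arithmetic sketched below. The helper names and the 1280x960 source size are assumptions for illustration, and this is not albumentations' internal code.

```python
def resize_box(box, src_w, src_h, dst_w, dst_h):
    """Scale an (x_min, y_min, x_max, y_max) box from one image size to another."""
    sx, sy = dst_w / src_w, dst_h / src_h
    x_min, y_min, x_max, y_max = box
    return (x_min * sx, y_min * sy, x_max * sx, y_max * sy)


def hflip_box(box, width):
    """Mirror a box horizontally inside an image of the given width."""
    x_min, y_min, x_max, y_max = box
    return (width - x_max, y_min, width - x_min, y_max)


# Assumed source resolution of 1280x960, resized to 512x512
box = resize_box((100, 96, 150, 192), 1280, 960, 512, 512)
print(box)
print(hflip_box(box, 512))
```

Note that after a horizontal flip the old `x_max` determines the new `x_min`, so the box stays well-formed.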

#### Data Loaders using our custom dataset class

In [None]:
trainDataset = TrafficLightsDataset(train_df,getTrainTransform())
valDataset = TrafficLightsDataset(val_df,getValTransform())
testDataset = TrafficLightsDataset(test_df,getTestTransform())

In [None]:
trainDataLoader = DataLoader(
    trainDataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
    collate_fn=collate_fn
)

valDataLoader = DataLoader(
    valDataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=4,
    collate_fn=collate_fn
)

testDataLoader = DataLoader(
    testDataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=4,
    collate_fn=collate_fn
)

#### Checking our Data Pipeline

In [None]:
images, targets, image_ids = next(iter(trainDataLoader))

boxes = targets[0]['boxes'].cpu().numpy().astype(np.int32)
image = images[0].permute(1,2,0).cpu().numpy()

In [None]:
def displayImage(image, boxes):
    fig, ax = plt.subplots(1, 1, figsize=(16, 8))

    for box in boxes:
        cv2.rectangle(image,
                      (box[0], box[1]),
                      (box[2], box[3]),
                      (220, 0, 0), 3)

    ax.set_axis_off()
    ax.imshow(image)

    plt.show()

Here is an example of one of our augmented images:

In [None]:
displayImage(image,boxes)

### 4. Model
Here, we load torchvision's FasterRCNN model (ResNet-50 FPN backbone) pretrained on the COCO dataset, then replace its box-classification head with a new one sized for our classes.

In [None]:
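# weights="DEFAULT" loads the COCO weights; it replaces the deprecated pretrained=True (torchvision >= 0.13)
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")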

N_CLASS = 4  # 3 classes (Stop, Warning, Go) + Background

# Number of Input Features for the Classifier Head
INP_FEATURES = model.roi_heads.box_predictor.cls_score.in_features

# New Head for Classification
model.roi_heads.box_predictor = FastRCNNPredictor(INP_FEATURES, N_CLASS)

### 5. Training

In [None]:
report_gpu()

In [None]:
model.to(device)

params = [p for p in model.parameters() if p.requires_grad]
# Optimizers
optimizer = torch.optim.Adam(params)

# LR Scheduler
lr_scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer)
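The training loop itself is not shown in this notebook. The following is a minimal sketch of one epoch, assuming the data loader yields `(images, targets, image_ids)` batches as in the pipeline check above, that images are float tensors, and that each target is a dict of tensors. `train_one_epoch` is a hypothetical helper, not the author's training script.

```python
import torch


def train_one_epoch(model, data_loader, optimizer, device):
    """Run one pass over data_loader and return the mean batch loss."""
    model.train()
    total, batches = 0.0, 0
    for images, targets, _image_ids in data_loader:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # In training mode, torchvision detection models return a dict of losses
        loss_dict = model(images, targets)
        loss = sum(loss_dict.values())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total += loss.item()
        batches += 1
    return total / max(batches, 1)
```

With `ReduceLROnPlateau`, the scheduler is stepped with a monitored value, for example `lr_scheduler.step(val_loss)` after each epoch, where `val_loss` would come from a separate validation pass.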

In [None]:
report_gpu()

In [None]:
model.load_state_dict(torch.load('fasterrcnn_resnet50_fpn.pth', map_location=torch.device('cpu')))

### 9. Inference

In [None]:
model.eval()
images, targets, image_ids = next(iter(testDataLoader))
images = torch.stack(images).to(torch.device('cpu'))

# Gradients are not needed for inference
with torch.no_grad():
    outputs = model(images)

In [None]:
def filterBoxes(output,nms_th=0.3,score_threshold=0.5):
    
    boxes = output['boxes']
    scores = output['scores']
    labels = output['labels']
    
    # Non-Max Suppression (returns the indices of the boxes to keep)
    mask = nms(boxes,scores,nms_th)
    
    boxes = boxes[mask]
    scores = scores[mask]
    labels = labels[mask]
    
    boxes = boxes.data.cpu().numpy().astype(np.int32)
    scores = scores.data.cpu().numpy()
    labels = labels.data.cpu().numpy()
    
    mask = scores >= score_threshold
    boxes = boxes[mask]
    scores = scores[mask]
    labels = labels[mask]
    
    return boxes, scores, labels
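For intuition about what the `nms` call does, here is a plain-Python sketch of greedy non-max suppression. The names `iou` and `simple_nms` are hypothetical, and the code is an illustration of the technique rather than torchvision's implementation.

```python
def iou(a, b):
    """Intersection over union of two (x_min, y_min, x_max, y_max) boxes."""
    ix = max(0.0, min(a[2], b[2]) - max(a[0], b[0]))
    iy = max(0.0, min(a[3], b[3]) - max(a[1], b[1]))
    inter = ix * iy
    union = (a[2] - a[0]) * (a[3] - a[1]) + (b[2] - b[0]) * (b[3] - b[1]) - inter
    return inter / union if union > 0 else 0.0


def simple_nms(boxes, scores, iou_threshold):
    """Return the indices of the kept boxes, highest score first."""
    order = sorted(range(len(boxes)), key=lambda i: scores[i], reverse=True)
    keep = []
    for i in order:
        # Keep a box only if it does not overlap too much with a better one
        if all(iou(boxes[i], boxes[j]) <= iou_threshold for j in keep):
            keep.append(i)
    return keep


boxes = [(10, 10, 50, 50), (12, 12, 52, 52), (100, 100, 140, 140)]
scores = [0.9, 0.8, 0.7]
print(simple_nms(boxes, scores, 0.3))  # [0, 2]
```

As in `filterBoxes`, this version compares all boxes regardless of label, so overlapping detections of different classes also suppress each other.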

In [None]:
def displayPredictions(image_id,output,nms_th=0.3,score_threshold=0.5):
    
    boxes,scores,labels = filterBoxes(output,nms_th,score_threshold)
    
    # Preprocessing
    image = cv2.imread(image_id)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
    image = cv2.resize(image,(512,512))
    image /= 255.0
    
    fig, ax = plt.subplots(1, 1, figsize=(16, 8))

    # The image is float in [0, 1] at this point, so the colors use the same range
    colors = {1:(0,1,0), 2:(1,1,0), 3:(1,0,0)}
    
    for box,label in zip(boxes,labels):
        image = cv2.rectangle(image,
                      (box[0], box[1]),
                      (box[2], box[3]),
                      colors[label], 2)

    ax.set_axis_off()
    ax.imshow(image)

    plt.show()

In [None]:
displayPredictions(image_ids[2],outputs[2],0.2,0.4)

### 10. Conclusion

Several parts of the pipeline could still be improved:
- Augmentation techniques
- Validation of the FasterRCNN model

There are also some changes I would like to make in the future:
- Increase the robustness of the model by including go on left and stop on left detection.
- Add my own data (recorded while driving in my area) to the training set and compare the results against the current ones.

There were also minor hiccups when trying to use the Metal Performance Shaders (MPS) backend to train the model on my local machine (which runs Apple silicon). The MPS backend did not support the `aten::hardsigmoid` operator, which the model needed in my setup. A list of currently unsupported operators can be found in the issues section of the official PyTorch GitHub repository [here](https://github.com/pytorch/pytorch/issues/77764). As a result, I ended up running the training script on a free CUDA GPU on Google Colab and downloading the trained model locally.