In [1]:
import os

import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torchvision.transforms as transforms

import numpy as np
import matplotlib.pyplot as plt
import cv2
from sklearn.model_selection import train_test_split

import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, utils, datasets
from torch.utils.data import Dataset, DataLoader, SubsetRandomSampler
from sklearn.metrics import classification_report, confusion_matrix

# Print the installed PyTorch version so it is captured in the notebook output.
print(torch.__version__)

1.10.1+cpu


In [38]:
# Dataset layout: <data_path>/zebra_dataset/{without,with}
data_path = "./data"
data_path_zebra = os.path.join(data_path, "zebra_dataset")
data_path_without, data_path_with = (
    os.path.join(data_path_zebra, class_dir) for class_dir in ("without", "with")
)

# Use the GPU when one is visible to PyTorch, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [55]:
# Collected image paths and labels
images = []
labels = []

label_without = 0
label_with = 1

# Walk the "without pedestrians" folder first, then the "with pedestrians"
# folder, recording one label per collected path.
# os.listdir() returns entries in arbitrary, platform-dependent order, so the
# names are sorted: otherwise the seeded train/val/test split built from these
# lists would differ from one machine to another.
for class_dir, class_label in (
    (data_path_without, label_without),
    (data_path_with, label_with),
):
    for image_name in sorted(os.listdir(class_dir)):
        images.append(os.path.join(class_dir, image_name))
        labels.append(class_label)

images[:10]

['./data\\zebra_dataset\\without\\0_cam_image_array_.jpg',
 './data\\zebra_dataset\\without\\1000_cam_image_array_.jpg',
 './data\\zebra_dataset\\without\\1001_cam_image_array_.jpg',
 './data\\zebra_dataset\\without\\1002_cam_image_array_.jpg',
 './data\\zebra_dataset\\without\\1003_cam_image_array_.jpg',
 './data\\zebra_dataset\\without\\1004_cam_image_array_.jpg',
 './data\\zebra_dataset\\without\\1005_cam_image_array_.jpg',
 './data\\zebra_dataset\\without\\1006_cam_image_array_.jpg',
 './data\\zebra_dataset\\without\\1007_cam_image_array_.jpg',
 './data\\zebra_dataset\\without\\1008_cam_image_array_.jpg']

In [56]:
# Hold out 10% of all samples for testing, then carve 20% of the remainder
# off as the validation set. Both splits shuffle with the same fixed seed.
split_options = dict(shuffle=True, random_state=2022)
X_train, X_test, y_train, y_test = train_test_split(
    images, labels, test_size=0.1, **split_options
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.2, **split_options
)

In [42]:
class PedestrianDataset(Dataset):
    """Dataset that reads images from disk with OpenCV and pairs them with labels.

    Args:
        img_paths: Sequence of image file paths.
        img_labels: Sequence of labels, aligned index-by-index with ``img_paths``.
        transform: Optional callable applied to the RGB image array before it
            is returned.
    """

    def __init__(self, img_paths, img_labels, transform=None):
        self.img_paths = img_paths
        self.img_labels = img_labels
        self.transform = transform

    def __len__(self):
        """Return the number of samples (one per label)."""
        return len(self.img_labels)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for ``idx``.

        If the file at ``idx`` cannot be decoded, the next readable file is
        used instead (wrapping around to the start of the list), and the label
        returned is the one belonging to that substitute file.

        Returns:
            The RGB image (after ``transform`` if one was given) and the label
            as a float tensor of shape ``(1,)``.

        Raises:
            IndexError: If ``idx`` is outside the dataset.
            RuntimeError: If none of the files can be read.
        """
        n_items = len(self.img_paths)
        # Keep the sequence protocol intact: an out-of-range index must raise
        # IndexError (plain ``for sample in dataset`` relies on it to stop).
        if not -n_items <= idx < n_items:
            raise IndexError(f"index {idx} is out of range for dataset of size {n_items}")

        image_bgr = None
        # Bounded search with wrap-around, so an unreadable file at the end of
        # the list no longer walks past the last index.
        for offset in range(n_items):
            candidate = (idx + offset) % n_items
            image_bgr = cv2.imread(self.img_paths[candidate])
            if image_bgr is not None:  # cv2.imread returns None on failure
                idx = candidate
                break
        if image_bgr is None:
            raise RuntimeError("None of the image files in the dataset could be read")

        # OpenCV loads images as BGR; convert to RGB before transforming.
        image_rgb = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
        if self.transform:
            image_rgb = self.transform(image_rgb)

        label = torch.Tensor([self.img_labels[idx]])

        return image_rgb, label

In [24]:
def get_mean_std(loader):
    """Compute per-channel mean and standard deviation over a loader.

    Args:
        loader: Iterable yielding ``(data, _)`` pairs where ``data`` is a
            4-D tensor reduced over dimensions 0, 2 and 3 (i.e. everything
            except dimension 1, the channel dimension).

    Returns:
        ``(mean, std)``: two 1-D tensors with one entry per channel.

    Raises:
        ZeroDivisionError: If the loader yields no batches.
    """
    channels_sum, channels_squared_sum, num_samples = 0, 0, 0

    for data, _ in loader:
        batch_size = data.shape[0]
        # Weight each batch by its sample count; a plain average of batch
        # means over-weights a smaller final batch.
        channels_sum += torch.mean(data, dim=[0, 2, 3]) * batch_size
        channels_squared_sum += torch.mean(data ** 2, dim=[0, 2, 3]) * batch_size
        num_samples += batch_size

    mean = channels_sum / num_samples
    # Var[x] = E[x^2] - E[x]^2; clamp at zero because round-off can make the
    # difference slightly negative, which would turn the square root into NaN.
    variance = (channels_squared_sum / num_samples - mean ** 2).clamp(min=0)
    std = variance ** 0.5

    return mean, std

In [28]:
# Pipeline without normalization (ToTensor only), used to measure the
# per-channel statistics of the training images.
# NOTE(review): the saved output of this cell (mean ~0, std ~1) looks like it
# was produced while a Normalize step was active - re-run on a fresh kernel to confirm.
to_tensor_only = [transforms.ToTensor()]
data_transform = transforms.Compose(to_tensor_only)

train_dataset = PedestrianDataset(
    img_paths=X_train,
    img_labels=y_train,
    transform=data_transform,
)
train_dataloader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)

mean, std = get_mean_std(loader=train_dataloader)
print(f"Mean: {mean}")
print(f"Std: {std}")

Mean: tensor([-0.0003, -0.0005, -0.0003])
Std: tensor([1.0001, 1.0000, 0.9998])


In [57]:
# Convert images to tensors and normalize each channel.
# NOTE(review): the mean/std constants are hard-coded; presumably they were
# copied from a get_mean_std run on the un-normalized training set - confirm.
data_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.4251, 0.4787, 0.4311],
        std=[0.2203, 0.2276, 0.2366]
    ),
])


train_dataset = PedestrianDataset(X_train, y_train, transform=data_transform)
val_dataset = PedestrianDataset(X_val, y_val, transform=data_transform)
test_dataset = PedestrianDataset(X_test, y_test, transform=data_transform)

# Only the training loader is shuffled. Evaluation does not benefit from a
# random order, and a fixed order keeps per-step validation/test logs
# reproducible from run to run.
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [79]:
class PedestrianDetector(nn.Module):
    """Small three-stage CNN producing one sigmoid score per input image.

    Each stage applies Conv2d (3x3 kernel, no padding), Dropout2d, MaxPool2d(2)
    and ReLU, in that order. The flattened result goes through a single-unit
    linear layer followed by a sigmoid.

    Args:
        input_size: ``(height, width)`` of the input images. The default
            ``(120, 160)`` gives the original 3744 flattened features.

    Raises:
        ValueError: If ``input_size`` is too small to survive the three
            convolution/pooling stages.
    """

    def __init__(self, input_size=(120, 160)):
        super().__init__()

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3)
        self.pool1 = nn.MaxPool2d(kernel_size=2)
        self.dropout1 = nn.Dropout2d(p=0.1)

        self.conv2 = nn.Conv2d(in_channels=64, out_channels=32, kernel_size=3)
        self.pool2 = nn.MaxPool2d(kernel_size=2)
        self.dropout2 = nn.Dropout2d(p=0.1)

        self.conv3 = nn.Conv2d(in_channels=32, out_channels=16, kernel_size=3)
        self.pool3 = nn.MaxPool2d(kernel_size=2)
        self.dropout3 = nn.Dropout2d(p=0.1)

        # Derive the flattened feature count instead of hard-coding it: each
        # stage removes 2 pixels per side length (3x3 conv, no padding) and
        # then halves it, rounding down (2x2 max-pool).
        height, width = input_size
        for _ in range(3):
            height = (height - 2) // 2
            width = (width - 2) // 2
        if height < 1 or width < 1:
            raise ValueError(f"input_size {tuple(input_size)} is too small for this network")

        self.fc = nn.Linear(16 * height * width, 1)

    def forward(self, x):
        """Map a ``(batch, 3, height, width)`` tensor to ``(batch, 1)`` sigmoid scores."""
        x = torch.relu(self.pool1(self.dropout1(self.conv1(x))))
        x = torch.relu(self.pool2(self.dropout2(self.conv2(x))))
        x = torch.relu(self.pool3(self.dropout3(self.conv3(x))))
        # Flatten everything except the batch dimension before the classifier.
        x = torch.sigmoid(self.fc(x.view(x.shape[0], -1)))
        return x


# Build an untrained network and show its layer summary.
model = PedestrianDetector()
print(model)

# Push a single training batch through the model as a shape check.
first_batch = next(iter(train_dataloader))
image_batch, label_batch = first_batch
pred_batch = model(image_batch)

print(f"Image Batch Shape: {image_batch.shape}")
print(f"Label Batch Shape: {label_batch.shape}")
print(f"Predict Batch Shape: {pred_batch.shape}")

PedestrianDetector(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout1): Dropout2d(p=0.1, inplace=False)
  (conv2): Conv2d(64, 32, kernel_size=(3, 3), stride=(1, 1))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout2): Dropout2d(p=0.1, inplace=False)
  (conv3): Conv2d(32, 16, kernel_size=(3, 3), stride=(1, 1))
  (pool3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (dropout3): Dropout2d(p=0.1, inplace=False)
  (fc): Linear(in_features=3744, out_features=1, bias=True)
)
Image Batch Shape: torch.Size([64, 3, 120, 160])
Label Batch Shape: torch.Size([64, 1])
Predict Batch Shape: torch.Size([64, 1])


In [73]:
def get_accuracy(y_prob, y_true, threshold=0.5):
    """Return the fraction of elements whose thresholded score matches the target.

    Args:
        y_prob: Tensor of predicted scores.
        y_true: Tensor of targets with the same shape as ``y_prob``.
        threshold: Scores strictly greater than this count as class 1.

    Returns:
        Accuracy as a Python float in ``[0, 1]``.
    """
    assert y_true.size() == y_prob.size()
    # Cast the boolean decisions to the target dtype for an explicit comparison.
    predicted = (y_prob > threshold).to(y_true.dtype)
    # Divide by the total element count rather than size(0), so targets with
    # more than one column cannot yield an "accuracy" above 1.
    return (y_true == predicted).sum().item() / y_true.numel()

In [80]:
n_epochs = 10
log_frequency = 10  # print averaged metrics every `log_frequency` steps

model = PedestrianDetector()
model.to(device)

# The model ends in a sigmoid, so plain binary cross-entropy is used.
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# Per-step training metrics.
loss_history = []
accuracy_history = []
# Per-step validation metrics, kept apart from the training curves so the
# two are not interleaved in the same list.
val_loss_history = []
val_accuracy_history = []

for epoch in range(n_epochs):
    # ---- training pass ----
    running_loss = 0.0
    running_accuracy = 0.0
    model.train()
    for step, (image_batch, label_batch) in enumerate(train_dataloader):
        image_batch, label_batch = image_batch.to(device), label_batch.to(device)

        optimizer.zero_grad()

        pred = model(image_batch)

        loss = criterion(pred, label_batch)

        loss.backward()

        optimizer.step()

        current_loss = loss.item()
        current_accuracy = get_accuracy(pred, label_batch)

        loss_history.append(current_loss)
        accuracy_history.append(current_accuracy)

        running_loss += current_loss
        running_accuracy += current_accuracy
        if (step + 1) % log_frequency == 0:
            print(f'Train [{epoch + 1}/{n_epochs}, {step + 1}/{len(train_dataloader)}]: Loss {running_loss / log_frequency}, Accuracy {running_accuracy / log_frequency}')
            running_loss = 0.0
            running_accuracy = 0.0

    # ---- validation pass ----
    # Reset BOTH accumulators: training steps left over after the last log
    # line would otherwise leak into the first validation averages.
    running_loss = 0.0
    running_accuracy = 0.0
    model.eval()
    # No gradients are needed for evaluation; skipping them saves memory and time.
    with torch.no_grad():
        for step, (image_batch, label_batch) in enumerate(val_dataloader):
            image_batch, label_batch = image_batch.to(device), label_batch.to(device)

            pred = model(image_batch)

            loss = criterion(pred, label_batch)

            current_loss = loss.item()
            current_accuracy = get_accuracy(pred, label_batch)

            val_loss_history.append(current_loss)
            val_accuracy_history.append(current_accuracy)

            running_loss += current_loss
            running_accuracy += current_accuracy
            if (step + 1) % log_frequency == 0:
                print(f'Val [{epoch + 1}/{n_epochs}, {step + 1}/{len(val_dataloader)}]: Loss {running_loss / log_frequency}, Accuracy {running_accuracy / log_frequency}')
                running_loss = 0.0
                running_accuracy = 0.0

Train [1/10, 10/61]: Loss 0.6946838319301605, Accuracy 0.471875
Train [1/10, 20/61]: Loss 0.6868078112602234, Accuracy 0.5578125
Train [1/10, 30/61]: Loss 0.684821343421936, Accuracy 0.4875
Train [1/10, 40/61]: Loss 0.6690944790840149, Accuracy 0.703125
Train [1/10, 50/61]: Loss 0.6524347066879272, Accuracy 0.7828125
Train [1/10, 60/61]: Loss 0.6210629165172576, Accuracy 0.8171875
Val [1/10, 10/16]: Loss 0.5879678785800934, Accuracy 0.8328125
Train [2/10, 10/61]: Loss 0.5996087253093719, Accuracy 0.7625
Train [2/10, 20/61]: Loss 0.5461850166320801, Accuracy 0.825
Train [2/10, 30/61]: Loss 0.5034505933523178, Accuracy 0.8375
Train [2/10, 40/61]: Loss 0.44292531311511996, Accuracy 0.8640625
Train [2/10, 50/61]: Loss 0.3633457660675049, Accuracy 0.9046875


KeyboardInterrupt: 

In [None]:
# Evaluate the trained model on the held-out test set.
model.eval()
test_loss = 0.0
test_accuracy = 0.0
n_test_samples = 0
# Inference only: disable autograd so no computation graph is built.
with torch.no_grad():
    for image_batch, label_batch in test_dataloader:
        image_batch, label_batch = image_batch.to(device), label_batch.to(device)

        pred = model(image_batch)

        loss = criterion(pred, label_batch)

        # Both metrics are per-batch means; weight them by the batch size so a
        # smaller final batch does not count as much as a full one.
        batch_size = label_batch.size(0)
        test_loss += loss.item() * batch_size
        test_accuracy += get_accuracy(pred, label_batch) * batch_size
        n_test_samples += batch_size

test_loss /= n_test_samples
test_accuracy /= n_test_samples

print(f"Test Loss: {test_loss}")
print(f"Test Accuracy: {test_accuracy}")

In [47]:
# Serialize the whole model object (not only its state_dict) to disk.
torch.save(obj=model, f="model.pth")