In [57]:
import os
import random
import torch
import torch.nn as nn
from PIL import Image
import torch.optim as optim
import matplotlib.pyplot as plt
import torch.nn.functional as F
from sklearn.manifold import TSNE
from torchvision import transforms
from torch.utils.data import Subset
from sklearn.metrics import accuracy_score
from torch.utils.data import random_split
from torch.utils.data import Dataset, DataLoader
from concurrent.futures import ThreadPoolExecutor
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import ReduceLROnPlateau

# **Checking GPU**

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(torch.cuda.get_device_name(device))

NVIDIA GeForce RTX 2060


# **Histogram Image Size**

In [52]:
def process_image(file_path):
    try:
        with Image.open(file_path) as img:
            width, height = img.size
            return width, height
    except Exception as e:
        print(f"Error processing file {file_path}: {e}")
        return None

def collect_dimensions(image_dir):
    dimensions = []

    with ThreadPoolExecutor() as executor:
        futures = []
        for file_name in os.listdir(image_dir):
            file_path = os.path.join(image_dir, file_name)
            if os.path.isfile(file_path) and file_name.lower().endswith(('png', 'jpg', 'jpeg', 'bmp', 'tiff')):
                futures.append(executor.submit(process_image, file_path))

        for future in futures:
            result = future.result()
            if result:
                dimensions.append(result)

    return dimensions

def plot_histogram(dimensions, title):
    heights = [dim[1] for dim in dimensions]
    widths = [dim[0] for dim in dimensions]

    plt.figure(figsize=(10, 6))
    plt.scatter(heights, widths, alpha=0.7, label="Image Dimensions")
    plt.title(title)
    plt.xlabel("Height (pixels)")
    plt.ylabel("Width (pixels)")
    plt.grid(True)
    plt.legend()
    plt.show()

train_dir = "D:\\Computer Vision\\FYP\\TASK 1\\env\\TrackNet-X\\DataSet\\VeRi\\image_train"
test_dir = "D:\\Computer Vision\\FYP\\TASK 1\\env\\TrackNet-X\\DataSet\\VeRi\\image_test"

train_dimensions = collect_dimensions(train_dir)
test_dimensions = collect_dimensions(test_dir)

if train_dimensions:
    plot_histogram(train_dimensions, "Training Images: Height vs Width")
else:
    print("No dimensions collected for training images.")

if test_dimensions:
    plot_histogram(test_dimensions, "Test Images: Height vs Width")
else:
    print("No dimensions collected for test images.")

Average dimensions in training images: (243.56360844936205, 214.27788660066705)
Average dimensions in test images: (243.71741946627515, 217.27705328612143)


# **Dataset Loading & Triplet Generator**

In [3]:
class VERIDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []

        for img_name in os.listdir(data_dir):
            if img_name.endswith('.jpg'):
                self.image_paths.append(img_name)
                car_id = int(img_name.split('_')[0])
                self.labels.append(car_id)

        # Create a mapping for car IDs to indices
        self.id_to_indices = {}
        for idx, label in enumerate(self.labels):
            if label not in self.id_to_indices:
                self.id_to_indices[label] = []
            self.id_to_indices[label].append(idx)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, index):
        img_path = os.path.join(self.data_dir, self.image_paths[index])
        label = self.labels[index]
        img = Image.open(img_path).convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, label

    def get_triplet(self):
        ids_list = random.sample(self.id_to_indices.keys(), 1)[0]
        anchor_idx = random.choice(self.id_to_indices[ids_list])
        positive_idx = random.choice(self.id_to_indices[ids_list])

        # Ensure negative is from a different ID
        negative_ids = list(self.id_to_indices.keys())
        negative_ids.remove(ids_list)
        negative_id = random.choice(negative_ids)
        negative_idx = random.choice(self.id_to_indices[negative_id])

        return anchor_idx, positive_idx, negative_idx

In [4]:
class TripletDataset(Dataset):
    def __init__(self, veri_dataset):
        self.veri_dataset = veri_dataset

    def __len__(self):
        return len(self.veri_dataset)

    def __getitem__(self, index):
        anchor_idx, positive_idx, negative_idx = self.veri_dataset.get_triplet()
        anchor, _ = self.veri_dataset[anchor_idx]
        positive, _ = self.veri_dataset[positive_idx]
        negative, _ = self.veri_dataset[negative_idx]
        return anchor, positive, negative

# **Custom CNN**

In [5]:
class TripletCNN(nn.Module):
    def __init__(self):
        super(TripletCNN, self).__init__()

        # First convolution layer: 7x7 kernel, stride 5, padding 3, 12 channels
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=12, kernel_size=7, stride=5, padding=3)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=1)  # No downsampling here

        # Second convolution layer: 3x3 kernel, stride 1, padding 1, 24 channels
        self.conv2 = nn.Conv2d(in_channels=12, out_channels=24, kernel_size=3, stride=1, padding=1)

        # Third convolution layer: 3x3 kernel, stride 1, padding 1, 32 channels (no downsampling)
        self.conv3 = nn.Conv2d(in_channels=24, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)  # Reduces to 32x32

        # Fourth convolution layer: 3x3 kernel, stride 1, padding 1, 64 channels (no downsampling)
        self.conv4 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)

        # Adaptive pooling to prevent size issues
        self.global_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Flatten()

    def forward(self, x):
        x1 = F.relu(self.conv1(x))
        x2 = self.pool1(x1)

        x3 = F.relu(self.conv2(x2))
        x4 = F.relu(self.conv3(x3))
        x5 = self.pool2(x4)

        x6 = F.relu(self.conv4(x5))
        x7 = self.global_pool(x6)
        x8 = self.fc(x7)

        return x8

# **Triplet Loss**

In [None]:
class TripletLoss(nn.Module):
    def __init__(self, margin=1.0):
        super(TripletLoss, self).__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        pos_dist = torch.nn.functional.pairwise_distance(anchor, positive)
        neg_dist = torch.nn.functional.pairwise_distance(anchor, negative)
        loss = torch.clamp(pos_dist - neg_dist + self.margin, min=0.0).mean()
        return loss

# **Training**

In [6]:
# def train_model(model, train_loader, val_loader, optimizer, criterion, device, epochs=10):
#     model.to(device)
#     for epoch in range(epochs):
#         model.train()
#         train_loss = 0.0
#         for anchor, positive, negative in train_loader:
#             anchor, positive, negative = anchor.to(device), positive.to(device), negative.to(device)
#             optimizer.zero_grad()
#             anchor_out = model(anchor)
#             positive_out = model(positive)
#             negative_out = model(negative)
#             loss = criterion(anchor_out, positive_out, negative_out)
#             loss.backward()
#             optimizer.step()
#             train_loss += loss.item()
#         print(f"Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss / len(train_loader):.4f}")

In [12]:
def calculate_accuracy(anchor_out, positive_out, negative_out):
    pos_dist = torch.nn.functional.pairwise_distance(anchor_out, positive_out)
    neg_dist = torch.nn.functional.pairwise_distance(anchor_out, negative_out)
    correct = (pos_dist < neg_dist).sum().item()
    accuracy = correct / anchor_out.size(0)
    return accuracy

In [11]:
def train_model(model, train_loader, optimizer, criterion, device, epochs=10):
    model.to(device)
    train_losses = []
    train_accuracies = []

    for epoch in range(epochs):
        model.train()
        train_loss = 0.0
        train_accuracy = 0.0
        for anchor, positive, negative in train_loader:
            anchor, positive, negative = anchor.to(device), positive.to(device), negative.to(device)
            optimizer.zero_grad()
            anchor_out = model(anchor)
            positive_out = model(positive)
            negative_out = model(negative)

            # Calculate loss and backpropagate
            loss = criterion(anchor_out, positive_out, negative_out)
            loss.backward()
            optimizer.step()

            accuracy = calculate_accuracy(anchor_out, positive_out, negative_out)

            train_loss += loss.item()
            train_accuracy += accuracy

        # Calculate average loss and accuracy for training
        train_loss /= len(train_loader)
        train_accuracy /= len(train_loader)
        train_losses.append(train_loss)
        train_accuracies.append(train_accuracy)

        print(f"Epoch {epoch + 1}/{epochs} - Train Loss: {train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")
        
    return train_losses, train_accuracies

# **Accuracy and Loss Curves**

In [None]:
def plot_learning_curves(train_losses, train_accuracies):
    epochs = range(1, len(train_losses) + 1)

    plt.figure(figsize=(12, 6))

    plt.subplot(1, 2, 1)
    plt.plot(epochs, train_losses, label='Train Loss', color='blue', marker='o')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training Loss')
    plt.grid(True)
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(epochs, train_accuracies, label='Train Accuracy', color='green', marker='o')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy (%)')
    plt.title('Training Accuracy')
    plt.grid(True)
    plt.legend()

    plt.tight_layout()
    plt.show()

# **Main Function**

In [13]:
def main():
    data_dir = r'D:\Computer Vision\FYP\TASK 1\env\TrackNet-X\DataSet\VeRi\image_train'
    transform = transforms.Compose([transforms.Resize((196, 196)), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])

    full_dataset = VERIDataset(data_dir, transform)

    train_triplet_dataset = TripletDataset(full_dataset)
    train_loader = DataLoader(train_triplet_dataset, batch_size=64, shuffle=True)

    # Initialize the model, optimizer, and loss function
    model = TripletCNN()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)
    criterion = TripletLoss()

    train_losses, train_accuracies = train_model(model, train_loader, optimizer, criterion, device, epochs=10)

    plot_learning_curves(train_losses, train_accuracies)

    torch.save(model.state_dict(), 'triplet_cnn.pth')

if __name__ == "__main__":
    main()

since Python 3.9 and will be removed in a subsequent version.
  ids_list = random.sample(self.id_to_indices.keys(), 1)[0]


Epoch 1/10 - Train Loss: 0.5859, Train Accuracy: 0.7473
Epoch 2/10 - Train Loss: 0.4586, Train Accuracy: 0.8041
Epoch 3/10 - Train Loss: 0.4261, Train Accuracy: 0.8198
Epoch 4/10 - Train Loss: 0.4051, Train Accuracy: 0.8285
Epoch 5/10 - Train Loss: 0.3863, Train Accuracy: 0.8370
Epoch 6/10 - Train Loss: 0.3717, Train Accuracy: 0.8436
Epoch 7/10 - Train Loss: 0.3586, Train Accuracy: 0.8485
Epoch 8/10 - Train Loss: 0.3439, Train Accuracy: 0.8547
Epoch 9/10 - Train Loss: 0.3292, Train Accuracy: 0.8598
Epoch 10/10 - Train Loss: 0.3221, Train Accuracy: 0.8642


# **Before & After Trining**

In [60]:
def plot_triplet_embeddings(model, dataset, device, before_training=True, title_suffix=""):
    """
    Plot embeddings before or after training for a triplet network using the anchor images.
    
    Args:
        model (nn.Module): The triplet model to generate embeddings.
        dataset (TripletDataset): The dataset containing triplets.
        device (torch.device): The device (CPU or GPU) for computation.
        before_training (bool): If True, generates embeddings using an untrained model.
        title_suffix (str): Additional string to append to the plot title.
    """
    model.to(device)
    if not before_training:
        model.eval()
    
    embeddings = []
    labels = []

    with torch.no_grad():
        for anchor, _, label in dataset:  # Only use anchor and its label
            anchor = anchor.unsqueeze(0).to(device)  # Add batch dimension
            embedding = model(anchor).cpu().numpy()  # Get embedding
            embeddings.append(embedding)
            labels.append(label)

    # Convert to NumPy arrays
    embeddings = np.vstack(embeddings)
    labels = np.array(labels)

    # Use t-SNE for dimensionality reduction to 2D
    tsne = TSNE(n_components=2, random_state=42)
    reduced_embeddings = tsne.fit_transform(embeddings)

    # Plot embeddings
    plt.figure(figsize=(10, 8))
    unique_labels = np.unique(labels)
    for label in unique_labels:
        indices = np.where(labels == label)
        plt.scatter(reduced_embeddings[indices, 0], reduced_embeddings[indices, 1], label=f"Class {label}")
    
    state = "Before Training" if before_training else "After Training"
    plt.title(f"{state} Embeddings {title_suffix}")
    plt.legend()
    plt.xlabel("t-SNE Dimension 1")
    plt.ylabel("t-SNE Dimension 2")
    plt.grid(True)
    plt.show()

# **Prediction and Testing**

In [51]:
def load_model(model, model_path):
    model.load_state_dict(torch.load(model_path))
    model.eval()
    return model

# Function to preprocess images
def preprocess_image(image_path, transform):
    img = Image.open(image_path).convert('RGB')
    img = transform(img)
    img = img.unsqueeze(0)  # Add batch dimension (1, C, H, W)
    return img

def compute_distance(model, image1_path, image2_path, transform, margin=1.0, device='cpu'):
    image1 = preprocess_image(image1_path, transform).to(device)
    image2 = preprocess_image(image2_path, transform).to(device)
    
    with torch.no_grad():
        feature1 = model(image1)
        feature2 = model(image2)
    
    distance = F.pairwise_distance(feature1, feature2).item()
    similarity = distance < margin
    
    if similarity:
        print(f"Images are similar (distance: {distance:.4f})")
    else:
        print(f"Images are different (distance: {distance:.4f})")
    
    return distance, similarity

def main():

    image1_path = r'D:\Computer Vision\FYP\TASK 1\env\TrackNet-X\DataSet\VeRi\image_test\0135_c012_00060325_0.jpg'  
    image2_path = r'D:\Computer Vision\FYP\TASK 1\env\TrackNet-X\DataSet\VeRi\image_test\0135_c011_00069430_0.jpg'
    
    transform = transforms.Compose([transforms.Resize((196, 196)),transforms.ToTensor(),transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])])

    model = TripletCNN().to(device)
    model = load_model(model, model_path)
    
    distance, similarity = compute_distance(model, image1_path, image2_path, transform, margin=1.0, device=device)

if __name__ == "__main__":
    main()

Images are different (distance: 1.5968)


  model.load_state_dict(torch.load(model_path))


In [None]:
data_dir = r'D:\Computer Vision\FYP\TASK 1\env\TrackNet-X\DataSet\VeRi\image_train'
transform = transforms.Compose([
    transforms.Resize((196, 196)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# Prepare dataset
full_dataset = VERIDataset(data_dir, transform)
dataset = TripletDataset(full_dataset)

# Load model
model_path = 'triplet_cnn.pth' 
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TripletCNN().to(device)
model = load_model(model, model_path)

# Plot embeddings after training
plot_triplet_embeddings(model, dataset, device, before_training=False, title_suffix="(Trained Model)")

  model.load_state_dict(torch.load(model_path))
since Python 3.9 and will be removed in a subsequent version.
  ids_list = random.sample(self.id_to_indices.keys(), 1)[0]
