Import necessary packages

In [1]:
import os
import json
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision.models as models
import torch.optim as optim
from torchvision import transforms
from torchvision.models import resnet50
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from sklearn.metrics.pairwise import cosine_similarity

import warnings
warnings.filterwarnings("ignore")

Load dataset

In [2]:
# Define the relative paths of the image and annotation folders
train_reid_path = r'dataset\plain_re-ID\atrw_reid_train'
train_anno_reid_path = r'dataset\plain_re-ID\atrw_anno_reid_train\reid_list_train.csv'

test_reid_path = r'dataset\plain_re-ID\atrw_reid_test'
test_anno_reid_path = r'dataset\plain_re-ID\atrw_anno_reid_test\reid_list_test.csv'

# Define the absolute paths of the image and annotation folders
train_reid_path = os.path.abspath(train_reid_path)
train_anno_reid_path = os.path.abspath(train_anno_reid_path)

test_reid_path = os.path.abspath(test_reid_path)
test_anno_reid_path = os.path.abspath(test_anno_reid_path)

# load training dataset
data = pd.read_csv(train_anno_reid_path, names=['id', 'image_number'])
X = np.array(data['image_number'])
y = np.array(data['id'])

Create custom dataset class

In [8]:
class MyDataset(Dataset):
    def __init__(self, data, labels=None, transform=None, target_transform=None):
        self.data = data
        self.labels = labels
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image = self.data[idx]

        if self.transform:
            image = self.transform(image)

        if self.labels is not None:
            label = self.labels[idx]
            if self.target_transform:
                label = self.target_transform(label)
            return image, label
        else:
            return image

        return image, label

Preprocess the dataset

In [5]:
# Create a mapping from original IDs to sequential numbers
# This step is to avoid target out of bound issues
id_to_int = {tiger_id: i for i, tiger_id in enumerate(set(y))}
int_to_id = {i: tiger_id for tiger_id, i in id_to_int.items()}

def target_transform(tiger_id):
    """Transform the tiger_id to a sequential integer."""
    return id_to_int[tiger_id]

In [12]:
data_transform = transforms.Compose([
    transforms.Lambda(lambda file_path: os.path.join(train_reid_path, file_path)),  
    transforms.Lambda(lambda abs_img_path: Image.open(abs_img_path).convert("RGB")),
    transforms.Resize((256, 128)),
    transforms.ToTensor(),
])

test_data_transform = transforms.Compose([
    transforms.Lambda(lambda file_path: os.path.join(test_reid_path, file_path)),  
    transforms.Lambda(lambda abs_img_path: Image.open(abs_img_path).convert("RGB")),
    transforms.Resize((256, 128)),
    transforms.ToTensor(),
])

batch_size = 32
train_dataset = MyDataset(data=X, labels=y, transform=data_transform, target_transform=target_transform)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)

Create and train ResNet50

In [21]:
# Create the model
resnet_model = resnet50(pretrained=True)

# Freeze layers in the base model
for param in resnet_model.parameters():
    param.requires_grad = False

# Modify the fc layer to match the number of classes in dataset
num_classes = len(set(y))
resnet_model.fc = nn.Linear(2048, num_classes)

# Move the model to the GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet_model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(resnet_model.fc.parameters(), lr=0.001)

# Train the model
num_epochs = 10
for epoch in range(num_epochs):
    running_loss = 0.0
    for i, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        # Forward, backward, and optimize
        outputs = resnet_model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

# Save the model
model_directory_path = os.path.abspath(r'model')
model_path = os.path.join(model_directory_path, 'resnet50.pth')
torch.save(resnet_model.state_dict(), model_path)

Load testing dataset

In [13]:
test_data = pd.read_csv(test_anno_reid_path, names=['image_number'])
X_test = np.array(test_data['image_number'])

test_dataset = MyDataset(X_test, transform=test_data_transform)
test_loader = DataLoader(test_dataset, batch_size=42, shuffle=False)

Helper methods for generating results

In [27]:
def extract_features(model, dataloader):
    features_list = []
    model.eval()
    with torch.no_grad():
        for data in dataloader:
            if isinstance(data, tuple):
                inputs = data[0]
            else:  # Handles cases where data is just image
                inputs = data

            outputs = model(inputs)
            features_list.append(outputs)

    features = torch.cat(features_list, dim=0)
    return features

def rank_gallery(query_feature, gallery_features):
    distances = torch.norm(gallery_features - query_feature, dim=1)
    return distances.argsort(descending=False).tolist()

def filename_to_id(filename):
    return int(filename.split('.')[0])

Generating results

In [28]:
# Load model
resnet50 = models.resnet50(pretrained=False) 
num_classes = len(set(y))
resnet50.fc = nn.Linear(resnet50.fc.in_features, num_classes)
model_path = r"model/resnet50.pth"
resnet50.load_state_dict(torch.load(model_path))

# Move the model to the GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
resnet50.to(device)

# Extract features from the pre-trained model
features = extract_features(resnet50, test_loader)

# Generate results
filenames = test_data['image_number'].tolist()
results = []
for i, feature in enumerate(features):
    query_feature = feature.unsqueeze(0)
    
    # Taking all features except the current query feature
    gallery_features = torch.cat([features[:i], features[i+1:]], dim=0)
    
    # Extracting IDs from filenames except the current query
    gallery_ids = [filename_to_id(filenames[j]) for j in range(len(filenames)) if j != i]

    ranking = rank_gallery(query_feature, gallery_features)
    ranked_ids = [gallery_ids[idx] for idx in ranking]
    
    results.append({"query_id": filename_to_id(filenames[i]), "ans_ids": ranked_ids})

# results = []
# for i, feature in enumerate(features):
#     query_feature = feature.unsqueeze(0)
    
#     # Taking all features except the current query feature
#     gallery_features = torch.cat([features[:i], features[i+1:]], dim=0)
#     gallery_ids = list(range(i)) + list(range(i+1, len(features)))

#     ranking = rank_gallery(query_feature, gallery_features)
#     ranked_ids = [gallery_ids[idx] for idx in ranking]
#     results.append({"query_id": i, "ans_ids": ranked_ids})

Save the results to json

In [24]:
output_file_path = r'output/resnet50_predictions_2.json'
with open(output_file_path, 'w') as f:
    json.dump(results, f, indent=4)