## Import libraries

In [None]:
import pickle
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.data import Dataset, DataLoader, Subset
from sklearn.model_selection import train_test_split, KFold
import torchvision.transforms as transforms
import torchvision.transforms.functional as TF
from PIL import Image


## Configuratioin

In [None]:
np.random.seed(42)
torch.manual_seed(42)

In [None]:
K_FOLDS = 5
NUM_EPOCHS = 1000
BATCH_SIZE = 32
LEARNING_RATE = 5e-5
WEIGHT_DECAY = 1
DROPOUT_RATE = 0.3
PATIENCE = 30
SCHEDULER_FACTOR = 0.1
SCHEDULER_PATIENCE = 15
EPOCHS_PER_PRINT = 1

TRAIN_DATA_PATH = 'train.pkl'
TEST_DATA_PATH = 'test.pkl'
MODEL_PATH = 'best_model.pth'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

## Load datas

In [None]:
with open(TRAIN_DATA_PATH, "rb") as f:
    full_train_data = pickle.load(f)
with open(TEST_DATA_PATH, "rb") as f:
    test_data = pickle.load(f)

In [None]:
train_df, test_df = train_test_split(full_train_data, test_size=0.1, random_state=42)

## Preprocessing

In [None]:
# Add random flips, rotations, jitters, etc. to the training images.
train_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomAffine(degrees=8, translate=(0.1, 0.1), scale=(0.9, 1.1), shear=5),
    # transforms.RandomPerspective(distortion_scale=0.1, p=0.5),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
    # transforms.RandomErasing(p=0.5, scale=(0.02, 0.2), ratio=(0.3, 3.3), value=0),
])
test_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

In [None]:
# Customize the dataset
class RPSDataset(Dataset):
    def __init__(self, df, labels_available=True, transform=None, is_train=False):
        self.data = df
        self.id = np.array(self.data['id'], dtype=np.int64)
        self.image1 = np.array(self.data['img1'].tolist(), dtype=np.uint8)
        self.image2 = np.array(self.data['img2'].tolist(), dtype=np.uint8)
        self.labels_available = labels_available
        if labels_available:
            self.labels = np.array(self.data['label'], dtype=np.int64)
        else:
            self.labels = None
        self.transform = transform
        self.is_train = is_train

    def __len__(self):
        return len(self.image1)
    
    def __getitem__(self, idx):
        img1_np = self.image1[idx]
        img2_np = self.image2[idx]
        id_val = self.id[idx]

        if self.transform:
            img1 = self.transform(img1_np)
            img2 = self.transform(img2_np)
        else:
            img1 = torch.from_numpy(img1_np.astype(np.float32)).unsqueeze(0) / 255.0
            img2 = torch.from_numpy(img2_np.astype(np.float32)).unsqueeze(0) / 255.0

        sample = {'img1': img1, 'img2': img2, 'id': id_val}
        if self.labels_available:
            sample['label'] = self.labels[idx]
        return sample


In [None]:
train_dataset = RPSDataset(train_df, labels_available=True, transform=train_transform, is_train=True)
validation_dataset = RPSDataset(test_df, labels_available=True, transform=test_transform, is_train=False)
test_dataset = RPSDataset(test_data, labels_available=False, transform=test_transform, is_train=False)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
validation_loader = DataLoader(validation_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

## Set up the model structure

In [None]:
# Set up the squeeze-excitation block
class SEBlock(nn.Module):
    def __init__(self, in_channels, reduction=16):
        super(SEBlock, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(in_channels, in_channels // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(in_channels // reduction, in_channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        batch_size, channels, _, _ = x.size()
        y = self.avg_pool(x).view(batch_size, channels)
        y = self.fc(y).view(batch_size, channels, 1, 1)
        return x * y.expand_as(x)

In [None]:
# Set up the residual block
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1, use_se=True):
        super(ResidualBlock, self).__init__()
        self.use_se = use_se
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        if use_se:
            self.se_block = SEBlock(out_channels)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.use_se:
            out = self.se_block(out)

        out += self.shortcut(x)
        out = self.relu(out)
        return out


In [None]:
# Set up the base feature extractor
class BaseFeatureExtractor(nn.Module):
    def __init__(self, output_dim=256, use_se=True):
        super(BaseFeatureExtractor, self).__init__()
        self.in_channels = 32

        self.conv1 = nn.Conv2d(1, self.in_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU(inplace=True)

        self.layer1 = self._make_layer(ResidualBlock, 64, num_blocks=2, stride=2, use_se=use_se)

        self.layer2 = self._make_layer(ResidualBlock, 128, num_blocks=2, stride=2, use_se=use_se)

        self.layer3 = self._make_layer(ResidualBlock, 256, num_blocks=2, stride=2, use_se=use_se)

        self.layer4 = self._make_layer(ResidualBlock, 512, num_blocks=2, stride=1, use_se=use_se)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, output_dim)

    def _make_layer(self, block, out_channels, num_blocks, stride=1, use_se=True):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride, use_se))
            self.in_channels = out_channels
        return nn.Sequential(*layers)

    def forward(self, x):
        
        x = self.relu(self.bn1(self.conv1(x)))
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x


In [None]:
# Set up the Siamese network
class SiameseNetwork(nn.Module):
    def __init__(self, feature_output_dim=256, dropout=DROPOUT_RATE, use_se=True, interaction='concat'):
        super(SiameseNetwork, self).__init__()
        self.base_feature_extractor = BaseFeatureExtractor(output_dim=feature_output_dim, use_se=use_se)
        self.interaction = interaction

        if interaction == 'concat':
            self.classifier_input_dim = feature_output_dim * 2
        elif interaction == 'abs':
            self.classifier_input_dim = feature_output_dim
        elif interaction == 'both':
            self.classifier_input_dim = feature_output_dim * 3
        else:
            raise ValueError("Invalid interaction type.")

        self.fc1 = nn.Linear(self.classifier_input_dim, 128)
        self.bn1 = nn.BatchNorm1d(128)
        self.Dropout1 = nn.Dropout(dropout)
        self.fc2 = nn.Linear(128, 64)
        self.bn2 = nn.BatchNorm1d(64)
        self.Dropout2 = nn.Dropout(dropout / 2)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, input1, input2):
        output1 = self.base_feature_extractor(input1)
        output2 = self.base_feature_extractor(input2)

        if self.interaction == 'concat':
            combined = torch.cat((output1, output2), dim=1)
        elif self.interaction == 'abs':
            combined = torch.abs(output1 - output2)
        elif self.interaction == 'both':
            combined = torch.cat((output1, output2, torch.abs(output1 - output2)), dim=1)
        else:
            raise ValueError("Invalid interaction type.")

        x = self.fc1(combined)
        x = self.bn1(x)
        x = F.relu(x)
        x = self.Dropout1(x)

        x = self.fc2(x)
        x = self.bn2(x)
        x = F.relu(x)
        x = self.Dropout2(x)

        x = self.fc3(x)

        return x


## Train the model

In [None]:
def train_model(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    for batch in dataloader:
        img1 = batch['img1'].to(device)
        img2 = batch['img2'].to(device)
        labels = batch['label'].to(device)

        optimizer.zero_grad()
        outputs = model(img1, img2)

        loss = criterion(outputs.squeeze(), labels.float().squeeze())
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * labels.size(0)
        predicted = torch.sign(outputs.squeeze())
        predicted[predicted == 0] = -1
        correct_predictions += (predicted == labels.float().squeeze()).sum().item()
        total_samples += labels.size(0)

    epoch_acc = correct_predictions / total_samples
    epoch_loss = running_loss / total_samples
    return epoch_loss, epoch_acc

def validate_model(model, dataloader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct_predictions = 0
    total_samples = 0

    with torch.no_grad():
        for batch in dataloader:
            img1 = batch['img1'].to(device)
            img2 = batch['img2'].to(device)
            labels = batch['label'].to(device)

            outputs = model(img1, img2)
            loss = criterion(outputs.squeeze(), labels.float().squeeze())

            running_loss += loss.item() * labels.size(0)
            predicted = torch.sign(outputs.squeeze())
            predicted[predicted == 0] = -1
            correct_predictions += (predicted == labels.float().squeeze()).sum().item()
            total_samples += labels.size(0)

    epoch_acc = correct_predictions / total_samples
    epoch_loss = running_loss / total_samples
    return epoch_loss, epoch_acc

In [None]:
def run_training(model, train_loader, validation_loader, criterion, optimizer, 
                 device, num_epochs=NUM_EPOCHS, model_save_path=MODEL_PATH,
                 use_scheduler=True, patience = PATIENCE, scheduler_factor=SCHEDULER_FACTOR, 
                 scheduler_patience=SCHEDULER_PATIENCE, print_every=EPOCHS_PER_PRINT):
    best_accuracy = 0
    history = {'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': [], 'lr': []}

    scheduler = None
    if use_scheduler:
        scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor = scheduler_factor, 
                                                   patience=scheduler_patience)

    patience = patience
    patience_counter = 0

    print("Start training ...")
    
    for epoch in range(num_epochs):
        train_loss, train_acc = train_model(model, train_loader, criterion, optimizer, device)
        val_loss, val_acc = validate_model(model, validation_loader, criterion, device)
        current_lr = optimizer.param_groups[0]['lr']
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        history['lr'].append(current_lr)

        if (epoch+1) % print_every == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}, "
                f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

        if use_scheduler:
            scheduler.step(val_acc)
            
        if val_acc > best_accuracy:
            best_accuracy = val_acc
            torch.save(model.state_dict(), model_save_path)
            print(f"Model saved with accuracy: {best_accuracy:.4f}")
            best_model = model.state_dict()
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping triggered after {patience_counter} epochs without improvement.")
                break
    
    print("Training complete.")
    print(f"Best validation accuracy: {best_accuracy:.4f}")
    return best_model, history, best_accuracy, model

Run the block below to see the training process

In [None]:
# import itertools

# learning_rates = [1e-6]
# weight_decays = [1e-6]
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# dropout_rates = [0.3]

# parameter_combinations = list(itertools.product(learning_rates, weight_decays, dropout_rates))
# results = []

# for lr, wd, dr in parameter_combinations:
#     print(f"Training with learning rate: {lr}, weight decay: {wd}, dropout rate: {dr}")
#     model = SiameseNetwork(dropout=dr).to(device)
#     model.load_state_dict(torch.load("all_best_model/residual_4layers_lr_5e-05_wd_1_dr_0.3.pth")) # Comment this line to train from scratch
#     criterion = nn.SoftMarginLoss()
#     optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=wd)
#     model_path = f"residual_4layers_lr_{lr}_wd_{wd}_dr_{dr}.pth"

#     final_model, history, best_acc, last_model = run_training(model, train_loader, validation_loader, criterion, optimizer, 
#                                                   device, NUM_EPOCHS, model_path, use_scheduler=True, patience=PATIENCE)
    
#     results.append({'lr': lr, 'wd': wd, 'dr': dr, 'best_val_acc': best_acc, 'history': history})
#     print(f"Best validation accuracy for lr={lr}, wd={wd}, dr={dr}: {best_acc:.4f}")

#     del model
#     del optimizer
#     if torch.cuda.is_available():
#         torch.cuda.empty_cache()


## Generate predicted values

In [None]:
best_model_1 = SiameseNetwork(dropout=0.3).to(device)
best_model_1.load_state_dict(torch.load("residual_4layers_lr_5e-05_wd_1_dr_0.3.pth"))
best_model_1.eval()
tta_predictions = []
test_ids_tta = []

with torch.no_grad():
    for batch in test_loader:
        img1 = batch['img1'].to(device)
        img2 = batch['img2'].to(device)
        ids = batch['id']

        output_original = best_model_1(img1, img2).squeeze()

        img1_hf = TF.hflip(img1)
        img2_hf = TF.hflip(img2)
        output_hf = best_model_1(img1_hf, img2_hf).squeeze()

        if output_original.dim() == 0:
            output_original = output_original.unsqueeze(0)
        if output_hf.dim() == 0:
            output_hf = output_hf.unsqueeze(0)

        average_output = (output_original + output_hf) / 2.0

        tta_predictions.extend(average_output.cpu().numpy())

        if isinstance(ids, torch.Tensor):
            test_ids_tta.extend(ids.cpu().numpy().tolist())
        else:
            test_ids_tta.extend(ids)

final_predictions_tta = np.sign(np.array(tta_predictions))
final_predictions_tta[final_predictions_tta == 0] = -1

submission_df = pd.DataFrame({'id': test_ids_tta, 'label': final_predictions_tta.astype(int)})
submission_df.to_csv('submission.csv', index=False)

In [None]:
# model.eval()
# predictions = []
# test_ids = []

# with torch.no_grad():
#     for batch in test_loader:
#         img1 = batch['img1'].to(device)
#         img2 = batch['img2'].to(device)
#         ids = batch['id']

#         outputs = model(img1, img2)
#         predicted = torch.sign(outputs.squeeze())
#         predicted[predicted == 0] = -1

#         predicted = predicted.cpu().numpy()
#         ids = ids.numpy()

#         predictions.extend(predicted)
#         if isinstance(ids, torch.Tensor):
#             test_ids.extend(ids.cpu().numpy().tolist())
#         else:
#             test_ids.extend(ids)

# submission_df = pd.DataFrame({'id': test_ids, 'label': predictions})
# submission_df['label'] = submission_df['label'].astype(int)
# submission_df.to_csv('submission.csv', index=False)
# print("Submission file created: submission.csv")
# # Save the model
# torch.save(model.state_dict(), "siamese_model.pth")
# print("Model saved as siamese_model.pth")

## Acknowledgments / AI Assistance

- The selection of the Siamese Neural Network comes from the google search lab. The query we put was 'what neural network structure possess the ability to compare two images?' And the answer was 'A Siamese Neural Network is a neural network structure specifically designed for comparing two images. It consists of two identical sub-networks, each processing one of the input images, and then the outputs of these sub-networks are compared to determine the similarity between the two images. '
- We utilize some techniques like Squeeze-Excitation and residual block to improve our Siamese network's performance. We drawn the idea from online resources, notably articles by Paul-Louis Pröve and Neetu Sigger Choudhary on Medium: [Squeeze-and-Excitation Networks](https://medium.com/data-science/squeeze-and-excitation-networks-9ef5e71eacd7), [A Comprehensive Guide to Understanding and Implementing Bottleneck Residual Blocks](https://medium.com/@neetu.sigger/a-comprehensive-guide-to-understanding-and-implementing-bottleneck-residual-blocks-6b420706f66b).
- We also ask Gemini to help refine and debug our SE block and residual block with prompt below:

    class SEBlock(nn.Module):

        def __init__(self, input_channels, reduction):
            super(SEBlock, self).__init__()
            self.fc1 = nn.Linear(input_channels, input_channels // reduction)
            self.fc2 = nn.Linear(input_channels // reduction, input_channels)
            self.sigmoid = nn.Sigmoid()

        def forward(self, x):
            batch_size, channels, _, _ = x.size()
            y = F.adaptive_avg_pool2d(x, 1).view(batch_size, channels)
            y = F.relu(self.fc1(y))
            yn = self.sigmoid(self.fc2(y)).view(batch_size, channels, 1, 1)
            return x * y

    class ResidualBlock(nn.Module):

        def __init__(self, input_channels, reduction):
            super(ResidualBlock, self).__init__()
            self.conv1 = nn.Conv2d(input_channels, input_channels, kernel_size=3, padding=1)
            self.bn1 = nn.BatchNorm2d(input_channels)
            self.se = squeeze_excite_block(input_channels, reduction)
            self.conv2 = nn.Conv2d(input_channels, input_channels, kernel_size=3, padding=1)
            self.bn2 = nn.BatchNorm2d(input_channels)

        def forward(self, x):
            out = self.conv1(x)
            out = self.bn1(out)
            out = F.relu(out)
            out = self.se(out)
            out = self.conv2(out)
            out = self.bn2(out)
            out += x
            return F.relu(out)

    help us refine the SEBlock and residual block according to the code file.