In [1]:
# Imports
import os
import sys
import torch
import random
import numpy as np
np.set_printoptions(threshold=sys.maxsize)
import torchvision
from tqdm import tqdm
import torch.nn as nn
import torch.optim as optim
import matplotlib as mpl
mpl.rcParams['figure.dpi'] = 300
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torchvision import transforms as T
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torchvision import datasets
from torch.optim.lr_scheduler import StepLR
from sklearn.metrics import classification_report
from utils.one_shot import Prediction
# from torchsummary import summary

# from pytorch_metric_learning import losses
# Lion Optimizer
# from lion_pytorch import Lion


# Utilities
from utils.nutil import Utility
from utils.persiandb import PersianDataset
from utils.threshold_types import ThresholdType
from utils.optim_acc import optim_accuracy
from model.architecture import *

# Load src/.env file
from dotenv import load_dotenv
import warnings
load_dotenv()
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

warnings.filterwarnings('ignore')

In [2]:
# Configurations
main_db_path            = os.getenv('MAIN_DB_PATH')
preprocessed_data_path  = os.getenv('PREPROCESSED_DATA_PATH')
training_csv            = os.getenv('TRAINING_CSV')
testing_csv             = os.getenv('TESTING_CSV')
anchor_data_path        = os.getenv('ANCHOR_PATH')

TRANSFORM_METHOD = T.Compose([T.ToTensor(), T.Resize(224)])
BATCH_SIZE = 32
LEARNING_RATE = 1e-5
# WEIGHT_DECAY = 0.05
STEPLR_GAMMA = 0.1
EPOCHS = 35

# For Utility class
SAMPLES_PER_CLASS = 15
THRESHOLD = ThresholdType.BINARY_INV
RUN_UTILITY = True

# Utility functiont to generate preprocessed images and csv files
if RUN_UTILITY:
    util = Utility(db_path=main_db_path,
                   anchor_path=anchor_data_path,
                   preprocessed_path=preprocessed_data_path,
                   threshold_type=THRESHOLD)

    util.load_db_per_image(images_per_character=SAMPLES_PER_CLASS, save_preprocessed=False, save_as_csv=False, shuffle=True, ratio = (1, 1))
    parent_class, one_shot_anchor, _ = util.one_shot_db()

In [3]:
one_shot_test = ['0641_26.tif']

In [4]:
# Datasets
persian_train = PersianDataset(training_csv, preprocessed_data_path, TRANSFORM_METHOD)
persian_test  = PersianDataset(testing_csv, preprocessed_data_path, TRANSFORM_METHOD)

# def seed_worker(worker_id):
#     worker_seed = torch.initial_seed() % 2**32
#     np.random.seed(worker_seed)
#     random.seed(worker_seed)

# g = torch.Generator()
# g.manual_seed(42)

train_dataloader = DataLoader(persian_train, shuffle=True, num_workers=0, batch_size=BATCH_SIZE)
test_dataloader  = DataLoader(persian_test, shuffle=True, num_workers=0, batch_size=BATCH_SIZE)

In [5]:
# Check if GPU is available
device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using {device} device")

Using cuda device


In [6]:
# Contrastive Loss
class ContrastiveLoss(nn.Module):
    """
    Contrastive loss
    Takes embeddings of two samples and a target label == 1 if samples are from the same class and label == 0 otherwise
    """

    def __init__(self, margin):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin
        self.eps = 1e-9

    def forward(self, output1, output2, target, size_average=True):
        distances = (output2 - output1).pow(2).sum(1)  # squared distances
        losses = .5 * (target.float() * distances +
                        (1 + -1 * target).float() * F.relu(self.margin - (distances + self.eps).sqrt()).pow(2))
        return losses.mean() if size_average else losses.sum()

# class ContrastiveLoss(torch.nn.Module):
#     """
#         Contrastive loss function.
#         Based on: `"Learning a Similarity Metric Discriminatively, with Application to Face Verification" <http://yann.lecun.com/exdb/publis/pdf/hadsell-chopra-lecun-06.pdf>`_.
#         Takes embeddings of two samples and a target label == 1 if samples are from the same class and label == 0 otherwise
#     """
#     def __init__(self, margin=1.0):
#         super(ContrastiveLoss, self).__init__()
#         self.margin = margin

#     def forward(self, output1, output2, label):
#         euclidean_distance = F.pairwise_distance(output1, output2)
#         loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) +
#                                       (label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))
#         return loss_contrastive

In [7]:
model = CustomSiameseNetwork2().to(device)
# criterion = nn.BCELoss()
criterion = ContrastiveLoss(1.)
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = StepLR(optimizer, step_size=5, gamma=STEPLR_GAMMA)

In [8]:
counter = []
loss_train = []
loss_test = []
accuracy_train = []
accuracy_test = []
iteration_number = 0

In [9]:
def train(model, device, train_loader, optimizer, criterion, epoch, counter, iteration_number):
    model.train()
    
    running_loss = 0.0
    
    train_loss = 0
    correct = 0
    total = 0
    # use tqdm
    pbar = tqdm(train_loader)
    for batch_idx, (images_1, images_2, targets) in enumerate(pbar):
        images_1, images_2, targets = images_1.to(device), images_2.to(device), targets.to(device)
        
        optimizer.zero_grad()
        output1, output2 = model(images_1, images_2)
        
        loss = criterion(output1, output2, targets)
        train_loss += loss.item()
        distances = F.pairwise_distance(output1, output2)
        similarities = 1 - distances
        predicted = (similarities >= 0.3).int()
        correct += predicted.eq(targets).sum().item()
        total += targets.size(0)

        
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        
        # add to counter and loss
        if batch_idx % 100 == 99:
            # print('Batch %d, Loss: %.4f' % (batch_idx+1, running_loss/100))
            counter.append(iteration_number)
            # loss_history.append(running_loss/100)
            iteration_number += 100
            running_loss = 0.0
        
        pbar.set_description(desc= f'Epoch {epoch} loss={loss.item()} batch_id={batch_idx}')
    
    train_loss /= len(train_loader.dataset)
    accuracy = 100. * correct / total
    accuracy_train.append(accuracy)
    loss_train.append(train_loss)

    print('Train set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)'.format(
        train_loss, correct, total, accuracy))

# def train(model, device, train_loader, optimizer, criterion, epoch, counter, iteration_number):
#     model.train()
    
#     train_loss = 0
#     correct = 0
#     total = 0
#     # use tqdm
#     pbar = tqdm(train_loader)
#     for batch_idx, (images_1, images_2, targets) in enumerate(pbar):
#         images_1, images_2, targets = images_1.to(device), images_2.to(device), targets.to(device)
        
#         optimizer.zero_grad()
#         outputs = model(images_1, images_2).squeeze()
        
#         loss = criterion(outputs, targets)
#         train_loss += loss.item()

#         predicted = torch.where(outputs > 0.5, 1, 0)
#         correct += predicted.eq(targets.view_as(predicted)).sum().item()
#         total += targets.size(0)
#         loss.backward()
#         optimizer.step()
        
        
#         # add to counter and loss
#         if batch_idx % 100 == 99:
#             # print('Batch %d, Loss: %.4f' % (batch_idx+1, running_loss/100))
#             counter.append(iteration_number)
#             # loss_history.append(running_loss/100)
#             iteration_number += 100
        
#         pbar.set_description(desc= f'Epoch {epoch} loss={loss.item()} batch_id={batch_idx}')
    
#     train_loss /= len(train_loader.dataset)
#     accuracy = 100. * correct / total
#     accuracy_train.append(accuracy)
#     loss_train.append(train_loss)

#     print('Train set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)'.format(
#         train_loss, correct, total, accuracy))

In [10]:
# def test(model, device, test_loader, criterion):
#     model.eval()
#     test_loss = 0
#     correct = 0
    
#     # use tqdm
#     pbar = tqdm(test_loader)
#     with torch.no_grad():
#         for (images_1, images_2, targets) in pbar:
#             images_1, images_2, targets = images_1.to(device), images_2.to(device), targets.to(device)
#             outputs = model(images_1, images_2).squeeze()
#             test_loss += criterion(outputs, targets).item()
#             pred = torch.where(outputs > 0.5, 1, 0)
#             correct += pred.eq(targets.view_as(pred)).sum().item()
            
#             pbar.set_description(desc= f'test_loss={test_loss}')
#     test_loss /= len(test_loader.dataset)
#     print(f'Average loss: {test_loss:.4f}, Accuracy: {correct}/{len(test_loader.dataset)} ({100. * correct / len(test_loader.dataset):.4f}%)')

def test(model, device, test_loader, criterion):

    # Set the model to evaluation mode
    model.eval()

    # Initialize variables to keep track of the loss and accuracy
    test_loss = 0
    # correct = 0
    total = 0
    similarity_collection, target_collection = [], []

    

    # Use tqdm to display the progress of the testing loop
    with tqdm(total=len(test_loader.dataset)) as pbar:
        with torch.no_grad():
            for batch_idx, (images_1, images_2, targets) in enumerate(test_loader):
                # Move the data and target to the GPU
                images_1, images_2, targets = images_1.to(device), images_2.to(device), targets.to(device)

                # Forward pass
                output1, output2 = model(images_1, images_2)
                loss = criterion(output1, output2, targets)

                # Update loss and accuracy
                test_loss += loss.item()
                distances = F.pairwise_distance(output1, output2)
                similarities = 1 - distances

                target_collection.extend(targets.cpu().numpy())
                similarity_collection.extend(similarities.cpu().numpy())
                # predicted = (similarities >= 0.3).int()
                # correct += predicted.eq(targets).sum().item()
                total += targets.size(0)

                # Update tqdm progress bar
                pbar.update(images_1.size(0))
    
    correct, accuracy, threshold = optim_accuracy(similarity_collection, target_collection)
    test_loss /= len(test_loader.dataset)
    accuracy_test.append(accuracy)
    loss_test.append(test_loss)

    # Print the results
    print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)'.format(
        test_loss, correct, total, accuracy))

    return test_loss, accuracy

# def test(model, device, test_loader, criterion):
#     # Set the model to evaluation mode
#     model.eval()

#     # Initialize variables to keep track of the loss and accuracy
#     test_loss = 0
#     # correct = 0
#     total = 0
#     similarity_collection, target_collection = [], []

    

#     # Use tqdm to display the progress of the testing loop
#     with tqdm(total=len(test_loader.dataset)) as pbar:
#         with torch.no_grad():
#             for batch_idx, (images_1, images_2, targets) in enumerate(test_loader):
#                 # Move the data and target to the device
#                 images_1, images_2, targets = images_1.to(device), images_2.to(device), targets.to(device)

#                 # Forward pass
#                 outputs = model(images_1, images_2).squeeze()
#                 loss = criterion(outputs, targets)

#                 # Update loss and accuracy
#                 test_loss += loss.item()

#                 target_collection.extend(targets.cpu().numpy())
#                 similarity_collection.extend(outputs.cpu().numpy())
#                 # predicted = (similarities >= 0.3).int()
#                 # correct += predicted.eq(targets).sum().item()
#                 total += targets.size(0)

#                 # Update tqdm progress bar
#                 pbar.update(images_1.size(0))
    
#     min_val, max_val = np.min(similarity_collection), np.max(similarity_collection)
#     acc_collection = []
#     correct_collection = []

#     for val in np.arange(min_val, max_val, 0.001):
#         temp = np.array(similarity_collection) > val
#         correct = np.sum(temp == np.array(target_collection))
#         correct_collection.append(correct)
#         acc_collection.append(100 * correct / len(similarity_collection))

#     # Compute the average loss and accuracy
#     test_loss /= len(test_loader.dataset)
#     # accuracy = 100. * correct / total
#     accuracy = np.max(acc_collection)
#     accuracy_test.append(accuracy)
#     loss_test.append(test_loss)

#     # Print the results
#     print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)'.format(
#         test_loss, np.max(correct_collection), total, accuracy))

#     return test_loss, accuracy

In [11]:
for epoch in range(1, EPOCHS + 1):
    train(model, device, train_dataloader, optimizer, criterion, epoch, counter, iteration_number)
    # plot_loss()
    test(model, device, test_dataloader, criterion)
    scheduler.step()

  0%|          | 0/313 [00:00<?, ?it/s]


RuntimeError: Input type (c10::Half) and bias type (float) should be the same

In [None]:
# torch.save(model.state_dict(), 'trained_model/customSNN_10sp_final_1.pth')

In [12]:
model.load_state_dict(torch.load('trained_model/customSNN_15sp_final_1.pth'))

<All keys matched successfully>

In [10]:
test(model, device, test_dataloader, criterion)

NameError: name 'test_dataloader' is not defined

In [11]:
OneShotPredictor = Prediction(model, one_shot_anchor, one_shot_test, TRANSFORM_METHOD, preprocessed_data_path, device, parent_class = parent_class)

100%|██████████| 1/1 [00:12<00:00, 12.81s/it]


In [17]:
np.argmax(list(OneShotPredictor._one_shot_result['fecb_26.tif'].values()))

64

In [22]:
OneShotPredictor._one_shot_result['fecb_26.tif']

{'0627': -40.10827445983887,
 'A': -33.01774978637695,
 '06400627': -30.006999492645264,
 '0628': -31.694190979003906,
 'fe90': -33.127503871917725,
 'fe92': -32.588833808898926,
 'fe91': -28.748319625854492,
 'fe96': -21.175569891929626,
 '062a': -20.645509123802185,
 'fe97': -27.141194581985474,
 'fe98': -18.29543375968933,
 '062b': -16.837021112442017,
 'fe9a': -18.552802681922913,
 'fe9b': -23.397119760513306,
 'fe9c': -15.973437905311584,
 'fe9f': -27.630797147750854,
 'fea0': -26.679434061050415,
 'fe9e': -12.42771589756012,
 '062c': -14.874951958656311,
 'fea2': -23.052621126174927,
 'fea4': -7.568445205688477,
 '062d': -24.32414174079895,
 'fea3': -13.407267928123474,
 'fea7': -15.632276058197021,
 'fea8': -6.581384837627411,
 '062e': -17.44503402709961,
 'fea6': -15.529524326324463,
 '062f': -30.79301428794861,
 '0640062f': -25.877999305725098,
 '06400630': -19.530291199684143,
 '0630': -31.18294858932495,
 '0631': -30.43973994255066,
 '06400631': -31.17378878593445,
 '0632': 

In [21]:
list(OneShotPredictor._one_shot_result['fecb_26.tif'].keys())[64]

'fecf'

In [13]:
# print(OneShotPredictor.plot_confusion(contextual=False))
print(OneShotPredictor.make_classification_report(contextual=True))

              precision    recall  f1-score   support

        fecb       0.00      0.00      0.00       1.0
        fecf       0.00      0.00      0.00       0.0

    accuracy                           0.00       1.0
   macro avg       0.00      0.00      0.00       1.0
weighted avg       0.00      0.00      0.00       1.0



In [12]:
import pickle
file_name = 'one_shot_pred_15sp_10epoch_contextual.pkl'
with open(file_name, 'wb') as file:
    pickle.dump(OneShotPredictor, file)
    print(f'Object successfully saved to "{file_name}"')

Object successfully saved to "one_shot_pred_15sp_10epoch_contextual.pkl"


In [15]:
import pickle
file_name = 'one_shot_pred_15sp_10epoch.pkl'
with open(file_name, 'rb') as file:
    OneShotPredictor = pickle.load(file)

In [20]:
from sklearn.metrics import precision_recall_fscore_support
precision_recall_fscore_support(OneShotPredictor._y_true, OneShotPredictor._y_pred, average='weighted')

(0.8490516274164828, 0.8396396396396396, 0.8414155821885497, None)

In [None]:
# result = one_shot_prediction(model, one_shot_anchor, one_shot_test)

In [None]:
# import operator
# from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report, accuracy_score
# y_true = []
# y_pred = []
# label = list(list(result.values())[0].keys())
# for key, value in result.items():
#     y_true.append(key[:-7])
#     y_pred.append(max(value.items(), key=operator.itemgetter(1))[0])
#     # print(key[:-7], max(value.items(), key=operator.itemgetter(1))[0])
# # cm = confusion_matrix(y_true, y_pred, labels=label)
# # disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=label)
# # disp.plot()
# # plt.show()
# print(classification_report(y_true, y_pred))
# print(accuracy_score(y_true, y_pred))

In [None]:
o1, o2 = model(test_dataloader.dataset[0][0].unsqueeze(dim = -4).to(device), test_dataloader.dataset[0][1].unsqueeze(dim = -4).to(device))
simil = 1 - F.pairwise_distance(o1, o2)
simil.detach().cpu().numpy()

In [None]:
def show_samples(model, test_loader, device):
    # Set the model to evaluation mode
    model.eval()

    # Select 10 random pairs from the test dataloader
    num_samples = min(10, len(test_loader.dataset))
    sample_indices = np.random.choice(len(test_loader.dataset), size=num_samples, replace=False)
    sample_images_1 = []
    sample_images_2 = []
    sample_targets = []
    sample_predictions = []

    for idx in sample_indices:
        images_1, images_2, targets = test_loader.dataset[idx]
        sample_images_1.append(images_1)
        sample_images_2.append(images_2)
        sample_targets.append(targets)

        # Move the images to the device and perform the forward pass
        images_1, images_2 = images_1.unsqueeze(0).to(device), images_2.unsqueeze(0).to(device)
        with torch.no_grad():
            output1, output2 = model(images_1, images_2)
        distance = F.pairwise_distance(output1, output2)
        similarity = 1 - distance
        # print(similarity)
        # temp = torch.where(similarity > 0., 1, 0)
        sample_predictions.append(similarity.item())

    # Create a grid of the sampled pairs and their labels
    fig, axes = plt.subplots(nrows=5, ncols=2, figsize=(10, 12))
    fig.suptitle('Sample Test Pairs', fontsize=16)

    for i, ax in enumerate(axes.flatten()):
        # Plot the images side by side
        if i < num_samples:
            image_1 = sample_images_1[i].numpy().transpose((1, 2, 0))
            image_2 = sample_images_2[i].numpy().transpose((1, 2, 0))
            ax.imshow(np.concatenate((image_1, image_2), axis=1))
            ax.axis('off')

            # Show the actual and predicted labels
            actual_label = sample_targets[i].item()
            similarity =  sample_predictions[i]
            ax.set_title(f'Actual: {actual_label}\n Similarity: {similarity:.2f}')

        else:
            # Hide the empty subplot
            ax.axis('off')

    plt.tight_layout()
    plt.show()
    
show_samples(model, test_dataloader, device)

In [None]:
plt.plot(range(1, len(accuracy_train) + 1), accuracy_train, label = 'Training')
plt.plot(range(1, len(accuracy_test) + 1), accuracy_test, label = 'Testing')
plt.xlabel('Epochs', fontweight = 'bold', fontsize = 24)
plt.ylabel('Accuracy', fontweight = 'bold', fontsize = 24)
plt.legend()

### Contrastive Loss
#### 1. (10 Samples per class)
a) Ratio (1:1)
- Output: 10
    > Accuracy : 86.90%

- Output: 40
    > Accuracy : 88.45%

- Output: 64
    > Accuracy : 88.62%

- Output: 256
    > Accuracy : 77.64%

b) Ratio (1:2)

- Output: 256
    > Accuracy : 87.94%

c) Ratio (1:3)

- Output: 256
    > Accuracy : 87.63%

#### 1. (5 Samples per class)
a) Ratio (1:1)
- Output: 10
    > Accuracy : 

- Output: 40
    > Accuracy : 

- Output: 64
    > Accuracy : 

- Output: 256
    > Accuracy : 79.54%

b) Ratio (1:2)

- Output: 256
    > Accuracy : 82.26%


#### 1. (15 Samples per class)
a) Ratio (1:1)
- Output: 10
    > Accuracy :

- Output: 40
    > Accuracy : 

- Output: 64
    > Accuracy : 

- Output: 256
    > Accuracy : 77.80%

b) Ratio (1:2)

- Output: 256
    > Accuracy : 87.94%

c) Ratio (1:3)

- Output: 256
    > Accuracy : 87.63%

### BCE Loss
- Resnet18 (Pretrained = True)
    > Accuracy : 81.80%

- Resnet50 (Pretrained = True)
    > Accuracy : 79.43%

- Resnet101 (Pretrained = True)
    > Accuracy : 84.37%