In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as transforms
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from PIL import ImageOps

from PIL import Image
from torch.utils.data import DataLoader

In [None]:
class SiameseDataset():
    def __init__(self, training_csv=None, training_dir=None, transform=None):
        # Load image file paths and labels from the CSV file
        self.train_df = pd.read_csv(training_csv, header=None)
        self.train_df.columns = ["image1", "image2", "label"]
        self.train_dir = training_dir
        self.transform = transform

    def __getitem__(self, index):
        image1_path = os.path.join(self.train_dir, self.train_df.iat[index, 0])
        image2_path = os.path.join(self.train_dir, self.train_df.iat[index, 1])

        img1 = Image.open(image1_path)
        img2 = Image.open(image2_path)

        img1 = img1.convert("L")
        img2 = img2.convert("L")

        if self.transform is not None:
            img1 = self.transform(img1)
            img2 = self.transform(img2)

        label = int(self.train_df.iat[index, 2])

        return (
            img1,
            img2,
            torch.from_numpy(
                np.array([int(self.train_df.iat[index, 2])], dtype=np.float32)
            ),
        )

    def __len__(self):
        return len(self.train_df)

In [None]:
# class SiameseDataset():
#     def __init__(self, training_csv=None, training_dir=None, transform=None):
#         # Load image file paths and labels from the CSV file
#         self.train_df = pd.read_csv(training_csv, header=None)
#         self.train_df.columns = ["image1", "image2", "label"]
#         self.train_dir = training_dir
#         self.transform = transform

#     def adjust_pixel_values(self, image):
#         # Điều chỉnh giá trị của ảnh tại đây
#         image_array = np.array(image)
#         threshold_value = 245
#         adjusted_array = np.where(image_array < threshold_value, image_array, 255)
#         return Image.fromarray(adjusted_array.astype('uint8'))

#     def __getitem__(self, index):
#         image1_path = os.path.join(self.train_dir, self.train_df.iat[index, 0])
#         image2_path = os.path.join(self.train_dir, self.train_df.iat[index, 1])

#         img1 = Image.open(image1_path)
#         img2 = Image.open(image2_path)

#         img1 = img1.convert("L")
#         img2 = img2.convert("L")

#         img1 = self.adjust_pixel_values(img1)
#         img2 = self.adjust_pixel_values(img2)

#         if self.transform is not None:
#             img1 = self.transform(img1)
#             img2 = self.transform(img2)

#         label = int(self.train_df.iat[index, 2])

#         return (
#             img1,
#             img2,
#             torch.from_numpy(
#                 np.array([int(self.train_df.iat[index, 2])], dtype=np.float32)
#             ),
#         )

#     def __len__(self):
#         return len(self.train_df)


In [None]:
class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 48, kernel_size=11, stride=1),
            nn.BatchNorm2d(48),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, stride=2),

            nn.Conv2d(48, 128, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, stride=2),
            nn.Dropout(p=0.3),

            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True),

            nn.Conv2d(256, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(3, stride=2),
            nn.Dropout(p=0.3),
        )
        self.fc = nn.Sequential(
            nn.Linear(25*17*128, 1024),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.5),

            nn.Linear(1024, 128),
            nn.ReLU(inplace=True),
        )

    def forward_once(self, x):
        output = self.cnn(x)
        output = output.view(output.size()[0], -1)
        output = self.fc(output)
        return output

    def forward(self, input1, input2):
        output1 = self.forward_once(input1)
        output2 = self.forward_once(input2)
        return output1, output2

In [None]:
# class SiameseNetwork(nn.Module):
#     def __init__(self):
#         super(SiameseNetwork, self).__init__()
#         # Define CNN layers
#         self.cnn = nn.Sequential(
#             nn.Conv2d(1, 96, kernel_size=11, stride=1),
#             nn.ReLU(inplace=True),
#             nn.LocalResponseNorm(5,alpha=0.0001,beta=0.75,k=2),
#             nn.MaxPool2d(3, stride=2),

#             nn.Conv2d(96, 256, kernel_size=5, stride=1, padding=2),
#             nn.ReLU(inplace=True),
#             nn.LocalResponseNorm(5,alpha=0.0001,beta=0.75,k=2),
#             nn.MaxPool2d(3, stride=2),
#             nn.Dropout(p=0.3),

#             nn.Conv2d(256, 384, kernel_size=3, stride=1, padding=1),
#             nn.ReLU(inplace=True),

#             nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1),
#             nn.ReLU(inplace=True),
#             nn.MaxPool2d(3, stride=2),
#             nn.Dropout(p=0.3),
#         )
#         # Fully connected layers
#         self.fc = nn.Sequential(
#             nn.Linear(25*17*256, 1024),
#             nn.ReLU(inplace=True),
#             nn.Dropout(p=0.5),

#             nn.Linear(1024, 128),
#             nn.ReLU(inplace=True),
#         )

#     def forward_once(self, x):
#         output = self.cnn(x)
#         output = output.view(output.size()[0], -1)
#         output = self.fc(output)
#         return output

#     def forward(self, input1, input2):
#         output1 = self.forward_once(input1)
#         output2 = self.forward_once(input2)
#         return output1, output2

In [None]:
class ContrastiveLoss(nn.Module):
    def __init__(self, margin=2.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, output1, output2, label):
        euclidean_distance = output1 - output2

        loss_contrastive = torch.mean((1-label) * torch.pow(euclidean_distance, 2) + (
            label) * torch.pow(torch.clamp(self.margin - euclidean_distance, min=0.0), 2))

        return loss_contrastive

In [None]:
# tranform images
image_transforms = transforms.Compose(
    [transforms.Resize((155, 220)), transforms.ToTensor()])

In [None]:
# show plot
def show_plot(train_losses, val_losses, epochs):
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, epochs+1), train_losses, label='Train Loss')
    plt.plot(range(1, epochs+1), val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Train and Validation Loss')
    plt.show()

In [None]:

train_dir = '/kaggle/input/update-sign-12/chuky/chuky'
train_csv = '/kaggle/input/update-sign-12/train2.csv'
validate_csv = '/kaggle/input/update-sign-12/validate2.csv'

epochs = 90

model = SiameseNetwork().cuda()

train_dataset = SiameseDataset(train_csv, train_dir, transform=image_transforms)
train_dataloader = DataLoader(train_dataset, shuffle=True, num_workers=4,
                      pin_memory=True, batch_size=16)

validate_dataset = SiameseDataset(validate_csv, train_dir, transform=image_transforms)
validate_dataloader = DataLoader(validate_dataset, shuffle=True, num_workers=4,
                      pin_memory=True, batch_size=16)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=0.0005)
criterion = ContrastiveLoss()

def train(train_dataloader):
    loss=[]
    model.train()
    for i, data in enumerate(train_dataloader,0):
      img0, img1 , label = data
      img0, img1 , label = img0.cuda(), img1.cuda() , label.cuda()
      optimizer.zero_grad()
      output1,output2 = model(img0,img1)
      loss_contrastive = criterion(output1,output2,label)
      loss_contrastive.backward()
      optimizer.step()
      loss.append(loss_contrastive.item())
    loss = np.array(loss)
    return loss.mean() / len(train_dataloader)

def validate(validate_dataloader):
    loss=[]
    model.eval()
    for i, data in enumerate(validate_dataloader,0):
      img0, img1 , label = data
      img0, img1 , label = img0.cuda(), img1.cuda() , label.cuda()
      output1,output2 = model(img0,img1)
      loss_contrastive = criterion(output1,output2,label)
      loss.append(loss_contrastive.item())
    loss = np.array(loss)
    return loss.mean() / len(validate_dataloader)

train_losses = []
val_losses = []

for epoch in range(0, epochs):
  train_loss = train(train_dataloader) 
  val_loss = validate(validate_dataloader)
  print('Epoch ', epoch+1)
  print(f"Training loss {train_loss}")
  print(f"Validate loss {val_loss}")
  train_losses.append(train_loss)
  val_losses.append(val_loss)
  # Tạo thư mục lưu trữ nếu nó chưa tồn tại
  save_dir = "/kaggle/working/checkpoint"
  os.makedirs(save_dir, exist_ok=True)
    
  file_path = f"{save_dir}/model_epoch_{epoch+1}.pt"
  torch.save(model.state_dict(), file_path)

show_plot(train_losses, val_losses, epochs)

In [None]:
model_checkpoint = SiameseNetwork().cuda()
checkpoint = torch.load('/kaggle/input/checkpoint/model_epoch_19.pt')
model_checkpoint.load_state_dict(checkpoint)

train_dir = '/kaggle/input/dataset-dir/train/train'
train_csv = '/kaggle/input/train-val-csv/new_train.csv'
validate_csv = '/kaggle/input/train-val-csv/new_validate.csv'

train_dataset = SiameseDataset(training_csv=train_csv, training_dir=train_dir, transform=image_transforms)
validate_dataset = SiameseDataset(training_csv=validate_csv, training_dir=train_dir, transform=image_transforms)

train_dataloader = DataLoader(train_dataset, shuffle=True, num_workers=2,pin_memory=True, batch_size=22)
validate_dataloader = DataLoader(validate_dataset, shuffle=True, num_workers=2,pin_memory=True, batch_size=22)

print(len(train_dataloader), len(validate_dataloader))

def compute_distance_optimal(predictions, labels):
    distance_max = np.max(predictions)
    distance_min = np.min(predictions)
#     print('distance_max = ', distance_max)
#     print('distance_min = ', distance_min)

    num_same = np.sum(labels == 0)
    num_different = np.sum(labels == 1)
#     print('num_same = ', num_same)
#     print('num_different = ', num_different)

    step = 0.0005
    max_accuracy = 0

    distance_optimal = 0
    for distance in np.arange(distance_min, distance_max + step, step):
        same_signature_mask = predictions.ravel() <= distance
        different_signature_mask = predictions.ravel() > distance

        true_positive_rate = float(np.sum(labels[same_signature_mask] == 0)) / num_same
        true_negative_rate = float(np.sum(labels[different_signature_mask] == 1)) / num_different

        accuracy = 0.5 * (true_positive_rate + true_negative_rate)

        if accuracy > max_accuracy:
            max_accuracy = accuracy
            distance_optimal = distance

    return max_accuracy, distance_optimal

def compute_total_distance_optimal():
    total_accuracy = 0
    total_distance_optimal = 0
    num_batch = 0
    model_checkpoint.eval()
    for batch_index, data in enumerate(validate_dataloader, 0):
        img1, img2, label = data
        output1, output2 = model_checkpoint(img1.cuda(), img2.cuda())
        eucledian_distance = F.pairwise_distance(output1, output2)
        batch_max_acc, distance_optimal = compute_distance_optimal(eucledian_distance.cpu().detach().numpy(), label.detach().numpy())
#         print('Độ chính xác cao nhất của batch {} = {} tại distance_optimal = {} \n'.format(batch_index+1, batch_max_acc, distance_optimal))
        total_accuracy += batch_max_acc
        total_distance_optimal += distance_optimal
        num_batch += 1
    average_accuracy = total_accuracy / num_batch
    average_distance_optimal = total_distance_optimal / num_batch
    return average_accuracy, average_distance_optimal

average_accuracy, threshold = compute_total_distance_optimal()
print('Trung bình độ chính xác trên tất cả batch = {}, threshold = {}'.format(average_accuracy, threshold))

In [None]:
model = SiameseNetwork().cuda()
checkpoint = torch.load('/kaggle/input/checkpoint/model_epoch_19.pt')
model.load_state_dict(checkpoint)

test_dir = '/kaggle/input/dataset-dir/test/test'
test_csv = '/kaggle/input/new-test-csv/test_data.csv'

train_dir = '/kaggle/input/dataset-dir/train/train'
train_csv = '/kaggle/input/train-val-csv/new_train.csv'
validate_csv = '/kaggle/input/train-val-csv/new_validate.csv'

test_dataset = SiameseDataset(training_csv=test_csv, training_dir=test_dir, transform=image_transforms)
test_dataloader = DataLoader(test_dataset, shuffle=True, batch_size=16)

train_dataset = SiameseDataset(training_csv=train_csv, training_dir=train_dir, transform=image_transforms)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=16)

validate_dataset = SiameseDataset(training_csv=validate_csv, training_dir=train_dir, transform=image_transforms)
validate_dataloader = DataLoader(validate_dataset, shuffle=True, batch_size=16)

threshold = 5.5899291904739785

def test_with_threshold(dataloader, len_dataset):
    TP = 0
    TN = 0
    model.eval()
    count_true_predict = 0  # Số lượng dự đoán đúng
    from_wrong_to_right = 0  # Số lượng dự đoán sai thành đúng
    from_right_to_wrong = 0  # Số lượng dự đoán đúng thành sai
    for batch_index, data in enumerate(dataloader, 0):
        img1, img2, label = data
        output1, output2 = model(img1.cuda(), img2.cuda())
        euclidean_distance = F.pairwise_distance(output1, output2)

        predictions = (euclidean_distance <= threshold).float()
        predictions = 1 - predictions
        predicted_labels = np.array(predictions.cpu(), dtype=np.float32)
        true_labels = np.array(label).flatten()

        count_true_predict += np.sum(predicted_labels == true_labels)

        for i in range(len(true_labels)):
            if predicted_labels[i] != true_labels[i]:
                if predicted_labels[i] == 1:
                    from_right_to_wrong += 1
            if predicted_labels[i] == true_labels[i] and predicted_labels[i] == 0:
                    TP += 1

    from_wrong_to_right = len_dataset - count_true_predict - from_right_to_wrong
    FP = from_wrong_to_right
    FN = from_right_to_wrong
    
    precision = TP / (TP + FP)
    recall = TP / (TP + FN)
    F1_score = 2 * (precision * recall) / (precision + recall)

    print(f'Tổng số lượng dự đoán: {len_dataset}')
    print(f'Số lượng dự đoán đúng: {count_true_predict}')
    print(f'Số lượng dự đoán sai: {len_dataset - count_true_predict}')
    print(f'Số lượng dự đoán sai thành đúng: {from_wrong_to_right}')
    print(f'Số lượng dự đoán đúng thành sai: {from_right_to_wrong}')
    print(f'Độ chính xác: {(count_true_predict / len_dataset) * 100}')
    print(f'F1-score: {F1_score}')

test_with_threshold(test_dataloader, len(test_dataset))