In [1]:
import os

In [2]:
import cv2
import numpy as np

def clahe1(img, block_size=8, clip_limit=2.0):
    # Convert the input image to grayscale
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Compute the image size
    h, w = img.shape[:2]

    # Compute the size of each small region
    sx, sy = int(w/block_size), int(h/block_size)

    # Initialize the output image
    out = np.zeros_like(img)

    # Loop over each small region
    for i in range(block_size):
        for j in range(block_size):
            # Compute the coordinates of the current region
            x, y = j*sx, i*sy

            # Extract the current region
            region = img[y:y+sy, x:x+sx]

            # Compute the histogram for the current region
            hist, bins = np.histogram(region.flatten(), 256, [0,256])
            cdf = hist.cumsum()
            cdf_normalized = cdf / cdf[-1]

            # Compute the excess and clip the histogram
            w = block_size*block_size
            excess = 0
            for k in range(256):
                if cdf_normalized[k] > clip_limit/w:
                    excess += cdf_normalized[k] - clip_limit/w
                    cdf_normalized[k] = clip_limit/w
            for k in range(256):
                cdf_normalized[k] += excess/256

            # Reallocate pixel values using the cdf
            region_clahe = np.interp(region.flatten(), bins[:-1], cdf_normalized*255).reshape(region.shape)

            # Insert the enhanced region into the output image
            out[y:y+sy, x:x+sx] = region_clahe

    return out


In [3]:
from numpy.lib.type_check import imag

def clahe2(img):
  image = cv2.resize(img, (500, 600))
  
  # The initial processing of the image
  # image = cv2.medianBlur(image, 3)
  image_bw = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
  
  # The declaration of CLAHE
  # clipLimit -> Threshold for contrast limiting
  clahe = cv2.createCLAHE(clipLimit = 5)
  final_img = clahe.apply(image_bw) + 30
  
  # Ordinary thresholding the same image
  _, ordinary_img = cv2.threshold(image_bw, 155, 255, cv2.THRESH_BINARY)

  med_final = cv2.medianBlur(final_img, 3)


  return med_final

In [4]:
from sklearn.decomposition import PCA

def pca(img):
# Load finger vein image
  #img = cv2.imread('download.png', cv2.IMREAD_GRAYSCALE)

  # Convert image to vector form
  img_1d = img.reshape(-1)

  # Apply PCA
  pca = PCA(n_components=0.5)
  pca.fit(img_1d.reshape(-1, 1))
  img_pca = pca.transform(img_1d.reshape(-1, 1)).reshape(img.shape[0], img.shape[1], -1)

  return img_pca

In [None]:
img_dir = ['1st_session/raw_data', '2nd_session/raw_data']
preprocessed_dir_prefix = 'preprocessed/'
clahe_folder = 'clahe'
pca_folder = 'pca'
if not(os.path.exists(preprocessed_dir_prefix)): os.makedirs(preprocessed_dir_prefix)

for folder in img_dir:
    for folder_name in os.listdir(folder):
        folder_path = os.path.join(folder, folder_name)
        if '.DS_Store' in folder_name:
            continue
        for img_name in os.listdir(folder_path):
            if '.jpg' in img_name:
                roi = cv2.imread(os.path.join(folder_path, img_name))
                img1 = clahe2(roi)
                f_img1 = pca(img1)
                path_clahe = os.path.join(preprocessed_dir_prefix, clahe_folder, folder_path)
                path_pca = os.path.join(preprocessed_dir_prefix, pca_folder, folder_path)
                if not(os.path.exists(path_clahe)): os.makedirs(path_clahe)
                if not(os.path.exists(path_pca)): os.makedirs(path_pca)
                cv2.imwrite(os.path.join(path_clahe, img_name), img1)
                cv2.imwrite(os.path.join(path_pca, img_name), f_img1)
            


In [1]:
import torch
import torch.nn as nn
import numpy as np

In [2]:
class VGGnet(nn.Module):
    def __init__(self):
        super(VGGnet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        self.classifier = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 512)
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

In [3]:
import random
import torchvision.datasets as datasets
import torchvision.transforms as transforms

class TripletImageDataset(datasets.ImageFolder):
    def __init__(self, root, transform=None, target_transform=None):
        super(TripletImageDataset, self).__init__(root, transform, target_transform)
        # self.triplets = self.make_triplets()

    def make_triplets(self):
        triplets = []
        for index, (img, label) in enumerate(self.samples):
            anchor_idx = index
            
            for i in range(2):

                positive_indexes = [ind for ind, (_, label_positive) in enumerate(self.samples) if label == label_positive and ind != index]
                positive_idx = random.choice(positive_indexes)

                negative_indexes = [ind for ind, (_, label_positive) in enumerate(self.samples) if label != label_positive]
                negative_idx = random.choice(negative_indexes)

                triplets.append((anchor_idx, positive_idx, negative_idx))
        return triplets

    def __getitem__(self, index):
        # print(index)
        # print(self.samples)
        # print(len(self.triplets))
        # print(self.triplets)
        self.triplets = self.make_triplets()
        anchor, positive, negative = self.triplets[index]
        # print(anchor, positive, negative)
        anchor_img, _ = super(TripletImageDataset, self).__getitem__(anchor)
        positive_img, _ = super(TripletImageDataset, self).__getitem__(positive)
        negative_img, _ = super(TripletImageDataset, self).__getitem__(negative)
        return anchor_img, positive_img, negative_img

    def __len__(self):
        return len(self.triplets)


In [4]:
from torchvision.datasets import ImageFolder
from torchvision import transforms

In [5]:
transform = transforms.Compose([transforms.Grayscale(), transforms.Resize((224, 224)), transforms.RandomRotation((0, 350)), transforms.ToTensor()])
# transform = transforms.Compose([transforms.Resize((224, 224)), transforms.RandomRotation((0, 350)), transforms.ToTensor()])

dataset_train = TripletImageDataset('preprocessed/clahe/1st_session/raw_data/', transform=transform)
dataset_test = ImageFolder('preprocessed/clahe/2nd_session/raw_data/', transform=transform)

In [6]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [7]:
train_dataloader = torch.utils.data.DataLoader(dataset_train, batch_size=16, shuffle=True)
test_dataloader = torch.utils.data.DataLoader(dataset_test, batch_size=16, shuffle=True)

num_classes = len(dataset_train.classes)
num_epochs = 40
learning_rate = 0.005

model = VGGnet().to(device)

optimizer = torch.optim.Adagrad(model.parameters())

In [8]:
import torch
import torch.nn.functional as F

def euclidean_distance(x, y):
    """
    Compute Euclidean distance between two tensors.
    """
    return torch.pow(x - y, 2).sum(dim=1)

def compute_distance_matrix(anchor, positive, negative):
    """
    Compute distance matrix between anchor, positive, and negative samples.
    """
    distance_matrix = torch.zeros(anchor.size(0), 3)
    distance_matrix[:, 0] = euclidean_distance(anchor, anchor)
    distance_matrix[:, 1] = euclidean_distance(anchor, positive)
    distance_matrix[:, 2] = euclidean_distance(anchor, negative)
    return distance_matrix

def batch_all_triplet_loss(anchor, positive, negative, margin=0.2):
    """
    Compute triplet loss using the batch all strategy.
    """
    distance_matrix = compute_distance_matrix(anchor, positive, negative)
    loss = torch.max(torch.tensor(0.0), distance_matrix[:, 0] - distance_matrix[:, 1] + margin)
    loss += torch.max(torch.tensor(0.0), distance_matrix[:, 0] - distance_matrix[:, 2] + margin)
    print(loss)
    return torch.mean(loss)

In [9]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class TripletLoss(nn.Module):
    def __init__(self, margin):
        super(TripletLoss, self).__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative):
        distance_positive = F.pairwise_distance(anchor, positive, p=2)
        distance_negative = F.pairwise_distance(anchor, negative, p=2)
        losses = F.relu(distance_positive - distance_negative + self.margin)
        return torch.mean(losses)

In [10]:
loss_array = []
for epoch in range(num_epochs):
    for i, (anchor, positive, negative) in enumerate(train_dataloader):
        optimizer.zero_grad()
        anchor = anchor.to(device)
        positive = positive.to(device)
        negative = negative.to(device)
        anchor_embed = model(anchor)
        positive_embed = model(positive)
        negative_embed = model(negative)
        loss = batch_all_triplet_loss(anchor_embed, positive_embed, negative_embed)
        loss_array.append(float(loss))
        loss.backward()
        optimizer.step()

#     if epoch % 100 == 0 and epoch != 0:
#         torch.save(model.state_dict(), 'model_vgg_checkpoint_' + str(epoch) + '.pth')
    print ('Epoch [{}/{}], Loss: {}' 
                   .format(epoch+1, num_epochs, loss.item()))

tensor([0.3665, 0.3680, 0.3662, 0.3678, 0.3685, 0.3684, 0.3700, 0.3669, 0.3692,
        0.3677, 0.3642, 0.3687, 0.3677, 0.3682, 0.3652, 0.3651],
       grad_fn=<AddBackward0>)
tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       grad_fn=<AddBackward0>)
tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       grad_fn=<AddBackward0>)
tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
       grad_fn=<AddBackward0>)


: 

: 

In [None]:
model.save('finger_vein.pth')

In [92]:
model.load_state_dict(torch.load('model_vgg_checkpoint_20.pth', map_location=torch.device('cpu')))

<All keys matched successfully>

In [93]:
dataset_train = ImageFolder('preprocessed/clahe/1st_session/raw_data/', transform=transform)
train_dataloader = torch.utils.data.DataLoader(dataset_train, batch_size=8, shuffle=True)

In [94]:
train_data_map = []
train_label_map = []
with torch.no_grad():
    for images, labels in train_dataloader:
        images = images.to(device)
        outputs = model(images).detach().numpy()
        train_data_map.append(outputs)
        train_label_map = np.concatenate((train_label_map, labels))

In [None]:
len(train_data_map)

369

In [None]:
test_data_map = []
test_label_map = []
with torch.no_grad():
    for images, labels in test_dataloader:
        images = images.to(device)
        outputs = model(images).detach().numpy()
        test_data_map.append(outputs)
        test_label_map = np.concatenate((test_label_map, labels))

In [None]:
from scipy.spatial.distance import cdist

dists = cdist(np.concatenate(test_data_map), np.concatenate(train_data_map))
pred_labels = np.argmin(dists, axis=1)

print(train_label_map)

print(test_label_map)

print(pred_labels)

accuracy = np.mean(pred_labels == np.concatenate(test_label_map))

accuracy

# precision = metrics.precision_score(np.concatenate(train_labels), pred_labels, average='macro')
# recall = metrics.recall_score(np.concatenate(train_labels), pred_labels, average='macro')
# f1_score = metrics.f1_score(np.concatenate(train_labels), pred_labels, average='macro')

# print('Accuracy:', accuracy)
# print('Precision:', precision)
# print('Recall:', recall)
# print('F1-score:', f1_score)

[ 49.  24. 207. ... 147. 187. 361.]
[478. 109. 284. ... 412. 486. 160.]
[1935  151 2177 ...  384 1573  418]


  accuracy = np.mean(pred_labels == np.concatenate(test_data_map))


0.0

In [19]:
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_dataloader:
        print(labels)
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        print(outputs)
        _, predicted = torch.max(outputs.data, 1)
        print(predicted)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        del images, labels, outputs

    print('Accuracy of the network: {} %'.format(100 * correct / total))

tensor([380, 315, 389, 275, 341, 447, 416, 271])
tensor([[0.0000, 0.1946, 0.0000,  ..., 0.4340, 0.0000, 0.0000],
        [0.0695, 0.3690, 0.0000,  ..., 0.1610, 0.0000, 0.3215],
        [0.7957, 0.0000, 1.1393,  ..., 0.8630, 0.0000, 0.5614],
        ...,
        [0.0000, 0.2939, 0.0000,  ..., 0.9435, 0.0000, 0.1087],
        [0.0000, 0.1365, 0.0000,  ..., 0.0000, 0.0000, 0.0603],
        [0.0505, 0.0000, 0.1425,  ..., 1.2953, 0.0000, 0.0000]])
tensor([  9, 419,  93, 504, 317, 445, 419,  89])
tensor([217, 203, 185, 374, 220, 340, 206, 248])
tensor([[0.0000, 0.0993, 0.0000,  ..., 0.2739, 0.0000, 0.0000],
        [0.0000, 0.7192, 0.0000,  ..., 0.2104, 0.0000, 0.0000],
        [0.1241, 0.1207, 0.0000,  ..., 0.5288, 0.0000, 0.2647],
        ...,
        [0.0000, 0.0000, 0.0000,  ..., 1.5024, 0.0000, 0.0000],
        [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.2282],
        [0.0000, 0.0000, 0.0000,  ..., 2.2895, 0.0000, 0.0000]])
tensor([ 88,   4, 200, 408, 405, 222, 141, 222])
tensor([

[E thread_pool.cpp:109] Exception in thread pool task: mutex lock failed: Invalid argument


KeyboardInterrupt: 

In [1]:
from torchvision.models import vgg16

In [19]:
model = vgg16(pretrained = False, )
input_lastLayer = model.classifier[6].in_features
model.classifier[6] = nn.Linear(input_lastLayer, len(dataset_train.classes))
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr = 0.001, momentum=0.9,weight_decay=5e-4)

In [20]:
loss_array = []
for epoch in range(num_epochs):
    for i, (anchor, positive, negative) in enumerate(train_dataloader):
        optimizer.zero_grad()
        anchor = anchor.to(device)
        positive = positive.to(device)
        negative = negative.to(device)
        anchor_embed = model(anchor)
        positive_embed = model(positive)
        negative_embed = model(negative)
        loss = batch_all_triplet_loss(anchor_embed, positive_embed, negative_embed)
        loss_array.append(float(loss))
        loss.backward()
        optimizer.step()

    if epoch % 10 == 0:
        torch.save(model.state_dict(), 'model_vgg_' + str(epoch) + '.pth')
    print ('Epoch [{}/{}], Loss: {}' 
                   .format(epoch+1, num_epochs, loss.item()))

KeyboardInterrupt: 