# Matching

In this notebook we implement and evaluate (compute rank 1 and rank 5 accuracy) the matching stage of a recognition pipeline. Database of registered individuals stores features extracted using notebook [1]. We iterate over all non-registered samples, apply the fine-tuned segmentation model [2], crop the ear from the image, extract features using MLBP and ResNet50 (pretrained on ImageNet) and finally find the closests sample in the database. 

**References**

[1] https://github.com/Matjaz12/Ear-Recognition-Pipeline/blob/main/feature_extraction.ipynb

[2] https://github.com/Matjaz12/Ear-Recognition-Pipeline/blob/main/segmentation.ipynb

## Setup

Import dependencies, mount google drive and unzip data. 

In [1]:
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from skimage import io, transform
import PIL
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import transforms, utils
from tqdm import tqdm
import pickle

In [2]:
#from google.colab import drive
#drive.mount('/content/drive')

#FOLDERNAME = "IBB5"
#assert FOLDERNAME is not None, "[!] Enter the foldername."

#import sys
#DATA_PATH = '/content/drive/My Drive/{}'.format(FOLDERNAME)
#print(DATA_PATH)
#sys.path.append(DATA_PATH)

#ROOT_PATH = "."

In [3]:
#!unzip "/content/drive/My Drive/IBB5/data.zip" 

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

ROOT_PATH = "/kaggle/input/eardataset/"


In [5]:
class EarClassificationDataset(Dataset):
    def __init__(
        self, annotations_file, image_dir="./images", transform=None
    ):
        super().__init__()
        self.annotations = pd.read_csv(annotations_file, delimiter="\t", dtype="str")
        self.image_dir = image_dir
        self.transform = transform

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # Fetch image
        folder_name, file_name = np.array(self.annotations.iloc[idx, :2])
        image_name = os.path.join(self.image_dir, f"{folder_name}/{file_name}.png")
        label = f"{folder_name}/{file_name}"

        # Read the image and covert to numpy array
        image = np.array(PIL.Image.open(image_name))

        if self.transform:
            augmentation = self.transform(image=image)
            image = augmentation["image"]

        return image, label

**Copy over the hyper-parameters used**

In [6]:
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 8
NUM_WORKERS = 0 # 2
IMAGE_HEIGHT = 300 
IMAGE_WIDTH = 300 
PIN_MEMORY = True
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

**Load the test dataset**

In [7]:
from albumentations.pytorch import ToTensorV2
import albumentations

preprocess_transform = albumentations.Compose([
    albumentations.Resize(width=IMAGE_WIDTH, height=IMAGE_HEIGHT),
    albumentations.Normalize(mean=mean, std=std, max_pixel_value=255.0),
    ToTensorV2(transpose_mask=True)
])

test_dataset = EarClassificationDataset(annotations_file=f"/kaggle/input/tempdata/annotations_test.csv",
                                image_dir=f"{ROOT_PATH}/images",
                                transform=preprocess_transform)

print(f"len(test_dataset): {len(test_dataset)}")
image, label = test_dataset[0]
image.shape, label

len(test_dataset): 3055


(torch.Size([3, 300, 300]), '001/03')

In [8]:
def crop_image(image, pred_mask):
    image = image.to("cpu").detach()
    pred_mask = pred_mask.to("cpu").detach()

    image = image[0].numpy()
    image = image.transpose((1, 2, 0))

    pred_mask = pred_mask[0].numpy()
    pred_mask = pred_mask.squeeze(axis=0)

    # Compute masked image
    pred_mask = np.stack((pred_mask, ) * 3, axis=-1)
    masked_image = image * pred_mask

    # Compute cropped image
    nonzero_rows, nonzero_cols = np.nonzero(masked_image.mean(axis=2))
    seg_y_start, seg_y_end = nonzero_rows.min(), nonzero_rows.max() + 1
    seg_x_start, seg_x_end = nonzero_cols.min(), nonzero_cols.max() + 1

    cropped_image = masked_image[seg_y_start:seg_y_end, seg_x_start:seg_x_end, :]

    return cropped_image


def plot_image_and_cropped_ear(image, pred_mask, cropped_image):
    image = image[0].to("cpu").detach()
    pred_mask = pred_mask[0].to("cpu").detach()

    if torch.is_tensor(image):
        image = image.numpy()
        image = image.transpose((1, 2, 0))

    if torch.is_tensor(pred_mask):
        pred_mask = pred_mask.numpy()
        pred_mask = pred_mask.squeeze(axis=0)

    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))
    ax1.imshow(image)
    ax1.imshow(pred_mask, alpha=0.6 * (pred_mask > 0))
    ax2.imshow(cropped_image)
    plt.show()


def plot_predictions(images, masks, pred_masks, n_samples=3):
    images = images.to("cpu").detach()
    masks = masks.to("cpu").detach()
    pred_masks = pred_masks.to("cpu").detach()

    for idx, (image, mask, pred_mask) in enumerate(zip(images, masks, pred_masks)):
        if idx == n_samples:
            break

        if torch.is_tensor(image):
            image = image.numpy()
            image = image.transpose((1, 2, 0))

        if torch.is_tensor(mask):
            mask = mask.numpy()
            mask = mask.squeeze(axis=0)

        if torch.is_tensor(pred_mask):
            pred_mask = pred_mask.numpy()
            pred_mask = pred_mask.squeeze(axis=0)

        plt.figure(figsize=(6, 6))
        plt.imshow(image)
        # plt.imshow(mask, alpha=0.6 * (mask > 0))
        plt.imshow(pred_mask, alpha=0.6 * (pred_mask > 0))
        plt.show()
        
        
def de_normalize(images, mean, std):
    if isinstance(mean, list):
        mean = torch.tensor(mean).reshape((3, 1, 1)).to(DEVICE)
        
    if isinstance(std, list):
        std = torch.tensor(std).reshape((3, 1, 1)).to(DEVICE)
    
    images = images.to(DEVICE)
    images = images * std + mean
    
    return images


def normalize(images, mean, std):
    if isinstance(mean, list):
        mean = torch.tensor(mean).reshape((3, 1, 1)).to(DEVICE)
        
    if isinstance(std, list):
        std = torch.tensor(std).reshape((3, 1, 1)).to(DEVICE)
    
    images = images.to(DEVICE)
    images = (images - mean) / std
    
    return images

## Load ResNet50

In [9]:
#from torchvision.models import resnet50, ResNet50_Weights
#resnet50_model = resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)

from torchvision.models import resnet50
resnet50_model = resnet50(pretrained=True)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth


  0%|          | 0.00/97.8M [00:00<?, ?B/s]

In [10]:
resnet50_feature_extractor = torch.nn.Sequential(*list(resnet50_model.children())[:-1])

##  Load Finetuned DeepLabV3 model

In [11]:
from torchvision.models.segmentation.deeplabv3 import DeepLabHead
import torchvision

def get_model(out_channels=1, train_mode=False, pretrained=True):
    """
    DeepLabv3 class with a custom head.
    """
    # Load the pretrained model
    model = torchvision.models.segmentation.deeplabv3_resnet50(pretrained=pretrained)

    # Replace the classifier module with a new head
    model.classifier = DeepLabHead(2048, out_channels)

    # Set model in appropriate mode
    model.train() if train_mode else model.eval()

    return model

In [12]:
print("Loading model...")
MODELS_DIR = "./DeepLabV3_v1"
MODELS_DIR = "/kaggle/input/seg-model/DeepLabV3_v1"
deeplabv3_model = get_model(out_channels=1, train_mode=False, pretrained=True)
deeplabv3_model.load_state_dict(torch.load(MODELS_DIR))

Loading model...


Downloading: "https://download.pytorch.org/models/deeplabv3_resnet50_coco-cd0a2569.pth" to /root/.cache/torch/hub/checkpoints/deeplabv3_resnet50_coco-cd0a2569.pth


  0%|          | 0.00/161M [00:00<?, ?B/s]

<All keys matched successfully>

## Local Binary Patterns

In [13]:
import numpy as np
import numpy.typing as npt
from skimage import feature
from tqdm import tqdm
from sympy import divisors


def compute_concatenated_histogram(image, tile_size, num_bins):
    """
    Function computes a concatenated histogram, by spliting the image
    into tiles of size `tile_size` and computing a local histogram on each tile.
    Histograms are normalized and concatenated together.

    :param image: Input image
    :param tile_size: Size of each tile (Width, Height)
    :param num_bins: Number of bins used in computation of local histogram.
    :return: Concatenated histogram.
    """

    height, width = image.shape
    tile_height, tile_width = tile_size
    fv = []

    # Split image into tiles of size (tile_height x tile_width)
    tiled_image = image.reshape(height // tile_height,
                                tile_height,
                                width // tile_width,
                                tile_width)

    tiled_image = tiled_image.swapaxes(1, 2)
    tiled_image = tiled_image.reshape(tiled_image.shape[0] * tiled_image.shape[1], tile_height, tile_width)

    # Compute histogram for each tile and concatenate them
    for tile in tiled_image:
        from scipy.sparse import csr_matrix
        tile_hist, _ = np.histogram(tile, density=True, bins=num_bins, range=(0, num_bins))
        fv.extend(tile_hist)

    # Normalize the concatenated histogram
    fv = np.array(fv)
    # fv = fv / np.sum(fv)

    return fv


class ScikitLBP:
    def __init__(self, num_points: int = 8, radius: int = 1, to_hist: bool = True, method: str = "default"):
        self.num_points = num_points
        self.radius = radius
        self.to_hist = to_hist
        self.method = method

    def describe(self, images: npt.NDArray, *args, **kwargs) -> npt.NDArray:
        """
        Function computes a feature vector for each image.

        :param images: A set of images.
        :return: A set of feature vectors.
        """

        fvs = []

        if self.to_hist:
            image_size_divisors = list(divisors(images[0].shape[0], generator=True))[1: -1]
            tile_width = kwargs.get("tile_width", image_size_divisors[len(image_size_divisors) // 2])
            # print(f"tile width: {tile_width}")
            # print(f"number of tiles: {(images[0].shape[0] / tile_width) ** 2}")

        # Extract features for each image
        for image in images:
            image_lbp = feature.local_binary_pattern(image, self.num_points, self.radius, method=self.method)

            # Compute feature vector
            if self.to_hist:
                num_bins = int(image_lbp.max()) + 1
                fv = compute_concatenated_histogram(image_lbp, tile_size=(tile_width, tile_width),
                                                    num_bins=num_bins)
            else:
                fv = image_lbp.flatten()

            # Store feature vector
            fvs.append(fv)

        fvs = np.array(fvs)
        return fvs

    def __str__(self):
        return "ScikitLBP"

## Recognition Pipeline

In [14]:
from PIL import Image

OUT_IMAGE_HEIGHT, OUT_IMAGE_WIDTH = 128, 128

def segment_ear(model, image, threshold):
    image = image.unsqueeze(dim=0).to(DEVICE)
    model = model.to(DEVICE)

    # Predict a segmentation mask and apply threshold
    pred_mask = torch.sigmoid(model(image)["out"])        
    pred_mask[pred_mask >= threshold] = 1.0
    pred_mask[pred_mask < threshold] = 0.0
    
    # check if empty mask
    if not pred_mask.nonzero().sum().item():
        return None

    # Crop image using predicted seg. mask & rescale the image
    cropped_image = crop_image(de_normalize(image, mean, std), pred_mask)
    cropped_image = Image.fromarray(np.uint8(cropped_image * 255)).resize((OUT_IMAGE_HEIGHT, OUT_IMAGE_WIDTH))

    return cropped_image


def resnet_50_features(feature_extractor, image):
    # Convert image to tensor
    image_c = np.array(image).transpose((2, 0, 1))
    image_c = torch.from_numpy(image_c)

    # Normalize image
    image_c = (image_c / 255.0).unsqueeze(dim=0)
    image_c = normalize(image_c, mean, std)

    # Move data and model on the device
    feature_extractor = feature_extractor.to(DEVICE)
    image_c = image_c.to(DEVICE)

    feature_vect = feature_extractor(image_c).flatten()

    return feature_vect

In [15]:
import os
from scipy.spatial import distance

def matching(
    fv, db, dist_metric="euclidean", n_closest=1
):
    # Fetch feature vectors from the database
    db_labels = np.array(list(db.keys()))
    db_fvs = np.array(list(db.values()))
    
    # Compute distance between fv and all samples in the database
    # print("Computing distance matrix...")
    distances = distance.cdist(fv[np.newaxis, :], db_fvs, metric=dist_metric)[0]

    # Sort distances
    sorted_idxs = np.argsort(distances)
    n_closest_samples = db_labels[sorted_idxs[0:n_closest]].copy()
    n_closest_dists = distances[sorted_idxs[0:n_closest]].copy()
    
    del db_labels
    del db_fvs
    
    return n_closest_samples, n_closest_dists


def rec_pipeline(
    test_image, seg_model, lbp_descriptor, resnet_model=None, 
    db=None, n_closest=1, dist_metric="euclidean"
):
    
    closest_samples, closest_dists = None, None
    
    # Segment ear from the image
    cropped_image = segment_ear(seg_model, test_image, THRESHOLD)
    #print(np.array(cropped_image).shape)
    
    if not cropped_image:
        return closest_samples, closest_dists
    
    # Convert to gray scale
    cropped_gray_image = np.array(cropped_image.convert("L"))
    
    # Extract local binary patterns
    fv = lbp_descriptor.describe(cropped_gray_image[np.newaxis, :, :], tile_width=TILE_WIDTH)[0]
    
    # ResNet50 features
    if resnet_model:
        resnet_fv = resnet_50_features(resnet_model, cropped_image).cpu()
        fv = np.concatenate([fv, resnet_fv])
    
    # Find the closest samples in the database
    closest_samples, closest_dists = matching(fv, db, dist_metric, n_closest)
    
    return closest_samples, closest_dists

In [16]:
def load_database(database_dir):
    database = {}

    print(f"Loading database: {database_dir}")
    for (dirpath, dirnames, filenames) in os.walk(database_dir):
        for filename in filenames:
            if filename.endswith('.pickle'):
                full_path = os.sep.join([dirpath, filename])
                identity = "/".join(full_path.strip(".pickle").split("/")[-2:])
                
                with open(full_path, "rb") as handle:
                    db_fv = pickle.load(handle)

                database[identity] = db_fv
                
    return database

## Evaluation

**Make sure than non of the test samples are accidentally in the database**

In [17]:
LBP_FEATURES_DIR = "/kaggle/input/ear-features/lbp_features1"
db = load_database(LBP_FEATURES_DIR)

Loading database: /kaggle/input/ear-features/lbp_features1


In [18]:
db_labels = np.array(list(db.keys()))

for _, (_, test_label) in enumerate(tqdm(test_dataset)):
    if test_label in list(db.keys()):
        raise Expection(f"test_label: {test_label} in the database !!!")

del db_labels

100%|██████████| 3055/3055 [01:38<00:00, 30.93it/s]


### Random Recognition Model

In [19]:
def random_model(identities, n_closest=1):
    closest_samples = np.random.choice(list(identities.keys()), n_closest, replace=False)
    return closest_samples

In [20]:
rank_1_count, rank_5_count = 0, 0
N_CLOSEST = 5

for idx, (test_image, test_label) in enumerate(tqdm(test_dataset)):
    closest_samples = random_model(db, n_closest=N_CLOSEST)
    
    closest_identities = [str(cs.split('/')[0]) for cs in closest_samples]
    actual_identitiy = test_label.split("/")[0]
        
    if actual_identitiy in closest_identities:
        rank_5_count += 1
            
    if actual_identitiy == closest_identities[0]:
        rank_1_count += 1

rank_1 = 100 * (rank_1_count / len(test_dataset))
rank_5 = 100 * (rank_5_count / len(test_dataset))

print(f"rank 1[%]: {rank_1}")
print(f"rank 5[%]: {rank_5}")

100%|██████████| 3055/3055 [01:14<00:00, 41.24it/s] 

rank 1[%]: 0.1309328968903437
rank 5[%]: 0.7201309328968903





### Recognition using LBP feature vectors

In [21]:
THRESHOLD = 0.25

NUM_POINTS = 8
RADIUS = 1
TO_HIST = True
TILE_WIDTH = 16
lbp_descriptor = ScikitLBP(num_points=NUM_POINTS, radius=RADIUS, to_hist=TO_HIST, method="default")

LBP_FEATURES_DIR = "/kaggle/input/ear-features/lbp_features1"

# db = load_database(LBP_FEATURES_DIR)

In [22]:
N_CLOSEST = 5
rank_1_count, rank_5_count = 0, 0
seg_model_fails = 0

with torch.no_grad():
    for idx, (test_image, test_label) in enumerate(tqdm(test_dataset)):
        closest_samples, closest_dists = rec_pipeline(test_image, deeplabv3_model, lbp_descriptor, 
                                                      resnet_model=None, db=db, n_closest=N_CLOSEST)
        
        if closest_samples is None:
            #print("Segmentation model failed !")
            seg_model_fails += 1
            continue
        
        closest_identities = [str(cs.split('/')[0]) for cs in closest_samples]
        actual_identitiy = test_label.split("/")[0]
        
        #print(f"closest_identities: {closest_identities}")
        #print(f"actual_identitiy: {actual_identitiy}")
        
        if actual_identitiy in closest_identities:
            rank_5_count += 1
            
        if actual_identitiy == closest_identities[0]:
            rank_1_count += 1
                                    
rank_1 = 100 * (rank_1_count / len(test_dataset))
rank_5 = 100 * (rank_5_count / len(test_dataset))

print(f"rank 1[%]: {rank_1}") # 12
print(f"rank 5[%]: {rank_5}") # 21

100%|██████████| 3055/3055 [34:15<00:00,  1.49it/s]

rank 1[%]: 12.962356792144025
rank 5[%]: 21.374795417348608





### Recognition using LBP and ResNet50 (pretrained on ImageNet) feature vectors

In [23]:
THRESHOLD = 0.25

NUM_POINTS = 8
RADIUS = 1
TO_HIST = True
TILE_WIDTH = 16
lbp_descriptor = ScikitLBP(num_points=NUM_POINTS, radius=RADIUS, to_hist=TO_HIST, method="default")

LBP_RESNET_FEATURES_DIR = "/kaggle/input/ear-features/lbp_resnet_features1"

db = load_database(LBP_RESNET_FEATURES_DIR)

Loading database: /kaggle/input/ear-features/lbp_resnet_features1


In [24]:
rank_1_count, rank_5_count = 0, 0
seg_model_fails = 0

with torch.no_grad():
    for idx, (test_image, test_label) in enumerate(tqdm(test_dataset)):
        closest_samples, closest_dists = rec_pipeline(test_image, deeplabv3_model, lbp_descriptor, 
                                                      resnet_model=resnet50_feature_extractor,
                                                      db=db, n_closest=N_CLOSEST)
        
        if closest_samples is None:
            #print("Segmentation model failed !")
            seg_model_fails += 1
            continue
        
        closest_identities = [str(cs.split('/')[0]) for cs in closest_samples]
        actual_identitiy = test_label.split("/")[0]
        
        #print(f"closest_identities: {closest_identities}")
        #print(f"actual_identitiy: {actual_identitiy}")
        
        if actual_identitiy in closest_identities:
            rank_5_count += 1
            
        if actual_identitiy == closest_identities[0]:
            rank_1_count += 1
                                    
rank_1 = 100 * (rank_1_count / len(test_dataset))
rank_5 = 100 * (rank_5_count / len(test_dataset))

print(f"rank 1[%]: {rank_1}")
print(f"rank 5[%]: {rank_5}")

100%|██████████| 3055/3055 [38:03<00:00,  1.34it/s]

rank 1[%]: 12.76595744680851
rank 5[%]: 23.20785597381342





### Try out different distance metrics

We'll also compute rank 1 and rank 5 using the cosine and Manhattan distance metric.

In [25]:
distance_metrics = ["cosine", "cityblock"]
N_CLOSEST = 5

# Iterate over lbp features
for mode in ["LBP", "LBP_RESNET50"]:
    if mode == "LBP":
        resnet_model = None
        db = load_database(LBP_FEATURES_DIR) 
    else:
        resnet_model = resnet50_feature_extractor
        db = load_database(LBP_RESNET_FEATURES_DIR)
        
    run_stats = []
    
    # Try out all distance metrics
    for dist_metric in distance_metrics:
        rank_1_count, rank_5_count = 0, 0
        
        # Iterate over test samples and compute rank 1 and rank 5.
        with torch.no_grad():
            for idx, (test_image, test_label) in enumerate(test_dataset):
                # if idx == 5: break
        
                closest_samples, closest_dists = rec_pipeline(test_image, deeplabv3_model, lbp_descriptor, 
                                                      resnet_model=resnet_model,
                                                      db=db, n_closest=N_CLOSEST, dist_metric=dist_metric)

                if closest_samples is None:
                    # print("Segmentation model failed !")
                    continue

                closest_identities = [str(cs.split('/')[0]) for cs in closest_samples]
                actual_identitiy = test_label.split("/")[0]

                if actual_identitiy in closest_identities:
                    rank_5_count += 1

                if actual_identitiy == closest_identities[0]:
                    rank_1_count += 1

        rank_1 = 100 * (rank_1_count / len(test_dataset))
        rank_5 = 100 * (rank_5_count / len(test_dataset))
        run_stats.append([mode, dist_metric, rank_1, rank_5])
        
    # Create a pandas dataframe and save it
    run_stats_df = pd.DataFrame(np.array(run_stats), columns=["mode", "distance metric", "rank 1", "rank 5"])
    print(run_stats_df)
    run_stats_df.to_csv(f"./{mode}_run_stats.csv", index=False)

Loading database: /kaggle/input/ear-features/lbp_features1
  mode distance metric              rank 1              rank 5
0  LBP          cosine   14.63175122749591  23.404255319148938
1  LBP       cityblock  19.410801963993453  29.885433715220948
Loading database: /kaggle/input/ear-features/lbp_resnet_features1
           mode distance metric              rank 1              rank 5
0  LBP_RESNET50          cosine  13.486088379705402  23.011456628477905
1  LBP_RESNET50       cityblock   16.53027823240589   29.49263502454992
