In [1]:
import cv2
import json
import math
import numpy as np
import pandas as pd

from keras_vggface.utils import preprocess_input
from mtcnn.mtcnn import MTCNN
from python_speech_features import mfcc
from scipy.io import wavfile
from tensorflow.keras.utils import Sequence, to_categorical

from tensorflow.keras.applications import VGG16
from tensorflow.keras import layers, Model
from tensorflow.keras.optimizers import Adam

In [2]:
face_detector = MTCNN()

def extract_face_from_img(image):
    faces = face_detector.detect_faces(image)
    x1, y1, width, height = faces[0]['box']
    x1, y1 = abs(x1), abs(y1)
    x2, y2 = x1 + width, y1 + height
    face = image[y1:y2, x1:x2]
    return face

In [3]:
def extract_palm_from_img(image):
    image = np.rot90(image, 3)  # Rotate 90 degrees clockwise
    h, w = image.shape
    img = np.zeros((h + 160, w + 160), np.uint8)  # Pad the image by 80 pixels on 4 sides
    img[80:-80, 80:-80] = image
    # Apply GaussionBlur to remove noise
    blur = cv2.GaussianBlur(img, (5, 5), 0)

    # Apply Binary + OTSU thresholding to generate Black-White image
    # White pixels denote the palm and back pixels denote the background
    _, th = cv2.threshold(blur, 0, 255, cv2.THRESH_BINARY+cv2.THRESH_OTSU)

    # Section 2
    M = cv2.moments(th)
    h, w = img.shape
    # Get centroid of the white pixels
    x_c = M['m10'] // M['m00']
    y_c = M['m01'] // M['m00']

    # Apply Erosion to remove noise
    kernel = np.array([[0, 1, 0],
                       [1, 1, 1],
                       [0, 1, 0]]).astype(np.uint8)
    erosion = cv2.erode(th, kernel, iterations=1)
    boundary = th - erosion

    cnt, _ = cv2.findContours(boundary, cv2.RETR_TREE, cv2.CHAIN_APPROX_NONE)
    img_c = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)

    areas = [cv2.contourArea(c) for c in cnt]
    max_index = np.argmax(areas)
    cnt = cnt[max_index]

    img_cnt = cv2.drawContours(img_c, [cnt], 0, (255, 0, 0), 2)

    cnt = cnt.reshape(-1, 2)
    left_id = np.argmin(cnt.sum(-1))
    cnt = np.concatenate([cnt[left_id:, :], cnt[:left_id, :]])

    # Section 3
    dist_c = np.sqrt(np.square(cnt-[x_c, y_c]).sum(-1))
    f = np.fft.rfft(dist_c)
    cutoff = 15
    f_new = np.concatenate([f[:cutoff], 0*f[cutoff:]])
    dist_c_1 = np.fft.irfft(f_new)

    # Section 4
    eta = np.square(np.abs(f_new)).sum()/np.square(np.abs(f)).sum()
    # print('Power Retained: {:.4f}{}'.format(eta*100, '%'))

    # Section 5
    derivative = np.diff(dist_c_1)
    sign_change = np.diff(np.sign(derivative))/2

    # Section 6
    minimas = cnt[np.where(sign_change>0)[0]]
    v1, v2 = minimas[-1], minimas[-3]

    theta = np.arctan2((v2-v1)[1], (v2-v1)[0])*180/np.pi
    R = cv2.getRotationMatrix2D((int(v2[0]), int(v2[1])), theta, 1)

    img_r = cv2.warpAffine(img, R, (w, h))
    v1 = (R[:, :2] @ v1 + R[:, -1]).astype(np.int)
    v2 = (R[:, :2] @ v2 + R[:, -1]).astype(np.int)

    ux = v1[0]
    uy = v1[1] + (v2-v1)[0]//3
    lx = v2[0]
    ly = v2[1] + 4*(v2-v1)[0]//3

    palm = img_r[uy:ly, ux:lx]
    return palm

In [6]:
class PersonIDSequence(Sequence):

    def __init__(self, csv_file, batch_size, config_file="config.json",
                 extract_face=False, extract_palm=False):
        self.df = pd.read_csv(csv_file, index_col=0).sample(frac=1)  # Shuffle
        self.labels = list(np.unique(self.df.label))
        self.num_labels = len(self.labels)
        self.batch_size = batch_size
        self.extract_face = extract_face
        self.extract_palm = extract_palm
        with open(config_file) as file:
            self.config = json.load(file)

    def __len__(self):
        return math.ceil(self.df.shape[0] / self.batch_size)

    def load_face(self, img_file):
        image = cv2.imread(img_file)[::-1]  # Converting BGR to RGB
        if self.extract_face:
            image = extract_face_from_img(image)
        image = cv2.resize(image, self.config['face_shape'])
        image = image.astype(np.float32)
        image = preprocess_input(image, version=2)
        return image

    def load_palm_print(self, image_file):
        image = cv2.imread(image_file, 0)  # Read as Gray
        if self.extract_palm:
            image = extract_palm_from_img(image)
        image = cv2.resize(image, self.config['palm_shape'])
        image = np.expand_dims(image, axis=-1)
        image = image * 1./255
        return image

    def load_signature(self, text_file):
        data = np.loadtxt(text_file, skiprows=1, dtype=np.float32)
        # Column-wise min-max scaling
        diff = data.max(axis=0) - data.min(axis=0)
        diff = np.where(diff == 0, 1, diff)  # To handle division-by-zero error
        data = (data - data.min(axis=0)) / diff
        # Smoothing by rolling-window-mean-subtraction
        for i in range(data.shape[1]):
            data[:, i] -= pd.Series(data[:, i]).rolling(
                window=self.config['rolling_window'], center=True).mean()
        if len(data) < self.config['max_strokes']:
            pad = self.config['max_strokes'] - len(data)
            data = np.pad(data, ((0, pad), (0, 0)))  # Pad at the bottom
        else:
            n = np.linspace(0, len(data)-1, self.config['max_strokes'],
                            dtype=np.int32)
            data = data[n]
        # data = np.expand_dims(data, axis=-1)
        return data

    def load_audio(self, audio_file):
        def envelop(signal, rate, threshold):
            mask = []
            y = pd.Series(signal).apply(np.abs)
            y_mean = y.rolling(window=int(rate/10), min_periods=1, center=True).mean()
            for mean in y_mean:
                if mean > threshold:
                    mask.append(True)
                else:
                    mask.append(False)
            return mask
        rate, signal = wavfile.read(audio_file)
        mask = envelop(signal, rate, self.config['audio_clean_threshold'])
        signal = signal[mask]
        step = int(rate*self.config['audio_seconds'])
        rand_idx = np.random.randint(0, signal.shape[0]-step)
        sample = signal[rand_idx:rand_idx+step]
        sample = mfcc(sample, rate,
                      numcep=self.config['audio_numcep'],
                      nfilt=self.config['audio_nfilt'],
                      nfft=self.config['audio_nfft'])
        sample = np.expand_dims(sample, axis=-1)
        return sample.astype(np.float32)

    def __getitem__(self, idx):
        batch = self.df.iloc[idx * self.batch_size:(idx + 1) * self.batch_size]
        y = batch.pop('label')
        X = batch
        faces = np.array(list(map(self.load_face, X.face)))
        palm_prints = np.array(list(map(self.load_palm_print, X.palm_print)))
        audios = np.array(list(map(self.load_audio, X.audio)))
        signatures = np.array(list(map(self.load_signature, X.signature)))
        y_indices = [to_categorical(self.labels.index(i), num_classes=self.num_labels)
                     for i in y.values]
        return [faces, palm_prints, audios, signatures], np.array(y_indices)

    def on_epoch_end(self):
        self.df = self.df.sample(frac=1)  # Shuffle


In [20]:
def base_face_model(input_shape=(224, 224, 3)):
    vgg16 = VGG16(include_top=False, weights='imagenet', input_shape=input_shape)
    vgg16.trainable = False
    x = layers.Flatten()(vgg16.output)
    x = layers.Dense(1024, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(64, activation='relu')(x)
    # x = layers.Dropout(0.5)(x)
    # x = layers.Dense(1)(x)
    model = Model(inputs=vgg16.inputs, outputs=x, name='face_model')
    return model

In [21]:
def base_palm_print_model(input_shape=(90, 90, 1)):
    input_ = layers.Input(shape=(90, 90, 1))
    x = layers.Conv2D(32, kernel_size=3, padding='same')(input_)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling2D(pool_size=2)(x)

    x = layers.Conv2D(32, kernel_size=3, padding='same')(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    x = layers.MaxPooling2D(pool_size=2)(x)

    x = layers.Conv2D(32, kernel_size=3, padding='same', activation='relu')(x)
    x = layers.MaxPooling2D(pool_size=2)(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Flatten()(x)
    x = layers.Dense(64, activation='relu')(x)

    model = Model(inputs=input_, outputs=x, name='palm_print_model')
    return model

In [22]:
def base_audio_model_cnn(input_shape=(9, 13, 1)):
    input_ = layers.Input(shape=input_shape)
    x = layers.Conv2D(16, 3, activation='relu', strides=(1, 1),
                      padding='same')(input_)
    x = layers.Conv2D(32, 3, activation='relu', strides=(1, 1),
                      padding='same')(x)
    x = layers.Conv2D( 8, 3, activation='relu', strides=(1, 1),
                      padding='same')(x)
    x = layers.MaxPooling2D((2, 2))(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Flatten()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(64, activation='relu')(x)

    model = Model(inputs=input_, outputs=x, name='audio_model_cnn')
    return model

In [23]:
def base_audio_model_rnn(input_shape=(9, 13)):
    input_ = layers.Input(shape=input_shape)
    x = layers.LSTM(128, return_sequences=True)(input_)
    x = layers.LSTM(128, return_sequences=True)(x)
    x = layers.Dropout(0.5)(x)
    x = layers.TimeDistributed(layers.Dense(64, activation='relu'))(x)
    x = layers.TimeDistributed(layers.Dense(32, activation='relu'))(x)
    x = layers.TimeDistributed(layers.Dense(16, activation='relu'))(x)
    x = layers.TimeDistributed(layers.Dense(8, activation='relu'))(x)
    x = layers.Flatten()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(64, activation='relu')(x)

    model = Model(inputs=input_, outputs=x, name='audio_model_rnn')
    return model

In [24]:
def base_signature_model(input_shape=(1000, 5)):
    input_ = layers.Input(shape=input_shape)
    x = layers.LSTM(128, return_sequences=True)(input_)
    x = layers.LSTM(128, return_sequences=True)(x)
    x = layers.Dropout(0.5)(x)
    x = layers.TimeDistributed(layers.Dense(64, activation='relu'))(x)
    x = layers.TimeDistributed(layers.Dense(32, activation='relu'))(x)
    x = layers.TimeDistributed(layers.Dense(16, activation='relu'))(x)
    x = layers.TimeDistributed(layers.Dense(8, activation='relu'))(x)
    x = layers.Flatten()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(64, activation='relu')(x)

    model = Model(inputs=input_, outputs=x, name='audio_model_rnn')
    return model

In [25]:
face_model = base_face_model(input_shape=(224, 224, 3))
palm_print_model = base_palm_print_model(input_shape=(90,90,1))
audio_model = base_audio_model_cnn(input_shape=(9, 13, 1))
sign_model = base_signature_model(input_shape=(1000, 5))

In [26]:
merge_1 = layers.Concatenate(axis=1)([audio_model.output, sign_model.output])
merge_1 = layers.Dense(64)(merge_1)
merge_1 = layers.BatchNormalization()(merge_1)
merge_1 = layers.ReLU()(merge_1)

merge_2 = layers.Concatenate(axis=1)([palm_print_model.output, merge_1])
merge_2 = layers.Dense(64)(merge_2)
merge_2 = layers.BatchNormalization()(merge_2)
merge_2 = layers.ReLU()(merge_2)

merge_3 = layers.Concatenate(axis=1)([face_model.output, merge_2])
merge_3 = layers.BatchNormalization()(merge_3)
merge_3 = layers.Dense(300, activation='softmax')(merge_3)

model = Model(inputs=[face_model.inputs, palm_print_model.inputs,
                      audio_model.inputs, sign_model.inputs], outputs=merge_3)
model.compile(optimizer=Adam(0.001), loss="categorical_crossentropy",
              metrics=['accuracy'])

In [None]:
train_ds = PersonIDSequence(csv_file='datasets/train.csv', batch_size=32)
val_ds = PersonIDSequence(csv_file='datasets/val.csv', batch_size=32)

model.fit(train_ds, epochs=2, validation_data=val_ds)

Epoch 1/2
  1/777 [..............................] - ETA: 0s - loss: nan - accuracy: 0.0000e+00